user
07/09/2024, 11:01 AM

ancient-wolf-19325
07/09/2024, 11:01 AM
from flytekit import task, workflow
@task
def task_a() -> str:
    return "Task A completed"

@task
def task_b() -> str:
    return "Task B completed"

@workflow
def my_workflow() -> (str, str):
    result_a = task_a()
    result_b = task_b()
    return result_a, result_b

if __name__ == "__main__":
    print(my_workflow())
In this example, task_a and task_b will run in parallel because there are no dependencies specified between them.
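If you instead want two tasks to run sequentially, introducing a data dependency is enough. Here is a minimal sketch building on the tasks above (task_c is illustrative, not part of the original example):
@task
def task_c(prev: str) -> str:
    # Consuming task_a's output creates a data dependency,
    # so task_c starts only after task_a finishes.
    return f"Task C ran after: {prev}"

@workflow
def sequential_workflow() -> str:
    result_a = task_a()
    return task_c(prev=result_a)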
Would you like more detail?
Sources:
- Discuss Flyte

aloof-magazine-14878
07/09/2024, 11:05 AM

ancient-wolf-19325
07/09/2024, 11:05 AM

aloof-magazine-14878
07/09/2024, 11:09 AM

ancient-wolf-19325
07/09/2024, 11:09 AM
from flytekit import task, workflow
@task
def task1() -> str:
    return "Task 1 completed"

@task
def task2() -> str:
    return "Task 2 completed"

@task
def task3() -> str:
    return "Task 3 completed"

@task
def task4() -> str:
    return "Task 4 completed"

@task
def task5() -> str:
    return "Task 5 completed"

@task
def task6() -> str:
    return "Task 6 completed"

@workflow
def flow1() -> (str, str, str):
    result1 = task1()
    result2 = task2()
    result3 = task3()
    return result1, result2, result3

@workflow
def flow2() -> (str, str, str):
    result4 = task4()
    result5 = task5()
    result6 = task6()
    return result4, result5, result6

@workflow
def main_workflow() -> ((str, str, str), (str, str, str)):
    flow1_results = flow1()
    flow2_results = flow2()
    return flow1_results, flow2_results

if __name__ == "__main__":
    print(main_workflow())
In this example:
1. flow1 consists of task1, task2, and task3.
2. flow2 consists of task4, task5, and task6.
3. main_workflow calls flow1 and flow2 to run them in parallel.
Would you like more detail?
Sources:
- Flyte Workflows

aloof-magazine-14878
07/17/2024, 7:03 AM

ancient-wolf-19325
07/17/2024, 7:03 AM
To fetch the executions of a workflow, you can use the list_executions_paginated method from the FlyteRemote client. Here's an example:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
from flytekit.models.common import NamedEntityIdentifier

# Initialize FlyteRemote
remote = FlyteRemote(config=Config.auto(), default_project="your_project", default_domain="your_domain")

# Define the workflow identifier
workflow_identifier = NamedEntityIdentifier(project="your_project", domain="your_domain", name="your_workflow_name")

# Fetch executions of the workflow
executions, _ = remote.client.list_executions_paginated(limit=100, identifier=workflow_identifier)

# Print execution IDs
for execution in executions:
    print(execution.id)
In this example:
1. Replace "your_project", "your_domain", and "your_workflow_name" with your actual project, domain, and workflow name.
2. The list_executions_paginated method fetches executions up to the specified limit. You can adjust the limit as needed, or page through all results as shown below.
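If a workflow has more executions than the limit, the paginated call also returns a continuation token. A minimal sketch of paging through every result (assuming the token comes back empty on the last page, per the Flyte Admin pagination protocol):
# Page through all executions using the returned token
all_executions = []
token = ""
while True:
    executions, token = remote.client.list_executions_paginated(
        limit=100, token=token, identifier=workflow_identifier
    )
    all_executions.extend(executions)
    if not token:
        break
print(f"Fetched {len(all_executions)} executions")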
Would you like more detail?
Sources:
- Discuss Flyte

aloof-magazine-14878
07/24/2024, 1:32 PM

ancient-wolf-19325
07/24/2024, 1:32 PM
You can use Flyte's conditional construct to check the value of a task result and apply conditional logic. Flyte requires the use of bitwise operators (&, |) instead of logical operators (and, or), because Python's PEP 335 (overloadable boolean operators) was rejected. Here's an example of how to check if a task result indicates failure and apply conditional logic:
from flytekit import task, workflow, conditional

@task
def task_that_may_fail() -> bool:
    # Simulate a task that may fail
    import random
    return random.choice([True, False])  # True indicates success, False indicates failure

@task
def handle_failure() -> str:
    return "Task failed"

@task
def handle_success() -> str:
    return "Task succeeded"

@workflow
def main_workflow() -> str:
    result = task_that_may_fail()
    return (
        conditional("check_task_result")
        .if_(result.is_false())
        .then(handle_failure())
        .else_()
        .then(handle_success())
    )

if __name__ == "__main__":
    print(main_workflow())
In this example:
1. task_that_may_fail simulates a task that may fail by returning a boolean value.
2. handle_failure and handle_success are tasks that handle the failure and success cases, respectively.
3. main_workflow uses the conditional construct to check if the result of task_that_may_fail is False (indicating failure) and then calls the appropriate task. (A sketch of combining conditions with the bitwise operators follows below.)
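The example above needs only a single boolean check; the bitwise operators come into play when you combine conditions. A minimal sketch reusing the handler tasks above (get_score and the thresholds are illustrative):
@task
def get_score() -> float:
    return 0.7

@workflow
def gated_workflow() -> str:
    score = get_score()
    return (
        conditional("range_check")
        # Combine comparisons with & (and) / | (or); plain `and`/`or`
        # would fail because promises cannot be coerced to bool.
        .if_((score >= 0.5) & (score <= 1.0))
        .then(handle_success())
        .else_()
        .then(handle_failure())
    )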
Would you like more detail?
Sources:
- Flyte Conditionals

aloof-magazine-14878
07/24/2024, 1:34 PM

ancient-wolf-19325
07/24/2024, 1:34 PM
… TaskResult class and its failed parameter? If this doesn't help, please reach out to the Flyte support team for further assistance.

aloof-magazine-14878
07/25/2024, 2:01 PM

ancient-wolf-19325
07/25/2024, 2:01 PM
You can use the map_task function to execute tasks in parallel and then follow it with a task that processes the results. Here's an example:
from flytekit import task, workflow, map_task
import typing

@task
def parallel_task(x: int) -> int:
    return x * 2

@task
def final_task(results: typing.List[int]) -> int:
    return sum(results)

@workflow
def main_workflow(inputs: typing.List[int]) -> int:
    # Run parallel tasks; the keyword must match the mapped task's parameter name (x)
    parallel_results = map_task(parallel_task)(x=inputs)
    # Run final task after all parallel tasks complete
    final_result = final_task(results=parallel_results)
    return final_result

if __name__ == "__main__":
    inputs = [1, 2, 3, 4, 5]
    print(main_workflow(inputs=inputs))
In this example:
1. parallel_task
is the task that will be run in parallel.
2. final_task
is the task that will run after all parallel tasks complete.
3. main_workflow
uses map_task
to run parallel_task
in parallel on the input list and then calls final_task
with the results of the parallel tasks.
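map_task also exposes knobs for controlling the fan-out. A sketch (the values are illustrative, and exact semantics vary slightly across flytekit versions):
# Cap how many mapped instances run at once, and tolerate a fraction of
# failures (concurrency and min_success_ratio are map_task parameters).
parallel_results = map_task(
    parallel_task,
    concurrency=2,           # at most 2 mapped instances in flight
    min_success_ratio=0.8,   # succeed if >= 80% of instances succeed
)(x=inputs)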
Would you like more detail?
Sources:
- Flyte Nested Parallelization

aloof-magazine-14878
07/25/2024, 2:03 PM

ancient-wolf-19325
07/25/2024, 2:03 PM
You can use the >> operator to specify dependencies without passing data. Here's an example:
from flytekit import task, workflow, map_task

@task
def parallel_task(x: int) -> None:
    print(f"Processing {x}")

@task
def final_task() -> str:
    return "All parallel tasks completed"

@workflow
def main_workflow(inputs: list[int]) -> str:
    # Run parallel tasks; the keyword must match the task's parameter name (x)
    parallel_results = map_task(parallel_task)(x=inputs)
    # Ensure final_task runs after all parallel tasks complete
    final_result = final_task()
    parallel_results >> final_result
    return final_result

if __name__ == "__main__":
    inputs = [1, 2, 3, 4, 5]
    print(main_workflow(inputs=inputs))
In this example:
1. parallel_task is the task that will be run in parallel.
2. final_task is the task that will run after all parallel tasks complete.
3. main_workflow uses map_task to run parallel_task in parallel on the input list and then ensures final_task runs after all parallel tasks complete using the >> operator.
Would you like more detail?
Sources:
- Flyte Workflows

aloof-magazine-14878
07/25/2024, 2:04 PM
parallel_results >> final_result

ancient-wolf-19325
07/25/2024, 2:04 PM
parallel_results >> final_result in Flyte is used to specify that final_result should run after parallel_results have completed. This is a way to define dependencies between tasks or workflows without passing data between them.
However, it seems there was a misunderstanding in my previous response. The >> operator is not directly applicable to the results of map_task. Instead, you should use the with_overrides method to ensure that the final task runs after the parallel tasks complete.
Here's the corrected example:
from flytekit import task, workflow, map_task

@task
def parallel_task(x: int) -> None:
    print(f"Processing {x}")

@task
def final_task() -> str:
    return "All parallel tasks completed"

@workflow
def main_workflow(inputs: list[int]) -> str:
    # Run parallel tasks; the keyword must match the task's parameter name (x)
    parallel_results = map_task(parallel_task)(x=inputs)
    # Ensure final_task runs after all parallel tasks complete
    final_result = final_task().with_overrides(after=parallel_results)
    return final_result

if __name__ == "__main__":
    inputs = [1, 2, 3, 4, 5]
    print(main_workflow(inputs=inputs))
In this example:
1. parallel_task is the task that will be run in parallel.
2. final_task is the task that will run after all parallel tasks complete.
3. main_workflow uses map_task to run parallel_task in parallel on the input list and then ensures final_task runs after all parallel tasks complete using the with_overrides method.
Would you like more detail?
Sources:
- Flyte Workflows

aloof-magazine-14878
07/25/2024, 2:06 PM

ancient-wolf-19325
07/25/2024, 2:06 PM
There are two ways to control execution order:
1. Using the >> operator to specify dependencies.
2. Using the with_overrides method to ensure task execution order.

1. Using the >> Operator
The >> operator is used to specify dependencies between tasks or workflows without passing data. This operator ensures that one task runs after another. However, this operator is not directly applicable to the results of map_task.

2. Using the with_overrides Method
The with_overrides method allows you to specify that a task should run after another task or a set of tasks. This method is more flexible and can be used to ensure that a task runs after all parallel tasks complete, even when using map_task.
Example Comparison
Using the >> Operator (Not Applicable for map_task Results)
from flytekit import task, workflow

@task
def task1() -> None:
    print("Task 1")

@task
def task2() -> None:
    print("Task 2")

@workflow
def main_workflow() -> None:
    t1 = task1()
    t2 = task2()
    t1 >> t2  # Ensure task2 runs after task1

if __name__ == "__main__":
    main_workflow()
Using the with_overrides Method (Applicable for map_task Results)
from flytekit import task, workflow, map_task

@task
def parallel_task(x: int) -> None:
    print(f"Processing {x}")

@task
def final_task() -> str:
    return "All parallel tasks completed"

@workflow
def main_workflow(inputs: list[int]) -> str:
    # Run parallel tasks; the keyword must match the task's parameter name (x)
    parallel_results = map_task(parallel_task)(x=inputs)
    # Ensure final_task runs after all parallel tasks complete
    final_result = final_task().with_overrides(after=parallel_results)
    return final_result

if __name__ == "__main__":
    inputs = [1, 2, 3, 4, 5]
    print(main_workflow(inputs=inputs))
Key Differences
- Applicability: The >> operator is not suitable for map_task results, while with_overrides can handle this scenario.
- Flexibility: with_overrides provides more flexibility in specifying task execution order, especially in complex workflows involving parallel tasks.
Would you like more detail?
Sources:
- https://github.com/flyteorg/flyte/issues/5085
- https://github.com/flyteorg/flyte/issues/5044
Note: I wasn’t able to find highly relevant data sources, but above are a few potentially relevant links.

aloof-magazine-14878
07/26/2024, 9:42 AM

ancient-wolf-19325
07/26/2024, 9:42 AM
Apologies for the confusion: with_overrides is used for resource overrides and not for specifying task dependencies. To ensure that a task runs after all parallel map tasks complete, you should use the Promise objects returned by the map tasks to enforce the dependency.
Here's the correct way to ensure that a task runs after all parallel map tasks complete:
from flytekit import task, workflow, map_task
import typing

@task
def parallel_task(x: int) -> None:
    print(f"Processing {x}")

@task
def final_task() -> str:
    return "All parallel tasks completed"

@workflow
def main_workflow(inputs: typing.List[int]) -> str:
    # Run parallel tasks; the keyword must match the task's parameter name (x)
    parallel_results = map_task(parallel_task)(x=inputs)
    # Ensure final_task runs after all parallel tasks complete
    final_result = final_task()
    return final_result

if __name__ == "__main__":
    inputs = [1, 2, 3, 4, 5]
    print(main_workflow(inputs=inputs))
In this example:
1. parallel_task is the task that will be run in parallel.
2. final_task is the task that will run after all parallel tasks complete.
3. main_workflow uses map_task to run parallel_task in parallel on the input list and then ensures final_task runs after all parallel tasks complete by simply calling it after the map task.
Would you like more detail?
Sources:
- Flyte Workflows

aloof-magazine-14878
07/26/2024, 9:45 AM

ancient-wolf-19325
07/26/2024, 9:45 AM
To enforce the dependency, use the Promise objects returned by the map tasks. You can achieve this by using a dummy task that takes the results of the map tasks as input, ensuring that the final task runs only after all map tasks have completed.
Here's an example:
from flytekit import task, workflow, map_task
import typing

@task
def parallel_task(x: int) -> None:
    print(f"Processing {x}")

@task
def dummy_task(results: typing.List[None]) -> None:
    pass

@task
def final_task() -> str:
    return "All parallel tasks completed"

@workflow
def main_workflow(inputs: typing.List[int]) -> str:
    # Run parallel tasks; the keyword must match the task's parameter name (x)
    parallel_results = map_task(parallel_task)(x=inputs)
    # Dummy task to ensure final_task runs after all parallel tasks complete
    dummy_task(results=parallel_results)
    # Run final task
    final_result = final_task()
    return final_result

if __name__ == "__main__":
    inputs = [1, 2, 3, 4, 5]
    print(main_workflow(inputs=inputs))
In this example:
1. parallel_task is the task that will be run in parallel.
2. dummy_task is a task that takes the results of the parallel tasks as input, ensuring that it runs after all parallel tasks complete.
3. final_task is the task that will run after dummy_task, ensuring it runs only after all parallel tasks have completed.
Would you like more detail?
Sources:
- Flyte Workflows

aloof-magazine-14878
07/29/2024, 7:31 PM

ancient-wolf-19325
07/29/2024, 7:31 PM
Large integer inputs can get converted into objects with low, high, and unsigned fields (a 64-bit integer representation used by protobuf), which is a known issue in Flyte.
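The underlying precision limit is easy to demonstrate in plain Python, since such values get coerced through IEEE-754 doubles, which carry only a 53-bit mantissa:
# Quick illustration (no Flyte needed): integers above 2**53 do not
# survive a round trip through float.
adset_id = 120212323137260134
print(adset_id > 2**53)                   # True: beyond exact double range
print(int(float(adset_id)) == adset_id)   # False: precision was lost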
To work around this issue, you can ensure that the values are passed as strings to avoid this conversion. Here's an example of how to trigger a Flyte workflow using FlyteRemote and ensure that the values are passed correctly:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config

# Initialize FlyteRemote
remote = FlyteRemote(config=Config.auto(), default_project="your_project", default_domain="your_domain")

# Fetch the workflow
workflow_name = "your_workflow_name"
workflow = remote.fetch_workflow(name=workflow_name)

# Define the input values as strings
input_values = ["1234567890123456789", "9876543210987654321"]

# Trigger the workflow
execution = remote.execute(workflow, inputs={"input_array": input_values})

# Fetch the execution details (fetch_execution takes project/domain/name)
execution_details = remote.fetch_execution(project="your_project", domain="your_domain", name=execution.id.name)
print(f"Execution ID: {execution.id.name}")
print(f"Execution State: {execution_details.closure.phase}")
In this example:
1. Replace "your_project", "your_domain", and "your_workflow_name" with your actual project, domain, and workflow name.
2. The input_values are defined as strings to avoid the conversion issue.
3. The workflow is triggered using remote.execute, and the execution details are fetched and printed.
Would you like more detail?
Sources:
- FlyteRemote

aloof-magazine-14878
07/29/2024, 7:32 PM

ancient-wolf-19325
07/29/2024, 7:32 PM
As noted, large integers get converted into objects with low, high, and unsigned fields. To avoid this, you should convert the integers to strings before passing them as inputs.
Here's an example of how to trigger a Flyte workflow using FlyteRemote and ensure that the large integer values are passed as strings:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config

# Initialize FlyteRemote
remote = FlyteRemote(config=Config.auto(), default_project="your_project", default_domain="your_domain")

# Fetch the workflow
workflow_name = "your_workflow_name"
workflow = remote.fetch_workflow(name=workflow_name)

# Define the input values as strings
adset_ids = [120212323137260134, 120209925648450134]
adset_ids_as_strings = [str(adset_id) for adset_id in adset_ids]

# Trigger the workflow
execution = remote.execute(workflow, inputs={"adset_ids": adset_ids_as_strings})

# Fetch the execution details (fetch_execution takes project/domain/name)
execution_details = remote.fetch_execution(project="your_project", domain="your_domain", name=execution.id.name)
print(f"Execution ID: {execution.id.name}")
print(f"Execution State: {execution_details.closure.phase}")
In this example:
1. Replace "your_project", "your_domain", and "your_workflow_name" with your actual project, domain, and workflow name.
2. The adset_ids are converted to strings using a list comprehension before being passed as inputs.
3. The workflow is triggered using remote.execute, and the execution details are fetched and printed. (A sketch of waiting for completion follows below.)
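If you also want to block until the run finishes and then read its outputs, FlyteRemote provides a wait helper. A minimal sketch (the timeout value is illustrative):
from datetime import timedelta

# Block until the execution reaches a terminal state, then read outputs
execution = remote.wait(execution, timeout=timedelta(minutes=30))
print(execution.outputs)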
Would you like more detail?
Sources:
- FlyteRemote

aloof-magazine-14878
07/29/2024, 7:35 PM

ancient-wolf-19325
07/29/2024, 7:35 PM
You can also try passing the string values inside a dictionary input:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config

# Initialize FlyteRemote
remote = FlyteRemote(config=Config.auto(), default_project="your_project", default_domain="your_domain")

# Fetch the workflow
workflow_name = "your_workflow_name"
workflow = remote.fetch_workflow(name=workflow_name)

# Define the input values as strings within a dictionary
adset_ids = [120212323137260134, 120209925648450134]
adset_ids_as_strings = [str(adset_id) for adset_id in adset_ids]
input_dict = {"adset_ids": adset_ids_as_strings}

# Trigger the workflow
execution = remote.execute(workflow, inputs={"input_dict": input_dict})

# Fetch the execution details (fetch_execution takes project/domain/name)
execution_details = remote.fetch_execution(project="your_project", domain="your_domain", name=execution.id.name)
print(f"Execution ID: {execution.id.name}")
print(f"Execution State: {execution_details.closure.phase}")
In this example:
1. Replace "your_project", "your_domain", and "your_workflow_name" with your actual project, domain, and workflow name.
2. The adset_ids are converted to strings and placed within a dictionary.
3. The workflow is triggered using remote.execute, and the execution details are fetched and printed.
If the issue persists, it might be necessary to investigate further or reach out to the Flyte community for more specific guidance.
Would you like more detail?
Sources:
- FlyteRemote

aloof-magazine-14878
07/29/2024, 7:36 PM

ancient-wolf-19325
07/29/2024, 7:36 PM
1. Define a Custom Data Class and Workflow:
Wrap the inputs in a custom class that handles its own serialization.
from flytekit import task, workflow
import typing

class CustomDataClass:
    def __init__(self, adset_ids: typing.List[str]):
        self.adset_ids = adset_ids

    def to_dict(self):
        return {"adset_ids": self.adset_ids}

    @classmethod
    def from_dict(cls, data: dict):
        return cls(adset_ids=data["adset_ids"])

@task
def process_data(data: CustomDataClass) -> str:
    print(f"Processing data: {data.adset_ids}")
    return "Processing completed"

@workflow
def main_workflow(data: CustomDataClass) -> str:
    return process_data(data=data)
2. Trigger the Workflow Using FlyteRemote:
Use FlyteRemote to trigger the workflow with the custom data class.
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config

# Initialize FlyteRemote
remote = FlyteRemote(config=Config.auto(), default_project="your_project", default_domain="your_domain")

# Fetch the workflow
workflow_name = "your_workflow_name"
workflow = remote.fetch_workflow(name=workflow_name)

# Define the input values as strings within a custom data class
adset_ids = [120212323137260134, 120209925648450134]
adset_ids_as_strings = [str(adset_id) for adset_id in adset_ids]
custom_data = CustomDataClass(adset_ids=adset_ids_as_strings)

# Trigger the workflow
execution = remote.execute(workflow, inputs={"data": custom_data.to_dict()})

# Fetch the execution details (fetch_execution takes project/domain/name)
execution_details = remote.fetch_execution(project="your_project", domain="your_domain", name=execution.id.name)
print(f"Execution ID: {execution.id.name}")
print(f"Execution State: {execution_details.closure.phase}")
In this example:
1. CustomDataClass is defined to handle the serialization and deserialization of large integers.
2. The process_data task processes the custom data class.
3. The main_workflow workflow uses the custom data class as input.
4. The FlyteRemote client is used to trigger the workflow with the custom data class.
If the issue persists, it might be necessary to reach out to the Flyte community or open an issue on the Flyte GitHub repository for more specific guidance and support.
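As an alternative to a hand-rolled class, flytekit natively supports Python dataclasses as task and workflow inputs (via dataclasses_json in older releases; newer releases also accept mashumaro-style dataclasses), which avoids the manual to_dict/from_dict plumbing. A sketch assuming the dataclasses_json style (AdsetInput is illustrative):
from dataclasses import dataclass
import typing

from dataclasses_json import dataclass_json

@dataclass_json
@dataclass
class AdsetInput:
    adset_ids: typing.List[str]  # keep the large IDs as strings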
Would you like more detail?
Sources:
- FlyteRemote
- Flyte GitHub Issues

aloof-magazine-14878
07/30/2024, 8:13 PM

ancient-wolf-19325
07/30/2024, 8:13 PM
Launch plans cannot be deleted directly through FlyteRemote. However, you can archive a launch plan, which effectively deactivates it and removes it from active use.
Here's how you can archive a launch plan using `FlyteRemote`:
1. Initialize FlyteRemote: Set up the FlyteRemote client.
2. Fetch the Launch Plan: Retrieve the launch plan you want to archive.
3. Archive the Launch Plan: Use the update_launch_plan method to set the launch plan to "INACTIVE".
Here's an example:
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
from flytekit.models.launch_plan import LaunchPlanState

# Initialize FlyteRemote
remote = FlyteRemote(config=Config.auto(), default_project="your_project", default_domain="your_domain")

# Fetch the launch plan to archive
launch_plan_name = "your_launch_plan_name"
launch_plan = remote.fetch_launch_plan(name=launch_plan_name)

# Archive the launch plan by setting its state to INACTIVE
remote.client.update_launch_plan(launch_plan.id, LaunchPlanState.INACTIVE)
print(f"Launch plan {launch_plan_name} has been archived.")
In this example:
1. Replace "your_project", "your_domain", and "your_launch_plan_name" with your actual project, domain, and launch plan name.
2. The update_launch_plan method sets the launch plan's state to INACTIVE, effectively archiving it. (Reactivation is sketched below.)
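If you later need to bring an archived launch plan back, the same call can set the state to ACTIVE (a sketch mirroring the archive call above):
# Reactivate the launch plan by setting its state back to ACTIVE
remote.client.update_launch_plan(launch_plan.id, LaunchPlanState.ACTIVE)
print(f"Launch plan {launch_plan_name} has been reactivated.")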
Would you like more detail?
Sources:
- https://github.com/flyteorg/flyte/issues/1619
- https://github.com/flyteorg/flytesnacks/blob/master/docs/getting_started/run_schedule.md
Note: I wasn’t able to find highly relevant data sources, but above are a few potentially relevant links.