user
10/20/2024, 11:09 AMjolly-florist-66728
10/20/2024, 11:09 AMtask1
is executed first, and based on its success, either task3
or task2
is executed, you can use the conditional
construct. Here's an example:
from flytekit import task, workflow, conditional
@task
def task1() -> bool:
# Simulate task success or failure
return True # or False based on your logic
@task
def task2() -> str:
return "Task 2 executed"
@task
def task3() -> str:
return "Task 3 executed"
@workflow
def my_workflow() -> str:
result = task1()
return (
conditional("task1_success")
.if_(result.is_true())
.then(task3())
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
This code uses the conditional
construct to decide which task to execute based on the result of task1
. If task1
returns True
, task3
is executed; otherwise, task2
is executed.
Would you like more details on this example or any other aspect of Flyte conditionals?
Sources:
- Flyte Conditional Documentation
- Flyte Conditional Example Code 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/20/2024, 11:10 AMjolly-florist-66728
10/20/2024, 11:10 AMtask4
such that both task3
and task4
are executed when task1
is successful, you can chain the tasks in the then
block of the conditional
. Here's how you can do it:
from flytekit import task, workflow, conditional
@task
def task1() -> bool:
# Simulate task success or failure
return True # or False based on your logic
@task
def task2() -> str:
return "Task 2 executed"
@task
def task3() -> str:
return "Task 3 executed"
@task
def task4() -> str:
return "Task 4 executed"
@workflow
def my_workflow() -> str:
result = task1()
if result:
task3()
return task4()
else:
return task2()
if __name__ == "__main__":
print(my_workflow())
In this modified workflow, if task1
is successful, both task3
and task4
are executed sequentially. If task1
fails, only task2
is executed.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- https://github.com/flyteorg/flyte/issues/5513
- https://github.com/flyteorg/flyte/issues/191
Note: I wasn’t able to find highly relevant data sources, but above are a few potentially relevant links. 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/20/2024, 11:11 AMjolly-florist-66728
10/20/2024, 11:11 AMconditional
construct in the last response was an oversight. To correctly implement the logic using Flyte's conditional
construct, you should ensure that both task3
and task4
are executed when task1
is successful. Here's the corrected example using `conditional`:
from flytekit import task, workflow, conditional
@task
def task1() -> bool:
# Simulate task success or failure
return True # or False based on your logic
@task
def task2() -> str:
return "Task 2 executed"
@task
def task3() -> str:
return "Task 3 executed"
@task
def task4() -> str:
return "Task 4 executed"
@workflow
def my_workflow() -> str:
result = task1()
return (
conditional("task1_success")
.if_(result.is_true())
.then(task3() >> task4())
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this example, the conditional
construct is used to execute task3
followed by task4
if task1
is successful. If task1
fails, task2
is executed.
Would you like more details on this example or any other aspect of Flyte conditionals?
Sources:
- https://github.com/flyteorg/flyte/issues/5044
- https://github.com/flyteorg/flytesnacks/blob/master/examples/whylogs_plugin/whylogs_plugin/whylogs_example.py 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/20/2024, 11:13 AMjolly-florist-66728
10/20/2024, 11:13 AMtask4
is a map_task
and task3
uses create_node
, you can structure the workflow as follows:
from flytekit import task, workflow, conditional, map_task, create_node
@task
def task1() -> bool:
# Simulate task success or failure
return True # or False based on your logic
@task
def task2() -> str:
return "Task 2 executed"
@task
def task3() -> str:
return "Task 3 executed"
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@workflow
def my_workflow(data: list[int] = [1, 2, 3]) -> list[str]:
result = task1()
task3_node = create_node(task3)
return (
conditional("task1_success")
.if_(result.is_true())
.then(task3_node() >> map_task(task4)(data_point=data))
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this example:
- task4
is executed as a map_task
, processing each element in the data
list.
- task3
is executed using create_node
, which allows for more explicit control over task execution order.
- The conditional
construct ensures that both task3
and the map_task
for task4
are executed if task1
is successful. Otherwise, task2
is executed.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- Flyte Map Tasks Documentation 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/20/2024, 11:17 AMjolly-florist-66728
10/20/2024, 11:17 AMtask3
returns a list[int]
and task4
takes this list as input, you can structure the workflow as follows:
from flytekit import task, workflow, conditional, map_task, create_node
@task
def task1() -> bool:
# Simulate task success or failure
return True # or False based on your logic
@task
def task2() -> str:
return "Task 2 executed"
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@workflow
def my_workflow() -> list[str]:
result = task1()
task3_node = create_node(task3)
return (
conditional("task1_success")
.if_(result.is_true())
.then(task3_node() >> map_task(task4)(data_point=task3_node.o0))
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this example:
- task3
returns a list[int]
, which is then used as input for task4
.
- task4
is executed as a map_task
, processing each integer in the list returned by task3
.
- The conditional
construct ensures that both task3
and the map_task
for task4
are executed if task1
is successful. Otherwise, task2
is executed.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- https://github.com/flyteorg/flyte/issues/4505
- https://docs.flyte.org/en/latest/user_guide/advanced_composition/map_tasks.html#id2 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/20/2024, 11:32 AMjolly-florist-66728
10/20/2024, 11:32 AMPromise
object. In Flyte, task outputs are Promise
objects, and you cannot directly iterate over them. Instead, you should use the Promise
object as input to another task or map task.
To resolve this, ensure that the output of task3
is correctly passed to task4
using map_task
. Here's how you can modify the workflow:
from flytekit import task, workflow, conditional, map_task, create_node
@task
def task1() -> bool:
# Simulate task success or failure
return True # or False based on your logic
@task
def task2() -> str:
return "Task 2 executed"
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@workflow
def my_workflow() -> list[str]:
result = task1()
task3_node = create_node(task3)
task3_output = task3_node.o0 # Access the output of task3_node
return (
conditional("task1_success")
.if_(result.is_true())
.then(map_task(task4)(data_point=task3_output))
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this corrected version:
- task3_output
is used to access the output of task3_node
.
- map_task(task4)
is called with data_point=task3_output
, which correctly passes the list of integers to task4
.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- https://github.com/flyteorg/flyte/issues/5180
- https://github.com/flyteorg/flyte/issues/5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/20/2024, 11:35 AMjolly-florist-66728
10/20/2024, 11:35 AMn2-n1
) is expected to be a simple:STRING
, but it is currently a collection_type:{simple:STRING}
. This typically happens when a task returns a list of strings, but the workflow expects a single string. Ensure that the task and workflow interface types match.
2. ParameterNotBound: This error indicates that a parameter (o0
) is not bound to any task or workflow output. Ensure that all parameters are correctly connected and that the workflow's output is properly defined.
3. UnreachableNodes: This error means that there are nodes in the workflow that are not connected to the start node, making them unreachable. Check the workflow graph to ensure all nodes are properly connected.
4. ValueRequired: This error suggests that a required value (n0.o0
) is missing. Ensure that all required inputs are provided and correctly connected.
5. VariableNameNotFound: This error indicates that a variable (o0
) is not found on node n2
. Ensure that the variable names are correctly defined and used consistently across the workflow.
To resolve these issues, review the workflow and task definitions to ensure that:
- The types of inputs and outputs match between tasks and the workflow.
- All parameters are correctly bound and connected.
- The workflow graph is fully connected, with no isolated nodes.
- All required values are provided and correctly referenced.
If you need further assistance, please provide more details about the specific tasks and workflow structure.
Sources:
- Flyte GitHub Issue #5473
- Flyte GitHub Issue #5180 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/20/2024, 11:37 AMjolly-florist-66728
10/20/2024, 11:37 AMsimple:STRING
, but it is currently a collection_type:{simple:STRING}
. This typically happens when a task returns a list of strings, but the workflow expects a single string. Ensure that the task and workflow interface types match.
2. ParameterNotBound: This error indicates that a parameter (o0
) is not bound to any task or workflow output. Ensure that all parameters are correctly connected and that the workflow's output is properly defined.
3. UnreachableNodes: This error means that there are nodes in the workflow that are not connected to the start node, making them unreachable. Check the workflow graph to ensure all nodes are properly connected.
4. ValueRequired: This error suggests that a required value (n0.o0
) is missing. Ensure that all required inputs are provided and correctly connected.
5. VariableNameNotFound: This error indicates that a variable (o0
) is not found on node n2
. Ensure that the variable names are correctly defined and used consistently across the workflow.
Here's a revised version of the workflow that addresses these issues:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> bool:
# Simulate task success or failure
return True # or False based on your logic
@task
def task2() -> str:
return "Task 2 executed"
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@workflow
def my_workflow() -> list[str]:
result = task1()
task3_output = task3()
return (
conditional("task1_success")
.if_(result.is_true())
.then(map_task(task4)(data_point=task3_output))
.else_()
.then([task2()])
)
if __name__ == "__main__":
print(my_workflow())
In this revised version:
- task3_output
is directly assigned the output of task3
.
- map_task(task4)
is called with data_point=task3_output
, which correctly passes the list of integers to task4
.
- The else_()
block returns a list containing the result of task2()
to match the expected return type of list[str]
.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- Flyte GitHub Issue #5473 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/20/2024, 11:49 AMjolly-florist-66728
10/20/2024, 11:49 AMelse_()
block is expected to be a list[str]
, but task2()
returns a str
. To resolve this, ensure that the else_()
block returns a list.
2. UnreachableNodes: This error suggests that there are nodes in the workflow that are not connected to the start node. Ensure that all nodes are properly connected.
3. ValueRequired: This error indicates that a required value is missing. Ensure that all required inputs are provided and correctly connected.
Here's a revised version of the workflow that addresses these issues:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> bool:
# Simulate task success or failure
return True # or False based on your logic
@task
def task2() -> str:
return "Task 2 executed"
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@workflow
def my_workflow() -> list[str]:
result = task1()
task3_output = task3()
return (
conditional("task1_success")
.if_(result.is_true())
.then(map_task(task4)(data_point=task3_output))
.else_()
.then([task2()]) # Return a list to match the expected type
)
if __name__ == "__main__":
print(my_workflow())
In this revised version:
- The else_()
block returns a list containing the result of task2()
to match the expected return type of list[str]
.
- The map_task(task4)
is correctly called with data_point=task3_output
, which passes the list of integers to task4
.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- Flyte GitHub Issue #5473 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/20/2024, 12:14 PMjolly-florist-66728
10/20/2024, 12:14 PMtask1
returns a boolean indicating success or failure.
- task2
returns a list of strings, which matches the expected output type for the else_()
branch.
- task3
returns a list of integers, which is used as input for task4
.
- task4
processes each integer from task3
and returns a string.
- Workflow Logic:
- The conditional
construct is used to decide which path to take based on the result of task1
.
- If task1
is successful (True
), map_task(task4)
processes each integer from task3_output
.
- If task1
fails (False
), task2
is executed, returning a list of strings.
This setup ensures that the workflow is both type-safe and logically sound. If you have any more questions or need further assistance, feel free to ask!
Sources:
- https://discuss.flyte.org/t/8406760/we-have-a-conditional-branch-that-takes-the-output-of-two-ta/
- https://discuss.flyte.org/t/12352372/i-need-to-conditionally-run-some-tasks-based-on-an-output-of/ 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/22/2024, 9:18 AMjolly-florist-66728
10/22/2024, 9:18 AMlen()
function to check the length of a list returned by a task. If you want to execute map_task(task4)
when the length of the list is greater than 0, and task2
otherwise, you can modify the workflow as follows:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> list[str]:
# Simulate task returning a list of strings
return ["item1", "item2", "item3"] # or [] for an empty list
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@workflow
def my_workflow() -> list[str]:
result = task1()
task3_output = task3()
return (
conditional("task1_non_empty")
.if_(len(result) > 0)
.then(map_task(task4)(data_point=task3_output))
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this modified workflow:
- task1
returns a list of strings.
- The conditional
construct checks if the length of the list returned by task1
is greater than 0 using len(result) > 0
.
- If the condition is true, map_task(task4)
is executed with task3_output
.
- If the condition is false, task2
is executed.
This setup allows you to conditionally execute tasks based on the length of a list returned by a task.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- https://github.com/flyteorg/flyte/issues/5044
- https://github.com/flyteorg/flyte/blob/master/docs/user_guide/advanced_composition/conditionals.md 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/22/2024, 9:21 AMjolly-florist-66728
10/22/2024, 9:21 AMlen()
function on a Promise
object. In Flyte, task outputs are Promise
objects, and you cannot directly use Python functions like len()
on them. Instead, you should use Flyte's conditional constructs to handle such logic.
To check the length of a list returned by a task and conditionally execute other tasks, you can modify the workflow to use a separate task to evaluate the length. Here's how you can do it:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> list[str]:
# Simulate task returning a list of strings
return ["item1", "item2", "item3"] # or [] for an empty list
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def check_length(data: list[str]) -> bool:
return len(data) > 0
@workflow
def my_workflow() -> list[str]:
result = task1()
is_non_empty = check_length(data=result)
task3_output = task3()
return (
conditional("task1_non_empty")
.if_(is_non_empty.is_true())
.then(map_task(task4)(data_point=task3_output))
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this revised version:
- A new task check_length
is introduced to evaluate the length of the list returned by task1
.
- The check_length
task returns a boolean indicating whether the list is non-empty.
- The conditional
construct uses the result of check_length
to decide which path to take.
This approach ensures that you can conditionally execute tasks based on the length of a list without directly using len()
on a Promise
.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- Flyte GitHub Issue #5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/22/2024, 9:24 AMjolly-florist-66728
10/22/2024, 9:24 AMtask1
to return both a list and a boolean indicating whether the list is non-empty, you can structure the task to return a tuple. Here's how you can implement this in the workflow:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@workflow
def my_workflow() -> list[str]:
data, is_non_empty = task1()
task3_output = task3()
return (
conditional("task1_non_empty")
.if_(is_non_empty.is_true())
.then(map_task(task4)(data_point=task3_output))
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this version:
- task1
returns a tuple containing a list of strings and a boolean. The boolean indicates whether the list is non-empty.
- The conditional
construct uses the boolean value is_non_empty
to decide which path to take.
- If is_non_empty
is True
, map_task(task4)
is executed with task3_output
.
- If is_non_empty
is False
, task2
is executed.
This setup allows you to conditionally execute tasks based on whether task1
returns a non-empty list.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- https://discuss.flyte.org/t/12352372/i-need-to-conditionally-run-some-tasks-based-on-an-output-of/
- https://github.com/flyteorg/flyte/issues/5044
Note: I wasn’t able to find highly relevant data sources, but above are a few potentially relevant links. 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/22/2024, 9:30 AMjolly-florist-66728
10/22/2024, 9:30 AMPromise
object. In Flyte, task outputs are Promise
objects, and you cannot directly iterate over them. Instead, you should use Flyte's constructs to handle such logic.
To resolve this, ensure that you are using map_task
correctly, which is designed to handle lists of Promise
objects. Here's how you can modify the workflow to avoid this error:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@workflow
def my_workflow() -> list[str]:
data, is_non_empty = task1()
task3_output = task3()
return (
conditional("task1_non_empty")
.if_(is_non_empty.is_true())
.then(map_task(task4)(data_point=task3_output))
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this version:
- task1
returns a tuple containing a list of strings and a boolean. The boolean indicates whether the list is non-empty.
- The conditional
construct uses the boolean value is_non_empty
to decide which path to take.
- map_task(task4)
is used to process each element in task3_output
, which is a list of integers.
Ensure that map_task
is used correctly to handle lists of Promise
objects, and avoid directly iterating over Promise
objects in your workflow logic.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- Flyte GitHub Issue #5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/23/2024, 4:45 AMjolly-florist-66728
10/23/2024, 4:45 AMcreate_node
and the handling of task outputs. Specifically, the error indicates that the input data_point
for task4
was not specified correctly. Let's address this by ensuring that the task outputs are properly connected and used.
Here's how you can modify the workflow to resolve the issue:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def task5() -> str:
return "Task 5"
@workflow
def my_workflow() -> list[str]:
res, allowed = task1()
task3_output = task3()
return (
conditional("task1_success")
.if_(allowed.is_true())
.then(task5() >> map_task(task4)(data_point=task3_output))
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this revised version:
- task3_output
is directly assigned the output of task3
.
- map_task(task4)
is used to process each element in task3_output
, which is a list of integers.
- The conditional
construct uses the boolean value allowed
to decide which path to take.
- The >>
operator is used to chain tasks, ensuring that task5
is executed before map_task(task4)
.
This setup ensures that the inputs and outputs are correctly connected and that the workflow logic is properly implemented.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- https://github.com/flyteorg/flyte/blob/master/docs/user_guide/advanced_composition/map_tasks.md
- https://github.com/flyteorg/flyte/issues/5684 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/23/2024, 5:12 AM.if_(allowed.is_true())
.then( map_task(task4)(data_point=task3_output) >> task5())
jolly-florist-66728
10/23/2024, 5:12 AM>>
operator to chain tasks, it defines the order of execution, but it does not mean that all tasks in the chain will be executed as part of the conditional block. The then
block in a conditional
construct will execute the entire chain, but the order of tasks in the chain matters.
If you want both task5
and map_task(task4)
to execute when the condition is true, you need to ensure that both tasks are part of the execution chain. The order in which you place them in the chain determines which task runs first.
Here's how you can ensure both tasks are executed:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def task5() -> str:
return "Task 5 executed"
@workflow
def my_workflow() -> list[str]:
res, allowed = task1()
task3_output = task3()
return (
conditional("task1_success")
.if_(allowed.is_true())
.then(task5() >> map_task(task4)(data_point=task3_output))
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this setup:
- task5()
is executed first, followed by map_task(task4)
.
- The >>
operator ensures that task5
runs before map_task(task4)
, but both are part of the then
block and will execute if the condition is true.
If you want map_task(task4)
to execute first, simply reverse the order in the chain:
.then(map_task(task4)(data_point=task3_output) >> task5())
This ensures that both tasks are executed in the desired order when the condition is true.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- https://docs.flyte.org/en/latest/user_guide/advanced_composition/conditionals.html#id2
- https://github.com/flyteorg/flytesnacks/blob/master/examples/advanced_composition/advanced_composition/conditional.py 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/23/2024, 5:16 AMjolly-florist-66728
10/23/2024, 5:16 AM>>
operator, it defines a dependency, meaning the first task must complete before the second task starts. However, if the second task does not have any inputs that depend on the first task's outputs, it might not execute as expected in a conditional block.
To ensure both task5
and map_task(task4)
are executed, you can explicitly call both tasks in the then
block without chaining them. Here's how you can modify the workflow:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def task5() -> str:
return "Task 5 executed"
@workflow
def my_workflow() -> list[str]:
res, allowed = task1()
task3_output = task3()
if allowed:
task5() # Explicitly call task5
return map_task(task4)(data_point=task3_output)
else:
return task2()
if __name__ == "__main__":
print(my_workflow())
In this version:
- task5()
is explicitly called when allowed
is True
.
- map_task(task4)
is executed with task3_output
as input.
- The else
block executes task2()
if allowed
is False
.
This setup ensures that both task5
and map_task(task4)
are executed when the condition is true.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- Flyte Conditional Documentation
- Flyte Conditional Example Code 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/23/2024, 5:18 AMjolly-florist-66728
10/23/2024, 5:18 AMtask5
and map_task(task4)
are executed within a conditional
block, you can use a create_node
approach to explicitly define the execution order. Here's how you can modify the workflow using `conditional`:
from flytekit import task, workflow, conditional, map_task, create_node
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def task5() -> str:
return "Task 5 executed"
@workflow
def my_workflow() -> list[str]:
res, allowed = task1()
task3_output = task3()
task5_node = create_node(task5)
map_task4_node = map_task(task4)(data_point=task3_output)
return (
conditional("task1_non_empty")
.if_(allowed.is_true())
.then(task5_node() >> map_task4_node)
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this setup:
- task5_node
is created using create_node
to ensure it is part of the execution chain.
- map_task4_node
is executed after task5_node
using the >>
operator to define the order.
- The conditional
construct ensures that both task5
and map_task(task4)
are executed if allowed
is True
.
- If allowed
is False
, task2()
is executed.
This approach ensures that both tasks are executed in the desired order when the condition is true.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- Flyte Conditional Example Code 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/23/2024, 5:31 AMjolly-florist-66728
10/23/2024, 5:31 AMPromise
object. In Flyte, task outputs are Promise
objects, and you cannot directly iterate over them. Instead, you should use Flyte's constructs to handle such logic.
To resolve this, ensure that you are using map_task
correctly, which is designed to handle lists of Promise
objects. Here's how you can modify the workflow to avoid this error:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def task5() -> str:
return "Task 5 executed"
@workflow
def my_workflow() -> list[str]:
res, allowed = task1()
task3_output = task3()
return (
conditional("task1_non_empty")
.if_(allowed.is_true())
.then(task5() >> map_task(task4)(data_point=task3_output))
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this version:
- task3_output
is directly assigned the output of task3
.
- map_task(task4)
is used to process each element in task3_output
, which is a list of integers.
- The conditional
construct uses the boolean value allowed
to decide which path to take.
- The >>
operator is used to chain tasks, ensuring that task5
is executed before map_task(task4)
.
This setup ensures that the inputs and outputs are correctly connected and that the workflow logic is properly implemented.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- Flyte GitHub Issue #5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/23/2024, 5:35 AMjolly-florist-66728
10/23/2024, 5:35 AMtask5
and map_task(task4)
are executed when the condition is true, you need to make sure that both tasks are explicitly part of the execution chain. The >>
operator is used to define dependencies, but if task5
is not executing, it might be due to how the tasks are chained or how the conditional logic is structured.
Here's how you can modify the workflow to ensure both tasks are executed:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def task5() -> str:
return "Task 5 executed"
@workflow
def my_workflow() -> list[str]:
res, allowed = task1()
task3_output = task3()
return (
conditional("task1_non_empty")
.if_(allowed.is_true())
.then(
task5() >> map_task(task4)(data_point=task3_output)
)
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this setup:
- task5()
is explicitly called and chained with map_task(task4)
.
- The >>
operator ensures that task5
runs before map_task(task4)
, but both are part of the then
block and will execute if the condition is true.
- If allowed
is False
, task2()
is executed.
This approach ensures that both tasks are executed in the desired order when the condition is true. If task5
is still not executing, ensure that there are no other issues in the environment or configuration that might be affecting task execution.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- https://discuss.flyte.org/t/12352372/i-need-to-conditionally-run-some-tasks-based-on-an-output-of/
- https://docs.flyte.org/en/latest/user_guide/advanced_composition/conditionals.html 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/23/2024, 5:44 AMjolly-florist-66728
10/23/2024, 5:44 AMtask5
is executed along with map_task(task4)
when the condition is true, you need to make sure that both tasks are explicitly part of the execution chain. If task5
is not executing, it might be due to how the tasks are chained or how the conditional logic is structured.
Here's a revised approach to ensure both tasks are executed:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def task5() -> str:
return "Task 5 executed"
@workflow
def my_workflow() -> list[str]:
res, allowed = task1()
task3_output = task3()
# Execute task5 and map_task(task4) independently
task5_result = task5()
task4_results = map_task(task4)(data_point=task3_output)
return (
conditional("task1_non_empty")
.if_(allowed.is_true())
.then([task5_result] + task4_results)
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this setup:
- task5()
is executed independently and its result is stored in task5_result
.
- map_task(task4)
processes each element in task3_output
, and its results are stored in task4_results
.
- The then
block of the conditional
combines the results of task5
and map_task(task4)
into a single list, ensuring both are executed when the condition is true.
- If allowed
is False
, task2()
is executed.
This approach ensures that both task5
and map_task(task4)
are executed when the condition is true. If task5
is still not executing, ensure that there are no other issues in the environment or configuration that might be affecting task execution.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- https://github.com/flyteorg/flyte/issues/5044
- https://discuss.flyte.org/t/12355947/my-subworkflow-inside-a-flyte-conditional-receives-inputs-fr/ 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/23/2024, 5:47 AMjolly-florist-66728
10/23/2024, 5:47 AMPromise
object with a list. In Flyte, task outputs are Promise
objects, and you cannot directly concatenate them with lists. Instead, you should ensure that all elements being concatenated are lists.
To resolve this, you can modify the workflow to handle the Promise
objects correctly. Here's how you can do it:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def task5() -> list[str]:
return ["Task 5 executed"]
@workflow
def my_workflow() -> list[str]:
res, allowed = task1()
task3_output = task3()
# Execute task5 and map_task(task4) independently
task5_result = task5()
task4_results = map_task(task4)(data_point=task3_output)
return (
conditional("task1_non_empty")
.if_(allowed.is_true())
.then(task5_result + task4_results)
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this setup:
- task5()
now returns a list of strings to match the expected type for concatenation.
- task5_result
and task4_results
are both lists, allowing them to be concatenated in the then
block of the conditional
.
- The conditional
construct ensures that both task5
and map_task(task4)
are executed when the condition is true, and their results are combined into a single list.
This approach ensures that all elements being concatenated are lists, resolving the TypeError
.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- Flyte GitHub Issue #5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/23/2024, 5:50 AMjolly-florist-66728
10/23/2024, 5:50 AM+
operator on Promise
objects. In Flyte, task outputs are Promise
objects, and you cannot directly concatenate them using the +
operator. Instead, you should handle them in a way that respects their nature as deferred computation results.
To resolve this, you can modify the workflow to handle the Promise
objects correctly. Here's how you can do it:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def task5() -> list[str]:
return ["Task 5 executed"]
@workflow
def my_workflow() -> list[str]:
res, allowed = task1()
task3_output = task3()
# Execute task5 and map_task(task4) independently
task5_result = task5()
task4_results = map_task(task4)(data_point=task3_output)
return (
conditional("task1_non_empty")
.if_(allowed.is_true())
.then([task5_result] + task4_results)
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this setup:
- task5()
returns a list of strings, and map_task(task4)
returns a list of strings as well.
- Instead of using the +
operator, you should ensure that both task5_result
and task4_results
are lists and can be combined in a way that respects their Promise
nature.
- The conditional
construct ensures that both task5
and map_task(task4)
are executed when the condition is true, and their results are combined into a single list.
This approach ensures that all elements being combined are lists, resolving the TypeError
.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- https://github.com/flyteorg/flyte/issues/5296
- https://discuss.flyte.org/t/11995181/sorry-another-question-what-is-an-flyte-way-of-running-a-fly/ 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.victorious-jelly-28729
10/23/2024, 5:56 AMjolly-florist-66728
10/23/2024, 5:56 AMPromise
object with a list. In Flyte, task outputs are Promise
objects, and you cannot directly concatenate them with lists. Instead, you should ensure that all elements being concatenated are lists and handle Promise
objects appropriately.
To resolve this, you can modify the workflow to handle the Promise
objects correctly. Here's how you can do it:
from flytekit import task, workflow, conditional, map_task
@task
def task1() -> tuple[list[str], bool]:
# Simulate task returning a list of strings and a boolean
data = ["item1", "item2", "item3"] # or [] for an empty list
return data, len(data) > 0
@task
def task2() -> list[str]:
return ["Task 2 executed"]
@task
def task3() -> list[int]:
return [1, 2, 3] # Example list of integers
@task
def task4(data_point: int) -> str:
return f"Task 4 processed {data_point}"
@task
def task5() -> list[str]:
return ["Task 5 executed"]
@workflow
def my_workflow() -> list[str]:
res, allowed = task1()
task3_output = task3()
# Execute task5 and map_task(task4) independently
task5_result = task5()
task4_results = map_task(task4)(data_point=task3_output)
return (
conditional("task1_non_empty")
.if_(allowed.is_true())
.then(task5_result + [task4_results])
.else_()
.then(task2())
)
if __name__ == "__main__":
print(my_workflow())
In this setup:
- task5()
returns a list of strings, and map_task(task4)
returns a list of strings as well.
- Instead of using the +
operator directly on Promise
objects, ensure that both task5_result
and task4_results
are lists and can be combined in a way that respects their Promise
nature.
- The conditional
construct ensures that both task5
and map_task(task4)
are executed when the condition is true, and their results are combined into a single list.
This approach ensures that all elements being combined are lists, resolving the TypeError
.
Would you like more details on this example or any other aspect of Flyte workflows?
Sources:
- Flyte GitHub Issue #5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.