This message was deleted.
# ask-ai
u
This message was deleted.
j
To implement a Flyte workflow with conditional logic where
task1
is executed first, and based on its success, either
task3
or
task2
is executed, you can use the
conditional
construct. Here's an example:
Copy code
from flytekit import task, workflow, conditional

@task
def task1() -> bool:
    # Simulate task success or failure
    return True  # or False based on your logic

@task
def task2() -> str:
    return "Task 2 executed"

@task
def task3() -> str:
    return "Task 3 executed"

@workflow
def my_workflow() -> str:
    result = task1()
    return (
        conditional("task1_success")
        .if_(result.is_true())
        .then(task3())
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
This code uses the
conditional
construct to decide which task to execute based on the result of
task1
. If
task1
returns
True
,
task3
is executed; otherwise,
task2
is executed. Would you like more details on this example or any other aspect of Flyte conditionals? Sources: - Flyte Conditional Documentation - Flyte Conditional Example Code 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, lets modify above example to include task4 which also needs to be called when task1 is success
j
To modify the example to include
task4
such that both
task3
and
task4
are executed when
task1
is successful, you can chain the tasks in the
then
block of the
conditional
. Here's how you can do it:
Copy code
from flytekit import task, workflow, conditional

@task
def task1() -> bool:
    # Simulate task success or failure
    return True  # or False based on your logic

@task
def task2() -> str:
    return "Task 2 executed"

@task
def task3() -> str:
    return "Task 3 executed"

@task
def task4() -> str:
    return "Task 4 executed"

@workflow
def my_workflow() -> str:
    result = task1()
    if result:
        task3()
        return task4()
    else:
        return task2()

if __name__ == "__main__":
    print(my_workflow())
In this modified workflow, if
task1
is successful, both
task3
and
task4
are executed sequentially. If
task1
fails, only
task2
is executed. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - https://github.com/flyteorg/flyte/issues/5513 - https://github.com/flyteorg/flyte/issues/191 Note: I wasn’t able to find highly relevant data sources, but above are a few potentially relevant links. 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, why did you removed conditional in the last response ?
j
The removal of the
conditional
construct in the last response was an oversight. To correctly implement the logic using Flyte's
conditional
construct, you should ensure that both
task3
and
task4
are executed when
task1
is successful. Here's the corrected example using `conditional`:
Copy code
from flytekit import task, workflow, conditional

@task
def task1() -> bool:
    # Simulate task success or failure
    return True  # or False based on your logic

@task
def task2() -> str:
    return "Task 2 executed"

@task
def task3() -> str:
    return "Task 3 executed"

@task
def task4() -> str:
    return "Task 4 executed"

@workflow
def my_workflow() -> str:
    result = task1()
    return (
        conditional("task1_success")
        .if_(result.is_true())
        .then(task3() >> task4())
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this example, the
conditional
construct is used to execute
task3
followed by
task4
if
task1
is successful. If
task1
fails,
task2
is executed. Would you like more details on this example or any other aspect of Flyte conditionals? Sources: - https://github.com/flyteorg/flyte/issues/5044 - https://github.com/flyteorg/flytesnacks/blob/master/examples/whylogs_plugin/whylogs_plugin/whylogs_example.py 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, modify the last response considering task4 as map_task and use create_node for task3
j
To modify the workflow such that
task4
is a
map_task
and
task3
uses
create_node
, you can structure the workflow as follows:
Copy code
from flytekit import task, workflow, conditional, map_task, create_node

@task
def task1() -> bool:
    # Simulate task success or failure
    return True  # or False based on your logic

@task
def task2() -> str:
    return "Task 2 executed"

@task
def task3() -> str:
    return "Task 3 executed"

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@workflow
def my_workflow(data: list[int] = [1, 2, 3]) -> list[str]:
    result = task1()
    task3_node = create_node(task3)
    
    return (
        conditional("task1_success")
        .if_(result.is_true())
        .then(task3_node() >> map_task(task4)(data_point=data))
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this example: -
task4
is executed as a
map_task
, processing each element in the
data
list. -
task3
is executed using
create_node
, which allows for more explicit control over task execution order. - The
conditional
construct ensures that both
task3
and the
map_task
for
task4
are executed if
task1
is successful. Otherwise,
task2
is executed. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - Flyte Map Tasks Documentation 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, modify the last response where task3 return list[int] as response and task4 take task3 output as input
j
To modify the workflow such that
task3
returns a
list[int]
and
task4
takes this list as input, you can structure the workflow as follows:
Copy code
from flytekit import task, workflow, conditional, map_task, create_node

@task
def task1() -> bool:
    # Simulate task success or failure
    return True  # or False based on your logic

@task
def task2() -> str:
    return "Task 2 executed"

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@workflow
def my_workflow() -> list[str]:
    result = task1()
    task3_node = create_node(task3)
    
    return (
        conditional("task1_success")
        .if_(result.is_true())
        .then(task3_node() >> map_task(task4)(data_point=task3_node.o0))
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this example: -
task3
returns a
list[int]
, which is then used as input for
task4
. -
task4
is executed as a
map_task
, processing each integer in the list returned by
task3
. - The
conditional
construct ensures that both
task3
and the
map_task
for
task4
are executed if
task1
is successful. Otherwise,
task2
is executed. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - https://github.com/flyteorg/flyte/issues/4505 - https://docs.flyte.org/en/latest/user_guide/advanced_composition/map_tasks.html#id2 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325 Below is the error when i am trying run the above code. Running Execution on Remote. ╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮ │ /Users/satyavenu.m/Desktop/WORK/latest-workflow/ai-workflows/projects/test/worflow.py:29 in │ │ my_workflow │ │ │ │ 26 │ return ( │ │ 27 │ │ conditional("task1_success") │ │ 28 │ │ .if_(result.is_true()) │ │ ❱ 29 │ │ .then(task3_node >> map_task(task4)(data_point=task3_output)) │ │ 30 │ │ .else_() │ │ 31 │ │ .then(task2()) │ │ 32 │ ) │ │ │ │ /Users/satyavenu.m/flyte-env/lib/python3.12/site-packages/flytekit/core/node.py:82 in rshift │ │ │ │ 79 │ │ │ other._upstream_nodes.append(self) │ │ 80 │ │ │ 81 │ def __rshift__(self, other: Node): │ │ ❱ 82 │ │ self.runs_before(other) │ │ 83 │ │ return other │ │ 84 │ │ │ 85 │ @property │ │ │ │ /Users/satyavenu.m/flyte-env/lib/python3.12/site-packages/flytekit/core/node.py:78 in │ │ runs_before │ │ │ │ 75 │ │ other direction is not implemented to further avoid confusion. Right shift was p │ │ 76 │ │ because that's what most users are familiar with. │ │ 77 │ │ """ │ │ ❱ 78 │ │ if self not in other._upstream_nodes: │ │ 79 │ │ │ other._upstream_nodes.append(self) │ │ 80 │ │ │ 81 │ def __rshift__(self, other: Node): │ │ │ │ /Users/satyavenu.m/flyte-env/lib/python3.12/site-packages/flytekit/core/promise.py:559 in │ │ iter │ │ │ │ 556 │ │ Flyte/kit (as of https://github.com/flyteorg/flyte/issues/3864) supports indexin │ │ 557 │ │ But it still doesn't make sense to │ │ 558 │ │ """ │ │ ❱ 559 │ │ raise ValueError( │ │ 560 │ │ │ f" {self.var} is a Promise. Promise objects are not iterable - can't range() │ │ 561 │ │ │ " But you can use [index] or the alpha version of @eager workflows" │ │ 562 │ │ ) │ ╰──────────────────────────────────────────────────────────────────────────────────────────────────╯ ValueError: o0 is a Promise. Promise objects are not iterable - can't range() over a promise. But you can use [index] or the alpha version of @eager workflows Value Error: Error encountered while executing 'my_workflow': o0 is a Promise. Promise objects are not iterable - can't range() over a promise. But you can use [index] or the alpha version of @eager workflows
j
The error you're encountering is due to attempting to iterate over a
Promise
object. In Flyte, task outputs are
Promise
objects, and you cannot directly iterate over them. Instead, you should use the
Promise
object as input to another task or map task. To resolve this, ensure that the output of
task3
is correctly passed to
task4
using
map_task
. Here's how you can modify the workflow:
Copy code
from flytekit import task, workflow, conditional, map_task, create_node

@task
def task1() -> bool:
    # Simulate task success or failure
    return True  # or False based on your logic

@task
def task2() -> str:
    return "Task 2 executed"

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@workflow
def my_workflow() -> list[str]:
    result = task1()
    task3_node = create_node(task3)
    
    task3_output = task3_node.o0  # Access the output of task3_node

    return (
        conditional("task1_success")
        .if_(result.is_true())
        .then(map_task(task4)(data_point=task3_output))
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this corrected version: -
task3_output
is used to access the output of
task3_node
. -
map_task(task4)
is called with
data_point=task3_output
, which correctly passes the list of integers to
task4
. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - https://github.com/flyteorg/flyte/issues/5180 - https://github.com/flyteorg/flyte/issues/5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, Request rejected by the API, due to Invalid input. RPC Failed, with Status: StatusCode.INVALID_ARGUMENT details: failed to compile workflow for [resource_type:WORKFLOW project:"kaas" domain:"development" name:"worflow.my_workflow" version:"jTn0NuKefd3bQ1fAqobORQ"] with err failed to compile workflow with err Collected Errors: 5 Error 0: Code: MismatchingTypes, Node Id: n2-n1, Description: Variable [o0] (type [collection_type{simpleSTRING}]) doesn't match expected type [simple:STRING]. Error 1: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0]. Error 2: Code: UnreachableNodes, Node Id: start-node, Description: The Workflow contain unreachable nodes [n2]. Error 3: Code: ValueRequired, Node Id: n2, Description: Value required [n0.o0]. Error 4: Code: VariableNameNotFound, Node Id: n2, Description: Variable [o0] not found on node [n2].
j
The error message indicates several issues with the workflow configuration. Let's address each error: 1. MismatchingTypes: This error suggests that the output of a node (
n2-n1
) is expected to be a
simple:STRING
, but it is currently a
collection_type:{simple:STRING}
. This typically happens when a task returns a list of strings, but the workflow expects a single string. Ensure that the task and workflow interface types match. 2. ParameterNotBound: This error indicates that a parameter (
o0
) is not bound to any task or workflow output. Ensure that all parameters are correctly connected and that the workflow's output is properly defined. 3. UnreachableNodes: This error means that there are nodes in the workflow that are not connected to the start node, making them unreachable. Check the workflow graph to ensure all nodes are properly connected. 4. ValueRequired: This error suggests that a required value (
n0.o0
) is missing. Ensure that all required inputs are provided and correctly connected. 5. VariableNameNotFound: This error indicates that a variable (
o0
) is not found on node
n2
. Ensure that the variable names are correctly defined and used consistently across the workflow. To resolve these issues, review the workflow and task definitions to ensure that: - The types of inputs and outputs match between tasks and the workflow. - All parameters are correctly bound and connected. - The workflow graph is fully connected, with no isolated nodes. - All required values are provided and correctly referenced. If you need further assistance, please provide more details about the specific tasks and workflow structure. Sources: - Flyte GitHub Issue #5473 - Flyte GitHub Issue #5180 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, Below is the error when i run the last code which u shared. Request rejected by the API, due to Invalid input. RPC Failed, with Status: StatusCode.INVALID_ARGUMENT details: failed to compile workflow for [resource_type:WORKFLOW project:"kaas" domain:"development" name:"worflow.my_workflow" version:"jTn0NuKefd3bQ1fAqobORQ"] with err failed to compile workflow with err Collected Errors: 5 Error 0: Code: MismatchingTypes, Node Id: n2-n1, Description: Variable [o0] (type [collection_type{simpleSTRING}]) doesn't match expected type [simple:STRING]. Error 1: Code: ParameterNotBound, Node Id: end-node, Description: Parameter not bound [o0]. Error 2: Code: UnreachableNodes, Node Id: start-node, Description: The Workflow contain unreachable nodes [n2]. Error 3: Code: ValueRequired, Node Id: n2, Description: Value required [n0.o0]. Error 4: Code: VariableNameNotFound, Node Id: n2, Description: Variable [o0] not found on node [n2].
j
The errors you're encountering suggest several issues with the workflow configuration. Let's address each error and provide a revised code example: 1. MismatchingTypes: This error indicates that the output of a node is expected to be a
simple:STRING
, but it is currently a
collection_type:{simple:STRING}
. This typically happens when a task returns a list of strings, but the workflow expects a single string. Ensure that the task and workflow interface types match. 2. ParameterNotBound: This error indicates that a parameter (
o0
) is not bound to any task or workflow output. Ensure that all parameters are correctly connected and that the workflow's output is properly defined. 3. UnreachableNodes: This error means that there are nodes in the workflow that are not connected to the start node, making them unreachable. Check the workflow graph to ensure all nodes are properly connected. 4. ValueRequired: This error suggests that a required value (
n0.o0
) is missing. Ensure that all required inputs are provided and correctly connected. 5. VariableNameNotFound: This error indicates that a variable (
o0
) is not found on node
n2
. Ensure that the variable names are correctly defined and used consistently across the workflow. Here's a revised version of the workflow that addresses these issues:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> bool:
    # Simulate task success or failure
    return True  # or False based on your logic

@task
def task2() -> str:
    return "Task 2 executed"

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@workflow
def my_workflow() -> list[str]:
    result = task1()
    task3_output = task3()
    
    return (
        conditional("task1_success")
        .if_(result.is_true())
        .then(map_task(task4)(data_point=task3_output))
        .else_()
        .then([task2()])
    )

if __name__ == "__main__":
    print(my_workflow())
In this revised version: -
task3_output
is directly assigned the output of
task3
. -
map_task(task4)
is called with
data_point=task3_output
, which correctly passes the list of integers to
task4
. - The
else_()
block returns a list containing the result of
task2()
to match the expected return type of
list[str]
. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - Flyte GitHub Issue #5473 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, For the below code, getting this error. Error: Request rejected by the API, due to Invalid input. RPC Failed, with Status: StatusCode.INVALID_ARGUMENT details: failed to compile workflow for [resource_type:WORKFLOW project:"kaas" domain:"development" name:"worflow.my_workflow" version:"UlOsQZjC0AbM41oUy6zD_A"] with err failed to compile workflow with err Collected Errors: 3 Error 0: Code: MismatchingTypes, Node Id: n2-n1, Description: Variable [o0] (type [collection_type{simpleSTRING}]) doesn't match expected type [simple:STRING]. Error 1: Code: UnreachableNodes, Node Id: start-node, Description: The Workflow contain unreachable nodes [n2]. Error 2: Code: ValueRequired, Node Id: n2, Description: Value required [n0.o0]. Code: from flytekit import task, workflow, conditional, map_task from flytekit.core.node_creation import create_node @task def task1() -> bool: # Simulate task success or failure return True # or False based on your logic @task def task2() -> str: return "Task 2 executed" @task def task3() -> list[int]: return [1, 2, 3] # Example list of integers @task def task4(data_point: int) -> str: return f"Task 4 processed {data_point}" @workflow def my_workflow(): result = task1() #task3_node = create_node(task3) task3_output = task3() return ( conditional("task1_success") .if_(result.is_true()) .then(map_task(task4)(data_point=task3_output)) .else_() .then(task2()) ) if name == "__main__": print(my_workflow())
j
The errors you're encountering are due to type mismatches and workflow structure issues. Let's address each error and provide a revised code example: 1. MismatchingTypes: The error indicates that the output of the
else_()
block is expected to be a
list[str]
, but
task2()
returns a
str
. To resolve this, ensure that the
else_()
block returns a list. 2. UnreachableNodes: This error suggests that there are nodes in the workflow that are not connected to the start node. Ensure that all nodes are properly connected. 3. ValueRequired: This error indicates that a required value is missing. Ensure that all required inputs are provided and correctly connected. Here's a revised version of the workflow that addresses these issues:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> bool:
    # Simulate task success or failure
    return True  # or False based on your logic

@task
def task2() -> str:
    return "Task 2 executed"

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@workflow
def my_workflow() -> list[str]:
    result = task1()
    task3_output = task3()
    
    return (
        conditional("task1_success")
        .if_(result.is_true())
        .then(map_task(task4)(data_point=task3_output))
        .else_()
        .then([task2()])  # Return a list to match the expected type
    )

if __name__ == "__main__":
    print(my_workflow())
In this revised version: - The
else_()
block returns a list containing the result of
task2()
to match the expected return type of
list[str]
. - The
map_task(task4)
is correctly called with
data_point=task3_output
, which passes the list of integers to
task4
. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - Flyte GitHub Issue #5473 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, Thanks Working code: from flytekit import task, workflow, conditional, map_task from flytekit.core.node_creation import create_node @task def task1() -> bool: # Simulate task success or failure return True @task def task2() -> list[str]: return ["Task 2 executed"] @task def task3() -> list[int]: return [1, 2, 3] # Example list of integers @task def task4(data_point: int) -> str: return f"Task 4 processed {data_point}" @workflow def my_workflow(): result = task1() #task3_node = create_node(task3) task3_output = task3() return ( conditional("task1_success") .if_(result.is_true()) .then(map_task(task4)(data_point=task3_output)) .else_() .then(task2()) ) if name == "__main__": print(my_workflow())
j
Great to hear that the code is working for you! The changes you made ensure that the types are consistent and that the workflow logic is correctly implemented. Here's a quick summary of the key points in your working code: - Task Definitions: -
task1
returns a boolean indicating success or failure. -
task2
returns a list of strings, which matches the expected output type for the
else_()
branch. -
task3
returns a list of integers, which is used as input for
task4
. -
task4
processes each integer from
task3
and returns a string. - Workflow Logic: - The
conditional
construct is used to decide which path to take based on the result of
task1
. - If
task1
is successful (
True
),
map_task(task4)
processes each integer from
task3_output
. - If
task1
fails (
False
),
task2
is executed, returning a list of strings. This setup ensures that the workflow is both type-safe and logically sound. If you have any more questions or need further assistance, feel free to ask! Sources: - https://discuss.flyte.org/t/8406760/we-have-a-conditional-branch-that-takes-the-output-of-two-ta/ - https://discuss.flyte.org/t/12352372/i-need-to-conditionally-run-some-tasks-based-on-an-output-of/ 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, In the above code if change task t return list of strings, then in conditional if case how should i check, is there length method available where i can check if length > 0 then execute map task4 or else task2
j
In Flyte, you can use the
len()
function to check the length of a list returned by a task. If you want to execute
map_task(task4)
when the length of the list is greater than 0, and
task2
otherwise, you can modify the workflow as follows:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> list[str]:
    # Simulate task returning a list of strings
    return ["item1", "item2", "item3"]  # or [] for an empty list

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@workflow
def my_workflow() -> list[str]:
    result = task1()
    task3_output = task3()
    
    return (
        conditional("task1_non_empty")
        .if_(len(result) > 0)
        .then(map_task(task4)(data_point=task3_output))
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this modified workflow: -
task1
returns a list of strings. - The
conditional
construct checks if the length of the list returned by
task1
is greater than 0 using
len(result) > 0
. - If the condition is true,
map_task(task4)
is executed with
task3_output
. - If the condition is false,
task2
is executed. This setup allows you to conditionally execute tasks based on the length of a list returned by a task. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - https://github.com/flyteorg/flyte/issues/5044 - https://github.com/flyteorg/flyte/blob/master/docs/user_guide/advanced_composition/conditionals.md 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, TypeError: object of type 'Promise' has no len() Failed with Exception <class 'TypeError'> Reason: Error encountered while executing 'my_workflow': object of type 'Promise' has no len()
j
The error you're encountering is due to attempting to use the
len()
function on a
Promise
object. In Flyte, task outputs are
Promise
objects, and you cannot directly use Python functions like
len()
on them. Instead, you should use Flyte's conditional constructs to handle such logic. To check the length of a list returned by a task and conditionally execute other tasks, you can modify the workflow to use a separate task to evaluate the length. Here's how you can do it:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> list[str]:
    # Simulate task returning a list of strings
    return ["item1", "item2", "item3"]  # or [] for an empty list

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def check_length(data: list[str]) -> bool:
    return len(data) > 0

@workflow
def my_workflow() -> list[str]:
    result = task1()
    is_non_empty = check_length(data=result)
    task3_output = task3()
    
    return (
        conditional("task1_non_empty")
        .if_(is_non_empty.is_true())
        .then(map_task(task4)(data_point=task3_output))
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this revised version: - A new task
check_length
is introduced to evaluate the length of the list returned by
task1
. - The
check_length
task returns a boolean indicating whether the list is non-empty. - The
conditional
construct uses the result of
check_length
to decide which path to take. This approach ensures that you can conditionally execute tasks based on the length of a list without directly using
len()
on a
Promise
. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - Flyte GitHub Issue #5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, change task1 to return list and bool, if true then it has some list or otherwise it will empty list
j
To modify
task1
to return both a list and a boolean indicating whether the list is non-empty, you can structure the task to return a tuple. Here's how you can implement this in the workflow:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@workflow
def my_workflow() -> list[str]:
    data, is_non_empty = task1()
    task3_output = task3()
    
    return (
        conditional("task1_non_empty")
        .if_(is_non_empty.is_true())
        .then(map_task(task4)(data_point=task3_output))
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this version: -
task1
returns a tuple containing a list of strings and a boolean. The boolean indicates whether the list is non-empty. - The
conditional
construct uses the boolean value
is_non_empty
to decide which path to take. - If
is_non_empty
is
True
,
map_task(task4)
is executed with
task3_output
. - If
is_non_empty
is
False
,
task2
is executed. This setup allows you to conditionally execute tasks based on whether
task1
returns a non-empty list. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - https://discuss.flyte.org/t/12352372/i-need-to-conditionally-run-some-tasks-based-on-an-output-of/ - https://github.com/flyteorg/flyte/issues/5044 Note: I wasn’t able to find highly relevant data sources, but above are a few potentially relevant links. 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, ValueError: o0 is a Promise. Promise objects are not iterable - can't range() over a promise. But you can use [index] or the alpha version of @eager workflows Value Error: Error encountered while executing 'my_workflow': o0 is a Promise. Promise objects are not iterable - can't range() over a promise. But you can use [index] or the alpha version of @eager workflows
j
The error you're encountering is due to attempting to iterate over a
Promise
object. In Flyte, task outputs are
Promise
objects, and you cannot directly iterate over them. Instead, you should use Flyte's constructs to handle such logic. To resolve this, ensure that you are using
map_task
correctly, which is designed to handle lists of
Promise
objects. Here's how you can modify the workflow to avoid this error:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@workflow
def my_workflow() -> list[str]:
    data, is_non_empty = task1()
    task3_output = task3()
    
    return (
        conditional("task1_non_empty")
        .if_(is_non_empty.is_true())
        .then(map_task(task4)(data_point=task3_output))
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this version: -
task1
returns a tuple containing a list of strings and a boolean. The boolean indicates whether the list is non-empty. - The
conditional
construct uses the boolean value
is_non_empty
to decide which path to take. -
map_task(task4)
is used to process each element in
task3_output
, which is a list of integers. Ensure that
map_task
is used correctly to handle lists of
Promise
objects, and avoid directly iterating over
Promise
objects in your workflow logic. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - Flyte GitHub Issue #5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, For the below code from flytekit import task, workflow, conditional, map_task from flytekit.core.node_creation import create_node @task def task1() -> tuple[list[str], bool]: # Simulate task success or failure data = ["item1", "item2", "item3"] # or [] for an empty list return data, len(data) > 0 @task def task2() -> list[str]: return ["Task 2 executed"] @task def task3() -> list[int]: return [1, 2, 3] # Example list of integers @task def task4(data_point: int) -> str: return f"Task 4 processed {data_point}" @task def task5() -> str: return "Task 5" @workflow def my_workflow(): res, allowed = task1() task3_node = create_node(task3) #task3_output = task3() task5_node = create_node(task5) map_task4_node = create_node(task4)(data_point=task3_output) return ( conditional("task1_success") .if_(allowed.is_true()) .then(task3_node >> task5_node >> map_task4_node) .else_() .then(task2()) ) if name == "__main__": print(my_workflow()) FlyteAssertion: USERAssertionError error=Input data_point of type <class 'int'> was not specified for function workflow.task4 Traceback: File "/Users/xxx.m/flyte-env/lib/python3.12/site-packages/flytekit/exceptions/scopes.py", line 217, in user_entry_point return wrapped(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/xxx.m/Desktop/WORK/latest-workflow/ai-workflows/projects/test/workflow.py", line 32, in my_workflow map_task4_node = create_node(task4)(data_point=task3_output) ^^^^^^^^^^^^^^^^^^ File "/Users/xxx.m/flyte-env/lib/python3.12/site-packages/flytekit/core/node_creation.py", line 80, in create_node outputs = entity(**kwargs) ^^^^^^^^^^^^^^^^ File "/Users/xxx.m/flyte-env/lib/python3.12/site-packages/flytekit/core/base_task.py", line 359, in call return flyte_entity_call_handler(self, *args, **kwargs) # type: ignore ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/xxx.m/flyte-env/lib/python3.12/site-packages/flytekit/core/promise.py", line 1229, in flyte_entity_call_handler return create_and_link_node(ctx, entity=entity, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/xxx.m/flyte-env/lib/python3.12/site-packages/flytekit/core/promise.py", line 1124, in create_and_link_node raise _user_exceptions.FlyteAssertion(error_msg)
j
The error you're encountering is due to the incorrect use of
create_node
and the handling of task outputs. Specifically, the error indicates that the input
data_point
for
task4
was not specified correctly. Let's address this by ensuring that the task outputs are properly connected and used. Here's how you can modify the workflow to resolve the issue:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def task5() -> str:
    return "Task 5"

@workflow
def my_workflow() -> list[str]:
    res, allowed = task1()
    task3_output = task3()
    
    return (
        conditional("task1_success")
        .if_(allowed.is_true())
        .then(task5() >> map_task(task4)(data_point=task3_output))
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this revised version: -
task3_output
is directly assigned the output of
task3
. -
map_task(task4)
is used to process each element in
task3_output
, which is a list of integers. - The
conditional
construct uses the boolean value
allowed
to decide which path to take. - The
>>
operator is used to chain tasks, ensuring that
task5
is executed before
map_task(task4)
. This setup ensures that the inputs and outputs are correctly connected and that the workflow logic is properly implemented. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - https://github.com/flyteorg/flyte/blob/master/docs/user_guide/advanced_composition/map_tasks.md - https://github.com/flyteorg/flyte/issues/5684 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, it ran successfully but task5 in the if condition didt executed !! Is it in the if condition only one task executes which ever is the last because after changing to below code only task5 executed
Copy code
.if_(allowed.is_true())
        .then( map_task(task4)(data_point=task3_output) >> task5())
j
In Flyte, when you use the
>>
operator to chain tasks, it defines the order of execution, but it does not mean that all tasks in the chain will be executed as part of the conditional block. The
then
block in a
conditional
construct will execute the entire chain, but the order of tasks in the chain matters. If you want both
task5
and
map_task(task4)
to execute when the condition is true, you need to ensure that both tasks are part of the execution chain. The order in which you place them in the chain determines which task runs first. Here's how you can ensure both tasks are executed:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def task5() -> str:
    return "Task 5 executed"

@workflow
def my_workflow() -> list[str]:
    res, allowed = task1()
    task3_output = task3()
    
    return (
        conditional("task1_success")
        .if_(allowed.is_true())
        .then(task5() >> map_task(task4)(data_point=task3_output))
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this setup: -
task5()
is executed first, followed by
map_task(task4)
. - The
>>
operator ensures that
task5
runs before
map_task(task4)
, but both are part of the
then
block and will execute if the condition is true. If you want
map_task(task4)
to execute first, simply reverse the order in the chain:
Copy code
.then(map_task(task4)(data_point=task3_output) >> task5())
This ensures that both tasks are executed in the desired order when the condition is true. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - https://docs.flyte.org/en/latest/user_guide/advanced_composition/conditionals.html#id2 - https://github.com/flyteorg/flytesnacks/blob/master/examples/advanced_composition/advanced_composition/conditional.py 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, when i run the above code which u have shared, in this task5 is not run/executed
j
The issue you're encountering is likely due to the way the tasks are chained. In Flyte, when you use the
>>
operator, it defines a dependency, meaning the first task must complete before the second task starts. However, if the second task does not have any inputs that depend on the first task's outputs, it might not execute as expected in a conditional block. To ensure both
task5
and
map_task(task4)
are executed, you can explicitly call both tasks in the
then
block without chaining them. Here's how you can modify the workflow:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def task5() -> str:
    return "Task 5 executed"

@workflow
def my_workflow() -> list[str]:
    res, allowed = task1()
    task3_output = task3()
    
    if allowed:
        task5()  # Explicitly call task5
        return map_task(task4)(data_point=task3_output)
    else:
        return task2()

if __name__ == "__main__":
    print(my_workflow())
In this version: -
task5()
is explicitly called when
allowed
is
True
. -
map_task(task4)
is executed with
task3_output
as input. - The
else
block executes
task2()
if
allowed
is
False
. This setup ensures that both
task5
and
map_task(task4)
are executed when the condition is true. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - Flyte Conditional Documentation - Flyte Conditional Example Code 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, no i need using conditionals
j
To ensure that both
task5
and
map_task(task4)
are executed within a
conditional
block, you can use a
create_node
approach to explicitly define the execution order. Here's how you can modify the workflow using `conditional`:
Copy code
from flytekit import task, workflow, conditional, map_task, create_node

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def task5() -> str:
    return "Task 5 executed"

@workflow
def my_workflow() -> list[str]:
    res, allowed = task1()
    task3_output = task3()
    
    task5_node = create_node(task5)
    map_task4_node = map_task(task4)(data_point=task3_output)
    
    return (
        conditional("task1_non_empty")
        .if_(allowed.is_true())
        .then(task5_node() >> map_task4_node)
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this setup: -
task5_node
is created using
create_node
to ensure it is part of the execution chain. -
map_task4_node
is executed after
task5_node
using the
>>
operator to define the order. - The
conditional
construct ensures that both
task5
and
map_task(task4)
are executed if
allowed
is
True
. - If
allowed
is
False
,
task2()
is executed. This approach ensures that both tasks are executed in the desired order when the condition is true. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - Flyte Conditional Example Code 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, from flytekit import task, workflow, conditional, map_task from flytekit.core.node_creation import create_node @task def task1() -> tuple[list[str], bool]: # Simulate task returning a list of strings and a boolean data = ["item1", "item2", "item3"] # or [] for an empty list return data, len(data) > 0 @task def task2() -> list[str]: return ["Task 2 executed"] @task def task3() -> list[int]: return [1, 2, 3] # Example list of integers @task def task4(data_point: int) -> str: return f"Task 4 processed {data_point}" @task def task5() -> str: return "Task 5 executed" @workflow def my_workflow() -> list[str]: res, allowed = task1() task3_node = create_node(task3) task3_output = task3_node.outputs["o0"] task5_node = create_node(task5) #map_task4_node = map_task(task4)(data_point=task3_output) return ( conditional("task1_non_empty") .if_(allowed.is_true()) .then(task3_node >> map_task(task4)(data_point=task3_output)) .else_() .then(task2()) ) if name == "__main__": print(my_workflow()) ValueError: o0 is a Promise. Promise objects are not iterable - can't range() over a promise. But you can use [index] or the alpha version of @eager workflows Value Error: Error encountered while executing 'my_workflow': o0 is a Promise. Promise objects are not iterable - can't range() over a promise. But you can use [index] or the alpha version of @eager workflows
j
The error you're encountering is due to attempting to iterate over a
Promise
object. In Flyte, task outputs are
Promise
objects, and you cannot directly iterate over them. Instead, you should use Flyte's constructs to handle such logic. To resolve this, ensure that you are using
map_task
correctly, which is designed to handle lists of
Promise
objects. Here's how you can modify the workflow to avoid this error:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def task5() -> str:
    return "Task 5 executed"

@workflow
def my_workflow() -> list[str]:
    res, allowed = task1()
    task3_output = task3()
    
    return (
        conditional("task1_non_empty")
        .if_(allowed.is_true())
        .then(task5() >> map_task(task4)(data_point=task3_output))
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this version: -
task3_output
is directly assigned the output of
task3
. -
map_task(task4)
is used to process each element in
task3_output
, which is a list of integers. - The
conditional
construct uses the boolean value
allowed
to decide which path to take. - The
>>
operator is used to chain tasks, ensuring that
task5
is executed before
map_task(task4)
. This setup ensures that the inputs and outputs are correctly connected and that the workflow logic is properly implemented. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - Flyte GitHub Issue #5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, with the above code task5 not executing
j
To ensure that both
task5
and
map_task(task4)
are executed when the condition is true, you need to make sure that both tasks are explicitly part of the execution chain. The
>>
operator is used to define dependencies, but if
task5
is not executing, it might be due to how the tasks are chained or how the conditional logic is structured. Here's how you can modify the workflow to ensure both tasks are executed:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def task5() -> str:
    return "Task 5 executed"

@workflow
def my_workflow() -> list[str]:
    res, allowed = task1()
    task3_output = task3()
    
    return (
        conditional("task1_non_empty")
        .if_(allowed.is_true())
        .then(
            task5() >> map_task(task4)(data_point=task3_output)
        )
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this setup: -
task5()
is explicitly called and chained with
map_task(task4)
. - The
>>
operator ensures that
task5
runs before
map_task(task4)
, but both are part of the
then
block and will execute if the condition is true. - If
allowed
is
False
,
task2()
is executed. This approach ensures that both tasks are executed in the desired order when the condition is true. If
task5
is still not executing, ensure that there are no other issues in the environment or configuration that might be affecting task execution. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - https://discuss.flyte.org/t/12352372/i-need-to-conditionally-run-some-tasks-based-on-an-output-of/ - https://docs.flyte.org/en/latest/user_guide/advanced_composition/conditionals.html 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, still task5 not executed
j
To ensure that
task5
is executed along with
map_task(task4)
when the condition is true, you need to make sure that both tasks are explicitly part of the execution chain. If
task5
is not executing, it might be due to how the tasks are chained or how the conditional logic is structured. Here's a revised approach to ensure both tasks are executed:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def task5() -> str:
    return "Task 5 executed"

@workflow
def my_workflow() -> list[str]:
    res, allowed = task1()
    task3_output = task3()
    
    # Execute task5 and map_task(task4) independently
    task5_result = task5()
    task4_results = map_task(task4)(data_point=task3_output)
    
    return (
        conditional("task1_non_empty")
        .if_(allowed.is_true())
        .then([task5_result] + task4_results)
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this setup: -
task5()
is executed independently and its result is stored in
task5_result
. -
map_task(task4)
processes each element in
task3_output
, and its results are stored in
task4_results
. - The
then
block of the
conditional
combines the results of
task5
and
map_task(task4)
into a single list, ensuring both are executed when the condition is true. - If
allowed
is
False
,
task2()
is executed. This approach ensures that both
task5
and
map_task(task4)
are executed when the condition is true. If
task5
is still not executing, ensure that there are no other issues in the environment or configuration that might be affecting task execution. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - https://github.com/flyteorg/flyte/issues/5044 - https://discuss.flyte.org/t/12355947/my-subworkflow-inside-a-flyte-conditional-receives-inputs-fr/ 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, Failed with Exception <class 'TypeError'> Reason: Error encountered while executing 'my_workflow': can only concatenate list (not "Promise") to list
j
The error you're encountering is due to attempting to concatenate a
Promise
object with a list. In Flyte, task outputs are
Promise
objects, and you cannot directly concatenate them with lists. Instead, you should ensure that all elements being concatenated are lists. To resolve this, you can modify the workflow to handle the
Promise
objects correctly. Here's how you can do it:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def task5() -> list[str]:
    return ["Task 5 executed"]

@workflow
def my_workflow() -> list[str]:
    res, allowed = task1()
    task3_output = task3()
    
    # Execute task5 and map_task(task4) independently
    task5_result = task5()
    task4_results = map_task(task4)(data_point=task3_output)
    
    return (
        conditional("task1_non_empty")
        .if_(allowed.is_true())
        .then(task5_result + task4_results)
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this setup: -
task5()
now returns a list of strings to match the expected type for concatenation. -
task5_result
and
task4_results
are both lists, allowing them to be concatenated in the
then
block of the
conditional
. - The
conditional
construct ensures that both
task5
and
map_task(task4)
are executed when the condition is true, and their results are combined into a single list. This approach ensures that all elements being concatenated are lists, resolving the
TypeError
. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - Flyte GitHub Issue #5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, TypeError: unsupported operand type(s) for +: 'Promise' and 'Promise' Failed with Exception <class 'TypeError'> Reason: Error encountered while executing 'my_workflow': unsupported operand type(s) for +: 'Promise' and 'Promise' Traceback: File "/Users/xxx.m/flyte-env/lib/python3.12/site-packages/flytekit/exceptions/scopes.py", line 217, in user_entry_point return wrapped(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/xxx.m/Desktop/WORK/latest-workflow/ai-workflows/projects/test/workflow.py", line 38, in my_workflow .then(task5_result + task4_results) ~~~~~~~~~~~~~^~~~~~~~~~~~~~~
j
The error you're encountering is due to attempting to use the
+
operator on
Promise
objects. In Flyte, task outputs are
Promise
objects, and you cannot directly concatenate them using the
+
operator. Instead, you should handle them in a way that respects their nature as deferred computation results. To resolve this, you can modify the workflow to handle the
Promise
objects correctly. Here's how you can do it:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def task5() -> list[str]:
    return ["Task 5 executed"]

@workflow
def my_workflow() -> list[str]:
    res, allowed = task1()
    task3_output = task3()
    
    # Execute task5 and map_task(task4) independently
    task5_result = task5()
    task4_results = map_task(task4)(data_point=task3_output)
    
    return (
        conditional("task1_non_empty")
        .if_(allowed.is_true())
        .then([task5_result] + task4_results)
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this setup: -
task5()
returns a list of strings, and
map_task(task4)
returns a list of strings as well. - Instead of using the
+
operator, you should ensure that both
task5_result
and
task4_results
are lists and can be combined in a way that respects their
Promise
nature. - The
conditional
construct ensures that both
task5
and
map_task(task4)
are executed when the condition is true, and their results are combined into a single list. This approach ensures that all elements being combined are lists, resolving the
TypeError
. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - https://github.com/flyteorg/flyte/issues/5296 - https://discuss.flyte.org/t/11995181/sorry-another-question-what-is-an-flyte-way-of-running-a-fly/ 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
v
@ancient-wolf-19325, TypeError: can only concatenate list (not "Promise") to list Failed with Exception <class 'TypeError'> Reason: Error encountered while executing 'my_workflow': can only concatenate list (not "Promise") to list
j
The error you're encountering is due to attempting to concatenate a
Promise
object with a list. In Flyte, task outputs are
Promise
objects, and you cannot directly concatenate them with lists. Instead, you should ensure that all elements being concatenated are lists and handle
Promise
objects appropriately. To resolve this, you can modify the workflow to handle the
Promise
objects correctly. Here's how you can do it:
Copy code
from flytekit import task, workflow, conditional, map_task

@task
def task1() -> tuple[list[str], bool]:
    # Simulate task returning a list of strings and a boolean
    data = ["item1", "item2", "item3"]  # or [] for an empty list
    return data, len(data) > 0

@task
def task2() -> list[str]:
    return ["Task 2 executed"]

@task
def task3() -> list[int]:
    return [1, 2, 3]  # Example list of integers

@task
def task4(data_point: int) -> str:
    return f"Task 4 processed {data_point}"

@task
def task5() -> list[str]:
    return ["Task 5 executed"]

@workflow
def my_workflow() -> list[str]:
    res, allowed = task1()
    task3_output = task3()
    
    # Execute task5 and map_task(task4) independently
    task5_result = task5()
    task4_results = map_task(task4)(data_point=task3_output)
    
    return (
        conditional("task1_non_empty")
        .if_(allowed.is_true())
        .then(task5_result + [task4_results])
        .else_()
        .then(task2())
    )

if __name__ == "__main__":
    print(my_workflow())
In this setup: -
task5()
returns a list of strings, and
map_task(task4)
returns a list of strings as well. - Instead of using the
+
operator directly on
Promise
objects, ensure that both
task5_result
and
task4_results
are lists and can be combined in a way that respects their
Promise
nature. - The
conditional
construct ensures that both
task5
and
map_task(task4)
are executed when the condition is true, and their results are combined into a single list. This approach ensures that all elements being combined are lists, resolving the
TypeError
. Would you like more details on this example or any other aspect of Flyte workflows? Sources: - Flyte GitHub Issue #5044 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.