Hamza Azelmat

04/27/2022, 11:33 AM
Hey :flyte: users, can we use ContainerTasks inside a conditional? I'm getting this error:
File "/Users/hamzaa/data/iron-throne/venv38py/lib/python3.8/site-packages/flytekit/core/condition.py", line 406, in to_case_block
    n = c.output_promise.ref.node  # type: ignore
AttributeError: 'ContainerTask' object has no attribute 'ref'

Ketan (kumare3)

04/27/2022, 1:29 PM
You should be able to. Can you share the snippet?

Germain Tanguy

04/27/2022, 4:59 PM
Hello @Ketan (kumare3), I work with @Hamza Azelmat on this. Here is the code snippet:
from datetime import datetime

from flytekit import ContainerTask, LaunchPlan, conditional, task, workflow
from flytekit.core.node_creation import create_node


def generate_container_task(name):
    return ContainerTask(
        name=name,
        image="alpine",
        command=["echo", "toto"]
    )

@task
def dummy_task() -> str:
    return "DummyTask" 

@task
def is_monday(partition_date: str) -> bool:
    # weekday() returns 0 for Monday
    return datetime.strptime(partition_date, '%Y-%m-%d').weekday() == 0


@workflow
def workflow_pipeline(partition_date: str) -> str:
    
    tasks_for_dag = [
        {"id": "taskA", "schedule": "weekly", "downstreams": ["taskC"]},
        {"id": "taskB", "schedule": "daily", "downstreams": ["taskC"]},
        {"id": "taskC", "schedule": "daily"},
    ]

    nodes = {}

    for task_for_dag in tasks_for_dag:
        id = task_for_dag["id"]
        schedule = task_for_dag["schedule"]

        task_x = generate_container_task(id)
        if schedule == "weekly":
            result = is_monday(partition_date=partition_date)
            cond = (
                conditional("weekly")
                .if_(result.is_true())
                .then(task_x())  # doesn't work, but works if replaced with dummy_task()
                .else_()
                .then(dummy_task())
            )
            nodes[id] = cond.ref.node.with_overrides(node_name=id)
        else:
            nodes[id] = create_node(task_x) 

    for task_for_dag in tasks_for_dag:
        id = task_for_dag["id"]
        downstreams = task_for_dag.get("downstreams", [])
        for downstream_id in downstreams:
            nodes[id] >> nodes[downstream_id]

    return "Finished!"


lp_hello_world = LaunchPlan.create("generic_dag", workflow_pipeline)
Error:
site-packages/flytekit/core/condition.py", line 406, in to_case_block
    n = c.output_promise.ref.node  # type: ignore
AttributeError: 'VoidPromise' object has no attribute 'ref'

Ketan (kumare3)

04/27/2022, 6:30 PM
cc @Yee, will look in a bit. Thank you

Hamza Azelmat

04/29/2022, 10:04 AM
Any update on this, please? 🙏

Ketan (kumare3)

04/29/2022, 2:38 PM
Cc @Kevin Su, can you help?
I am at PyCon today.

Kevin Su

04/29/2022, 2:41 PM
Looking
There is a bug inside conditional, will create a PR ASAP.
👍 1

Hamza Azelmat

04/29/2022, 3:40 PM
Thanks 👍
Was this bug fixed, please? @Kevin Su @Ketan (kumare3)

Ketan (kumare3)

06/29/2022, 4:11 PM
cc @Yee / @Kevin Su may have an update on this
cc @Hamza Azelmat was there an issue created?

Yee

06/29/2022, 4:14 PM
I believe there was a PR created in response to this, but it was unclear how to proceed.

Kevin Su

06/29/2022, 4:31 PM
I think those two issues are slightly different. The problem is that a void promise doesn't have a "ref" attribute. It was fixed in this PR: https://github.com/flyteorg/flytekit/pull/981. @Hamza Azelmat, try upgrading flytekit to v1.0.5. I just ran the example you gave, and it works for me.
🙏 1
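
Note for readers landing on this thread: a quick way to confirm that an environment already has the release mentioned above is to compare the installed flytekit version against 1.0.5. The sketch below is only an illustration; the helper names (`parse_release`, `has_fix`, `installed_flytekit_has_fix`) are hypothetical and not part of flytekit, and the version parsing is deliberately simplified (pre-release suffixes such as `b0` are ignored).

```python
from importlib import metadata

# Version named in the thread as containing the fix (assumption: any later
# release also contains it).
MIN_FIXED_VERSION = (1, 0, 5)


def parse_release(version: str) -> tuple:
    """Turn '1.0.5', 'v1.0.5' or '1.0.5b0' into a tuple of leading integers."""
    parts = []
    for piece in version.lstrip("vV").split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break  # stop at the first component with no leading number
        parts.append(int(digits))
    return tuple(parts)


def has_fix(version: str, minimum: tuple = MIN_FIXED_VERSION) -> bool:
    """Return True if the given version string is at least the minimum."""
    return parse_release(version) >= minimum


def installed_flytekit_has_fix() -> bool:
    """Check the flytekit installed in the current environment, if any."""
    try:
        return has_fix(metadata.version("flytekit"))
    except metadata.PackageNotFoundError:
        # flytekit is not installed in this environment
        return False


if __name__ == "__main__":
    print("flytekit has the fix:", installed_flytekit_has_fix())
```

If this reports False, upgrading with `pip install --upgrade flytekit` and re-registering the workflow is the step suggested in the thread.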