Hey :flyte: Users, can we use ContainerTasks insid...
# flytekit
h
Hey flyte Users, can we use ContainerTasks inside of a conditional? I'm getting this error
Copy code
File "/Users/hamzaa/data/iron-throne/venv38py/lib/python3.8/site-packages/flytekit/core/condition.py", line 406, in to_case_block
    n = c.output_promise.ref.node  # type: ignore
AttributeError: 'ContainerTask' object has no attribute 'ref'
k
You should be able to. Can you share the snippet
g
Hello @Ketan (kumare3), I work with @Hamza Azelmat on this. Here is the code snippet:
Copy code
from datetime import datetime

from flytekit import ContainerTask, LaunchPlan, conditional, task, workflow
from flytekit.core.node_creation import create_node


def generate_container_task(name):
    return ContainerTask(
        name=name,
        image="alpine",
        command=["echo", "toto"]
    )

@task
def dummy_task() -> str:
    return "DummyTask" 

@task
def is_monday(partition_date: str) -> bool:
    return datetime.strptime(partition_date, '%Y-%m-%d').weekday() != 0


@workflow
def workflow_pipeline(partition_date: str) -> str:
    
    tasks_for_dag = [
            {"id": "taskA", "schedule": "weekly", "downstreams": ["taskC"]},
            {"id": "taskB", "schedule": "daily", "downstreams": ["taskC"]},
            {"id": "taskC", "schedule": "daily"}
    ]

    nodes = {}

    for task_for_dag in tasks_for_dag:
        id = task_for_dag["id"]
        schedule = task_for_dag["schedule"]

        task_x = generate_container_task(id)
        if schedule == "weekly":
            result = is_monday(partition_date=partition_date)
            cond = (
                   conditional("weekly")
                   .if_(result.is_true())
                   .then(task_x()) #don't work but work if replace with dummy_task()
                   .else_()
                   .then(dummy_task())
            )
            nodes[id] = cond.ref.node.with_overrides(node_name=id)
        else:
            nodes[id] = create_node(task_x) 

    for task_for_dag in tasks_for_dag:
        id = task_for_dag["id"]
        downstreams = task_for_dag.get("downstreams", [])
        for downstream_id in downstreams:
            nodes[id] >> nodes[downstream_id]

    return "Finished!"


lp_hello_world = LaunchPlan.create("generic_dag", workflow_pipeline)
Error:
Copy code
site-packages/flytekit/core/condition.py", line 406, in to_case_block
    n = c.output_promise.ref.node  # type: ignore
AttributeError: 'VoidPromise' object has no attribute 'ref'
k
cc @Yee, will look in a bit. Thank you
h
Any update on this please ๐Ÿ™?
k
Cc @Kevin Su can you help
I am at pycon today
k
Looking
There is a bug inside conditional, will create a PR ASAP.
๐Ÿ‘ 1
h
Thanks ๐Ÿ‘
Was this bug fixed please ? @Kevin Su @Ketan (kumare3)
k
cc @Yee / @Kevin Su may have an update on this
cc @Hamza Azelmat was there an issue created?
y
i believe there was a PR created in response to this, but it was unclear how to proceed
k
I think those two issues are slightly different. The problem is that void promise doesnโ€™t have โ€œrefโ€ attribute. It was fixed in this PR https://github.com/flyteorg/flytekit/pull/981 @Hamza Azelmat Try to upgrade flytekit to v1.0.5 I just ran the example you gave, it works for me.
๐Ÿ™ 1
170 Views