    Hamza Azelmat

    4 months ago
    Hey :flyte: users, can we use ContainerTasks inside a conditional? I'm getting this error:
    File "/Users/hamzaa/data/iron-throne/venv38py/lib/python3.8/site-packages/flytekit/core/condition.py", line 406, in to_case_block
        n = c.output_promise.ref.node  # type: ignore
    AttributeError: 'ContainerTask' object has no attribute 'ref'
    Ketan (kumare3)

    4 months ago
    You should be able to. Can you share the snippet?
    Germain Tanguy

    4 months ago
    Hello @Ketan (kumare3), I work with @Hamza Azelmat on this. Here is the code snippet:
    from datetime import datetime
    
    from flytekit import ContainerTask, LaunchPlan, conditional, task, workflow
    from flytekit.core.node_creation import create_node
    
    
    def generate_container_task(name):
        return ContainerTask(
            name=name,
            image="alpine",
            command=["echo", "toto"]
        )
    
    @task
    def dummy_task() -> str:
        return "DummyTask" 
    
    @task
    def is_monday(partition_date: str) -> bool:
        # weekday() returns 0 for Monday
        return datetime.strptime(partition_date, '%Y-%m-%d').weekday() == 0
    
    
    @workflow
    def workflow_pipeline(partition_date: str) -> str:
        
        tasks_for_dag = [
                {"id": "taskA", "schedule": "weekly", "downstreams": ["taskC"]},
                {"id": "taskB", "schedule": "daily", "downstreams": ["taskC"]},
                {"id": "taskC", "schedule": "daily"}
        ]
    
        nodes = {}
    
        for task_for_dag in tasks_for_dag:
            id = task_for_dag["id"]
            schedule = task_for_dag["schedule"]
    
            task_x = generate_container_task(id)
            if schedule == "weekly":
                result = is_monday(partition_date=partition_date)
                cond = (
                       conditional("weekly")
                       .if_(result.is_true())
                       .then(task_x())  # fails here, but works if replaced with dummy_task()
                       .else_()
                       .then(dummy_task())
                )
                nodes[id] = cond.ref.node.with_overrides(node_name=id)
            else:
                nodes[id] = create_node(task_x) 
    
        for task_for_dag in tasks_for_dag:
            id = task_for_dag["id"]
            downstreams = task_for_dag.get("downstreams", [])
            for downstream_id in downstreams:
                nodes[id] >> nodes[downstream_id]
    
        return "Finished!"
    
    
    lp_hello_world = LaunchPlan.create("generic_dag", workflow_pipeline)
    Error:
    site-packages/flytekit/core/condition.py", line 406, in to_case_block
        n = c.output_promise.ref.node  # type: ignore
    AttributeError: 'VoidPromise' object has no attribute 'ref'
    Ketan (kumare3)

    4 months ago
    cc @Yee, will look in a bit. Thank you
    Hamza Azelmat

    4 months ago
    Any update on this, please 🙏?
    Ketan (kumare3)

    4 months ago
    Cc @Kevin Su, can you help?
    I am at PyCon today.
    Kevin Su

    4 months ago
    Looking
    There is a bug inside conditional, will create a PR ASAP.
    Hamza Azelmat

    4 months ago
    Thanks 👍
    Was this bug fixed, please? @Kevin Su @Ketan (kumare3)
    Ketan (kumare3)

    2 months ago
    cc @Yee / @Kevin Su may have an update on this
    cc @Hamza Azelmat, was there an issue created?
    Yee

    2 months ago
    I believe there was a PR created in response to this, but it was unclear how to proceed.
    Kevin Su

    2 months ago
    I think those two issues are slightly different. The problem is that a void promise doesn’t have a “ref” attribute. It was fixed in this PR: https://github.com/flyteorg/flytekit/pull/981. @Hamza Azelmat, try upgrading flytekit to v1.0.5. I just ran the example you gave, and it works for me.
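
    For anyone reading this thread later, here is a minimal plain-Python sketch of the failure mode described above. It does not import flytekit, and it is not the code from the PR: `Node`, `NodeRef`, `Promise`, `VoidPromise`, `node_of`, and `node_of_or_none` are hypothetical stand-ins written only to illustrate why chaining `.ref.node` on the result of a task with no outputs raises `AttributeError`, and what a defensive lookup could look like.

```python
class Node:
    """Hypothetical stand-in for a workflow node."""

    def __init__(self, name: str) -> None:
        self.name = name


class NodeRef:
    """Hypothetical stand-in for the object that points back at a node."""

    def __init__(self, node: Node) -> None:
        self.node = node


class Promise:
    """Stand-in for the result of a task that returns a value: it has `ref`."""

    def __init__(self, node: Node) -> None:
        self.ref = NodeRef(node)


class VoidPromise:
    """Stand-in for the result of a task with no outputs: it has no `ref`."""

    def __init__(self, task_name: str) -> None:
        self.task_name = task_name


def node_of(output_promise):
    """Same access pattern as the traceback line: `<promise>.ref.node`."""
    return output_promise.ref.node


def node_of_or_none(output_promise):
    """Defensive variant (an assumption, not the actual fix): tolerate a missing `ref`."""
    ref = getattr(output_promise, "ref", None)
    return None if ref is None else ref.node


returning = Promise(Node("dummy_task"))  # like dummy_task(), which returns a str
void = VoidPromise("taskA")              # like task_x(), which declares no outputs

# A promise with outputs resolves to its node as expected.
resolved_name = node_of(returning).name

# A void promise fails on the `.ref` lookup.
try:
    node_of(void)
    error_message = ""
except AttributeError as exc:
    error_message = str(exc)

# The defensive variant returns None instead of raising.
safe_result = node_of_or_none(void)
```

    This also matches the observation in the snippet that swapping `task_x()` for `dummy_task()` avoids the error: in the sketch, only the promise of a task that returns a value carries a `ref`.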