Hello
@freezing-airport-6809,
I work with
@straight-ghost-77338 on this. Here is the code snippet:
from datetime import datetime
from flytekit import ContainerTask, LaunchPlan, conditional, task, workflow
from flytekit.core.node_creation import create_node
def generate_container_task(name):
return ContainerTask(
name=name,
image="alpine",
command=["echo", "toto"]
)
@task
def dummy_task() -> str:
return "DummyTask"
@task
def is_monday(partition_date: str) -> bool:
return datetime.strptime(partition_date, '%Y-%m-%d').weekday() != 0
@workflow
def workflow_pipeline(partition_date: str) -> str:
tasks_for_dag = [
{"id": "taskA", "schedule": "weekly", "downstreams": ["taskC"]},
{"id": "taskB", "schedule": "daily", "downstreams": ["taskC"]},
{"id": "taskC", "schedule": "daily"}
]
nodes = {}
for task_for_dag in tasks_for_dag:
id = task_for_dag["id"]
schedule = task_for_dag["schedule"]
task_x = generate_container_task(id)
if schedule == "weekly":
result = is_monday(partition_date=partition_date)
cond = (
conditional("weekly")
.if_(result.is_true())
.then(task_x()) #don't work but work if replace with dummy_task()
.else_()
.then(dummy_task())
)
nodes[id] = cond.ref.node.with_overrides(node_name=id)
else:
nodes[id] = create_node(task_x)
for task_for_dag in tasks_for_dag:
id = task_for_dag["id"]
downstreams = task_for_dag.get("downstreams", [])
for downstream_id in downstreams:
nodes[id] >> nodes[downstream_id]
return "Finished!"
lp_hello_world = LaunchPlan.create("generic_dag", workflow_pipeline)
Error:
site-packages/flytekit/core/condition.py", line 406, in to_case_block
n = c.output_promise.ref.node # type: ignore
AttributeError: 'VoidPromise' object has no attribute 'ref'