Robin Eklund

almost 3 years ago
Hi! I am trying to build a workflow that is triggered daily but can also be triggered for a specific day (similar to how Airflow works). So I created this test:
from flytekit import task, workflow


@task
def print_the_date(execution_date: str) -> str:
    print(f"in print_the_date: execution_date={execution_date}")
    return execution_date


@workflow
def my_wf(execution_date: str) -> dict[str, str]:
    print(f"inside my_wf: execution_date={execution_date}")
    print_the_date(execution_date=execution_date)
    return {"execution_date": execution_date}
This works perfectly fine running locally like this:
pyflyte run workflow.py my_wf --execution_date 2023-01-01
But I am not sure how to do this on a schedule. I have tried this:
LaunchPlan.get_or_create(
    name="my_lp",
    workflow=my_wf,
    schedule=CronSchedule(schedule="0 7 * * *"),
    security_context=security_context,
    default_inputs={
        "execution_date": datetime.today().strftime('%Y-%m-%d %H:%M:%S')
    }
)
But I understand this doesn't work because the default inputs are evaluated once, at "compile time" (when the launch plan is registered), rather than on each scheduled run(?). I would also like to be able to run it locally without the argument, like this:
pyflyte run workflow.py my_wf
But it seems Flyte does not support a default input like this:
@workflow
def my_wf(execution_date: Union[str, None] = None) -> dict[str, str]:
    ...
Has anyone done this before who can point me in the right direction? I would appreciate the help!
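
To make the "compile-time" concern concrete, here is a minimal plain-Python sketch with no Flyte involved. The names `FROZEN_DEFAULT` and `resolve_execution_date` are hypothetical and not part of flytekit. It contrasts a default computed once when the module is loaded (which is what I assume happens to `default_inputs` above) with a `None` default that is resolved when the function is actually called. Whether the second pattern can be moved into a Flyte task with an optional input is an assumption I have not verified.

```python
from datetime import datetime
from typing import Optional

# Evaluated exactly once, when this module is imported. Every later call that
# uses this value sees the same timestamp, no matter when it runs.
FROZEN_DEFAULT = datetime.today().strftime("%Y-%m-%d %H:%M:%S")


def resolve_execution_date(
    execution_date: Optional[str] = None,
    now: Optional[datetime] = None,
) -> str:
    """Return the explicit date if given, otherwise today's date at call time.

    `now` exists only so the behaviour can be checked deterministically.
    """
    if execution_date is not None:
        return execution_date
    current = now if now is not None else datetime.today()
    return current.strftime("%Y-%m-%d")


if __name__ == "__main__":
    # An explicit date is passed through untouched.
    print(resolve_execution_date("2023-01-01"))
    # No date given: computed when the call happens, not at import time.
    print(resolve_execution_date())
    # The module-level value stays fixed for the lifetime of the process.
    print(FROZEN_DEFAULT)
```

The point of the sketch is only that the date has to be computed inside something that runs per execution, not in an expression that is evaluated when the launch plan is defined.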