Hi! I am trying to build a workflow which will be...
# ask-the-community
r
Hi! I am trying to build a workflow which will be triggered on a daily basis and also will have the possibility to be triggered for a specific day (more like how airflow is working). So i created this test:
Copy code
@task
def print_the_date(execution_date: str) -> str:
    print(f"in print_the_date: execution_date={execution_date}")
    return execution_date


@workflow
def my_wf(execution_date: str) -> dict[str, str]:
    print(f"inside my_wf: execution_date={execution_date}")
    print_the_date(execution_date=execution_date)
    return {"execution_date": execution_date}
This works perfectly fine running locally like this:
Copy code
pyflyte run workflow.py my_wf --execution_date 2023-01-01
But not sure how to do this on a scheduled basis, i have tried with this:
Copy code
LaunchPlan.get_or_create(
    name="my_lp",
    workflow=my_wf,
    schedule=CronSchedule(schedule="0 7 * * *"),
    security_context=security_context,
    default_inputs={
        "execution_date": datetime.today().strftime('%Y-%m-%d %H:%M:%S')
    }
)
But i understand this doesn't work because it created the default inputs in "compile-time" of the workflow(?). Also i would like to be able to do like this when running locally:
Copy code
pyflyte run workflow.py my_wf
But it seems Flyte are not supporting default input like this:
Copy code
@workflow
def my_wf(execution_date: Union[str, None] = None) -> dict[str, str]:
   ...
Anyone who have done this before that can point me in the right direction, would appreciate the help!
m
My best guess at an approach would be to make
execution_date
Optional[str] = None
, then inside the
print_the_date
task deal with the None being set to
datetime.today()
. That way you don't need to dynamically create the
today()
in the launchplan definition. Optionals should be supported as per Flyte 1.1.0, so make sure you're using that or a later version
f
You can set kickoff_time_input_arg in the CronSchedule and refer to that argument in the workflow definition. So you can set the argument to be “execution_date” get the current date that way and shouldn’t need a default input arg. I guess you could even create a task that computes datetime.today() and return it to the workflow? That way you don’t refer to it in the launch plan. For example:
Copy code
CronSchedule(schedule="@daily", kickoff_time_input_arg="execution_date")

@task
def todays_date() -> datetime:
    return datetime.today()
r
i now have it like this:
Copy code
@task
def print_the_date(execution_date: Optional[str] = None) -> str:
    print(f"in test_task: execution_date={execution_date}")
    return execution_date


@workflow
def my_wf(execution_date: Optional[str] = None) -> dict[str, str]:
    print(f"inside wf: execution_date={execution_date}")
    print_the_date(execution_date=execution_date)
    return {"execution_date": execution_date}
execute it like this:
Copy code
pyflyte run workflow.py my_wf
this is the output:
Copy code
inside wf: execution_date=Promise(node:.execution_date)
Usage: pyflyte run workflow.py my_wf [OPTIONS]
Try 'pyflyte run workflow.py my_wf --help' for help.

Error: Missing option '--execution_date'.
Trying to find the version of
pyflyte
but can not find it 🤔
Thanks @Fredrik Lyford i will try that out!
kickoff_time_input_arg
sounds exactly what i am looking for
k
@Maarten de Jong / @Robin Eklund - what @Fredrik Lyford proposed is the best way. Date time.now can be variable. Kickoff time is precise from the core scheduler
160 Views