silly-jelly-43096
01/05/2023, 9:52 AM
from flytekit import task, workflow

@task
def print_the_date(execution_date: str) -> str:
    print(f"in print_the_date: execution_date={execution_date}")
    return execution_date

@workflow
def my_wf(execution_date: str) -> dict[str, str]:
    print(f"inside my_wf: execution_date={execution_date}")
    print_the_date(execution_date=execution_date)
    return {"execution_date": execution_date}
This works perfectly fine when run locally like this:
pyflyte run workflow.py my_wf --execution_date 2023-01-01
But I'm not sure how to do this on a scheduled basis. I have tried this:
LaunchPlan.get_or_create(
    name="my_lp",
    workflow=my_wf,
    schedule=CronSchedule(schedule="0 7 * * *"),
    security_context=security_context,
    default_inputs={
        "execution_date": datetime.today().strftime('%Y-%m-%d %H:%M:%S')
    }
)
But I understand this doesn't work because the default inputs are created at "compile time" of the workflow(?).
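The concern here can be illustrated in plain Python, without Flyte: a value computed while the definition is being built is stored once and then reused by every later run. This is only a sketch under that assumption; build_default_inputs and registration_time are hypothetical names standing in for the default_inputs dict and the moment the launch plan is registered.

```python
from datetime import datetime


def build_default_inputs(now: datetime) -> dict[str, str]:
    """Hypothetical stand-in for the default_inputs dict of a launch plan."""
    return {"execution_date": now.strftime("%Y-%m-%d %H:%M:%S")}


# The dict is built once, at the (assumed) registration time.
registration_time = datetime(2023, 1, 5, 9, 52, 0)
default_inputs = build_default_inputs(registration_time)

# Every later scheduled run reads the same stored string; nothing is recomputed.
seen_by_runs = [default_inputs["execution_date"] for _ in range(3)]
print(seen_by_runs)
```

All three simulated runs see the registration-time value, which is why the date has to be supplied or computed at run time instead.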
Also, I would like to be able to run it locally like this:
pyflyte run workflow.py my_wf
But it seems Flyte does not support default inputs like this:
@workflow
def my_wf(execution_date: Union[str, None] = None) -> dict[str, str]:
    ...
Has anyone done this before who can point me in the right direction? I would appreciate the help!
rapid-vegetable-16315
01/05/2023, 10:59 AM
execution_date: Optional[str] = None, then inside the print_the_date task deal with the None by setting it to datetime.today(). That way you don't need to dynamically create the today() value in the launch plan definition.
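A minimal sketch of the None handling described here, written as a plain function so it runs without Flyte. The name resolve_execution_date and the "%Y-%m-%d" format are assumptions for illustration; in the thread's setup this logic would sit inside the print_the_date task.

```python
from datetime import datetime
from typing import Optional


def resolve_execution_date(execution_date: Optional[str] = None) -> str:
    """Return the given date string, or today's date when none was passed."""
    if execution_date is None:
        # Evaluated at call time, so each run gets its own date.
        return datetime.today().strftime("%Y-%m-%d")
    return execution_date


# An explicit value passes through unchanged.
explicit = resolve_execution_date("2023-01-01")
# No value: falls back to the date at the moment of the call.
fallback = resolve_execution_date()
print(explicit, fallback)
```

Because the fallback is computed inside the function body, it is resolved when the run executes rather than when the launch plan is defined.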
Optionals should be supported as of Flyte 1.1.0, so make sure you're using that or a later version.
wooden-sandwich-59360
01/05/2023, 11:10 AM
CronSchedule(schedule="@daily", kickoff_time_input_arg="execution_date")
@task
def todays_date() -> datetime:
    return datetime.today()
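A small sketch of the formatting step that would go with kickoff_time_input_arg, assuming the scheduler passes the kickoff time as a datetime rather than a string (so execution_date would need to be typed as datetime instead of str). The helper name format_kickoff_time and the default format are hypothetical; in the workflow above this would live inside a task.

```python
from datetime import datetime


def format_kickoff_time(kickoff_time: datetime, fmt: str = "%Y-%m-%d") -> str:
    """Turn a scheduled kickoff time into the date string used in this thread."""
    return kickoff_time.strftime(fmt)


# Example: a run scheduled for 07:00 on 5 January 2023.
scheduled = datetime(2023, 1, 5, 7, 0)
execution_date = format_kickoff_time(scheduled)
print(execution_date)
```

Passing "%Y-%m-%d %H:%M:%S" as fmt would reproduce the longer format used in the original launch plan attempt.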
silly-jelly-43096
01/05/2023, 11:10 AM
from typing import Optional
from flytekit import task, workflow

@task
def print_the_date(execution_date: Optional[str] = None) -> str:
    print(f"in test_task: execution_date={execution_date}")
    return execution_date

@workflow
def my_wf(execution_date: Optional[str] = None) -> dict[str, str]:
    print(f"inside wf: execution_date={execution_date}")
    print_the_date(execution_date=execution_date)
    return {"execution_date": execution_date}
Execute it like this:
pyflyte run workflow.py my_wf
This is the output:
inside wf: execution_date=Promise(node:.execution_date)
Usage: pyflyte run workflow.py my_wf [OPTIONS]
Try 'pyflyte run workflow.py my_wf --help' for help.
Error: Missing option '--execution_date'.
Trying to find the version of pyflyte but cannot find it 🤔
silly-jelly-43096
01/05/2023, 11:12 AM
silly-jelly-43096
01/05/2023, 11:16 AM
kickoff_time_input_arg sounds like exactly what I am looking for
freezing-airport-6809