Hema Jayachandran
03/25/2024, 2:36 PMpyflyte backfill
functionality. We are using a custom function to fetch the input date in our pipelines. The pipeline where I am testing this functionality, runs on current date when no input is provided else picks the passed date. Problem I am facing now is that when I run backfill for the launchplan for 2 days, it is always picking the current date (because of existing logic) for all the subworkflows. While there is no option where we could pass the date via backfill, how can we fully utilise the functionality here ? Any inputs are really appreciated πSamhita Alla
from-date
or to-date
suitable for your use case?Hema Jayachandran
03/26/2024, 9:33 AMcurrent-date - 1
or current-date
that is required.
@task
def get_input_date(date: Optional[str]) -> str:
date = datetime.now().strftime("%Y-%m-%d") if not date else date
return date
@task
def input_date_task(date: Optional[str] = None) -> str:
if not date:
return (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
else:
return date
In this case the behaviour I saw by running the below
pyflyte backfill -p project_test -d feature-branch -n test-backfill --execute --from-date 2024-03-23 --to-date 2024-03-25 launch_plan_fi
is a workflow that ran sub workflows for every cron schedule associated to the launch_plan but for each of those executions, it only picked the current date(25th March, 2024) because of our logic.
I assume that from-date
or to-date
works if our input date picks the kickoff_time
otherwise its the above behaviour.Hema Jayachandran
03/26/2024, 9:43 AMkickoff_time
for our 2 pipelines that runs on current date but wonβt work with the rest because of current-date -1
Samhita Alla
input_date_task
?Hema Jayachandran
03/26/2024, 9:59 AM@workflow(interruptible=True)
def test_workflow(ecosystem: str, date: Optional[str] = None) -> bool:
input_date_to_task = get_input_date(date=date)
path = task_1(date=input_date_to_task)
...
we are not passing it as default_inputs because we had problems in the past and we both had a chat about this https://flyte-org.slack.com/archives/CP2HDHKE1/p1689249082278999 πHema Jayachandran
03/26/2024, 10:31 AMkickoff_time
in our pipeline and will post the results here. πHema Jayachandran
03/26/2024, 12:23 PMHema Jayachandran
03/26/2024, 12:24 PM@workflow(interruptible=True)
def workflow_test(ecosystem: str, kickoff_time: datetime) -> bool:
# input_date_to_task = get_input_date(date=date)
input_date_to_task = kickoff_time.strftime("%Y-%m-%d")
but this fails 'Promise' object has no attribute 'strftime'
Samhita Alla
Samhita Alla
Hema Jayachandran
03/27/2024, 1:37 PM