acoustic-carpenter-78188
02/03/2023, 5:19 PM
from datetime import datetime

from croniter import croniter
from flytekit import CronSchedule, LaunchPlan, Workflow


def generate_backfill_workflow(
    start_date: datetime, end_date: datetime, base_lp: LaunchPlan
) -> Workflow:
    # Only launch plans with a cron schedule can be backfilled.
    if base_lp.schedule is None:
        raise ValueError("Backfill can only be created for scheduled launchplans")
    if not isinstance(base_lp.schedule, CronSchedule):
        raise NotImplementedError("The launchplan schedule needs to be a cron schedule")
    if start_date >= end_date:
        raise ValueError("Start date should be earlier than end date")
    print(f"Generating backfill for {start_date} to {end_date}")
    wf = Workflow(name=f"backfill-{base_lp.name}")
    # Walk the cron schedule from start_date, one fire time per iteration.
    lp_iter = croniter(
        base_lp.schedule.cron_schedule.schedule, start_time=start_date, ret_type=datetime
    )
    while True:
        next_start_date = lp_iter.get_next()
        # Stop once the schedule moves past the backfill window.
        if next_start_date > end_date:
            break
        print(f"Adding -> {next_start_date}")
        wf.add_launch_plan(base_lp, kickoff_time=next_start_date)
    # Return the assembled workflow, as the signature declares.
    return wf
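For anyone who wants to check the windowing logic without flytekit or croniter installed, here is a minimal stdlib-only sketch of the same loop. `iter_kickoff_times` and `next_midnight` are hypothetical names made up for this illustration; `next_midnight` is only a stand-in for a daily `0 0 * * *` cron schedule and is not how croniter computes times. It assumes, as in the snippet above, that the first kickoff is strictly after the start and that a kickoff landing exactly on the end date is still included.

```python
from datetime import datetime, timedelta
from typing import Callable, Iterator


def iter_kickoff_times(
    start: datetime,
    end: datetime,
    next_after: Callable[[datetime], datetime],
) -> Iterator[datetime]:
    """Yield schedule fire times after `start`, up to and including `end`.

    `next_after` is a hypothetical callable returning the first fire time
    strictly after the given time (the role the cron iterator plays above).
    """
    if start >= end:
        raise ValueError("Start date should be earlier than end date")
    current = next_after(start)
    while current <= end:
        yield current
        current = next_after(current)


def next_midnight(t: datetime) -> datetime:
    """Stand-in for a daily schedule: the first midnight strictly after `t`."""
    midnight = t.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + timedelta(days=1)


times = list(
    iter_kickoff_times(datetime(2023, 1, 1), datetime(2023, 1, 4), next_midnight)
)
for t in times:
    print(f"Adding -> {t}")
```

With these inputs the loop produces one kickoff per day for January 2, 3 and 4; each of those would become one `add_launch_plan` call in the real workflow.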
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
acoustic-carpenter-78188
02/03/2023, 5:19 PM
Flyte enables production-grade orchestration for machine learning workflows and data processing created to accelerate local workflows to production.