wooden-sandwich-59360
01/12/2023, 2:36 PMTraceback (most recent call last):
File "/root/.varner_dp/venv/3.7.1/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
return wrapped(*args, **kwargs)
File "/root/.varner_dp/venv/3.7.1/lib/python3.9/site-packages/flytekit/core/base_task.py", line 472, in dispatch_execute
native_inputs = TypeEngine.literal_map_to_kwargs(exec_ctx, input_literal_map, self.python_interface.inputs)
File "/root/.varner_dp/venv/3.7.1/lib/python3.9/site-packages/flytekit/core/type_engine.py", line 800, in literal_map_to_kwargs
return {k: TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k]) for k, v in lm.literals.items()}
File "/root/.varner_dp/venv/3.7.1/lib/python3.9/site-packages/flytekit/core/type_engine.py", line 800, in <dictcomp>
return {k: TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k]) for k, v in lm.literals.items()}
Message:
'n2.o0'
SYSTEM ERROR! Contact platform administrators.
freezing-airport-6809
wooden-sandwich-59360
01/13/2023, 7:27 AM@workflow
def greeting_wf(kickoff_time: datetime, chain: str, country: str):
date = kickoff_time_converter(kickoff_time=kickoff_time)
output_check = generate_output_flag_check_greeting(
date=kickoff_time, chain=chain, country=country
)
output_path = output_path_greeting(date=kickoff_time, chain=chain, country=country)
conditional_flow = (
conditional("flag_check")
.if_(output_check.is_false())
.then(
greeting_shell_task(
date=kickoff_time, chain=chain, country=country, output_path=output_path
)
)
.elif_(output_check.is_true())
.then(
previous_completion(
chain=chain, country=country, date=date, task_name="helloflyte_greeting"
)
)
.else_()
.fail("Something went wrong. Please contact DPAR.")
)
output_check >> conditional_flow
So in this case the conditional flow needs output_check in order to evaluate which method should be called (previous completion, greeting shell task). But it also needs chain, country, date, output_path in order to actually execute.
These are the inputs we expect to see on the conditional branch, and do see most of the time:
{
"chain": "dummy_chain",
"country": "dummy_country",
"date": "1/2/2022 12:00:00 AM UTC",
"output_path": "<gs://root_bucket/examples/helloflyte/greeting/v1/chain=dummy_chain/country=dummy_country/year=2022/month=01/day=02/>"
}
And this is what we see once in a while:
{1 item
n1.o0:false
}
n1 is the node that evaluated the flag check.
It is worth noting the workflow is in two additional methods to generate Workflows across a range of values:
def generate_backfill_workflow(
start_date: datetime, end_date: datetime, base_lp: LaunchPlan, **kwargs
) -> Workflow:
if base_lp.schedule is None:
raise ValueError("Backfill can only be created for scheduled launchplans")
if isinstance(base_lp.schedule, CronSchedule):
pass
else:
raise NotImplementedError("The launchplan schedule needs to be a cron schedule")
if start_date >= end_date:
raise ValueError("Start date should be greater than end date")
sub_name = "_".join(kwargs.values())
wf = Workflow(name=f"backfill-{base_lp.name}-{sub_name}")
lp_iter = croniter(
base_lp.schedule.cron_schedule.schedule,
start_time=start_date,
ret_type=datetime,
)
while True:
next_start_date = lp_iter.get_next()
if next_start_date > end_date:
break
wf.add_launch_plan(
base_lp, kickoff_time=next_start_date, **kwargs
).with_overrides(
node_name=f"{base_lp.name}_{sub_name}_{next_start_date.strftime('%Y-%m-%d')}"
)
return wf
def backfill_greeting(
chain_list: List[str],
country_list: List[str],
start_date: datetime,
num_days: int,
**kwargs,
) -> Workflow:
wf_wrapper = Workflow(name="helloflyte_greeting")
end_date = start_date + timedelta(days=num_days)
for chain in chain_list:
for country in country_list:
wf_wrapper.add_entity(
generate_backfill_workflow(
start_date=start_date,
end_date=end_date,
base_lp=lp_greeting,
chain=chain,
country=country,
**kwargs,
)
).with_overrides(name=f"greeting_{chain}_{country}")
return wf_wrapper
hallowed-mouse-14616
01/17/2023, 4:31 PMuser
01/17/2023, 4:31 PMwooden-sandwich-59360
01/17/2023, 4:33 PMfreezing-airport-6809