# ask-the-community
f
We have a conditional branch that evaluates the outputs of two tasks, checking whether they are true or false, and passes the outputs of two different tasks on to be processed in a final task. Once in a while (once or twice out of a couple hundred), the inputs from the boolean checks are bound to the branch instead of the outputs of the other tasks. This makes Flyte's input converter fail. Has anyone else hit a case where the variable being conditionally evaluated is not the same variable being passed into the nested tasks? Happy to share more logging as needed. Running on GKE.
```
Traceback (most recent call last):

      File "/root/.varner_dp/venv/3.7.1/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/root/.varner_dp/venv/3.7.1/lib/python3.9/site-packages/flytekit/core/base_task.py", line 472, in dispatch_execute
        native_inputs = TypeEngine.literal_map_to_kwargs(exec_ctx, input_literal_map, self.python_interface.inputs)
      File "/root/.varner_dp/venv/3.7.1/lib/python3.9/site-packages/flytekit/core/type_engine.py", line 800, in literal_map_to_kwargs
        return {k: TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k]) for k, v in lm.literals.items()}
      File "/root/.varner_dp/venv/3.7.1/lib/python3.9/site-packages/flytekit/core/type_engine.py", line 800, in <dictcomp>
        return {k: TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k]) for k, v in lm.literals.items()}

Message:

    'n2.o0'

SYSTEM ERROR! Contact platform administrators.
```
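The last frame of the traceback is consistent with a plain lookup failure: the dict comprehension indexes python_types[k] for every key in the incoming literal map, so a key the task's interface does not declare raises a KeyError, whose string form is just the quoted key. Below is a simplified, stdlib-only sketch of that lookup. It is not flytekit's actual code; the helper names, types, and values are stand-ins chosen for illustration.

```python
from typing import Any, Dict, Optional


def literal_map_to_kwargs(literals: Dict[str, Any], python_types: Dict[str, type]) -> Dict[str, Any]:
    # Simplified stand-in for TypeEngine.literal_map_to_kwargs: each incoming key
    # is looked up in the task's declared input types, so a key the task does not
    # declare (such as a branch-condition output) raises KeyError.
    return {k: python_types[k](v) for k, v in literals.items()}


def conversion_error(literals: Dict[str, Any], python_types: Dict[str, type]) -> Optional[str]:
    # Hypothetical helper: return the KeyError message the conversion would
    # produce, or None if every key is declared.
    try:
        literal_map_to_kwargs(literals, python_types)
    except KeyError as exc:
        return str(exc)
    return None


# Declared inputs of the nested task (all typed as str to keep the sketch simple).
TASK_INPUTS = {"chain": str, "country": str, "date": str, "output_path": str}

print(conversion_error({"n2.o0": False}, TASK_INPUTS))  # 'n2.o0'
```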
k
@Fredrik Lyford when I hear "once in many", I just don't like it. Flyte strives to be correct and hermetic, and this does not sound like it. Would you be open to sharing a representative code snippet?
f
This is a representative pipeline that has the same components we need. The main pieces are flag checks on the output and, potentially, on the input data, which evaluate to boolean values. These tell the workflow whether the pipeline previously completed successfully. If the conditions are right, we call the greeting task and the data is processed to the output.
```python
from datetime import datetime

from flytekit import conditional, workflow

# kickoff_time_converter, generate_output_flag_check_greeting, output_path_greeting,
# greeting_shell_task and previous_completion are tasks defined elsewhere in the project.


@workflow
def greeting_wf(kickoff_time: datetime, chain: str, country: str):
    date = kickoff_time_converter(kickoff_time=kickoff_time)
    output_check = generate_output_flag_check_greeting(
        date=kickoff_time, chain=chain, country=country
    )
    output_path = output_path_greeting(date=kickoff_time, chain=chain, country=country)

    conditional_flow = (
        conditional("flag_check")
        .if_(output_check.is_false())
        .then(
            greeting_shell_task(
                date=kickoff_time, chain=chain, country=country, output_path=output_path
            )
        )
        .elif_(output_check.is_true())
        .then(
            previous_completion(
                chain=chain, country=country, date=date, task_name="helloflyte_greeting"
            )
        )
        .else_()
        .fail("Something went wrong. Please contact DPAR.")
    )

    output_check >> conditional_flow
```
So in this case, the conditional flow needs output_check to decide which task should be called (previous_completion or greeting_shell_task). But it also needs chain, country, date, and output_path to actually execute that task. These are the inputs we expect to see on the conditional branch, and do see most of the time:
```json
{
  "chain": "dummy_chain",
  "country": "dummy_country",
  "date": "1/2/2022 12:00:00 AM UTC",
  "output_path": "gs://root_bucket/examples/helloflyte/greeting/v1/chain=dummy_chain/country=dummy_country/year=2022/month=01/day=02/"
}
```
And this is what we see once in a while:
```json
{
  "n1.o0": false
}
```
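Flyte's error message names only the offending key, so when logging these failures it can help to diff a node's bound input names against the names its nested tasks need. A minimal sketch: diff_bindings is a hypothetical helper, not a flytekit API, and the expected names are taken from the input map listed above.

```python
from typing import Any, Dict, Set, Tuple


def diff_bindings(expected: Set[str], received: Dict[str, Any]) -> Tuple[Set[str], Set[str]]:
    # Hypothetical debugging helper: compare the input names a node actually
    # received against the names its nested tasks declare.
    received_names = set(received)
    missing = expected - received_names
    unexpected = received_names - expected
    return missing, unexpected


# Input names the nested tasks need, as listed in the thread.
EXPECTED_INPUTS = {"chain", "country", "date", "output_path"}

# The occasional bad binding observed on the branch node.
missing, unexpected = diff_bindings(EXPECTED_INPUTS, {"n1.o0": False})
print(sorted(missing))     # ['chain', 'country', 'date', 'output_path']
print(sorted(unexpected))  # ['n1.o0']
```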
n1 is the node that evaluated the flag check. It is worth noting that the workflow is wrapped in two additional functions that generate workflows across a range of values:
```python
from datetime import datetime, timedelta
from typing import List

from croniter import croniter
from flytekit import CronSchedule, LaunchPlan, Workflow


def generate_backfill_workflow(
    start_date: datetime, end_date: datetime, base_lp: LaunchPlan, **kwargs
) -> Workflow:
    if base_lp.schedule is None:
        raise ValueError("Backfill can only be created for scheduled launchplans")

    if not isinstance(base_lp.schedule, CronSchedule):
        raise NotImplementedError("The launchplan schedule needs to be a cron schedule")

    if start_date >= end_date:
        raise ValueError("Start date should be earlier than end date")

    # One launch-plan node per cron fire time up to end_date.
    sub_name = "_".join(kwargs.values())
    wf = Workflow(name=f"backfill-{base_lp.name}-{sub_name}")
    lp_iter = croniter(
        base_lp.schedule.cron_schedule.schedule,
        start_time=start_date,
        ret_type=datetime,
    )
    while True:
        next_start_date = lp_iter.get_next()
        if next_start_date > end_date:
            break
        wf.add_launch_plan(
            base_lp, kickoff_time=next_start_date, **kwargs
        ).with_overrides(
            node_name=f"{base_lp.name}_{sub_name}_{next_start_date.strftime('%Y-%m-%d')}"
        )

    return wf


def backfill_greeting(
    chain_list: List[str],
    country_list: List[str],
    start_date: datetime,
    num_days: int,
    **kwargs,
) -> Workflow:
    # lp_greeting is a scheduled LaunchPlan defined elsewhere in the project.
    wf_wrapper = Workflow(name="helloflyte_greeting")
    end_date = start_date + timedelta(days=num_days)
    for chain in chain_list:
        for country in country_list:
            wf_wrapper.add_entity(
                generate_backfill_workflow(
                    start_date=start_date,
                    end_date=end_date,
                    base_lp=lp_greeting,
                    chain=chain,
                    country=country,
                    **kwargs,
                )
            ).with_overrides(name=f"greeting_{chain}_{country}")
    return wf_wrapper
```
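To sanity-check which nodes a backfill generates, the date loop can be approximated without Flyte or croniter. This is a stdlib-only sketch under two assumptions: the launch plan's cron is daily at midnight ("0 0 * * *"), and, as we understand croniter's get_next, the first kickoff is the first fire time strictly after start_date. daily_kickoff_times, backfill_node_names, and the launch plan name "greeting_lp" are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List


def daily_kickoff_times(start_date: datetime, end_date: datetime) -> List[datetime]:
    # Assumes a daily-at-midnight cron; the first kickoff is the first midnight
    # strictly after start_date (tzinfo is preserved by replace()).
    if start_date >= end_date:
        raise ValueError("Start date should be earlier than end date")
    current = start_date.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
    times = []
    while current <= end_date:  # same stop condition as the `> end_date` break above
        times.append(current)
        current += timedelta(days=1)
    return times


def backfill_node_names(lp_name: str, start_date: datetime, num_days: int, **kwargs: str) -> List[str]:
    # Mirrors the node_name pattern used in generate_backfill_workflow.
    sub_name = "_".join(kwargs.values())
    end_date = start_date + timedelta(days=num_days)
    return [
        f"{lp_name}_{sub_name}_{t.strftime('%Y-%m-%d')}"
        for t in daily_kickoff_times(start_date, end_date)
    ]


names = backfill_node_names(
    "greeting_lp", datetime(2022, 1, 1), 3, chain="dummy_chain", country="dummy_country"
)
print(names[0])    # greeting_lp_dummy_chain_dummy_country_2022-01-02
print(len(names))  # 3
```

Listing the names this way also makes it easy to confirm that every node name is unique per chain, country, and date.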
d
@Fredrik Lyford this is troublesome. Do you mind filing an issue? I would be glad to dive into this. [flyte-bug]
f
@Dan Rammer (hamersaw) For sure, I will get it done tomorrow morning. Happy to get on a call to demo it at some point.
k
Thank you @Fredrik Lyford, and sorry for the delay in responding. Thank you @Dan Rammer (hamersaw) for taking over.