We have a workflow that is composed (for now) of 2...
# ask-the-community
g
We have a workflow that is composed (for now) of 2 parts - generic one and a custom one. I want to have a parameter to the top-level workflow that will determine which workflow is running after the generic part. Trying to put it into code:
Copy code
@dynamic
def do_next_step(next_step: str, generic_output: ...):
  # ...somehow get the reference launch-plan
  ref_lp = get_ref_lp(next_step)
  ref_lp()

@workflow
def primary(generic_args..., next_step: str):
  generic_output = generic_task(generic_args...)
  do_next_step(next_step, generic_output)
I managed to do something like that:
Copy code
@dynamic
def do_next(next_step_module: str, next_step_func: str) -> None:
    next_step_mod = importlib.import_module(next_step_module)
    next_step = getattr(next_step_mod, next_step_func)
    next_step()
where the module+func are ref plan, e.g.:
Copy code
@reference_launch_plan(
    project="flytesnacks",
    domain="development",
    name="data_types_and_io.file.normalize_csv_file",
    version="-ism2z-QiRgPisO-KY_YTA"
)
def normalize_csv_file(
        csv_url: FlyteFile,
        column_names: List[str],
        columns_to_normalize: List[str],
        output_location: str,
) -> FlyteFile:
    ...
That worked but means that I have to maintain launch plans for each sync next step. Essentially, there's no reason I shouldn't be able to get the latest launch plan in runtime using Flyte client which is part of the current context. But I didn't find a way to convert the Flyte API object to something a dynamic workflow can trigger. Another thing I tried is figuring out how to trigger a workflow from within a task. I can then "wait" on the workflow to finish. That's less ideal but also an option. Lastly, I figured we can use a different approach, and generate the workflow dynamically using imperative workflow and register it if needed, then run it. I hope I'm making sense. In more simple terms - I'm trying to determine and execute next steps in a workflow (dynamic or otherwise) according to input arguments.
s
isn't a conditional appropriate for your use case?
g
Not really. It needs to be dynamic. I don't want to change the main workflow every new specific workflow being introduced
y
Another thing I tried is figuring out how to trigger a workflow from within a task. I can then “wait” on the workflow to finish.
this is the eager workflow construct… maybe give that a read, but the operating paradigm is quite a bit different than dynamic task. i think dynamic makes more sense for now.
there is no way to get the latest version without getting the latest version, there’s no “latest” tag that is. so you need to make a network call to fetch it
you can read more about eager workflows here https://docs.flyte.org/en/latest/user_guide/advanced_composition/eager_workflows.html but i don’t think it’s needed in this case.
i don’t think you need a reference launch plan even, a bit surprised importing doesn’t work, assuming it’s defined in the same set of files