curved-petabyte-84246
06/27/2024, 4:53 PM@dynamic
def do_next_step(next_step: str, generic_output: ...):
# ...somehow get the reference launch-plan
ref_lp = get_ref_lp(next_step)
ref_lp()
@workflow
def primary(generic_args..., next_step: str):
generic_output = generic_task(generic_args...)
do_next_step(next_step, generic_output)
I managed to do something like that:
@dynamic
def do_next(next_step_module: str, next_step_func: str) -> None:
next_step_mod = importlib.import_module(next_step_module)
next_step = getattr(next_step_mod, next_step_func)
next_step()
where the module+func are ref plan, e.g.:
@reference_launch_plan(
project="flytesnacks",
domain="development",
name="data_types_and_io.file.normalize_csv_file",
version="-ism2z-QiRgPisO-KY_YTA"
)
def normalize_csv_file(
csv_url: FlyteFile,
column_names: List[str],
columns_to_normalize: List[str],
output_location: str,
) -> FlyteFile:
...
That worked but means that I have to maintain launch plans for each sync next step.
Essentially, there's no reason I shouldn't be able to get the latest launch plan in runtime using Flyte client which is part of the current context. But I didn't find a way to convert the Flyte API object to something a dynamic workflow can trigger.
Another thing I tried is figuring out how to trigger a workflow from within a task. I can then "wait" on the workflow to finish. That's less ideal but also an option.
Lastly, I figured we can use a different approach, and generate the workflow dynamically using imperative workflow and register it if needed, then run it.
I hope I'm making sense. In more simple terms - I'm trying to determine and execute next steps in a workflow (dynamic or otherwise) according to input arguments.tall-lock-23197
curved-petabyte-84246
06/28/2024, 5:41 AMthankful-minister-83577
Another thing I tried is figuring out how to trigger a workflow from within a task. I can then “wait” on the workflow to finish.this is the eager workflow construct… maybe give that a read, but the operating paradigm is quite a bit different than dynamic task. i think dynamic makes more sense for now.
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577