Is it possible to construct a "dynamic workflow" u...
# ask-the-community
d
Is it possible to construct a "dynamic workflow" using the Flyte "imperative" API? The DAGs I want to execute will be described by some rpc call:
get_dag( job_input ) ->   DAG of task inputs
In Flyte, I need to read the response, which is a DAG represented as an adjacency list. The directed edges say which tasks need to be executed before which other tasks, and each node in the graph has a payload which is the input to the task. This `get_dag()` call happens at runtime, so I understand from [1] that I can do this:
@dynamic
def start_workflow(...) -> ...:
    dag = get_dag(...)
    ???

@workflow
def my_workflow(...) -> None:
    return start_workflow(...)
but I can't figure out how to use "imperative workflows" [2] at the same time. How do I get a `wf = Workflow(name="my.imperative.workflow.example")`? I did find the dynamic workflow in the API docs: https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.dynamic.html But how do I use it? Do I need to create a "sub workflow" or is there a more direct way?
[1] https://docs.flyte.org/projects/cookbook/en/stable/auto/core/control_flow/dynamics.html
[2] https://docs.flyte.org/projects/cookbook/en/stable/auto/core/flyte_basics/imperative_wf_style.html
If anyone finds this later: I still don't know how to mix the dynamic and "imperative" APIs. However, I was able to accomplish my goal using the dynamic API only, by setting up fake outputs from tasks and mapping them to inputs of other tasks.
from flytekit import dynamic

# run_task (a Flyte task) and get_dag are defined elsewhere in the module.

def _run_task(task_id, adj_list, submitted_tasks):
    if task_id in submitted_tasks:
        # Already submitted: reuse its marker.
        return submitted_tasks[task_id]

    # IDs of the tasks this one depends on (empty if it has none).
    dep_ids = list(adj_list.get(int(task_id)) or [])

    # Submit each dependency first and collect its output marker.
    markers = []
    for dep_id in dep_ids:
        markers.append(_run_task(dep_id, adj_list, submitted_tasks))

    # Passing the markers as inputs makes this task run after its dependencies.
    marker = run_task(task_input="TODO", markers=markers)
    submitted_tasks[task_id] = marker
    return marker

@dynamic
def wf4(n: int = 5) -> None:
    nodes, adj_list = get_dag()
    submitted_tasks = {}
    for node_id in nodes:
        _run_task(node_id, adj_list, submitted_tasks)
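For anyone who wants to check the ordering logic without a Flyte cluster, here is a minimal plain-Python sketch of the same recursion. It assumes the adjacency list maps each task ID to the IDs it depends on, which is how the snippet above reads it. The names `submit_in_dependency_order` and `submit` are hypothetical, and a plain string stands in for the marker a Flyte task would return.

```python
from typing import Dict, List


def submit_in_dependency_order(nodes: List[int], adj_list: Dict[int, List[int]]) -> List[int]:
    """Return the order in which tasks get submitted.

    Assumption: adj_list[task_id] lists the tasks that must run BEFORE task_id.
    """
    submitted: Dict[int, str] = {}  # task_id -> marker (stand-in for a task output)
    order: List[int] = []           # submission order, kept only for inspection

    def submit(task_id: int) -> str:
        if task_id in submitted:
            # Already submitted: reuse the existing marker.
            return submitted[task_id]
        # Submit every dependency first and collect its marker.
        markers = [submit(dep) for dep in adj_list.get(task_id, [])]
        # A real Flyte task would receive `markers` as an input at this point.
        submitted[task_id] = f"done-{task_id}"
        order.append(task_id)
        return submitted[task_id]

    for node_id in nodes:
        submit(node_id)
    return order


# Diamond: 4 depends on 2 and 3, which both depend on 1.
print(submit_in_dependency_order([4, 3, 2, 1], {2: [1], 3: [1], 4: [2, 3]}))
# prints [1, 2, 3, 4]
```

Like the original, this sketch does not detect cycles: a cyclic adjacency list would recurse until Python raises `RecursionError`.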
y
@David Cupp sorry for the delay - can you describe the use-case a bit more? what you’re doing is probably the right way to go, but wanted to understand a bit more about the ask, don’t think i’ve heard this one before
what does a dag of task inputs look like?
i guess i’d like to understand how a use-case like this can arise. it’s kinda meta… like you’re using a workflow engine, to run generic workflows or something, which is totally okay and kinda cool, but wanted to understand a bit more.
d
Sorry for the delay; I was in meetings.
> to run generic workflows or something
This is actually exactly what I want to do. My company has a lot of critical jobs that run as DAGs, and our existing system has many flaws, including unreliability and a lack of scalability. These jobs are currently owned by many different teams. The requirement for fully dynamic DAGs came from one of the lessons of that system. Right now the current system uses a plugin model where multiple teams all own code in the same process. This is terrible because:
1. every team needs to understand how the DAG execution system works
2. no team actually understands how it works; the first time someone sees the code they own is at 3am during an incident. Instead, the team that keeps the system online becomes the de facto owner of everyone's code.
So, for the brand new world of scheduling, I have instilled one requirement: no business logic in the scheduling/DAG execution system. ALL business logic must live in (preferably stateless) services outside the DAG execution system:
1. the definition of jobs comes from RPC methods like `getJobs()`
2. the definition of a task (which we have always been able to calculate at runtime) comes from an RPC method `getDAG()`
3. the actual business logic to run a task lives in an RPC method `runTask()`
So the requirement to keep all business logic out of the scheduler is something that I am trying to impose. The requirement that DAGs must be calculated at runtime comes from the fact that our jobs already run this way -- we can compromise on this, but we would have to rewrite many jobs completely under the new constraint.
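To make the split concrete, here is a rough sketch of how a scheduler with no business logic might drive those three RPCs. Everything in it is an assumption for illustration: the `JobService` protocol, its Python method signatures, the shape of the `get_dag` return value, and the in-memory `FakeService` are all hypothetical and are not the actual service definitions.

```python
from typing import Dict, List, Protocol, Set, Tuple


class JobService(Protocol):
    """Hypothetical interface mirroring getJobs(), getDAG() and runTask()."""

    def get_jobs(self) -> List[str]: ...

    # Assumed shape: (task_id -> opaque input, task_id -> IDs it depends on).
    def get_dag(self, job_input: str) -> Tuple[Dict[int, str], Dict[int, List[int]]]: ...

    def run_task(self, task_input: str) -> None: ...


def run_job(service: JobService, job_input: str) -> None:
    """Run every task of one job, dependencies first, without inspecting inputs."""
    payloads, deps = service.get_dag(job_input)
    done: Set[int] = set()

    def run(task_id: int) -> None:
        if task_id in done:
            return
        for dep in deps.get(task_id, []):
            run(dep)
        # The payload is passed through untouched; only the service interprets it.
        service.run_task(payloads[task_id])
        done.add(task_id)

    for task_id in payloads:
        run(task_id)


class FakeService:
    """In-memory stand-in used only to exercise run_job."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def get_jobs(self) -> List[str]:
        return ["nightly"]

    def get_dag(self, job_input: str) -> Tuple[Dict[int, str], Dict[int, List[int]]]:
        # Task 1 depends on task 2.
        return {1: "YQ==", 2: "Yg=="}, {1: [2]}

    def run_task(self, task_input: str) -> None:
        self.calls.append(task_input)


service = FakeService()
run_job(service, service.get_jobs()[0])
print(service.calls)  # prints ['Yg==', 'YQ==']
```

The point of the sketch is that `run_job` only knows about ordering; swapping in a different `JobService` changes what the job does without touching the scheduler.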
To answer your question about the task inputs: they are actually opaque byte arrays serialized as base64 strings. The scheduling system is meant to have no idea what they are. These inputs are returned by `getDag()` and passed without modification directly to `runTask()`. The serialization mechanism is up to the people who implement `getDag()` and `runTask()` -- I provide a version field to allow people to change serialization mechanisms in a backwards-compatible way.
Oh and I chose base64 strings because I figured every DAG execution system on the planet would probably allow you to pass a UTF-8 string.
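As an illustration of that kind of envelope, here is a small sketch of an opaque task input with a version field. The `TaskInput` class and the `wrap`/`unwrap` helpers are hypothetical names, and the exact field layout is an assumption; only the idea (version plus base64 string, opaque to the scheduler) comes from the description above.

```python
import base64
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskInput:
    """Hypothetical envelope: the scheduler passes it along without reading it."""
    version: int      # lets implementers change the serialization later
    payload_b64: str  # opaque bytes, base64-encoded so they travel as a string


def wrap(version: int, raw: bytes) -> TaskInput:
    """Encode raw bytes the way a getDag() implementer might."""
    return TaskInput(version=version, payload_b64=base64.b64encode(raw).decode("ascii"))


def unwrap(task_input: TaskInput) -> bytes:
    """Decode the bytes the way a runTask() implementer might."""
    return base64.b64decode(task_input.payload_b64)


envelope = wrap(1, b"hi")
print(envelope.payload_b64)  # prints aGk=
print(unwrap(envelope))      # prints b'hi'
```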
y
so the goal is to use flyte as the execution/runtime engine, and retain existing systems for workflow creation and specification
d
Yes. I think they want me to also use Flyte for scheduling as well, though we wouldn't do that unless/until we can submit a change to implement support for RRULE schedules.