late-truck-36921
12/20/2022, 4:24 PM
get_dag(job_input) -> DAG of task inputs
In Flyte, I need to read the response, which is a DAG represented as an adjacency list. The directed edges say which tasks need to be executed before which other tasks, and each node in the graph has a payload which is the input to the task.
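Concretely, the response might look something like this (shape purely illustrative, not our real payloads):

# nodes: task_id -> payload (the input for that task)
nodes = {1: b"payload-a", 2: b"payload-b", 3: b"payload-c"}

# adj_list: task_id -> ids of the tasks that must run before it
adj_list = {3: [1, 2]}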
This get_dag() call happens at runtime, so I understand from [1] that I can do this:

@dynamic
def start_workflow(...) -> ...:
    dag = get_dag(...)
    ???

@workflow
def my_workflow(...) -> None:
    return start_workflow(...)
but I can't figure out how to use "imperative workflows" [2] at the same time. How do I get a
`wf = Workflow(name="my.imperative.workflow.example")`?
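From [2], the imperative style looks roughly like this (a sketch based on the cookbook example; t1 is a placeholder task I made up):

from flytekit import Workflow, task

@task
def t1(a: str) -> str:
    return a + " world"

# Build the workflow imperatively instead of with @workflow.
wf = Workflow(name="my.imperative.workflow.example")
wf.add_workflow_input("in1", str)                    # declare a workflow input
node = wf.add_entity(t1, a=wf.inputs["in1"])         # add t1 as a node
wf.add_workflow_output("out1", node.outputs["o0"])   # expose t1's output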
I did find the dynamic workflow in the API docs: https://docs.flyte.org/projects/flytekit/en/latest/generated/flytekit.dynamic.html
But how do I use it? Do I need to create a "sub workflow" or is there a more direct way?
[1] https://docs.flyte.org/projects/cookbook/en/stable/auto/core/control_flow/dynamics.html
[2] https://docs.flyte.org/projects/cookbook/en/stable/auto/core/flyte_basics/imperative_wf_style.html
late-truck-36921
12/20/2022, 6:22 PM
def _run_task(task_id, adj_list, submitted_tasks):
    if task_id in submitted_tasks:
        # we already ran it
        return submitted_tasks[task_id]

    # ids of the tasks that must run before this one, if any
    dep_ids = list(adj_list.get(int(task_id), []))

    # submit every dependency first and collect their output markers
    markers = []
    for dep_id in dep_ids:
        if dep_id in submitted_tasks:
            markers.append(submitted_tasks[dep_id])
        else:
            markers.append(_run_task(dep_id, adj_list, submitted_tasks))

    marker = run_task(task_input="TODO", markers=markers)
    submitted_tasks[task_id] = marker
    return submitted_tasks[task_id]

@dynamic
def wf4(n: int = 5) -> None:
    nodes, adj_list = get_dag()
    submitted_tasks = {}
    for node_id in nodes:
        _run_task(node_id, adj_list, submitted_tasks)
late-truck-36921
12/20/2022, 10:56 PM
> to run generic workflows or something
This is actually exactly what I want to do. My company has a lot of critical jobs that run as DAGs, and our existing system has many flaws, including unreliability and a lack of scalability. These jobs are currently owned by many different teams. The requirement for fully dynamic DAGs came from one of the learnings of that system.
Right now the current system uses a plugin model where multiple teams all own code in the same process. This is terrible because:
1. every team needs to understand how the DAG execution system works
2. no team actually understands how it works; the first time someone sees the code they own is at 3am during an incident. Instead, the team that keeps the system online becomes the de facto owner of everyone's code.
So, for the brand new world of scheduling, I have instilled one requirement: no business logic in the scheduling/DAG execution system. ALL business logic must live in (preferably stateless) services outside the DAG execution system, as sketched below:
1. the definition of jobs comes from an RPC method like getJobs()
2. the definition of a task (which we have always been able to calculate at runtime) comes from an RPC method getDAG()
3. the actual business logic to run a task lives in an RPC method runTask()
So the requirement to keep all business logic out of the scheduler is something that I am trying to impose.
The requirement that DAGs must be calculated at runtime comes from the fact that our jobs already run this way -- we can compromise on this, but we would have to rewrite many jobs completely under the new constraint.
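The contract I'm imposing on job owners is roughly this (a sketch; only the three RPC names are real, the signatures are illustrative):

from typing import Dict, List, Protocol

class JobService(Protocol):
    """What each team implements; the scheduler holds no business logic."""

    def getJobs(self) -> List[str]:
        """IDs of the jobs this service owns."""
        ...

    def getDAG(self, job_id: str) -> Dict[int, List[int]]:
        """The job's task DAG as an adjacency list, computed at runtime:
        task_id -> ids of the tasks that must run before it."""
        ...

    def runTask(self, task_input: bytes) -> None:
        """Run one task's business logic, given its opaque payload."""
        ...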
late-truck-36921
12/20/2022, 10:59 PM
The task input comes from getDag() and is passed without modification directly to runTask().
The serialization mechanism is up to the people who implement getDag() and runTask() -- I provide a version field to allow people to change serialization mechanisms in a backwards-compatible way.
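A minimal sketch of that versioned payload (illustrative only; the field names are made up):

from dataclasses import dataclass

@dataclass(frozen=True)
class TaskPayload:
    # Serialization-scheme version chosen by the getDag()/runTask()
    # implementers; bumping it allows format migrations without any
    # change to the scheduler, which never looks inside `data`.
    version: int
    # Opaque bytes produced by getDag() and handed verbatim to runTask().
    data: bytes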