microscopic-animal-46688
05/22/2024, 8:03 PM
@task
def datetime_task(datetime_str: str = None):
    print(datetime_str)

@workflow
def datetime_workflow():
    datetime_str = str(datetime.datetime.now())
    return datetime_task(datetime_str=datetime_str)
The second execution of this workflow raises an error like "task with same structure already exists: datetime_str changed". It seems this error is raised because, from Flyte's POV, the input is a str -- not a Promise -- and so the literal value from the first execution of the workflow is forever baked into the structure of the task.
The error goes away if you:
1. Add the input to the workflow function signature and pass it when executing the workflow (e.g. pyflyte run [the workflow] --datetime_str=[some time str]).
2. Write another task:
       @task
       def produce_datetime_str():
           return str(datetime.datetime.now())
   and call the original task like datetime_task(datetime_str=produce_datetime_str()). In other words, have another task produce the input.
These work because now the input type from Flyte's POV is a Promise, so the value is not recorded into the task structure.
I don't want to do #1 above because it is burdensome. My real use case is more complex than the above example: It's easy to generate my input in Python code and pass it to a task, but hard to pass it via the command line.
I don't want to do #2 above because now my DAG may have twice as many nodes -- one "produce" task per "use the value" task. These extra nodes add workflow latency, potential failure points, and clutter (including when viewing the DAG in the UI).
So my question is: Are there ways other than 1 and 2 above to pass an input whose value can change from execution to execution to a task, without getting that error?

tall-lock-23197
> It's easy to generate my input in Python code and pass it to a task, but hard to pass it via the command line.
workflow should generally just define dependencies between the tasks. if there's code you want to execute, it should preferably be included in a task. could you encapsulate your python code in a function and send that as a default input to the workflow?
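One caveat worth noting about a computed default, illustrated below in plain Python (how Flyte itself materializes workflow default inputs at registration time may differ and is worth verifying): a default expression is evaluated once, at function definition time, not on every call, so a time-based default can still end up frozen.

```python
import datetime


def resolve_now() -> str:
    return str(datetime.datetime.now())


# The default expression runs exactly once, when the function is defined:
def wf(datetime_str: str = resolve_now()) -> str:
    return datetime_str


first = wf()
second = wf()   # same frozen default as `first`

# The value is only re-resolved if the caller passes it explicitly:
explicit = wf(datetime_str=resolve_now())
```

So for this suggestion to yield a fresh value per execution, the function would have to be resolved at trigger time rather than baked in as a literal default.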
gorgeous-waitress-5026
05/23/2024, 7:06 AMgorgeous-waitress-5026
05/23/2024, 7:09 AMgorgeous-waitress-5026
05/23/2024, 7:11 AMgorgeous-waitress-5026
05/23/2024, 7:13 AMtall-lock-23197
> Each agent task has some inputs like this, and they're not really inputs to the workflow nor are they outputs from a previous task in the graph.
understood. do you not want to provide any sort of input to the workflow? since you want to resolve the inputs programmatically, can't you write a vanilla python function for that and send it as a default input to the workflow?
gorgeous-waitress-5026
05/23/2024, 7:26 AM
> understood. do you not want to provide any sort of input to the workflow? since you want to resolve the inputs programmatically, can't you write a vanilla python function for that and send it as a default input to the workflow?
Ideally the workflow doesn't need to know about these inputs to the agent tasks.

gorgeous-waitress-5026
05/23/2024, 7:29 AM
> I noticed that you're generating the Workflow rather than decorating a function with @workflow
that's because i want to generate a workflow with the user-given config first, and then enable the user to send runtime inputs when triggering it.

tall-lock-23197
> there will be a dependency from the start node to every other node in the graph
there will be a dependency only if there's data passage between the tasks. otherwise, they should run in parallel.
gorgeous-waitress-5026
05/23/2024, 7:34 AM

gorgeous-waitress-5026
05/23/2024, 7:36 AM

microscopic-animal-46688
05/23/2024, 3:16 PM
If we run pyflyte run the_workflow ... more than once and the API call for any of the custom tasks changes, it errs with "task with same structure already exists".
> since you want to resolve the inputs programmatically, can't you write a vanilla python function for that and send it as a default input to the workflow?
Even if that solves the "task with same structure already exists" error, I'm not sure it's viable for us:
- our users make the workflows. with this approach -- unless I misunderstand -- they must define N workflow inputs for N of our custom tasks and include those N inputs in the workflow signature. The workflow could be very complex, and N is theoretically unbounded.
Just for more context, we've tried several things, including manually making a Flyte Promise out of the resolved API call and passing that Promise as the input, but all our ideas so far produce the same error.