Hi, I have a question about passing inputs to task...
# flyte-support
m
Hi, I have a question about passing inputs to tasks. I want to pass an input to a task in a workflow. This input may change from execution to execution (datetime type not used for illustrative purposes):
Copy code
@task
def datetime_task(datetime_str: str = None):
    print(datetime_str)


@workflow
def datetime_workflow():
    datetime_str = str(datetime.datetime.now())
    return datetime_task(datetime_str=datetime_str)
The second execution of this workflow raises an error like "task with same structure already exists: datetime_str changed". It seems this error is raised because the input from Flyte's POV is a
str
-- not a
Promise
-- and so the literal value from the first execution of the workflow is now forever baked into the structure of the task. The error goes away if you: 1. Add the input to the workflow func signature and pass it when executing the workflow (e.g.
pyflyte run [the workflow] --datetime_str=[some time str]
) 2. Write another task
@task def produce_datetime_str(): return str(datetime.datetime.now())
, and call the original task like
datetime_task(datetime_str=produce_datetime_str())
. In other words, have another task produce the input. These work because now the input type from Flyte's POV is a
Promise
, so the value is not recorded into the task structure. I don't want to do #1 above because it is burdensome. My real use case is more complex than the above example: It's easy to generate my input in Python code and pass it to a task, but hard to pass it via the command line. I don't want to do #2 above because now my DAG may have twice as many nodes -- one "produce" task per "use the value" task. These extra nodes add workflow latency, potential failure points, and clutter (including when viewing the DAG in the UI). So my question is: Are there ways other than 1 and 2 above to pass an input whose value can change from execution to execution to a task and not get that error?
t
It's easy to generate my input in Python code and pass it to a task, but hard to pass it via the command line.
workflow should generally just define dependencies between the tasks. if there's code you want to execute, it should preferably be included in a task. could you encapsulate your python code in a function and send that as a default input to the workflow?
g
I'm aware of what Ryan is working on, so I can say that something like that is possible -- but I'm not sure it will work for the real use case.
The real use case involves a graph of agent-based tasks, but we tried to generalize the problem a bit.
We're really trying to resolve some inputs for each node in the graph programmatically. Each agent task has some inputs like this, and they're not really inputs to the workflow nor are they outputs from a previous task in the graph.
It looks like you may have run across some similar concerns while working on the new OpenAI Batch API agent? https://github.com/flyteorg/flytekit/pull/2353
t
is there anything specific you're referring to in the openai agent?
Each agent task has some inputs like this, and they're not really inputs to the workflow nor are they outputs from a previous task in the graph.
understood. do you not want to provide any sort of input to the workflow? since you want to resolve the inputs programmatically, can't you write a vanilla python function for that and send it as a default input to the workflow?
g
I noticed that you're generating the Workflow rather than decorating a function with
@workflow
> understood. do you not want to provide any sort of input to the workflow? since you want to resolve the inputs programmatically, can't you write a vanilla python function for that and send it as a default input to the workflow? Ideally the workflow doesn't need to know about these inputs to the agent tasks
You can correct me if I'm wrong here, but I think if the workflow has these special inputs for each agent task in the graph, then there will be a dependency from the start node to every other node in the graph?
t
I noticed that you're generating the Workflow rather than decorating a function with
@workflow
that's because i want to generate a workflow with the user-given config first, and then enable the user to send runtime inputs when triggering it.
there will be a dependency from the start node to every other node in the graph
there will be a dependency only if there's data passage between the tasks. otherwise, they should run in parallel.
g
Ok, thanks for the assistance / idea -- let us try and spike on that a bit tomorrow b/c I don't think that's an angle we've quite explored.
👍 1
If we want to kind of distill down the core issue here -- it's that we have values in the agent task config that we want to make (optional) inputs. If a user specifies the value for that input in their code explicitly, we'll use it and everything is fine. When they don't, we want to resolve it for them in code (and they won't modify the workflow code - it's just different inputs in theory being passed to the same version of the workflow). We can still resolve these inputs before triggering the workflow -- but we're having a tough time of figuring out the right way to do it.
👍 1
m
thanks for the ideas, samhita. "but we're having a tough time of figuring out the right way to do it." Now that there is more context in this thread, to reiterate the key point for our actual use case: We're having a tough time because when a user says "use default inputs" with our custom task, then the defaults are resolved by an API call during task instance instantiation. So if a user in their course of developing their workflow -- which could include an arbitrary number of our custom tasks -- does
pyflyte run the_workflow ...
more than once and the API call for any of the custom tasks changes, it errs with "task with same structure already exists". "since you want to resolve the inputs programmatically, can't you write a vanilla python function for that and send it as a default input to the workflow?" even if that solves "task structure exists" error, I'm not sure that's viable for us: - our users make the workflows. with this approach -- unless I misunderstand -- they must define N workflow inputs for N of our custom tasks and include those N inputs in the workflow signature. The workflow could be very complex and N is theoretically unbounded. Just for more context, we've tried several things including manually making a Flyte
Promise
out of the resolved API call and passing that
Promise
as the input, but all our ideas so far produce the same error.