I’m trying to use a task to do incremental state u...
# flyte-support
g
I’m trying to use a task to do incremental state updates. Is it possible for a task to retrieve the most recently cached output of that same task, without knowing what the inputs to that task were? Something like this (but assume that the actual computation involved is expensive, hence the desire to access the cache):
Copy code
@task
def incremental_update(additional_value: int):
    previous_total = ???
    if previous_total is None:
         return additional_value
    else:
         return previous_total + additional value
t
@great-nest-93561, how many times do you need to incrementally update? How about you retrieve the task output in a different task, not the same one?
g
Can be as much as once a second. Even if I use a second task to retrieve the previous cached output, how could I do so without knowing the exact parameters passed into it?
t
Trying to understand your use-case. Will you be running the task in separate executions? Or will all of this be happening in the same execution?
g
Separate executions (each execution a new piece of streaming data is received)
Essentially: • I have a very large directed acyclic graph (DAG) that converts streaming inputs into outputs • The inputs come from separate sources, at very different times, and some have preprocessing steps. • Some steps are very expensive to compute from scratch, so I need a way to incrementally react to received inputs • Alternatively, I need a way to retrieve the results of previous steps in the dag, without knowing their inputs. For example, say I have a task C that takes inputs from tasks A and B. A is very expensive to compute, and comes from previous steps. When I get a new value of B, I need to be able to run C using the new value of B and the previously cached value from A
t
Why do you not want to send inputs? How do you know which value of A to retrieve for your task C?
g
Each task is stateful, and the result is the last run of that task. So the value of A I need is the last run of that task. Essentially, every task is a singleton that iterates on its previous value when it receives an input To keep track of historical inputs used to generate the latest value of A defeats the point of caching
155 Views