# ask-the-community
e
Hi! I have a few questions about Flyte caching. I read through the documentation here a few times, but I'm still a bit confused about why the "Caching of Non-Flyte Offloaded Objects" examples hash the task output instead of the input. My intuition for how a cache would work is that the input values for a task need to be hashable so that they can be used as keys for the cache. In that case, I feel it would make more sense to add the `Annotated` `HashMethod` to the input types of a task instead of the output. Then the `HashMethod` can be run on the inputs when a cached task is invoked to determine the cache keys and check whether there's a hit. I tried this out, and it kind of works for local caching, but not for remote:
```python
import typing as ty

import pandas as pd
import redis
from flytekit import HashMethod, dynamic, task

# assumed elsewhere in my setup: a redis counter that counts actual executions
redis_client = redis.Redis()
REDIS_COUNTER_KEY = "my_task_invocations"


def hash_df(df: pd.DataFrame) -> str:
    # hash function referenced by the HashMethod annotation below
    return str(pd.util.hash_pandas_object(df))


@task(cache=True, cache_version="1.0")
def my_task(obj: ty.Annotated[pd.DataFrame, HashMethod(hash_df)]) -> pd.DataFrame:
    redis_client.incr(REDIS_COUNTER_KEY)
    return obj


@dynamic
def my_workflow():
    obj = pd.DataFrame(
        {
            "name": ["a", "b"],
            "val": ["test1", "test2"],
        }
    )

    obj = my_task(obj=obj)
    obj = my_task(obj=obj)
    my_task(obj=obj)
```
In the example above, `my_task` will be called three times the first time `my_workflow` is run. This doesn't match my expectation: I thought it would be called once for the first call, and the cache would be hit on the second and third calls, since the input `obj` is the same. However! The second run of `my_workflow` has a cache hit for all three calls to `my_task`, so it does work to some extent, even though I don't fully understand what it's doing. For remote caching, this doesn't seem to work at all, and there are no cache hits no matter how many times I run `my_workflow`.
y
hey so it’s flipped actually. the producer of the data is the one that decides whether to hash and if so how to.
one reason for this is that you need a container running code (typically python) in order to do the hash, and you also need to have (or at least have seen) the entire set of data.
so the best time to do that is when the producer creates the data.
so yeah this use case is for when you have something like
```python
@task
def t1() -> T:  # T: some offloaded type, e.g. a DataFrame annotated with a HashMethod
    ...

@task(cache=True, cache_version="1")
def t2(a: T):
    ...

# wired together inside a workflow:
t2(a=t1())
```
that is… if `t1` is liable to produce an offloaded output that is often identical to a past output, then the downstream task should still detect a cache hit
does that make sense?
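A concrete version of that sketch, following the output-annotation pattern from the Flyte caching docs; the names `produce_df`, `consume_df`, and `wf`, and the `hash_pandas_dataframe` function, are illustrative rather than anything from this thread:

```python
from typing import Annotated

import pandas as pd
from flytekit import HashMethod, task, workflow


def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    # deterministic hash of the frame's contents
    return str(pd.util.hash_pandas_object(df))


@task
def produce_df() -> Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]:
    # the producer attaches the hash to its output; Flyte records it
    # alongside the offloaded literal when the task finishes
    return pd.DataFrame({"name": ["a", "b"], "val": ["test1", "test2"]})


@task(cache=True, cache_version="1")
def consume_df(df: pd.DataFrame) -> int:
    return len(df)


@workflow
def wf() -> int:
    # consume_df's cache key is computed from the hash produce_df emitted,
    # so identical frames should hit the cache even though they're offloaded
    return consume_df(df=produce_df())
```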
e
I see. So the hash is always produced on the output when a task finishes, rather than when a task is about to be invoked? Does that mean it's not possible to get a cache hit if the input is not an output produced by Flyte? For example, in my example above, `obj` is initially created through a `pandas.DataFrame` constructor. Would the first call to `my_task` never get a cache hit?
y
let me take a closer look at the code and get back to you.
i’m not sure if there’s a way to trigger the hash in your example. it might be possible.
it’s not an output, that’s correct, but because it’s in a dynamic task it is being run and created by flyte.
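Following that logic, one way to make the hash kick in here would be to move the DataFrame construction into its own task, so that `obj` enters the workflow as a Flyte-produced output. A minimal sketch, assuming that restructuring is acceptable; `make_df` is a hypothetical helper, and `hash_df` matches the hash function assumed in the first snippet:

```python
from typing import Annotated

import pandas as pd
from flytekit import HashMethod, dynamic, task


def hash_df(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))


@task
def make_df() -> Annotated[pd.DataFrame, HashMethod(hash_df)]:
    # building obj inside a task means it leaves as a Flyte output,
    # so the hash gets computed and recorded at that point
    return pd.DataFrame({"name": ["a", "b"], "val": ["test1", "test2"]})


@task(cache=True, cache_version="1.0")
def my_task(obj: pd.DataFrame) -> pd.DataFrame:
    return obj


@dynamic
def my_workflow():
    obj = make_df()
    # my_task's cache key is now derived from make_df's hash, so repeated
    # calls with an identical frame should resolve as cache hits, remotely too
    my_task(obj=obj)
    my_task(obj=obj)
```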