Hi! I have a few questions about Flyte caching. I read through the documentation here a few times, but I'm still a bit confused about why the "Caching of Non-flyte Offloaded Objects" examples hash the task output instead of the input.
My intuition for how a cache would work is that the input values for a task need to be hashable so that they can be used as keys for the cache. Given that, I feel it would make more sense to add the Annotated HashMethod to the input types of a task instead of the output. The HashMethod could then be run on the inputs when a cached task is invoked to determine the cache keys and check whether there's a hit.
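For reference, this is roughly the pattern from the docs that I'm talking about, where the HashMethod is attached to the output type of the upstream task (paraphrased from memory, so the names here are mine and details may differ slightly):

import typing
import pandas as pd
from flytekit import HashMethod, task, workflow

def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    # Hash the dataframe contents; used for cache-key purposes downstream.
    return str(pd.util.hash_pandas_object(df))

@task
def produce_data() -> typing.Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]:
    # The HashMethod annotates the *output* here, not the input.
    return pd.DataFrame({"col": [1, 2, 3]})

@task(cache=True, cache_version="1.0")
def consume_data(df: pd.DataFrame) -> pd.DataFrame:
    # Cached task whose input is the hashed output of produce_data.
    return df

@workflow
def docs_style_wf() -> pd.DataFrame:
    return consume_data(df=produce_data())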
I tried this out, and it kind of works for local caching, but not for remote:
import typing as ty
import pandas as pd
import redis
from flytekit import HashMethod, dynamic, task

# Redis counter used to count how many times the task body actually executes.
REDIS_COUNTER_KEY = "my_task_call_count"  # placeholder key name
redis_client = redis.Redis()  # connection details omitted here

def hash_df(df: pd.DataFrame) -> str:
    # Hash the dataframe contents so they can act as a cache key.
    return str(pd.util.hash_pandas_object(df))

@task(cache=True, cache_version="1.0")
def my_task(obj: ty.Annotated[pd.DataFrame, HashMethod(hash_df)]) -> pd.DataFrame:
    redis_client.incr(REDIS_COUNTER_KEY)
    return obj

@dynamic
def my_workflow():
    obj = pd.DataFrame(
        {
            "name": ["a", "b"],
            "val": ["test1", "test2"],
        }
    )
    obj = my_task(obj=obj)
    obj = my_task(obj=obj)
    my_task(obj=obj)
In the example above, my_task is called three times the first time my_workflow runs. That doesn't match my expectation: I thought it would be called once on the first call, with cache hits on the second and third calls, since the input obj is the same each time.
However! The second run of my_workflow gets a cache hit for all three calls to my_task, so it does work to some extent, even though I don't fully understand what it's doing.
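Concretely, this is roughly how I'm observing those call counts when running locally (using the same redis_client and REDIS_COUNTER_KEY as in the snippet above; exact setup simplified):

redis_client.set(REDIS_COUNTER_KEY, 0)

my_workflow()                               # first local run
print(redis_client.get(REDIS_COUNTER_KEY))  # b'3' -> the task body executed all three times

my_workflow()                               # second local run
print(redis_client.get(REDIS_COUNTER_KEY))  # still b'3' -> all three calls were cache hits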
For remote caching, this doesn't seem to work at all: there are no cache hits no matter how many times I run my_workflow.