# ask-the-community
t
Hello. I have a question about cached tasks with pandas dataframes as inputs. https://docs.flyte.org/projects/cookbook/en/v0.3.66/auto/core/flyte_basics/task_cache.html#caching-of-non-flyte-offloaded-objects describes how to configure the HashMethod so that the hash can be used to check for a previously cached result. However, I can't work out how to do this when the task is launched dynamically. If you run dynamic_workflow from my attached example multiple times, you will see that the process_data task gets re-computed each time. I'm not sure if I'm missing something or if this is unsupported.
j
what does your dataframe contain? have you confirmed hash_pandas_dataframe generates the same hash on each run?
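(A quick way to check that locally, a minimal sketch using the docs' hash function that is quoted later in this thread: build the dataframe twice, the way the dynamic workflow does on each run, and compare the hashes.)

```python
import pandas as pd
from sklearn.datasets import load_wine


def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))


# Build the dataframe twice, as the dynamic workflow would on separate runs,
# and confirm both copies produce the same hash.
df_a = load_wine(as_frame=True).frame
df_b = load_wine(as_frame=True).frame
assert hash_pandas_dataframe(df_a) == hash_pandas_dataframe(df_b)
print("hash is deterministic across runs")
```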
t
I have not explicitly checked the hash, but if I do the same thing with static tasks it works as expected. I'm just using the hash function from the Flyte docs.
j
i see, then it's not related to that 😅 do you see the small check mark saying the execution is cached?
k
Will try
Cc @Samhita Alla / @Eduardo Apolinario (eapolinario) can either of you try if you're at a keyboard?
cc @Eduardo Apolinario (eapolinario) I tried this, and @Thomas Newton is right, we do not cache the dynamic workflow tasks in this way. Is the "Hash" value not computed? Let me look into the actual intermediate values
The hash value exists
```json
...
"full_outputs": {
  "literals": {
    "o0": {
      "scalar": {
        "structured_dataset": {
          "uri": "s3://my-s3-bucket/data/r4/f9dab720d67a242a5abb-fabn1cay-0/4de7a86075b8ad656c12555aea89bc14",
          "metadata": {
            "structured_dataset_type": {
              "format": "parquet"
            }
          }
        }
      },
      "hash": "0        843980084580590629\n1      18198708973748498431\n2      17076221967822531503\n3      12993900499373406413\n4       9273754782203642490\n               ...         \n173     5634508623394073396\n174     6630791936465087689\n175     2882766373253673557\n176     1418935499778223063\n177     2600902136869639931\nLength: 178, dtype: uint64"
    }
  }
}
...
```
@Thomas Newton also, the hash calculation is weird; there is a pandas hash method included with flytekit
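For context on why that "hash" value above looks like a whole Series: str(pd.util.hash_pandas_object(df)) returns the repr of a per-row uint64 hash Series, index and dtype footer included. A minimal sketch of the difference is below; the sha256 variant is an illustrative alternative, not a flytekit built-in.

```python
import hashlib

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

# The docs-style hash: str() of a Series yields its multi-line repr,
# which is exactly the long "hash" string seen in the output above.
print(str(pd.util.hash_pandas_object(df)))

# Illustrative alternative (an assumption, not flytekit's built-in method):
# fold the per-row uint64 hashes into a single hex digest.
digest = hashlib.sha256(pd.util.hash_pandas_object(df).values.tobytes()).hexdigest()
print(digest)
```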
@Eduardo Apolinario (eapolinario) / @Dan Rammer (hamersaw) - I tried the example in multiple ways and I think there is a problem with hash methods. A regular primitive input worked, and caching outside of dynamic worked:
```python
from typing import Annotated, List

import pandas as pd
from sklearn.datasets import load_wine

from flytekit import HashMethod, dynamic, task, workflow


def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))


# Type hint to use for pandas dataframes in flyte. This adds information for
# how to hash them so they can be cached.
# https://docs.flyte.org/projects/cookbook/en/v0.3.66/auto/core/flyte_basics/task_cache.html#caching-of-non-flyte-offloaded-objects
CacheablePandasDataFrameT = Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]


@task(cache=True, cache_version="664fedb393489d052c874ddd2c9e7273907cd8ac")
def process_data(df: CacheablePandasDataFrameT) -> CacheablePandasDataFrameT:
    return df.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


# Dynamic case: process_data is re-computed on every run (the bug under discussion).
@dynamic(cache=True, cache_version="1.0")
def dynamic_workflow() -> List[CacheablePandasDataFrameT]:
    df = load_wine(as_frame=True).frame
    return [process_data(df=df)]


@task(cache=True, cache_version="1.0")
def load() -> CacheablePandasDataFrameT:
    return load_wine(as_frame=True).frame


# Static case: process_data is cached as expected.
@workflow
def wf() -> CacheablePandasDataFrameT:
    return process_data(df=load())


# Primitive-input case: caching worked, even inside a dynamic workflow.
@task(cache=True, cache_version="1")
def process(i: int) -> int:
    return i * i


@dynamic
def foo() -> List[int]:
    return [process(i=10)]
```
No, it was computed, I can see it
I am trying to see what’s happening
t
I think it is correct to say there is a bug - I'll create a GitHub issue.
Actually, it looks like there is already a GitHub issue: https://github.com/flyteorg/flyte/issues/3842
k
Cc @Fabio Grätz
d
looks like a flytekit issue?
t
I'm not sure. Personally I'm still confused how flytekit would know how to hash the df created internally in the dynamic_workflow, because it doesn't really have an associated type annotation that specifies the HashMethod.
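For what it's worth, the HashMethod is at least recoverable from process_data's signature, so in principle it is available at call time even for a df created inside the dynamic workflow; whether flytekit actually applies it there is the open question. A sketch (assuming flytekit's HashMethod exposes the wrapped function via calculate()):

```python
from typing import Annotated, get_type_hints

import pandas as pd
from flytekit import HashMethod


def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))


CacheablePandasDataFrameT = Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]


def process_data(df: CacheablePandasDataFrameT) -> CacheablePandasDataFrameT:
    ...


# The HashMethod lives in the parameter's Annotated metadata, so it can be
# recovered from the signature regardless of where the df was built.
hints = get_type_hints(process_data, include_extras=True)
hash_method = hints["df"].__metadata__[0]
df = pd.DataFrame({"a": [1, 2, 3]})
print(hash_method.calculate(df))  # assumed flytekit API: calculate() calls the wrapped function
```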
k
@Thomas Newton / @Fabio Grätz / @Eduardo Apolinario (eapolinario) / @Dan Rammer (hamersaw) I found the problem. Will post in an issue, and the fix should be easy. It will just need a backend and flytekit upgrade 😞
cc @Dan Rammer (hamersaw) / @Eduardo Apolinario (eapolinario)
f
Excited to see that there appears to be a good solution to this problem!! We have a few workflows in which a pytorch module gets passed to a dynamic workflow and then to a task within it. We need the dynamic workflow, and because of this bug, we don't pass an nn.Module around but a pydantic base model (for which we have a simple transformer) which acts as a reference to an nn.Module that we "manually" (de)serialize to and from blob storage …
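A hypothetical sketch of that workaround pattern, for readers following along (ModuleRef, save_module, and load_module are illustrative names, not the actual code described above): a pydantic model carries only a storage path and stands in for the nn.Module, with manual torch (de)serialization at either end.

```python
import torch
import torch.nn as nn
from pydantic import BaseModel


class ModuleRef(BaseModel):
    # Illustrative: a path/URI in blob storage where the state_dict lives.
    uri: str


def save_module(module: nn.Module, uri: str) -> ModuleRef:
    # A real workflow would write to blob storage; a local path stands in here.
    torch.save(module.state_dict(), uri)
    return ModuleRef(uri=uri)


def load_module(ref: ModuleRef, template: nn.Module) -> nn.Module:
    template.load_state_dict(torch.load(ref.uri))
    return template


# Pass the small ModuleRef (not the nn.Module itself) through the dynamic workflow.
model = nn.Linear(4, 2)
ref = save_module(model, "/tmp/model.pt")
restored = load_module(ref, nn.Linear(4, 2))
```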
k
i think this will solve it for good
and will allow intra-dynamic workflow hashing