# flyte-support
c
Hello. I have a question about cached tasks with pandas dataframes as inputs. https://docs.flyte.org/projects/cookbook/en/v0.3.66/auto/core/flyte_basics/task_cache.html#caching-of-non-flyte-offloaded-objects describes how to configure the `HashMethod` so that the hash can be used to check for a previously cached result. However, I can't work out how to do this when the task is launched dynamically. If you run `dynamic_workflow` from my attached example multiple times, you will see that the `process_data` task gets re-computed each time. I'm not sure if I'm missing something or if this is unsupported.
m
What does your dataframe contain? Have you confirmed `hash_pandas_dataframe` generates the same hash on each run?
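The stability check being asked for can be done locally without any Flyte machinery. A minimal sketch, assuming only pandas is available (the frame contents here are made up for illustration):

```python
import pandas as pd


# The hash function from the Flyte docs example under discussion
def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))


# Hypothetical sanity check: construct the same frame twice and
# confirm the hash is identical, as it would need to be across runs
df_a = pd.DataFrame({"alcohol": [14.2, 13.2], "target": [0, 1]})
df_b = pd.DataFrame({"alcohol": [14.2, 13.2], "target": [0, 1]})
print(hash_pandas_dataframe(df_a) == hash_pandas_dataframe(df_b))  # True
```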
c
I have not explicitly checked the hash but if I do the same thing with static tasks it works as expected. I'm just using the hash function from the flyte docs.
m
i see, then it's not related to that 😅 do you see the small check mark saying the execution is cached?
f
Will try
Cc @tall-lock-23197 / @high-accountant-32689, can either of you try this if you're at a keyboard?
cc @high-accountant-32689 I tried this, and @calm-pilot-2010 is right: we do not cache dynamic workflow tasks in this way. Is the "Hash" value not computed? Let me look into the actual intermediate values.
The hash value exists
```json
...
"full_outputs": {
  "literals": {
    "o0": {
      "scalar": {
        "structured_dataset": {
          "uri": "s3://my-s3-bucket/data/r4/f9dab720d67a242a5abb-fabn1cay-0/4de7a86075b8ad656c12555aea89bc14",
          "metadata": {
            "structured_dataset_type": {
              "format": "parquet"
            }
          }
        }
      },
      "hash": "0        843980084580590629\n1      18198708973748498431\n2      17076221967822531503\n3      12993900499373406413\n4       9273754782203642490\n               ...         \n173     5634508623394073396\n174     6630791936465087689\n175     2882766373253673557\n176     1418935499778223063\n177     2600902136869639931\nLength: 178, dtype: uint64"
    }
  }
}
...
```
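As an aside, separate from the dynamic-workflow question: the "hash" value above is the truncated repr of a per-row hash Series (note the `...` and `Length: 178`), so rows elided from the middle never contribute to the string. A sketch of a digest-style variant that avoids truncation, assuming `hashlib` and pandas only (the function name is hypothetical):

```python
import hashlib

import pandas as pd


def hash_pandas_dataframe_digest(df: pd.DataFrame) -> str:
    # hash_pandas_object returns one uint64 per row; hashing the raw
    # bytes of that array yields a short digest that never truncates.
    row_hashes = pd.util.hash_pandas_object(df)
    return hashlib.sha256(row_hashes.values.tobytes()).hexdigest()


df = pd.DataFrame({"x": range(200)})  # long enough that str() would elide rows
digest = hash_pandas_dataframe_digest(df)
print(len(digest))  # 64 hex characters, no "..." truncation
```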
@calm-pilot-2010 also, the hash calculation is weird; there is a pandas hash method included with flytekit.
@high-accountant-32689 / @hallowed-mouse-14616 - I tried the example in multiple ways. I think there is a problem with hash methods. A regular primitive input worked, and caching outside of a dynamic workflow worked:
```python
from typing import Annotated, List

import pandas as pd
from sklearn.datasets import load_wine

from flytekit import HashMethod, dynamic, task, workflow


def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))


# Type hint to use for pandas dataframes in flyte. This adds information for how to hash them so they can be cached.
# https://docs.flyte.org/projects/cookbook/en/v0.3.66/auto/core/flyte_basics/task_cache.html#caching-of-non-flyte-offloaded-objects
CacheablePandasDataFrameT = Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]


@task(cache=True, cache_version="664fedb393489d052c874ddd2c9e7273907cd8ac")
def process_data(df: CacheablePandasDataFrameT) -> CacheablePandasDataFrameT:
    return df.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


@dynamic(cache=True, cache_version="1.0")
def dynamic_workflow() -> List[CacheablePandasDataFrameT]:
    df = load_wine(as_frame=True).frame
    return [process_data(df=df)]


@task(cache=True, cache_version="1.0")
def load() -> CacheablePandasDataFrameT:
    return load_wine(as_frame=True).frame


@workflow
def wf() -> CacheablePandasDataFrameT:
    return process_data(df=load())


@task(cache=True, cache_version="1")
def process(i: int) -> int:
    return i * i


@dynamic
def foo() -> List[int]:
    return [process(i=10)]
```
c
Thanks for looking into this. @freezing-airport-6809 your findings are in line with what I found. It always works except when you combine a non-primitive type as input to a dynamically launched task. Should I create a github issue for this?

> @calm-pilot-2010 also the hash calculation is weird, there is pandas hash method included with flytekit

Could you point me to this? I just copied the hash function from the example in the flyte docs. As far as I can see, the same hash function implementation is used in a few flytekit tests, but I can't find any others.

@freezing-airport-6809 does that snippet you shared show the hash from the output of the `process_data` task or the `dynamic_workflow`? My guess was that the hash of the `df` created within `dynamic_workflow` is never computed. We don't really provide a type hint for that df, so we haven't really told flyte how to compute that hash.
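The point about the missing annotation can be illustrated without flytekit at all: the hash function travels in the `Annotated` metadata, which is only consulted where a type hint is actually declared. A minimal stdlib-only sketch (the `HashMethod` class below is a hypothetical stand-in, not flytekit's real implementation):

```python
from typing import Annotated, get_args


class HashMethod:
    """Hypothetical stand-in for flytekit's HashMethod."""

    def __init__(self, fn):
        self.fn = fn


def my_hash(x) -> str:
    return str(x)


CacheableT = Annotated[int, HashMethod(my_hash)]

# The hash function is carried in the annotation's metadata...
base_type, method = get_args(CacheableT)
print(base_type is int, method.fn is my_hash)  # True True

# ...so a bare local variable (like the df built inside
# dynamic_workflow) carries no annotation, and nothing tells the
# engine which hash function to apply to it.
```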
f
No, it was computed, I can see it.
I am trying to see what’s happening
c
I think it is correct to say there is a bug - I'll create a github issue.
Actually it looks like there is already a github issue https://github.com/flyteorg/flyte/issues/3842
👍 1
f
Cc @cool-lifeguard-49380
h
looks like a flytekit issue?
c
I'm not sure. Personally I'm still confused how `flytekit` would know how to hash the df created internally in the `dynamic_workflow`, because it doesn't really have an associated type annotation that specifies the `HashMethod`.
f
@calm-pilot-2010 / @cool-lifeguard-49380 / @high-accountant-32689 / @hallowed-mouse-14616 I found the problem. Will post it in an issue, and the fix should be easy. It will just need a backend and flytekit upgrade 😞
🙌 5
cc @hallowed-mouse-14616 / @high-accountant-32689
c
Excited to see that there appears to be a good solution to this problem!! We have a few workflows in which a pytorch module gets passed to a dynamic workflow and then to a task within it. We need the dynamic workflow, and because of this bug we don't pass an `nn.Module` around but a pydantic base model (for which we have a simple transformer) which acts as a reference to an `nn.Module`, which we “manually” (de)serialize to and from blob storage …
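The workaround described above could be sketched roughly like this; all names are hypothetical, and a frozen dataclass stands in for the pydantic model:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ModuleRef:
    """Hypothetical reference passed between tasks instead of an nn.Module."""

    uri: str  # e.g. a blob-storage path holding a serialized state_dict

    # A frozen dataclass gets a stable __hash__ for free, so the
    # reference itself is cheap to hash for cache-key purposes.


ref = ModuleRef(uri="s3://bucket/models/encoder.pt")
print(hash(ref) == hash(ModuleRef(uri="s3://bucket/models/encoder.pt")))  # True

# The manual (de)serialization would then look roughly like:
#   torch.save(module.state_dict(), local_path); upload(local_path, ref.uri)
#   download(ref.uri, local_path); module.load_state_dict(torch.load(local_path))
```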
f
i think this will solve it for good
👍 1
and will allow intra-dynamic workflow hashing
👍 1