calm-pilot-2010
10/13/2023, 2:42 PM
... HashMethod so that the hash can be used to check for a previously cached result. However, I can't work out how to do this when the task is launched dynamically. If you run dynamic_workflow from my attached example multiple times, you will see that the process_data task gets re-computed each time. I'm not sure if I'm missing something or if this is unsupported.
magnificent-teacher-86590
10/13/2023, 5:37 PM
calm-pilot-2010
10/13/2023, 6:03 PM
magnificent-teacher-86590
10/13/2023, 8:12 PM
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
...
"full_outputs": {
  "literals": {
    "o0": {
      "scalar": {
        "structured_dataset": {
          "uri": "s3://my-s3-bucket/data/r4/f9dab720d67a242a5abb-fabn1cay-0/4de7a86075b8ad656c12555aea89bc14",
          "metadata": {
            "structured_dataset_type": {
              "format": "parquet"
            }
          }
        }
      },
      "hash": "0 843980084580590629\n1 18198708973748498431\n2 17076221967822531503\n3 12993900499373406413\n4 9273754782203642490\n ... \n173 5634508623394073396\n174 6630791936465087689\n175 2882766373253673557\n176 1418935499778223063\n177 2600902136869639931\nLength: 178, dtype: uint64"
    }
  }
}
...
freezing-airport-6809
freezing-airport-6809
from typing import Annotated, List

import pandas as pd
from sklearn.datasets import load_wine

from flytekit import HashMethod, dynamic, task, workflow


def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))


# Type hint to use for pandas dataframes in flyte. This adds information for how to hash them so they can be cached.
# https://docs.flyte.org/projects/cookbook/en/v0.3.66/auto/core/flyte_basics/task_cache.html#caching-of-non-flyte-offloaded-objects
CacheablePandasDataFrameT = Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]


@task(cache=True, cache_version="664fedb393489d052c874ddd2c9e7273907cd8ac")
def process_data(df: CacheablePandasDataFrameT) -> CacheablePandasDataFrameT:
    return df.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


@dynamic(cache=True, cache_version="1.0")
def dynamic_workflow() -> List[CacheablePandasDataFrameT]:
    df = load_wine(as_frame=True).frame
    return [process_data(df=df)]


@task(cache=True, cache_version="1.0")
def load() -> CacheablePandasDataFrameT:
    return load_wine(as_frame=True).frame


@workflow
def wf() -> CacheablePandasDataFrameT:
    return process_data(df=load())


@task(cache=True, cache_version="1")
def process(i: int) -> int:
    return i * i


@dynamic
def foo() -> List[int]:
    return [process(i=10)]
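A side note on the hash function itself: the "hash" value in the output above is the printed form of a pandas Series ("...", "Length: 178, dtype: uint64"), which is what str(pd.util.hash_pandas_object(df)) produces for a frame longer than the pandas display limit. Only the first and last few row hashes appear in that string. The sketch below shows one possible alternative that digests every row hash into a fixed-length string. The name hash_pandas_dataframe_digest and the choice of SHA-256 are assumptions made for illustration, not part of the thread, and this is not claimed to be the cause of the cache miss discussed here.

```python
import hashlib

import pandas as pd


def hash_pandas_dataframe_digest(df: pd.DataFrame) -> str:
    """Hypothetical alternative to hash_pandas_dataframe: one fixed-length digest."""
    # hash_pandas_object returns a uint64 Series with one hash per row.
    row_hashes = pd.util.hash_pandas_object(df, index=True)
    # Digest the raw bytes of all row hashes so that every row contributes.
    return hashlib.sha256(row_hashes.to_numpy().tobytes()).hexdigest()


# Two 100-row frames that differ only in a middle row.
a = pd.DataFrame({"x": range(100)})
b = a.copy()
b.loc[50, "x"] = -1

# The string form elides the middle rows, so the two frames look identical.
same_repr = str(pd.util.hash_pandas_object(a)) == str(pd.util.hash_pandas_object(b))
# The digest covers every row, so the two frames are told apart.
same_digest = hash_pandas_dataframe_digest(a) == hash_pandas_dataframe_digest(b)
```

If this approach were adopted, the function would presumably be passed to HashMethod in the same way as hash_pandas_dataframe in the example above.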
calm-pilot-2010
10/14/2023, 4:44 PM
... process_data task or the dynamic_workflow? My guess was that the hash of the df created within dynamic_workflow is never computed. We don't really provide a type hint for that df, so we haven't really told Flyte how to compute that hash.
freezing-airport-6809
freezing-airport-6809
calm-pilot-2010
10/16/2023, 9:38 AM
calm-pilot-2010
10/16/2023, 9:59 AM
freezing-airport-6809
hallowed-mouse-14616
10/16/2023, 5:42 PM
calm-pilot-2010
10/16/2023, 10:20 PM
... flytekit would know how to hash the df created internally in the dynamic_workflow, because it doesn't really have an associated type annotation that specifies the HashMethod.
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
cool-lifeguard-49380
10/17/2023, 1:56 AM
... nn.Module around but a pydantic base model (for which we have a simple transformer) which acts as a reference to an nn.Module which we “manually” (de)-serialize to and from blob storage …
freezing-airport-6809
freezing-airport-6809
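The reference pattern that cool-lifeguard-49380 mentions, as far as the surviving fragment shows, passes a small reference object between tasks while the heavy model is written to and read from storage by hand. A minimal standard-library sketch of that idea follows. It is not their implementation: a dataclass stands in for the pydantic base model, a local temporary directory stands in for blob storage, a plain dict stands in for the nn.Module, and the names ModelRef, save_model and load_model are hypothetical.

```python
import pickle
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class ModelRef:
    """Hypothetical lightweight reference that gets passed around instead of the model."""

    uri: str


def save_model(model: object, storage_dir: Path, name: str) -> ModelRef:
    # Serialize the heavy object manually and hand back only a small reference.
    path = storage_dir / f"{name}.pkl"
    path.write_bytes(pickle.dumps(model))
    return ModelRef(uri=str(path))


def load_model(ref: ModelRef) -> object:
    # Resolve the reference back into the full object when it is needed.
    return pickle.loads(Path(ref.uri).read_bytes())


# A temporary local directory stands in for blob storage (assumption).
storage = Path(tempfile.mkdtemp())
ref = save_model({"weights": [0.1, 0.2, 0.3]}, storage, "demo-model")
restored = load_model(ref)
```

Because only the small reference crosses task boundaries in this sketch, any hashing or caching would apply to the reference's fields rather than to the model's contents.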