flat-monkey-49105
08/09/2025, 7:11 AM
from dataclasses import dataclass
from typing import Annotated
from flytekit import Cache, HashMethod, StructuredDataset, task, workflow
import pandas as pd
import logging
@dataclass
class Data:
    """Pair a metadata string with a tabular dataset."""

    # Free-form text carried alongside the dataset.
    metadata: str
    # Tabular payload, wrapped in flytekit's StructuredDataset.
    df: StructuredDataset
def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    """Return a content hash of *df* as a hexadecimal string.

    The per-row hashes from ``pd.util.hash_pandas_object`` are digested as raw
    bytes. Calling ``str()`` on that Series is not a reliable hash: pandas
    abbreviates the printed form of long Series, so frames that differ only in
    the elided rows would produce the same string.

    Args:
        df: The DataFrame to hash.

    Returns:
        SHA-256 hex digest of the row hashes of *df*.
    """
    import hashlib

    row_hashes = pd.util.hash_pandas_object(df)
    # Use the full uint64 buffer so every row contributes to the digest.
    return hashlib.sha256(row_hashes.to_numpy().tobytes()).hexdigest()
def hash_data(data: Data) -> str:
    """Return a content hash covering both fields of *data*.

    The digest combines ``data.metadata`` with the per-row hashes of the
    DataFrame held by ``data.df``, so a change to either field changes the
    result.

    Args:
        data: The ``Data`` instance to hash.

    Returns:
        SHA-256 hex digest as a string.
    """
    import hashlib

    # NOTE(review): assumes a StructuredDataset built in-process exposes its
    # frame as `.dataframe`, and that `open(...).all()` needs a materialized
    # literal (likely why the original crashed inside the producing task) --
    # confirm against the flytekit version in use.
    frame = getattr(data.df, "dataframe", None)
    if not isinstance(frame, pd.DataFrame):
        frame = data.df.open(pd.DataFrame).all()

    digest = hashlib.sha256(data.metadata.encode("utf-8"))
    # Separator keeps the metadata bytes from running into the row hashes.
    digest.update(b"\x00")
    # Raw bytes rather than str(): the printed form of a long Series is
    # abbreviated, so it would not reflect every row.
    digest.update(pd.util.hash_pandas_object(frame).to_numpy().tobytes())
    return digest.hexdigest()
@task
def generate_data_a() -> Annotated[Data, HashMethod(hash_data)]:
    """Build a sample ``Data`` value.

    The return annotation attaches ``hash_data`` as the ``HashMethod`` for the
    returned object.
    """
    frame = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    return Data(metadata="hello", df=StructuredDataset(frame))
@task(cache=Cache(version="1.3"))
def process_data_a(data: Data) -> bool:
    """Log the DataFrame carried by *data* and return ``True``.

    The task is declared with ``Cache(version="1.3")``.
    """
    frame = data.df.open(pd.DataFrame).all()
    logging.error(f"process_data_a: {frame}")
    return True
@task
def generate_data_b() -> Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]:
    """Build a sample DataFrame.

    The return annotation attaches ``hash_pandas_dataframe`` as the
    ``HashMethod`` for the returned frame.
    """
    columns = {"a": [1, 2, 3], "b": [4, 5, 6]}
    return pd.DataFrame(columns)
@task(cache=Cache(version="1.3"))
def process_data_b(data: pd.DataFrame) -> bool:
    """Log *data* and return ``True``.

    The task is declared with ``Cache(version="1.3")``.
    """
    message = f"process_data_b: {data}"
    logging.error(message)
    return True
@workflow
def cache_workflow() -> None:
    """Run both producer/consumer pairs: the dataclass one, then the DataFrame one."""
    # Dataclass path: reported to crash with the custom HashMethod on `Data`;
    # this may not be the intended way to hash a dataclass output.
    sample_a = generate_data_a()
    process_data_a(sample_a)

    # Plain DataFrame path: reported to cache correctly.
    sample_b = generate_data_b()
    process_data_b(sample_b)
freezing-airport-6809
flat-monkey-49105
08/11/2025, 9:41 AMflat-monkey-49105
08/11/2025, 9:43 AMfreezing-airport-6809