Rahul Mehta
01/27/2023, 2:05 AM
{
  "json": {
    "exec_id": "run-name",
    "ns": "flytetester-development",
    "routine": "worker-11"
  },
  "level": "warning",
  "msg": "Workflow not found in cache.",
  "ts": "2023-01-26T17:37:00Z"
}
Are there any other locations to check, or can I make the logging more verbose to see the cache keys that are being used for each execution?
Annotation -- it seems like varargs in Annotations isn't supported when adding a flytekit.HashMethod. For example,
DataFrame = Annotated[pd.DataFrame, some_other_annotation, HashMethod(...)]
doesn't work, but dropping some_other_annotation and just using
DataFrame = Annotated[pd.DataFrame, HashMethod(...)]
does.
Samhita Alla
Tyler Su
02/03/2023, 6:40 PM
Eduardo Apolinario (eapolinario)
02/07/2023, 12:58 AM
from typing_extensions import Annotated
import pandas as pd
from flytekit import task, workflow, HashMethod


def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))


@task(cache=True, cache_version="1")
def get_df() -> Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]:
    # Including another annotation does not modify the interface used by flytekit to calculate cache keys,
    # in other words, we're going to hit the same key if we include another annotation, for example:
    # def get_df() -> Annotated[pd.DataFrame, 42, HashMethod(hash_pandas_dataframe)]:
    return pd.DataFrame({"column_1": [1, 2, 3]})


@workflow
def wf(name: str) -> pd.DataFrame:
    return get_df()
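As a rough illustration of the comment in the snippet above, here is a minimal sketch of how extra metadata inside Annotated can sit alongside a hash annotation and still be found. It uses only the standard library. FakeHashMethod and find_hash_method are hypothetical stand-ins written for this example, not flytekit's HashMethod or its actual lookup logic, which is not shown in this thread.

```python
from typing import Annotated, get_args


class FakeHashMethod:
    """Hypothetical stand-in for flytekit.HashMethod (not the real class)."""

    def __init__(self, function):
        self.function = function


def find_hash_method(annotated_type):
    """Return the first FakeHashMethod in the Annotated metadata, or None."""
    # get_args(Annotated[T, a, b]) returns (T, a, b); skip the base type T.
    for item in get_args(annotated_type)[1:]:
        if isinstance(item, FakeHashMethod):
            return item
    return None


marker = FakeHashMethod(lambda value: str(value))

# One alias carries only the hash annotation, the other adds an unrelated one.
only_hash = Annotated[int, marker]
with_extra = Annotated[int, 42, marker]

found_a = find_hash_method(only_hash)
found_b = find_hash_method(with_extra)
```

Both lookups return the same marker object, so under this scheme the position of the hash annotation among other metadata does not matter. Whether a given flytekit version behaves the same way would need to be checked against that version.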
Tyler Su
02/15/2023, 11:02 PM