Rahul Mehta

01/27/2023, 2:05 AM
Any pointers for debugging cache misses? I've started integrating caching into our core training tasks and can see that outputs are being written to the cache correctly, but downstream tasks only ever get cache misses. I also don't see anything meaningful in the flytepropeller logs, only a few sparse log lines like this:
{
  "json": {
    "exec_id": "run-name",
    "ns": "flytetester-development",
    "routine": "worker-11"
  },
  "level": "warning",
  "msg": "Workflow not found in cache.",
  "ts": "2023-01-26T17:37:00Z"
}
Are there any other places I should check? Or can I make the logging more verbose so I can see the cache keys being used for each execution?
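Why upstream outputs land in the cache while downstream tasks keep missing can be illustrated with a toy model. It assumes, as the thread's use of HashMethod suggests, that an offloaded output such as a DataFrame is identified by its storage URI unless a content hash is attached to it. `Literal`, `toy_cache_key`, `run_upstream`, the `train_model` task name, and the `s3://bucket/...` paths are all hypothetical; this is a sketch of the idea, not flytekit's or flytepropeller's actual cache-key computation.

```python
import hashlib
import json
import uuid
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class Literal:
    """Hypothetical stand-in for an offloaded task output (not flytekit's Literal)."""
    uri: str                            # storage location, fresh for every execution
    content_hash: Optional[str] = None  # present only if a hash method was attached


def toy_cache_key(task_name: str, cache_version: str, inputs: Dict[str, Literal]) -> str:
    """Toy cache key: use each input's content hash if present, else its URI."""
    parts = {
        name: (lit.content_hash if lit.content_hash is not None else lit.uri)
        for name, lit in inputs.items()
    }
    payload = json.dumps([task_name, cache_version, parts], sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_upstream(with_hash: bool) -> Literal:
    """Simulate one upstream execution writing the same DataFrame to a new location."""
    uri = f"s3://bucket/{uuid.uuid4()}/df.parquet"
    return Literal(uri=uri, content_hash="sha256-of-df-contents" if with_hash else None)


def downstream_key(with_hash: bool) -> str:
    """Cache key a hypothetical downstream task would look up after one upstream run."""
    return toy_cache_key("train_model", "1", {"df": run_upstream(with_hash)})


# Two executions over identical data:
print(downstream_key(False) == downstream_key(False))  # False: the URIs differ, so the key changes
print(downstream_key(True) == downstream_key(True))    # True: the content hash is stable
```

In this model the upstream task itself can still hit its cache (its own inputs are unchanged), which matches the symptom of "outputs written correctly, downstream always misses".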
Hm, turns out I just botched my …; it seems like varargs in … isn't supported when adding a …. For example:
DataFrame = Annotated[pd.DataFrame, some_other_annotation, HashMethod(...)]
doesn't work, but dropping … and just using
DataFrame = Annotated[pd.DataFrame, HashMethod(...)]
does.
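One plausible way a HashMethod could be ignored when another annotation precedes it is a lookup that only inspects the first metadata entry of `Annotated`. The sketch below uses the standard `typing.get_origin` / `typing.get_args` functions (Python 3.9+) and a hypothetical `HashMethod` stand-in class; it does not reproduce flytekit's internals, and `dict` stands in for `pd.DataFrame` so the example has no third-party dependencies.

```python
from dataclasses import dataclass
from typing import Annotated, Any, Callable, Optional, get_args, get_origin


@dataclass(frozen=True)
class HashMethod:
    """Hypothetical stand-in for flytekit's HashMethod, used only in this sketch."""
    function: Callable[[Any], str]


def find_hash_method(tp: Any) -> Optional[HashMethod]:
    """Scan all Annotated metadata and return the first HashMethod, wherever it sits."""
    if get_origin(tp) is not Annotated:
        return None
    # get_args(Annotated[T, a, b]) returns (T, a, b): the metadata follows the base type.
    for meta in get_args(tp)[1:]:
        if isinstance(meta, HashMethod):
            return meta
    return None


def first_metadata_only(tp: Any) -> Optional[HashMethod]:
    """A position-sensitive lookup that only inspects the first metadata entry."""
    if get_origin(tp) is not Annotated:
        return None
    first = get_args(tp)[1]
    return first if isinstance(first, HashMethod) else None


hm = HashMethod(function=repr)
Single = Annotated[dict, hm]
Multiple = Annotated[dict, "some_other_annotation", hm]

print(find_hash_method(Multiple) == hm)   # True: found despite the extra annotation
print(first_metadata_only(Multiple))      # None: the HashMethod is missed
```

Scanning every metadata entry with `isinstance` makes the lookup independent of annotation order, which is the behavior the reply below expects.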

Samhita Alla

01/27/2023, 6:43 AM
Tyler Su

02/03/2023, 6:40 PM
Eduardo Apolinario (eapolinario)

02/07/2023, 12:58 AM
@Rahul Mehta, can you say more? I can't repro this with this example:
from typing_extensions import Annotated
import pandas as pd
from flytekit import task, workflow, HashMethod

def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    return str(pd.util.hash_pandas_object(df))

@task(cache=True, cache_version="1")
def get_df() -> Annotated[pd.DataFrame, HashMethod(hash_pandas_dataframe)]:
    # Including another annotation does not modify the interface flytekit uses to calculate cache keys;
    # in other words, we hit the same key if we include another annotation, for example:
    # def get_df() -> Annotated[pd.DataFrame, 42, HashMethod(hash_pandas_dataframe)]:
    return pd.DataFrame({"column_1": [1, 2, 3]})

@workflow
def wf(name: str) -> pd.DataFrame:
    return get_df()
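One caveat with `hash_pandas_dataframe` as written: `str()` of the per-row hash Series goes through pandas' display repr, which elides the middle rows of long Series (by default anything over `display.max_rows`, 60). Two frames that differ only in those hidden rows would then produce the same string, and so the same cache key. The sketch below compares that approach with one possible alternative, a SHA-256 digest over all row hashes; the function names are hypothetical and this is not the hashing flytekit ships.

```python
import hashlib

import pandas as pd


def hash_pandas_dataframe_str(df: pd.DataFrame) -> str:
    # Same approach as above: str() of the per-row hash Series (truncated for long frames).
    return str(pd.util.hash_pandas_object(df))


def hash_pandas_dataframe_digest(df: pd.DataFrame) -> str:
    # Digest over every per-row uint64 hash, so no rows are dropped by display settings.
    row_hashes = pd.util.hash_pandas_object(df, index=True).to_numpy()
    return hashlib.sha256(row_hashes.tobytes()).hexdigest()


a = pd.DataFrame({"column_1": range(1000)})
b = a.copy()
b.loc[500, "column_1"] = -1  # change a single row in the middle

print(hash_pandas_dataframe_str(a) == hash_pandas_dataframe_str(b))        # True: the change is hidden
print(hash_pandas_dataframe_digest(a) == hash_pandas_dataframe_digest(b))  # False
```

The digest is also a fixed-length string, which keeps whatever is stored alongside the cached output small regardless of the frame's size.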

Tyler Su

02/15/2023, 11:02 PM
