Hi! Is there a way to make caching with `Annotated[..., HashMethod(...)]` work with `FlyteFile`?
It doesn't work out of the box: it fails an `issubclass` check in `flytekit/types/file/file.py`. However, I have put together a two-line patch for that file (I can submit an MR) that would allow the annotated type alias to be used with a custom hash function.
Or am I chasing a red herring here and there is an easier way to get content-aware caching for `FlyteFile`s?
Thanks! 🙏
Here's the example workflow. The expected behavior is that `print_file` is executed only when either its inputs change or the signature/`cache_version` is bumped. If I omit the custom hash method, the cache is hit regardless of whether the contents of the inputs have changed:
from typing import Annotated

import pandas as pd
from flytekit import HashMethod, task, workflow
from flytekit.types.file import FlyteFile

def calc_hash(f: FlyteFile) -> str:
    ...

CachedFlyteFile = Annotated[FlyteFile, HashMethod(calc_hash)]

@task
def write_file() -> CachedFlyteFile:
    print("write_file")
    local_path = "data.parquet"
    df = pd.DataFrame(data={"a": [1, 2, 3], "b": [3, 4, 5]})
    df.to_parquet(local_path)
    return FlyteFile(local_path, remote_path="s3://...")

@task(cache=True, cache_version="1")
def print_file(file: FlyteFile) -> None:
    file.download()
    print(pd.read_parquet(file))

@workflow
def wf() -> None:
    f = write_file()
    print_file(file=f)

if __name__ == "__main__":
    wf()
    wf()  # don't expect to see output from `print_file`
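
For illustration, a content-based hash of the kind `calc_hash` would need could be sketched roughly as below. This is not the body elided above: `hash_file_contents` is a hypothetical helper name, and the choice of SHA-256 and the chunk size are assumptions. It takes a plain local path and uses only the standard library, so it can be tried without Flyte.

```python
import hashlib
from pathlib import Path
from typing import Union


def hash_file_contents(path: Union[str, Path], chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file's bytes (hypothetical helper)."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        # Read in fixed-size chunks so large files never have to fit in memory.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Inside `calc_hash`, something like this would presumably be called with a local copy of the file, so that two files with identical bytes yield the same cache key and any change in content yields a different one. How to get that local path from the `FlyteFile` is left open here.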