acoustic-carpenter-78188
10/18/2023, 10:34 PM
`Annotated[FlyteFile, HashMethod(...)]` for calculating a custom hash for a `FlyteFile` doesn't work out of the box: it fails an `issubclass` check in flytekit/types/file/file.py. This makes it impossible to use custom hash functions with `FlyteFile`, e.g., to implement content-aware caching.
Note that the same approach with `Annotated[FlyteDirectory, HashMethod(...)]` works as expected out of the box.
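For context, content-aware caching relies on the hash being a function of the file's bytes rather than its path or timestamp. A minimal standalone sketch of that property, using only the standard library (the helper names `content_digest` and `write_temp` are illustrative assumptions, not part of flytekit):

```python
import hashlib
import os
import tempfile


def content_digest(path: str, chunk_size: int = 4096) -> str:
    """Return the SHA-1 hex digest of a file's bytes, read in chunks."""
    h = hashlib.sha1()
    with open(path, "rb") as fh:
        while chunk := fh.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()


def write_temp(data: bytes) -> str:
    """Write data to a new temporary file and return its path."""
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "wb") as fh:
        fh.write(data)
    return path


first = write_temp(b"a,b\n1,3\n")
second = write_temp(b"a,b\n1,3\n")   # same bytes, different path
changed = write_temp(b"a,b\n1,4\n")  # different bytes

same_content_same_digest = content_digest(first) == content_digest(second)
changed_content_new_digest = content_digest(first) != content_digest(changed)

# Clean up the temporary files.
for p in (first, second, changed):
    os.remove(p)
```

Two files with identical bytes produce the same digest regardless of where they live, which is what would let a downstream cached task be skipped when an upstream task rewrites an unchanged file.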
Example workflow code
    import hashlib
    from typing import Annotated

    import pandas as pd
    from flytekit import task, workflow
    from flytekit.core.hash import HashMethod
    from flytekit.types.file import FlyteFile


    def calc_hash(f: FlyteFile) -> str:
        """Calculate SHA1 hash of a file"""
        h = hashlib.sha1(usedforsecurity=False)
        # Use a separate name for the file handle so it doesn't shadow `f`
        with open(f.path, "rb") as fh:
            while chunk := fh.read(4096):
                h.update(chunk)
        return str(h.hexdigest())


    CachedFlyteFile = Annotated[FlyteFile, HashMethod(calc_hash)]


    @task
    def write_file() -> CachedFlyteFile:
        print("write_file")
        local_path = "data.parquet"
        df = pd.DataFrame(data={"a": [1, 2, 3], "b": [3, 4, 5]})
        df.to_parquet(local_path)
        return FlyteFile(local_path, remote_path=f"s3://test-repo/main/{local_path}")


    @task(cache=True, cache_version="1")
    def print_file(file: FlyteFile) -> None:
        file.download()
        print(pd.read_parquet(file))


    @workflow
    def wf() -> None:
        f = write_file()
        print_file(file=f)


    if __name__ == "__main__":
        wf()
        wf()  # don't expect output from `print_file`, since it should be cached
Expected behavior
The first execution of `wf` should run both the `write_file` and `print_file` tasks; the second execution should only run `write_file` and hit the cache for `print_file`.
In reality, an exception is raised in `flytekit/types/file/file.py`, in `to_literal()`:
    TypeError: issubclass() arg 1 must be a class
Additional context to reproduce
See example workflow above. I was able to solve the issue by inserting the following before the `issubclass` check in flytekit/types/file/file.py (I haven't run the flytekit test suite, so this might not be suitable for a mergeable PR just yet - should I submit a draft PR regardless?):
    @@ -284,6 +288,10 @@
                     "None value cannot be converted to a file."
                 )
     
    +        # Handle Annotated[FlyteFile, ...] correctly by extracting the wrapped type
    +        if issubclass(typing.get_origin(python_type), typing.Annotated):
    +            python_type = typing.get_args(python_type)[0]
    +
             if not (python_type is os.PathLike or issubclass(python_type, FlyteFile)):
                 raise ValueError(
                     f"Incorrect type {python_type}, must be either a FlyteFile or os.PathLike"
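One caveat about the patch above: `typing.get_origin()` returns `None` for a plain class, so the added `issubclass(...)` call would itself raise `TypeError` whenever `python_type` is an un-annotated `FlyteFile`. An identity check avoids that. The sketch below is standalone: `FakeFile` is a hypothetical stand-in for `FlyteFile` and `unwrap_annotated` is a hypothetical helper, so this is an illustration of the idea rather than flytekit's actual code.

```python
import typing
from typing import Annotated


class FakeFile:
    """Stand-in for FlyteFile so the example runs without flytekit."""


def unwrap_annotated(python_type):
    """Return the wrapped type for Annotated[T, ...], else the type unchanged."""
    if typing.get_origin(python_type) is Annotated:
        return typing.get_args(python_type)[0]
    return python_type


annotated = Annotated[FakeFile, "some-metadata"]

# Passing the Annotated alias straight to issubclass() reproduces the error.
try:
    issubclass(annotated, FakeFile)
    raw_check_failed = False
except TypeError:
    raw_check_failed = True

# After unwrapping, the check works for both annotated and plain types.
annotated_ok = issubclass(unwrap_annotated(annotated), FakeFile)
plain_ok = issubclass(unwrap_annotated(FakeFile), FakeFile)
```

Comparing with `is Annotated` keeps the plain-type path untouched, which matters because the same code path would presumably also handle un-annotated `FlyteFile` and `os.PathLike` inputs.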
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte