bored-needle-72209
10/04/2024, 9:14 AMfreezing-airport-6809
bored-needle-72209
10/04/2024, 2:08 PM
bored-needle-72209
10/04/2024, 2:11 PM
broad-monitor-993
10/04/2024, 2:32 PM
FlyteRemote
I believe it re-uploads the pickle file input so as far as Flyte knows it’s a different file.
broad-monitor-993
10/04/2024, 2:33 PM
HashMethod as shown here: https://docs.flyte.org/en/latest/user_guide/development_lifecycle/caching.html#caching-of-non-flyte-offloaded-objects
bored-needle-72209
10/04/2024, 2:54 PM
HashMethod the same way as it is used in the discussion linked above. However, when executing that single task with FlyteRemote, it never hits the cache. As you said, it re-uploads the pickle input file and runs the task as if it were a new input. Do you know of a solution for this?
broad-monitor-993
10/04/2024, 2:55 PM
bored-needle-72209
10/04/2024, 3:02 PMfreezing-airport-6809
bored-needle-72209
10/07/2024, 8:30 AM
from flytekit import task, HashMethod
from typing_extensions import Annotated
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config, PlatformConfig


class VideoRecord:
    def __init__(self, video_path: str):
        self.video_path = video_path


def hash_video_record(record: VideoRecord) -> str:
    return record.video_path


@task(cache=True, cache_version="1.0")
def bar_1(video_record: Annotated[VideoRecord, HashMethod(hash_video_record)]) -> str:
    print("Running bar_1")
    return video_record.video_path


if __name__ == "__main__":
    video_record = VideoRecord("path/to/video")
    remote = FlyteRemote(
        config=Config(
            platform=PlatformConfig(
                endpoint=endpoint,
                insecure=True,
                insecure_skip_verify=True,
            )
        ),
        default_project=default_project,
        default_domain=default_domain,
    )
    entity = remote.fetch_task(name="toy_example.bar_1", version="1.1")
    remote.execute(
        entity=entity,
        inputs={"video_record": video_record},
        wait=True,
        tags=[],
        overwrite_cache=False,
    )
bored-needle-72209
10/07/2024, 8:32 AM
if __name__ == "__main__":
    video_record = VideoRecord("path/to/video")
    bar_1(video_record=video_record)
    bar_1(video_record=video_record)
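
The difference between the two snippets above can be illustrated with a toy model. This is only a sketch and not Flyte's actual cache-key computation: it assumes, as suggested earlier in the thread, that a pickled input passed through FlyteRemote gets a new upload location on every execution. The `cache_key` and `upload_pickle` helpers and the bucket name are hypothetical.

```python
import hashlib
import pickle
import uuid


class VideoRecord:
    def __init__(self, video_path: str):
        self.video_path = video_path


def cache_key(task_name: str, cache_version: str, input_repr: str) -> str:
    """Toy cache key: a digest over task name, cache version and input representation."""
    raw = "|".join([task_name, cache_version, input_repr])
    return hashlib.sha256(raw.encode()).hexdigest()


def upload_pickle(obj: object) -> str:
    """Pretend to upload a pickled object; each upload gets a fresh URI (assumption)."""
    pickle.dumps(obj)  # serialized payload, discarded in this toy model
    return f"s3://hypothetical-bucket/{uuid.uuid4().hex}/input.pkl"


record = VideoRecord("path/to/video")

# If the key is derived from the upload location, it changes on every execution.
key_a = cache_key("bar_1", "1.0", upload_pickle(record))
key_b = cache_key("bar_1", "1.0", upload_pickle(record))

# If the key is derived from a user-supplied hash, it is stable across executions.
key_c = cache_key("bar_1", "1.0", record.video_path)
key_d = cache_key("bar_1", "1.0", record.video_path)

print(key_a == key_b)  # False
print(key_c == key_d)  # True
```

In this model a cache hit is only possible when the input's representation is the same on both calls, which is the property the hash function is meant to provide.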
bored-needle-72209
10/07/2024, 8:32 AM
broad-monitor-993
10/07/2024, 12:51 PM
VideoRecord and not a FlyteFile?
broad-monitor-993
10/07/2024, 12:52 PM
broad-monitor-993
10/07/2024, 12:54 PM
HashMethod? I don’t think that’ll work…
You need to annotate the output of a task with HashMethod (referring to the docs example).
def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    return str(pandas.util.hash_pandas_object(df))


@task
def foo_1(
    a: int,
    b: str,
) -> Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]:
    df = pandas.DataFrame(...)
    ...
    return df


@task(cache=True, cache_version="1.0")
def bar_1(df: pandas.DataFrame) -> int:
    ...


@workflow
def wf_1(a: int, b: str):
    df = foo_1(a=a, b=b)
broad-monitor-993
10/07/2024, 12:56 PM
Note how the output of task foo is annotated with an object of type HashMethod. Essentially, it represents a function that produces a hash that is used as part of the cache key calculation in calling the task bar.
broad-monitor-993
10/07/2024, 12:56 PM
VideoRecord object:
broad-monitor-993
10/07/2024, 12:57 PM
@task
def produce_video(...) -> Annotated[VideoRecord, HashMethod(...)]:
    ...


@task
def consume_video(video_record: VideoRecord): ...
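
The argument to HashMethod(...) is left open in the snippet above. One possible hash function, as a sketch only, hashes the video file's bytes rather than its path, so that two records pointing at identical content produce the same hash. It assumes that `video_path` is a local, readable file at the time the hash is computed; `hash_video_content` and `write_temp` are hypothetical names, and the temp files only stand in for real videos.

```python
import hashlib
import os
import tempfile


class VideoRecord:
    def __init__(self, video_path: str):
        self.video_path = video_path


def hash_video_content(record: VideoRecord, chunk_size: int = 1 << 20) -> str:
    """Return a SHA-256 hex digest of the file at record.video_path, read in chunks."""
    digest = hashlib.sha256()
    with open(record.video_path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def write_temp(data: bytes) -> str:
    """Write data to a new temporary file and return its path (demo helper)."""
    fd, path = tempfile.mkstemp(suffix=".bin")
    with os.fdopen(fd, "wb") as f:
        f.write(data)
    return path


path_a = write_temp(b"same bytes")
path_b = write_temp(b"same bytes")
path_c = write_temp(b"other bytes")

hash_a = hash_video_content(VideoRecord(path_a))
hash_b = hash_video_content(VideoRecord(path_b))
hash_c = hash_video_content(VideoRecord(path_c))

print(hash_a == hash_b)  # True: same content, different paths
print(hash_a == hash_c)  # False: different content

for p in (path_a, path_b, path_c):
    os.remove(p)
```

Hashing the path alone, as in hash_video_record earlier in the thread, is cheaper but treats a replaced file at the same path as unchanged; which trade-off fits depends on how the videos are produced.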
bored-needle-72209
10/07/2024, 12:59 PM
broad-monitor-993
10/07/2024, 1:05 PM
broad-monitor-993
10/07/2024, 1:05 PM
broad-monitor-993
10/07/2024, 1:20 PM
FlyteFile and specify the remote_path argument, where the value is some S3 bucket path where you want to upload your videos. I’m not 100% certain this works with FlyteRemote, but that should make caching work.
thankful-minister-83577
bored-needle-72209
10/08/2024, 1:38 PM
high-accountant-32689
10/17/2024, 9:19 PM
bored-needle-72209
10/22/2024, 11:55 AM