bored-needle-72209
10/04/2024, 9:14 AM
freezing-airport-6809
bored-needle-72209
10/04/2024, 2:08 PM
bored-needle-72209
10/04/2024, 2:11 PM
broad-monitor-993
10/04/2024, 2:32 PM
FlyteRemote I believe it re-uploads the pickle file input, so as far as Flyte knows it’s a different file.
broad-monitor-993
10/04/2024, 2:33 PM
HashMethod as shown here: https://docs.flyte.org/en/latest/user_guide/development_lifecycle/caching.html#caching-of-non-flyte-offloaded-objects
bored-needle-72209
10/04/2024, 2:54 PM
HashMethod the same way as it is used in the discussion linked above. However, when executing that single task via FlyteRemote, it never hits the cache. As you said, it re-uploads the pickle input file and runs the task as if it were new input. Do you know of a solution for this?
broad-monitor-993
10/04/2024, 2:55 PM
bored-needle-72209
10/04/2024, 3:02 PM
freezing-airport-6809
bored-needle-72209
10/07/2024, 8:30 AM
from flytekit import task, HashMethod
from typing_extensions import Annotated
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config, PlatformConfig

class VideoRecord:
    def __init__(self, video_path: str):
        self.video_path = video_path

def hash_video_record(record: VideoRecord) -> str:
    return record.video_path

@task(cache=True, cache_version="1.0")
def bar_1(video_record: Annotated[VideoRecord, HashMethod(hash_video_record)]) -> str:
    print("Running bar_1")
    return video_record.video_path

if __name__ == "__main__":
    video_record = VideoRecord("path/to/video")
    remote = FlyteRemote(
        config=Config(
            platform=PlatformConfig(
                endpoint=endpoint,
                insecure=True,
                insecure_skip_verify=True,
            )
        ),
        default_project=default_project,
        default_domain=default_domain,
    )
    entity = remote.fetch_task(name="toy_example.bar_1", version="1.1")
    remote.execute(
        entity=entity,
        inputs={"video_record": video_record},
        wait=True,
        tags=[],
        overwrite_cache=False,
    )
bored-needle-72209
10/07/2024, 8:32 AM
if __name__ == "__main__":
    video_record = VideoRecord("path/to/video")
    bar_1(video_record=video_record)
    bar_1(video_record=video_record)
bored-needle-72209
10/07/2024, 8:32 AM
broad-monitor-993
10/07/2024, 12:51 PM
VideoRecord and not a FlyteFile?
broad-monitor-993
10/07/2024, 12:52 PM
broad-monitor-993
10/07/2024, 12:54 PM
HashMethod? I don’t think that’ll work…
You need to annotate the output of a task with HashMethod (referring to the docs example).
import pandas
from typing_extensions import Annotated
from flytekit import task, workflow, HashMethod

def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    return str(pandas.util.hash_pandas_object(df))

# The producing task annotates its output with HashMethod.
@task
def foo_1(a: int, b: str) -> Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]:
    df = pandas.DataFrame(...)
    ...
    return df

# The consuming task is the cached one; its input needs no annotation.
@task(cache=True, cache_version="1.0")
def bar_1(df: pandas.DataFrame) -> int:
    ...

@workflow
def wf_1(a: int, b: str):
    df = foo_1(a=a, b=b)
    # The hash attached to foo_1's output feeds bar_1's cache key.
    v = bar_1(df=df)
broad-monitor-993
10/07/2024, 12:56 PM
Note how the output of task foo is annotated with an object of type HashMethod. Essentially, it represents a function that produces a hash that is used as part of the cache key calculation in calling the task bar.
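To make the cache-key point above concrete, here is a toy model in plain Python. It is only a sketch under stated assumptions, not how Flyte computes keys internally: `cache_key`, `upload_to_random_location`, `explicit_hash`, and the bucket name are all hypothetical. It shows why an input identified by a fresh upload location never matches a previous run, while an input identified by a user-supplied hash does.

```python
import hashlib
import uuid


def cache_key(task_name: str, cache_version: str, input_hashes: dict) -> str:
    """Toy cache key: digest of the task identity plus one hash per input."""
    parts = [task_name, cache_version]
    parts += [f"{name}={value}" for name, value in sorted(input_hashes.items())]
    return hashlib.sha256("|".join(parts).encode()).hexdigest()


def upload_to_random_location(payload: bytes) -> str:
    """Hypothetical stand-in for a client that re-uploads the input on every call."""
    return f"s3://hypothetical-bucket/{uuid.uuid4().hex}/input.pkl"


def explicit_hash(video_path: str) -> str:
    """Hypothetical stand-in for a user-supplied hash function (the HashMethod role)."""
    return video_path


payload = b"pickled VideoRecord"

# Input identified by its upload location: the location changes on every run.
key_a = cache_key("bar_1", "1.0", {"video_record": upload_to_random_location(payload)})
key_b = cache_key("bar_1", "1.0", {"video_record": upload_to_random_location(payload)})
print("same key without explicit hash:", key_a == key_b)

# Input identified by an explicit, content-derived hash: stable across runs.
key_c = cache_key("bar_1", "1.0", {"video_record": explicit_hash("path/to/video")})
key_d = cache_key("bar_1", "1.0", {"video_record": explicit_hash("path/to/video")})
print("same key with explicit hash:", key_c == key_d)
```

In this model the second pair of keys matches because nothing in the key depends on where the input was stored, which is the role the thread attributes to HashMethod on a task output.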
broad-monitor-993
10/07/2024, 12:56 PM
VideoRecord object:
broad-monitor-993
10/07/2024, 12:57 PM
@task
def produce_video(...) -> Annotated[VideoRecord, HashMethod(...)]:
    ...

@task
def consume_video(video_record: VideoRecord): ...
bored-needle-72209
10/07/2024, 12:59 PM
broad-monitor-993
10/07/2024, 1:05 PM
broad-monitor-993
10/07/2024, 1:05 PM
broad-monitor-993
10/07/2024, 1:20 PM
FlyteFile and specify the remote_path argument, where the value is some S3 bucket path where you want to upload your videos. I’m not 100% certain this works with FlyteRemote, but that should make caching work.
thankful-minister-83577
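One way to pick a stable value for that remote_path is to derive it from the file contents, so the same video always maps to the same location. The sketch below uses only the standard library; `stable_remote_path` and the bucket prefix are hypothetical names, and whether Flyte then treats the input as unchanged is the same open question raised in the message above.

```python
import hashlib
import os
import tempfile


def stable_remote_path(local_path: str, bucket_prefix: str) -> str:
    """Build a remote URI from the file's content digest and its extension."""
    digest = hashlib.sha256()
    with open(local_path, "rb") as f:
        # Read in 1 MiB chunks so large videos are not loaded into memory at once.
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    _, ext = os.path.splitext(local_path)
    return f"{bucket_prefix.rstrip('/')}/{digest.hexdigest()}{ext}"


# Two local copies with identical bytes map to the same remote location.
with tempfile.TemporaryDirectory() as tmp:
    first = os.path.join(tmp, "a.mp4")
    second = os.path.join(tmp, "b.mp4")
    for path in (first, second):
        with open(path, "wb") as f:
            f.write(b"fake video bytes")
    uri_first = stable_remote_path(first, "s3://hypothetical-bucket/videos")
    uri_second = stable_remote_path(second, "s3://hypothetical-bucket/videos")

print(uri_first == uri_second)
```

The resulting URI could then be passed as the remote_path argument when constructing the FlyteFile for the local video, instead of letting each run upload to a new location.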
bored-needle-72209
10/08/2024, 1:38 PM
high-accountant-32689
10/17/2024, 9:19 PM
bored-needle-72209
10/22/2024, 11:55 AM