# flyte-support
b
Hello community, I am executing registered Flyte tasks with FlyteRemote: https://github.com/flyteorg/flytekit/blob/master/flytekit/remote/remote.py#L1176 Is there a way to get a cache hit when the only input is a single (PythonPickle) blob? I have tried adding the Annotated HashMethod to the input types of the Flyte task, but it is not working.
f
It should work; your hash may be changing.
b
The input in my case is a Python object, not one produced by Flyte.
b
If you use FlyteRemote, I believe it re-uploads the pickle file input, so as far as Flyte knows it's a different file.
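A toy model of why a re-upload defeats the cache. It assumes, as a simplification and not Flyte's actual implementation, that the cache key is a digest of the cache version plus each input, and that an uploaded blob is represented only by the URI it landed at. The `upload` and `cache_key` helpers and the random-prefix behaviour are hypothetical stand-ins.

```python
import hashlib
import uuid


def upload(local_path: str) -> str:
    # Hypothetical upload step: every call picks a fresh random prefix,
    # so the same local file gets a new URI each time.
    filename = local_path.rsplit("/", 1)[-1]
    return f"s3://my-bucket/{uuid.uuid4().hex}/{filename}"


def cache_key(cache_version: str, inputs: dict) -> str:
    # Simplified model: digest of the cache version plus every input
    # (here, the input's URI), visited in sorted order.
    h = hashlib.sha256(cache_version.encode())
    for name in sorted(inputs):
        h.update(f"{name}={inputs[name]}\n".encode())
    return h.hexdigest()


first = cache_key("1.0", {"video_record": upload("/tmp/video_record.pkl")})
second = cache_key("1.0", {"video_record": upload("/tmp/video_record.pkl")})
print(first == second)  # False: same file, new URI, new cache key
```

Under this model the file contents never reach the key, so nothing about the pickle itself can produce a hit.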
b
I did use HashMethod the same way as in the discussion linked above. However, when executing that single task with FlyteRemote, it never hits the cache. As you said, it re-uploads the pickle input file and runs the task as if it were a new input. Do you know of any solution for this?
b
can you confirm locally that the hash function is producing the same output for the pickle file?
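One way to run that local check, as a sketch: pickle the same object twice to separate files and compare a SHA-256 digest of the file bytes. `VideoRecord` mirrors the toy class posted later in this thread; `sha256_of_file` and `pickle_to_file` are hypothetical helper names. This checks byte-level determinism of the pickle, which is a different question from whether a HashMethod function such as one that returns a chosen field is stable.

```python
import hashlib
import os
import pickle
import tempfile


class VideoRecord:
    def __init__(self, video_path: str):
        self.video_path = video_path


def sha256_of_file(path: str) -> str:
    # Digest of the raw file bytes, read in 64 KiB chunks
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 16), b""):
            h.update(chunk)
    return h.hexdigest()


def pickle_to_file(obj, directory: str, name: str) -> str:
    # Serialize obj with the default pickle protocol into directory/name
    path = os.path.join(directory, name)
    with open(path, "wb") as f:
        pickle.dump(obj, f)
    return path


with tempfile.TemporaryDirectory() as d:
    a = pickle_to_file(VideoRecord("path/to/video"), d, "a.pkl")
    b = pickle_to_file(VideoRecord("path/to/video"), d, "b.pkl")
    same = sha256_of_file(a) == sha256_of_file(b)

print(same)
```

If this prints True but remote runs still miss the cache, the hash function is not the culprit and the cause lies in how the input reaches the execution.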
b
Yes, I can. Is the problem from that discussion no longer valid, so that it should work now (even though the input is not produced by Flyte)? I strongly believe I am having the same problem and am looking for a workaround.
f
It will work if your hashing algorithm results in the same hash
b
Let me show you my toy example, where the cache is never hit across consecutive runs:
```python
from flytekit import task, HashMethod
from typing_extensions import Annotated
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config, PlatformConfig

# Placeholders: fill in the values for your own Flyte deployment
endpoint = "<flyte-endpoint>"
default_project = "<project>"
default_domain = "<domain>"


class VideoRecord:
    def __init__(self, video_path: str):
        self.video_path = video_path


def hash_video_record(record: VideoRecord) -> str:
    # The cache key should depend only on the video path
    return record.video_path


@task(cache=True, cache_version="1.0")
def bar_1(video_record: Annotated[VideoRecord, HashMethod(hash_video_record)]) -> str:
    print("Running bar_1")
    return video_record.video_path


if __name__ == "__main__":
    video_record = VideoRecord("path/to/video")

    remote = FlyteRemote(
        config=Config(
            platform=PlatformConfig(
                endpoint=endpoint,
                insecure=True,
                insecure_skip_verify=True,
            )
        ),
        default_project=default_project,
        default_domain=default_domain,
    )
    # Fetch the already-registered task and run it on the cluster
    entity = remote.fetch_task(name="toy_example.bar_1", version="1.1")
    remote.execute(
        entity=entity,
        inputs={"video_record": video_record},
        wait=True,
        tags=[],
        overwrite_cache=False,
    )
```
Btw, bar_1 is a registered entity, and if I run it locally, the cache does get hit:
```python
if __name__ == "__main__":
    video_record = VideoRecord("path/to/video")
    bar_1(video_record=video_record)  # first call runs the task body
    bar_1(video_record=video_record)  # second call hits the local cache
```
Any idea why it is not working with FlyteRemote?
b
is there any reason you’re using a custom class VideoRecord and not a FlyteFile?
regardless, this does seem like a bug
oh wait… you’re annotating the input with HashMethod? I don’t think that’ll work… You need to annotate the output of a task with HashMethod (referring to the docs example):
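```python
import hashlib

import pandas
from typing_extensions import Annotated

from flytekit import HashMethod, task, workflow


def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    # Digest the per-row hashes; str() of the Series is truncated for
    # large frames and could make different frames look identical.
    row_hashes = pandas.util.hash_pandas_object(df)
    return hashlib.sha256(row_hashes.values.tobytes()).hexdigest()


@task
def foo_1(a: int, b: str) -> Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]:
    # Illustrative contents; build the real DataFrame here
    df = pandas.DataFrame({"a": [a], "b": [b]})
    return df


@task(cache=True, cache_version="1.0")
def bar_1(df: pandas.DataFrame) -> int:
    # Placeholder body for illustration
    return len(df)


@workflow
def wf_1(a: int, b: str) -> int:
    df = foo_1(a=a, b=b)
    # bar_1's cache key uses the hash produced by hash_pandas_dataframe
    return bar_1(df=df)
```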
Note how the output of task foo_1 is annotated with an object of type HashMethod. Essentially, it represents a function that produces a hash, which is used as part of the cache key calculation when calling the task bar_1.
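A simplified model of that mechanism, assuming (not taken from Flyte's source) that an input carries an optional precomputed hash and that the cache key uses this hash in place of the input's storage URI when present. `InputValue`, `cache_key`, `fresh_uri` and the `"abc123"` digest are hypothetical stand-ins.

```python
import hashlib
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class InputValue:
    # Stand-in for a task input: where the data lives, plus an optional
    # precomputed hash (the role HashMethod plays on a producer's output).
    uri: str
    hash_value: Optional[str] = None


def cache_key(cache_version: str, inputs: dict) -> str:
    h = hashlib.sha256(cache_version.encode())
    for name in sorted(inputs):
        value = inputs[name]
        # Prefer the attached hash; fall back to the URI when there is none.
        part = value.hash_value if value.hash_value is not None else value.uri
        h.update(f"{name}={part}\n".encode())
    return h.hexdigest()


def fresh_uri() -> str:
    # Each producer run writes its output to a new location.
    return f"s3://my-bucket/{uuid.uuid4().hex}/df.parquet"


# Two producer runs with identical data: different URIs, same hash.
with_hash = cache_key("1.0", {"df": InputValue(fresh_uri(), "abc123")}) == cache_key(
    "1.0", {"df": InputValue(fresh_uri(), "abc123")}
)
# Without a hash, the differing URIs give different keys.
without_hash = cache_key("1.0", {"df": InputValue(fresh_uri())}) == cache_key(
    "1.0", {"df": InputValue(fresh_uri())}
)
print(with_hash, without_hash)  # True False
```

In this model the hash fully replaces the URI, so the hash function alone decides what counts as "the same input".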
so in your case, you’d need a task that produces the VideoRecord object:
```python
@task
def produce_video(...) -> Annotated[VideoRecord, HashMethod(...)]:
    ...

@task
def consume_video(video_record: VideoRecord): ...
```
b
Yes, but what if I don't have such a task? That was my very first question. I would like to execute tasks with FlyteRemote, so the inputs come from Python and not from a Flyte task.
b
I don’t think this is possible today
@high-accountant-32689 @thankful-minister-83577 ^^
another solution might be to use FlyteFile and specify the remote_path argument, where the value is some S3 bucket path where you want to upload your videos. I’m not 100% certain this works with FlyteRemote, but that should make caching work.
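For the remote_path route to help, the upload location has to be identical on every run. A sketch of one way to get that, assuming you control a bucket: derive the remote path from the file's content digest, so the same video always maps to the same path. The bucket prefix and the `content_addressed_remote_path` helper are hypothetical; the resulting string would be what you pass as remote_path to FlyteFile, which, as said above, is not confirmed to work with FlyteRemote.

```python
import hashlib
import os
import tempfile


def content_addressed_remote_path(local_path: str, bucket_prefix: str) -> str:
    # Hash the file bytes so identical content always maps to the same path.
    h = hashlib.sha256()
    with open(local_path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 16), b""):
            h.update(chunk)
    name = os.path.basename(local_path)
    return f"{bucket_prefix.rstrip('/')}/{h.hexdigest()}/{name}"


# Usage with a hypothetical bucket and a throwaway file
with tempfile.TemporaryDirectory() as d:
    p = os.path.join(d, "video.mp4")
    with open(p, "wb") as f:
        f.write(b"fake video bytes")
    first = content_addressed_remote_path(p, "s3://my-bucket/videos/")
    second = content_addressed_remote_path(p, "s3://my-bucket/videos/")

print(first == second)
```

Keying the path on content rather than on the local filename means that editing a video yields a new path (and a cache miss), while re-running with an unchanged video reuses the old one.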
t
I think we should support this. Mind creating a gh issue for this @bored-needle-72209?
b
h
@bored-needle-72209, FYI, I replied on the gh issue. This should be enough to unblock your use of FlyteRemote.
b
thanks, I will try it out