
Eric Song

over 2 years ago
Hi! I have a few questions about Flyte caching. I've read through the documentation here a few times, but I'm still a bit confused about why the “Caching of Non-flyte Offloaded Objects” examples hash the task output instead of the input. My intuition is that a cache needs a task's input values to be hashable so they can be used as cache keys. In that case, it would make more sense to me to add the `Annotated` `HashMethod` to a task's input types instead of its output. Then the `HashMethod` could run on the inputs when a cached task is invoked, producing the cache key used to check for a hit. I tried this out, and it kind of works for local caching, but not for remote.
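```python
import typing as ty

import pandas as pd
from flytekit import HashMethod, dynamic, task

# hash_df, redis_client and REDIS_COUNTER_KEY are defined elsewhere (not shown).
# The Redis counter records how many times the body of my_task actually executes.


@task(cache=True, cache_version="1.0")
def my_task(obj: ty.Annotated[pd.DataFrame, HashMethod(hash_df)]) -> pd.DataFrame:
    redis_client.incr(REDIS_COUNTER_KEY)
    return obj


@dynamic
def my_workflow():
    obj = pd.DataFrame(
        {
            "name": ["a", "b"],
            "val": ["test1", "test2"],
        }
    )

    obj = my_task(obj=obj)
    obj = my_task(obj=obj)
    my_task(obj=obj)
```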
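For comparison, the input-keyed cache described in the question can be modeled with a small plain-Python memoizer. This is a toy sketch of that intuition only, not how Flyte computes cache keys; `cache_on_input_hash`, `hash_rows` (a hypothetical stand-in for `hash_df`), `toy_task` and the `calls` counter (in place of the Redis counter) are all illustrative names. Under this model, the three chained calls execute the function body only once, because each input has the same content and therefore the same key.

```python
import hashlib
from typing import Any, Callable, Dict


def cache_on_input_hash(hash_fn: Callable[[Any], str]):
    """Toy memoizer: the cache key is hash_fn(argument), i.e. the input-keyed model."""
    def decorator(fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
        store: Dict[str, Any] = {}

        def wrapper(obj: Any) -> Any:
            key = hash_fn(obj)
            if key not in store:  # cache miss: run the function and remember its output
                store[key] = fn(obj)
            return store[key]

        return wrapper
    return decorator


def hash_rows(rows: list) -> str:
    """Hypothetical stand-in for hash_df: hash the row contents, not object identity."""
    return hashlib.sha256(repr(rows).encode()).hexdigest()


calls = {"n": 0}  # plays the role of the Redis counter


@cache_on_input_hash(hash_rows)
def toy_task(rows: list) -> list:
    calls["n"] += 1
    return list(rows)  # an equal-content copy, like my_task returning obj


rows = [("a", "test1"), ("b", "test2")]
out = toy_task(rows)
out = toy_task(out)
toy_task(out)
print(calls["n"])  # 1
```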
In the example above, `my_task` gets called three times the first time `my_workflow` runs. That still doesn't match my expectation: I thought it would run once on the first call, and that the second and third calls would hit the cache since the input `obj` is the same. However! On the second run of `my_workflow`, all three calls to `my_task` are cache hits, so it does work to some extent, even though I don't fully understand what it's doing. For remote caching, this doesn't seem to work at all: there are no cache hits no matter how many times I run `my_workflow`.
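As I read the documentation, the pattern it shows puts `HashMethod` on an upstream task's output rather than on an input. An offloaded value such as a DataFrame is passed between tasks as a reference to a storage location that differs on every run, so a downstream cached task keyed on that reference misses. When the producing task's output is annotated, flytekit computes the hash at that point and stores it with the output, and the downstream task's cache key can use that hash in place of the reference. The sketch below is a toy model of that idea in plain Python, not Flyte's actual cache-key code; `OffloadedValue`, `cache_key`, the task name `bar`, the URIs and the hash string are all hypothetical.

```python
import hashlib
import json
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class OffloadedValue:
    """Hypothetical stand-in for an offloaded output: where it was written, plus an optional content hash."""
    uri: str
    content_hash: Optional[str] = None


def cache_key(task_name: str, cache_version: str, inputs: Dict[str, OffloadedValue]) -> str:
    """Toy key: use the content hash when one was attached, otherwise fall back to the URI."""
    parts = {
        name: value.content_hash if value.content_hash is not None else value.uri
        for name, value in inputs.items()
    }
    payload = json.dumps([task_name, cache_version, parts], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


# Two runs write the same DataFrame to different (hypothetical) locations.
run1 = OffloadedValue(uri="s3://bucket/run-1/df.parquet")
run2 = OffloadedValue(uri="s3://bucket/run-2/df.parquet")
print(cache_key("bar", "1.0", {"df": run1}) == cache_key("bar", "1.0", {"df": run2}))  # False

# Same outputs, but the producing task attached a content hash (the role HashMethod plays).
h1 = OffloadedValue(uri=run1.uri, content_hash="abc123")
h2 = OffloadedValue(uri=run2.uri, content_hash="abc123")
print(cache_key("bar", "1.0", {"df": h1}) == cache_key("bar", "1.0", {"df": h2}))  # True
```

In this toy model, only a hash attached where the value is produced lets two runs share a cache entry for the consumer.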

Aleksei Potov

over 3 years ago
I've spun up a Sandbox cluster (using `flytectl sandbox start`) on one machine, and I'm trying to run an example workflow from a different machine (I've updated `endpoint` in `~/.flyte/config.yaml`). Running the following succeeds, and I get a link to the console:

```shell
pyflyte run --remote example.py:wf --n 500 --mean 42 --sigma 21
```

However, when inspecting the execution, I see this error:
```
[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.
[f31016626d79e40b2a4c-n0-0] terminated with exit code (1). Reason [Error]. Message:
_execute(cmd=cmd, s3_cfg=self.s3_cfg)
  File "/usr/local/lib/python3.8/site-packages/flytekit/extras/persistence/s3_awscli.py", line 51, in _update_cmd_config_and_execute
    return subprocess.check_call(anonymous_cmd, env=env)
  File "/usr/local/lib/python3.8/site-packages/flytekit/tools/subprocess.py", line 26, in check_call
    raise Exception(
Exception: Called process exited with error code: 1.  Stderr dump:

b'fatal error: An error occurred (403) when calling the HeadObject operation: Forbidden\n'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/pyflyte-fast-execute", line 8, in <module>
    sys.exit(fast_execute_task_cmd())
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/flytekit/bin/entrypoint.py", line 495, in fast_execute_task_cmd
    _download_distribution(additional_distribution, dest_dir)
  File "/usr/local/lib/python3.8/site-packages/flytekit/tools/fast_registration.py", line 102, in download_distribution
    file_access.get_data(additional_distribution, destination)
  File "/usr/local/lib/python3.8/site-packages/flytekit/core/data_persistence.py", line 427, in get_data
    raise FlyteAssertion(
flytekit.exceptions.user.FlyteAssertion: Failed to get data from <s3://my-s3-bucket/36/flytesnacks/development/VGJSBXLCAE3JIKQYKY2O3N5IMQ======/scriptmode.tar.gz> to . (recursive=False).

Original exception: Called process exited with error code: 1.  Stderr dump:

b'fatal error: An error occurred (403) when calling the HeadObject operation: Forbidden\n'
.
```