Joe Kelly
08/07/2023, 6:43 PM
The diskcache package seems to be unable to create the relevant local cache directory. Is this expected, or is there something further I need to do to enable caching?
I've registered this workflow and I'm running it on the Flyte app (not on my local machine). The workflow runs fine when setting cache=False, but I'd love to take advantage of Flyte's caching here to avoid rerunning on the same files repeatedly.
from flytekit import ImageSpec, task

IMAGE_SPEC = ImageSpec(
    env={[redacted]},
    pip_index=[redacted],
    packages=[[redacted]],
    base_image="ghcr.io/flyteorg/flytekit:py3.11-1.8.3",
    registry=[redacted],
)

@task(container_image=IMAGE_SPEC, cache=True, cache_version="0.1")
def download_file(document_id: int) -> Document:
    ...
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
    return wrapped(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/download.py", line 86, in process_documents
    doc = download_file(document_id=document)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flytekit/core/base_task.py", line 304, in __call__
    return flyte_entity_call_handler(self, *args, **kwargs) # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flytekit/core/promise.py", line 1022, in flyte_entity_call_handler
    result = cast(LocallyExecutable, entity).local_execute(child_ctx, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flytekit/core/base_task.py", line 268, in local_execute
    outputs_literal_map = LocalTaskCache.get(self.name, self.metadata.cache_version, input_literal_map)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flytekit/core/local_cache.py", line 66, in get
    LocalTaskCache.initialize()
  File "/usr/local/lib/python3.11/site-packages/flytekit/core/local_cache.py", line 54, in initialize
    LocalTaskCache._cache = Cache(CACHE_LOCATION)
                            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/diskcache/core.py", line 450, in __init__
    raise EnvironmentError(
Message:
    [Errno 13] Cache directory "/home/flytekit/.flyte/local-cache" does not exist and could not be created
User error.
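The [Errno 13] above is EACCES (permission denied), raised while diskcache tries to create its cache directory. A rough approximation of that check, for illustration only: ensure_cache_dir is a hypothetical helper modeled loosely on diskcache.Cache.__init__, not code taken from diskcache or flytekit.

```python
import errno
import os


def ensure_cache_dir(directory: str) -> str:
    """Create `directory` if it is missing, reporting failure the way diskcache does.

    Hypothetical helper: a rough approximation of the check in
    diskcache.Cache.__init__, not a copy of the library's code.
    """
    directory = os.path.expanduser(directory)
    if not os.path.isdir(directory):
        try:
            os.makedirs(directory, 0o755)
        except OSError as error:
            # EEXIST means another process created it first, which is fine.
            # Anything else (e.g. errno 13, EACCES, permission denied) is fatal.
            if error.errno != errno.EEXIST:
                raise OSError(
                    error.errno,
                    f'Cache directory "{directory}" does not exist'
                    " and could not be created",
                ) from None
    return directory
```

Calling ensure_cache_dir("/home/flytekit/.flyte/local-cache") as the container's user would be expected to fail the same way if that user cannot write under /home/flytekit; that is an assumption about the image, not something this thread confirms.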
Eduardo Apolinario (eapolinario)
08/07/2023, 6:59 PM
Is download_file being invoked inside another task?
Joe Kelly
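08/07/2023, 7:10 PM
@task(container_image=IMAGE_SPEC)
def process_documents(documents: typing.List[int]) -> typing.List[Document]:
    docs = []
    for document in documents:
        # download_file is called from inside this task's body, so it runs
        # locally in this container (see local_execute in the traceback above)
        doc = download_file(document_id=document)
        docs.append(doc)
    return docs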
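Because download_file is called inside the body of process_documents, the traceback shows each call going through local_execute and LocalTaskCache.get(self.name, self.metadata.cache_version, input_literal_map), which is why a local cache directory is needed at all. That lookup depends on the task name, the cache version, and the inputs. A simplified model of such a key, assuming a JSON-plus-SHA-256 scheme (cache_key is hypothetical; flytekit computes its own key from the input literal map):

```python
import hashlib
import json


def cache_key(task_name: str, cache_version: str, inputs: dict) -> str:
    """Hypothetical helper: a deterministic key built from the same three things
    the traceback passes to LocalTaskCache.get (name, version, inputs).
    This only models the idea; it is not flytekit's hashing scheme."""
    payload = json.dumps(
        {"task": task_name, "version": cache_version, "inputs": inputs},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Each loop iteration passes a different document_id, so each gets its own key;
# repeating an id reproduces the same key.
keys = [cache_key("download_file", "0.1", {"document_id": d}) for d in [7, 8, 7]]
print(keys[0] == keys[2], keys[0] == keys[1])  # True False

# Bumping cache_version changes the key for the same inputs.
print(cache_key("download_file", "0.2", {"document_id": 7}) == keys[0])  # False
```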
Joe Kelly
08/07/2023, 7:43 PM
process_documents() and performing the loop directly in a dynamic download_files(), but then my caching doesn't work the way I'd like. In my restructured case, the cache only avoids rerunning the download task if the full list of files is exactly the same. The behavior I want is for Flyte to recognize which of the listed files are already cached, skip those, and run only on the "new" files in the list.
Joe Kelly
08/07/2023, 8:01 PM
download_file() to process a single file (so I get the desired caching benefits), but I've removed the intermediate process_documents() task call and I'm now calling download_file directly with a MapTask from my workflow:
files = map_task(download_file)(document_id=documents)
This seems to be running and caching properly! Thanks for pointing me in the right direction @Eduardo Apolinario (eapolinario)!
Eduardo Apolinario (eapolinario)
08/07/2023, 8:10 PM
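The difference between the two restructurings discussed in this thread (a dynamic download_files() over the whole list versus map_task(download_file)) can be modeled in plain Python. This is a toy sketch, assuming a cache keyed on (task name, cache_version, inputs) as the LocalTaskCache.get call in the traceback suggests; fetch, download_files, and mapped_download are hypothetical stand-ins, not flytekit APIs, and this is not how Flyte's backend actually stores cached outputs.

```python
from typing import Dict, List, Tuple

# Toy cache: key -> stored output. Keys are (task name, cache_version, inputs),
# a simplified stand-in for Flyte's real cache key.
cache: Dict[Tuple, object] = {}
executed: List[int] = []  # every document id whose download body actually ran


def fetch(document_id: int) -> str:
    """Hypothetical stand-in for the real download logic."""
    executed.append(document_id)
    return f"doc-{document_id}"


def download_files(documents: List[int]) -> List[str]:
    """List-level caching: one cache entry for the entire input list."""
    key = ("download_files", "0.1", tuple(documents))
    if key not in cache:
        cache[key] = [fetch(d) for d in documents]
    return cache[key]


def download_file(document_id: int) -> str:
    """Element-level caching: one cache entry per document id."""
    key = ("download_file", "0.1", document_id)
    if key not in cache:
        cache[key] = fetch(document_id)
    return cache[key]


def mapped_download(documents: List[int]) -> List[str]:
    """Stand-in for map_task(download_file): each element is looked up on its own."""
    return [download_file(d) for d in documents]


download_files([1, 2, 3])
download_files([1, 2, 3, 4])  # the list changed, so the key misses and all four rerun
print(executed)  # [1, 2, 3, 1, 2, 3, 4]

executed.clear()
mapped_download([1, 2, 3])
mapped_download([1, 2, 3, 4])  # only id 4 has no cache entry yet
print(executed)  # [1, 2, 3, 4]
```

Keying the cache per element is what lets a partially overlapping list reuse earlier results, which matches the behavior reported once download_file was mapped directly from the workflow.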