proud-glass-36655
08/07/2023, 6:43 PMdiskcache
package seems to be unable to create the relevant local cache directory -- is this expected / is there something further I need to do to enable caching?
I've registered this workflow and I'm running it on the Flyte app (not on my local machine). The workflow runs fine when setting cache=False
, but I'd love to be able to take advantage of Flyte's caching here to avoid rerunning on the same files repeatedly
IMAGE_SPEC = ImageSpec(
env={[redacted]},
pip_index=[redacted]
packages=[[redacted]],
base_image="<http://ghcr.io/flyteorg/flytekit:py3.11-1.8.3|ghcr.io/flyteorg/flytekit:py3.11-1.8.3>",
registry=[redacted],
)
@task(container_image=IMAGE_SPEC, cache=True, cache_version="0.1")
def download_file(document_id: int) -> Document:
...
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
return wrapped(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/data/download.py", line 86, in process_documents
doc = download_file(document_id=document)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/flytekit/core/base_task.py", line 304, in __call__
return flyte_entity_call_handler(self, *args, **kwargs) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/flytekit/core/promise.py", line 1022, in flyte_entity_call_handler
result = cast(LocallyExecutable, entity).local_execute(child_ctx, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/flytekit/core/base_task.py", line 268, in local_execute
outputs_literal_map = LocalTaskCache.get(self.name, self.metadata.cache_version, input_literal_map)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/flytekit/core/local_cache.py", line 66, in get
LocalTaskCache.initialize()
File "/usr/local/lib/python3.11/site-packages/flytekit/core/local_cache.py", line 54, in initialize
LocalTaskCache._cache = Cache(CACHE_LOCATION)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/diskcache/core.py", line 450, in __init__
raise EnvironmentError(
Message:
[Errno 13] Cache directory "/home/flytekit/.flyte/local-cache" does not exist and could not be created
User error.
high-accountant-32689
08/07/2023, 6:59 PMdownload_file
being invoked inside another task?proud-glass-36655
08/07/2023, 7:10 PM@task(container_image=IMAGE_SPEC)
def process_documents(documents: typing.List[int]) -> typing.List[Document]:
docs = []
for document in documents:
doc = download_file(document_id=document)
docs.append(doc)
return docs
proud-glass-36655
08/07/2023, 7:43 PMprocess_documents()
and performing the loop directly in a dynamic download_files()
-- but then my caching doesn't work as I would like it to. In my restructured case, the cache only avoids rerunning the download task if the full list of files is precisely the same -- the behavior that I want is for Flyte to recognize if some subset of the listed files are cached, to ignore those, and to only run on the "new" files in the listproud-glass-36655
08/07/2023, 8:01 PMdownload_file()
to process a single file (so I get the desired caching benefits), but I've removed the intermediate process_documents()
task call and I'm now calling download_file
directly with a MapTask
from my workflow:
files = map_task(download_file)(document_id=documents)
This seems to be running and caching properly! Thanks for pointing me in the right direction @high-accountant-32689!high-accountant-32689
08/07/2023, 8:10 PM