g
Trying out the streaming API for FlyteFile and FlyteDirectory. Got a weird exception however:
Original exception: Invalid variable type: value should be str, int or float, got FlyteContext(file_access=<flytekit.core.data_persistence.FileAccessProvider object at 0x795cc4248f70>, level=4, flyte_client=None, compilation_state=None, execution_state=ExecutionState(mode=<Mode.TASK_EXECUTION: 1>, working_dir=PosixPath('/tmp/flytecphsguxb/local_flytekit'), engine_dir='/tmp/flytecphsguxb/local_flytekit/engine_dir', branch_eval_mode=None, user_space_params=ExecutionParameters(execution_date=2025-03-11 18:21:01.821699+00:00, stats=<flytekit.interfaces.stats.taggable.TaggableStats object at 0x795cc42488e0>, tmp_dir=/tmp/flyte-y7i7t51b/sandbox/local_flytekit/8fa767fd4226dc3f5ca402270b4cbe66, execution_id=Flyte Serialized object (WorkflowExecutionIdentifier):
      project: flytesnacks
      domain: development
      name: ankpjf7p5bjfcfd5ts76,  checkpoint=<flytekit.core.checkpointer.SyncCheckpoint object at 0x795cc4248070>,, decks=[], raw_output_prefix=<gs://xxxx-flyte/data/k0/ankpjf7p5bjfcfd5ts76-n2-0-n2-n0-0-n0-0>, task_id=TASK:flytesnacks:development:raw_workflow.xxx.main.xxx_task:fDRVXJU6bMjLL3los9Bc2w, output_metadata_prefix=<gs://xxx-flyte/metadata/propeller/flytesnacks-development-ankpjf7p5bjfcfd5ts76/n2/data/0/n2-n0/0/n0/0>, enable_deck=False)), serialization_settings=SerializationSettings(image_config=ImageConfig(default_image=None, images=[]), project='flytesnacks', domain='development', version='fDRVXJU6bMjLL3los9Bc2w', env=None, git_repo=None, python_interpreter='/opt/venv/bin/python3', flytekit_virtualenv_root='/opt/venv', fast_serialization_settings=FastSerializationSettings(enabled=True, destination_dir='.', distribution_location='<gs://xxx-flyte/flytesnacks/development/C2J4ETIZP3J2Y67M7CMS3MJEUQ======/fastb1cd8a8ec0c0cffdd1174f7388e3ff64.tar.gz>'), source_root=None), in_a_condition=False, origin_stackframe=<FrameSummary file /usr/local/lib/python3.10/site-packages/flytekit/core/base_task.py, line 732 in dispatch_execute>, output_metadata_tracker=OutputMetadataTracker(output_metadata={}), worker_queue=None) of type <class 'flytekit.core.context_manager.FlyteContext'>
The accompanying stack trace:
File "/xxx_workflow/xxx/main.py", line 23, in convert
        _, ext= os.path.splitext(raw_file)
      File "/usr/local/lib/python3.10/posixpath.py", line 118, in splitext
        p = os.fspath(p)
      File "/usr/local/lib/python3.10/site-packages/flytekit/types/file/file.py", line 356, in __fspath__
        self._downloader()
      File "/usr/local/lib/python3.10/site-packages/flytekit/utils/asyn.py", line 113, in wrapped
        return self.run_sync(coro_func, *args, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/flytekit/utils/asyn.py", line 106, in run_sync
        return self._runner_map[name].run(coro)
      File "/usr/local/lib/python3.10/site-packages/flytekit/utils/asyn.py", line 85, in run
        res = fut.result(None)
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
        return self.__get_result()
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
        raise self._exception
      File "/usr/local/lib/python3.10/site-packages/flytekit/core/data_persistence.py", line 628, in async_get_data
        raise FlyteDownloadDataException(
    flytekit.exceptions.system.FlyteDownloadDataException: SYSTEM:DownloadDataError: error=Failed to get data from <gs://xxx-flyte/data/lx/ankpjf7p5bjfcfd5ts76-n0-0-n0-0/fd421400cf5b1111051dffd6b51b5349/xxx> to /tmp/flytecphsguxb/local_flytekit/2f5c90639bde398a2a90c8c7a144ded2/xxx (recursive=False).
Where `raw_file` is a `FlyteFile` whose `path` argument is constructed from an element of a `FlyteDirectory.crawl()` generator. A minimal example:
@task
def process_data(data_dir: FlyteDirectory):
   for (base, filepath) in data_dir.crawl():
       raw_file = FlyteFile(os.path.join(base, filepath))
       _, ext = os.path.splitext(raw_file)
Flytekit version 15.2 Backend version 15.0
g
Mind sharing a code snippet?
g
This is the minimal snippet
import os
from flytekit import task
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

@task
def process_data(data_dir: FlyteDirectory):
   for (base, filepath) in data_dir.crawl():
       raw_file = FlyteFile(os.path.join(base, filepath))
       _, ext = os.path.splitext(raw_file)
I've got more code underneath this, but this is where it crashes.
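The stack trace shows `os.path.splitext` calling `os.fspath`, which in turn invokes `FlyteFile.__fspath__` and its downloader. The sketch below illustrates that mechanism in plain Python. `LazyRemoteFile` and the `gs://example-bucket/...` path are hypothetical stand-ins, not flytekit code, and the "download" is only a counter. It is meant to show why taking the extension from the path string sidesteps the download hook, not to explain the regression itself.

```python
import os


class LazyRemoteFile(os.PathLike):
    """Hypothetical stand-in for a lazily downloaded file object (not flytekit code)."""

    def __init__(self, remote_path: str):
        self.remote_path = remote_path
        self.download_calls = 0

    def __fspath__(self) -> str:
        # A real implementation would fetch the remote object here;
        # this stand-in only counts how often that would happen.
        self.download_calls += 1
        return os.path.join("/tmp/local_copy", os.path.basename(self.remote_path))


remote = "gs://example-bucket/data/sample.csv"  # made-up path for illustration
lazy_file = LazyRemoteFile(remote)

# Passing the object itself goes through os.fspath(), i.e. the download hook.
_, ext_via_object = os.path.splitext(lazy_file)

# Passing the plain string never touches the object.
_, ext_via_string = os.path.splitext(remote)
```

Assuming the same holds for the task above, calling `os.path.splitext(filepath)` on the string yielded by `crawl()` would give the extension without triggering a download.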
Maybe an important piece of context is that I'm running this in a container I built myself (and installed flytekit in) vs. one that's built using `ImageSpec`.
Going to try it again in another task which does run inside a container that was built using `ImageSpec`.
So flytekit `1.13.7` in an image_spec built container using the above works, but if I rebuild it with `1.15.0` I get the above error (using the above code snippet).
So some change between `1.13.7` and `1.15.0` seems to have introduced this issue.
Yup, can confirm that downgrading to `flytekit==1.13.7`, at least in the non-ImageSpec-built image, also allows me to continue.
g
Thanks for this information. There might be a breaking change in flytekit. We will dig into this.
g
Unrelated to the error, I notice that streaming files one by one is slower than just downloading all files in a FlyteDirectory at once.
g
It’s useful when you do some async operations.
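To make the async point concrete, here is a rough simulation, not flytekit code. `fetch_one` is a hypothetical helper in which `asyncio.sleep` stands in for the latency of streaming one file. It only sketches why per-file access can pay off once the operations overlap instead of running one after another.

```python
import asyncio
import time


async def fetch_one(name: str, delay: float = 0.05) -> str:
    # asyncio.sleep simulates the network latency of streaming one file.
    await asyncio.sleep(delay)
    return name.upper()


async def fetch_sequential(names):
    # Each file waits for the previous one to finish.
    return [await fetch_one(n) for n in names]


async def fetch_concurrent(names):
    # All files are in flight at the same time.
    return list(await asyncio.gather(*(fetch_one(n) for n in names)))


names = ["a.csv", "b.csv", "c.csv", "d.csv"]  # made-up file names

start = time.perf_counter()
sequential = asyncio.run(fetch_sequential(names))
sequential_s = time.perf_counter() - start

start = time.perf_counter()
concurrent = asyncio.run(fetch_concurrent(names))
concurrent_s = time.perf_counter() - start
```

Under these assumptions the sequential loop takes roughly the sum of the per-file delays, while the concurrent version takes about as long as the slowest single file.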
g
@glamorous-carpet-83516 Do you need me to file an issue on GitHub too? Not sure how to describe/title it though.
g
Yes, please file an issue on GitHub too. [flyte-bug]
g
Feel free to suggest alternative titles for the issue name