cuddly-jelly-27016
01/25/2025, 1:21 AM
`print_model` in the example below. Importantly, it has an input that is an in-memory `StructuredDataset` inside a `@dataclass`.
from dataclasses import dataclass

import pandas as pd
from flytekit import StructuredDataset, task

DEFAULT_DATASET = StructuredDataset(pd.DataFrame({"column1": [1, 2, 3], "column2": ["a", "b", "c"]}))


@dataclass
class Model:
    dataset: StructuredDataset


DEFAULT_MODEL = Model(dataset=DEFAULT_DATASET)


@task
def print_model(model: Model = DEFAULT_MODEL):
    df = model.dataset.open(pd.DataFrame).all()
    print(df)
    print(model)
`PandasToParquetEncodingHandler` calls `df.to_parquet` internally with a `flyte://` path. `df.to_parquet` can resolve the corresponding fsspec filesystem implementation via `fsspec.core.url_to_fs`, because that implementation is registered when `FlyteRemote` is initialised (see https://github.com/flyteorg/flytekit/blob/17841c8e2317e21039b47a58406c07f823572859/flytekit/remote/remote.py#L297).
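The protocol lookup described above can be illustrated with fsspec's registry on its own. This is a minimal sketch, not flytekit's code: `DemoFileSystem` and the `demo://` protocol are hypothetical stand-ins for the implementation `FlyteRemote` registers for `flyte`. It shows that `url_to_fs` only resolves a protocol registered in the current process, and otherwise raises the same `ValueError: Protocol not known` that appears in the traceback.

```python
import fsspec
from fsspec.core import url_to_fs
from fsspec.spec import AbstractFileSystem


class DemoFileSystem(AbstractFileSystem):
    # Hypothetical stand-in for the filesystem FlyteRemote registers for "flyte".
    protocol = "demo"


# Registration is per process: only this interpreter now knows "demo://".
fsspec.register_implementation("demo", DemoFileSystem, clobber=True)

fs, path = url_to_fs("demo://bucket/data.parquet")
print(type(fs).__name__, path)  # DemoFileSystem bucket/data.parquet

# A process that never registered the protocol (like the task pod with
# "flyte://") fails inside fsspec's registry lookup.
message = ""
try:
    url_to_fs("unregistered://bucket/data.parquet")
except ValueError as err:
    message = str(err)
print(message)  # Protocol not known: unregistered
```

This is why the upload side (where `FlyteRemote` is initialised) succeeds while the task side, which only sees the stored `flyte://` URI, cannot open it.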
This results in a structured dataset literal with a `flyte://` URI. For a plain `StructuredDataset`, `modify_literal_uris` would swap this `flyte://` URI for an equivalent in the blob store's native format (we use Azure, so an `abfs://` URI). Unfortunately, `modify_literal_uris` doesn't apply this swap to `StructuredDataset`s inside a `@dataclass` or `pydantic.BaseModel`. The result is a `flyte://` URI in the task input, and therefore a failure to open the `StructuredDataset`:
Traceback (most recent call last):
  File "/redacted/pip-svc_flytekit/site-packages/flytekit/core/base_task.py", line 754, in dispatch_execute
    native_outputs = self.execute(**native_inputs)
  File "/redacted/pip-svc_flytekit/site-packages/flytekit/core/python_function_task.py", line 204, in execute
    return self._task_function(**kwargs)
  File "/redacted/test_structured_dataset.py", line 25, in print_model
    df = model.dataset.open(pd.DataFrame).all()
  File "/redacted/pip-svc_flytekit/site-packages/flytekit/types/structured/structured_dataset.py", line 197, in all
    return flyte_dataset_transformer.open_as(ctx, self.literal, self._dataframe_type, self.metadata)
  File "/redacted/pip-svc_flytekit/site-packages/flytekit/types/structured/structured_dataset.py", line 1055, in open_as
    result = decoder.decode(ctx, sd, updated_metadata)
  File "/redacted/pip-svc_flytekit/site-packages/flytekit/types/structured/basic_dfs.py", line 137, in decode
    return pd.read_parquet(uri, columns=columns, storage_options=kwargs)
  File "/redacted/pip-core_pandas/site-packages/pandas/io/parquet.py", line 670, in read_parquet
    return impl.read(
  File "/redacted/pip-core_pandas/site-packages/pandas/io/parquet.py", line 265, in read
    path_or_handle, handles, filesystem = _get_path_or_handle(
  File "/redacted/pip-core_pandas/site-packages/pandas/io/parquet.py", line 121, in _get_path_or_handle
    fs, path_or_handle = fsspec.core.url_to_fs(
  File "/redacted/pip-svc_fsspec/site-packages/fsspec/core.py", line 383, in url_to_fs
    chain = _un_chain(url, kwargs)
  File "/redacted/pip-svc_fsspec/site-packages/fsspec/core.py", line 332, in _un_chain
    cls = get_filesystem_class(protocol)
  File "/redacted/pip-svc_fsspec/site-packages/fsspec/registry.py", line 233, in get_filesystem_class
    raise ValueError(f"Protocol not known: {protocol}")
ValueError: Protocol not known: flyte
Expected behavior
Irrespective of whether it's inside a `@dataclass` or `pydantic.BaseModel`, the resulting `StructuredDataset` literal should not have a `flyte://` URI. It should be a standard blob storage URI that one of the standard fsspec implementations can serve, so that opening the dataset works without error.
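The expected behavior amounts to applying the URI swap recursively rather than only at the top level. The sketch below is a hypothetical illustration, not flytekit's `modify_literal_uris`: it assumes the dataclass input can be viewed as nested dicts/lists where each dataset carries a `uri` key, and `rewrite_nested_uris`, `example_resolver`, and the example URIs are all made-up names and values.

```python
from typing import Any, Callable

FLYTE_PREFIX = "flyte://"


def rewrite_nested_uris(node: Any, resolve: Callable[[str], str]) -> Any:
    """Return a copy of `node` in which every "uri" value starting with
    flyte:// has been passed through `resolve`, at any nesting depth."""
    if isinstance(node, dict):
        out = {}
        for key, value in node.items():
            if key == "uri" and isinstance(value, str) and value.startswith(FLYTE_PREFIX):
                out[key] = resolve(value)
            else:
                # Recurse so datasets nested inside dataclass fields are handled too.
                out[key] = rewrite_nested_uris(value, resolve)
        return out
    if isinstance(node, list):
        return [rewrite_nested_uris(item, resolve) for item in node]
    return node


def example_resolver(uri: str) -> str:
    # Hypothetical mapping; a real resolver would ask the Flyte backend
    # for the native (e.g. abfs://) location of the uploaded file.
    return "abfs://example-container/" + uri[len(FLYTE_PREFIX):]


# Made-up, dict-shaped view of a Model(dataset=...) input.
model_literal = {
    "dataset": {"uri": "flyte://v1/project/domain/abc123/data.parquet", "file_format": "parquet"},
}
fixed = rewrite_nested_uris(model_literal, example_resolver)
print(fixed["dataset"]["uri"])  # abfs://example-container/v1/project/domain/abc123/data.parquet
```

Returning a new structure instead of mutating in place keeps the original literal unchanged, which makes the rewrite easy to compare against the input.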
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
• Yes
Have you read the Code of Conduct?
• Yes
flyteorg/flyte