# ask-the-community
d
I was just messing around with `StructuredDataset` + `polars` and I ran into a strange error (minimal example and the rest of the error are in the thread):
```
Message:

    Could not open Parquet input source '<Buffer>': Parquet file size is 0 bytes

User error.
```
Script:
```python
from typing import Annotated

import polars as pl
from flytekit import task, workflow, kwtypes, ImageSpec, StructuredDataset


# Image with polars and the flytekit polars plugin, pushed to the demo cluster's local registry
polars_image_spec = ImageSpec(
    base_image="ghcr.io/flyteorg/flytekit:py3.10-1.9.1",
    packages=["polars", "flytekitplugins-polars"],
    registry="localhost:30000",
    python_version="3.10",
)


# Column schema shared by the producing and the consuming task
superset_cols = kwtypes(Name=str, Age=int, Height=int)


@task(container_image=polars_image_spec)
def gettask_as_sd(a: int) -> Annotated[StructuredDataset, superset_cols]:
    # Wrap a polars DataFrame; the polars plugin's encoder serializes it to Parquet
    return StructuredDataset(pl.DataFrame({"Name": ["Tom", "Joseph"], "Age": [a, 22], "Height": [160, 178]}))


@task(container_image=polars_image_spec)
def take_returntask_as_sd(input_df: Annotated[StructuredDataset, superset_cols]) -> str:
    # Decode back into a polars DataFrame; this is the call that raises the error below
    input_df_pl = input_df.open(pl.DataFrame).all()
    print(type(input_df_pl), input_df_pl)
    return str(input_df_pl)


@workflow
def tryme() -> str:
    a = 11
    return take_returntask_as_sd(input_df=gettask_as_sd(a=a))
```
Error:
```
Traceback (most recent call last):

      File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/tmptry.py", line 38, in take_returntask_as_sd
        input_df_pl = input_df.open(pl.DataFrame).all()
      File "/usr/local/lib/python3.10/site-packages/flytekit/types/structured/structured_dataset.py", line 105, in all
        return flyte_dataset_transformer.open_as(ctx, self.literal, self._dataframe_type, self.metadata)
      File "/usr/local/lib/python3.10/site-packages/flytekit/types/structured/structured_dataset.py", line 791, in open_as
        result = decoder.decode(ctx, sd, updated_metadata)
      File "/usr/local/lib/python3.10/site-packages/flytekitplugins/polars/sd_transformers.py", line 70, in decode
        return pl.read_parquet(uri, columns=columns, use_pyarrow=True, storage_options=kwargs)
      File "/usr/local/lib/python3.10/site-packages/polars/io/parquet/functions.py", line 123, in read_parquet
        pa.parquet.read_table(
      File "/usr/local/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 2824, in read_table
        dataset = _ParquetDatasetV2(
      File "/usr/local/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 2412, in __init__
        [fragment], schema=schema or fragment.physical_schema,
      File "pyarrow/_dataset.pyx", line 905, in pyarrow._dataset.Fragment.physical_schema.__get__
      File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
      File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status

Message:

    Could not open Parquet input source '<Buffer>': Parquet file size is 0 bytes

User error.
```
Not sure if I'm doing something wrong, or if this is a bug?
Pretty sure it's a bug; not sure if this is the best way to solve it, though: https://github.com/flyteorg/flytekit/pull/1899/files
k
This seems like a problem, as it will only read one Parquet file, right?
Thankfully, it's streaming.
Cc @Yee, do you know?
y
Not sure yet; I'll try to play with this later.
Tried to run this… my image spec failed for some reason; I'll continue looking into this tomorrow.
d
If you're using the demo cluster, you have to set up envd to push to it:
```
envd context create --name flyte-sandbox --builder tcp --builder-address localhost:30003 --use
```
s
@Kevin Su, is it still necessary to run this command to push an image to local registry?