We define a dataclass with a StructuredDataset attribute. The following shows a simple definition:
@dataclass
class DC:
    # StructuredDataset with local uri
    lsd: StructuredDataset = field(default_factory=lambda: StructuredDataset(uri=LOCAL_URI, file_format=FILE_FORMAT))
    # StructuredDataset with remote uri
    rsd: StructuredDataset = field(default_factory=lambda: StructuredDataset(uri=REMOTE_URI, file_format=FILE_FORMAT))
When we run the workflow remotely, we observe that the file_format field becomes an empty string, as illustrated in the following screenshot:
(Screenshot 2024-12-08 at 12:38:13 PM)
Initial Thoughts
We think that msgpack serialization doesn't process file_format properly, because file_format is already an empty string right after inputs.pb is loaded as `input_proto`:
(Screenshot 2024-12-08 at 1:10:23 PM)
If I've not misunderstood it, \240 (0xA0 in hex) is a msgpack fixstr with a length of zero, which means file_format is an empty string.
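This can be verified directly with the msgpack package; a minimal sketch (assuming only that msgpack is installed):

import msgpack

# 0xA0 is the fixstr header for a string of length 0, i.e. an empty string.
assert msgpack.packb("") == b"\xa0"
# "parquet" should instead be encoded as a fixstr of length 7 (header 0xA7).
assert msgpack.packb("parquet") == b"\xa7parquet"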
Expected behavior
file_format of StructuredDataset should keep the original input value (i.e., "parquet" in this case).
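For reference, a non-empty file_format survives a plain msgpack round trip, which is the behavior we expect here; a minimal sketch of that invariant (assuming only the msgpack package, not the actual flytekit code path):

import msgpack

# Expected invariant: a non-empty file_format round-trips unchanged.
payload = {"uri": "s3://my-s3-bucket/s3_flyte_dir/df.parquet", "file_format": "parquet"}
restored = msgpack.unpackb(msgpack.packb(payload))
assert restored["file_format"] == "parquet"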
Additional context to reproduce
Run the following script to trigger a remote run of the workflow:
from dataclasses import dataclass, field
from pathlib import Path

import pandas as pd

from flytekit import task, workflow, ImageSpec
from flytekit.types.structured import StructuredDataset

# Build image
flytekit_hash = "adc1061709b2cff74c2e66dd65399d6a59954023"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"
image = ImageSpec(
    packages=[flytekit, "pandas", "pyarrow"],
    apt_packages=["git"],
    registry="localhost:30000",
)

# Define constants
LOCAL_URI = "./df.parquet"
REMOTE_URI = "s3://my-s3-bucket/s3_flyte_dir/df.parquet"
FILE_FORMAT = "parquet"
@dataclass
class DC:
    # StructuredDataset with local uri
    lsd: StructuredDataset = field(default_factory=lambda: StructuredDataset(uri=LOCAL_URI, file_format=FILE_FORMAT))
    # StructuredDataset with remote uri
    rsd: StructuredDataset = field(default_factory=lambda: StructuredDataset(uri=REMOTE_URI, file_format=FILE_FORMAT))
@task(container_image=image)
def direct_sd(sd: StructuredDataset) -> StructuredDataset:
    """Pass through a StructuredDataset without any action."""
    print(f"SD | {sd}")
    print(f"Literal SD | {sd._literal_sd}")
    print(f"DF\n{'-'*30}\n{sd.open(pd.DataFrame).all()}")
    return sd
@workflow
def wf1(sd: StructuredDataset) -> StructuredDataset:
    """Pass through a StructuredDataset without any action."""
    return direct_sd(sd=sd)

@workflow
def wf2(dc: DC) -> StructuredDataset:
    """Pass through a StructuredDataset with attr access."""
    return direct_sd(sd=dc.rsd)
if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner

    # Configure the current run
    script_path = str(Path(__file__).absolute())
    dc = '{"dc": {"rsd": {"uri": "s3://my-s3-bucket/s3_flyte_dir/df.parquet", "file_format": "parquet"}}}'

    runner = CliRunner()
    result = runner.invoke(pyflyte.main, ["run", "--remote", script_path, "wf2", "--dc", dc])
    print(result.output)
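The CliRunner call above is equivalent to invoking pyflyte directly from a shell (assuming the script is saved as repro.py, a hypothetical filename):

pyflyte run --remote repro.py wf2 --dc '{"dc": {"rsd": {"uri": "s3://my-s3-bucket/s3_flyte_dir/df.parquet", "file_format": "parquet"}}}'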
Screenshots
No response
Are you sure this issue hasn't been raised already?
• Yes
Have you read the Code of Conduct?
• Yes