bland-printer-19092
02/26/2025, 12:05 PMgentle-tomato-480
02/26/2025, 12:57 PMgentle-tomato-480
02/26/2025, 1:01 PM`dataset` and `success_file` with types `StructuredDataset` and `FlyteFile` respectively, and then use that as the output of your task
bland-printer-19092
02/26/2025, 1:08 PMgentle-tomato-480
02/26/2025, 1:09 PMbland-printer-19092
02/26/2025, 1:10 PMbland-printer-19092
# 02/27/2025, 4:57 PM
class StructuredDatasetWithSuccess(StructuredDataset):
    """StructuredDataset subtype whose outputs get a ``_SUCCESS`` marker.

    Just for writing the _SUCCESS file after the StructuredDataset is
    serialized, which is needed for external tasks reading the dataset.
    The class itself only forwards its constructor arguments to
    ``StructuredDataset`` and lazily builds a literal from ``uri`` and
    ``file_format``; the marker is written by the transformer registered
    for this type.
    """

    def __init__(
        self,
        dataframe=None,
        uri=None,
        file_format=None,
        columns=None,
        metadata=None,
        schema=None,
    ):
        """Forward every argument, unchanged, to ``StructuredDataset.__init__``.

        Args:
            dataframe: In-memory dataframe to be serialized, if any.
            uri: Location of the dataset.
            file_format: Storage format name; ``None`` when unspecified.
            columns: Passed through to the base class.
            metadata: Passed through to the base class.
            schema: Passed through to the base class.
        """
        super().__init__(
            dataframe=dataframe,
            uri=uri,
            file_format=file_format,
            columns=columns,
            metadata=metadata,
            schema=schema,
        )
        # Cache for the lazily built literal returned by ``literal``.
        self._literal_sd: Optional[literals.StructuredDataset] = None

    @property
    def literal(self) -> literals.StructuredDataset:
        """Return the literal for this dataset, building it on first access.

        The literal is built from ``self.uri`` and ``self.file_format`` and
        cached in ``self._literal_sd``; later accesses return the cached
        object.
        """
        if self._literal_sd is None:
            self._literal_sd = literals.StructuredDataset(
                uri=self.uri,
                metadata=literals.StructuredDatasetMetadata(
                    structured_dataset_type=literals.StructuredDatasetType(
                        # ``file_format`` defaults to None in __init__; use
                        # the same "parquet" fallback that the transformer's
                        # to_literal applies so both paths agree.
                        format=self.file_format or "parquet"
                    )
                ),
            )
        return self._literal_sd
class StructuredDatasetTransformer(TypeTransformer[StructuredDatasetWithSuccess]):
    """Type transformer for ``StructuredDatasetWithSuccess``.

    Converting a value to a literal serializes the dataset, writes the
    ``_SUCCESS`` marker via ``write_success_file`` and returns a structured
    dataset literal pointing at the value's ``uri``.
    """

    def __init__(self):
        super().__init__(
            name="structured_dataset_with_success", t=StructuredDatasetWithSuccess
        )

    def get_literal_type(self, t: Type[StructuredDatasetWithSuccess]) -> LiteralType:
        """Return the Flyte literal type for StructuredDatasetWithSuccess."""
        return LiteralType(structured_dataset_type=literals.StructuredDatasetType())

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: StructuredDatasetWithSuccess,
        python_type: Type[StructuredDatasetWithSuccess],
        expected: LiteralType,
    ) -> Literal:
        """Convert StructuredDatasetWithSuccess to a Flyte literal.

        Args:
            ctx: Current Flyte context (unused here).
            python_val: Dataset to serialize; must end up with a ``uri``.
            python_type: Declared Python type (unused here).
            expected: Expected literal type (unused here).

        Returns:
            A literal wrapping a structured dataset at ``python_val.uri``
            whose format is ``python_val.file_format`` or ``"parquet"``.

        Raises:
            ValueError: If ``python_val.uri`` is empty after serialization,
                since neither the marker nor the literal could point anywhere.
        """
        # The marker must be written only after the data itself is serialized.
        python_val.serialize_structured_dataset()
        if not python_val.uri:
            raise ValueError(
                "StructuredDatasetWithSuccess needs a uri to write the _SUCCESS file"
            )
        write_success_file(python_val.uri)

        dataset_type = literals.StructuredDatasetType(
            format=python_val.file_format or "parquet"
        )
        sd = literals.StructuredDataset(
            uri=python_val.uri,
            metadata=literals.StructuredDatasetMetadata(
                structured_dataset_type=dataset_type
            ),
        )
        return Literal(scalar=literals.Scalar(structured_dataset=sd))

    def to_python_value(
        self,
        ctx: FlyteContext,
        lv: Literal,
        expected_python_type: Type[StructuredDatasetWithSuccess],
    ) -> StructuredDatasetWithSuccess:
        """Convert a Flyte literal back to StructuredDatasetWithSuccess.

        Args:
            ctx: Current Flyte context (unused here).
            lv: Literal expected to carry a structured dataset scalar.
            expected_python_type: Declared Python type (unused here).

        Returns:
            A ``StructuredDatasetWithSuccess`` carrying the literal's uri,
            metadata and, when available, its format.

        Raises:
            ValueError: If ``lv`` does not hold a structured dataset.
        """
        scalar = lv.scalar
        if not scalar or not scalar.structured_dataset:
            raise ValueError("Expected StructuredDataset literal")

        sd = scalar.structured_dataset
        # Metadata, or its dataset type, may be absent; fall back to no
        # format instead of failing with AttributeError.
        sd_type = sd.metadata.structured_dataset_type if sd.metadata else None
        return StructuredDatasetWithSuccess(
            uri=sd.uri,
            metadata=sd.metadata,
            file_format=sd_type.format if sd_type else None,
        )
# Register one transformer instance with the TypeEngine at import time.
# NOTE(review): presumably this is what routes StructuredDatasetWithSuccess
# task outputs through the transformer's to_literal — confirm.
TypeEngine.register(StructuredDatasetTransformer())
usage:
@task
def mytask(...)-> -Annotated[StructuredDatasetWithSuccess, PreprocessedMLSchema]:
...
return StructuredDatasetWithSuccess(
dataframe=spark_df,
schema=PreprocessedMLSchema,
uri=output_path,
)