# flyte-support
b
Hi, for compatibility reasons I need to write an empty _SUCCESS file to the same directory a StructuredDataset is written to, directly after the dataframe is done writing. How can I do that (and avoid having a separate task for it)?
g
The file's name is _SUCCESS, right? And otherwise it's just an ordinary empty text file?
You could create a dataclass like here https://docs.flyte.org/en/latest/user_guide/data_types_and_io/dataclass.html#flyte-types that has two fields, `dataset` and `success_file`, with types `StructuredDataset` and `FlyteFile` respectively, and then use that as the output of your task.
b
Yeah, just empty text file. But when serialising a dataclass like that, is order enforced? If so where is the order specified?
g
No clue, sorry. I haven't had to use dataclasses yet. It just seems like a fitting solution to your question
b
It seems like a custom encoder like here should work: https://flyte-next.readthedocs.io/en/latest/flytesnacks/examples/data_types_and_io/structured_dataset.html#numpy-encoder. I just haven't gotten my task to actually use it; it still picks the standard one.
Found a solution, posting here if others need it:
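# Imports inferred from the names used below; write_success_file is a helper not shown here.
from typing import Optional, Type

from flytekit.core.context_manager import FlyteContext
from flytekit.core.type_engine import TypeEngine, TypeTransformer
from flytekit.models import literals
from flytekit.models.literals import Literal
from flytekit.models.types import LiteralType
from flytekit.types.structured import StructuredDataset


class StructuredDatasetWithSuccess(StructuredDataset):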
    """
    Just for writing the _SUCCESS file after StructuredDataset is serialized, which is needed for external tasks reading the dataset
    """

    def __init__(
        self,
        dataframe=None,
        uri=None,
        file_format=None,
        columns=None,
        metadata=None,
        schema=None,
    ):
        super().__init__(
            dataframe=dataframe,
            uri=uri,
            file_format=file_format,
            columns=columns,
            metadata=metadata,
            schema=schema,
        )
        self._literal_sd: Optional[literals.StructuredDataset] = None

    @property
    def literal(self) -> literals.StructuredDataset:
        if self._literal_sd is None:
            self._literal_sd = literals.StructuredDataset(
                uri=self.uri,
                metadata=literals.StructuredDatasetMetadata(
                    structured_dataset_type=literals.StructuredDatasetType(
                        format=self.file_format
                    )
                ),
            )
        return self._literal_sd


class StructuredDatasetTransformer(TypeTransformer[StructuredDatasetWithSuccess]):
    def __init__(self):
        super(StructuredDatasetTransformer, self).__init__(
            name="structured_dataset_with_success", t=StructuredDatasetWithSuccess
        )

    def get_literal_type(self, t: Type[StructuredDatasetWithSuccess]) -> LiteralType:
        """Return the Flyte literal type for StructuredDatasetWithSuccess"""
        return LiteralType(structured_dataset_type=literals.StructuredDatasetType())

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: StructuredDatasetWithSuccess,
        python_type: Type[StructuredDatasetWithSuccess],
        expected: LiteralType,
    ) -> Literal:
        """Convert StructuredDatasetWithSuccess to Flyte literal"""
        python_val.serialize_structured_dataset()
        write_success_file(python_val.uri)
        sd = literals.StructuredDataset(
            uri=python_val.uri,
            metadata=literals.StructuredDatasetMetadata(
                structured_dataset_type=literals.StructuredDatasetType(
                    format=python_val.file_format or "parquet"
                )
            ),
        )
        return Literal(scalar=literals.Scalar(structured_dataset=sd))

    def to_python_value(
        self,
        ctx: FlyteContext,
        lv: Literal,
        expected_python_type: Type[StructuredDatasetWithSuccess],
    ) -> StructuredDatasetWithSuccess:
        """Convert Flyte literal back to StructuredDatasetWithSuccess"""
        if not lv.scalar or not lv.scalar.structured_dataset:
            raise ValueError("Expected StructuredDataset literal")

        sd = lv.scalar.structured_dataset
        return StructuredDatasetWithSuccess(
            uri=sd.uri,
            metadata=sd.metadata,
            file_format=(
                sd.metadata.structured_dataset_type.format if sd.metadata else None
            ),
        )


TypeEngine.register(StructuredDatasetTransformer())
Usage:
@task
def mytask(...) -> Annotated[StructuredDatasetWithSuccess, PreprocessedMLSchema]:
    ...
    return StructuredDatasetWithSuccess(
        dataframe=spark_df,
        schema=PreprocessedMLSchema,
        uri=output_path,
    )
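
The transformer above calls write_success_file, which the snippet does not define. Below is a minimal sketch of what such a helper might look like, not the original poster's implementation. It assumes the dataset URI points at a directory and that an empty file named _SUCCESS inside it is all the downstream reader checks for. The `upload` parameter and the `_local_copy` default are hypothetical names introduced for illustration; inside a Flyte task, something like `FlyteContextManager.current_context().file_access.put_data` could probably be passed as `upload` to reach remote storage, but that is an assumption to verify against your flytekit version.

```python
import os
import shutil
import tempfile
from typing import Callable, Optional


def _local_copy(local_path: str, target: str) -> None:
    """Hypothetical default uploader: copy the file to a local filesystem path."""
    os.makedirs(os.path.dirname(target), exist_ok=True)
    shutil.copyfile(local_path, target)


def write_success_file(
    dataset_uri: str,
    upload: Optional[Callable[[str, str], None]] = None,
) -> str:
    """Write an empty _SUCCESS marker into the dataset directory and return its URI."""
    # Normalise a trailing slash so the marker always lands directly in the directory.
    marker_uri = dataset_uri.rstrip("/") + "/_SUCCESS"
    with tempfile.TemporaryDirectory() as tmp:
        local_marker = os.path.join(tmp, "_SUCCESS")
        # Create a zero-byte file locally, then hand it to the uploader.
        open(local_marker, "w").close()
        (upload or _local_copy)(local_marker, marker_uri)
    return marker_uri
```

Because the helper only runs after the dataset has been serialized in to_literal, the marker is written last, which is the ordering the external reader relies on.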