Does Flyte support the use of Polars LazyFrames? ...
# ask-the-community
b
Does Flyte support the use of Polars LazyFrames? I keep getting a warning regarding it, see below:
[f6198790a618a4396a3a-n0-0] terminated with exit code (247). Reason [OOMKilled]. Message: 

{"asctime": "2024-05-01 20:01:11,141", "name": "flytekit", "levelname": "WARNING", "message": "Unsupported Type <built-in function any> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."}
thoughts?
n
looks like it’s not: https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-polars/flytekitplugins/polars/sd_transformers.py#L87
it would be fairly straightforward to register a new PolarsDataFrameToParquetEncodingHandler and ParquetToPolarsDataFrameDecodingHandler for lazyframes
b
I'm pretty new to Flyte, and I'm probably even more "green" when it comes to the inner workings, so you'll have to bear with me here 🙂
What do I need to do to register a new handler? Is there a guide anywhere?
n
if you look at that sd_transformers file, you’ll see those classes defined, which operate on pl.DataFrame here, here, here, and here.
class PolarsLazyFrameToParquetEncodingHandler(StructuredDatasetEncoder): ...

class ParquetToPolarsLazyFrameDecodingHandler(StructuredDatasetDecoder): ...
You could define your own classes (or contribute them back into the polars plugin 🙂) and then register them:
StructuredDatasetTransformerEngine.register(PolarsLazyFrameToParquetEncodingHandler())
StructuredDatasetTransformerEngine.register(ParquetToPolarsLazyFrameDecodingHandler())
b
so, something like this?
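import io
import typing

import pandas as pd
import polars as pl
from fsspec import utils as fsspec_utils

# The flytekit import paths below are assumed to match the flytekit-polars plugin;
# verify them against your installed flytekit version.
from flytekit import FlyteContext
from flytekit.core.data_persistence import get_fsspec_storage_options
from flytekit.models import literals
from flytekit.models.literals import StructuredDatasetMetadata
from flytekit.models.types import StructuredDatasetType
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDataset,
    StructuredDatasetDecoder,
    StructuredDatasetEncoder,
    StructuredDatasetTransformerEngine,
)


class PolarsLazyFrameRenderer: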
    """
    Renders Polars LazyFrame's statistics as an HTML table after collecting the data.
    """

    def to_html(self, lf: pl.LazyFrame) -> str:
        assert isinstance(lf, pl.LazyFrame)
        df = lf.collect()  # Convert LazyFrame to DataFrame
        describe_df = df.describe()
        return pd.DataFrame(describe_df.transpose(), columns=describe_df.columns).to_html(index=False)


class PolarsLazyFrameToParquetEncodingHandler(StructuredDatasetEncoder):
    def __init__(self):
        super().__init__(pl.LazyFrame, None, PARQUET)

    def encode(
        self,
        ctx: FlyteContext,
        structured_dataset: StructuredDataset,
        structured_dataset_type: StructuredDatasetType,
    ) -> literals.StructuredDataset:
        lf = typing.cast(pl.LazyFrame, structured_dataset.dataframe)
        df = lf.collect()  # Collect data to a DataFrame before writing

        output_bytes = io.BytesIO()
        df.write_parquet(output_bytes)  # Polars now only supports write_parquet

        if structured_dataset.uri is not None:
            fs = ctx.file_access.get_filesystem_for_path(path=structured_dataset.uri)
            with fs.open(structured_dataset.uri, "wb") as s:
                s.write(output_bytes.getvalue())
            output_uri = structured_dataset.uri
        else:
            remote_fn = "00000"  # Default filename
            output_uri = ctx.file_access.put_raw_data(output_bytes.getvalue(), file_name=remote_fn)
        return literals.StructuredDataset(uri=output_uri, metadata=StructuredDatasetMetadata(structured_dataset_type))


class ParquetToPolarsLazyFrameDecodingHandler(StructuredDatasetDecoder):
    def __init__(self):
        super().__init__(pl.LazyFrame, None, PARQUET)

    def decode(
        self,
        ctx: FlyteContext,
        flyte_value: literals.StructuredDataset,
        current_task_metadata: StructuredDatasetMetadata,
    ) -> pl.LazyFrame:
        uri = flyte_value.uri

        kwargs = get_fsspec_storage_options(
            protocol=fsspec_utils.get_protocol(uri),
            data_config=ctx.file_access.data_config,
        )
        # scan_parquet returns a LazyFrame without eagerly loading the data
        return pl.scan_parquet(uri, storage_options=kwargs)


StructuredDatasetTransformerEngine.register(PolarsLazyFrameToParquetEncodingHandler())
StructuredDatasetTransformerEngine.register(ParquetToPolarsLazyFrameDecodingHandler())
StructuredDatasetTransformerEngine.register_renderer(pl.LazyFrame, PolarsLazyFrameRenderer())
n
yeah, that looks about right
b
one thing I'm confused about is the StructuredDataset type.
since LazyFrames essentially return a query plan, I'm curious how that type would be handled
as opposed to a DataFrame object, this is more like a series
n
as long as you collect at serialization time it should be fine
b
interesting...
one more question: am I able to simply test this by making these classes tasks in an overall workflow?
n
yeah, you can test this by putting these classes and the registration calls in the same file as your tasks/workflows. you could also keep them in a separate file, as long as you import it in the workflow file.
b
Cool, let me give it a go
well, it doesn't seem to see my registrations
do I need to do the registration inside my workflow function?
(which is where I'm doing it right now)
n
it needs to be in the top-level scope of the module
# lazyframe encoder/decoder class defs
# registration
...


@workflow
def wf(): ...
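A toy illustration of why scope matters, using a hypothetical `REGISTRY` list and `register` function rather than flytekit's actual API: statements at module top level run whenever the module is imported, whereas statements inside a function body run only when that function is called. Presumably a process that imports the module to run a single task never calls the workflow function, so a registration placed there would not happen.

```python
# Hypothetical stand-in for a transformer registry; this is not flytekit code.
REGISTRY: list[str] = []


def register(name: str) -> None:
    REGISTRY.append(name)


# Top-level call: executed as soon as this module is imported.
register("top-level handler")


def wf() -> None:
    # Executed only if wf() itself is called in this process.
    register("handler registered inside wf")


print(REGISTRY)  # ['top-level handler']
```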
b
yeah, so it's a bit odd, I get this now:
{
  "asctime": "2024-05-01 20:54:00,858",
  "name": "flytekit",
  "levelname": "WARNING",
  "message": "Unsupported Type <built-in function any> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."
}
or I guess this <built-in function any> is saying that I have a type defined as "any" and there's no existing transformer for it?
n
weird, this is the any built-in function, not the typing.Any type
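The distinction can be checked with plain Python, no Flyte required. In this sketch the function names `annotated_with_builtin` and `annotated_with_typing` are made up for illustration; they are ordinary functions, not Flyte tasks. A lowercase `any` annotation stores the built-in function object, whose repr is exactly the string shown in the warning.

```python
import typing


def annotated_with_builtin(x: any) -> any:  # lowercase `any` is a built-in function
    return x


def annotated_with_typing(x: typing.Any) -> typing.Any:  # the actual typing construct
    return x


builtin_hint = annotated_with_builtin.__annotations__["x"]
typing_hint = annotated_with_typing.__annotations__["x"]

print(repr(builtin_hint))  # <built-in function any>
print(builtin_hint is any)  # True
print(typing_hint is typing.Any)  # True
```

Searching the task signatures for a lowercase `any` annotation is therefore a reasonable first step when this warning appears.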
b
well, I think I got that cleared up
it was probably user error on my part
now I just need to demystify this "magical polars out-of-core processing" pattern that I can't seem to figure out or find examples of, lol
but I really appreciate your help on the Flyte side!!! 🙂
k
cool stuff
should we contribute these handlers to Flytekit itself? cc @Brad / @Niels Bantilan
n
@Brad could you make an issue for this? [flyte-core] and if you have capacity a contribution would be 🔥. Basically the code you have here is a good start and can be added to this file. You can follow this contribution guide if you get stuck.
b
will do!