Brad
05/01/2024, 8:05 PM
[f6198790a618a4396a3a-n0-0] terminated with exit code (247). Reason [OOMKilled]. Message:
{"asctime": "2024-05-01 20:01:11,141", "name": "flytekit", "levelname": "WARNING", "message": "Unsupported Type <built-in function any> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."}
thoughts?
Niels Bantilan
05/01/2024, 8:11 PM
PolarsDataFrameToParquetEncodingHandler and ParquetToPolarsDataFrameDecodingHandler for lazyframes
Brad
05/01/2024, 8:13 PM
Brad
05/01/2024, 8:13 PM
Niels Bantilan
05/01/2024, 8:18 PM
In the sd_transformers file, you’ll see those classes defined, which operate on pl.DataFrame here, here, here, and here.
class PolarsLazyFrameToParquetEncodingHandler(StructuredDatasetEncoder): ...
class ParquetToPolarsLazyFrameDecodingHandler(StructuredDatasetDecoder): ...
You could define your own classes (or contribute them back into the polars plugin 🙂) and then register them:
StructuredDatasetTransformerEngine.register(PolarsLazyFrameToParquetEncodingHandler())
StructuredDatasetTransformerEngine.register(ParquetToPolarsLazyFrameDecodingHandler())
Brad
05/01/2024, 8:22 PM
class PolarsLazyFrameRenderer:
    """
    Renders Polars LazyFrame's statistics as an HTML table after collecting the data.
    """

    def to_html(self, lf: pl.LazyFrame) -> str:
        assert isinstance(lf, pl.LazyFrame)
        df = lf.collect()  # Convert LazyFrame to DataFrame
        describe_df = df.describe()
        return pd.DataFrame(describe_df.transpose(), columns=describe_df.columns).to_html(index=False)


class PolarsLazyFrameToParquetEncodingHandler(StructuredDatasetEncoder):
    def __init__(self):
        super().__init__(pl.LazyFrame, None, PARQUET)

    def encode(
        self,
        ctx: FlyteContext,
        structured_dataset: StructuredDataset,
        structured_dataset_type: StructuredDatasetType,
    ) -> literals.StructuredDataset:
        lf = typing.cast(pl.LazyFrame, structured_dataset.dataframe)
        df = lf.collect()  # Collect data to a DataFrame before writing
        output_bytes = io.BytesIO()
        df.write_parquet(output_bytes)  # Polars now only supports write_parquet
        if structured_dataset.uri is not None:
            fs = ctx.file_access.get_filesystem_for_path(path=structured_dataset.uri)
            with fs.open(structured_dataset.uri, "wb") as s:
                s.write(output_bytes.getvalue())
            output_uri = structured_dataset.uri
        else:
            remote_fn = "00000"  # Default filename
            output_uri = ctx.file_access.put_raw_data(output_bytes.getvalue(), file_name=remote_fn)
        return literals.StructuredDataset(uri=output_uri, metadata=StructuredDatasetMetadata(structured_dataset_type))


class ParquetToPolarsLazyFrameDecodingHandler(StructuredDatasetDecoder):
    def __init__(self):
        super().__init__(pl.LazyFrame, None, PARQUET)

    def decode(
        self,
        ctx: FlyteContext,
        flyte_value: literals.StructuredDataset,
        current_task_metadata: StructuredDatasetMetadata,
    ) -> pl.LazyFrame:
        uri = flyte_value.uri
        kwargs = get_fsspec_storage_options(
            protocol=fsspec_utils.get_protocol(uri),
            data_config=ctx.file_access.data_config,
        )
        # Return a LazyFrame directly from scan_parquet (nothing is read until collect)
        return pl.scan_parquet(uri, storage_options=kwargs)


StructuredDatasetTransformerEngine.register(PolarsLazyFrameToParquetEncodingHandler())
StructuredDatasetTransformerEngine.register(ParquetToPolarsLazyFrameDecodingHandler())
StructuredDatasetTransformerEngine.register_renderer(pl.LazyFrame, PolarsLazyFrameRenderer())
Niels Bantilan
05/01/2024, 8:31 PM
Brad
05/01/2024, 8:31 PM
Brad
05/01/2024, 8:32 PM
Brad
05/01/2024, 8:32 PM
Niels Bantilan
05/01/2024, 8:32 PM
collect at serialization time it should be fine
Brad
05/01/2024, 8:33 PM
Brad
05/01/2024, 8:34 PM
Niels Bantilan
05/01/2024, 8:37 PM
Brad
05/01/2024, 8:44 PM
Brad
05/01/2024, 8:51 PM
Brad
05/01/2024, 8:51 PM
Brad
05/01/2024, 8:51 PM
Niels Bantilan
05/01/2024, 8:52 PM
# lazyframe encoder/decoder class defs
# registration
...
@workflow
def wf(): ...
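The ordering in the skeleton above (class definitions, then registration, then the workflow) can be pictured with a toy type-keyed registry. This is a simplified illustration and not flytekit's implementation: HANDLERS, register, pick_transport, and FakeLazyFrame are hypothetical names. It only shows that a lookup made before registration falls back to a default such as pickle, which is the behavior the warning text describes.

```python
from typing import Dict

# Hypothetical registry mapping a Python type to the name of its handler.
HANDLERS: Dict[type, str] = {}


def register(py_type: type, handler_name: str) -> None:
    """Record which handler serializes values of py_type."""
    HANDLERS[py_type] = handler_name


def pick_transport(py_type: type) -> str:
    """Return the registered handler name, or 'pickle' when none is registered."""
    return HANDLERS.get(py_type, "pickle")


class FakeLazyFrame:
    """Stand-in for pl.LazyFrame so the sketch needs no third-party packages."""


# Lookup before registration: nothing is known about the type yet.
before = pick_transport(FakeLazyFrame)

# Registration, as in the "# registration" step of the skeleton.
register(FakeLazyFrame, "parquet")

# Lookup after registration: the dedicated handler is found.
after = pick_transport(FakeLazyFrame)

print(before, after)
```

In the real setup, the practical takeaway is the same as in the skeleton: keep the registration calls in a module that is imported before the tasks and workflow that use pl.LazyFrame are defined.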
Brad
05/01/2024, 8:56 PM
{
  "asctime": "2024-05-01 20:54:00,858",
  "name": "flytekit",
  "levelname": "WARNING",
  "message": "Unsupported Type <built-in function any> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."
}
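The <built-in function any> in this warning is how Python prints the built-in any() function, which ends up in a signature when a parameter or return value is annotated with lowercase any instead of typing.Any. A minimal sketch for spotting such annotations follows; find_builtin_any, suspicious, and fine are hypothetical names, and the check is plain Python rather than anything flytekit provides.

```python
import builtins
import typing


def find_builtin_any(fn) -> list:
    """Return the annotation names ('return' included) that point at the built-in any()."""
    annotations = getattr(fn, "__annotations__", {})
    return [name for name, ann in annotations.items() if ann is builtins.any]


def suspicious(x: any) -> any:  # lowercase any: the built-in function, not a type
    return x


def fine(x: typing.Any) -> typing.Any:  # typing.Any: the intended "anything" type
    return x


print(repr(any))                     # the same text that appears in the warning
print(find_builtin_any(suspicious))  # names annotated with the built-in
print(find_builtin_any(fine))        # empty list: nothing to fix
```

Running a check like this over task functions is one way to locate the annotation that triggers the pickle fallback.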
Brad
05/01/2024, 9:00 PM
<built-in function any> saying that I have a defined type of "any" and that's not an existing transformer?
Niels Bantilan
05/01/2024, 9:01 PM
any built-in function, not the typing.Any type
Brad
05/01/2024, 9:05 PM
Brad
05/01/2024, 9:05 PM
Brad
05/01/2024, 9:06 PM
Brad
05/01/2024, 9:07 PM
Ketan (kumare3)
Ketan (kumare3)
Niels Bantilan
05/02/2024, 1:50 PM
Slackbot
05/02/2024, 1:50 PM
Brad
05/02/2024, 1:50 PM