high-umbrella-19848
05/01/2024, 8:05 PM
[f6198790a618a4396a3a-n0-0] terminated with exit code (247). Reason [OOMKilled]. Message:
{"asctime": "2024-05-01 20:01:11,141", "name": "flytekit", "levelname": "WARNING", "message": "Unsupported Type <built-in function any> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."}
thoughts?
broad-monitor-993
05/01/2024, 8:11 PM
PolarsDataFrameToParquetEncodingHandler and ParquetToPolarsDataFrameDecodingHandler for lazyframes
high-umbrella-19848
05/01/2024, 8:13 PM
high-umbrella-19848
05/01/2024, 8:13 PM
broad-monitor-993
05/01/2024, 8:18 PM
sd_transformers file, you'll see those classes defined, which operate on pl.DataFrame here, here, here, and here.
class PolarsLazyFrameToParquetEncodingHandler(StructuredDatasetEncoder): ...
class ParquetToPolarsLazyFrameDecodingHandler(StructuredDatasetDecoder): ...
You could define your own classes (or contribute them back to the polars plugin 🙂) and then register them:
StructuredDatasetTransformerEngine.register(PolarsLazyFrameToParquetEncodingHandler())
StructuredDatasetTransformerEngine.register(ParquetToPolarsLazyFrameDecodingHandler())
high-umbrella-19848
05/01/2024, 8:22 PM
import io
import typing

import fsspec.utils as fsspec_utils
import pandas as pd
import polars as pl
from flytekit import FlyteContext
from flytekit.core.data_persistence import get_fsspec_storage_options
from flytekit.models import literals
from flytekit.models.literals import StructuredDatasetMetadata
from flytekit.models.types import StructuredDatasetType
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDataset,
    StructuredDatasetDecoder,
    StructuredDatasetEncoder,
    StructuredDatasetTransformerEngine,
)


class PolarsLazyFrameRenderer:
    """
    Renders a Polars LazyFrame's summary statistics as an HTML table after collecting the data.
    """

    def to_html(self, lf: pl.LazyFrame) -> str:
        assert isinstance(lf, pl.LazyFrame)
        df = lf.collect()  # Convert LazyFrame to DataFrame (materializes the full frame in memory)
        describe_df = df.describe()
        return pd.DataFrame(describe_df.transpose(), columns=describe_df.columns).to_html(index=False)


class PolarsLazyFrameToParquetEncodingHandler(StructuredDatasetEncoder):
    def __init__(self):
        super().__init__(pl.LazyFrame, None, PARQUET)

    def encode(
        self,
        ctx: FlyteContext,
        structured_dataset: StructuredDataset,
        structured_dataset_type: StructuredDatasetType,
    ) -> literals.StructuredDataset:
        lf = typing.cast(pl.LazyFrame, structured_dataset.dataframe)
        # Collect to a DataFrame before writing; the whole frame is materialized in memory here
        df = lf.collect()
        output_bytes = io.BytesIO()
        df.write_parquet(output_bytes)  # write_parquet replaces the older to_parquet method
        if structured_dataset.uri is not None:
            fs = ctx.file_access.get_filesystem_for_path(path=structured_dataset.uri)
            with fs.open(structured_dataset.uri, "wb") as s:
                s.write(output_bytes.getvalue())
            output_uri = structured_dataset.uri
        else:
            remote_fn = "00000"  # Default filename
            output_uri = ctx.file_access.put_raw_data(output_bytes.getvalue(), file_name=remote_fn)
        return literals.StructuredDataset(uri=output_uri, metadata=StructuredDatasetMetadata(structured_dataset_type))


class ParquetToPolarsLazyFrameDecodingHandler(StructuredDatasetDecoder):
    def __init__(self):
        super().__init__(pl.LazyFrame, None, PARQUET)

    def decode(
        self,
        ctx: FlyteContext,
        flyte_value: literals.StructuredDataset,
        current_task_metadata: StructuredDatasetMetadata,
    ) -> pl.LazyFrame:
        uri = flyte_value.uri
        kwargs = get_fsspec_storage_options(
            protocol=fsspec_utils.get_protocol(uri),
            data_config=ctx.file_access.data_config,
        )
        # scan_parquet returns a LazyFrame; no rows are read until it is collected
        return pl.scan_parquet(uri, storage_options=kwargs)


StructuredDatasetTransformerEngine.register(PolarsLazyFrameToParquetEncodingHandler())
StructuredDatasetTransformerEngine.register(ParquetToPolarsLazyFrameDecodingHandler())
StructuredDatasetTransformerEngine.register_renderer(pl.LazyFrame, PolarsLazyFrameRenderer())
broad-monitor-993
05/01/2024, 8:31 PM
high-umbrella-19848
05/01/2024, 8:31 PM
high-umbrella-19848
05/01/2024, 8:32 PM
high-umbrella-19848
05/01/2024, 8:32 PM
broad-monitor-993
05/01/2024, 8:32 PM
collect at serialization time, it should be fine
high-umbrella-19848
05/01/2024, 8:33 PM
high-umbrella-19848
05/01/2024, 8:34 PM
broad-monitor-993
05/01/2024, 8:37 PM
high-umbrella-19848
05/01/2024, 8:44 PM
high-umbrella-19848
05/01/2024, 8:51 PM
high-umbrella-19848
05/01/2024, 8:51 PM
high-umbrella-19848
05/01/2024, 8:51 PM
broad-monitor-993
05/01/2024, 8:52 PM
# lazyframe encoder/decoder class defs
# registration
...
@workflow
def wf(): ...
high-umbrella-19848
05/01/2024, 8:56 PM
{
"asctime": "2024-05-01 20:54:00,858",
"name": "flytekit",
"levelname": "WARNING",
"message": "Unsupported Type <built-in function any> found, Flyte will default to use PickleFile as the transport. Pickle can only be used to send objects between the exact same version of Python, and we strongly recommend to use python type that flyte support."
}
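The warning names `<built-in function any>`: the lowercase builtin `any()` (a function) ended up in a type annotation where the `typing.Any` type was presumably intended, so flytekit found no transformer for it and fell back to pickling. The stdlib-only sketch below shows the difference and a hypothetical helper, `builtin_function_annotations`, that flags such annotations; it is an illustration, not part of flytekit. Note that `typing.Any` may itself still be pickled by Flyte, so a concrete type such as `pl.LazyFrame` (with the handlers registered) is the more direct way to avoid the pickle transport.

```python
import inspect
import typing


def builtin_function_annotations(func: typing.Callable) -> list[str]:
    """Hypothetical helper: return the names of parameters (and "return") whose
    annotation is a builtin function such as any() rather than a type."""
    return [name for name, ann in func.__annotations__.items() if inspect.isbuiltin(ann)]


def bad_task(lf: any) -> any:  # 'any' here is the builtin function any(), not a type
    return lf


def good_task(lf: typing.Any) -> typing.Any:  # typing.Any is an actual typing construct
    return lf


if __name__ == "__main__":
    print(builtin_function_annotations(bad_task))   # ['lf', 'return']
    print(builtin_function_annotations(good_task))  # []
```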
high-umbrella-19848
05/01/2024, 9:00 PM
<built-in function any>: is this saying that I have a defined type of "any" and that's not an existing transformer?
broad-monitor-993
05/01/2024, 9:01 PM
That's the any built-in function, not the typing.Any type.
high-umbrella-19848
05/01/2024, 9:05 PM
high-umbrella-19848
05/01/2024, 9:05 PM
high-umbrella-19848
05/01/2024, 9:06 PM
high-umbrella-19848
05/01/2024, 9:07 PM
freezing-airport-6809
freezing-airport-6809
broad-monitor-993
05/02/2024, 1:50 PM
user
05/02/2024, 1:50 PM
high-umbrella-19848
05/02/2024, 1:50 PM