creamy-whale-12531
08/26/2025, 11:51 AM
TypeTransformerFailedError: Error converting input 'df' at position 0:
Literal value: Flyte Serialized object (Literal):
  scalar:
    structured_dataset:
      uri: s3://my-s3-bucket/data/cn/arnr4n65vn287zp446f5-n2- [...]
      metadata:
        structured_dataset_type:
          format: parquet
Expected Python type: <class 'pandas.core.frame.DataFrame'>
Exception: Expected checksum ++koqw== did not match calculated checksum: hniEMQ==
It seems to be happening when the pandas dataframe gets transferred between tasks. Here is the simplified workflow:
import os

import flytekit as fl
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

from src.util.logging import get_logger, setup_logging

logger = get_logger(__name__)

CACHE_VERSION = "0.1"

image_spec = fl.ImageSpec(
    name="weather-app",
    requirements="uv.lock",
    registry=os.environ["FLYTE_IMAGE_REGISTRY"],
)

request_resources = fl.Resources(cpu="1", mem="1000Mi", ephemeral_storage="500Mi")
limit_resources = fl.Resources(cpu="2", mem="2000Mi", ephemeral_storage="1000Mi")


@fl.task(container_image=image_spec)
def round_datetime_to_hour(dt: datetime) -> datetime:
    return datetime(year=dt.year, month=dt.month, day=dt.day, hour=dt.hour, tzinfo=None)


@fl.task(container_image=image_spec)
def get_data(start: datetime, end: datetime) -> pd.DataFrame:
    n_rows = 1_000_000
    # n_rows = 1_000
    logger.info(f"running get_data with start={start}, end={end}")
    np.random.seed(42)
    start_date = datetime(2020, 1, 1)
    end_date = datetime(2024, 12, 31)
    dates = pd.date_range(start=start_date, end=end_date, periods=n_rows)
    df = pd.DataFrame(
        {
            "date": dates.astype("datetime64[s]"),
            "price": np.random.uniform(10.0, 1000.0, n_rows),
            "quantity": np.random.randint(1, 1000, n_rows, int),
        }
    )
    logger.info(f"df dtypes={df.dtypes}")
    return df


@fl.task(container_image=image_spec)
def log_data(df: pd.DataFrame):
    logger.info(f"data={df.head()}")


@fl.workflow
def experiment(
    start: datetime = datetime.now().replace(
        tzinfo=None, hour=0, minute=0, second=0, microsecond=0
    )
    - timedelta(days=3),
    end: datetime = datetime.now().replace(
        tzinfo=None, hour=0, minute=0, second=0, microsecond=0
    ),
):
    start_date = round_datetime_to_hour(start)
    end_date = round_datetime_to_hour(end)
    df = get_data(start_date, end_date)
    log_data(df)


setup_logging(level="INFO")
I noticed that with 1,000 rows (the n_rows variable) the error does not happen. It looks like something related to serialisation, so I tried storing the result as a StructuredDataset, but that did not help either. Any pointers on what the problem might be and how to fix it?
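For reference, the StructuredDataset variant I tried looked roughly like this (a minimal sketch; the get_data_sd/log_data_sd names are just illustrative and the frame is built the same way as in get_data above):

from flytekit.types.structured import StructuredDataset


@fl.task(container_image=image_spec)
def get_data_sd(start: datetime, end: datetime) -> StructuredDataset:
    n_rows = 1_000_000
    np.random.seed(42)
    dates = pd.date_range(start=datetime(2020, 1, 1), end=datetime(2024, 12, 31), periods=n_rows)
    df = pd.DataFrame(
        {
            "date": dates.astype("datetime64[s]"),
            "price": np.random.uniform(10.0, 1000.0, n_rows),
            "quantity": np.random.randint(1, 1000, n_rows, int),
        }
    )
    # Wrap explicitly instead of returning pd.DataFrame;
    # the backend still serialises this to parquet on S3.
    return StructuredDataset(dataframe=df)


@fl.task(container_image=image_spec)
def log_data_sd(sd: StructuredDataset):
    # Re-hydrate the pandas frame from the stored parquet.
    df = sd.open(pd.DataFrame).all()
    logger.info(f"data={df.head()}")

To try to isolate whether the parquet encoding itself is at fault, I can also round-trip the same frame locally, along these lines (sketch; assumes pyarrow is installed and /tmp is writable):

df.to_parquet("/tmp/df.parquet")
roundtrip = pd.read_parquet("/tmp/df.parquet").astype({"date": "datetime64[s]"})
pd.testing.assert_frame_equal(df, roundtrip)

If that passes, the mismatch would presumably be on the upload/download path between the task and S3 rather than in pandas itself.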