# flyte-support
Hi all, I've been trying to run a small experiment with the Flyte sandbox (sha-08a8d43838a8c02b07f18394ed74d5d17ce61891), but I keep seeing this error:
```
TypeTransformerFailedError: Error converting input 'df' at position 0:
    Literal value: Flyte Serialized object (Literal):
      scalar:
        structured_dataset:
          uri: s3://my-s3-bucket/data/cn/arnr4n65vn287zp446f5-n2- [...]
          metadata:
            structured_dataset_type:
              format: parquet
    Expected Python type: <class 'pandas.core.frame.DataFrame'>
    Exception: Expected checksum ++koqw== did not match calculated checksum: hniEMQ==
```
It seems to happen when the pandas DataFrame is transferred between tasks. Here is a simplified version of the workflow:
```python
import os

import flytekit as fl
import pandas as pd
import numpy as np

from datetime import datetime, timedelta

from src.util.logging import get_logger, setup_logging

logger = get_logger(__name__)

CACHE_VERSION = "0.1"

image_spec = fl.ImageSpec(
    name="weather-app",
    requirements="uv.lock",
    registry=os.environ["FLYTE_IMAGE_REGISTRY"],
)

request_resources = fl.Resources(cpu="1", mem="1000Mi", ephemeral_storage="500Mi")
limit_resources = fl.Resources(cpu="2", mem="2000Mi", ephemeral_storage="1000Mi")


@fl.task(container_image=image_spec)
def round_datetime_to_hour(dt: datetime) -> datetime:
    return datetime(year=dt.year, month=dt.month, day=dt.day, hour=dt.hour, tzinfo=None)


@fl.task(container_image=image_spec)
def get_data(start: datetime, end: datetime) -> pd.DataFrame:
    n_rows = 1_000_000
    # n_rows = 1_000  # with 1_000 rows the error does not occur
    <http://logger.info|logger.info>(f"running get_data with start={start}, end={end}")

    np.random.seed(42)

    start_date = datetime(2020, 1, 1)
    end_date = datetime(2024, 12, 31)
    dates = pd.date_range(start=start_date, end=end_date, periods=n_rows)

    df = pd.DataFrame(
        {
            "date": dates.astype("datetime64[s]"),
            "price": np.random.uniform(10.0, 1000.0, n_rows),
            "quantity": np.random.randint(1, 1000, n_rows, int),
        }
    )

    <http://logger.info|logger.info>(f"df dtypes={df.dtypes}")

    return df


@fl.task(container_image=image_spec)
def log_data(df: pd.DataFrame):
    <http://logger.info|logger.info>(f"data={df.head()}")


@fl.workflow
def experiment(
    start: datetime = datetime.now().replace(
        tzinfo=None, hour=0, minute=0, second=0, microsecond=0
    )
    - timedelta(days=3),
    end: datetime = datetime.now().replace(
        tzinfo=None, hour=0, minute=0, second=0, microsecond=0
    ),
):
    start_date = round_datetime_to_hour(start)
    end_date = round_datetime_to_hour(end)
    df = get_data(start_date, end_date)
    log_data(df)


setup_logging(level="INFO")
```
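In case it matters, this is roughly how I launch it against the sandbox (the file path here is just illustrative):
```
pyflyte run --remote workflows/experiment.py experiment
```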
I noticed that if the number of rows is 1,000 the error does not happen (see the n_rows variable). It looks like something related to serialisation, so I also tried passing it as a StructuredDataset (sketch below), but that did not help. Any pointers on what the problem could be and how to fix it?
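For reference, this is roughly the StructuredDataset variant I tried (a minimal sketch that reuses the imports, image_spec, and logger from the workflow above; the `_sd` task names are just for illustration):
```python
from flytekit.types.structured import StructuredDataset


@fl.task(container_image=image_spec)
def get_data_sd(n_rows: int = 1_000_000) -> StructuredDataset:
    # Same synthetic frame as get_data above, wrapped explicitly.
    np.random.seed(42)
    dates = pd.date_range(
        start=datetime(2020, 1, 1), end=datetime(2024, 12, 31), periods=n_rows
    )
    df = pd.DataFrame(
        {
            "date": dates.astype("datetime64[s]"),
            "price": np.random.uniform(10.0, 1000.0, n_rows),
            "quantity": np.random.randint(1, 1000, n_rows, dtype=int),
        }
    )
    return StructuredDataset(dataframe=df)


@fl.task(container_image=image_spec)
def log_data_sd(sd: StructuredDataset):
    # Materialise back into pandas before logging.
    df = sd.open(pd.DataFrame).all()
    logger.info(f"data={df.head()}")
```
It fails with the same checksum mismatch as the plain pd.DataFrame version.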