# flyte-support
q
Hi, I am running into a weird error. Everything is running on the demo cluster. I have a workflow with two tasks: one reads a CSV into a dataframe, and the other returns data from the dataframe (say, the mean value of a column). The workflow works for a small CSV (~5k lines) but fails when the CSV is large (above ~10k lines? It definitely fails for 1M lines), producing a long stack trace that ends in:
Expected checksum 96ZInw== did not match calculated checksum: GBZQVA==
Is there any way to solve this? I'm posting the traceback in the reply.
Trace:

    Traceback (most recent call last):
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/core/base_task.py", line 741, in dispatch_execute
        native_inputs = self._literal_map_to_python_input(input_literal_map, exec_ctx)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/core/base_task.py", line 610, in _literal_map_to_python_input
        return TypeEngine.literal_map_to_kwargs(ctx, literal_map, self.python_interface.inputs)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/core/utils.py", line 312, in wrapper
        return func(*args, **kwargs)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/core/type_engine.py", line 1479, in literal_map_to_kwargs
        return synced(ctx, lm, python_types, literal_types)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/utils/asyn.py", line 100, in wrapped
        return self.run_sync(coro_func, *args, **kwargs)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/utils/asyn.py", line 93, in run_sync
        return self._runner_map[name].run(coro)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/utils/asyn.py", line 72, in run
        res = fut.result(None)
      File "/opt/micromamba/envs/runtime/lib/python3.9/concurrent/futures/_base.py", line 446, in result
        return self.__get_result()
      File "/opt/micromamba/envs/runtime/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
        raise self._exception
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/core/type_engine.py", line 1517, in _literal_map_to_kwargs
        await asyncio.gather(*kwargs.values())
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/core/type_engine.py", line 1441, in async_to_python_value
        pv = await transformer.async_to_python_value(ctx, lv, expected_python_type)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/types/structured/structured_dataset.py", line 1020, in async_to_python_value
        return self.open_as(ctx, lv.scalar.structured_dataset, df_type=expected_python_type, updated_metadata=metad)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/types/structured/structured_dataset.py", line 1055, in open_as
        result = decoder.decode(ctx, sd, updated_metadata)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/flytekit/types/structured/basic_dfs.py", line 137, in decode
        return pd.read_parquet(uri, columns=columns, storage_options=kwargs)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/pandas/io/parquet.py", line 667, in read_parquet
        return impl.read(
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/pandas/io/parquet.py", line 274, in read
        pa_table = self.api.parquet.read_table(
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/pyarrow/parquet/core.py", line 1793, in read_table
        dataset = ParquetDataset(
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/pyarrow/parquet/core.py", line 1371, in __init__
        self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/pyarrow/dataset.py", line 794, in dataset
        return _filesystem_dataset(source, **kwargs)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/pyarrow/dataset.py", line 486, in _filesystem_dataset
        return factory.finish(schema)
      File "pyarrow/_dataset.pyx", line 3138, in pyarrow._dataset.DatasetFactory.finish
      File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
      File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/fsspec/spec.py", line 2083, in read
        out = self.cache._fetch(self.loc, self.loc + length)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/fsspec/caching.py", line 249, in _fetch
        self.cache = self.fetcher(start, end)  # new block replaces old
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/s3fs/core.py", line 2359, in _fetch_range
        return _fetch_range(
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/s3fs/core.py", line 2531, in _fetch_range
        return sync(fs.loop, _inner_fetch, fs, bucket, key, version_id, start, end, req_kw)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/fsspec/asyn.py", line 103, in sync
        raise return_result
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/fsspec/asyn.py", line 56, in _runner
        result[0] = await coro
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/s3fs/core.py", line 2549, in _inner_fetch
        return await _error_wrapper(_call_and_read, retries=fs.retries)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/s3fs/core.py", line 146, in _error_wrapper
        raise err
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/s3fs/core.py", line 114, in _error_wrapper
        return await func(*args, **kwargs)
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/s3fs/core.py", line 2545, in _call_and_read
        return await resp["Body"].read()
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/aiobotocore/httpchecksum.py", line 58, in read
        self._validate_checksum()
      File "/opt/micromamba/envs/runtime/lib/python3.9/site-packages/aiobotocore/httpchecksum.py", line 67, in _validate_checksum
        raise FlexibleChecksumError(error_msg=error_msg)
    botocore.exceptions.FlexibleChecksumError: Expected checksum 96ZInw== did not match calculated checksum: GBZQVA==

Message:

    FlexibleChecksumError: Expected checksum 96ZInw== did not match calculated checksum: GBZQVA==
a
AFAICT flytekit doesn't perform checksums on downloaded/uploaded data, so this is left to the storage subsystem. Flyte talks to S3 (or S3-compatible services like the MinIO instance that ships with the demo cluster) through `s3fs`, which sits on `aiobotocore`/`botocore` (the same stack as `boto3`), and this error seems to come from there. I'm not sure whether it's a limitation of the MinIO instance; for those large files, would it be better to move to flyte-binary and an S3 bucket?
And yeah, what RunLLM said: for large objects it's better to use `FlyteFile`.
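The checksum values in these errors (`96ZInw==`, `++UuwQ==`, and so on) are base64 strings that decode to 4 bytes, which fits a 32-bit checksum such as CRC32 or CRC32C; which algorithm MinIO actually returned is not shown in the thread. A minimal sketch of the general shape of the check that raises `FlexibleChecksumError`: the client recomputes a checksum over the bytes it actually received and compares it with the value the server sent. The helper names (`crc32_b64`, `validate`, `ChecksumMismatch`) are hypothetical, and this is not botocore's code. It also illustrates one way such a mismatch could arise, a whole-object checksum compared against a partial body (the traceback does go through s3fs's ranged `_fetch_range`), but that is an assumption for illustration, not a confirmed diagnosis.

```python
import base64
import zlib


def crc32_b64(data: bytes) -> str:
    """Base64 of the big-endian 4-byte CRC32 of `data` (8 chars, e.g. '96ZInw==')."""
    return base64.b64encode(zlib.crc32(data).to_bytes(4, "big")).decode("ascii")


class ChecksumMismatch(Exception):
    """Hypothetical stand-in for botocore.exceptions.FlexibleChecksumError."""


def validate(body: bytes, expected_b64: str) -> None:
    """Recompute the checksum over the received body and compare with the sent value."""
    calculated = crc32_b64(body)
    if calculated != expected_b64:
        raise ChecksumMismatch(
            f"Expected checksum {expected_b64} did not match calculated checksum: {calculated}"
        )


# Illustration only: a checksum of the whole object checked against part of it.
obj = bytes(range(256)) * 4096        # ~1 MiB stand-in for a parquet file
expected = crc32_b64(obj)             # checksum the server would send for the full object
validate(obj, expected)               # full body: passes silently

try:
    validate(obj[:65536], expected)   # only the first 64 KiB received: mismatch
except ChecksumMismatch as err:
    print(err)
```

The same comparison passes for any body whose bytes exactly match what the checksum was computed over, which is why the error points at the transfer or storage layer rather than at the dataframe itself.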
f
Is this on the sandbox? If so, MinIO may be struggling to send that much data. Try on S3.
It's a tiny instance made for demos.
q
Yes, it's running on the demo cluster. I was hoping to use it to at least get a basic workflow running.
Thanks for the replies!
f
You can, but I'm not sure about MinIO.
f
I'm encountering this issue while running the demo. Do you know if it could also occur in production (k8s)? What are the best practices in this case? DataFrames are widely used and quite useful, and we're trying to avoid using `FlyteFile` if possible.
f
It should work.
We have folks using it with many gigabytes, close to terabytes, of data.
Can you give us a representative example? I'll share a running version or a screenshot, or maybe we'll find a bug.
@fierce-monitor-77717
f
Hi @freezing-airport-6809, thanks for the quick reply. I'm working on a POC with Flyte using the demo cluster on my laptop. I've created a pretty simple workflow where the first task generates a dataframe. Running it locally works fine, but on the cluster I get the following error in the second task (the one that depends on the dataframe):
Trace:

    Traceback (most recent call last):
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/core/base_task.py", line 741, in dispatch_execute
        native_inputs = self._literal_map_to_python_input(input_literal_map, exec_ctx)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/core/base_task.py", line 610, in _literal_map_to_python_input
        return TypeEngine.literal_map_to_kwargs(ctx, literal_map, self.python_interface.inputs)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/core/utils.py", line 312, in wrapper
        return func(*args, **kwargs)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/core/type_engine.py", line 1488, in literal_map_to_kwargs
        return synced(ctx, lm, python_types, literal_types)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/utils/asyn.py", line 100, in wrapped
        return self.run_sync(coro_func, *args, **kwargs)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/utils/asyn.py", line 93, in run_sync
        return self._runner_map[name].run(coro)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/utils/asyn.py", line 72, in run
        res = fut.result(None)
      File "/opt/micromamba/envs/runtime/lib/python3.10/concurrent/futures/_base.py", line 458, in result
        return self.__get_result()
      File "/opt/micromamba/envs/runtime/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
        raise self._exception
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/core/type_engine.py", line 1526, in _literal_map_to_kwargs
        await asyncio.gather(*kwargs.values())
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/core/type_engine.py", line 1450, in async_to_python_value
        pv = await transformer.async_to_python_value(ctx, lv, expected_python_type)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/types/structured/structured_dataset.py", line 1026, in async_to_python_value
        return self.open_as(ctx, lv.scalar.structured_dataset, df_type=expected_python_type, updated_metadata=metad)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/types/structured/structured_dataset.py", line 1061, in open_as
        result = decoder.decode(ctx, sd, updated_metadata)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/flytekit/types/structured/basic_dfs.py", line 137, in decode
        return pd.read_parquet(uri, columns=columns, storage_options=kwargs)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/pandas/io/parquet.py", line 667, in read_parquet
        return impl.read(
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/pandas/io/parquet.py", line 274, in read
        pa_table = self.api.parquet.read_table(
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 1793, in read_table
        dataset = ParquetDataset(
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/pyarrow/parquet/core.py", line 1371, in __init__
        self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/pyarrow/dataset.py", line 794, in dataset
        return _filesystem_dataset(source, **kwargs)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/pyarrow/dataset.py", line 486, in _filesystem_dataset
        return factory.finish(schema)
      File "pyarrow/_dataset.pyx", line 3126, in pyarrow._dataset.DatasetFactory.finish
      File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
      File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/fsspec/spec.py", line 2083, in read
        out = self.cache._fetch(self.loc, self.loc + length)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/fsspec/caching.py", line 249, in _fetch
        self.cache = self.fetcher(start, end)  # new block replaces old
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/s3fs/core.py", line 2359, in _fetch_range
        return _fetch_range(
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/s3fs/core.py", line 2531, in _fetch_range
        return sync(fs.loop, _inner_fetch, fs, bucket, key, version_id, start, end, req_kw)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/fsspec/asyn.py", line 103, in sync
        raise return_result
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/fsspec/asyn.py", line 56, in _runner
        result[0] = await coro
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/s3fs/core.py", line 2549, in _inner_fetch
        return await _error_wrapper(_call_and_read, retries=fs.retries)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/s3fs/core.py", line 146, in _error_wrapper
        raise err
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/s3fs/core.py", line 114, in _error_wrapper
        return await func(*args, **kwargs)
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/s3fs/core.py", line 2545, in _call_and_read
        return await resp["Body"].read()
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/aiobotocore/httpchecksum.py", line 58, in read
        self._validate_checksum()
      File "/opt/micromamba/envs/runtime/lib/python3.10/site-packages/aiobotocore/httpchecksum.py", line 67, in _validate_checksum
        raise FlexibleChecksumError(error_msg=error_msg)
    botocore.exceptions.FlexibleChecksumError: Expected checksum ++UuwQ== did not match calculated checksum: g1Ltrg==

Message:

    FlexibleChecksumError: Expected checksum ++UuwQ== did not match calculated checksum: g1Ltrg==
All of the tasks are using the following container spec:
import os

from flytekit import ImageSpec

requirements_path = os.path.join(os.path.dirname(__file__), "..", "..", "requirements.txt")

default_image_spec = ImageSpec(
    requirements=requirements_path,
    python_version="3.10",
    registry="localhost:30000"
)
Here is my requirements file:
flytekit>=1.5.0
flytekitplugins-mlflow
flytekitplugins-deck-standard
numpy
pydantic<2.0.0
scikit-learn>=0.24.1,<= 1.6.1
snowflake-connector-python[pandas]
fastparquet>=2024.11.0
scikit-plot==0.3.7
matplotlib==3.9.2
seaborn==0.13.2
shap>=0.41.0
scipy==1.11.4
catboost==1.2.7
You can use the following code to generate the dataframe:
import random
import string
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from flytekit import task

# default_image_spec is the ImageSpec from the container-spec snippet above.


def random_string(length=10):
    # Random alphanumeric string of the given length
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


@task(container_image=default_image_spec)
def generate_dataframe() -> pd.DataFrame:
    num_rows = 3000
    num_cols = 200

    data = {}

    for i in range(num_cols):
        col_type = i % 5  # Cycle through different data types

        if col_type == 0:
            data[f"int_col_{i}"] = np.random.randint(0, 1000, num_rows)
        elif col_type == 1:
            data[f"float_col_{i}"] = np.random.rand(num_rows) * 100
        elif col_type == 2:
            data[f"bool_col_{i}"] = np.random.choice([True, False], num_rows)
        elif col_type == 3:
            start_date = datetime(2020, 1, 1)
            data[f"datetime_col_{i}"] = [start_date + timedelta(days=np.random.randint(0, 3650)) for _ in
                                         range(num_rows)]
        elif col_type == 4:
            data[f"string_col_{i}"] = [random_string(10) for _ in range(num_rows)]

    df = pd.DataFrame(data)
    return df
Do you have any insights into why this error might be occurring?
@freezing-airport-6809 any update?
f
Sorry @fierce-monitor-77717, it completely slipped my mind. Cc @damp-lion-88352, can you try this code? I can try once I get near a keyboard.
d
Hi, coming.
Hi @fierce-monitor-77717, it works in my setup using the flytekit master branch:
import os
import random
import string
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from flytekit import ImageSpec, task

requirements_path = "/Users/future-outlier/code/dev/flytekit/build/projects/oss-image-checksum-error/requirements.txt"

default_image_spec = ImageSpec(
    requirements=requirements_path,
    python_version="3.10",
    registry="futureoutlier",
)


def random_string(length=10):
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


@task(container_image=default_image_spec)
def generate_dataframe() -> pd.DataFrame:
    num_rows = 3000
    num_cols = 200

    data = {}

    for i in range(num_cols):
        col_type = i % 5  # Cycle through different data types

        if col_type == 0:
            data[f"int_col_{i}"] = np.random.randint(0, 1000, num_rows)
        elif col_type == 1:
            data[f"float_col_{i}"] = np.random.rand(num_rows) * 100
        elif col_type == 2:
            data[f"bool_col_{i}"] = np.random.choice([True, False], num_rows)
        elif col_type == 3:
            start_date = datetime(2020, 1, 1)
            data[f"datetime_col_{i}"] = [start_date + timedelta(days=np.random.randint(0, 3650)) for _ in
                                         range(num_rows)]
        elif col_type == 4:
            data[f"string_col_{i}"] = [random_string(10) for _ in range(num_rows)]

    df = pd.DataFrame(data)
    return df
flytekit>=1.5.0
flytekitplugins-mlflow
flytekitplugins-deck-standard
numpy
pydantic<2.0.0
scikit-learn>=0.24.1,<= 1.6.1
snowflake-connector-python[pandas]
fastparquet>=2024.11.0
scikit-plot==0.3.7
matplotlib==3.9.2
seaborn==0.13.2
shap>=0.41.0
scipy==1.11.4
catboost==1.2.7
pandas
numpy
You also have to add pandas and numpy to requirements.txt.
It works in my cluster.
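In the requirements file posted earlier, numpy is listed but pandas is not declared on its own line; it presumably arrives only transitively through the `[pandas]` extra of `snowflake-connector-python`. A minimal sketch, using only the standard library, of a check for which packages a requirements file declares directly. The function names `declared_packages` and `missing` are hypothetical, and the parser deliberately handles only simple `name[extras]<specifier>` lines, not the full pip requirements syntax.

```python
import re

# Leading distribution name; extras ("[pandas]"), specifiers and markers are ignored.
_NAME = re.compile(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*)")


def declared_packages(requirements_text: str) -> set:
    """Return normalized names of packages listed directly in a requirements file."""
    names = set()
    for line in requirements_text.splitlines():
        line = line.split("#", 1)[0].strip()   # drop comments; skip blank lines below
        if not line or line.startswith("-"):   # skip pip options such as -r / --index-url
            continue
        match = _NAME.match(line)
        if match:
            # PEP 503-style normalization: lowercase, runs of -_. collapse to "-"
            names.add(re.sub(r"[-_.]+", "-", match.group(1)).lower())
    return names


def missing(requirements_text: str, needed=("pandas", "numpy")) -> list:
    """List the `needed` packages that are not declared directly."""
    declared = declared_packages(requirements_text)
    return [pkg for pkg in needed if pkg not in declared]
```

Running `missing(...)` on the original file's contents reports only `pandas`, since numpy is already declared.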
f
@damp-lion-88352 did you use the demo cluster (sandbox) locally? What was the exact setup? I've installed Flyte 1.14.6.
d
I use the latest flytekit.
It's not related to Flyte.
f
@damp-lion-88352 can you share the exact requirements, i.e. the `pip freeze` output? Maybe there are some version issues.
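To compare two environments without diffing a full `pip freeze`, a minimal sketch using the standard library's `importlib.metadata` reports the installed versions of the packages that appear in the traceback (flytekit → pandas → pyarrow → fsspec/s3fs → aiobotocore/botocore). The package list and the function name `installed_versions` are illustrative choices; the idea is to run it both locally and inside the image built from the ImageSpec and compare the output.

```python
from importlib.metadata import PackageNotFoundError, version

# Distributions on the call path shown in the traceback
PACKAGES = ("flytekit", "pandas", "pyarrow", "fsspec", "s3fs", "aiobotocore", "botocore")


def installed_versions(names=PACKAGES) -> dict:
    """Map each distribution name to its installed version, or None if it is absent."""
    result = {}
    for name in names:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name:12} {ver or 'not installed'}")
```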
d
Use the latest one:
pip install -U flytekit
I use the flytekit master branch.
q
@fierce-monitor-77717 as mentioned before, this is probably a demo-cluster issue. You might be able to make it work, but I moved to a k8s cluster on kind and never encountered this issue again. Here's a quick guide on how to set it up: https://github.com/davidmirror-ops/flyte-the-hard-way/blob/main/docs/on-premises/single-node/002-single-node-onprem-install.md