# ask-the-community

Geert

11/06/2023, 2:23 PM
Hi, we’re hitting an issue where we see excessive memory usage from a task before the Pod gets OOMKilled. This happens even when we reduce the task to do basically nothing:
```python
@task
def train_model(
    some: pd.DataFrame,
    pandas: pd.DataFrame,
    data: pd.DataFrame,
    frames: pd.DataFrame,
) -> train_model_output:
    print("foo")
    return train_model_output(
        model=None
    )
```
In this case, the biggest file in S3 is about 2.1 GB (the DataFrame is about 12 GB), so nothing crazy. Another is ~200 MB, and the rest are in the order of KBs. But the memory usage is extremely high. In the logs it's not even hitting the print statement, so I'm guessing something is up with deserialization? I'm seeing boto download logs, and after they stop the memory starts spiking (hence my suspicion of deserialization). Any thoughts on this? Note: the exact same workflow runs fine for a smaller dataset (~500 MB).

Ketan (kumare3)

11/06/2023, 3:49 PM
This seems like it could be how the dataframes are built.
Can you try using StructuredDataset?

Geert

11/06/2023, 5:09 PM
Same behaviour when I'm doing:
```python
df_dataset_cols = kwtypes(
    some_field=int,
    some_other_field=str,
    and_another_field=List[int],
    ...
)

@task
def train_model(
    df: Annotated[StructuredDataset, df_dataset_cols],
    ...
) -> train_model_output:
    df = df.open(pd.DataFrame).all()
    ...
```

Ketan (kumare3)

11/06/2023, 5:16 PM
Interesting. We would love to help understand and solve this, but we'd need a representative example.
cc @Kevin Su / @Yi Chiu / @Eduardo Apolinario (eapolinario)?

Geert

11/06/2023, 5:18 PM
Yeah I understand, I'll try to repro with a minimal example with some random data.
Thanks 👍

Ketan (kumare3)

11/06/2023, 7:13 PM
thank you

Geert

11/06/2023, 8:09 PM
I think I have a relatively minimal example. When I add the column `c` with a list type (`List[str]` in the example below), I see the same stepwise memory growth resulting in an eventual OOMKilled. If I run the example without that column, it finishes successfully:
```python
from typing import List, NamedTuple

try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated

import pandas as pd
import numpy as np
import uuid

from flytekit import (
    task,
    workflow,
    kwtypes,
    Resources,
)
from flytekit.types.structured.structured_dataset import (
    StructuredDataset,
)


df_cols = kwtypes(
    a=int,
    b=int,
    c=List[str],
    d=str,
    e=str,
    f=str,
)


foo_output = NamedTuple(
    "FooOutput",
    [
        ("df_output", Annotated[pd.DataFrame, df_cols]),
    ],
)


@task(
    requests=Resources(cpu="1000m", mem="64Gi"),
    limits=Resources(cpu="2000m", mem="128Gi"),
)
def foo() -> foo_output:
    print("foo")

    num_rows = 50000000

    df1 = pd.DataFrame(
        np.random.randint(0, 100, size=(num_rows, 2)), columns=list("ab")
    )
    df2 = pd.DataFrame(
        np.array([str(uuid.uuid4()) for i in range(3 * num_rows)]).reshape(-1, 3),
        columns=list("def"),
    )
    df2["c"] = df2["d"].map(lambda x: x.split("-"))

    df_output = pd.concat([df1, df2], ignore_index=True, sort=False)
    return foo_output(df_output)


@task(
    requests=Resources(cpu="1000m", mem="64Gi"),
    limits=Resources(cpu="2000m", mem="128Gi"),
)
def bar(df_input: Annotated[StructuredDataset, df_cols]):
    print("bar")
    df_input = df_input.open(pd.DataFrame).all()
    print(df_input)


@workflow
def base_workflow():
    task_foo = foo()
    task_bar = bar(
        df_input=task_foo.df_output,
    )
```
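[Editor's note] One thing worth double-checking in the repro above: `pd.concat([df1, df2], ignore_index=True, sort=False)` stacks the two frames row-wise, so the result has twice the rows with NaN-filled halves. If the intent was to put `df1`'s and `df2`'s columns side by side on the same rows, `axis=1` does that. A minimal illustration (toy data, not the repro sizes):

```python
import pandas as pd

df1 = pd.DataFrame({"a": [1, 2]})
df2 = pd.DataFrame({"d": ["x", "y"]})

# Default concat stacks rows: 4 rows, NaNs where a frame lacks a column.
stacked = pd.concat([df1, df2], ignore_index=True, sort=False)

# axis=1 joins columns on the shared index: 2 rows, both columns filled.
side_by_side = pd.concat([df1, df2], axis=1)

print(stacked.shape)       # (4, 2)
print(side_by_side.shape)  # (2, 2)
```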

Ketan (kumare3)

11/06/2023, 9:23 PM
cc @Pryce
also cc @Eduardo Apolinario (eapolinario) / @Kevin Su

Pryce

11/06/2023, 9:59 PM
@Geert I'm guessing that in your minimal example you're not even getting to `print('bar')`?

Ketan (kumare3)

11/06/2023, 10:00 PM
@Pryce you could try "Union" TLM to simply observe this in real time

Pryce

11/06/2023, 10:03 PM
Sounds good, I'll try to repro on our end.

Geert

11/06/2023, 10:17 PM
Actually, in the minimal example I do see the print, so I guess here it's the open/all call? I can run more tests tomorrow if that helps! Thanks for checking on this.

Pryce

11/06/2023, 10:37 PM
Maybe! Will let you know what my testing reveals when I get to it.

Geert

11/07/2023, 7:48 PM
In our original workflow, it runs if I remove the `list[..]` columns and instead `df.explode()` them, pass them between tasks as separate `pd.DataFrame` outputs/inputs, and merge the columns back into the original dataframe in the next task. It just takes a lot of extra processing time to do the explode and the merge in the subsequent task, so it's not really a fix at the moment (checking if I can speed it up). But no memory issues.
Hi @Pryce, did your investigation lead to some insights? We are using the workaround at the moment, but it would be great to be able to use `list[]` types in our DataFrames without any extra wrangling.

Pryce

11/27/2023, 11:03 PM
Hey @Geert, thanks for your patience. I've been able to reproduce this internally with a smaller dataset. I'm still chasing down why those data structures are ballooning so much, though.
Could you please share your workaround code in the context of the repro example above?
```python
import sys  # needed for the getsizeof logging below


@task(
    requests=Resources(cpu="1000m", mem="2Gi"),
    limits=Resources(cpu="2000m", mem="4Gi"),
)
def foo(num_rows: int) -> foo_output:
    print("foo")

    df1 = pd.DataFrame(
        np.random.randint(0, 100, size=(num_rows, 2)), columns=list("ab")
    )
    print(f'df1: {sys.getsizeof(df1)}') # 40000144
    df2 = pd.DataFrame(
        np.array([str(uuid.uuid4()) for i in range(3 * num_rows)]).reshape(-1, 3),
        columns=list("def"),
    )
    print(f'df2 pre transform: {sys.getsizeof(df2)}') # 697500144
    df2["c"] = df2["d"].map(lambda x: x.split("-"))
    print(f'df2 post transform: {sys.getsizeof(df2)}') # 1057500144

    df_output = pd.concat([df1, df2], ignore_index=True, sort=False)
    print(f'df_output: {sys.getsizeof(df2)}') # 1057500144

    return foo_output(df_output)


@task(
    requests=Resources(cpu="1000m", mem="16Gi"),
    limits=Resources(cpu="2000m", mem="32Gi"),
)
def bar(df_input: Annotated[StructuredDataset, df_cols]):
    print("bar")
    df_input = df_input.open(pd.DataFrame).all()
    print(f'df input: {sys.getsizeof(df_input)}') # 1317500144
    print(df_input)


@workflow
def base_workflow(num_rows: int=2500000):
    task_foo = foo(num_rows=num_rows)
    task_bar = bar(
        df_input=task_foo.df_output,
    )
```
The objects in question are taking up way more memory than they should. I had a peek at the structured_dataset implementation but no memory leaks jump out at me. What do you think @Ketan (kumare3)?

Ketan (kumare3)

12/04/2023, 2:53 AM
@Pryce I am back tomorrow, shall we discuss this?

Geert

12/04/2023, 8:04 AM
@Pryce I've since deleted the repro example, but we now do something like the below to pass the `list[..]` columns separately (not nested in the original DataFrame):
```python
@task
def some_func():
    ...
    df_some_list_type = pd.DataFrame(df_original.explode("foo")["foo"])
    df_dropped = df_original.drop("foo", axis=1)
    ...
    return (..., df_dropped, df_some_list_type, ...)

@task
def some_other_func(..., df_dropped, df_some_list_type, ...):
    ...
    df_some_list_type = df_some_list_type.groupby(df_some_list_type.index).agg(
        {"foo": lambda x: x.tolist()}
    )
    df_dropped["foo"] = df_some_list_type
    ...
```
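[Editor's note] For readers following along, here is a self-contained toy version of that round trip (hypothetical column names, tiny data), showing that explode followed by a groupby on the index reconstructs the original lists:

```python
import pandas as pd

# Original frame with a list-typed column "foo".
df_original = pd.DataFrame({"id": [1, 2], "foo": [["a", "b"], ["c"]]})

# Producer task: pull the list column out into its own frame and drop it,
# so no list-typed column has to cross the task boundary.
df_some_list_type = pd.DataFrame(df_original.explode("foo")["foo"])
df_dropped = df_original.drop("foo", axis=1)

# Consumer task: re-aggregate the exploded rows back into lists, grouping
# by the (duplicated) original index, then reattach the column.
df_some_list_type = df_some_list_type.groupby(df_some_list_type.index).agg(
    {"foo": lambda x: x.tolist()}
)
df_dropped["foo"] = df_some_list_type["foo"]

print(df_dropped["foo"].tolist())  # [['a', 'b'], ['c']]
```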

Pryce

12/04/2023, 7:23 PM
Thanks @Geert, @Ketan (kumare3) I'm free whenever

Geert

01/11/2024, 12:49 PM
Hi @Pryce do you have any updates on this issue? Anything we can help you with from our side?

Pryce

01/11/2024, 3:02 PM
Hey there, thanks for following up, sorry this hasn't been looked into yet, memory problems at this scale are... tricky to debug 😅 I started digging into the StructuredDataset transformer and supporting code but couldn't find anything conclusive. If you want to have a poke around and see if anything jumps out that would be awesome! https://github.com/flyteorg/flytekit/blob/949bc1b40e1966b409c821b96011d900241b22fd/flytekit/types/structured/structured_dataset.py#L330 If you find anything useful, raising an issue here will help put it on the roadmap.

Ketan (kumare3)

01/11/2024, 5:59 PM
cc @Yee / @Kevin Su can either of you check this?

Geert

01/11/2024, 6:06 PM
Not a problem at all, really appreciate you guys looking into this. I can imagine these issues are not trivial. And thanks for the links, I'll take a look over the weekend to see if something stands out!
@Pryce Could it be hitting this leak in `pyarrow.Table.from_pandas()` through the use of `pandas.DataFrame.to_parquet()`?
• https://github.com/apache/arrow/issues/37989
• https://github.com/pandas-dev/pandas/issues/55296
There doesn't seem to be a fix yet 😅 but it would at least be nice to confirm that this is the one.

Pryce

02/01/2024, 6:20 PM
Thanks for sticking with it @Geert! I've subscribed to that issue, I suppose we'll see if things get better once a fix is out.

Geert

02/02/2024, 7:46 AM
Will do some rewriting into polars instead of pandas today, I guess that would also circumvent this 👍
Thanks for all the support!