# ask-the-community

Geert

11/06/2023, 2:23 PM
Hi, we’re hitting an issue where we see excessive memory usage from a task before the Pod gets OOMKilled. This happens even when we reduce the task to do basically nothing:
```python
# pd, @task, and train_model_output are defined elsewhere in the workflow; trimmed down here
@task
def train_model(
    some: pd.DataFrame,
    pandas: pd.DataFrame,
    data: pd.DataFrame,
    frames: pd.DataFrame,
) -> train_model_output:
    print("foo")
    return train_model_output(
        model=None
    )
```
In this case, the biggest file in S3 is about 2.1 GB (the DataFrame is about 12 GB in memory), so nothing crazy. One other input is ~200 MB, and the others are in the order of KBs. But the memory usage is silly high. In the logs it's not even hitting the print statement, so I'm guessing something is up with deserialization? I'm seeing boto download logs, and after they stop the memory starts spiking (hence I'm thinking perhaps deserialization). Any thoughts on this? Note: we run the exact same workflow and it runs fine for a smaller dataset (~500 MB).
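A minimal sketch of how that deserialization step could be checked in isolation, outside of Flyte (the S3 path is a placeholder; this assumes the intermediate output is the parquet file flytekit writes for pandas dataframes):
```python
import pyarrow.parquet as pq

# placeholder: point this at the intermediate output of the upstream task
path = "s3://my-bucket/path/to/intermediate/data.parquet"

table = pq.read_table(path)
print(f"arrow size:            {table.nbytes / 1024**3:.2f} GiB")

df = table.to_pandas()
# object-dtype columns (strings, lists) typically take far more space as
# Python objects than they do in the Arrow/parquet representation
print(f"pandas in-memory size: {df.memory_usage(deep=True).sum() / 1024**3:.2f} GiB")
```
If the pandas number (or the actual process RSS) is far above the file size, the expansion is happening on the parquet → pandas conversion rather than in the download itself.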

Ketan (kumare3)

11/06/2023, 3:49 PM
This seems related to how the dataframes are built.
Can you try using a StructuredDataset?

Geert

11/06/2023, 5:09 PM
Same behaviour when I'm doing:
```python
df_dataset_cols = kwtypes(
    some_field=int,
    some_other_field=str,
    and_another_field=List[int],
    ...
)

@task
def train_model(
    df: Annotated[StructuredDataset, df_dataset_cols],
    ...
) -> train_model_output:
    df = df.open(pd.DataFrame).all()
    ...
```

Ketan (kumare3)

11/06/2023, 5:16 PM
Interesting, we would love to help understand and solve this
but we'd need a representative example
cc @Kevin Su / @Yi Chiu / @Eduardo Apolinario (eapolinario)?

Geert

11/06/2023, 5:18 PM
Yeah I understand, I'll try to repro with a minimal example using some random data
Thanks 👍

Ketan (kumare3)

11/06/2023, 7:13 PM
thank you

Geert

11/06/2023, 8:09 PM
I think I have a relatively minimal example: when I add the column `c` with type `list[int]`, I see the same stepwise behaviour eventually resulting in an OOMKill. If I run the example without that column, it finishes successfully:
```python
from typing import List, NamedTuple

try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated

import pandas as pd
import numpy as np
import uuid

from flytekit import (
    task,
    workflow,
    kwtypes,
    Resources,
)
from flytekit.types.structured.structured_dataset import (
    StructuredDataset,
)


df_cols = kwtypes(
    a=int,
    b=int,
    c=List[str],
    d=str,
    e=str,
    f=str,
)


foo_output = NamedTuple(
    "FooOutput",
    [
        ("df_output", Annotated[pd.DataFrame, df_cols]),
    ],
)


@task(
    requests=Resources(cpu="1000m", mem="64Gi"),
    limits=Resources(cpu="2000m", mem="128Gi"),
)
def foo() -> foo_output:
    print("foo")

    num_rows = 50000000

    df1 = pd.DataFrame(
        np.random.randint(0, 100, size=(num_rows, 2)), columns=list("ab")
    )
    df2 = pd.DataFrame(
        np.array([str(uuid.uuid4()) for i in range(3 * num_rows)]).reshape(-1, 3),
        columns=list("def"),
    )
    df2["c"] = df2["d"].map(lambda x: x.split("-"))

    df_output = pd.concat([df1, df2], ignore_index=True, sort=False)
    return foo_output(df_output)


@task(
    requests=Resources(cpu="1000m", mem="64Gi"),
    limits=Resources(cpu="2000m", mem="128Gi"),
)
def bar(df_input: Annotated[StructuredDataset, df_cols]):
    print("bar")
    df_input = df_input.open(pd.DataFrame).all()
    print(df_input)


@workflow
def base_workflow():
    task_foo = foo()
    task_bar = bar(
        df_input=task_foo.df_output,
    )
```

Ketan (kumare3)

11/06/2023, 9:23 PM
cc @Pryce
also cc @Eduardo Apolinario (eapolinario) / @Kevin Su

Pryce

11/06/2023, 9:59 PM
@Geert I'm guessing that in your minimal example you're not even getting to `print('bar')`?

Ketan (kumare3)

11/06/2023, 10:00 PM
@Pryce you could try "Union" TLM to simply observe this in real time

Pryce

11/06/2023, 10:03 PM
Sounds good, will try to repro on our end

Geert

11/06/2023, 10:17 PM
Actually in the minimal example I do see the print, so I guess here it's the open/all call? I can run more tests tomorrow if that helps! Thanks for checking on this
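One way to confirm that is to log process RSS right before and after the `.open(...).all()` call, roughly like this (a sketch; assumes psutil is available in the task image):
```python
import os

import pandas as pd
import psutil
from flytekit import task
from flytekit.types.structured.structured_dataset import StructuredDataset


def log_rss(label: str) -> None:
    # resident set size of the current task process, in GiB
    rss = psutil.Process(os.getpid()).memory_info().rss / 1024**3
    print(f"{label}: rss={rss:.2f} GiB")


@task
def bar(df_input: StructuredDataset):
    log_rss("before open/all")
    df = df_input.open(pd.DataFrame).all()
    log_rss("after open/all")
    print(df)
```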

Pryce

11/06/2023, 10:37 PM
Maybe! Will let you know what my testing reveals when I get to it.

Geert

11/07/2023, 7:48 PM
In our original workflow, it runs if I remove the `list[..]` columns and instead `df.explode()` them, pass them between tasks as a separate `pd.DataFrame`, and merge the columns back into the original dataframe in the next task. It just takes a lot of extra processing time to do the explode and the merge again in the subsequent task, so it's not really a fix at the moment (checking if I can speed it up). But no memory issues.
Hi @Pryce, did your investigation lead to some insights? We are using the workaround at the moment, but it would be great to be able to use `list[]` types in our DataFrames without any extra wrangling.
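In terms of the repro example above, that workaround looks roughly like the following (a sketch; the `row_id` column and the task/output names are illustrative, not the actual production code):
```python
from typing import NamedTuple

import pandas as pd
from flytekit import task

split_output = NamedTuple(
    "SplitOutput",
    [("base", pd.DataFrame), ("exploded", pd.DataFrame)],
)


@task
def split_list_column(df: pd.DataFrame) -> split_output:
    # add a row id so the exploded rows can be matched back to their original row
    df = df.reset_index(drop=True)
    df["row_id"] = df.index
    exploded = df[["row_id", "c"]].explode("c")  # one row per list element, plain str column
    base = df.drop(columns=["c"])                # everything except the list[...] column
    return split_output(base=base, exploded=exploded)


@task
def merge_list_column(base: pd.DataFrame, exploded: pd.DataFrame) -> pd.DataFrame:
    # rebuild the list[...] column by grouping the exploded values per original row
    c = exploded.groupby("row_id")["c"].agg(list).reset_index()
    return base.merge(c, on="row_id").drop(columns=["row_id"])
```
Both intermediate dataframes then contain only scalar columns, which is presumably why the memory stays bounded, at the cost of the extra explode/groupby passes mentioned above.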

Pryce

11/27/2023, 11:03 PM
Hey @Geert, thanks for your patience. I've been able to reproduce this internally with a smaller dataset. I'm still chasing down why those data structures are ballooning so much, though.
Could you please share your workaround code in the context of the repro example above?
```python
# reuses the imports, df_cols, and foo_output definitions from the example above (plus `import sys`)
@task(
    requests=Resources(cpu="1000m", mem="2Gi"),
    limits=Resources(cpu="2000m", mem="4Gi"),
)
def foo(num_rows: int) -> foo_output:
    print("foo")

    df1 = pd.DataFrame(
        np.random.randint(0, 100, size=(num_rows, 2)), columns=list("ab")
    )
    print(f'df1: {sys.getsizeof(df1)}') # 40000144
    df2 = pd.DataFrame(
        np.array([str(uuid.uuid4()) for i in range(3 * num_rows)]).reshape(-1, 3),
        columns=list("def"),
    )
    print(f'df2 pre transform: {sys.getsizeof(df2)}') # 697500144
    df2["c"] = df2["d"].map(lambda x: x.split("-"))
    print(f'df2 post transform: {sys.getsizeof(df2)}') # 1057500144

    df_output = pd.concat([df1, df2], ignore_index=True, sort=False)
    print(f'df_output: {sys.getsizeof(df2)}') # 1057500144 (note: this measures df2, not df_output)

    return foo_output(df_output)


@task(
    requests=Resources(cpu="1000m", mem="16Gi"),
    limits=Resources(cpu="2000m", mem="32Gi"),
)
def bar(df_input: Annotated[StructuredDataset, df_cols]):
    print("bar")
    df_input = df_input.open(pd.DataFrame).all()
    print(f'df input: {sys.getsizeof(df_input)}') # 1317500144
    print(df_input)


@workflow
def base_workflow(num_rows: int=2500000):
    task_foo = foo(num_rows=num_rows)
    task_bar = bar(
        df_input=task_foo.df_output,
    )
```
The objects in question are taking up way more memory than they should. I had a peek at the structured_dataset implementation but no memory leaks jump out at me. What do you think @Ketan (kumare3)?
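If the expansion is indeed on the parquet → pandas side, one thing that might be worth testing (assuming the built-in Arrow handlers that ship alongside flytekit's pandas structured dataset support) is opening the StructuredDataset as a pyarrow.Table, which keeps the list column in Arrow's contiguous layout instead of per-row Python objects:
```python
import pyarrow as pa
from flytekit import task
from flytekit.types.structured.structured_dataset import StructuredDataset


@task
def bar(df_input: StructuredDataset):
    # keep the data in Arrow memory instead of materialising Python lists per row
    table: pa.Table = df_input.open(pa.Table).all()
    print(table.num_rows, table.nbytes)

    # convert only the scalar columns that actually need pandas semantics
    small = table.select(["a", "b", "d"]).to_pandas()
    print(small.head())
```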