Geert
11/06/2023, 2:23 PM
@task
def train_model(
    some: pd.DataFrame,
    pandas: pd.DataFrame,
    data: pd.DataFrame,
    frames: pd.DataFrame,
) -> train_model_output:
    print("foo")
    return train_model_output(
        model=None
    )
In this case, the biggest file in S3 is about 2.1 GB (the DataFrame is about 12 GB), so nothing crazy. One other is ~200 MB, and the rest are in the order of KBs. But the memory usage is silly high. In the logs it's not even hitting the print statement, so I'm guessing something is up with deserialization? I'm seeing boto download logs, and after they stop the memory starts spiking (hence I'm thinking it's perhaps deserialization). Any thoughts on this?
Note: we run the exact same workflow and it runs fine for a smaller dataset (~500 MB).
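[Editor's note] One way to quantify the gap between the 2.1 GB parquet on S3 and the in-memory DataFrame is to compare the compressed, decoded, and pandas-level sizes of a local copy. This is only a sketch: local_copy.parquet is a placeholder path for a downloaded copy of the object, and it assumes the usual parquet -> Arrow -> pandas decoding path rather than the exact handler flytekit uses.

import pyarrow.parquet as pq

pf = pq.ParquetFile("local_copy.parquet")  # placeholder for a local copy of the S3 object
meta = pf.metadata
compressed = sum(
    meta.row_group(rg).column(col).total_compressed_size
    for rg in range(meta.num_row_groups)
    for col in range(meta.num_columns)
)
uncompressed = sum(
    meta.row_group(rg).column(col).total_uncompressed_size
    for rg in range(meta.num_row_groups)
    for col in range(meta.num_columns)
)
print(f"on disk: {compressed / 1e9:.2f} GB, decoded pages: {uncompressed / 1e9:.2f} GB")

table = pq.read_table("local_copy.parquet")
print(f"as Arrow table: {table.nbytes / 1e9:.2f} GB")

df = table.to_pandas()  # this step materializes Python objects for string/list columns
print(f"as pandas (deep): {df.memory_usage(deep=True).sum() / 1e9:.2f} GB")

If the numbers jump most at the to_pandas step, the spike after the boto downloads is conversion rather than download.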
Ketan (kumare3)

Geert
11/06/2023, 5:09 PM
df_dataset_cols = kwtypes(
    some_field=int,
    some_other_field=str,
    and_another_field=List[int],
    ...
)

@task
def train_model(
    df: Annotated[StructuredDataset, df_dataset_cols],
    ...
) -> train_model_output:
    df = df.open(pd.DataFrame).all()
    ...
Ketan (kumare3)
Geert
11/06/2023, 5:18 PM

Ketan (kumare3)
Geert
11/06/2023, 8:09 PM
With a column c of type list[int] I see the same stepwise behaviour, resulting in eventual OOMKilled. If I run the example without that column, it finishes successfully:
from typing import List, NamedTuple

try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated

import pandas as pd
import numpy as np
import uuid

from flytekit import (
    task,
    workflow,
    kwtypes,
    Resources,
)
from flytekit.types.structured.structured_dataset import (
    StructuredDataset,
)

df_cols = kwtypes(
    a=int,
    b=int,
    c=List[str],
    d=str,
    e=str,
    f=str,
)

foo_output = NamedTuple(
    "FooOutput",
    [
        ("df_output", Annotated[pd.DataFrame, df_cols]),
    ],
)


@task(
    requests=Resources(cpu="1000m", mem="64Gi"),
    limits=Resources(cpu="2000m", mem="128Gi"),
)
def foo() -> foo_output:
    print("foo")
    num_rows = 50000000
    df1 = pd.DataFrame(
        np.random.randint(0, 100, size=(num_rows, 2)), columns=list("ab")
    )
    df2 = pd.DataFrame(
        np.array([str(uuid.uuid4()) for i in range(3 * num_rows)]).reshape(-1, 3),
        columns=list("def"),
    )
    df2["c"] = df2["d"].map(lambda x: x.split("-"))
    df_output = pd.concat([df1, df2], ignore_index=True, sort=False)
    return foo_output(df_output)


@task(
    requests=Resources(cpu="1000m", mem="64Gi"),
    limits=Resources(cpu="2000m", mem="128Gi"),
)
def bar(df_input: Annotated[StructuredDataset, df_cols]):
    print("bar")
    df_input = df_input.open(pd.DataFrame).all()
    print(df_input)


@workflow
def base_workflow():
    task_foo = foo()
    task_bar = bar(
        df_input=task_foo.df_output,
    )
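[Editor's note] One thing that might narrow this down (a sketch, not a confirmed fix; it assumes the default Arrow handlers shipped with flytekit's structured dataset support are registered, and reuses @task, Resources, Annotated, StructuredDataset, and df_cols from the example above; bar_arrow is a hypothetical variant task): open the dataset as a pyarrow.Table instead of a pandas DataFrame, so the list<string> column stays in Arrow buffers rather than becoming one Python list object per row.

import pyarrow as pa

@task(
    requests=Resources(cpu="1000m", mem="64Gi"),
    limits=Resources(cpu="2000m", mem="128Gi"),
)
def bar_arrow(df_input: Annotated[StructuredDataset, df_cols]):
    # Decode to Arrow only: column "c" remains a list<string> Arrow array
    # instead of being materialized as per-row Python lists.
    table = df_input.open(pa.Table).all()
    print(table.num_rows, table.nbytes)

If bar_arrow stays flat while bar spikes, that points at the Arrow-to-pandas conversion of the list column rather than the download or parquet decode.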
Ketan (kumare3)
Pryce
11/06/2023, 9:59 PM
Does it reach the print('bar')?

Ketan (kumare3)
Pryce
11/06/2023, 10:03 PM

Geert
11/06/2023, 10:17 PM

Pryce
11/06/2023, 10:37 PM

Geert
11/07/2023, 7:48 PM
For now we drop the list[..] columns and instead df.explode() them, output/input them as separate pd.DataFrames in the tasks, and merge the columns back into the original dataframe in the next task. It just takes a lot of extra processing time to do the explode and merge again in the subsequent task, so it's not really a fix at the moment (checking if I can speed it up). But no memory issues. It would be great to be able to use list[] types in our DataFrames without any extra wrangling.
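[Editor's note] A minimal sketch of the explode/re-group workaround described above (not Geert's actual code; the column name "c", the helper names, and the row_id key are illustrative):

from typing import Tuple

import pandas as pd


def split_list_column(df: pd.DataFrame, col: str = "c") -> Tuple[pd.DataFrame, pd.DataFrame]:
    # Move the list-typed column into its own exploded frame (one row per element),
    # carrying the original row position along as an explicit key.
    exploded = (
        df[[col]]
        .rename_axis("row_id")
        .reset_index()
        .explode(col)
    )
    return df.drop(columns=[col]), exploded


def merge_list_column(scalars: pd.DataFrame, exploded: pd.DataFrame, col: str = "c") -> pd.DataFrame:
    # Re-group the exploded rows back into lists and join them onto the scalar frame.
    regrouped = exploded.groupby("row_id")[col].agg(list)
    return scalars.join(regrouped)

Each task then only ever passes plain scalar columns between tasks, which is what avoids the blow-up, at the cost of the extra explode/groupby work Geert mentions.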
Pryce
11/27/2023, 11:03 PM
import sys  # in addition to the imports/definitions from the example above

@task(
    requests=Resources(cpu="1000m", mem="2Gi"),
    limits=Resources(cpu="2000m", mem="4Gi"),
)
def foo(num_rows: int) -> foo_output:
    print("foo")
    df1 = pd.DataFrame(
        np.random.randint(0, 100, size=(num_rows, 2)), columns=list("ab")
    )
    print(f'df1: {sys.getsizeof(df1)}')  # 40000144
    df2 = pd.DataFrame(
        np.array([str(uuid.uuid4()) for i in range(3 * num_rows)]).reshape(-1, 3),
        columns=list("def"),
    )
    print(f'df2 pre transform: {sys.getsizeof(df2)}')  # 697500144
    df2["c"] = df2["d"].map(lambda x: x.split("-"))
    print(f'df2 post transform: {sys.getsizeof(df2)}')  # 1057500144
    df_output = pd.concat([df1, df2], ignore_index=True, sort=False)
    print(f'df_output: {sys.getsizeof(df2)}')  # 1057500144
    return foo_output(df_output)


@task(
    requests=Resources(cpu="1000m", mem="16Gi"),
    limits=Resources(cpu="2000m", mem="32Gi"),
)
def bar(df_input: Annotated[StructuredDataset, df_cols]):
    print("bar")
    df_input = df_input.open(pd.DataFrame).all()
    print(f'df input: {sys.getsizeof(df_input)}')  # 1317500144
    print(df_input)


@workflow
def base_workflow(num_rows: int = 2500000):
    task_foo = foo(num_rows=num_rows)
    task_bar = bar(
        df_input=task_foo.df_output,
    )
The objects in question are taking up way more memory than they should. I had a peek at the structured_dataset implementation, but no memory leaks jump out at me. What do you think @Ketan (kumare3)?
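[Editor's note] A per-column breakdown can help pin down which column drives those numbers. A small sketch to call inside foo/bar, assuming only that pandas' own accounting is close to what sys.getsizeof reports (pandas' __sizeof__ is based on memory_usage(deep=True)):

import pandas as pd


def report_column_memory(df: pd.DataFrame) -> None:
    # Bytes per column, including the payload of Python objects (strings, lists).
    usage = df.memory_usage(deep=True, index=True)
    for name, nbytes in usage.items():
        print(f"{name}: {nbytes / 1e6:.1f} MB")
    print(f"total: {usage.sum() / 1e6:.1f} MB")

For the frame above, the object-dtype columns (d/e/f as UUID strings, and especially c as per-row Python lists) should dominate, which would explain an in-memory size several times the parquet size without any leak in structured_dataset.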