shy-evening-51366
11/06/2023, 2:23 PM
@task
def train_model(
some: pd.DataFrame,
pandas: pd.DataFrame,
data: pd.DataFrame,
frames: pd.DataFrame,
) -> train_model_output:
print("foo")
return train_model_output(
model=None
)
In this case, the biggest file in S3 is about 2.1 GB (the DataFrame is about 12 GB), so nothing crazy. One other is ~200 MB, and the rest are on the order of KBs. But the memory usage is silly high. In the logs it's not even hitting the print statement, so I'm guessing something is up with deserialization? I'm seeing boto download logs, and after they stop the memory starts spiking (hence my thinking it's deserialization). Any thoughts on this?
Note: We run the exact same workflow and it runs fine for a smaller dataset (~500 MB).
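One way to sanity-check the deserialization theory, as a rough sketch (the file name and the psutil dependency are assumptions, not from the thread): pull the same Parquet object down locally and watch process RSS around the read, then compare it with pandas' own deep memory accounting.
import os
import pandas as pd
import psutil  # assumed to be installed; any RSS probe works

def rss_gb() -> float:
    # resident set size of the current process, in GB
    return psutil.Process(os.getpid()).memory_info().rss / 1e9

print(f"RSS before read: {rss_gb():.2f} GB")
df = pd.read_parquet("data.parquet")  # placeholder for the ~2.1GB object pulled from S3
print(f"RSS after read: {rss_gb():.2f} GB")
print(f"pandas deep size: {df.memory_usage(deep=True).sum() / 1e9:.2f} GB")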
shy-evening-51366
11/06/2023, 5:09 PM
df_dataset_cols = kwtypes(
some_field=int,
some_other_field=str,
and_another_field=List[int],
...
)
@task
def train_model(
df: Annotated[StructuredDataset, df_dataset_cols],
...
) -> train_model_output:
df = df.open(pd.DataFrame).all()
...
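For context, a minimal end-to-end sketch of that column-subset idea (the producer task and column names below are hypothetical, not from the thread); the point of annotating the StructuredDataset parameter with only the needed columns is that open(...).all() then only has to bring that subset back into pandas.
import pandas as pd
from typing_extensions import Annotated
from flytekit import task, workflow, kwtypes
from flytekit.types.structured.structured_dataset import StructuredDataset

# hypothetical subset: only the columns the consumer actually reads
needed_cols = kwtypes(some_field=int, some_other_field=str)

@task
def make_data() -> StructuredDataset:
    df = pd.DataFrame(
        {"some_field": [1, 2], "some_other_field": ["a", "b"], "unused": [0.1, 0.2]}
    )
    return StructuredDataset(dataframe=df)

@task
def train_model(df: Annotated[StructuredDataset, needed_cols]) -> None:
    # only the annotated columns should be materialized here
    subset = df.open(pd.DataFrame).all()
    print(subset.columns)

@workflow
def wf() -> None:
    train_model(df=make_data())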
shy-evening-51366
11/06/2023, 8:09 PM
With the column c (type list[int]) I see the same stepwise behaviour, resulting in an eventual OOMKilled. If I run the example without that column, it finishes successfully:
from typing import List, NamedTuple
try:
from typing import Annotated
except ImportError:
from typing_extensions import Annotated
import pandas as pd
import numpy as np
import uuid
from flytekit import (
task,
workflow,
kwtypes,
Resources,
)
from flytekit.types.structured.structured_dataset import (
StructuredDataset,
)
df_cols = kwtypes(
a=int,
b=int,
c=List[str],
d=str,
e=str,
f=str,
)
foo_output = NamedTuple(
"FooOutput",
[
("df_output", Annotated[pd.DataFrame, df_cols]),
],
)
@task(
requests=Resources(cpu="1000m", mem="64Gi"),
limits=Resources(cpu="2000m", mem="128Gi"),
)
def foo() -> foo_output:
print("foo")
num_rows = 50000000
df1 = pd.DataFrame(
np.random.randint(0, 100, size=(num_rows, 2)), columns=list("ab")
)
df2 = pd.DataFrame(
np.array([str(uuid.uuid4()) for i in range(3 * num_rows)]).reshape(-1, 3),
columns=list("def"),
)
df2["c"] = df2["d"].map(lambda x: x.split("-"))
df_output = pd.concat([df1, df2], ignore_index=True, sort=False)
return foo_output(df_output)
@task(
requests=Resources(cpu="1000m", mem="64Gi"),
limits=Resources(cpu="2000m", mem="128Gi"),
)
def bar(df_input: Annotated[StructuredDataset, df_cols]):
print("bar")
df_input = df_input.open(pd.DataFrame).all()
print(df_input)
@workflow
def base_workflow():
task_foo = foo()
task_bar = bar(
df_input=task_foo.df_output,
)
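For anyone who wants to run the repro above, something like this should work, assuming the snippet is saved as repro.py and a remote Flyte config is set up:
pyflyte run --remote repro.py base_workflow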
eager-processor-63090
11/06/2023, 9:59 PM
Does it even get to the print('bar')?
shy-evening-51366
11/07/2023, 7:48 PM
As a workaround I dropped the list[..] columns, df.explode() them instead, output/input them as separate pd.DataFrame objects in the tasks, and merge the columns back into the original dataframe in the next task. It just takes a lot of extra processing time to do the explode and the merge again in the subsequent task, so it's not really a fix at the moment (checking if I can speed it up). But no memory issues.
shy-evening-51366
11/24/2023, 7:52 AM
Ideally we'd be able to use list[] types in our DataFrames without any extra wrangling.
eager-processor-63090
11/28/2023, 12:41 AM
@task(
requests=Resources(cpu="1000m", mem="2Gi"),
limits=Resources(cpu="2000m", mem="4Gi"),
)
def foo(num_rows: int) -> foo_output:
print("foo")
df1 = pd.DataFrame(
np.random.randint(0, 100, size=(num_rows, 2)), columns=list("ab")
)
print(f'df1: {sys.getsizeof(df1)}') # 40000144
df2 = pd.DataFrame(
np.array([str(uuid.uuid4()) for i in range(3 * num_rows)]).reshape(-1, 3),
columns=list("def"),
)
print(f'df2 pre transform: {sys.getsizeof(df2)}') # 697500144
df2["c"] = df2["d"].map(lambda x: x.split("-"))
print(f'df2 post transform: {sys.getsizeof(df2)}') # 1057500144
df_output = pd.concat([df1, df2], ignore_index=True, sort=False)
print(f'df_output: {sys.getsizeof(df2)}') # 1057500144
return foo_output(df_output)
@task(
requests=Resources(cpu="1000m", mem="16Gi"),
limits=Resources(cpu="2000m", mem="32Gi"),
)
def bar(df_input: Annotated[StructuredDataset, df_cols]):
print("bar")
df_input = df_input.open(pd.DataFrame).all()
print(f'df input: {sys.getsizeof(df_input)}') # 1317500144
print(df_input)
@workflow
def base_workflow(num_rows: int=2500000):
task_foo = foo(num_rows=num_rows)
task_bar = bar(
df_input=task_foo.df_output,
)
The objects in question are taking up way more memory than they should. I had a peek at the structured_dataset implementation but no memory leaks jump out at me. What do you think @freezing-airport-6809?
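One extra data point that might help narrow down which column dominates (a sketch, not from the thread, scaled down to 250k rows): memory_usage(deep=True) breaks the usage down per column, and for DataFrames sys.getsizeof should report roughly the same deep total.
import sys
import uuid
import numpy as np
import pandas as pd

num_rows = 250_000  # scaled down from the repro above
df2 = pd.DataFrame(
    np.array([str(uuid.uuid4()) for _ in range(3 * num_rows)]).reshape(-1, 3),
    columns=list("def"),
)
df2["c"] = df2["d"].map(lambda x: x.split("-"))
# per-column breakdown, including the Python object overhead of the strings and lists
print(df2.memory_usage(deep=True))
print(sys.getsizeof(df2))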
shy-evening-51366
12/04/2023, 8:04 AM
For reference, this is roughly how we handle the list[..] columns separately (and not nested in the original DataFrame):
@task
def some_func():
...
df_some_list_type = pd.DataFrame(df_original.explode("foo")["foo"])
df_dropped = df_original.drop("foo", axis=1)
...
return (..., df_dropped, df_some_list_type, ...)
@task
def some_other_func(..., df_dropped, df_some_list_type,...):
...
df_some_list_type = df_some_list_type.groupby(df_some_list_type.index).agg(
{"foo": lambda x: x.tolist()}
)
df_dropped["foo"] = df_some_list_type
...
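A tiny self-contained illustration of that round trip (toy data, not from the thread): explode() repeats the original index, so grouping on the index rebuilds the lists afterwards.
import pandas as pd

df_original = pd.DataFrame({"foo": [[1, 2], [3]], "bar": ["x", "y"]})
df_some_list_type = pd.DataFrame(df_original.explode("foo")["foo"])  # index is 0, 0, 1
df_dropped = df_original.drop("foo", axis=1)

# rebuild the list column by grouping on the preserved index
rebuilt = df_some_list_type.groupby(df_some_list_type.index).agg({"foo": lambda x: x.tolist()})
df_dropped["foo"] = rebuilt["foo"]
print(df_dropped)  # foo holds [1, 2] and [3] again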
shy-evening-51366
02/01/2024, 12:08 PM
Update: this looks like a known memory issue in pyarrow.Table.from_pandas(), which is hit through the use of `pandas.DataFrame.to_parquet()`:
• https://github.com/apache/arrow/issues/37989
• https://github.com/pandas-dev/pandas/issues/55296
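For reference, the conversion path those issues describe can be exercised directly with a much smaller frame (a sketch, not from the thread); the spike happens when the object-dtype column of Python lists is handed to Arrow, which is the step pandas.DataFrame.to_parquet() goes through internally.
import numpy as np
import pandas as pd
import pyarrow as pa

# object-dtype column holding Python lists, like column c above, but much smaller
df = pd.DataFrame({"c": [list(np.random.randint(0, 100, size=5)) for _ in range(1_000_000)]})
print(df.memory_usage(deep=True))
table = pa.Table.from_pandas(df)  # the conversion the linked issues are about
print(table.nbytes)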