# ask-the-community
r
Hi team.. I need some help. I have a flyte task that needs to read a parquet file stored in s3 (about 2 GB in size) and return 1000 records from it… I’m using awswrangler to read the file into a pandas dataframe and then return the first 1000 records from it. Each task node has up to 8 GB of memory… but the pod keeps getting OOMKilled with no useful info in the stack trace. Not sure what’s going on…
k
is 8GB the task resource request or the limit?
could you check the pod spec using kubectl describe? just want to make sure flyte set the resources correctly
r
sorry.. not 8.. 18. Here’s a snippet of our helm chart:
Copy code
# -- Task default resources configuration
  task_resource_defaults:
    task_resources:
      defaults:
        cpu: 100m
        memory: 200Mi
        storage: 200Mi
        ephemeral-storage: 1Gi
        gpu: 0
      limits:
        cpu: 9
        memory: 18Gi
        storage: 20Gi
        ephemeral-storage: 28Gi
        gpu: 0
so yes.. that’s the limit
k
how about the pod spec? could you show me the output of kubectl get pods <name> -n flytesnack-development
r
Sent you the screenshot… Meanwhile, here are the sanitized logs from a pod that got OOMKilled:
Copy code
{"asctime": "2022-12-28 05:20:19,825", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:25,924", "name": "flytekit", "levelname": "INFO", "message": "We won't register PyTorchCheckpointTransformer, PyTorchTensorTransformer, and PyTorchModuleTransformer because torch is not installed."}
{"asctime": "2022-12-28 05:20:26,124", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:26,124", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:26,125", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}
{"asctime": "2022-12-28 05:20:26,125", "name": "flytekit", "levelname": "INFO", "message": "Setting protocol to file"}

{"asctime": "2022-12-28 05:20:27,733", "name": "flytekit.entrypoint", "levelname": "INFO", "message": "Welcome to Flyte! Version: 1.2.1"}
{"asctime": "2022-12-28 05:20:27,734", "name": "flytekit.entrypoint", "levelname": "INFO", "message": "Using user directory /tmp/flyte-i8059ewt/sandbox/local_flytekit/9f57ef630f6df792206a67f0b4e593c5"}
{"asctime": "2022-12-28 05:20:32,830", "name": "flytekit", "levelname": "INFO", "message": "Entering timed context: Copying (<s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb> -> /tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb)"}
{"asctime": "2022-12-28 05:20:41,531", "name": "flytekit", "levelname": "INFO", "message": "Output of command '['aws', 's3', 'cp', '<s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb>', '/tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb']':\nb'Completed 130 Bytes/130 Bytes (697 Bytes/s) with 1 file(s) remaining\\rdownload: <s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb> to ../tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb\\n'\n"}
{"asctime": "2022-12-28 05:20:41,532", "name": "flytekit", "levelname": "INFO", "message": "Exiting timed context: Copying (<s3://my-flyte/flyte/metadata/propeller/project-development-agbd42n8ph94v8rwcbgv/n0/data/inputs.pb> -> /tmp/flyte-i8059ewt/sandbox/local_flytekit/inputs.pb) [Wall Time: 8.701755479996791s, Process Time: 0.027927802s]"}
{"asctime": "2022-12-28 05:20:41,534", "name": "flytekit", "levelname": "INFO", "message": "Invoking flyte.workflows.my_workflow.read_input_records with inputs: {'num_requests': 1000, 'input_path': '<s3://my_bucket/some_file_name.parquet>'}"}
k
sorry, I said something wrong. kubectl describe …
r
I’m seeing this in the describe pod output
Copy code
Limits:
      cpu:     100m
      memory:  200Mi
    Requests:
      cpu:     100m
      memory:  200Mi
wondering why the 18 GB limit didn’t take effect?
k
could you check the propeller config map? just want to make sure the resource is 18GB in that configmap.
btw, you’re running a regular task, right? not a spark or tensorflow job
r
python task
kubectl describe pod flytepropeller-6984f5cd7-ws65t
Copy code
Limits:
      cpu:                200m
      ephemeral-storage:  100Mi
      memory:             200Mi
    Requests:
      cpu:                10m
      ephemeral-storage:  50Mi
      memory:             100Mi
is that what you wanted?
k
no, kubectl edit cm <propeller config map>
you can use kubectl get cm to get the name first
r
ah.. hang on
Copy code
k8s.yaml: |
    plugins:
      k8s:
        default-cpus: 100m
        default-env-vars: []
        default-memory: 100Mi
I can’t seem to find any mention of the 18 GB
k
sorry again, flyteadmin config map
r
Copy code
task_resource_defaults.yaml: |
    task_resources:
      defaults:
        cpu: 100m
        ephemeral-storage: 1Gi
        gpu: 0
        memory: 200Mi
        storage: 200Mi
      limits:
        cpu: 9
        ephemeral-storage: 28Gi
        gpu: 0
        memory: 18Gi
        storage: 20Gi
k
one sec
have you changed the resources in the task decorator?
r
no… didn’t realize I needed to do that
k
no, you don’t. just to confirm
oh. i see. flyteadmin will set both the request and the limit to the value of task_resources.defaults. To change the limit, you have to set it in the task decorator.
r
is it the task_config parameter? do you have an example of how to set limits in the decorator? There’s also requests in the decorator
r
thanks.. so do I set both?
k
I think you only need to set the limit. if that doesn’t work, try setting both
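something like this, for example (task name and resource values here are just illustrative, not from this thread):
Copy code
from flytekit import Resources, task


@task(
    requests=Resources(cpu="1", mem="2Gi"),  # what the pod asks the scheduler for
    limits=Resources(cpu="2", mem="10Gi"),   # hard cap; exceeding memory gets the pod OOMKilled
)
def my_heavy_task() -> None:
    ...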
cc @Ketan (kumare3) why can’t we use the limit in the task_resources by default?
r
No crash so far 🤞🏼
ok.. that worked
Thank you!
k
@Rupsha Chaudhuri can you help document what you had to do?
r
Sure.. but not sure why on earth that task took 4.5 hours??? All it does is read a 2 GB parquet file into a pandas dataframe and return the first 1000 records. From the logs there appears to be a lot of reading and writing to s3 in tiny chunks (600 bytes here, 7 KB there, and so on). Example:
Copy code
Exiting timed context: Writing (/tmp/flyte2qtcr_i0/local_flytekit/f4484561b67506b8c8bfe18c1b4a315d -> s3://my-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/939989a9a40c75f65c631bdf11070354/f4484561b67506b8c8bfe18c1b4a315d) [Wall Time: 6.298165863845497s, Process Time: 0.029386856000002126s]
Output of command '['aws', 's3', 'cp', '--acl', 'bucket-owner-full-control', '/tmp/flyte2qtcr_i0/local_flytekit/f4484561b67506b8c8bfe18c1b4a315d', 's3://my-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/939989a9a40c75f65c631bdf11070354/f4484561b67506b8c8bfe18c1b4a315d']':
b'Completed 8.9 KiB/8.9 KiB (47.7 KiB/s) with 1 file(s) remaining\rupload: ../tmp/flyte2qtcr_i0/local_flytekit/f4484561b67506b8c8bfe18c1b4a315d to s3://my-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/939989a9a40c75f65c631bdf11070354/f4484561b67506b8c8bfe18c1b4a315d\n'
Entering timed context: Writing (/tmp/flyte2qtcr_i0/local_flytekit/8016c26332f3085b27a3d08692b5df94 -> s3://my-flyte/2x/axd9n8qhbwlgszfb9slf-n0-0/ebb807108d3cd57bbfff58b978a8fa22/8016c26332f3085b27a3d08692b5df94)
@Ketan (kumare3) where would you like me to add the documentation?
@Kevin Su any idea what’s causing the slowdown?
k
Might be a Flyte Deck issue
Did you disable it? @task(disable_deck=True….
r
let me try that.. I saw that on another thread but wasn’t sure if it applied to my use case
ok.. still super slow. This runs faster on my desktop as well as laptop 😞
k
btw, is the fsspec package installed in your image? There are some issues in fsspec we just found: it will download the files serially
r
yes.. I do have fsspec
k
sorry, could you remove it and then try to run the workflow again? I’m going to fix the bug in fsspec
r
I’ll remove it for now in my branch.. since this specific workflow doesn’t need it..
k
thanks
r
@Kevin Su it looks like the bottleneck is writing to s3 when the task returns.. at this point it’s just 1000 rows
k
hmm this is really slow
@Kevin Su would be great to dig into this
r
15 min and counting….
k
does not make sense
k
I’m looking into it
r
I’ll be happy to share the code of the task (nothing proprietary in there).. if you want to dig in
k
Yes, that helps
r
Copy code
from time import perf_counter
from typing import Any, Dict, List

import awswrangler as wr
import boto3
import pandas as pd

import flytekit
from flytekit import Resources, Secret, task

# SECRET_GROUP and AWS_REGION are constants defined elsewhere in this module

@task(
    #cache=True,
    #cache_version="1.0",
    limits=Resources(mem="10Gi"),
    disable_deck=True,
    secret_requests=[
        Secret(group=SECRET_GROUP, key="aws_secret_access_key"),
        Secret(group=SECRET_GROUP, key="aws_access_key_id"),
    ],
)
def read_input_records(input_path: str, num_requests: int) -> List[Dict[str, Any]]:
    aws_access_key_id = flytekit.current_context().secrets.get(
        SECRET_GROUP, "aws_access_key_id"
    )
    aws_secret_access_key = flytekit.current_context().secrets.get(
        SECRET_GROUP, "aws_secret_access_key"
    )
    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=AWS_REGION,
    )
    start = perf_counter()
    df = wr.s3.read_parquet(path=input_path, boto3_session=session)
    stop = perf_counter()
    flytekit.current_context().logging.info(
        f"Read {len(df)} records in {stop - start} s"
    )

    if num_requests != -1 and num_requests < len(df):
        df = df[:num_requests]
    stop_2 = perf_counter()
    flytekit.current_context().logging.info(
        f"Truncated to {len(df)} records in {stop_2 - stop} s"
    )

    records = df.to_dict("records")

    stop_3 = perf_counter()
    flytekit.current_context().logging.info(
        f"Converted to records in {stop_3 - stop_2} s"
    )

    return records
Please adjust aws credentials etc as per your setup
k
Has the task completed? IIRC, if you use a large list (10000+) as output, flyte will take a long time (15 mins+) to construct and save a large protobuf list. However, saving a list of size 1000 should be really fast. How many columns are in the dataframe? The record count should be 1000 (list size) * N (number of columns), right? Downloading the file in the pod with awswrangler should take about the same time as it does locally, so the issue probably only shows up when we save the records or construct a large protobuf list
@Rupsha Chaudhuri I found the issue. the problem is that flyte doesn’t recognize Any, so it serializes the data to pickle, which means you end up with thousands of pickle files. In addition, flytekit uploads the files serially, so it becomes very slow. could you change Any to a concrete python type?
FYI, I can reproduce it with this code
Copy code
import typing
from typing import List, Dict, Any

import numpy as np
import pandas as pd
from flytekit import task, Resources, workflow, StructuredDataset


@task(
    limits=Resources(mem="4Gi"),
    disable_deck=True,
)
def create_parquet() -> StructuredDataset:
    df = pd.DataFrame(np.random.choice(['foo','bar','baz'], size=(100000, 3)), columns=["a", "b", "c"])
    df = df.apply(lambda col: col.astype('category'))

    return StructuredDataset(dataframe=df)


@task(
    limits=Resources(mem="4Gi"),
    disable_deck=True,
)
def read_input_records(sd: StructuredDataset) -> List[Dict[str, Any]]:
    df = sd.open(pd.DataFrame).all()
    records = typing.cast(pd.DataFrame, df).to_dict()
    return [records]*1000  # 1000 * 3


@workflow
def wf():
    sd = create_parquet()
    read_input_records(sd=sd)


if __name__ == "__main__":
    wf()
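and the fix on your side is just to swap Any for a concrete type. a sketch of what that could look like (assuming the column values can be cast to strings; adjust to your schema):
Copy code
from typing import Dict, List

import awswrangler as wr
from flytekit import Resources, task


@task(limits=Resources(mem="10Gi"), disable_deck=True)
def read_input_records(input_path: str, num_requests: int) -> List[Dict[str, str]]:
    # credentials/boto3 session elided; same as in the original task
    df = wr.s3.read_parquet(path=input_path)
    if num_requests != -1 and num_requests < len(df):
        df = df[:num_requests]
    # cast values to str so the rows match the concrete annotation;
    # List[Dict[str, str]] maps to Flyte's native collection/map types instead of pickle
    return df.astype(str).to_dict("records")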
r
thanks a lot.. ok.. let me adjust the type
k
Ohh we need to somehow show this
r
I think there was some warning message about using Pickle for typing.Any… but it would be nice to throw this into the docs somewhere as a real warning
b
As for the task_resource limits that were brought up earlier in this thread:
Copy code
task_resource_defaults.yaml: |
    task_resources:
      defaults:
        cpu: 100m
        ephemeral-storage: 1Gi
        gpu: 0
        memory: 200Mi
        storage: 200Mi
      limits:
        cpu: 9
        ephemeral-storage: 28Gi
        gpu: 0
        memory: 18Gi
        storage: 20Gi
The task_resources.defaults refers to the default requests/limits that apply to any pod. The task_resources.limits refers to the upper bound that users can set from their repo. If no request/limit is set on a task, the task_resources.defaults are applied as both.
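Concretely, a hedged reading of the configmap above (task names here are illustrative):
Copy code
from flytekit import Resources, task


# no resources in the decorator: the pod gets
# requests == limits == task_resources.defaults (cpu 100m / memory 200Mi above)
@task
def uses_platform_defaults() -> None:
    ...


# decorator values are honored as long as they stay at or below
# task_resources.limits (memory 18Gi above); asking for more than that cap
# shouldn't be accepted by flyteadmin
@task(limits=Resources(mem="10Gi"))
def overrides_the_default() -> None:
    ...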
k
@Babis Kiosidis Thanks for the explanation.