# ask-the-community
v
Hello guys, we are trying to use a map task in a workflow, but no matter how much we increase the pod memory, it keeps getting OOMKilled with error 137... By looking at the pods being created, in a successful execution (with fewer inputs), we can verify that the resources set on the task are not the same as the ones requested in GKE... The image shows the pod resources, when we set the task with 6Gi memory and 2 CPU...
n
Are you setting resources in the @task(task_config=...)?
d
Hey Victor, what version of Flyte are you running? Do you have a minimal code repro here? It would be useful to test locally. It seems like this could be a bug.
v
@Nicholas Roberson not in task_config, but in limits. @Dan Rammer (hamersaw) yes, @Jaine Rannow Budke can you help with this?
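To illustrate (a minimal sketch, resource values assumed): setting resources via requests/limits on @task, rather than through task_config, is what should end up on the pods in GKE:
from flytekit import Resources, task

# Minimal sketch (values assumed): resources go through requests/limits on
# @task, not through task_config.
@task(
    requests=Resources(cpu="2", mem="6Gi"),
    limits=Resources(cpu="2", mem="6Gi"),
)
def process_images(input: str) -> bool:
    ...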
d
Thanks, this is a high priority bug - once we can repro I will have a fix out in the next day or so.
v
Ah, about the version: we are using Flyte 1.3.0
k
@Dan Rammer (hamersaw) is this similar to what @Nicholas LoFaso is seeing, where pods land on the same node?
d
@Ketan (kumare3) that's my concern - this is now my top priority today
k
Ya
v
Maybe this could be related to the problem I mentioned before about dynamic workflows?
Somehow, since it's about pods on the same node...
d
These could be related, but seem to be separate issues. I'm going to start diving into maptasks and see what I can find.
v
Cool. I'm talking with Jaine to send you the code snippet ASAP
@workflow
def image_workflow(inputs..) -> None:
    skus = get_sku_list(...)
    models = load_model()
    prepared = prepare_map_inputs(...)

    result = map_task(
        process_images,
        metadata=TaskMetadata(retries=1),
        concurrency=2,
    )(input=prepared)


@task(requests=Resources(mem='2Gi'))
def prepare_map_inputs(
    skus: List[str],
    ...
) -> List[MapInput]:
    ...  # body omitted


@task(requests=Resources(cpu='2', mem='6Gi'))
def process_images(
    input: MapInput
) -> bool:
    ...  # body omitted
This is a snippet; we had to omit some info...
d
thanks! and no problem on omitting, it was expected 😄
I'm having difficulty reproducing this in 1.3.0. Do you have any unique configuration in flyteadmin? For example, the task_resources configuration option (which sets the default resource requests), or any matchable attributes, e.g. at the task level or workflow level, etc.?
v
Yup, we have an extended_task decorator with gcloud integration
d
Can you say a little more? Extending the flytekit @task?
v
Yup, this extension just creates a new gcloud credentials integration, with the authentication needed for the execution
k
@Victor Gustavo da Silva Oliveira just to confirm, did you pass the task resources to the parent class? Like:
class GcloudFunctionTask(PythonFunctionTask[gcloud]):
    def __init__(self, task_config: gcloud, task_function: Callable, **kwargs):
        super(GcloudFunctionTask, self).__init__(
            task_config=task_config,
            task_type="gcloud",
            task_function=task_function,
            **kwargs,  # resource requests/limits are in here
        )
v
def extended_task(
    function: Callable = None, *, integrations: List[str] = None, **task_kwargs
) -> Callable:
    if function is None:
        return functools.partial(
            extended_task, integrations=integrations, **task_kwargs
        )

    if integrations is not None and not isinstance(integrations, list):
        raise ValueError("'integrations' parameter must be a list.")

    task_integrations = TaskIntegrations(integrations)

    if task_integrations.has_requests:
        task_kwargs["task_config"] = generate_task_config(task_integrations)

    @task(**task_kwargs)
    @functools.wraps(function)
    def wrapper(*args, **kwargs) -> Any:
        ...
It's like this
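For reference, a hypothetical usage sketch (names and values assumed): since extended_task forwards **task_kwargs into flytekit's @task, requests/limits passed to it should reach the task the same way as with a plain @task:
from flytekit import Resources

# Hypothetical usage (assumed): requests/limits travel through **task_kwargs
# into the underlying flytekit @task decorator.
@extended_task(
    integrations=["gcloud"],
    requests=Resources(cpu="2", mem="6Gi"),
    limits=Resources(cpu="2", mem="6Gi"),
)
def process_images(input: MapInput) -> bool:
    ...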
d
it looks like the mem is applied but not the cpu, right? the tasks got 6Gi mem, just not 2 CPU?
v
Nope, neither is working. If you look at the image, it is creating the pods with 2Gi, but in the requests we've set 6Gi
d
the ...n3-0-0 and n3-0-1 are the maptask Pods, right? skus is n0, models is n1, and prepared is n2 from your example.
v
All these pods are supposed to be the map task
Sorry, you are right
The n3 pods are the maptask
n
@Dan Rammer (hamersaw) Let me know if I can be of assistance here. At a glance this doesn’t appear to be the same issue I was experiencing, but both are happening on GKE with map tasks so maybe they are related
k
@Nicholas LoFaso I would love to understand what's happening
I feel something is wrong in the setup
it could be that you are running out of disk space to start the container or something
n
As far as I can tell the container starts, but then is evicted (preempted) by k8s to make room for a “calico” pod, which is a GKE system pod
We are setting CPU/Mem to have equal request/limit so k8s should know exactly how many to schedule on a given node. Not sure why it is over-scheduling when using map_task but not dynamic. I confirmed with Dan that I was setting concurrency to 25 for both dynamic and map_task, so it should be an equivalent load on the k8s API.
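For context, a minimal sketch of that setup (names and values assumed): the mapped task gets equal requests and limits, which puts its pods in the Guaranteed QoS class, and concurrency bounds how many mapped pods run in parallel:
from typing import List

from flytekit import Resources, map_task, task, workflow


# Sketch (names/values assumed): requests == limits -> Guaranteed QoS pods,
# so the scheduler knows exactly how many fit on a node; concurrency caps
# how many mapped pods run at the same time.
@task(
    requests=Resources(cpu="2", mem="4Gi"),
    limits=Resources(cpu="2", mem="4Gi"),
)
def mapped_task(x: int) -> bool:
    return x > 0


@workflow
def wf(xs: List[int]) -> List[bool]:
    return map_task(mapped_task, concurrency=25)(x=xs)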
k
Cc @Dan Rammer (hamersaw)
k
@Victor Gustavo da Silva Oliveira I’ve tested it with different versions of propeller, and still can’t reproduce it. Could you run a map task that maps over a regular python task, and see if it properly overrides the resources? Just want to make sure nothing is wrong with the extended_task or the system config.
you could run this example.
from typing import List, Optional
from flytekit import map_task, task, workflow, Resources, TaskMetadata


@task(requests=Resources(cpu="1", mem="800Mi"))
def my_task(should_succeed: bool) -> Optional[bool]:
    if should_succeed:
        return should_succeed
    raise ValueError("This is a failure")


@task(requests=Resources(cpu="1", mem="800Mi"))
def get_num_successes_task(my_task_results: List[Optional[bool]]) -> int:
    return len(my_task_results)


@workflow
def wf(inputs: List[bool] = [False, True, False]) -> int:
    my_task_results = map_task(
        my_task,
        metadata=TaskMetadata(retries=1),
        min_success_ratio=0.5,
        concurrency=2,
    )(should_succeed=inputs)
    num_successes = get_num_successes_task(my_task_results=my_task_results)
    return num_successes
v
@Kevin Su ok! When I've done the testing, I'll come back here!
d
@Victor Gustavo da Silva Oliveira were you able to get this solved? Is the example above working?
v
I'm trying to update Flyte to 1.5.4, but we're having some issues. When we finish this task, I'll send you a heads-up, is that all right?
d
sounds great, thanks! just want to make sure we address this.
a
@Nizar Hattab @Eyal Abbas