https://flyte.org logo
#ask-the-community
Title
# ask-the-community
f

Frank Shen

11/15/2022, 12:14 AM
What I intent to do is to request resource overrides if the dataset size is too big.
s

Samhita Alla

11/15/2022, 6:43 AM
Would you mind letting me know the reason behind returning
Resources
? You should request for resources in the task decorator: https://docs.flyte.org/projects/cookbook/en/latest/auto/deployment/customizing_resources.html.
f

Frank Shen

11/15/2022, 5:49 PM
Hi @Samhita Alla, Because I extends a class from PythonInstanceTask.
Is it possible to request resource in the execute() method?
s

Samhita Alla

11/15/2022, 5:53 PM
You can request for resources in a PythonInstanceTask by sending
requests
as an argument: https://github.com/flyteorg/flytekit/blob/07299d0b0e8d172e33ad48d719f9014effb5d96b/flytekit/core/python_auto_container.py#L37.
f

Frank Shen

11/15/2022, 5:56 PM
That’s exactly what I am doing. But first I need to check the input dataset size to determine the resource memory usage dynamically by adding a task that returns the Resource object.
Copy code
@task
def get_xgb_task_resource_request(df: pd.DataFrame) -> Resources:
    _Gibibyte = 1073741824
    data_size_gb = df.memory_usage(index=True, deep=True).sum() // _Gibibyte
    # request more memory for the task if the dataset size is >= 2 GB, because the default task mem alloc might not be enough.
    resource_requests = Resources(mem=f'{data_size_gb}Gi') if data_size_gb >= 2 else None
    return resource_requests
Note that because I have an if statement, I cannot have it done inside the PythonInstanceTask’s constructor (init method).
Therefore I have to return a Resource obj from the get_xgb_task_resource_request() task.
I requested more resources inside the execute() method and that solved the problem.
s

Samhita Alla

11/16/2022, 4:49 AM
Good to hear that you were able to solve the problem. Have you created a new
PythonInstanceTask
and requested resources in the
execute()
method? Would you mind sharing the coding snippet?
f

Frank Shen

11/16/2022, 5:18 PM
@Samhita Alla Sure. Will this work in the execute() method? It runs without error though.
Copy code
_Gibibyte = 1073741824
        data_size_gb = (train_size + validation_size) / _Gibibyte
        """
            Dynamically request task memory in GB + 0.1 GB buffer for the task (to 3 decimals), 
            because the default task memory allocation might not be enough for big dataset
        """
        self.resources.requests = Resources(mem=f'{round(data_size_gb + 0.1, 3)}Gi')
s

Samhita Alla

11/17/2022, 5:49 AM
Yeah, I think this should work. But I’m not sure if these resources will be allocated. Have you checked the task spec?
f

Frank Shen

11/17/2022, 7:56 PM
@Samhita Alla, where and how to check task spec memory?
s

Samhita Alla

11/18/2022, 4:50 AM
On the UI?
f

Frank Shen

11/18/2022, 5:44 PM
It doesn’t work when setting resource.requests in execute() method.
I think because I tried to re-allocate resource while the task is already running. And that is not possible.
s

Samhita Alla

11/19/2022, 10:44 AM
Yeah! You’ll need to set it in the init method.
f

Frank Shen

11/21/2022, 5:45 PM
Then it won’t take calculated / dynamic values for resources at runtime which defeats the purpose.
s

Samhita Alla

11/22/2022, 4:12 AM
@Kevin Su @Eduardo Apolinario (eapolinario), how can the task resources be set dynamically?
f

Frank Shen

11/22/2022, 5:45 AM
@Samhita Alla, That’s when you define task with the decorator. I don’t think with_overrides() is available for tasks defined by PythonInstanceTask Class. Plus with_overrides() doesn’t dynamically allocate resource based on realtime resource calculation either.
s

Samhita Alla

11/22/2022, 6:03 AM
I think
with_overrides()
should work with PythonInstanceTask class. Also regarding real-time resource calculation, I’m not sure how that can be done. I’ll defer to Kevin/Eduardo to answer.
e

Eduardo Apolinario (eapolinario)

11/22/2022, 6:49 PM
@Frank Shen, you can't set the resources for a task after the task is running, but you could use a dynamic task to achieve this suggestion of setting task resources dynamically. For example:
Copy code
@task
def t(a: int):
    ...

@dynamic
def dyn(a: int):
    t(a=a).with_overrides(
        requests=Resources(mem="432Mi"),
        limits=Resources(mem="543Mi"),
    )
4 Views