#ask-the-community

Frank Shen

03/17/2023, 6:12 PM
Hello, I have been applying .with_overrides() to workflows to increase the memory limit, and it does seem to work. However, it doesn't seem to work when a workflow has at least one task defined as a subclass of PythonInstanceTask. Say I have a workflow with two tasks defined with the @task decorator. The workflow fails on the first task with an OOMKilled error. Then I apply .with_overrides(requests=Resources(cpu="1", mem="25Gi")) to the workflow, and both tasks succeed. Then I change the second task to a subclass of PythonInstanceTask and run it with the same .with_overrides(requests=Resources(cpu="1", mem="25Gi")) applied. Now the first task fails with an OOMKilled error again. Do you have any idea why? This led me to think that .with_overrides() may not work when a workflow contains a task defined as a subclass of PythonInstanceTask. Thanks

Kevin Su

03/17/2023, 6:15 PM
Is the subclass of PythonInstanceTask a custom task internally?

Frank Shen

03/17/2023, 6:15 PM
Yes, I created the subclass: class XGBTrainTask(PythonInstanceTask[XGBParams])
@Kevin Su Yes, here is its constructor:
```python
import typing
from typing import Dict, List, Optional

from flytekit import PythonInstanceTask, Resources, StructuredDataset
from flytekit.core.interface import Interface

# XGBParams and the class-level constants (_TASK_TYPE, _TEAM, _PROJECT, ...)
# are defined elsewhere in the original code and are not shown here.


class XGBTrainTask(PythonInstanceTask[XGBParams]):
    def __init__(
        self,
        bucket: str,
        dataset_type: typing.Type[StructuredDataset],
        config: Optional[XGBParams] = None,
        **kwargs,
    ):
        """
        A task that trains an XGBoost model.
        Args:
            bucket: name of the bucket the task works with.
            dataset_type: Type of the dataset (a StructuredDataset type).
            config: Configuration for the task. Contains the params used in the model training.
        Returns:
            model_loc: The trained model's location as a string.
            evaluation_result: The evaluation result against the validation dataset.
        """
        self._config = config
        self._dataset_type = dataset_type
        self._bucket = bucket

        # Runtime inputs of the task.
        inputs = {
            self._TEAM: str,
            self._PROJECT: str,
            self._VERSION: str,
            self._TRAIN_ARG: dataset_type,
            self._VALIDATION_ARG: dataset_type,
            self._PARAMS_ARG: XGBParams,
            self._LABEL_COL: str,
            self._USE_RAY: bool,
            self._NUM_CPUS: int,
        }

        # Runtime outputs of the task.
        outputs = {
            self._OUTPUT_MODEL: str,
            self._OUTPUT_EVAL_RESULT: Dict[str, Dict[str, List[float]]],
        }

        super().__init__(
            name=self._TASK_TYPE,
            task_type=self._TASK_TYPE,
            task_config=config,
            interface=Interface(inputs=inputs, outputs=outputs),
            # Task-level resource request set for every instance of this task.
            requests=Resources(cpu="4"),
            **kwargs,
        )
```

Kevin Su

03/17/2023, 6:24 PM
looking

Frank Shen

03/17/2023, 6:37 PM
Thanks @Kevin Su!

Kevin Su

03/17/2023, 8:03 PM
@Frank Shen To clarify, you are overriding the subworkflow's resources, and it doesn't work if there is a PythonInstanceTask inside it?
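```python
from flytekit import Resources, workflow

@workflow
def subwf():
    t1()  # t1, t2: the two tasks from the question
    t2()

@workflow
def wf():
    subwf().with_overrides(requests=Resources(cpu="1", mem="25Gi"))
```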

Frank Shen

03/17/2023, 8:55 PM
yes, @Kevin Su

Kevin Su

03/20/2023, 11:29 PM
@Frank Shen Sorry for the late reply. I will dig into it tomorrow and get back to you. Would you mind creating an issue here: [flyte-bug]?

Frank Shen

03/23/2023, 7:32 PM
Hi @Kevin Su, I've created the issue at https://github.com/flyteorg/flyte/issues/3525 (sorry for the delay, I was on PTO).

Kevin Su

03/23/2023, 7:34 PM
no worries, thank you so much.

Frank Shen

03/23/2023, 7:35 PM
YW