# flyte-support
b
Hi all! We're currently working on an eager workflow that uses Ax (https://github.com/facebook/Ax) for hyper-parameter optimization and runs trials as concurrent tasks. Unfortunately we're running into some problems. The first problem we've encountered: we're using asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) in our implementation to update the Ax model when a trial finishes. However, pending in asyncio.wait is expected to be a set of asyncio tasks (https://docs.python.org/3/library/asyncio-task.html#id4), not futures/awaitables. Our initial implementation worked fine when executed locally, but when executed remotely (sandbox), task.result() returns a dictionary with "o0", "o1" as keys instead of the tuple of values it should actually be. We used a workaround to circumvent this problem, see:
Copy code
...

    pending = set()

    while True:
        try:
            parameters, trial_index = ax_client.get_next_trial()
            task = asyncio.create_task(
                evaluate_model(
                    parameters=prepare_trial_config(
                        parameters,
                        training_config,
                    ),
                    trial_index=trial_index,
                ),
                name=f"Trial {trial_index}",
            )
            pending.add(task)
        except (MaxParallelismReachedException, DataRequiredError):
            # Maximum parallelism reached, wait for a task to complete.
            done, pending = await asyncio.wait(
                pending,
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in done:
                try:
                    result = task.result()
                    # Bugfix: flyte behaves differently when executed locally vs
                    #   remotely
                    if isinstance(result, dict) and {*result.keys()} == {"o0", "o1"}:
                        trial_index = result["o1"]
                        raw_data = result["o0"]
                    else:
                        raw_data, trial_index = result
                    ax_client.complete_trial(
                        trial_index=trial_index,
                        raw_data=raw_data,
                    )
                except Exception as e:
                    logging.error(f"Trial {trial_index} failed with error {str(e)}")
            continue

    ...
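As an aside, the local-vs-remote branching above could be factored into a small helper. This is just a sketch based on the behaviour we observed; the helper name is ours, and the "o0"/"o1" keys simply mirror what the remote execution returned:
Copy code
def normalize_trial_output(result):
    # Remotely, the eager task result arrives as a dict keyed by "o0", "o1", ...;
    # locally it is the (named) tuple returned by the task. Unify both shapes.
    if isinstance(result, dict) and {*result.keys()} == {"o0", "o1"}:
        return result["o0"], result["o1"]
    raw_data, trial_index = result
    return raw_data, trial_index
With that, the loop body reduces to raw_data, trial_index = normalize_trial_output(task.result()).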
The second problem we've encountered, however, seems to be a bit more complicated and we're stuck: when trying to execute this eager workflow/task on a remote cluster, flyte/kubernetes attempts to create a pod for the task but fails immediately without producing any logs. Our assumption is that the pod cannot be created because our custom image needs more than the default resource limits:
Copy code
resources:
        limits:
          - name: CPU
            value: 200m
          - name: MEMORY
            value: 200Mi
        requests:
          - name: CPU
            value: 200m
          - name: MEMORY
            value: 200Mi
We've also tried to set the tolerations and resource requests for the eager workflow (according to the documentation, **kwargs can be any additional task argument), see
Copy code
@eager(
    remote=FlyteRemote(
        config=Config.auto(
            os.path.join(
                os.path.dirname(__file__),
                "flyte_remote_config.yaml",
            )
        ),
        default_project="...",
        default_domain="...",
        data_upload_location="abfs://.../flyte/raw-data/",
    ),
    task_config=Pod(pod_spec=V1PodSpec(containers=[], tolerations=...)),
    requests=Resources(cpu="4", mem="8Gi", ephemeral_storage="8Gi"),
)
but this had no effect at all: the eager workflow/task is still executed as a plain python task rather than a sidecar task (which is what happens when setting tolerations etc. on a regular task), and we're still observing the same resource requests in Kubernetes. We've also tried the same settings with a conventional task, and that task executed without any issues. Any help would be much appreciated. Thank you and all the best.
👍 1
t
cc @broad-monitor-993
b
For the first problem, would you mind creating a minimally reproducible code snippet? That’ll help us reproduce and debug
For the second problem, can you increase the default resource limit in the Flyte cluster?
b
Hi Niels, thank you for your reply. The code snippet:
Copy code
import asyncio
from flytekit import task
from flytekit.experimental import eager
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config


@task
def square_dual(x: int) -> tuple[int, int]:
    print(f"Computing square of {x}")
    return x**2, x * 2


@eager(
    remote=FlyteRemote(
        config=Config.for_sandbox(),
        default_project="flytesnacks",
        default_domain="development",
    )
)
async def eager_workflow_with_asyncio_tasks(x: int) -> tuple[int, int]:

    pending = {asyncio.create_task(square_dual(x=x))}
    done, pending = await asyncio.wait(pending)
    assert not pending
    task = done.pop()
    result = task.result()
    print(f"{result=}")
    if isinstance(result, dict) and {*result.keys()} == {"o0", "o1"}:
        print("result has wrong type")
        result = result["o0"], result["o1"]
    return result
Output when running locally:
Copy code
Running Execution on local.
Computing square of 3
result=DefaultNamedTupleOutput(o0=9, o1=6)
DefaultNamedTupleOutput(o0=9, o1=6)
Output when running remotely (sandbox):
Copy code
{"asctime": "2024-02-05 15:39:12,944", "name": "flytekit", "levelname": "WARNING", "message": "Plugin 'flytekit-deck-standard' is not installed. To display time line, install the plugin in the image."}
Getting <s3://my-s3-bucket/metadata/propeller/flytesnacks-development-asfspxr4nrt2jj6n2w4t/hpsearchworkflowstesttaskeagerworkflowwithasynciotasks/data/inputs.pb> to /tmp/flyte-sh_gj73f/sandbox/local_flytekit/inputs.pb
result={'o0': 9, 'o1': 6}
result has wrong type
We've also noticed that this error only occurs with (named) tuples.

Regarding the second problem: we've increased the resource limits in the cluster; the pod now seems to start, but gets killed after some time. We're still not sure whether this is because of the limits or whether something is generally misconfigured; the logs don't show any errors... they seem truncated, or something killed the pod forcefully. However, our main problem is that we need to set the tolerations for the task that executes the eager workflow (e.g., to run it on a CPU or GPU worker), and setting the tolerations and resource requests had no effect. Can we actually set tolerations and resource requests for eager workflows, or is this something that is planned for the future? Also, would you know how we could execute an eager workflow on a GPU node, in case tolerations and resource requests are ignored?

On a side note: we're also evaluating Ray Tune and installed the Ray plugin for Flyte, but there it also seems that we cannot configure the tolerations and resource requests for the workers (etc.): the set of possible arguments for RayJob and the affected dataclass configurations seems rather minimal. Any help in that regard would also be much appreciated, or maybe you could refer us to somebody who has some insight into the Ray plugin. Thank you and all the best.
b
Great, thanks! Lemme try to repro. For the first problem, would you mind creating a bug report issue? https://github.com/flyteorg/flyte/issues/new?assignees=&labels=bug%2Cuntriaged&projects=&template=bug_report.yaml&title=%5BBUG%5D+ Basically you can just copy-paste the code snippet and provide the context you’ve already articulated on the thread. As a caveat, eager workflows are still an experimental feature, so many parts of the python asyncio API will probably not work (as you’ve discovered). Re: the second problem, we’ll need some infra support; not exactly sure how to debug those @average-finland-92144
a
@bored-football-75597 what Helm chart did you use to install Flyte? From what you've shared, it seems to me that the task resources config could be a bit off
b
@average-finland-92144 - we are using the flyte-binary Helm chart but defining the config via an externalConfigMap. We define various node pools with taints for different GPU sizes + spot instances, and we typically annotate tasks in Flyte using tolerations. For example:
Copy code
...
tolerations = [
    V1Toleration(
        effect="NoSchedule",
        key="<http://kubernetes.azure.com/scalesetpriority|kubernetes.azure.com/scalesetpriority>",
        operator="Equal",
        value="spot",
    ),
    V1Toleration(
        effect="NoSchedule", key="nodetype", operator="Equal", value="NC6s_v3"
    ),
]

@task(
    task_config=Pod(pod_spec=V1PodSpec(containers=[], tolerations=tolerations)),
    requests=Resources(cpu="100m", mem="100Mi", gpu="1")
)
def train_model(...):
...
As we are defining the resources to use per task anyway (depending on the model type to train), we haven't defined default resource-tolerations yet (in plugins->k8s->resource-tolerations). Ideally I would like to define a dedicated (non-GPU) worker pool as the default, but I haven't figured out how to.
Copy code
plugins:
  k8s:
    resource-tolerations:      
      - key: "nodetype"
        operator: "Equal"
        value: "Standard_B8ms"
        effect: "NoExecute"
The default task resources are set to:
Copy code
task_resources:
  defaults:
    cpu: 500m
    memory: 500Mi
We want to use a similar approach (deciding on the GPU size depending on the model) for eager workflows and also for Ray. If there is a better way to define resources (for various GPU sizes and taints) than specifying them directly on a per-task basis, I would be glad for any input. Thank you and BR
b
Already opened an issue regarding the return-value discrepancy between local and remote executions: https://github.com/flyteorg/flyte/issues/4841
b
@average-finland-92144 I wanted to follow up on my earlier questions in this thread about defining resource tolerations and whether the approach mentioned above is valid. Any hints or recommendations would be appreciated. Thanks very much and BR.
b
@average-finland-92144... Clemens is a colleague of mine and he's far more knowledgeable when it comes to our Kubernetes setup :)
a
@bored-accountant-47063 @bored-football-75597 thanks for your patience. I see three main discussion points here:
1. The actual workflow execution that is failing without even creating the Pod. I'd suggest to:
   a. Remove the limits from the default resource configuration and only leave requests. Limits invoke a K8s CPU throttling mechanism that causes more harm than it helps.
   b. Change the Toleration effect from NoExecute to NoSchedule. I think the first one only affects already-running Pods.
2. How to define a default resource toleration config. I'd suggest considering the PodTemplate resource. You can have multiple of them and reference a PodTemplate from the task definition, so you control the tolerations set on the Pod depending on the model to be trained (see docs, it would be option #2).
3. How to do something similar but for GPUs. I think the closest is the use of GPU selectors, a feature recently added to Flyte (a rough sketch of 2 and 3 follows below):
   ◦ You define the labels and tolerations that correspond to what you have configured in your node pools at the platform level
   ◦ From the task definition you only request the device that your particular model needs
   ◦ Propeller will inject the paired toleration and nodeSelector into the task Pods
   ◦ See docs and PR
I'm happy to hop on a call too if it helps
b
@average-finland-92144 thanks for your response. Regarding 1.a - the limits in our default resource configuration are deliberately set to extremely high values, because otherwise the built-in default limits are applied:
Copy code
task_resources:
  defaults:
    cpu: 500m
    memory: 500Mi
  limits:
    cpu: 32
    gpu: 4
    memory: 120Gi
    storage: 120Gi
Regarding 1.b - I will test whether this changes the behavior. Regarding 2 and 3 - I will check these out and let you know about our results. Thanks and BR
a
@bored-accountant-47063 wondering if this is still an issue in your environment?