bored-football-75597  01/30/2024, 12:58 PM
We use asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) in our implementation to update the Ax model when a trial finishes. However, pending in asyncio.wait is expected to be a set of asyncio Tasks (https://docs.python.org/3/library/asyncio-task.html#id4), not bare coroutines/awaitables. Our initial implementation worked fine when executed locally, but when executed remotely (sandbox), task.result() returns a dictionary with "o0" and "o1" as keys instead of the tuple of values it actually should be. We used a workaround to circumvent this problem, see:
...
pending = set()
while True:
    try:
        parameters, trial_index = ax_client.get_next_trial()
        task = asyncio.create_task(
            evaluate_model(
                parameters=prepare_trial_config(
                    parameters,
                    training_config,
                ),
                trial_index=trial_index,
            ),
            name=f"Trial {trial_index}",
        )
        pending.add(task)
    except (MaxParallelismReachedException, DataRequiredError):
        # Maximum parallelism reached, wait for a task to complete.
        done, pending = await asyncio.wait(
            pending,
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in done:
            try:
                result = task.result()
                # Bugfix: flyte behaves differently when executed locally vs
                # remotely
                if isinstance(result, dict) and {*result.keys()} == {"o0", "o1"}:
                    trial_index = result["o1"]
                    raw_data = result["o0"]
                else:
                    raw_data, trial_index = result
                ax_client.complete_trial(
                    trial_index=trial_index,
                    raw_data=raw_data,
                )
            except Exception as e:
                logging.error(f"Trial {trial_index} failed with error {str(e)}")
                continue
...
The second problem that we've encountered, however, seems to be a bit more complicated, and we're stuck: when trying to execute this eager workflow/task on a remote cluster, Flyte/Kubernetes attempts to create a pod for the task but fails immediately without producing any logs. Our assumption is that the pod cannot be created because we're using a custom image that exceeds the default resource limits:
resources:
  limits:
  - name: CPU
    value: 200m
  - name: MEMORY
    value: 200Mi
  requests:
  - name: CPU
    value: 200m
  - name: MEMORY
    value: 200Mi
We've also tried to set the tolerations and resource requests for the eager workflow (according to the documentation, **kwargs can be any additional task argument), see
@eager(
    remote=FlyteRemote(
        config=Config.auto(
            os.path.join(
                os.path.dirname(__file__),
                "flyte_remote_config.yaml",
            )
        ),
        default_project="...",
        default_domain="...",
        data_upload_location="abfs://.../flyte/raw-data/",
    ),
    task_config=Pod(pod_spec=V1PodSpec(containers=[], tolerations=...)),
    requests=Resources(cpu="4", mem="8Gi", ephemeral_storage="8Gi"),
)
but this had no effect at all: the eager workflow/task is still executed as a regular Python task, not a sidecar task (as is the case when setting tolerations etc. on a conventional task), and we're still observing the same resource requests in Kubernetes. We've also tried the same settings with a conventional task, and it executed without any issues.
Any help would be much appreciated. Thank you and all the best.

tall-lock-23197
broad-monitor-993  01/31/2024, 2:29 PM

broad-monitor-993  01/31/2024, 2:31 PM

bored-football-75597  02/05/2024, 4:12 PM
import asyncio
from flytekit import task
from flytekit.experimental import eager
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config


@task
def square_dual(x: int) -> tuple[int, int]:
    print(f"Computing square of {x}")
    return x**2, x * 2


@eager(
    remote=FlyteRemote(
        config=Config.for_sandbox(),
        default_project="flytesnacks",
        default_domain="development",
    )
)
async def eager_workflow_with_asyncio_tasks(x: int) -> tuple[int, int]:
    pending = {asyncio.create_task(square_dual(x=x))}
    done, pending = await asyncio.wait(pending)
    assert not pending
    task = done.pop()
    result = task.result()
    print(f"{result=}")
    if isinstance(result, dict) and {*result.keys()} == {"o0", "o1"}:
        print("result has wrong type")
        result = result["o0"], result["o1"]
    return result
Output when running locally:
Running Execution on local.
Computing square of 3
result=DefaultNamedTupleOutput(o0=9, o1=6)
DefaultNamedTupleOutput(o0=9, o1=6)
Output when running remotely (sandbox):
{"asctime": "2024-02-05 15:39:12,944", "name": "flytekit", "levelname": "WARNING", "message": "Plugin 'flytekit-deck-standard' is not installed. To display time line, install the plugin in the image."}
Getting s3://my-s3-bucket/metadata/propeller/flytesnacks-development-asfspxr4nrt2jj6n2w4t/hpsearchworkflowstesttaskeagerworkflowwithasynciotasks/data/inputs.pb to /tmp/flyte-sh_gj73f/sandbox/local_flytekit/inputs.pb
result={'o0': 9, 'o1': 6}
result has wrong type
We've also noticed that this error only occurs with (named-)tuples.
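In case it helps others hitting this, a slightly more general form of our workaround (just a sketch; it assumes remote results always arrive as a dict keyed "o0", "o1", ... in output order, which is what we observed):

def normalize_outputs(result):
    # Remotely, a multi-output task result arrives as {"o0": ..., "o1": ...};
    # locally it is already a (named) tuple. Undo the dict form.
    if isinstance(result, dict) and all(
        key.startswith("o") and key[1:].isdigit() for key in result
    ):
        return tuple(result[f"o{i}"] for i in range(len(result)))
    return result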
Regarding the second problem: we've increased the resource limits in the cluster; the pod seems to start now, but gets killed after some time... we're still not sure whether this is because of the limits or whether something is generally misconfigured; the logs don't show any errors... they seem truncated, as if something killed the pod forcefully.
However, our main problem is that we need to set the tolerations for the task that executes the eager workflow (e.g., to run it on a CPU or GPU worker), and setting the tolerations and resource requests has had no effect.
Can we actually set the tolerations and resource requests for eager workflows, or is this something that is planned for the future? Also, would you maybe know how we could execute an eager workflow on a GPU node, in case tolerations and resource requests are ignored?
On a side note: we're also evaluating ray-tune and installed the ray plugin for Flyte, but there it also seems that we cannot configure the tolerations and resource requests for the workers (etc.): the set of possible arguments for RayJob and the affected dataclass configurations seems rather minimal. Any help in that regard would also be much appreciated, or maybe you could refer us to somebody who has some insight into the ray plugin.
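For context, this is roughly what the plugin exposes as far as we can tell (a minimal sketch based on the flytekitplugins-ray examples; parameter names may vary between plugin versions, and the task body is a placeholder):

from flytekit import Resources, task
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

# Head/worker start params and replica counts are configurable;
# tolerations for the workers do not appear to be exposed here.
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
)

@task(task_config=ray_config, requests=Resources(cpu="2", mem="4Gi"))
def ray_tune_task(n: int) -> int:
    # Placeholder body; in our case this would launch a ray-tune run.
    return n * n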
Thank you and all the best.

broad-monitor-993  02/05/2024, 5:05 PM
the asyncio API will probably not work (as you've discovered)
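(For reference, the basic pattern in the flytekit eager docs awaits the task call directly rather than going through asyncio.create_task/asyncio.wait; a minimal sketch reusing square_dual and the imports from the snippet above:)

@eager(
    remote=FlyteRemote(
        config=Config.for_sandbox(),
        default_project="flytesnacks",
        default_domain="development",
    )
)
async def eager_workflow_direct(x: int) -> tuple[int, int]:
    # Await the Flyte task call directly; no asyncio.create_task/wait needed.
    return await square_dual(x=x)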
Re: second problem, we'll need some infra support, not exactly sure how to debug those @average-finland-92144

average-finland-92144  02/05/2024, 5:27 PM

bored-accountant-47063  02/06/2024, 10:40 AM
...
tolerations = [
    V1Toleration(
        effect="NoSchedule",
        key="kubernetes.azure.com/scalesetpriority",
        operator="Equal",
        value="spot",
    ),
    V1Toleration(
        effect="NoSchedule", key="nodetype", operator="Equal", value="NC6s_v3"
    ),
]

@task(
    task_config=Pod(pod_spec=V1PodSpec(containers=[], tolerations=tolerations)),
    requests=Resources(cpu="100m", mem="100Mi", gpu="1"),
)
def train_model(...):
    ...
As we define which resources to use on a per-task basis anyway (depending on the model type to train), we haven't defined default resource tolerations yet (in plugins -> k8s -> resource-tolerations). Ideally I would like to define a dedicated (non-GPU) worker pool as the default, but I haven't figured out how to.
plugins:
  k8s:
    resource-tolerations:
    - key: "nodetype"
      operator: "Equal"
      value: "Standard_B8ms"
      effect: "NoExecute"
The default task resources are set to:
task_resources:
  defaults:
    cpu: 500m
    memory: 500Mi
We want to use a similar approach (deciding on the GPU size depending on the model) for eager workflows and also for ray. If there is a better way to define resources (for various GPU sizes and taints) than setting them directly on a per-task basis, I would be glad for any input.
Thank you and BR

bored-football-75597  02/06/2024, 11:14 AM

bored-accountant-47063  02/12/2024, 10:10 AM

bored-football-75597  02/12/2024, 1:11 PM

average-finland-92144  02/12/2024, 4:53 PM
1. a. Remove limits from the default resource configuration. It invokes a K8s CPU throttling mechanism that causes more harm than it helps; only leave requests.
   b. Change the Toleration effect from NoExecute to NoSchedule. I think the first one only affects running Pods.
2. How to define a default resource toleration config: I'd suggest considering the PodTemplate resource. You can have multiple of them and call that PodTemplate from the task definition, so you control the toleration set in the Pod depending on the model to be trained (see docs, it would be option #2; there's a sketch after this list).
3. How to do something similar but for GPUs: I think the closest is the use of GPU Selectors, a feature recently added to Flyte.
  ◦ You define the labels and tolerations that correspond to what you have configured in your node pools at the platform level
  ◦ From the task definition you only request the device that your particular model needs
  ◦ Propeller will inject the paired toleration and nodeSelector into the Task Pods
  ◦ See docs and PR
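A minimal sketch of what 2 and 3 could look like from the task side (the toleration values and the accelerator device name are placeholders you'd adapt to your node pools; flytekit's PodTemplate and flytekit.extras.accelerators are the entry points):

from flytekit import PodTemplate, Resources, task
from flytekit.extras.accelerators import GPUAccelerator
from kubernetes.client import V1Container, V1PodSpec, V1Toleration

# Option 2: a reusable PodTemplate carrying the tolerations; you can define
# several and pick one per model type at the task definition.
cpu_pool_template = PodTemplate(
    primary_container_name="primary",
    pod_spec=V1PodSpec(
        containers=[V1Container(name="primary")],
        tolerations=[
            V1Toleration(
                effect="NoSchedule",
                key="nodetype",
                operator="Equal",
                value="Standard_B8ms",  # placeholder: your default worker pool
            )
        ],
    ),
)

@task(pod_template=cpu_pool_template)
def preprocess(x: int) -> int:
    return x

# Option 3: GPU selectors; the label/toleration pairing lives in the platform
# config, the task only names the device it needs (device name is a placeholder).
@task(requests=Resources(gpu="1"), accelerator=GPUAccelerator("nvidia-tesla-v100"))
def train(x: int) -> int:
    return x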
I'm happy to hop on a call too if it helps.

bored-accountant-47063  02/15/2024, 11:42 AM
task_resources:
  defaults:
    cpu: 500m
    memory: 500Mi
  limits:
    cpu: 32
    gpu: 4
    memory: 120Gi
    storage: 120Gi
Regarding 1.b: I will test whether this has an effect on the behavior.
Regarding 2 and 3: I will check these out and let you know about our results.
Thanks and BR

average-finland-92144  03/21/2024, 5:09 PM