# ask-the-community
v
Hi friends! We're launching ~100k+ tasks at the moment, with `max_parallelism` set to 500. We're sometimes seeing large spikes of errors in propeller, with various error messages:
```
Workflow[<redacted>] failed. RuntimeExecutionError: max number of system retry attempts [31/30] exhausted. Last known status message: [SystemError] failed to launch workflow [<redacted>], system error, caused by: rpc error: code = DeadlineExceeded desc = context deadline exceeded
```
This error causes the whole workflow to fail ^ We also see this:
```
failed Execute for node. Error: EventSinkError: Error sending event, caused by [rpc error: code = DeadlineExceeded desc = context deadline exceeded]
```
and these warnings, but maybe they're unrelated:
```
Failed to record taskEvent, error [AlreadyExists: Event already exists, caused by [rpc error: code = AlreadyExists desc = have already recorded task execution phase ABORTED (version: 0) for {resource_type:TASK project:\"<project>\" domain:\"production\" name:\"<workflow>\" version:\"6KLmT5WBECbwj7w_fSLTPw==\"  node_id:\"n0-0-dn2121\" execution_id:\u003cproject:\"<project>\" domain:\"<domain>\" name:\"f0a0d716e3d924aa7a1e\" \u003e  1 {} [] 0}]]. Trying to record state: ABORTED. Ignoring this error!"
```
Does anyone know what could be the issue? cc @Thomas Newton
t
I have a theory for what is causing these. There are currently 2 errors we see occasionally that could result in flyte thinking that a pod has failed when in fact it's a flaky failure and the pod is still likely to succeed. This results in flyte trying to transition a node from FAILED to RUNNING, which fails.
1. Pod not found errors. I believe the kube API server is being overloaded and we need to adjust the rate limiting on flyte propeller.
2. Secret sync errors: https://github.com/flyteorg/flyte/issues/4309
k
i think you might not have inject-finalizer enabled, but it also seems that you are unable to write to admin
check admin memory etc
t
I'm pretty sure plenty of stuff is making it to admin, but likely not all. Please could you explain what inject-finalizer does and why it's likely to be helpful?
Currently we just have one flyteadmin pod but it never gets anywhere near its resource requests (1 CPU core, 1GiB of memory). Does flyteadmin have much parallelism within a single process? Probably we should create multiple smaller replicas, as was mentioned on https://docs.flyte.org/en/latest/deployment/configuration/performance.html#scaling-out-flyteadmin
k
hmm not using much memory seems odd
but maybe what is happening is that the database writes are locked
but i need to see more
actually there is some performance stuff already done by @Dan Rammer (hamersaw) and it is available
@Victor Delépine / @Thomas Newton would also love to see the workflow sample too
maybe you are running over the etcd limit and offloading may be a good idea
t
It's largely a dynamic workflow using the structure you recommend and launching lots of small subworkflows. It's also making use of caching inside those subworkflows. I can get a sample of that in a bit when I'm at a keyboard. Also it's probably worth mentioning that this is a multi-cluster setup. Flyteadmin and flytecatalog are in a different k8s cluster.
v
Didn't we previously see some "etcd request too large" errors as well @Thomas Newton?
t
Yes, but not right now. I don't think that was related.
Actually, I do see some etcd request too large errors that are definitely relevant.
```
{"json":{"exec_id":"anbwtd2v2x5p8px67rzt","ns":"nerf-test-production","routine":"worker-672"},"level":"error","msg":"Failed to update workflow. Error [etcdserver: request is too large]","ts":"2023-11-09T11:37:18Z"}
E1109 11:37:18.581775       1 workers.go:103] error syncing '<project-domain>/anbwtd2v2x5p8px67rzt': etcdserver: request is too large
```
From the metrics etcd does not look particularly healthy
d
Tom/Victor: just in case you need it: more details on the offloading of the static portions of CRDs to blob storage, which could relieve pressure on etcd: https://docs.flyte.org/en/latest/deployment/configuration/performance.html#offloading-static-workflow-information-from-crd
v
Thanks! Does that help with dynamic workflows as well? We found that there wasn't a huge difference in size. The status part of the CRD was quite large as well.
t
I don't have strong evidence but it seems to me like our problems start if there is a high error rate on the workflow. My theory is that as things fail the CRD gets much bigger because lots of tracebacks get added to the CRD. I assume tracebacks would not be offloaded.
For reference the workflow structure is like this
k
the tracebacks will be cut the moment one is done, but usually tracebacks mean the workflow is going to fail
t
This is a workflow that fans out to lots of branches and we expect some branches to fail. So we use `@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)`. Maybe I should have created a better example rather than giving you back the example you gave us 😅
This version is more representative
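Roughly, the structure is like the sketch below (illustrative names only, not our real code; it assumes flytekit's `@dynamic`, cached sub-workflows, and `WorkflowFailurePolicy`, which is what we're using):
```python
# Minimal sketch of the structure, not the real workflow: a dynamic task fans
# out over many inputs, launching one small cached sub-workflow per input, and
# the top-level workflow uses FAIL_AFTER_EXECUTABLE_NODES_COMPLETE so that
# failing branches don't abort the remaining ones.
from typing import List

from flytekit import WorkflowFailurePolicy, dynamic, task, workflow


@task(cache=True, cache_version="1.0")
def core_task(item: int) -> int:
    # Stand-in for the real work; some branches are expected to fail.
    return item * 2


@workflow
def core_workflow(item: int) -> int:
    return core_task(item=item)


@dynamic
def fan_out(items: List[int]) -> List[int]:
    # One sub-workflow per input item; with thousands of items this makes the
    # FlyteWorkflow CRD very large.
    return [core_workflow(item=item) for item in items]


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def top_level(items: List[int]) -> List[int]:
    return fan_out(items=items)
```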
If I fetch one of the big CRDs using `kubectl get flyteworkflows f0319e66fb12740eaa4d -o yaml > etcd_sync_failure.yaml` the file size is 1.9MB. If I use a python script to delete all the `message` fields, the size is only 0.38MB. I think it's quite likely that tracebacks in the CRDs are pushing them over the edge to be too big.
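For reference, the stripping script was roughly the following (a quick sketch of what it does, with the filenames taken from the kubectl command above):
```python
# Rough sketch of the comparison: recursively drop every `message` field
# (mostly error tracebacks) from the fetched FlyteWorkflow CRD and compare
# the resulting file sizes. Filenames match the kubectl command above.
import os

import yaml


def strip_messages(obj):
    """Recursively remove all `message` keys from dicts in the CRD."""
    if isinstance(obj, dict):
        return {k: strip_messages(v) for k, v in obj.items() if k != "message"}
    if isinstance(obj, list):
        return [strip_messages(v) for v in obj]
    return obj


with open("etcd_sync_failure.yaml") as f:
    crd = yaml.safe_load(f)

with open("etcd_sync_failure_stripped.yaml", "w") as f:
    yaml.safe_dump(strip_messages(crd), f)

for path in ("etcd_sync_failure.yaml", "etcd_sync_failure_stripped.yaml"):
    print(f"{path}: {os.path.getsize(path) / 1e6:.2f} MB")
```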
This use-case is a bit crazy I know, but it is the easiest way for other teams at our company to go from a python function that works somewhat reliably to running it at really big scale and getting some valuable results. Ideally they would not need to learn about any distributed compute framework. It seems to me that the size of CRDs is definitely the biggest scalability issue with running flyte workflows like this with really big fan-out. As far as I understand, the `useOffloadedWorkflowClosure` option won't help us much because the majority of the CRD size is non-static given our use of dynamic workflows. Is there any possibility of improving this? For example, I tried hacking something together to prevent the tracebacks from getting into the CRD. That helped a bit, but the resulting CRD was still much larger than one where everything succeeded, and it probably only allows maybe 4X greater scalability (based on the CRD size numbers I got above). Would it be possible to store the CRD in a more compressed format? My best guess is that currently the CRD data is stored as YAML or JSON, but if it was stored as a protobuf the size would probably be much smaller.
k
The option to store the CRD in etcd is JSON. But offloading should fix a lot of it.
Sadly we will have to dig deeper into this and we're a little busy at the moment. Will hopefully get to it sometime.
t
I will make sure to test offloading tomorrow
I actually just ran the test with the offload enabled. Unfortunately it doesn't seem to have helped much 🙁. From a quick look it seems like the only difference is that the `spec` field is `null` when using offload, but in this case that is quite a small portion of the total CRD. I think we will need to come up with a solution for this. Possibly we could make a contribution to flyte. Would you mind explaining why etcd is used for storing this? What updates the CRD? Is it just flytepropeller or does flyteadmin also make updates? Would it be feasible to use something like a k8s persistent volume claim? It's quite likely I will experiment with some of my ideas, so if you can provide any insight on whether they are feasible that would be much appreciated 🙂.
k
I will need to understand the workflow and there are some cheeky ways to improve the compression
Also we plan to compress further - cc @Haytham Abuelfutuh / @Dan Rammer (hamersaw)
@Thomas Newton i was looking into it some today, this is the suggestion i had given some time ago - haha so a couple of questions:
• what is the chunk size?
• what is the size of `l`?
is map task ideal? map tasks are designed to automatically compress the storage usage in etcd
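For context, a map-task version of the fan-out could look roughly like the sketch below; it assumes flytekit's `map_task` with `min_success_ratio` to tolerate failing branches, and the names/numbers are illustrative:
```python
# Rough sketch of a map-task based fan-out. The mapped nodes are stored in a
# much more compact form in the FlyteWorkflow CRD, and min_success_ratio lets
# some branches fail without failing the whole map.
from typing import List

from flytekit import map_task, task, workflow


@task
def core_task(item: int) -> int:
    # Stand-in for the per-branch work; expected to fail occasionally.
    return item * 2


@workflow
def top_level(items: List[int]) -> List[int]:
    return map_task(core_task, concurrency=500, min_success_ratio=0.9)(item=items)
```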
@Thomas Newton i think we should hop on a call
t
Thanks for looking into it. I think a call would be useful. I'm on London time (UTC) so will need to arrange a suitable time. To give you more detail on the workflow structure I've taken our real workflow and cut out parts that can't be shared publicly. To give you some real numbers (for `fdf9c218e2eaa400083d`): the first layer of fanout spawned 2000 tasks. The second layer of fanout spawned 2124 copies of the `core_workflow`. We've been able to run the same workflow structure successfully with 50,000 copies of the `core_workflow` if the `core_workflow` always succeeds.
k
@Thomas Newton are you around now for a bit?
t
Yes
k
let me jump on a call
t
Thanks for discussing with me 🙌. I think the most promising-sounding solutions are:
1. Ensure that the workflow CRD still collapses after failures. Additionally, have an option to prevent tracebacks getting stored in the CRD.
2. Enable launching other flyte entities from map tasks.
3. Add more layers to the fanout and put launchplan layers between them (rough sketch of this below).
The first thing I'm going to do is continue testing with map tasks. I think this could help a lot thanks to the more compressed CRD state, but I think it does have the disadvantage that stragglers in each task will block completed segments from getting started on the next task of the core workflow.
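For option 3, the shape I have in mind is roughly the sketch below (illustrative names; it assumes launching the sub-workflow via a `LaunchPlan` from a dynamic task, so each call becomes a separate child execution with its own, smaller, CRD):
```python
# Sketch of option 3: insert a launch-plan layer between fan-out layers so each
# child runs as its own execution with its own FlyteWorkflow CR, instead of
# everything living inside one giant parent CRD.
from typing import List

from flytekit import LaunchPlan, dynamic, task, workflow


@task
def core_task(item: int) -> int:
    return item * 2


@workflow
def core_workflow(item: int) -> int:
    return core_task(item=item)


# Launching via a launch plan (rather than calling the sub-workflow directly)
# creates a separate child execution per call.
core_lp = LaunchPlan.get_or_create(core_workflow, name="core_workflow_lp")


@dynamic
def middle_layer(items: List[int]) -> List[int]:
    return [core_lp(item=item) for item in items]


@workflow
def top_level(items: List[int]) -> List[int]:
    return middle_layer(items=items)
```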
I ended up making a couple of alterations to flytepropeller: https://github.com/Tom-Newton/flyte/pull/2 and https://github.com/Tom-Newton/flyte/pull/3. These implement point 1 and they seem to be very effective. I'm hoping we could upstream something along these lines if we put them behind a feature flag.
I think those changes resolve the etcd size limit problems, but I think that is actually a different problem to the `context deadline exceeded` errors we saw first. I'm reasonably confident these `context deadline exceeded` errors are coming from networking issues between our control plane cluster and the data plane cluster. When I run the same workflow all in the same cluster as the control plane I haven't been able to reproduce the issue. I'm hoping that scaling up our nginx ingress controller will resolve the `context deadline exceeded` errors.
Thanks for looking at my PRs @Dan Rammer (hamersaw). If we think it's somewhat sensible then I can try to make versions that could be upstreamed, with feature flags etc.
d
@Thomas Newton absolutely. I think this is more than sensible, tbh I'm wondering if we can update the defaults to not store this information in the CR as long as it's correctly persisted through flyteadmin. The CR is only inspected by some explicit power users under the weirdest scenarios. It might be something that can be enabled on a per-execution basis maybe.
i think i'm speaking for a lot of flyte users when i say we really appreciate the effort here. some of the stuff you've hacked on is certainly in the weeds 🙌