Enabling task cache for Map tasks throws the error in 🧵
# ask-the-community
v
Enabling task cache for Map tasks throws the error in 🧵 .. Is this an issue with my config or a real bug? Thanks 🙏
My map task config
@flytekit.task(
    limits=flytekit.Resources(cpu="1", gpu="1", mem="70Gi"),
    requests=flytekit.Resources(cpu="1", gpu="1", mem="70Gi"),
    retries=1,
    task_config=Pod(
        pod_spec=V1PodSpec(
            containers=[
                V1Container(name="primary"),
            ],
            node_selector={"l5.lyft.com/pool": "eks-pdx-pool-gpu"},
            tolerations=[
                V1Toleration(effect="NoSchedule", key="reserved", operator="Equal", value="gpu"),
            ],
        ),
        primary_container_name="primary",
    ),
)
The above works as expected.
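For context, a minimal sketch of how a task configured like this is typically fanned out with flytekit's map_task (the function body, input name, and workflow are illustrative and not from this thread; with cache=True added, each mapped subtask is cached on its own inputs):
from typing import List

import flytekit
from flytekit import map_task, workflow


@flytekit.task(cache=True, cache_version="0.0.0")  # plus the Pod/Resources config shown above
def process_item(item: str) -> str:
    # Illustrative body only.
    return item.upper()


@workflow
def my_map_wf(items: List[str]) -> List[str]:
    # map_task fans process_item out over the list; with caching enabled,
    # each subtask's result is keyed on its own input plus the cache_version.
    return map_task(process_item)(item=items)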
If I include cache, then it crashes with the error below:
@flytekit.task(
    cache=True,
    cache_version="0.0.0",
    limits=flytekit.Resources(cpu="1", gpu="1", mem="70Gi"),
    requests=flytekit.Resources(cpu="1", gpu="1", mem="70Gi"),
    retries=1,
    task_config=Pod(
        pod_spec=V1PodSpec(
            containers=[
                V1Container(name="primary"),
            ],
            node_selector={"l5.lyft.com/pool": "eks-pdx-pool-gpu"},
            tolerations=[
                V1Toleration(effect="NoSchedule", key="reserved", operator="Equal", value="gpu"),
            ],
        ),
        primary_container_name="primary",
    ),
)
Workflow[avperceptionworkflows:dev:src.perception.scene_workflows.ground_truth.ground_truth_offline_workflows.GroundTruthOfflinePCPWorkFlow] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: panic when executing a plugin [k8s-array]. Stack: [goroutine 982 [running]:
runtime/debug.Stack()
	/usr/local/go/src/runtime/debug/stack.go:24 +0x65
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1.1()
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:375 +0xfe
panic({0x1f45600, 0x395a540})
	/usr/local/go/src/runtime/panic.go:838 +0x207
github.com/flyteorg/flytestdlib/bitarray.(*BitSet).IsSet(...)
	/go/pkg/mod/github.com/flyteorg/flytestdlib@v1.0.4/bitarray/bitset.go:33
github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core.InitializeExternalResources({0x279cfd0, 0xc07cb88f60}, {0x27a88c0?, 0xc0887b6420?}, 0xc0d58fafc0, 0x23d8110)
	/go/pkg/mod/github.com/flyteorg/flyteplugins@v1.0.8/go/tasks/plugins/array/core/metadata.go:33 +0x1e1
github.com/flyteorg/flyteplugins/go/tasks/plugins/array/k8s.Executor.Handle({{0x7f1fa798ba18, 0xc000b34a80}, {{0x278ff70, 0xc00054a8f0}}, {{0x278ff70, 0xc00054aa50}}}, {0x279cfd0, 0xc07cb88f60}, {0x27a88c0, 0xc0887b6420})
	/go/pkg/mod/github.com/flyteorg/flyteplugins@v1.0.8/go/tasks/plugins/array/k8s/executor.go:94 +0x225
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1(0x0?, {0x279cfd0, 0xc07cb88d20}, {0x279f4b8?, 0xc000d17230?}, 0x0?)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:382 +0x178
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin({{0x279d318, 0xc00180e648}, {0x278ab08, 0xc000baea40}, 0xc000e76fc0, 0xc000e77050, 0xc000e77080, {0x279ea38, 0xc044cc7cc0}, 0xc000bf2580, ...}, ...)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:384 +0x9a
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.Handle({{0x279d318, 0xc00180e648}, {0x278ab08, 0xc000baea40}, 0xc000e76fc0, 0xc000e77050, 0xc000e77080, {0x279ea38, 0xc044cc7cc0}, 0xc000bf2580, ...}, ...)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:616 +0x182b
github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.handleParentNode({{0x27a0368, 0xc001d6c1a0}, {{0xc000d1fbe0, {{...}, 0x0}, {0xc0012460c0, 0x4, 0x4}}, {0xc000d1fc00, {{...}, ...}, ...}, ...}, ...}, ...)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:70 +0xd8
github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.Handle({{0x27a0368, 0xc001d6c1a0}, {{0xc000d1fbe0, {{...}, 0x0}, {0xc0012460c0, 0x4, 0x4}}, {0xc000d1fc00, {{...}, ...}, ...}, ...}, ...}, ...)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:220 +0x9d0
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).execute(0xc001aaa540, {0x279cfd0, 0xc07cb88690}, {0x279e8b8, 0xc000c2c000}, 0xc063448d80, {0x27b12b8?, 0xc0641ae0d0?})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:382 +0x157
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleQueuedOrRunningNode(0xc001aaa540, {0x279cfd0, 0xc07cb88690}, 0xc063448d80, {0x279e8b8?, 0xc000c2c000?})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:512 +0x227
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleNode(0xc001aaa540, {0x279cfd0, 0xc07cb88690}, {0x2783470, 0xc0999a5400}, 0xc063448d80, {0x279e8b8?, 0xc000c2c000})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:736 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc001aaa540, {0x279cfd0, 0xc07cb88240}, {0x27ac5a8, 0xc0a5113e50}, {0x2783470, 0xc0999a5400}, {0x2783498?, 0xc0999a5400?}, {0x27a99d0, ...})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:934 +0x705
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22fdfc1?, {0x279cfd0, 0xc07cb88240}, {0x27ac5a8, 0xc0a5113e50}, {0x2783470, 0xc0999a5400?}, {0x2783498?, 0xc0999a5400}, {0x27a99d0, ...})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc001aaa540, {0x279cfd0, 0xc07cb88240}, {0x27ac5a8, 0xc0a5113e50}, {0x2783470, 0xc0999a5400}, {0x2783498?, 0xc0999a5400?}, {0x27a99d0, ...})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x22fdfc1?, {0x279cfd0, 0xc07cb88240}, {0x27ac5a8, 0xc0a5113e50}, {0x2783470, 0xc0999a5400?}, {0x2783498?, 0xc0999a5400}, {0x27a99d0, ...})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:774 +0x3c5
github.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc001aaa540, {0x279cfd0, 0xc07cb88240}, {0x27ac5a8, 0xc0a5113e50}, {0x2783470, 0xc0999a5400}, {0x2783498?, 0xc0999a5400?}, {0x27a99d0, ...})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:941 +0x935
github.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).handleRunningWorkflow(0xc00078e700, {0x279cfd0, 0xc07cb88240}, 0xc0999a5400)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:147 +0x1b3
github.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).HandleFlyteWorkflow(0xc00078e700, {0x279cfd0, 0xc07cb88240}, 0xc0999a5400)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:393 +0x40f
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow.func2(0xc000f6ba10, {0x279cfd0, 0xc07cb88240}, 0xc0a4151848, 0x1e5a080?)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:130 +0x18e
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow(0xc000f6ba10, {0x279cfd0, 0xc0954dbc50}, 0xc0999a4a00)
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:131 +0x459
github.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).Handle(0xc000f6ba10, {0x279cfd0, 0xc0954dbc50}, {0xc0b25c0318, 0x3}, {0xc0b25c031c, 0x14})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:205 +0x86d
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem.func1(0xc001528cf0, 0xc0a4151f28, {0x1e5a080?, 0xc08a8d8bd0})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:88 +0x510
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem(0xc001528cf0, {0x279cfd0, 0xc0954dbc50})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:99 +0xf1
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).runWorker(0x279cfd0?, {0x279cfd0, 0xc049036c00})
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:115 +0xbd
github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run.func1()
	/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:150 +0x59
created by github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run
	/go/src/github.co
@Ketan (kumare3) Appreciate any insights 🙏 Having cache for map tasks would be extremely helpful to avoid recomputing everything
d
@varsha Parthasarathy this is definitely a bug. There should be no issue with caching in map tasks - it is fully supported.
v
Since I specify tolerations (as above), could that interfere with the caching feature?
d
Can you provide a little more context please?
• did this failure occur on the first execution or when using cached data?
• how large is the map task fanout (i.e. how many input items)?
tolerations shouldn't be an issue.
k
@Dan Rammer (hamersaw) I actually see that subtask caching is not working
Let’s talk
v
did this failure occur on the first execution or when using cached data?
when using cached data.
how large is the maptask fanout (ie. how many input items)?
6 subtasks for map task
The number of sub-tasks doesn't matter; even if I give 1 sub-task, the above panic happens!
d
^^ ah ok, good to know
v
Ahh, any workarounds in the meantime @Ketan (kumare3)?
k
I do not know
But we will get through this quickly
v
sounds good, thanks 🙏
k
@varsha Parthasarathy / @Dan Rammer (hamersaw) sorry, this is different - I saw another problem where caching was not working for subtasks
This is a panic that causes failure
v
could it be the way I specify tolerations?
but without caching, the tolerations work fine, hence I am confused
d
Internally the map task maintains a bitset of subtask statuses and the indexes thereof that need to be computed (because of a cache miss). According to the stacktrace here, there is an error where we attempt to access that bitset at an index that doesn't exist. Unfortunately, rather than returning an error the bitset lib panics, which is another issue.
So the panic is caused by a bug in how we initially look up the cache status for each subtask.
I'm looking into it right now.
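To make that failure mode concrete, here is a simplified Python illustration of the logic described above (this is not the actual flytestdlib/flyteplugins code, which is Go; the bitset size and the cause of the mismatch here are hypothetical):
class BitSet:
    """Toy stand-in for flytestdlib's bitarray.BitSet, which panics on a bad index."""

    def __init__(self, size: int):
        self._size = size
        self._bits = bytearray((size + 7) // 8)

    def is_set(self, idx: int) -> bool:
        if idx >= self._size:
            # The Go library panics here instead of returning an error, which is
            # what surfaces as "panic when executing a plugin [k8s-array]".
            raise IndexError(f"bit {idx} out of range (size {self._size})")
        return bool(self._bits[idx // 8] & (1 << (idx % 8)))


# Hypothetical mismatch: the cached-subtask bitset was sized from stale state (4)
# while the map task actually has 6 subtasks, so the cache lookup for subtask 4
# blows up instead of simply reporting a cache miss.
cached = BitSet(size=4)
try:
    to_compute = [i for i in range(6) if not cached.is_set(i)]
except IndexError as err:
    print(f"propeller-style crash: {err}")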
v
Sounds good, let me know if you want to try something .. I will be available all morning (PST) to help 🙏
d
@varsha Parthasarathy do you know what version of FlytePropeller you're running? I'm doing some testing locally and unable to repro. I know we had fixed an issue here relatively recently.
it looks like this would have gotten pushed through to FlytePropeller v1.1.33 on Sept 9th.
v
Hi @Dan Rammer (hamersaw)
datacatalog_version    = "v1.0.1"
flyteadmin_version     = "v1.1.21"
flyteconsole_version   = "v1.1.0"
flytecopilot_version   = "v0.0.26"
flytepropeller_version = "v1.1.21"
Is it enough to update FlytePropeller, or do any other versions need an update?
d
You should be able to just update FlytePropeller. It doesn't look like there were any breaking changes introduced between v1.1.21 and v1.1.33.
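Assuming the version pins shown above, that change is just:
flytepropeller_version = "v1.1.33"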
I should note that there does seem to be a bug in displaying the correct cache status in the UI right now for subtasks. I'll get on a PR to fix that.
a
@Dan Rammer (hamersaw) after upgrading propeller to v1.1.33 we see long "queued" times (30 min is not unusual)
console shows status running, but there's no pod and the Logs tab shows "queued"
cc @Ketan (kumare3)
k
Sorry catching up
Hmm I guess we need to look at the logs
Can jump on a call
d
@Alex Pozimenko are the long queued times for map tasks only? Or for all tasks?
a
I have an example of a spark task
map task from another workflow looks fine
call?
d
I can hop on for a few minutes here, but only have 10 minutes or so.
k
I can hop in 10
@Alex Pozimenko I am trying to join ^
a
@Dan Rammer (hamersaw), @Ketan (kumare3) we're having a similar problem - spark tasks and nested workflows won't start. Spark operator looks healthy (no exceptions in logs), no pending pods, but queue latency is high and growing and the propeller free workers count is 0
is there a limit on how many driver pods we can run at the same time?
I see 100+
for one workflow, I see the spark application in SUBMITTED state, but nothing in the spark operator logs
d
@Alex Pozimenko it sounds like it could be a starvation problem. Do you know how many workflows are currently running? Also what is the round latency from FlytePropeller? And finally, how many workers do you have configured for your FlytePropeller instance?
If it is indeed starvation, we can easily scale up the number of flytepropeller instances. Just want to make sure it's not something else and we're not just fixing it by throwing more resources at it.
a
flyte propeller free workers count was at 0
we ran 100+ spark workflows and perhaps 100 more non-spark
in total, about 1k workflows complete hourly
d
flyte propeller free workers count was at 0
Do you know how many workers you have propeller configured with? And the round latency is pretty important here, I think. Were you able to find that prometheus metric?
a
do you know how many workers you have propeller configured with?
64
What is the round latency?
d
Interesting. We can certainly increase the number of workers on propeller. Some users are running up to 500 and it is scaling fine in their setup. All you need to do is change the configuration and roll the propeller deployment to pick up the new config. Maybe try 128, 256, etc. This should reduce the queueing times and probably leave some free workers.
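For reference, a minimal sketch of that change (the workers key lives in the propeller section of the standard FlytePropeller configmap; the exact file and section depend on how your deployment is templated, so treat this as an assumption to verify):
propeller:
  workers: 128   # was 64; try 256 if the free-workers count still hits zero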
a
do I need to bump cpu/mem as well? right now we have an 8 CPU / 16G memory limit (1 CPU / 1G request)
d
The round latency is an end-to-end measure of propeller evaluating a workflow. It works by periodically reading the CRD, checking node statuses, scheduling new nodes, etc. Each time it does this is one "round". Each worker only works on a single workflow at a time, so if round latencies are large it reduces the throughput of workflow evaluations. This would cause longer queued times, as you are seeing. Per the link I sent previously, this is exposed in a prometheus metric with something like:
sum(rate(flyte:propeller:all:round:raw_ms[5m])) by (wf)
do I need to bump cpu/mem as well? right now we have an 8 CPU / 16G memory limit (1 CPU / 1G request)
That's difficult to say. Right now, depending on configuration, propeller can use quite a bit of memory for things like caching workflow definitions, blobstore caching, etc. However, we aren't seeing issues with cpu utilization being high. Thankfully, increasing the number of workers has no effect (or very little) on memory utilization, so if 16G has been fine up until now, I would say you're fine. I think 8 cpu would be plenty for 256 workers, but it might be something to keep an eye on.
a
I have propeller workflow acceptance latency and transition latency. These two should correlate with the round latency, right?
d
I have propeller workflow acceptance latency and transition latency. These two should correlate with the round latency, right?
Also a difficult question 😅. Theoretically, if the round latency is high then workers will take longer to process each workflow - so the acceptance and transition latencies will be high as well. However, this is only the case if the number of free workers hits zero. If you have workers available, then the round latency can be higher and the acceptance and transition latencies should not be significantly affected.
a
what would be a scenario where round latency is high but acceptance and transition are low?
k
Usually:
1. The workflow has a large fan-out and max parallelism is not used
2. The kube client is blocking / hanging on pod creation, or the kube API is slow
3. S3 read/write is slow
4. Admin writes are slow
a
Thanks. We seem to be running into scaling limitations of the spark operator. Can you recommend some resources about scaling up the spark operator, or maybe give some suggestions? 🙂 Typical scenario: it runs fine for a while, but then CPU jumps to 100%. I bumped the CPU limit from 16 to 24 to 32, but the pattern repeats; it just seems to take a bit longer to get to that state.
k
Hmm I think it’s the webhook
Nothing else can take that much cpu
a
# k get sparkapplication -n prod | wc -l
4625
# k get pod -n prod | grep driver | awk '{print $3}' | sort | uniq -c
   2150 Completed
     11 ContainerCreating
     18 Running
no mem pressure.
I wonder if changing resync-interval and/or controller-threads may help
currently at 90 and 240
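For reference, both of those are command-line flags on the spark-operator controller Deployment; an illustrative excerpt (flag names per spark-on-k8s-operator; which of the current 90 / 240 values maps to which flag is an assumption to confirm against your manifest):
args:
  - -controller-threads=240   # concurrent reconcile workers
  - -resync-interval=90       # seconds between full informer resyncs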
k
It will, but high CPU indicates something else