# ask-the-community
z
Hi all, I have a workflow that contains two tasks: one that returns a big list of string ids (around 37 MB in total), and another task which I use within a `map_task`. The code looks as follows:
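```python
import typing as t
from functools import partial

from flytekit import WorkflowFailurePolicy, map_task, workflow
from omegaconf import DictConfig

# create_config, get_ids and write_data are the user's own tasks (not shown).

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def pipeline(cfg: DictConfig) -> t.Dict:
    cfg_data_iterable, cfg_store, cfg_dataset = create_config(cfg=cfg)
    ids = get_ids(cfg_dataset=cfg_dataset)  # ~300K string ids, ~37 MB in total
    # Bind the shared config inputs so the map task only varies over `id`.
    partial_task = partial(write_data, cfg_data_iterable=cfg_data_iterable, cfg_store=cfg_store)
    map_task(
        partial_task, concurrency=100, min_success_ratio=0.1
    )(id=ids)
    return {}
```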
Initially, Flyte complained about the large list (the maximum allowed output size was 10 MB). I increased `max-output-size-bytes` to 40 MB and got no more errors; however, the `map_task` now stays queued forever and never executes. What could be the issue here?
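For a rough sense of the numbers, the sketch below (not part of Flyte) sums the UTF-8 byte length of every id and compares it with a byte limit. It assumes the 10 MB and 40 MB figures above are MiB (1,048,576 bytes each); the real output is a serialized Flyte literal collection with per-element overhead, so this sum is only a lower bound. `approx_payload_bytes` and `fits_limit` are hypothetical helpers.

```python
from typing import Sequence

# Assumption: the limits quoted in the thread are binary megabytes (MiB).
MIB = 1024 * 1024


def approx_payload_bytes(ids: Sequence[str]) -> int:
    """Lower-bound estimate: total UTF-8 bytes of the ids, ignoring the
    per-element framing a serialized Flyte literal collection would add."""
    return sum(len(s.encode("utf-8")) for s in ids)


def fits_limit(ids: Sequence[str], limit_bytes: int) -> bool:
    """True if the lower-bound estimate is within `limit_bytes`."""
    return approx_payload_bytes(ids) <= limit_bytes


if __name__ == "__main__":
    # A 40 MiB limit expressed in bytes, as a *-bytes setting expects.
    print(40 * MIB)  # 41943040
    # Hypothetical data: 200K ids of 100 ASCII characters (20,000,000 bytes).
    sample = ["x" * 100] * 200_000
    print(fits_limit(sample, 10 * MIB), fits_limit(sample, 40 * MIB))  # False True
```

Because the estimate ignores serialization overhead, a list that only just fits by this measure may still exceed the configured limit.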
y
Can you take a look at the propeller logs? My guess is that propeller is still having a hard time with I/O around the file, or is hitting an error while creating the map tasks.
Do you know roughly how many tasks the map task creates?
Are you using the old map task or the new map task under the `experimental` module in flytekit?
I think you should try the new one.
z
Here is the error I am getting in propeller:
```
{"json":{"exec_id":"f238bc22ff0864023aef","ns":"default-development","res_ver":"97788251","routine":"worker-5","wf":"default:development:mxm_dzsf.flyte_workflows.workflow.pipeline"},"level":"warning","msg":"Error in handling running workflow [failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: panic when executing a plugin [k8s-array]. Stack: [goroutine 388 [running]:\nruntime/debug.Stack()\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x65\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1.1()\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:389 +0xfe\npanic({0x227d1c0, 0x3f7e4d0})\n\t/usr/local/go/src/runtime/panic.go:838 +0x207\ngithub.com/flyteorg/flytestdlib/bitarray.(*BitSet).IsSet(...)\n\t/go/pkg/mod/github.com/flyteorg/flytestdlib@v1.0.15/bitarray/bitset.go:33\ngithub.com/flyteorg/flyteplugins/go/tasks/plugins/array/core.InitializeExternalResources({0x2ba0c38, 0xc002e20600}, {0x2bacfe0?, 0xc001790900?}, 0xc000ff8480, 0x27a3e30)\n\t/go/pkg/mod/github.com/flyteorg/flyteplugins@v1.0.56/go/tasks/plugins/array/core/metadata.go:33 +0x1e1\ngithub.com/flyteorg/flyteplugins/go/tasks/plugins/array/k8s.Executor.Handle({{0x7eb8eeb50080, 0xc000c10ea0}, {{0x2b92f50, 0xc00072be40}}, {{0x2b92f50, 0xc001516000}}}, {0x2ba0c38, 0xc002e20600}, {0x2bacfe0, 0xc001790900})\n\t/go/pkg/mod/github.com/flyteorg/flyteplugins@v1.0.56/go/tasks/plugins/array/k8s/executor.go:96 +0x268\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1(0x19?, {0x2ba0c38, 0xc002e201e0}, {0x2ba3378?, 0xc00150d920?}, 0xc0038960dc?)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:396 +0x184\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin({{0x2ba2878, 0xc0011e2378}, {0x2b8d138, 0xc0007d8360}, 0xc00096ee10, 0xc00096ee40, 0xc00096ee70, {0x2ba28b8, 0xc000d6cb40}, 0xc00055a2c0, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:398 +0x9a\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.Handle({{0x2ba2878, 0xc0011e2378}, {0x2b8d138, 0xc0007d8360}, 0xc00096ee10, 0xc00096ee40, 0xc00096ee70, {0x2ba28b8, 0xc000d6cb40}, 0xc00055a2c0, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:666 +0x1ba5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.handleParentNode({{0x2ba42f0, 0xc0007ee900}, {{0xc001490940, {{...}, 0x0}, {0xc0005aa180, 0x4, 0x4}}, {0xc001490960, {{...}, ...}, ...}, ...}, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:70 +0xd8\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.Handle({{0x2ba42f0, 0xc0007ee900}, {{0xc001490940, {{...}, 0x0}, {0xc0005aa180, 0x4, 0x4}}, {0xc001490960, {{...}, ...}, ...}, ...}, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:224 +0x9d0\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).execute(0xc0007ee540, {0x2ba0c38, 0xc002e057a0}, {0x2ba26b8, 0xc00173a3c0}, 0xc001790840, {0x2bb6e50?, 0xc00368a410?})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:460 +0x157\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleQueuedOrRunningNode(0xc0007ee540, {0x2ba0c38, 0xc002e057a0}, 0xc001790840, {0x2ba26b8?, 0xc00173a3c0?})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:593 +0x227\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleNode(0xc0007ee540, {0x2ba0c38, 0xc002e057a0}, {0x2b85500, 0xc003a75900}, 0xc001790840, {0x2ba26b8?, 0xc00173a3c0})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:820 +0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0007ee540, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900}, {0x2ba0f10?, 0xc003a75900?}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:1018 +0x705\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x26b69c5?, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900?}, {0x2ba0f10?, 0xc003a75900}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:858 +0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0007ee540, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900}, {0x2ba0f10?, 0xc003a75900?}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:1025 +0x935\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x26b69c5?, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900?}, {0x2ba0f10?, 0xc003a75900}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:858 +0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0007ee540, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900}, {0x2ba0f10?, 0xc003a75900?}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:1025 +0x935\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x26b69c5?, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900?}, {0x2ba0f10?, 0xc003a75900}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:858 +0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0007ee540, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900}, {0x2ba0f10?, 0xc003a75900?}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:1025 +0x935\ngithub.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).handleRunningWorkflow(0xc0008e1ea0, {0x2ba0c38, 0xc002e04b70}, 0xc003a75900)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:147 +0x1b3\ngithub.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).HandleFlyteWorkflow(0xc0008e1ea0, {0x2ba0c38, 0xc002e04b70}, 0xc003a75900)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:393 +0x40f\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow.func2(0xc000b7e3c0, {0x2ba0c38, 0xc002e04b70}, 0xc002e477d0, 0x2137020?)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:142 +0x18e\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow(0xc000b7e3c0, {0x2ba0c38, 0xc002e047b0}, 0xc000eec000)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:143 +0x495\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).Handle(0xc000b7e3c0, {0x2ba0c38, 0xc002e047b0}, {0xc00112b1d0, 0x13}, {0xc00112b1e4, 0x14})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:259 +0xe4a\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem.func1(0xc00004acf0, 0xc002e47f28, {0x2137020?, 0xc0045a7760})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:88 +0x510\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem(0xc00004acf0, {0x2ba0c38, 0xc002e047b0})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:99 +0xf1\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).runWorker(0x2ba0c38?, {0x2ba0c38, 0xc00174c060})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:115 +0xbd\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run.func1()\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:150 +0x59\ncreated by github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:147 +0x285\n]]","ts":"2023-11-28T14:42:02Z"}
```
My list has 300K elements, and I am using a concurrency of 100.
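A back-of-the-envelope sketch of what these numbers imply, assuming each input becomes one subtask (one pod) and at most `concurrency` subtasks run at once. `map_task_lower_bounds` and the 30-second per-subtask figure in the usage line are hypothetical; the estimate ignores scheduling gaps, retries and propeller's own pre-processing, so real runs would be slower.

```python
import math
from typing import Tuple


def map_task_lower_bounds(
    n_inputs: int, concurrency: int, seconds_per_subtask: float
) -> Tuple[int, float]:
    """Return (rounds, hours) for a map task over `n_inputs` items.

    With every subtask taking `seconds_per_subtask` (pod start-up included)
    and `concurrency` slots, the work cannot finish in fewer than `rounds`
    back-to-back rounds; `hours` is that optimistic wall-clock bound.
    """
    if concurrency <= 0:
        raise ValueError("concurrency must be positive")
    rounds = math.ceil(n_inputs / concurrency)
    hours = rounds * seconds_per_subtask / 3600
    return rounds, hours


if __name__ == "__main__":
    # 300K ids at concurrency 100, with a hypothetical 30 s per subtask.
    print(map_task_lower_bounds(300_000, 100, 30.0))  # (3000, 25.0)
```

Even before any backend bottleneck, 300K inputs at this concurrency means thousands of pods started in sequence, which is why per-subtask overhead matters at this scale.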
When trying the experimental map_task, I get the following error:
```
Traceback (most recent call last):
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 315, in continuation
    response, call = self._thunk(new_method).with_call(
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 343, in with_call
    return self._with_call(
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 332, in _with_call
    return call.result(), call
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 438, in result
    raise self
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 315, in continuation
    response, call = self._thunk(new_method).with_call(
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 343, in with_call
    return self._with_call(
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 332, in _with_call
    return call.result(), call
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 438, in result
    raise self
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 315, in continuation
    response, call = self._thunk(new_method).with_call(
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 1178, in with_call
    return _end_unary_response_blocking(state, call, True, None)
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 1004, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.INTERNAL
        details = "failed to compile workflow for [resource_type:WORKFLOW project:"default" domain:"development" name:"mxm_dzsf.flyte_workflows.workflow.pipeline" version:"ziyad-experimental-map-task-bad3bec-dirty-CQRIH" ] with err failed to compile workflow with err Collected Errors: 1
        Error 0: Code: ValueRequired, Node Id: n2, Description: Value required [Target].
"
        debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"failed to compile workflow for [resource_type:WORKFLOW project:\"default\" domain:\"development\" name:\"mxm_dzsf.flyte_workflows.workflow.pipeline\" version:\"ziyad-experimental-map-task-bad3bec-dirty-CQRIH\" ] with err failed to compile workflow with err Collected Errors: 1\n\tError 0: Code: ValueRequired, Node Id: n2, Description: Value required [Target].\n", grpc_status:13, created_time:"2023-11-28T15:46:28.604168633+01:00"}"
```
y
Reformatting the interesting bits of the first stack trace for posterity:
```
Error in handling running workflow [failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: panic when executing a plugin [k8s-array]. Stack: [goroutine 388 [running]:
runtime/debug.Stack()
        /usr/local/go/src/runtime/debug/stack.go:24 +0x65
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1.1()
        /go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:389 +0xfe
panic({0x227d1c0, 0x3f7e4d0})
        /usr/local/go/src/runtime/panic.go:838 +0x207
github.com/flyteorg/flytestdlib/bitarray.(*BitSet).IsSet(...)
        /go/pkg/mod/github.com/flyteorg/flytestdlib@v1.0.15/bitarray/bitset.go:33
github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core.InitializeExternalResources({0x2ba0c38, 0xc002e20600}, {0x2bacfe0?, 0xc001790900?}, 0xc000ff8480, 0x27a3e30)
```
But I think the second one is more interesting… we're moving towards the new array node handling.
Which backend version are you using, i.e. which version of Flyte?
z
I just realized that we were using a pretty old backend version, 1.6.2. I have now upgraded to 1.10.0, and the experimental `map_task` no longer throws an error, but it still stays queued forever. Here are the propeller logs now:
```
{"json":{},"level":"debug","msg":"Subqueue handler batch round","ts":"2023-11-29T08:49:50Z"}
{"json":{},"level":"debug","msg":"Dynamically configured batch size [-1]","ts":"2023-11-29T08:49:50Z"}
{"json":{},"level":"debug","msg":"Exiting SubQueue handler batch round","ts":"2023-11-29T08:49:50Z"}
{"json":{},"level":"debug","msg":"Subqueue handler batch round","ts":"2023-11-29T08:49:52Z"}
{"json":{},"level":"debug","msg":"Dynamically configured batch size [-1]","ts":"2023-11-29T08:49:52Z"}
```
a
@Dan Rammer (hamersaw) FYI, here are the issues we are encountering with the experimental `map_task`. Parallelization across 10k list items was fine; at 20k it started to hang.
d
Ahh, OK. So I'm guessing the issue here is just scale. The map task execution requires some pre-processing steps that I suspect are taking a very long time on 300k inputs. I feel confident that batching the inputs and using separate launch plans with the experimental map task will work.
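A minimal sketch of the batching step in plain Python rather than any Flyte API: split the ids into fixed-size chunks so that each launch plan or workflow execution maps over a bounded list. The 10k chunk size is an assumption based on the report above that 10k items worked and 20k hung; `chunk_ids` is a hypothetical helper, and launching one execution per chunk is not shown.

```python
from typing import List, Sequence


def chunk_ids(ids: Sequence[str], chunk_size: int) -> List[List[str]]:
    """Split `ids` into consecutive chunks of at most `chunk_size` items.

    Order is preserved, and only the last chunk may be shorter.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [list(ids[i : i + chunk_size]) for i in range(0, len(ids), chunk_size)]


if __name__ == "__main__":
    # Hypothetical ids standing in for the real 300K-element list.
    ids = [f"id-{i}" for i in range(300_000)]
    chunks = chunk_ids(ids, 10_000)
    print(len(chunks))  # 30
    # Each chunk would then be passed to its own launch plan / workflow
    # execution (not shown; how those are launched depends on your setup).
```

A side benefit is input size: if the ~37 MB of ids are of similar length, each 10k chunk is roughly 1.2 MB, well under the 10 MB default output limit mentioned at the start of the thread.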
Handling this scale is something we're very actively working on, and we have a few ideas and efforts underway. Lots of users with interesting use cases want to scale to 100k+. Keep an eye out for some big things in the coming months, like container reuse, so that each subtask in the map task doesn't need to incur the overhead of starting a new k8s pod, and improved metadata tracking to scale map tasks larger.
a
Sounds great. We are now splitting the list into chunks and running several workflow executions.