Ziyad Sheebaelhamd
11/28/2023, 10:38 AM
map_task. The code looks as follows:
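import typing as t
from functools import partial

from flytekit import WorkflowFailurePolicy, map_task, workflow
from omegaconf import DictConfig  # assumed source of DictConfig

# create_config, get_ids, and write_data are defined elsewhere in the project (not shown).

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def pipeline(cfg: DictConfig) -> t.Dict:
    cfg_data_iterable, cfg_store, cfg_dataset = create_config(cfg=cfg)
    ids = get_ids(cfg_dataset=cfg_dataset)
    # Bind the shared config arguments so that only `id` is mapped over.
    partial_task = partial(write_data, cfg_data_iterable=cfg_data_iterable, cfg_store=cfg_store)
    map_task(partial_task, concurrency=100, min_success_ratio=0.1)(id=ids)
    return {}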
Initially, Flyte complained about the large list (the maximum allowed output size was 10 MB). I increased max-output-size-bytes to 40 MB and got no errors; however, the map_task queues forever and never gets executed.
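One way to keep the mapped list small is to batch the IDs so that each mapped task handles one batch rather than one ID. The following is a minimal sketch in plain Python, not a confirmed fix for the queueing behaviour. The helper `chunk` and the sample IDs are hypothetical, and it assumes `write_data` could be adapted to accept a list of IDs.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunk(items: Sequence[T], batch_size: int) -> List[List[T]]:
    """Split items into consecutive batches of at most batch_size elements."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [list(items[i:i + batch_size]) for i in range(0, len(items), batch_size)]


# Hypothetical IDs standing in for the output of get_ids().
ids = [f"id-{n}" for n in range(10)]
batches = chunk(ids, batch_size=4)

# Each batch would become one element of the list passed to map_task.
print(len(batches), [len(b) for b in batches])
```

With batches of a few hundred or a few thousand IDs, the list passed to map_task shrinks by the same factor, which also reduces the serialized size of the upstream output.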
What could be the issue here?
Yee
Yee
Yee
experimental module in flytekit?
Yee
Ziyad Sheebaelhamd
11/28/2023, 2:44 PM
{"json":{"exec_id":"f238bc22ff0864023aef","ns":"default-development","res_ver":"97788251","routine":"worker-5","wf":"default:development:mxm_dzsf.flyte_workflows.workflow.pipeline"},"level":"warning","msg":"Error in handling running workflow [failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: panic when executing a plugin [k8s-array]. Stack: [goroutine 388 [running]:\nruntime/debug.Stack()\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x65\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1.1()\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:389 +0xfe\npanic({0x227d1c0, 0x3f7e4d0})\n\t/usr/local/go/src/runtime/panic.go:838 +0x207\ngithub.com/flyteorg/flytestdlib/bitarray.(*BitSet).IsSet(...)\n\t/go/pkg/mod/github.com/flyteorg/flytestdlib@v1.0.15/bitarray/bitset.go:33\ngithub.com/flyteorg/flyteplugins/go/tasks/plugins/array/core.InitializeExternalResources({0x2ba0c38, 0xc002e20600}, {0x2bacfe0?, 0xc001790900?}, 0xc000ff8480, 0x27a3e30)\n\t/go/pkg/mod/github.com/flyteorg/flyteplugins@v1.0.56/go/tasks/plugins/array/core/metadata.go:33 +0x1e1\ngithub.com/flyteorg/flyteplugins/go/tasks/plugins/array/k8s.Executor.Handle({{0x7eb8eeb50080, 0xc000c10ea0}, {{0x2b92f50, 0xc00072be40}}, {{0x2b92f50, 0xc001516000}}}, {0x2ba0c38, 0xc002e20600}, {0x2bacfe0, 0xc001790900})\n\t/go/pkg/mod/github.com/flyteorg/flyteplugins@v1.0.56/go/tasks/plugins/array/k8s/executor.go:96 +0x268\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1(0x19?, {0x2ba0c38, 0xc002e201e0}, {0x2ba3378?, 0xc00150d920?}, 0xc0038960dc?)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:396 +0x184\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin({{0x2ba2878, 0xc0011e2378}, {0x2b8d138, 0xc0007d8360}, 0xc00096ee10, 0xc00096ee40, 0xc00096ee70, {0x2ba28b8, 0xc000d6cb40}, 0xc00055a2c0, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:398 +0x9a\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.Handle({{0x2ba2878, 0xc0011e2378}, {0x2b8d138, 0xc0007d8360}, 0xc00096ee10, 0xc00096ee40, 0xc00096ee70, {0x2ba28b8, 0xc000d6cb40}, 0xc00055a2c0, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:666 +0x1ba5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.handleParentNode({{0x2ba42f0, 0xc0007ee900}, {{0xc001490940, {{...}, 0x0}, {0xc0005aa180, 0x4, 0x4}}, {0xc001490960, {{...}, ...}, ...}, ...}, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:70 +0xd8\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic.dynamicNodeTaskNodeHandler.Handle({{0x2ba42f0, 0xc0007ee900}, {{0xc001490940, {{...}, 0x0}, {0xc0005aa180, 0x4, 0x4}}, {0xc001490960, {{...}, ...}, ...}, ...}, ...}, ...)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/dynamic/handler.go:224 +0x9d0\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).execute(0xc0007ee540, {0x2ba0c38, 0xc002e057a0}, {0x2ba26b8, 0xc00173a3c0}, 0xc001790840, {0x2bb6e50?, 0xc00368a410?})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:460 +0x157\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleQueuedOrRunningNode(0xc0007ee540, {0x2ba0c38, 0xc002e057a0}, 0xc001790840, {0x2ba26b8?, 0xc00173a3c0?})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:593 +0x227\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleNode(0xc0007ee540, {0x2ba0c38, 0xc002e057a0}, {0x2b85500, 0xc003a75900}, 0xc001790840, {0x2ba26b8?, 0xc00173a3c0})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:820 +0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0007ee540, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900}, {0x2ba0f10?, 0xc003a75900?}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:1018 +0x705\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x26b69c5?, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900?}, {0x2ba0f10?, 0xc003a75900}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:858 +0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0007ee540, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900}, {0x2ba0f10?, 0xc003a75900?}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:1025 +0x935\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x26b69c5?, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900?}, {0x2ba0f10?, 0xc003a75900}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:858 +0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0007ee540, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900}, {0x2ba0f10?, 0xc003a75900?}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:1025 +0x935\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).handleDownstream(0x26b69c5?, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900?}, {0x2ba0f10?, 0xc003a75900}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:858 +0x3c5\ngithub.com/flyteorg/flytepropeller/pkg/controller/nodes.(*nodeExecutor).RecursiveNodeHandler(0xc0007ee540, {0x2ba0c38, 0xc002e04b70}, {0x2bb1318, 0xc00079e0a0}, {0x2b85500, 0xc003a75900}, {0x2ba0f10?, 0xc003a75900?}, {0x2baec30, ...})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/executor.go:1025 +0x935\ngithub.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).handleRunningWorkflow(0xc0008e1ea0, {0x2ba0c38, 0xc002e04b70}, 0xc003a75900)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:147 +0x1b3\ngithub.com/flyteorg/flytepropeller/pkg/controller/workflow.(*workflowExecutor).HandleFlyteWorkflow(0xc0008e1ea0, {0x2ba0c38, 0xc002e04b70}, 0xc003a75900)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workflow/executor.go:393 +0x40f\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow.func2(0xc000b7e3c0, {0x2ba0c38, 0xc002e04b70}, 0xc002e477d0, 0x2137020?)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:142 +0x18e\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).TryMutateWorkflow(0xc000b7e3c0, {0x2ba0c38, 0xc002e047b0}, 0xc000eec000)\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:143 +0x495\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*Propeller).Handle(0xc000b7e3c0, {0x2ba0c38, 0xc002e047b0}, {0xc00112b1d0, 0x13}, {0xc00112b1e4, 0x14})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/handler.go:259 +0xe4a\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem.func1(0xc00004acf0, 0xc002e47f28, {0x2137020?, 0xc0045a7760})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:88 +0x510\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).processNextWorkItem(0xc00004acf0, {0x2ba0c38, 0xc002e047b0})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:99 +0xf1\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).runWorker(0x2ba0c38?, {0x2ba0c38, 0xc00174c060})\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:115 +0xbd\ngithub.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run.func1()\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:150 +0x59\ncreated by github.com/flyteorg/flytepropeller/pkg/controller.(*WorkerPool).Run\n\t/go/src/github.com/flyteorg/flytepropeller/pkg/controller/workers.go:147 +0x285\n]]","ts":"2023-11-28T14:42:02Z"}
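The log entry above is a single JSON object whose "msg" field carries the Go stack trace with escaped newlines and tabs, which makes it hard to read. A minimal sketch for expanding such a line into a readable trace follows. The function name `format_log_line` is hypothetical, and the sample is a shortened stand-in for the real entry.

```python
import json


def format_log_line(line: str) -> str:
    """Decode one JSON log line and return its level, timestamp, and message.

    json.loads turns the escaped \\n and \\t sequences inside "msg" into real
    newlines and tabs, so the stack trace prints one frame per line.
    """
    record = json.loads(line)
    header = f"[{record.get('level', '?')}] {record.get('ts', '?')}"
    return f"{header}\n{record.get('msg', '')}"


# Shortened, illustrative sample with the same fields as the log entry above.
sample = (
    '{"json":{"exec_id":"abc"},"level":"warning",'
    '"msg":"Stack: [goroutine 1 [running]:\\nruntime/debug.Stack()\\n'
    '\\t/usr/local/go/src/runtime/debug/stack.go:24 +0x65\\n]]",'
    '"ts":"2023-11-28T14:42:02Z"}'
)

print(format_log_line(sample))
```

Piping the propeller logs through a helper like this makes it easier to spot the frame that panicked, here the call into bitarray.(*BitSet).IsSet.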
Ziyad Sheebaelhamd
11/28/2023, 2:45 PM
Ziyad Sheebaelhamd
11/28/2023, 2:56 PM
Traceback (most recent call last):
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 315, in continuation
    response, call = self._thunk(new_method).with_call(
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 343, in with_call
    return self._with_call(
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 332, in _with_call
    return call.result(), call
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 438, in result
    raise self
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 315, in continuation
    response, call = self._thunk(new_method).with_call(
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 343, in with_call
    return self._with_call(
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 332, in _with_call
    return call.result(), call
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 438, in result
    raise self
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_interceptor.py", line 315, in continuation
    response, call = self._thunk(new_method).with_call(
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 1178, in with_call
    return _end_unary_response_blocking(state, call, True, None)
  File "/home/ziyad/merantix/dzsf/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 1004, in _end_unary_response_blocking
    raise _InactiveRpcError(state) # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INTERNAL
details = "failed to compile workflow for [resource_type:WORKFLOW project:"default" domain:"development" name:"mxm_dzsf.flyte_workflows.workflow.pipeline" version:"ziyad-experimental-map-task-bad3bec-dirty-CQRIH" ] with err failed to compile workflow with err Collected Errors: 1
Error 0: Code: ValueRequired, Node Id: n2, Description: Value required [Target].
"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"failed to compile workflow for [resource_type:WORKFLOW project:\"default\" domain:\"development\" name:\"mxm_dzsf.flyte_workflows.workflow.pipeline\" version:\"ziyad-experimental-map-task-bad3bec-dirty-CQRIH\" ] with err failed to compile workflow with err Collected Errors: 1\n\tError 0: Code: ValueRequired, Node Id: n2, Description: Value required [Target].\n", grpc_status:13, created_time:"2023-11-28T15:46:28.604168633+01:00"}"
Yee
Error in handling running workflow [failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [k8s-array]: panic when executing a plugin [k8s-array]. Stack: [goroutine 388 [running]:
runtime/debug.Stack()
    /usr/local/go/src/runtime/debug/stack.go:24 +0x65
github.com/flyteorg/flytepropeller/pkg/controller/nodes/task.Handler.invokePlugin.func1.1()
    /go/src/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/handler.go:389 +0xfe
panic({0x227d1c0, 0x3f7e4d0})
    /usr/local/go/src/runtime/panic.go:838 +0x207
github.com/flyteorg/flytestdlib/bitarray.(*BitSet).IsSet(...)
    /go/pkg/mod/github.com/flyteorg/flytestdlib@v1.0.15/bitarray/bitset.go:33
github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core.InitializeExternalResources({0x2ba0c38, 0xc002e20600}, {0x2bacfe0?, 0xc001790900?}, 0xc000ff8480, 0x27a3e30)
Yee
Yee
Ziyad Sheebaelhamd
11/29/2023, 8:50 AM
{"json":{},"level":"debug","msg":"Subqueue handler batch round","ts":"2023-11-29T08:49:50Z"}
{"json":{},"level":"debug","msg":"Dynamically configured batch size [-1]","ts":"2023-11-29T08:49:50Z"}
{"json":{},"level":"debug","msg":"Exiting SubQueue handler batch round","ts":"2023-11-29T08:49:50Z"}
{"json":{},"level":"debug","msg":"Subqueue handler batch round","ts":"2023-11-29T08:49:52Z"}
{"json":{},"level":"debug","msg":"Dynamically configured batch size [-1]","ts":"2023-11-29T08:49:52Z"}
Adrian Loy
11/29/2023, 1:38 PM
Dan Rammer (hamersaw)
11/29/2023, 1:57 PM
Dan Rammer (hamersaw)
11/29/2023, 1:59 PM
Adrian Loy
11/29/2023, 2:03 PM