Jason Parraga
06/05/2024, 10:26 PM
array_node_map_task.
06/05/2024, 10:27 PM
from flytekit import task, workflow
from flytekit.core.array_node_map_task import map_task as array_node_map_task

@task
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a

@task
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)

@task
def to_list(size: int) -> list[int]:  # noqa: D103
    return list(range(size))

@workflow
def map_reduce_array_node(size: int) -> int:
    """Simple workflow to illustrate a large fan-out/fan-in using a Flyte experimental feature."""
    input_array = to_list(size=size)
    output = array_node_map_task(simple_map_task)(a=input_array)
    return simple_reduce(b=output)
Jason Parraga
06/05/2024, 10:27 PM
Workflow[flytesnacks:development:fan_out.map_reduce_array_node] failed. RuntimeExecutionError: max number of system retry attempts [31/30] exhausted. Last known status message: worker error(s) encountered: [0]: failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (armada.ExecutionPhase) for received field PluginState.Phase
[1]: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (armada.ExecutionPhase) for received field PluginState.Phase
Jason Parraga
06/05/2024, 10:27 PM
Jason Parraga
06/05/2024, 10:28 PM
func (p Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
	incomingState := ExecutionState{}
	logger.Infof(ctx, "Handling")
	// We assume here that the first time this function is called, the custom state we get back is whatever we passed in,
	// namely the zero-value of our struct.
	if _, err := tCtx.PluginStateReader().Get(&incomingState); err != nil {
		logger.Errorf(ctx, "Plugin %s failed to unmarshal custom state when handling [%s] [%s]",
			p.id, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err)
		return core.UnknownTransition, errors.Wrapf(errors.CorruptedPluginState, err,
			"Failed to unmarshal custom state in Handle")
	}
	// Do what needs to be done, and give this function everything it needs to do its job properly
	outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, p.submitClient, p.jobsClient)
	// Return if there was an error
	if transformError != nil {
		return core.UnknownTransition, transformError
	}
	// If no error, then infer the new Phase from the various states
	phaseInfo := MapExecutionStateToPhaseInfo(outgoingState)
	// Hack to ensure the output reader is set. This is similar to what the k8s pod plugin management does
	if phaseInfo.Phase() == core.PhaseSuccess {
		opReader := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0)
		err := tCtx.OutputWriter().Put(ctx, opReader)
		if err != nil {
			return core.UnknownTransition, err
		}
	}
	if err := tCtx.PluginStateWriter().Put(pluginStateVersion, outgoingState); err != nil {
		return core.UnknownTransition, err
	}
	return core.DoTransition(phaseInfo), nil
}
Jason Parraga
06/05/2024, 10:29 PM
type ExecutionState struct {
	Phase ExecutionPhase
	// This will store the job ID from Armada
	JobSetId             string `json:"jobSetId,omitempty"`
	JobId                string `json:"jobId,omitempty"`
	ArmadaEndpoint       string
	CreationFailureCount int `json:"creation_failure_count,omitempty"`
}
Jason Parraga
06/05/2024, 10:30 PM
Jason Parraga
06/05/2024, 10:59 PM
K\u0003\u0001\u0001\u000bPluginState\u0001\ufffd\ufffd\u0000\u0001\u0003\u0001\u0005Phase\u0001\u0006\u0000\u0001\u000eK8sPluginState\u0001\ufffd\ufffd\u0000\u0001\u000fLastEventUpdate\u0001\ufffd\ufffd\u0000\u0000\u0000?\ufffd\ufffd\u0003\u0001\u0001\u000bPluginState\u0001\ufffd\ufffd\u0000\u0001\u0003\u0001\u0005Phase\u0001\u0004\u0000\u0001\u000cPhaseVersion\u0001\u0006\u0000\u0001\u0006Reason\u0001\u000c\u0000\u0000\u0000\u0010\ufffd\ufffd\u0005\u0001\u0001\u0004Time\u0001\ufffd\ufffd\u0000\u0000\u0000\u0005\ufffd\ufffd\u0002\u0000\u0000"
Normally, the state looks as it should:
\ufffd\ufffd\u0003\u0001\u0001\u000eExecutionState\u0001\ufffd\ufffd\u0000\u0001\u0005\u0001\u0005Phase\u0001\u0004\u0000\u0001\u0008JobSetId\u0001\u000c\u0000\u0001\u0005JobId\u0001\u000c\u0000\u0001\u000eArmadaEndpoint\u0001\u000c\u0000\u0001\u0014CreationFailureCount\u0001\u0004\u0000\u0000\u0000G\ufffd\ufffd\u0001\u0002\u0001$b9497c10-9e4b-413c-a786-fa6ef6a607be\u0001\u001a01hzna22my5b8hsa6ytbhrz94b\u0000"
Jason Parraga
06/05/2024, 11:03 PM
CurrentPhase and PreviousPhase. But the hive plugin uses Phase, so I'm guessing it would be problematic with array node too.
Jason Parraga
06/05/2024, 11:11 PM
Phase field of my execution state.
{
  "json": {
    "exec_id": "f3a025450ebba49e7a40",
    "ns": "flytesnacks-development",
    "res_ver": "81883862",
    "routine": "worker-69",
    "wf": "flytesnacks:development:fan_out.map_reduce_array_node"
  },
  "level": "error",
  "msg": "Error when trying to reconcile workflow. Error [worker error(s) encountered: [0]: failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: type mismatch: no fields matched compiling decoder for ExecutionState\n[1]: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: type mismatch: no fields matched compiling decoder for ExecutionState\n]. Error Type[*errors.errorString]",
  "ts": "2024-06-05T23:09:26Z"
}
Interesting. So the earlier error message appeared because there was some overlap between my custom plugin state and whatever is being injected. Now there is no overlap at all.
Jason Parraga
06/05/2024, 11:20 PM
Jason Parraga
06/05/2024, 11:22 PM
Jason Parraga
06/05/2024, 11:29 PM
map_task.
// TODO - if we want to support more plugin types we need to figure out the best way to store plugin state
// currently just mocking based on node phase -> which works for all k8s plugins
// we cannot pre-allocate a bit array because max size is 256B and with 5k fanout node state = 1.28MB
pluginStateBytes := a.pluginStateBytesStarted
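Reading the 256B in that comment as the maximum plugin-state size per subnode and 5k as the fan-out, the estimate works out as:

$$
256\ \text{B/node} \times 5000\ \text{nodes} = 1{,}280{,}000\ \text{B} \approx 1.28\ \text{MB}
$$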
Haytham Abuelfutuh
taskExecutionContext.GeneratedName() and then there is no need to store any state... right?
If it's an API call, does it support idempotency? Surrogate keys? Is there a way to tell it the job ID to use? Have you had to set up an AutoRefreshCache to sync the status of the jobs from Armada?
Jason Parraga
06/06/2024, 5:12 PM
Jason Parraga
06/07/2024, 12:24 AM
k8s.PluginState into my plugin's state, which resolved the decoding issue, but we noticed that the state was wiped during every poll, and it would kick off jobs over and over again. I think if we can use taskExecutionContext.GeneratedName() as an identifier, then we can forgo the plugin state and just keep the state in the in-memory auto-refresh cache.
Ketan (kumare3)
Haytham Abuelfutuh
Jason Parraga
06/07/2024, 9:24 PM
Haytham Abuelfutuh
Haytham Abuelfutuh
Jason Parraga
06/08/2024, 1:38 AM
Ketan (kumare3)
Haytham Abuelfutuh
Haytham Abuelfutuh
Haytham Abuelfutuh
Jason Parraga
06/08/2024, 2:03 AM
Ketan (kumare3)
Jason Parraga
06/08/2024, 3:43 AM
Jason Parraga
06/08/2024, 3:46 AM
Ketan (kumare3)
Jason Parraga
06/10/2024, 8:39 PM
webapi.PluginSetupContext interface is limited? I was hoping to access the secrets manager during plugin initialization, but to work around that I'll have to lazy-load my API clients during task creation.
Jason Parraga
06/10/2024, 10:14 PM
Haytham Abuelfutuh
Jason Parraga
06/11/2024, 6:40 PM
Jason Parraga
06/11/2024, 6:41 PM
Haytham Abuelfutuh
Jason Parraga
06/11/2024, 11:12 PM
SubmitJob you pass a job_set_id, which is something you control. But every time you call that, it will kick off a Job with a random job_id that is tied to the JobSet. I just looked into the Armada API protos and I can't even pull current jobs by JobSetId, so I don't think I can even do what I want and resume state from Armada after a propeller reboot.
Jason Parraga
06/11/2024, 11:23 PM
Ketan (kumare3)