# contribute
j
🧵 Hello. I just finished an initial implementation of an Armada backend plugin. I'm using it as the default plugin for Python tasks and it seems to work well. I am able to run tasks, workflows, and dynamic workflows. The only issue I've hit is with `array_node_map_task`.
The workflow is defined as follows:
```python
@task
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a


@task
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)


@task
def to_list(size: int) -> list[int]:  # noqa: D103
    return list(range(size))


@workflow
def map_reduce_array_node(size: int) -> int:
    """Simple workflow to illustrate a large fan-out/fan-in using Flyte experimental feature."""
    input_array = to_list(size=size)
    output = array_node_map_task(simple_map_task)(a=input_array)
    return simple_reduce(b=output)
```
The error message is the following:
```
Workflow[flytesnacks:development:fan_out.map_reduce_array_node] failed. RuntimeExecutionError: max number of system retry attempts [31/30] exhausted. Last known status message: worker error(s) encountered: [0]: failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (armada.ExecutionPhase) for received field PluginState.Phase
[1]: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (armada.ExecutionPhase) for received field PluginState.Phase
```
It seems to me that something in the array node handling might be injecting data into the plugin state, which is causing issues upon deserialization in my Armada plugin's `Handle` method.
```go
func (p Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) {
	incomingState := ExecutionState{}
	logger.Infof(ctx, "Handling")

	// We assume here that the first time this function is called, the custom state we get back is whatever we passed in,
	// namely the zero-value of our struct.
	if _, err := tCtx.PluginStateReader().Get(&incomingState); err != nil {
		logger.Errorf(ctx, "Plugin %s failed to unmarshal custom state when handling [%s] [%s]",
			p.id, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err)
		return core.UnknownTransition, errors.Wrapf(errors.CorruptedPluginState, err,
			"Failed to unmarshal custom state in Handle")
	}

	// Do what needs to be done, and give this function everything it needs to do its job properly
	outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, p.submitClient, p.jobsClient)

	// Return if there was an error
	if transformError != nil {
		return core.UnknownTransition, transformError
	}

	// If no error, then infer the new Phase from the various states
	phaseInfo := MapExecutionStateToPhaseInfo(outgoingState)

	// Hack to ensure the output reader is set. This is similar to what the k8s pod plugin management does
	if phaseInfo.Phase() == core.PhaseSuccess {
		opReader := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0)
		err := tCtx.OutputWriter().Put(ctx, opReader)
		if err != nil {
			return core.UnknownTransition, err
		}
	}

	if err := tCtx.PluginStateWriter().Put(pluginStateVersion, outgoingState); err != nil {
		return core.UnknownTransition, err
	}

	return core.DoTransition(phaseInfo), nil
}
```
```go
type ExecutionState struct {
	Phase ExecutionPhase

	// This will store the job ID from Armada
	JobSetId string `json:"jobSetId,omitempty"`
	JobId    string `json:"jobId,omitempty"`

	ArmadaEndpoint       string
	CreationFailureCount int `json:"creation_failure_count,omitempty"`
}
```
I'm generally having trouble understanding the array node code path and where it's executed, which makes this hard to debug.
I've dumped the state gob to a string (not ideal), but it clearly shows there is something being added to the plugin state that conflicts with my struct:
```
K\u0003\u0001\u0001\u000bPluginState\u0001\ufffd\ufffd\u0000\u0001\u0003\u0001\u0005Phase\u0001\u0006\u0000\u0001\u000eK8sPluginState\u0001\ufffd\ufffd\u0000\u0001\u000fLastEventUpdate\u0001\ufffd\ufffd\u0000\u0000\u0000?\ufffd\ufffd\u0003\u0001\u0001\u000bPluginState\u0001\ufffd\ufffd\u0000\u0001\u0003\u0001\u0005Phase\u0001\u0004\u0000\u0001\u000cPhaseVersion\u0001\u0006\u0000\u0001\u0006Reason\u0001\u000c\u0000\u0000\u0000\u0010\ufffd\ufffd\u0005\u0001\u0001\u0004Time\u0001\ufffd\ufffd\u0000\u0000\u0000\u0005\ufffd\ufffd\u0002\u0000\u0000"
```
Normally the state looks as it should:
```
\ufffd\ufffd\u0003\u0001\u0001\u000eExecutionState\u0001\ufffd\ufffd\u0000\u0001\u0005\u0001\u0005Phase\u0001\u0004\u0000\u0001\u0008JobSetId\u0001\u000c\u0000\u0001\u0005JobId\u0001\u000c\u0000\u0001\u000eArmadaEndpoint\u0001\u000c\u0000\u0001\u0014CreationFailureCount\u0001\u0004\u0000\u0000\u0000G\ufffd\ufffd\u0001\u0002\u0001$b9497c10-9e4b-413c-a786-fa6ef6a607be\u0001\u001a01hzna22my5b8hsa6ytbhrz94b\u0000"
```
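The first dump can be reproduced with a small gob round-trip. In the seeded state the `Phase` field appears to be encoded as an unsigned integer, while `armada.ExecutionPhase` is a signed named type, and gob refuses to decode across signedness even when the field names line up. The type names below are hypothetical stand-ins for the real Flyte/Armada types, just to illustrate the failure mode:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// SeededState is a stand-in for the state the array node handler seeds.
// Note the Phase field is an unsigned integer.
type SeededState struct {
	Phase uint
}

// ExecutionPhase stands in for the plugin's signed named phase type.
type ExecutionPhase int

// ExecutionState stands in for the plugin's own state struct.
type ExecutionState struct {
	Phase ExecutionPhase
}

// decodeMismatch encodes the seeded state and then tries to decode it as the
// plugin's state, returning the resulting gob error.
func decodeMismatch() error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(SeededState{Phase: 2}); err != nil {
		return err
	}
	var st ExecutionState
	return gob.NewDecoder(&buf).Decode(&st)
}

func main() {
	// gob matches fields by name, so both sides agree on "Phase", but the
	// wire type is unsigned and the local type is signed: decode fails.
	fmt.Println(decodeMismatch())
}
```

Running this prints a `gob: wrong type ... for received field` error analogous to the one above.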
It looks like many plugins use `CurrentPhase` and `PreviousPhase`, but the hive plugin uses `Phase`, so I'm guessing it would be problematic with array node too.
I got a different error once I renamed the `Phase` field of my execution state:
```json
{
  "json": {
    "exec_id": "f3a025450ebba49e7a40",
    "ns": "flytesnacks-development",
    "res_ver": "81883862",
    "routine": "worker-69",
    "wf": "flytesnacks:development:fan_out.map_reduce_array_node"
  },
  "level": "error",
  "msg": "Error when trying to reconcile workflow. Error [worker error(s) encountered: [0]: failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: type mismatch: no fields matched compiling decoder for ExecutionState\n[1]: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: type mismatch: no fields matched compiling decoder for ExecutionState\n]. Error Type[*errors.errorString]",
  "ts": "2024-06-05T23:09:26Z"
}
```
Interesting. So the earlier error occurred because there was some overlap between my custom plugin state and whatever is being injected. Now there is no overlap at all.
@Ketan (kumare3) it seems that the k8s plugin manager is injecting some execution state before my Armada plugin is executed. Maybe I've misconfigured something, because I don't think the k8s plugin manager should be involved here.
Ah. The array node handler seems to seed the plugin state from the k8s plugin state. As written, I'm not sure it's compatible with non-k8s plugins: https://github.com/flyteorg/flyte/blob/master/flytepropeller/pkg/controller/nodes/array/handler.go#L620-L642
@Haytham Abuelfutuh I think I hit the limitation you were mentioning with `map_task`.
```go
// TODO - if we want to support more plugin types we need to figure out the best way to store plugin state
// currently just mocking based on node phase -> which works for all k8s plugins
// we can not pre-allocate a bit array because max size is 256B and with 5k fanout node state = 1.28MB
pluginStateBytes := a.pluginStateBytesStarted
```
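Reading that TODO, the handler's approach can be sketched as follows: encode one k8s-shaped "started" state up front and hand the same bytes to every subtask, regardless of which plugin will later decode them. The struct below is a hypothetical stand-in for the real k8s plugin state (its field names follow the first gob dump above), just to show why a non-k8s plugin ends up decoding k8s-shaped bytes:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// K8sPluginState is a hypothetical stand-in for the state struct the array
// node handler pre-encodes; the real type lives in flyteplugins' k8s
// plugin machinery.
type K8sPluginState struct {
	Phase        int
	PhaseVersion uint32
	Reason       string
}

// seedStateBytes mimics the handler: encode a single "started" state once
// and reuse the same byte slice for every subtask in the fan-out.
func seedStateBytes() ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(K8sPluginState{Phase: 1}); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	b, err := seedStateBytes()
	if err != nil {
		panic(err)
	}
	// A non-k8s plugin decoding these bytes into its own struct is where
	// the CorruptedPluginState errors above come from.
	fmt.Printf("seeded %d bytes of k8s-shaped state\n", len(b))
}
```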
h
Ah right... Can we back up a sec? A question about Armada state, and the need to even have it: how are you creating jobs? Is it by creating CRDs, or an API call? If it's CRDs, then we should just use `taskExecutionContext.GeneratedName()` and there is no need to store any state, right? If it's an API call, does it support idempotency? Surrogate keys? Is there a way to tell it the job id to use? Have you had to set up an AutoRefreshCache to sync the status of the jobs from Armada?
j
You have to use the Armada API to create jobs. If it were CRDs I would have made a k8s plugin. I believe the Armada submit job APIs are idempotent. The submit job API does take a user-controlled "jobSetId" argument, which I think is just a string. I just started looking into the plugins and for the most part stripped out anything I didn't see as immediately necessary to get it working, but I can look into how the other plugins use the refresh cache.
I see what you mean now. I tried embedding `k8s.PluginState` into my plugin's state, which resolved the decoding issue, but we noticed that the state was wiped during every poll, and it would kick off jobs over and over again. I think if we can use `taskExecutionContext.GeneratedName()` as an identifier, then we can forgo the plugin state and just keep the state in the in-memory auto-refresh cache.
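A deliberately simplified stand-in for that idea (the real implementation would use flytestdlib's AutoRefreshCache rather than a bare map): a concurrency-safe store keyed by the generated name, so repeated Handle calls for the same task never resubmit the job. All names here are illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

// jobStatus is a hypothetical snapshot of an Armada job's state.
type jobStatus struct {
	JobID string
	Phase string
}

// jobStore is a simplified stand-in for an auto-refresh cache: a
// concurrency-safe map keyed by the task's generated name, so no state
// needs to round-trip through PluginState at all.
type jobStore struct {
	mu   sync.Mutex
	jobs map[string]jobStatus
}

func newJobStore() *jobStore {
	return &jobStore{jobs: make(map[string]jobStatus)}
}

// GetOrCreate returns the tracked status for name, invoking create only on
// first sight (where a real plugin would submit the job to Armada).
func (s *jobStore) GetOrCreate(name string, create func() jobStatus) jobStatus {
	s.mu.Lock()
	defer s.mu.Unlock()
	if st, ok := s.jobs[name]; ok {
		return st
	}
	st := create()
	s.jobs[name] = st
	return st
}

func main() {
	store := newJobStore()
	submissions := 0
	create := func() jobStatus {
		submissions++ // a real plugin would call Armada's submit API here
		return jobStatus{JobID: "job-1", Phase: "QUEUED"}
	}
	// Repeated Handle calls for the same generated name reuse the entry.
	store.GetOrCreate("fn0-n0-0", create)
	store.GetOrCreate("fn0-n0-0", create)
	fmt.Println("submissions:", submissions) // prints "submissions: 1"
}
```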
k
That’s the way to do it
h
Exactly, @Jason Parraga. The JobSetId should be the TaskExecutionContext.GeneratedName()... if you can use that in subsequent calls to interact with the job, that would be all
j
Thanks for the pointers. Just got it working with an in-memory store. Will probably do something with persistence longer term.
h
You probably don't need persistence...
propeller already persists the workflow state, and will "slowly" rebuild the AutoRefreshCache if it restarts, for example...
j
I think I see what you mean. I was following how the hive plugin operates with the AutoRefreshCache, but I think that plugin kicks off the query synchronously and then offloads to the cache for monitoring. If I update my plugin to use the cache completely, I should be able to materialize the state from the execution ID and Armada.
k
Ohh man, the hive plugin is the worst one - please let's burn it
h
If Armada is API-based, are you using the WebAPI Plugin interface?
Because that can significantly simplify how much you need to implement.
This is an example plugin: https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/pluginmachinery/webapi/example/plugin.go You can see you just need to implement Create/Get/Status, and they are completely stateless... as long as you can rely on `TaskExecutionContext.GeneratedName()`, it'll take care of the cache initialization, rehydration, etc. Agents are also implemented as WebAPI plugins: https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
j
I have a regular plugin for now, but I can look into the WebAPI interface.
k
Hey, but I would love to understand what the problem with agents was
j
You had pointed me to implementing a backend plugin because we wanted to run the native Flyte Python tasks under Armada. I suppose we could force the Python task type to use the agent service, but I'm not totally sure how that would work and don't really want to spend more cycles messing with the Python code base. I'm much more comfortable iterating in Go, so I don't mind just writing a backend plugin. Also, the Go support for Armada is much better than the Python support. I was forking Armada's Python client to get basic functionality, which wasn't fun.
Later on we will probably need to support gang scheduling or specific Armada queues and for that we will probably need some more custom configs at the task level. And at that point we can revisit using the agent
k
Ya, I know I pointed you there - oops. I think we could have just pointed propeller to use the agent as the default. I thought Armada was a CRD.
j
Any reason why the `webapi.PluginSetupContext` interface is limited? I was hoping to access the secrets manager during plugin initialization, but to work around that I'll have to lazy-load my API clients during task creation.
Ah, it doesn't matter anyway. Web API plugins blow up with array node, so there's no point.
h
I agree, we can make the secret manager available too... it's limited because we just wanted to start with the lightest possible interface... Can you elaborate on the blowing-up part?
j
The web API plugin core logic always tries to deserialize plugin state, and it blows up when it's triggered by an array node (due to what we discussed above, where the k8s plugin state is seeded).
So I kept my backend plugin but modeled it after the web API plugin and made it not deserialize any incoming plugin state. It uses the auto-refresh cache as well. To gracefully handle reboots of propeller, I'm probably going to have it query Armada for existing jobs before creating a new one (when the cache is empty).
h
I see... this is very valuable feedback. We are reworking how array nodes work at the moment to make them behave more consistently like all other nodes. RE: reboots. If you get a Create() call, you call the idempotent Armada create API, and it should be fine, right? If you get a Get() call, you check the cache; if it's empty, you just create an item in the cache that will get checked in the background. (Is this where you are saying you will call the API inline first?) This way the only "blocking" API is Create, to guarantee state transitions; otherwise everything else happens in the background in a non-blocking way.
j
It looks like the Armada API isn't idempotent as I originally thought. When you call `SubmitJob` you pass a `job_set_id`, which is something you control. But every time you call it, it will kick off a `Job` with a random `job_id` tied to the `JobSet`. I just looked into the Armada API protos and I can't even pull current jobs by `JobSetId`, so I don't think I can even do what I want and resume state from Armada after a propeller reboot.
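Given that behavior, one client-side mitigation is to remember which jobSetIds have already been submitted, so a retried Create doesn't fan out duplicate jobs. `fakeSubmit` below just models the random-`job_id`-per-call behavior described above; the guard lives in process memory, so it shares the propeller-reboot limitation. All names are illustrative:

```go
package main

import (
	"fmt"
	"math/rand"
)

// fakeSubmit models the behavior described above: every call, even with the
// same jobSetID, starts a new job with a fresh random jobId.
func fakeSubmit(jobSetID string) string {
	return fmt.Sprintf("%s/job-%04d", jobSetID, rand.Intn(10000))
}

// submitter remembers which jobSetIds it has already submitted, so a
// retried Create for the same generated name reuses the original job.
type submitter struct {
	submitted map[string]string // jobSetId -> jobId
}

// SubmitOnce submits the job set only the first time it sees jobSetID and
// returns the recorded jobId on every subsequent call.
func (s *submitter) SubmitOnce(jobSetID string) string {
	if id, ok := s.submitted[jobSetID]; ok {
		return id
	}
	id := fakeSubmit(jobSetID)
	s.submitted[jobSetID] = id
	return id
}

func main() {
	s := &submitter{submitted: map[string]string{}}
	first := s.SubmitOnce("fn0-n0-0")
	second := s.SubmitOnce("fn0-n0-0")
	fmt.Println(first == second) // prints "true": only one job was started
}
```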
Some minor cleanup while I was in the web api code: https://github.com/flyteorg/flyte/pull/5472
k
Yes, they will work after we have the array node for all checked in