# flyte-support
c
🧵 Hello. I have a rough Armada plugin that is able to successfully run basic tasks in Armada (i.e. hello world). I decided to take it a step further and use a couple of these `ArmadaTasks` that I have defined in a workflow, and I seem to have run into an issue related to (de)serialization. Wanted to know if there is something obvious I'm missing before I start digging into the code.
My code
Copy code
from flytekit import dynamic, task, workflow
from flytekit.core.array_node_map_task import map_task
from flytekitplugins.armada.task import ArmadaConfig


@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def say_hello(name: str) -> str:
    print(f"Hello, {name}!")
    return f"Hello, {name}!"


@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a


@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)


@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def to_list(size: int) -> list[int]:  # noqa: D103
    return list(range(size))


@workflow
def map_reduce(size: int) -> int:
    """Simple workflow to illustrate a large fan out and fan in."""
    input_array = to_list(size=size)
    output = map_task(simple_map_task)(a=input_array)
    return simple_reduce(b=output)
Run with
pyflyte run --remote example.py map_reduce --size 10
and it seems to break on the ArrayNodeTask with the following errors.
Copy code
[1]: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[2]: failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[3]: failed at Node[n3]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[4]: failed at Node[n4]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[5]: failed at Node[n5]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[6]: failed at Node[n6]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[7]: failed at Node[n7]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[8]: failed at Node[n8]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[9]: failed at Node[n9]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
I just tried to create a workflow with only `ArmadaTasks` and that seems to work fine, so there seems to be some sort of issue plumbing data between `ArmadaTasks` and non-`ArmadaTasks` for some reason.
f
The Armada plugin will not work with array node today.
We are working on generalized support for all task types. This is because the plugin state can be large, and supporting that in array node would cause a problem.
Any reason why you are looking into Armada? Is Flyte not scaling for something?
Also, is this a backend plugin in Go?
Ohh nm, this is an agent plugin. How were you able to use array node for this? cc @flat-area-42876 this should have just failed, right?
c
This is a Python plugin since we are prototyping. I believe our team (Stack) has spoken to you. We are running on prem and we don’t really have the ability to autoscale due to fixed capacity so we are looking to use Armada for queueing workloads instead of scheduling directly into k8s and having tons of pending workloads. We also rely on gang scheduling and I’m not sure that is supported.
f
ya, makes sense
I would love to use a gang scheduler like batch/Kueue, but Armada sounds cool
and an agent would be loved by the community
But this is interesting: array node should not work with agents today.
Can you simply use `@dynamic` for now?
Array node support for agents, as well as any task type, is coming soon.
c
Yeah, I can give that a shot. By "coming soon", is that work that is publicly in progress or something you'll roll into open source later?
f
We will upstream it from Union, but it is shipping to Union soon.
We will upstream soon though.
👍 1
c
I see, the reason why this code didn't short-circuit is probably because the `ArmadaTask` extends `PythonFunctionTask`.
Copy code
class ArmadaTask(AsyncAgentExecutorMixin, PythonFunctionTask[ArmadaConfig]):
    """Wrapper around PythonFunctionTask which includes details on how to run on Armada."""
👍 1
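For reference, plugin function tasks like this usually declare their own task type in the constructor and register themselves against their config class. A minimal sketch, assuming a hypothetical _ARMADA_TASK_TYPE constant and the standard PythonFunctionTask/TaskPlugins hooks (the real ArmadaTask constructor is not shown in this thread):
Copy code
from flytekit.core.python_function_task import PythonFunctionTask
from flytekit.extend import TaskPlugins
from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin
from flytekitplugins.armada.task import ArmadaConfig  # the plugin config used earlier in this thread


class ArmadaTask(AsyncAgentExecutorMixin, PythonFunctionTask[ArmadaConfig]):
    """Sketch: wrapper around PythonFunctionTask which includes details on how to run on Armada."""

    _ARMADA_TASK_TYPE = "armada"  # hypothetical constant; must match the agent's task_type_name

    def __init__(self, task_config: ArmadaConfig, task_function, **kwargs):
        super().__init__(
            task_config=task_config,
            task_function=task_function,
            task_type=self._ARMADA_TASK_TYPE,  # otherwise the task serializes as a plain "python-task"
            **kwargs,
        )


# Registering against the config class is what makes @task(task_config=ArmadaConfig(...))
# resolve to ArmadaTask instead of a plain PythonFunctionTask.
TaskPlugins.register_pythontask_plugin(ArmadaConfig, ArmadaTask)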
f
Hmm, this is interesting. That should be OK, but still, what is the task-type?
c
Not sure what you are referring to
f
Can you share your `init` for the agent executor?
c
Copy code
def __init__(self) -> None:
    """Constructor for ArmadaAgent."""
    super().__init__(task_type_name="armada", metadata_type=ArmadaMetadata)
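For context, a heavily hedged skeleton of what the rest of such an async agent typically looks like, assuming the AsyncAgentBase create/get/delete interface that matches the __init__ above; the ArmadaMetadata field is made up and all method bodies are omitted:
Copy code
from dataclasses import dataclass
from typing import Optional

from flytekit.extend.backend.base_agent import AgentRegistry, AsyncAgentBase, Resource, ResourceMeta
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


@dataclass
class ArmadaMetadata(ResourceMeta):
    job_id: str  # hypothetical: whatever is needed to find the Armada job again


class ArmadaAgent(AsyncAgentBase):
    """Sketch only; the real ArmadaAgent implementation is not shown in this thread."""

    def __init__(self) -> None:
        super().__init__(task_type_name="armada", metadata_type=ArmadaMetadata)

    def create(self, task_template: TaskTemplate, inputs: Optional[LiteralMap] = None, **kwargs) -> ArmadaMetadata:
        # Submit the job to Armada and return enough info to poll it later.
        ...

    def get(self, resource_meta: ArmadaMetadata, **kwargs) -> Resource:
        # Poll Armada and map its job state to a Flyte phase.
        ...

    def delete(self, resource_meta: ArmadaMetadata, **kwargs) -> None:
        # Cancel the Armada job.
        ...


AgentRegistry.register(ArmadaAgent())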
dynamic (that works normally with regular `@task`) fails with
Copy code
[1/1] currentAttempt done. Last Error: USER::
[f91a21ca9a5bc4234935-n0-0] terminated with exit code (1). Reason [Error]. Message: 
y", line 783, in invoke
    return __callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flytekit/bin/entrypoint.py", line 506, in execute_task_cmd
    _execute_task(
  File "/usr/local/lib/python3.11/site-packages/flytekit/exceptions/scopes.py", line 148, in f
    return outer_f(inner_f, args, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flytekit/exceptions/scopes.py", line 178, in system_entry_point
    return wrapped(*args, **kwargs)
Copy code
@dynamic
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)

    return simple_reduce(b=values)


@workflow
def map_reduce_dynamic(size: int) -> int:
    """Simple workflow to illustrate a large fan-out/fan-in with tasks."""
    return dynamic_task(size=size)
pyflyte run --remote example.py map_reduce_dynamic --size 10
We can probably wait for the generic task handling effort to land
f
@freezing-airport-6809 right now there's just a check for whether the passed-in subnode is a TaskNode. We should probably add a check for the task type as well, to have this fail more gracefully.
*on the propeller end. Flytekit has an instance-type check:
Copy code
if not (isinstance(actual_task, PythonFunctionTask) or isinstance(actual_task, PythonInstanceTask)):
    raise ValueError("Only PythonFunctionTask and PythonInstanceTask are supported in map tasks.")
f
interesting
c
I am back again trying to use `@dynamic`, and it runs the code as a Python task and not the Armada task. I'm guessing this is expected until some changes land? I may have updated flytekit since the last time I tried to do this.
Copy code
pyflyte run --remote example.py dynamic_task --size 10
Copy code
@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a


@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)


@dynamic(container_image="<>")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)

    return simple_reduce(b=values)
f
Dynamic should use the agent, as a dynamic is literally just a wf.
Does a regular task work with Armada?
c
Yeah, if I execute a single task (or even a chain in a workflow) it runs on Armada. But dynamic throws it all into `PythonTask` and runs it on the local data plane.
f
That is odd.
I will share an example that runs; you can try that too?
c
Sure!
f
OK let me try.
@damp-lion-88352 can you please help verify whether dynamic works with agents (I feel it does)? I haven't had a chance to get to this.
d
No problem
Doing it now
Yes we can
Copy code
from flytekit import dynamic, task, workflow
from flytekit.sensor.file_sensor import FileSensor

sensor = FileSensor(name="test_file_sensor")

@dynamic
def dyanmic_sensor():
    for _ in range(2):
        sensor(path="s3://my-s3-bucket")

@workflow
def wf():
    return dyanmic_sensor()
c
I am able to reproduce what you've done.
But when I run it with my task it doesn't recognize the task type.
Copy code
@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def say_hello(name: str) -> str:
    print(f"Hello, {name}!")
    return f"Hello, {name}!"

@dynamic(container_image="<>")
def dyanmic_sensor():
    for _ in range(2):
        say_hello(name="test")


@workflow()
def wf():
    return dyanmic_sensor()
`BaseSensor` is a `PythonTask` but my `ArmadaTask` is a `PythonFunctionTask`, maybe that's related..
d
I can help you take a look tomorrow morning
it's midnight now
There's a way to check whether this is normal or not: if running the PythonFunctionTask shows it as a plain PythonFunctionTask on the Flyte console, then we can't run a dynamic PythonFunctionTask on the agent.
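A quick way to make a similar check locally, as a sketch (example.py and say_hello come from the code shared earlier in this thread): the @task-decorated object is the task instance itself, so its task_type attribute shows what type will be serialized.
Copy code
from example import say_hello  # the ArmadaConfig task defined earlier in this thread

# "armada" (or whatever the plugin sets) means the custom task type made it through;
# "python-task" means it fell back to a plain PythonFunctionTask.
print(say_hello.task_type)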
But actually, PythonFunctionTask inherits from PythonTask, I guess?
c
Yeah it does..
When I run my task directly it comes up correctly as Armada.
When I run it inside a workflow it comes up correctly too.
d
Ok
Maybe there’s some improvement we can do
Thank you
c
The code is not in a state that I can share yet, but hopefully I can soon so y'all can reproduce the issue.
f
cc @glamorous-carpet-83516 if you know by chance?
d
I've tried it with a Kubeflow PyTorch task
it works
it's weird that the agent doesn't work
Copy code
from flytekit import Resources, task, workflow, dynamic


from flytekitplugins.kfpytorch import PyTorch, Worker

# %%
cpu_request = "500m"
mem_request = "500Mi"
gpu_request = "0"
mem_limit = "500Mi"
gpu_limit = "0"


# %%
@task(
    task_config=PyTorch(worker=Worker(replicas=2)),
    retries=2,
    # cache=True,
    # cache_version="0.1",
    requests=Resources(cpu=cpu_request, mem=mem_request, gpu=gpu_request),
    limits=Resources(mem=mem_limit, gpu=gpu_limit),
)
def mnist_pytorch_job() -> str:
    return "Hi"
    
@dynamic
def d_wf():
    for _ in range(2):
        mnist_pytorch_job()

@workflow
def wf():
    d_wf()
Kubeflow PyTorch is also a `PythonFunctionTask`
🙇 2
I got some setup problems with the only agent that uses `PythonFunctionTask`, the Databricks agent.
Will try it after I fix that.
f
@damp-lion-88352 you rock!
🙏 1
c
Circling back to this. So it seems like this might be an issue with the agent vs. the plugin. It seems that, if we are willing to, we should probably just develop a Go-based plugin instead.
g
Hey @clean-glass-36808, what's the current issue you run into with the agent? You can't use it in the map task?
c
I can't use agent-based tasks with dynamic workflows and map tasks. They seem to end up running as a generic `PythonTask` on the Flyte data plane, which is not what we want. i.e.
Copy code
@workflow
def map_reduce(size: int) -> int:
    """Simple workflow to illustrate a large fan out and fan in."""
    input_array = to_list(size=size)
    output = map_task(simple_map_task)(a=input_array)
    return simple_reduce(b=output)


@workflow
def map_reduce_dynamic(size: int) -> int:
    """Simple workflow to illustrate a large fan-out/fan-in with tasks."""
    return dynamic_task(size=size)
I think for dynamic it will run as a PythonTask, but for map_task there was some serialization-related issue.
g
Regarding the dynamic, what does the task look like?
Copy code
@dynamic
def d1():
  agent_task(...)
We are going to test it. Using a Go plugin may have the same issue.
c
It seems that @damp-lion-88352 has shown that it works with a Go plugin, since he tested with PyTorch? I only see a task defined for PyTorch, and it looks like there is some operator implementation in the main flyte repo in Go.
Copy code
@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a


@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)


@dynamic(container_image="<>")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)

    return simple_reduce(b=values)
d
Kevin and I found the root cause
We currently do not support using PythonFunctionTasks (except for pod tasks) in map tasks.
So if a task is created by a K8s plugin or agent, it will probably fail
Will discuss with Kevin how to fix it later, thank you for catching the bug
c
Thanks for looking into it
🙏 1
f
I am confused: is the problem with map task or dynamic? If it's map, we already discussed that map is not supported today for anything but simple tasks and pods. More exotic support is in testing right now.
d
dynamic works for agent tasks and K8s plugin tasks
map tasks will fail for agent tasks and K8s plugin tasks
c
I still haven’t got dynamic to work for an agent plugin task. You tested against PyTorch which wasn’t an agent plugin?
d
Yes it's not
dynamic should work
f
That is odd
g
@freezing-airport-6809 do we support a list of promises?
Copy code
@dynamic(container_image="<>")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)

    return simple_reduce(b=values)  # <- values here is [promise, promise, promise]
I remember we can’t do that
d
Copy code
from flytekit import task, dynamic

@task()
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a


@task()
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)


@dynamic
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)

    return simple_reduce(b=values)
f
Yes we do
d
We have an error in local execution
I just removed the config
f
What? OK, I will talk in a few minutes, with kids now.
Something is wrong in your code
d
Copy code
from flytekit import task, dynamic, workflow

@task()
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a


@task()
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)


@dynamic
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)

    return simple_reduce(b=values)

@workflow
def wf(size: int) -> int:
    return dynamic_task(size=size)

if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Output: {wf(size=5)}")
This will work
@clean-glass-36808 Would you give it a try with your Armada config?
c
Sure
f
I have a guess
The armada config task type is not available at runtime
Is this defined locally?
c
The issue is that it runs on Flyte, but it runs the code as a Python task and not as a custom task. There is no issue with pyflyte-execute actually running
g
So the code is running in the new container as a regular Python task, not being sent to the agent?
c
Correct
f
Ya, my guess is that the task type is not set correctly.
If you look at the UI, what do you see as the task type?
Happy to hop on a quick call in a few minutes
c
I'm running @damp-lion-88352’s code (which doesn't seem much different than my original code?). Can show you in a sec
🙏 1
My code.
Copy code
@task(task_config=ArmadaConfig(queue="compute"), container_image="redacted")
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a


@task(task_config=ArmadaConfig(queue="compute"), container_image="redacted")
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)


@dynamic(container_image="redacted")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)

    return simple_reduce(b=values)


@workflow
def wf(size: int) -> int:
    return dynamic_task(size=size)
Not recognized as armada tasks
g
If you pyflyte run `simple_map_task`, does it show Armada on the UI?
c
Now if I just run the simple_map_task it detects correctly as Armada (and blows up because the Armada cluster is unstable)
g
Got it, thanks for sharing. Something is wrong during serialization. Looking at it.
🙏 1
f
@glamorous-carpet-83516 / @clean-glass-36808 did you folks figure it out
happy to hop on a call now
d
Databricks agent, PythonFunctionTask works?
f
d
Kevin and I are in this meeting
c
I’m AFK, be back in 10
d
f
@clean-glass-36808 so let me summarize what I learnt. The agent (for example, the Databricks agent) works with dynamic. I am pretty sure there is a bug in your agent code.
d
This databricks agent example works for me.
Copy code
from flytekit import task, dynamic, workflow
from flytekitplugins.spark import Databricks

# pyflyte run --remote --image localhost:30000/databricks-map:0531 databricks_example.py wf --size 3

@task(task_config=Databricks(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        },
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "11.0.x-scala2.12",
                "node_type_id": "r3.xlarge",
                "aws_attributes": {
                    "availability": "ON_DEMAND",
                    "instance_profile_arn": "arn:aws:iam::<AWS_ACCOUNT_ID_DATABRICKS>:instance-profile/databricks-flyte-integration",
                },
                "num_workers": 4,
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
    ),
)
def simple_map_task(a: int):
    return


@task(task_config=Databricks(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        },
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "11.0.x-scala2.12",
                "node_type_id": "r3.xlarge",
                "aws_attributes": {
                    "availability": "ON_DEMAND",
                    "instance_profile_arn": "arn:aws:iam::<AWS_ACCOUNT_ID_DATABRICKS>:instance-profile/databricks-flyte-integration",
                },
                "num_workers": 4,
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
    ),
)
def simple_reduce(b: list[int]):  # noqa: D103
    return


@dynamic
def dynamic_task(size: int):
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        simple_map_task(a=i)
        values.append(i)

    simple_reduce(b=values)

    return 

@workflow
def wf(size: int):
    dynamic_task(size=size)

if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Output: {wf(size=5)}")