clean-glass-36808
05/17/2024, 10:54 PM
`ArmadaTasks` that I have defined in a workflow, and I seem to have run into an issue related to (de)serialization. I wanted to know if there is something obvious I'm missing before I start digging into the code.
clean-glass-36808
05/17/2024, 10:55 PMfrom flytekit import dynamic, task, workflow
from flytekit.core.array_node_map_task import map_task
from flytekitplugins.armada.task import ArmadaConfig
@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def say_hello(name: str) -> str:
    """Print a greeting for ``name`` and return the same greeting string."""
    greeting = f"Hello, {name}!"
    print(greeting)
    return greeting


@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def simple_map_task(a: int) -> int:
    """Return the square of ``a`` (the per-element "map" step)."""
    return a * a


@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def simple_reduce(b: list[int]) -> int:
    """Return the sum of ``b`` (the "reduce" step)."""
    return sum(b)


@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def to_list(size: int) -> list[int]:
    """Return ``[0, 1, ..., size - 1]`` as the input array for the map step."""
    return list(range(size))


@workflow
def map_reduce(size: int) -> int:
    """Simple workflow to illustrate a large fan out and fan in."""
    input_array = to_list(size=size)
    # NOTE(review): in this thread, map_task over this Armada (agent-backed) task
    # failed with "CorruptedPluginState"; check map_task support for agent tasks
    # in your Flyte version before relying on this pattern.
    output = map_task(simple_map_task)(a=input_array)
    return simple_reduce(b=output)
clean-glass-36808
05/17/2024, 10:56 PMpyflyte run --remote example.py map_reduce --size 10
and it seems to break on the ArrayNodeTask with the following errors.
[1]: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[2]: failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[3]: failed at Node[n3]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[4]: failed at Node[n4]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[5]: failed at Node[n5]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[6]: failed at Node[n6]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[7]: failed at Node[n7]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[8]: failed at Node[n8]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[9]: failed at Node[n9]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
clean-glass-36808
05/17/2024, 10:56 PMclean-glass-36808
05/17/2024, 11:03 PM
`ArmadaTasks` and that seems to work fine, so there seems to be some sort of issue plumbing data between `ArmadaTasks` and non-`ArmadaTasks` for some reason.
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/17/2024, 11:21 PMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
`@dynamic` for now
freezing-airport-6809
clean-glass-36808
05/17/2024, 11:30 PMfreezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/17/2024, 11:32 PM
`ArmadaTask` extends `PythonFunctionTask`.
class ArmadaTask(AsyncAgentExecutorMixin, PythonFunctionTask[ArmadaConfig]):
    """Wrapper around PythonFunctionTask which includes details on how to run on Armada."""

    # NOTE(review): only the class header and docstring were pasted in the chat;
    # any further body of this class is not shown here.
freezing-airport-6809
clean-glass-36808
05/17/2024, 11:36 PMfreezing-airport-6809
`__init__` for the agent executor?
clean-glass-36808
05/17/2024, 11:39 PMdef __init__(self) -> None:
"""Constructor for ArmadaAgent."""
super().__init__(task_type_name="armada", metadata_type=ArmadaMetadata)
clean-glass-36808
05/17/2024, 11:44 PM@task
fails with
[1/1] currentAttempt done. Last Error: USER::
[f91a21ca9a5bc4234935-n0-0] terminated with exit code (1). Reason [Error]. Message:
y", line 783, in invoke
return __callback(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/flytekit/bin/entrypoint.py", line 506, in execute_task_cmd
_execute_task(
File "/usr/local/lib/python3.11/site-packages/flytekit/exceptions/scopes.py", line 148, in f
return outer_f(inner_f, args, kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/flytekit/exceptions/scopes.py", line 178, in system_entry_point
return wrapped(*args, **kwargs)
clean-glass-36808
05/17/2024, 11:45 PM@dynamic
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    # One simple_map_task node per index; the list holds their (promised) outputs.
    values = [simple_map_task(a=i) for i in range(size)]
    return simple_reduce(b=values)


@workflow
def map_reduce_dynamic(size: int) -> int:
    """Simple workflow to illustrate a large fan-out/fan-in with tasks."""
    return dynamic_task(size=size)
pyflyte run --remote example.py map_reduce_dynamic --size 10
clean-glass-36808
05/17/2024, 11:46 PMflat-area-42876
05/18/2024, 12:00 AMflat-area-42876
05/18/2024, 12:02 AMif not (isinstance(actual_task, PythonFunctionTask) or isinstance(actual_task, PythonInstanceTask)):
raise ValueError("Only PythonFunctionTask and PythonInstanceTask are supported in map tasks.")
freezing-airport-6809
clean-glass-36808
05/23/2024, 9:50 PM
`@dynamic` and it runs the code as a Python task and not the Armada task. I'm guessing this is expected until some changes land? I may have updated flytekit since the last time I tried to do this.
pyflyte run --remote example.py dynamic_task --size 10
@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_map_task(a: int) -> int:
    """Return the square of ``a``."""
    return a * a


@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_reduce(b: list[int]) -> int:
    """Return the sum of ``b``."""
    return sum(b)


@dynamic(container_image="<>")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = [simple_map_task(a=i) for i in range(size)]
    return simple_reduce(b=values)
freezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/23/2024, 11:13 PM
`PythonTask` and runs it on the local data plane
freezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/24/2024, 5:48 AMfreezing-airport-6809
freezing-airport-6809
damp-lion-88352
05/24/2024, 2:50 PMdamp-lion-88352
05/24/2024, 2:50 PMdamp-lion-88352
05/24/2024, 2:57 PMdamp-lion-88352
05/24/2024, 2:57 PMfrom flytekit import dynamic, task, workflow
from flytekit.sensor.file_sensor import FileSensor
from flytekit.sensor.file_sensor import FileSensor
sensor = FileSensor(name="test_file_sensor")


@dynamic
def dyanmic_sensor():
    """Run the file sensor twice from inside a dynamic workflow."""
    for _ in range(2):
        # A bare S3 URI: angle brackets are chat link markup, not part of the path.
        sensor(path="s3://my-s3-bucket")


@workflow
def wf():
    """Entry-point workflow that launches the dynamic sensor fan-out."""
    return dyanmic_sensor()
damp-lion-88352
05/24/2024, 2:58 PMdamp-lion-88352
05/24/2024, 2:58 PMclean-glass-36808
05/24/2024, 4:32 PMclean-glass-36808
05/24/2024, 4:33 PM@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def say_hello(name: str) -> str:
    """Print a greeting for ``name`` and return the same greeting string."""
    greeting = f"Hello, {name}!"
    print(greeting)
    return greeting


@dynamic(container_image="<>")
def dyanmic_sensor():
    """Call the Armada-configured say_hello task twice from a dynamic workflow."""
    for _ in range(2):
        say_hello(name="test")


@workflow()
def wf():
    """Entry-point workflow that launches the dynamic fan-out."""
    return dyanmic_sensor()
clean-glass-36808
05/24/2024, 4:35 PM
`BaseSensor` is a `PythonTask`, but my `ArmadaTask` is a `PythonFunctionTask`; maybe that's related...
damp-lion-88352
05/24/2024, 4:36 PMdamp-lion-88352
05/24/2024, 4:36 PMdamp-lion-88352
05/24/2024, 4:36 PMdamp-lion-88352
05/24/2024, 4:41 PM
A `PythonFunctionTask` will show as `PythonFunctionTask` on the Flyte console, so we can't run a dynamic `PythonFunctionTask` on an agent.
damp-lion-88352
05/24/2024, 4:42 PMclean-glass-36808
05/24/2024, 4:43 PMclean-glass-36808
05/24/2024, 4:44 PMArmada
clean-glass-36808
05/24/2024, 4:46 PMdamp-lion-88352
05/24/2024, 5:15 PMdamp-lion-88352
05/24/2024, 5:15 PMdamp-lion-88352
05/24/2024, 5:15 PMclean-glass-36808
05/24/2024, 5:16 PMfreezing-airport-6809
damp-lion-88352
05/25/2024, 4:46 AMdamp-lion-88352
05/25/2024, 4:46 AMdamp-lion-88352
05/25/2024, 4:46 AMdamp-lion-88352
05/25/2024, 4:46 AMfrom flytekit import Resources, task, workflow, dynamic
from flytekitplugins.kfpytorch import PyTorch, Worker
# %%
cpu_request = "500m"
mem_request = "500Mi"
gpu_request = "0"
mem_limit = "500Mi"
gpu_limit = "0"


# %%
@task(
    task_config=PyTorch(worker=Worker(replicas=2)),
    retries=2,
    # cache=True,
    # cache_version="0.1",
    requests=Resources(cpu=cpu_request, mem=mem_request, gpu=gpu_request),
    limits=Resources(mem=mem_limit, gpu=gpu_limit),
)
def mnist_pytorch_job() -> str:
    """Trivial PyTorch-plugin task with two workers; returns the string "Hi"."""
    return "Hi"


@dynamic
def d_wf():
    """Launch two mnist_pytorch_job nodes from a dynamic workflow."""
    for _ in range(2):
        mnist_pytorch_job()


@workflow
def wf():
    """Entry-point workflow that runs the dynamic fan-out."""
    d_wf()
damp-lion-88352
05/25/2024, 4:46 AMdamp-lion-88352
05/25/2024, 4:47 AMPythonFunctionTask
damp-lion-88352
05/25/2024, 4:50 AM
`PythonFunctionTask`, Databricks Agent.
damp-lion-88352
05/25/2024, 4:50 AMfreezing-airport-6809
clean-glass-36808
05/30/2024, 6:32 PMglamorous-carpet-83516
05/30/2024, 6:37 PMclean-glass-36808
05/30/2024, 6:39 PM
`PythonTask` on the Flyte data plane, which is not what we want.
i.e.:
@workflow
def map_reduce(size: int) -> int:
    """Simple workflow to illustrate a large fan out and fan in."""
    input_array = to_list(size=size)
    output = map_task(simple_map_task)(a=input_array)
    return simple_reduce(b=output)


@workflow
def map_reduce_dynamic(size: int) -> int:
    """Simple workflow to illustrate a large fan-out/fan-in with tasks."""
    return dynamic_task(size=size)
clean-glass-36808
05/30/2024, 6:41 PMglamorous-carpet-83516
05/30/2024, 6:44 PM@dynamic
def d1():
    """Minimal shape of a dynamic workflow that calls an agent task."""
    # NOTE(review): ``agent_task`` and its ``...`` argument look like placeholders
    # for a real agent-backed task and its inputs.
    agent_task(...)
glamorous-carpet-83516
05/30/2024, 6:47 PMclean-glass-36808
05/30/2024, 6:56 PM
`flyte` repo in Go.
clean-glass-36808
05/30/2024, 6:57 PM@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_map_task(a: int) -> int:
    """Return the square of ``a``."""
    return a * a


@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_reduce(b: list[int]) -> int:
    """Return the sum of ``b``."""
    return sum(b)


@dynamic(container_image="<>")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = [simple_map_task(a=i) for i in range(size)]
    return simple_reduce(b=values)
damp-lion-88352
05/31/2024, 2:57 AMdamp-lion-88352
05/31/2024, 2:58 AMdamp-lion-88352
05/31/2024, 2:59 AMdamp-lion-88352
05/31/2024, 3:00 AMclean-glass-36808
05/31/2024, 3:11 AMfreezing-airport-6809
damp-lion-88352
05/31/2024, 3:31 AMdamp-lion-88352
05/31/2024, 3:32 AMclean-glass-36808
05/31/2024, 3:43 AMdamp-lion-88352
05/31/2024, 3:43 AMdamp-lion-88352
05/31/2024, 3:43 AMfreezing-airport-6809
glamorous-carpet-83516
05/31/2024, 3:45 AM@dynamic(container_image="<>")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)
    # values here is [promise, promise, promise]
    return simple_reduce(b=values)
glamorous-carpet-83516
05/31/2024, 3:45 AMdamp-lion-88352
05/31/2024, 3:45 AMfrom flytekit import task, dynamic
@task()
def simple_map_task(a: int) -> int:
    """Return the square of ``a``."""
    return a * a


@task()
def simple_reduce(b: list[int]) -> int:
    """Return the sum of ``b``."""
    return sum(b)


@dynamic
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = [simple_map_task(a=i) for i in range(size)]
    return simple_reduce(b=values)
freezing-airport-6809
damp-lion-88352
05/31/2024, 3:46 AMdamp-lion-88352
05/31/2024, 3:46 AMdamp-lion-88352
05/31/2024, 3:46 AMfreezing-airport-6809
freezing-airport-6809
damp-lion-88352
05/31/2024, 3:48 AMfrom flytekit import task, dynamic, workflow
@task()
def simple_map_task(a: int) -> int:
    """Return the square of ``a``."""
    return a * a


@task()
def simple_reduce(b: list[int]) -> int:
    """Return the sum of ``b``."""
    return sum(b)


@dynamic
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = [simple_map_task(a=i) for i in range(size)]
    return simple_reduce(b=values)


@workflow
def wf(size: int) -> int:
    """Entry-point workflow: sum of squares of 0..size-1 via the dynamic task."""
    return dynamic_task(size=size)


if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Output: {wf(size=5)}")
damp-lion-88352
05/31/2024, 3:48 AMdamp-lion-88352
05/31/2024, 3:48 AMclean-glass-36808
05/31/2024, 3:49 AMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/31/2024, 3:52 AMglamorous-carpet-83516
05/31/2024, 3:54 AMclean-glass-36808
05/31/2024, 3:54 AMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/31/2024, 3:56 AMclean-glass-36808
05/31/2024, 3:58 AM@task(task_config=ArmadaConfig(queue="compute"), container_image="redacted")
def simple_map_task(a: int) -> int:
    """Return the square of ``a``."""
    return a * a


@task(task_config=ArmadaConfig(queue="compute"), container_image="redacted")
def simple_reduce(b: list[int]) -> int:
    """Return the sum of ``b``."""
    return sum(b)


@dynamic(container_image="redacted")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    # NOTE(review): per this thread, tasks launched from this @dynamic were not
    # recognized as Armada tasks (they ran as regular Python tasks).
    values = [simple_map_task(a=i) for i in range(size)]
    return simple_reduce(b=values)


@workflow
def wf(size: int) -> int:
    """Entry-point workflow that runs the dynamic fan-out/fan-in."""
    return dynamic_task(size=size)
Not recognized as Armada tasks.
glamorous-carpet-83516
05/31/2024, 3:59 AM
`simple_map_task`: does it show Armada on the UI?
clean-glass-36808
05/31/2024, 4:00 AMglamorous-carpet-83516
05/31/2024, 4:01 AMfreezing-airport-6809
freezing-airport-6809
damp-lion-88352
05/31/2024, 4:32 AMdamp-lion-88352
05/31/2024, 4:33 AMfreezing-airport-6809
damp-lion-88352
05/31/2024, 4:33 AMdamp-lion-88352
05/31/2024, 4:34 AMclean-glass-36808
05/31/2024, 4:35 AMdamp-lion-88352
05/31/2024, 4:39 AMfreezing-airport-6809
damp-lion-88352
05/31/2024, 4:39 AMfrom flytekit import task, dynamic, workflow
from flytekitplugins.spark import Databricks
# pyflyte run --remote --image localhost:30000/databricks-map:0531 databricks_example.py wf --size 3
def _databricks_task_config() -> Databricks:
    """Build the Databricks task config used by both example tasks.

    A new object is returned on every call, so the two tasks keep separate
    config instances exactly as when the config was written out twice inline.
    """
    return Databricks(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        },
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "11.0.x-scala2.12",
                "node_type_id": "r3.xlarge",
                "aws_attributes": {
                    "availability": "ON_DEMAND",
                    "instance_profile_arn": "arn:aws:iam::<AWS_ACCOUNT_ID_DATABRICKS>:instance-profile/databricks-flyte-integration",
                },
                "num_workers": 4,
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
    )


@task(task_config=_databricks_task_config())
def simple_map_task(a: int):
    """Placeholder map step: does nothing and returns None."""
    return


@task(task_config=_databricks_task_config())
def simple_reduce(b: list[int]):  # noqa: D103
    return


@dynamic
def dynamic_task(size: int):
    """DAG shape is not known at compile time so we need to use @dynamic."""
    # simple_map_task has no outputs, so the reduce step receives the raw
    # indices rather than map results.
    values = []
    for i in range(size):
        simple_map_task(a=i)
        values.append(i)
    simple_reduce(b=values)


@workflow
def wf(size: int):
    """Entry-point workflow that runs the dynamic Databricks fan-out."""
    dynamic_task(size=size)


if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Output: {wf(size=5)}")