clean-glass-36808
05/17/2024, 10:54 PMArmadaTasksclean-glass-36808
05/17/2024, 10:55 PMfrom flytekit import dynamic, task, workflow
from flytekit.core.array_node_map_task import map_task
from flytekitplugins.armada.task import ArmadaConfig
@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def say_hello(name: str) -> str:
    print(f"Hello, {name}!")
    return f"Hello, {name}!"
@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a
@task(task_config=ArmadaConfig(queue="compute"), container_image="<redacted>")
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)
@task(task_config=ArmadaConfig(queue="compute"), container_image="<redaced>")
def to_list(size: int) -> list[int]:  # noqa: D103
    return list(range(size))
@workflow
def map_reduce(size: int) -> int:
    """Simple workflow to illustrate a large fan out and fan in."""
    input_array = to_list(size=size)
    output = map_task(simple_map_task)(a=input_array)
    return simple_reduce(b=output)clean-glass-36808
05/17/2024, 10:56 PMpyflyte run --remote example.py map_reduce --size 10[1]: failed at Node[n1]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[2]: failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[3]: failed at Node[n3]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[4]: failed at Node[n4]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[5]: failed at Node[n5]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[6]: failed at Node[n6]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[7]: failed at Node[n7]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[8]: failed at Node[n8]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phase
[9]: failed at Node[n9]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [agent-service]: [CorruptedPluginState] Failed to unmarshal custom state in Handle, caused by: gob: wrong type (webapi.Phase) for received field PluginState.Phaseclean-glass-36808
05/17/2024, 10:56 PMclean-glass-36808
05/17/2024, 11:03 PMArmadaTasksArmadaTasksArmadaTasksfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/17/2024, 11:21 PMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
@dynamicfreezing-airport-6809
clean-glass-36808
05/17/2024, 11:30 PMfreezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/17/2024, 11:32 PMArmadaTaskPythonFunctionTaskclass ArmadaTask(AsyncAgentExecutorMixin, PythonFunctionTask[ArmadaConfig]):
    """Wrapper around PythonFunctionTask which includes details on how to run on Armada."""freezing-airport-6809
clean-glass-36808
05/17/2024, 11:36 PMfreezing-airport-6809
initclean-glass-36808
05/17/2024, 11:39 PMdef __init__(self) -> None:
        """Constructor for ArmadaAgent."""
        super().__init__(task_type_name="armada", metadata_type=ArmadaMetadata)clean-glass-36808
05/17/2024, 11:44 PM@task[1/1] currentAttempt done. Last Error: USER::
[f91a21ca9a5bc4234935-n0-0] terminated with exit code (1). Reason [Error]. Message: 
y", line 783, in invoke
    return __callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flytekit/bin/entrypoint.py", line 506, in execute_task_cmd
    _execute_task(
  File "/usr/local/lib/python3.11/site-packages/flytekit/exceptions/scopes.py", line 148, in f
    return outer_f(inner_f, args, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flytekit/exceptions/scopes.py", line 178, in system_entry_point
    return wrapped(*args, **kwargs)clean-glass-36808
05/17/2024, 11:45 PM@dynamic
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)
    return simple_reduce(b=values)
@workflow
def map_reduce_dynamic(size: int) -> int:
    """Simple workflow to illustrate a large fan-out/fan-in with tasks."""
    return dynamic_task(size=size)pyflyte run --remote example.py map_reduce_dynamic --size 10clean-glass-36808
05/17/2024, 11:46 PMflat-area-42876
05/18/2024, 12:00 AMflat-area-42876
05/18/2024, 12:02 AMif not (isinstance(actual_task, PythonFunctionTask) or isinstance(actual_task, PythonInstanceTask)):
            raise ValueError("Only PythonFunctionTask and PythonInstanceTask are supported in map tasks.")freezing-airport-6809
clean-glass-36808
05/23/2024, 9:50 PM@dynamicPythonArmadapyflyte run --remote example.py dynamic_task --size 10@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a
@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)
@dynamic(container_image="<>")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)
    return simple_reduce(b=values)freezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/23/2024, 11:13 PMPythonTaskfreezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/24/2024, 5:48 AMfreezing-airport-6809
freezing-airport-6809
damp-lion-88352
05/24/2024, 2:50 PMdamp-lion-88352
05/24/2024, 2:50 PMdamp-lion-88352
05/24/2024, 2:57 PMdamp-lion-88352
05/24/2024, 2:57 PMfrom flytekit import dynamic, task, workflow
from flytekit.sensor.file_sensor import FileSensor
from flytekit.sensor.file_sensor import FileSensor
sensor = FileSensor(name="test_file_sensor")
@dynamic
def dyanmic_sensor():
    for _ in range(2):
        sensor(path="<s3://my-s3-bucket>") 
@workflow
def wf():
    return dyanmic_sensor()damp-lion-88352
05/24/2024, 2:58 PMdamp-lion-88352
05/24/2024, 2:58 PMclean-glass-36808
05/24/2024, 4:32 PMclean-glass-36808
05/24/2024, 4:33 PM@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def say_hello(name: str) -> str:
    print(f"Hello, {name}!")
    return f"Hello, {name}!"
@dynamic(container_image="<>")
def dyanmic_sensor():
    for _ in range(2):
        say_hello(name="test")
@workflow()
def wf():
    return dyanmic_sensor()clean-glass-36808
05/24/2024, 4:35 PMBaseSensorPythonTaskArmadaTaskPythonFunctionTaskdamp-lion-88352
05/24/2024, 4:36 PMdamp-lion-88352
05/24/2024, 4:36 PMdamp-lion-88352
05/24/2024, 4:36 PMdamp-lion-88352
05/24/2024, 4:41 PMpythonFunctionTaskpythonFunctionTaskpythonFunctionTaskdamp-lion-88352
05/24/2024, 4:42 PMclean-glass-36808
05/24/2024, 4:43 PMclean-glass-36808
05/24/2024, 4:44 PMArmadaclean-glass-36808
05/24/2024, 4:46 PMdamp-lion-88352
05/24/2024, 5:15 PMdamp-lion-88352
05/24/2024, 5:15 PMdamp-lion-88352
05/24/2024, 5:15 PMclean-glass-36808
05/24/2024, 5:16 PMfreezing-airport-6809
damp-lion-88352
05/25/2024, 4:46 AMdamp-lion-88352
05/25/2024, 4:46 AMdamp-lion-88352
05/25/2024, 4:46 AMdamp-lion-88352
05/25/2024, 4:46 AMfrom flytekit import Resources, task, workflow, dynamic
from flytekitplugins.kfpytorch import PyTorch, Worker
# %%
cpu_request = "500m"
mem_request = "500Mi"
gpu_request = "0"
mem_limit = "500Mi"
gpu_limit = "0"
# %%
@task(
    task_config=PyTorch(worker=Worker(replicas=2)),
    retries=2,
    # cache=True,
    # cache_version="0.1",
    requests=Resources(cpu=cpu_request, mem=mem_request, gpu=gpu_request),
    limits=Resources(mem=mem_limit, gpu=gpu_limit),
)
def mnist_pytorch_job() -> str:
    return "Hi"
    
@dynamic
def d_wf():
    for _ in range(2):
        mnist_pytorch_job()
@workflow
def wf():
    d_wf()damp-lion-88352
05/25/2024, 4:46 AMdamp-lion-88352
05/25/2024, 4:47 AMPythonFunctionTaskdamp-lion-88352
05/25/2024, 4:50 AMPythonFunctionTaskdamp-lion-88352
05/25/2024, 4:50 AMfreezing-airport-6809
clean-glass-36808
05/30/2024, 6:32 PMglamorous-carpet-83516
05/30/2024, 6:37 PMclean-glass-36808
05/30/2024, 6:39 PMPythonTask@workflow
def map_reduce(size: int) -> int:
    """Simple workflow to illustrate a large fan out and fan in."""
    input_array = to_list(size=size)
    output = map_task(simple_map_task)(a=input_array)
    return simple_reduce(b=output)
@workflow
def map_reduce_dynamic(size: int) -> int:
    """Simple workflow to illustrate a large fan-out/fan-in with tasks."""
    return dynamic_task(size=size)clean-glass-36808
05/30/2024, 6:41 PMglamorous-carpet-83516
05/30/2024, 6:44 PM@dynamic
def d1():
  agent_task(...)glamorous-carpet-83516
05/30/2024, 6:47 PMclean-glass-36808
05/30/2024, 6:56 PMflyteclean-glass-36808
05/30/2024, 6:57 PM@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a
@task(task_config=ArmadaConfig(queue="compute"), container_image="<>")
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)
@dynamic(container_image="<>")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)
    return simple_reduce(b=values)damp-lion-88352
05/31/2024, 2:57 AMdamp-lion-88352
05/31/2024, 2:58 AMdamp-lion-88352
05/31/2024, 2:59 AMdamp-lion-88352
05/31/2024, 3:00 AMclean-glass-36808
05/31/2024, 3:11 AMfreezing-airport-6809
damp-lion-88352
05/31/2024, 3:31 AMdamp-lion-88352
05/31/2024, 3:32 AMclean-glass-36808
05/31/2024, 3:43 AMdamp-lion-88352
05/31/2024, 3:43 AMdamp-lion-88352
05/31/2024, 3:43 AMfreezing-airport-6809
glamorous-carpet-83516
05/31/2024, 3:45 AM@dynamic(container_image="<>")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)
    return simple_reduce(b=values)  <- values here is [promise, promise, promise]glamorous-carpet-83516
05/31/2024, 3:45 AMdamp-lion-88352
05/31/2024, 3:45 AMfrom flytekit import task, dynamic
@task()
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a
@task()
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)
@dynamic
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)
    return simple_reduce(b=values)freezing-airport-6809
damp-lion-88352
05/31/2024, 3:46 AMdamp-lion-88352
05/31/2024, 3:46 AMdamp-lion-88352
05/31/2024, 3:46 AMfreezing-airport-6809
freezing-airport-6809
damp-lion-88352
05/31/2024, 3:48 AMfrom flytekit import task, dynamic, workflow
@task()
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a
@task()
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)
@dynamic
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)
    return simple_reduce(b=values)
@workflow
def wf(size: int) -> int:
    return dynamic_task(size=size)
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Output: {wf(size=5)}")damp-lion-88352
05/31/2024, 3:48 AMdamp-lion-88352
05/31/2024, 3:48 AMclean-glass-36808
05/31/2024, 3:49 AMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/31/2024, 3:52 AMglamorous-carpet-83516
05/31/2024, 3:54 AMclean-glass-36808
05/31/2024, 3:54 AMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
clean-glass-36808
05/31/2024, 3:56 AMclean-glass-36808
05/31/2024, 3:58 AM@task(task_config=ArmadaConfig(queue="compute"), container_image="redacted")
def simple_map_task(a: int) -> int:  # noqa: D103
    return a * a
@task(task_config=ArmadaConfig(queue="compute"), container_image="redacted")
def simple_reduce(b: list[int]) -> int:  # noqa: D103
    return sum(b)
@dynamic(container_image="redacted")
def dynamic_task(size: int) -> int:
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        value = simple_map_task(a=i)
        values.append(value)
    return simple_reduce(b=values)
@workflow
def wf(size: int) -> int:
    return dynamic_task(size=size)glamorous-carpet-83516
05/31/2024, 3:59 AMsimple_map_taskclean-glass-36808
05/31/2024, 4:00 AMglamorous-carpet-83516
05/31/2024, 4:01 AMfreezing-airport-6809
freezing-airport-6809
damp-lion-88352
05/31/2024, 4:32 AMdamp-lion-88352
05/31/2024, 4:33 AMfreezing-airport-6809
damp-lion-88352
05/31/2024, 4:33 AMdamp-lion-88352
05/31/2024, 4:34 AMclean-glass-36808
05/31/2024, 4:35 AMdamp-lion-88352
05/31/2024, 4:39 AMfreezing-airport-6809
damp-lion-88352
05/31/2024, 4:39 AMfrom flytekit import task, dynamic, workflow
from flytekitplugins.spark import Databricks
# pyflyte run --remote --image localhost:30000/databricks-map:0531 databricks_example.py wf --size 3
@task(task_config=Databricks(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        },
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "11.0.x-scala2.12",
                "node_type_id": "r3.xlarge",
                "aws_attributes": {
                    "availability": "ON_DEMAND",
                    "instance_profile_arn": "arn:aws:iam::<AWS_ACCOUNT_ID_DATABRICKS>:instance-profile/databricks-flyte-integration",
                },
                "num_workers": 4,
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
    ),
)
def simple_map_task(a: int):
    return
@task(task_config=Databricks(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        },
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "11.0.x-scala2.12",
                "node_type_id": "r3.xlarge",
                "aws_attributes": {
                    "availability": "ON_DEMAND",
                    "instance_profile_arn": "arn:aws:iam::<AWS_ACCOUNT_ID_DATABRICKS>:instance-profile/databricks-flyte-integration",
                },
                "num_workers": 4,
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
    ),
)
def simple_reduce(b: list[int]):  # noqa: D103
    return
@dynamic
def dynamic_task(size: int):
    """DAG shape is not known at compile time so we need to use @dynamic."""
    values = []
    for i in range(size):
        simple_map_task(a=i)
        values.append(i)
    simple_reduce(b=values)
    return 
@workflow
def wf(size: int):
    dynamic_task(size=size)
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Output: {wf(size=5)}")