alert-oil-1341
11/01/2024, 9:58 PM
ListTransformer be made to handle this case in the future? Or do we need to create a TypeTransformer specifically for list[Foo]?
@dataclass_json
@dataclass
class Foo:
    val: str

@task
def foos_task(foos: list[Foo]) -> None:
    ...
It's also really strange, because we can definitely pass lists of dataclasses as workflow arguments to subworkflows without any issue:
@workflow
def sub_wf(foos: list[Foo]) -> None:
    ...
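For readers following the question above, here is a minimal, stdlib-only sketch of the round trip that any transformer for list[Foo] would have to perform: turn each dataclass into plain fields, and rebuild it later from those fields plus the element type. The helper names encode_list and decode_list are hypothetical and are not part of flytekit's TypeTransformer API; this only illustrates why the element type has to be known when decoding.

```python
from dataclasses import asdict, dataclass


@dataclass
class Foo:
    val: str


def encode_list(items):
    """Hypothetical helper: flatten dataclass instances into plain dicts."""
    return [asdict(item) for item in items]


def decode_list(payload, element_type):
    """Hypothetical helper: rebuild instances; needs the element type explicitly."""
    return [element_type(**fields) for fields in payload]


foos = [Foo(val="a"), Foo(val="b")]
payload = encode_list(foos)            # plain dicts, no trace of Foo left
restored = decode_list(payload, Foo)   # only works because Foo is supplied
```

Note that the encoded payload carries no reference to Foo, so decoding depends entirely on the caller supplying the declared type.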
thankful-minister-83577
alert-oil-1341
11/03/2024, 3:07 PM
from flytekit import task, workflow

from agent import Foo, FooTask

foo_task = FooTask(name="foo_task")


@task
def foos_task(foos: list[Foo]) -> None:
    print(f"hi {foos}")


@workflow
def dc_wf(foos: list[Foo]) -> None:
    has_foos, foos = foo_task()
    foos_task(foos=foos)


if __name__ == "__main__":
    dc_wf([Foo(val="a"), Foo(val="b")])
and
from dataclasses import dataclass
from typing import Any, Dict, Optional

from dataclasses_json import dataclass_json
from flyteidl.core.execution_pb2 import TaskExecution
from flytekit.configuration import SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.interface import Interface
from flytekit.extend.backend.base_agent import (
    AgentRegistry,
    Resource,
    SyncAgentBase,
    SyncAgentExecutorMixin,
)
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


@dataclass_json
@dataclass
class Foo:
    val: str


class FooAgent(SyncAgentBase):
    def __init__(self) -> None:
        super().__init__(task_type_name="foo")

    async def do(
        self,
        task_template: TaskTemplate,
        inputs: Optional[LiteralMap] = None,
        **kwargs: Any,
    ) -> Resource:
        return Resource(
            phase=TaskExecution.SUCCEEDED,
            outputs={"has_foos": True, "foos": [Foo(val="a"), Foo(val="b")]},
        )


AgentRegistry.register(FooAgent())


class FooTask(SyncAgentExecutorMixin, PythonTask):  # type: ignore
    _TASK_TYPE = "foo"

    def __init__(self, name: str, **kwargs: Any) -> None:
        task_config: dict[str, Any] = {}
        outputs = {"has_foos": bool, "foos": Optional[list[Foo]]}
        super().__init__(
            task_type=self._TASK_TYPE,
            name=name,
            task_config=task_config,
            interface=Interface(outputs=outputs),
            **kwargs,
        )

    def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
        return {}
alert-oil-1341
11/03/2024, 3:09 PM
alert-oil-1341
11/04/2024, 3:46 PM
alert-oil-1341
11/04/2024, 5:18 PM
thankful-minister-83577
alert-oil-1341
11/04/2024, 6:41 PM
and there’s one bug with the typing not getting passed.
Is this what's most likely causing my issue?
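As an illustration of what "typing not getting passed" can mean in practice, the stdlib-only sketch below compares the type declared in the task interface, Optional[list[Foo]], with what can be inferred from the runtime value alone. The helper element_type is hypothetical and is not how flytekit resolves types; whether this is the actual cause of the issue in this thread is an assumption.

```python
from dataclasses import dataclass
from typing import Optional, Union, get_args, get_origin


@dataclass
class Foo:
    val: str


def element_type(tp):
    """Hypothetical helper: element type of list[T] or Optional[list[T]], else None."""
    if get_origin(tp) is Union:
        # Optional[X] is Union[X, None]; look inside the non-None members.
        for arg in get_args(tp):
            if arg is not type(None):
                found = element_type(arg)
                if found is not None:
                    return found
        return None
    if get_origin(tp) is list:
        args = get_args(tp)
        return args[0] if args else None
    return None


declared = Optional[list[Foo]]          # type declared in the task interface
value = [Foo(val="a"), Foo(val="b")]    # what the agent actually returns

from_declared = element_type(declared)     # Foo is recoverable
from_value = element_type(type(value))     # bare list: element type is lost
```

If only the runtime value reaches the serializer, the container is seen as a bare list and the Foo element type has to be guessed rather than read from the declaration.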
alert-oil-1341
11/04/2024, 6:42 PM
thankful-minister-83577
alert-oil-1341
11/04/2024, 6:43 PM
alert-oil-1341
11/04/2024, 6:43 PM
thankful-minister-83577