wooden-musician-95940
09/18/2023, 5:40 PM
… @dynamic tasks…
Specifically, something like this works fine:
from flytekit import dynamic, workflow
from flytekit.core.promise import Promise

@dynamic
def terminus(myobj: MyCustomType, key: str) -> float:
    results: dict[str, Promise] = myobj.call_some_flytekit_tasks()
    return results[key]

@workflow
def wf(myobj: MyCustomType) -> float:
    return terminus(myobj=myobj, key='foo')
But passing myobj through another @dynamic like this does not:
@dynamic
def terminus(myobj: MyCustomType, key: str) -> float:
    results: dict[str, Promise] = myobj.call_some_flytekit_tasks()
    return results[key]

@dynamic
def passthrough(myobj: MyCustomType) -> float:
    return terminus(myobj=myobj, key='foo')

@workflow
def wf(myobj: MyCustomType) -> float:
    return passthrough(myobj=myobj)
Local execution works in both cases, but remote execution hangs in the latter case: the passthrough task fails, yet the workflow remains in a running state until it finally fails with this error message:
Workflow[flytetester:development:workflows.wf] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: Workflow[flytetester:development:workflows.wf] failed. CausedByError: Failed to propagate Abort for workflow. Error: 0: 0: [User] malformed dynamic workflow, caused by: Collected Errors: 2
Error 0: Code: MismatchingBindings, Node Id: dn0, Description: Input [myobj] on node [dn0] expects bindings of type [map_value_type:<blob:<> > ]. Received []
...
Have I committed a flytepropeller faux pas? (flytekit 1.9.1)
tall-lock-23197
wooden-musician-95940
09/19/2023, 1:35 PM
… myobj argument from the terminus dynamic task, remote execution works fine.
Any tips for how to debug this? Wrapping the methods of the type transformer in try .. except: logging.error(…) does not result in any extra information written to logs…
wooden-musician-95940
09/20/2023, 12:08 AM
from typing import Type

from flytekit import FlyteContext
from flytekit.core.type_engine import DictTransformer, TypeTransformer
from flytekit.models.literals import Literal
from flytekit.models.types import LiteralType
from flytekit.types.file import FlyteFile

class MyTypeTransformer(TypeTransformer[MyCustomType]):
    def __init__(self):
        super().__init__("MyCustomType", t=MyCustomType)
        # Delegate all (de)serialization to the built-in dict transformer.
        self._base_type = dict[str, FlyteFile]
        self._base_impl = DictTransformer()

    def get_literal_type(self, t: Type[MyCustomType]) -> LiteralType:
        return self._base_impl.get_literal_type(t=self._base_type)

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: MyCustomType,
        python_type: Type[MyCustomType],
        expected: LiteralType,
    ) -> Literal:
        flyte_files = python_val.pack()
        return self._base_impl.to_literal(
            ctx, flyte_files, self._base_type, expected
        )

    def to_python_value(
        self,
        ctx: FlyteContext,
        lv: Literal,
        expected_python_type: Type[MyCustomType],
    ) -> MyCustomType:
        flyte_files = self._base_impl.to_python_value(ctx, lv, self._base_type)
        return MyCustomType.unpack(flyte_files)
It uses two methods on `MyCustomType`:
• An instance method pack : MyCustomType -> dict[str, FlyteFile]
• A class method unpack : dict[str, FlyteFile] -> MyCustomType
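For illustration only, here is a minimal sketch of what a type with that pack / unpack pair might look like. The thread does not show the real `MyCustomType`, so the `files` field is an assumption, and `FileRef` is a hypothetical stand-in for `FlyteFile` so the snippet runs without flytekit installed:

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class FileRef:
    """Hypothetical stand-in for flytekit's FlyteFile (assumption, not the real class)."""

    path: str


@dataclass
class MyCustomType:
    """Hypothetical custom type: a named collection of files (assumed layout)."""

    files: dict[str, Path]

    def pack(self) -> dict[str, FileRef]:
        # Instance method: flatten the object into a plain dict of file references.
        return {name: FileRef(str(p)) for name, p in self.files.items()}

    @classmethod
    def unpack(cls, refs: dict[str, FileRef]) -> MyCustomType:
        # Class method: rebuild the object from the dict produced by pack().
        return cls(files={name: Path(ref.path) for name, ref in refs.items()})


original = MyCustomType(files={"foo": Path("/tmp/foo.txt"), "bar": Path("/tmp/bar.txt")})
restored = MyCustomType.unpack(original.pack())
assert restored == original  # the round trip preserves the object
```

The point of the sketch is that pack and unpack are inverses, which is what lets the transformer hand everything else off to the dict transformer.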
wooden-musician-95940
09/20/2023, 12:09 AM
… DictTransformer
wooden-musician-95940
09/20/2023, 1:00 AM
… MyCustomType with dict[str, FlyteFile] in the terminus dynamic task resolves the error and results in successful remote execution
tall-lock-23197
… MyCustomType has to work too. Not sure why it is failing. I'll try to reproduce it on my end, but please feel free to create an issue.
wooden-musician-95940
09/20/2023, 8:19 PM