Bradley Worley
09/18/2023, 5:40 PM
@dynamic tasks…
Specifically, something like this works fine:
@dynamic
def terminus(myobj: MyCustomType, key: str) -> float:
    results: dict[str, Promise] = myobj.call_some_flytekit_tasks()
    return results[key]

@workflow
def wf(myobj: MyCustomType) -> float:
    return terminus(myobj=myobj, key='foo')
But passing myobj through another @dynamic like this does not:
@dynamic
def terminus(myobj: MyCustomType, key: str) -> float:
    results: dict[str, Promise] = myobj.call_some_flytekit_tasks()
    return results[key]

@dynamic
def passthrough(myobj: MyCustomType) -> float:
    return terminus(myobj=myobj, key='foo')

@workflow
def wf(myobj: MyCustomType) -> float:
    return passthrough(myobj=myobj)
Local execution works in both cases, but remote execution hangs in the latter case: the passthrough task fails, yet the workflow stays in a running state until it finally fails with this error message:
Workflow[flytetester:development:workflows.wf] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: Workflow[flytetester:development:workflows.wf] failed. CausedByError: Failed to propagate Abort for workflow. Error: 0: 0: [User] malformed dynamic workflow, caused by: Collected Errors: 2
Error 0: Code: MismatchingBindings, Node Id: dn0, Description: Input [myobj] on node [dn0] expects bindings of type [map_value_type:<blob:<> > ]. Received []
...
Have I committed a flytepropeller faux pas? (flytekit 1.9.1)
Samhita Alla
Bradley Worley
09/19/2023, 1:35 PM
myobj argument from the terminus dynamic task, remote execution works fine.
Any tips for how to debug this? Wrapping the methods of the type transformer in try .. except: logging.error(…) does not result in any extra information written to logs…
class MyTypeTransformer(TypeTransformer[MyCustomType]):
    def __init__(self):
        super().__init__("MyCustomType", t=MyCustomType)
        self._base_type = dict[str, FlyteFile]
        self._base_impl = DictTransformer()

    def get_literal_type(self, t: Type[MyCustomType]) -> LiteralType:
        return self._base_impl.get_literal_type(t=self._base_type)

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: MyCustomType,
        python_type: Type[MyCustomType],
        expected: LiteralType,
    ) -> Literal:
        flyte_files = python_val.pack()
        return self._base_impl.to_literal(
            ctx, flyte_files, self._base_type, expected
        )

    def to_python_value(
        self,
        ctx: FlyteContext,
        lv: Literal,
        expected_python_type: Type[MyCustomType],
    ) -> MyCustomType:
        flyte_files = self._base_impl.to_python_value(ctx, lv, self._base_type)
        return MyCustomType.unpack(flyte_files)
It uses two methods on `MyCustomType`:
• An instance method pack : MyCustomType -> dict[str, FlyteFile]
• A class method unpack : dict[str, FlyteFile] -> MyCustomType
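For readers following along, here is a minimal sketch of what the pack / unpack pair described above might look like. The real MyCustomType is not shown in the thread, so the `paths` field is purely hypothetical, and `FileRef` is a stdlib-only stand-in for flytekit's FlyteFile so the snippet runs without flytekit installed. It only illustrates the round-trip contract the transformer relies on, not the actual implementation.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class FileRef:
    """Hypothetical stand-in for flytekit's FlyteFile (holds only a path)."""

    path: str


@dataclass
class MyCustomType:
    """Assumed shape: a named collection of file paths (not from the thread)."""

    paths: dict[str, str]

    def pack(self) -> dict[str, FileRef]:
        # Instance method: MyCustomType -> dict[str, FileRef]
        return {name: FileRef(path) for name, path in self.paths.items()}

    @classmethod
    def unpack(cls, files: dict[str, FileRef]) -> MyCustomType:
        # Class method: dict[str, FileRef] -> MyCustomType
        return cls(paths={name: ref.path for name, ref in files.items()})


# Round trip: unpack(pack(x)) should reproduce x, which is what
# to_literal followed by to_python_value depends on.
original = MyCustomType(paths={"foo": "/tmp/foo.bin", "bar": "/tmp/bar.bin"})
restored = MyCustomType.unpack(original.pack())
```

Checking this round trip locally, outside of Flyte, is one way to rule out the custom type itself before looking at how the dynamic workflow is compiled.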
DictTransformer
MyCustomType with dict[str, FlyteFile] in the terminus dynamic task resolves the error and results in successful remote execution
Samhita Alla
MyCustomType has to work too. Not sure why it is failing. I'll try to reproduce it on my end, but please feel free to create an issue.
Bradley Worley
09/20/2023, 8:19 PM