# ask-the-community
Bradley Worley:
Hi all! I’m running into a strange issue when using a custom type transformer in a workflow that contains nested `@dynamic` tasks… Specifically, something like this works fine:
```python
@dynamic
def terminus(myobj: MyCustomType, key: str) -> float:
    results: dict[str, Promise] = myobj.call_some_flytekit_tasks()
    return results[key]

@workflow
def wf(myobj: MyCustomType) -> float:
    return terminus(myobj=myobj, key='foo')
```
But passing `myobj` through another `@dynamic` like this does not:
```python
@dynamic
def terminus(myobj: MyCustomType, key: str) -> float:
    results: dict[str, Promise] = myobj.call_some_flytekit_tasks()
    return results[key]

@dynamic
def passthrough(myobj: MyCustomType) -> float:
    return terminus(myobj=myobj, key='foo')

@workflow
def wf(myobj: MyCustomType) -> float:
    return passthrough(myobj=myobj)
```
Local execution works in both cases, but remote execution hangs in the latter case in the following state: the `passthrough` task fails, but the workflow remains in a running state until it finally fails with this error message:
```
Workflow[flytetester:development:workflows.wf] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: Workflow[flytetester:development:workflows.wf] failed. CausedByError: Failed to propagate Abort for workflow. Error: 0: 0: [User] malformed dynamic workflow, caused by: Collected Errors: 2
	Error 0: Code: MismatchingBindings, Node Id: dn0, Description: Input [myobj] on node [dn0] expects bindings of type [map_value_type:<blob:<> > ].  Received []
...
```
Have I committed a flytepropeller faux pas? (flytekit 1.9.1)
Samhita:
Hi @Bradley Worley, I just tested a simple nested dynamic workflow and it works for me.
Bradley Worley:
Thanks Samhita, did you do anything differently that could shed some light on why my example above is failing? In particular, if I remove the `myobj` argument from the `terminus` dynamic task, remote execution works fine. Any tips for how to debug this? Wrapping the methods of the type transformer in `try .. except: logging.error(…)` does not result in any extra information written to the logs…
If it helps, here is my type transformer…
```python
# Imports added for context (paths per flytekit ~1.9):
from typing import Type

from flytekit import FlyteContext
from flytekit.core.type_engine import DictTransformer, TypeTransformer
from flytekit.models.literals import Literal
from flytekit.models.types import LiteralType
from flytekit.types.file import FlyteFile


class MyTypeTransformer(TypeTransformer[MyCustomType]):
    def __init__(self):
        super().__init__("MyCustomType", t=MyCustomType)
        self._base_type = dict[str, FlyteFile]
        self._base_impl = DictTransformer()

    def get_literal_type(self, t: Type[MyCustomType]) -> LiteralType:
        return self._base_impl.get_literal_type(t=self._base_type)

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: MyCustomType,
        python_type: Type[MyCustomType],
        expected: LiteralType,
    ) -> Literal:
        flyte_files = python_val.pack()
        return self._base_impl.to_literal(
            ctx, flyte_files, self._base_type, expected
        )

    def to_python_value(
        self,
        ctx: FlyteContext,
        lv: Literal,
        expected_python_type: Type[MyCustomType],
    ) -> MyCustomType:
        flyte_files = self._base_impl.to_python_value(ctx, lv, self._base_type)
        return MyCustomType.unpack(flyte_files)
```
It uses two methods on `MyCustomType`:
• An instance method `pack : MyCustomType -> dict[str, FlyteFile]`
• A class method `unpack : dict[str, FlyteFile] -> MyCustomType`
As you can see, it’s a very thin wrapper around `DictTransformer`.
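For concreteness, here is a minimal sketch of what such a `pack`/`unpack` pair might look like. The field names and the use of plain string paths in place of `FlyteFile` are illustrative assumptions, not the actual `MyCustomType` from the thread:

```python
from dataclasses import dataclass


@dataclass
class MyCustomType:
    """Hypothetical custom type; str paths stand in for FlyteFile here."""
    weights_path: str
    config_path: str

    def pack(self) -> dict[str, str]:
        # Instance method: serialize the object into a dict of file references,
        # the shape the transformer delegates to DictTransformer.
        return {"weights": self.weights_path, "config": self.config_path}

    @classmethod
    def unpack(cls, files: dict[str, str]) -> "MyCustomType":
        # Class method: rebuild the object from the dict produced by pack().
        return cls(weights_path=files["weights"], config_path=files["config"])
```

The round trip `MyCustomType.unpack(obj.pack()) == obj` should hold for any such pair, since the transformer relies on it in `to_literal`/`to_python_value`.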
Replacing `MyCustomType` with `dict[str, FlyteFile]` in the `terminus` dynamic task resolves the error and results in a successful remote execution.
Samhita:
Okay; `MyCustomType` has to work too. Not sure why it is failing. I'll try to reproduce it on my end, but please feel free to create an issue.
Bradley Worley:
Thanks for your help Samhita, I’ve filed an issue here.