Hey all. I'm trying to figure out how to get custo...
# flytekit
s
Hey all. I'm trying to figure out how to get custom types working with flyte and running into issues. I have the following code:
Copy code
class Compounds1DTransformer(TypeTransformer[Compounds1D]):
    """
    Custom transformer for Compounds1D to be used with Flyte
    """

    def __init__(self):
        super().__init__("Compounds1D", Compounds1D)
        self._transformer = FlyteSchemaTransformer()

    def get_literal_type(self, t: Type[Compounds1D]) -> LiteralType:
        return self._transformer.get_literal_type(FlyteSchema)

    def to_literal(
        self, ctx: FlyteContext, python_val: Compounds1D, python_type: Type[Compounds1D], expected: LiteralType
    ) -> Literal:
        return self._transformer.to_literal(ctx, python_val=python_val.df, python_type=FlyteSchema, expected=LiteralType)

    def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[Compounds1D]) -> Compounds1D:
        data = self._transformer.to_python_value(ctx, lv, expected_python_type=FlyteSchema)
        df = data.open().all()
        return Compounds1D(df)

TypeEngine.register(
    Compounds1DTransformer()
)
In this example,
Compounds1D
is a Pydantic model wrapping a dataframe called
df
. I call the workflow via
FlyteRemote.execute
, and as far as I can tell, the
Compounds1D
type is being passed around between tasks correctly. However, when I fetch the results using
FlyteWorkflowExecution.outputs
the output comes back as a
FlyteSchema
. Does
FlyteRemote
not understand custom types?
I can fix this by converting the schema to a
Compounds1D
object at the end, but ideally I'd want to find a way to get this done in a more generic way so we don't have to parse and rebuild every custom type coming out of an execution when we are already defining the conversion in the
TypeTransformer
subclass.
y
flyteremote does not understand custom types no… can i see how you’re using flyteremote?
you should be able to add a type_hint
in general, it’s not that flyteremote doesn’t understand custom types… it’s that flyteremote doesn’t understand python types.
because once something is in flyte land, only flyte types exist.
s
Copy code
execution=remote.execute(
                flyte_obj,
                inputs=inputs,
                wait=execution_options.wait,
                project=execution_options.project,
                domain=execution_options.domain,
                **execute_kwargs,
            )
The input contains the custom type as well, and the TypeTransformer is called to convert it to a Literal, could the same be done on the return side?
y
this should be fine right?
what do you mean by the return side?
can you paste that here?
s
I mean, the workflow itself returns a
Compounds1D
object, but when I get the results via the
.outputs
attribute I just get back a FlyteSchema rather than a
Compounds1D
Copy code
outputs {
    variables {
      key: "o0"
      value {
        type {
          schema {
          }
        }
        description: "custom compound container"
      }
    }
Copy code
@workflow
def ensure_custom_types(
    compounds: Compounds1D
) -> tuple[Compounds1D, int]:
    """Ensure custom Lego types work

    Args:
        compounds (Compounds1D): custom compound container

    Returns:
        Compounds1D: custom compound container
    """

    return echo_types(compounds=compounds)
y
what’s
type(xyz.outputs)
is it a LiteralsResolver?
you should be able to do
outputs.get("o0", as_type=Compounds1D)
s
what’s
type(xyz.outputs)
Checking now
type(execution.outputs)=<class 'flytekit.core.type_engine.LiteralsResolver'>
Yeah, its a LiteralsResolver
y
yeah can you try
execution.outputs.get("o0", as_type=Compounds1D)
s
That worked! I'll add an option to our wrapper for users to specify expected return types. Thanks!
y
ofc
let us circle back to you next week though with some more info
i’m wondering if there’s a better way to support this more broadly.
s
That would be great. Going to have to add a bit of hacking to get this to work with local execution, so a more general solution would be appeciated
k
Also can we use structureddataset instead of flyteschema
s
@Yee,
as_type
needn't be given when
guess_python_type
is defined in the type transformer, right?
s
As a note: if the
@workflow
decorator preserved type annotations we could do something like the following for local flyte workflows:
Copy code
from inspect import getfullargspec

@workflow
def foo() -> tuple[int, str]:
  pass

>>> rt = getfullargspec(foo).annotations["return"]
>>> rt
tuple[int, str]
And then use
rt
with
as_type
.
s
Have you got the custom type working, @Sam Eckert?
k
Ahh you are right @Sam Eckert we can preserve the annotation - cc @Eduardo Apolinario (eapolinario)
153 Views