crooked-artist-67935
02/17/2023, 9:54 PM
class Compounds1DTransformer(TypeTransformer[Compounds1D]):
    """
    Custom transformer for Compounds1D to be used with Flyte
    """

    def __init__(self):
        super().__init__("Compounds1D", Compounds1D)
        # Delegate serialization to the built-in schema transformer.
        self._transformer = FlyteSchemaTransformer()

    def get_literal_type(self, t: Type[Compounds1D]) -> LiteralType:
        return self._transformer.get_literal_type(FlyteSchema)

    def to_literal(
        self, ctx: FlyteContext, python_val: Compounds1D, python_type: Type[Compounds1D], expected: LiteralType
    ) -> Literal:
        # Forward the expected literal type to the wrapped transformer.
        return self._transformer.to_literal(ctx, python_val=python_val.df, python_type=FlyteSchema, expected=expected)

    def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[Compounds1D]) -> Compounds1D:
        data = self._transformer.to_python_value(ctx, lv, expected_python_type=FlyteSchema)
        df = data.open().all()
        return Compounds1D(df)


TypeEngine.register(Compounds1DTransformer())
In this example, Compounds1D is a Pydantic model wrapping a dataframe called df. I call the workflow via FlyteRemote.execute, and as far as I can tell, the Compounds1D type is being passed around between tasks correctly. However, when I fetch the results using FlyteWorkflowExecution.outputs, the output comes back as a FlyteSchema. Does FlyteRemote not understand custom types?
crooked-artist-67935
02/17/2023, 10:07 PM
Compounds1D object at the end, but ideally I'd want to find a way to do this more generically, so we don't have to parse and rebuild every custom type coming out of an execution when the conversion is already defined in the TypeTransformer subclass.
thankful-minister-83577
crooked-artist-67935
02/17/2023, 10:18 PM
execution = remote.execute(
    flyte_obj,
    inputs=inputs,
    wait=execution_options.wait,
    project=execution_options.project,
    domain=execution_options.domain,
    **execute_kwargs,
)
crooked-artist-67935
02/17/2023, 10:20 PM
thankful-minister-83577
crooked-artist-67935
02/17/2023, 10:21 PM
Compounds1D object, but when I get the results via the .outputs attribute I just get back a FlyteSchema rather than a Compounds1D.
crooked-artist-67935
02/17/2023, 10:21 PM
outputs {
  variables {
    key: "o0"
    value {
      type {
        schema {
        }
      }
      description: "custom compound container"
    }
  }
crooked-artist-67935
02/17/2023, 10:22 PM
@workflow
def ensure_custom_types(
    compounds: Compounds1D
) -> tuple[Compounds1D, int]:
    """Ensure custom Lego types work

    Args:
        compounds (Compounds1D): custom compound container

    Returns:
        Compounds1D: custom compound container
    """
    return echo_types(compounds=compounds)
thankful-minister-83577
type(xyz.outputs)
thankful-minister-83577
outputs.get("o0", as_type=Compounds1D)
crooked-artist-67935
02/17/2023, 10:24 PM
what’s type(xyz.outputs)
Checking now
crooked-artist-67935
02/17/2023, 10:26 PM
type(execution.outputs) = <class 'flytekit.core.type_engine.LiteralsResolver'>
crooked-artist-67935
02/17/2023, 10:26 PM
thankful-minister-83577
execution.outputs.get("o0", as_type=Compounds1D)
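Why as_type helps here can be illustrated with a toy model that does not use flytekit at all. The interface dump earlier in the thread records only `schema {}` for o0, so nothing in the stored output names Compounds1D, and a resolver presumably has to fall back to a generic type unless the caller supplies one. Every name below (ToyLiteral, ToyResolver, DECODERS, DEFAULT_FOR_TAG, and the stand-in Compounds1D and FlyteSchema classes) is hypothetical and only mimics the shape of the real API; the sample payload is made up.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class ToyLiteral:
    """Stand-in for a stored literal: a type tag plus a payload."""
    literal_type: str  # e.g. "schema"; carries no Python class name
    payload: Any


class FlyteSchema:
    """Stand-in for the generic schema wrapper (NOT the flytekit class)."""
    def __init__(self, rows):
        self.rows = rows


class Compounds1D:
    """Stand-in for the custom container from the thread."""
    def __init__(self, df):
        self.df = df


# Decoders keyed by Python type; both read the same "schema" literal.
DECODERS: Dict[type, Callable[[ToyLiteral], Any]] = {
    FlyteSchema: lambda lit: FlyteSchema(lit.payload),
    Compounds1D: lambda lit: Compounds1D(lit.payload),
}

# Fallback when the caller gives no type: literal tag -> generic Python type.
DEFAULT_FOR_TAG: Dict[str, type] = {"schema": FlyteSchema}


class ToyResolver:
    """Hypothetical resolver with a get(name, as_type=...) method."""
    def __init__(self, literals: Dict[str, ToyLiteral]):
        self._literals = literals

    def get(self, name: str, as_type: Optional[type] = None) -> Any:
        lit = self._literals[name]
        # Without as_type, only the literal's tag is available to pick a type.
        target = as_type if as_type is not None else DEFAULT_FOR_TAG[lit.literal_type]
        return DECODERS[target](lit)


outputs = ToyResolver({"o0": ToyLiteral("schema", [{"id": 1}])})
guessed = outputs.get("o0")                    # falls back to the generic type
typed = outputs.get("o0", as_type=Compounds1D)  # caller names the custom type
print(type(guessed).__name__, type(typed).__name__)  # FlyteSchema Compounds1D
```

In this sketch the two calls read the same stored value; only the target type differs, which matches the behaviour described in the thread.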
crooked-artist-67935
02/17/2023, 10:36 PM
thankful-minister-83577
crooked-artist-67935
02/17/2023, 10:38 PM
freezing-airport-6809
tall-lock-23197
as_type needn't be given when guess_python_type is defined in the type transformer, right?
crooked-artist-67935
02/21/2023, 5:12 PM
If the @workflow decorator preserved type annotations, we could do something like the following for local Flyte workflows:
from inspect import getfullargspec

@workflow
def foo() -> tuple[int, str]:
    pass

>>> rt = getfullargspec(foo).annotations["return"]
>>> rt
tuple[int, str]
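A minimal standard-library sketch of the same idea, assuming outputs use the default positional names o0, o1, ... seen in the interface dump earlier in the thread. The helper name output_types is hypothetical, and the example uses a plain function; whether a @workflow-decorated object exposes its annotations this way is not confirmed here.

```python
from typing import Dict, get_args, get_origin, get_type_hints


def output_types(fn) -> Dict[str, type]:
    """Map a function's return annotation to positional names o0, o1, ..."""
    rt = get_type_hints(fn).get("return")
    if rt is None or rt is type(None):
        # No return annotation, or an explicit "-> None": no outputs.
        return {}
    # A tuple annotation is split into one entry per element;
    # any other annotation is treated as a single output.
    parts = get_args(rt) if get_origin(rt) is tuple else (rt,)
    return {f"o{i}": t for i, t in enumerate(parts)}


def foo() -> tuple[int, str]:
    ...


print(output_types(foo))  # {'o0': <class 'int'>, 'o1': <class 'str'>}
```

Each entry could then be passed as the as_type argument for the matching output name.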
And then use rt with as_type.
tall-lock-23197
freezing-airport-6809