Sam Eckert
02/17/2023, 9:54 PM
class Compounds1DTransformer(TypeTransformer[Compounds1D]):
    """
    Custom transformer for Compounds1D to be used with Flyte
    """

    def __init__(self):
        super().__init__("Compounds1D", Compounds1D)
        # Delegate (de)serialization to the built-in schema transformer
        self._transformer = FlyteSchemaTransformer()

    def get_literal_type(self, t: Type[Compounds1D]) -> LiteralType:
        return self._transformer.get_literal_type(FlyteSchema)

    def to_literal(
        self, ctx: FlyteContext, python_val: Compounds1D, python_type: Type[Compounds1D], expected: LiteralType
    ) -> Literal:
        # Pass the expected literal type through, not the LiteralType class itself
        return self._transformer.to_literal(ctx, python_val=python_val.df, python_type=FlyteSchema, expected=expected)

    def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[Compounds1D]) -> Compounds1D:
        data = self._transformer.to_python_value(ctx, lv, expected_python_type=FlyteSchema)
        df = data.open().all()
        return Compounds1D(df)


TypeEngine.register(Compounds1DTransformer())
In this example, Compounds1D is a Pydantic model wrapping a dataframe called df. I call the workflow via FlyteRemote.execute, and as far as I can tell, the Compounds1D type is being passed around between tasks correctly. However, when I fetch the results using FlyteWorkflowExecution.outputs, the output comes back as a FlyteSchema. Does FlyteRemote not understand custom types?
Compounds1D object at the end, but ideally I'd want to find a way to get this done in a more generic way, so we don't have to parse and rebuild every custom type coming out of an execution when we are already defining the conversion in the TypeTransformer subclass.
Yee
Sam Eckert
02/17/2023, 10:18 PM
execution = remote.execute(
    flyte_obj,
    inputs=inputs,
    wait=execution_options.wait,
    project=execution_options.project,
    domain=execution_options.domain,
    **execute_kwargs,
)
Yee
Sam Eckert
02/17/2023, 10:21 PM
Compounds1D object, but when I get the results via the .outputs attribute I just get back a FlyteSchema rather than a Compounds1D
outputs {
  variables {
    key: "o0"
    value {
      type {
        schema {
        }
      }
      description: "custom compound container"
    }
  }
@workflow
def ensure_custom_types(
    compounds: Compounds1D
) -> tuple[Compounds1D, int]:
    """Ensure custom Lego types work

    Args:
        compounds (Compounds1D): custom compound container

    Returns:
        Compounds1D: custom compound container
    """
    return echo_types(compounds=compounds)
Yee
type(xyz.outputs)
outputs.get("o0", as_type=Compounds1D)
Sam Eckert
02/17/2023, 10:24 PM
what’s type(xyz.outputs)
Checking now
type(execution.outputs)=<class 'flytekit.core.type_engine.LiteralsResolver'>
Yee
execution.outputs.get("o0", as_type=Compounds1D)
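A minimal sketch of why the `as_type` hint matters, assuming the stored output only records a generic schema type (as the interface dump above suggests). This is not flytekit code: `ToyResolver`, `Table`, and `Compounds` are hypothetical stand-ins for `LiteralsResolver`, `FlyteSchema`, and `Compounds1D`, and the decoder table stands in for registered type transformers.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Type


@dataclass
class Table:
    """Hypothetical generic schema-like value (plays the role of FlyteSchema)."""
    rows: list


@dataclass
class Compounds:
    """Hypothetical custom wrapper type (plays the role of Compounds1D)."""
    table: Table


class ToyResolver:
    """Holds raw literals and converts them only when a value is requested."""

    def __init__(self, literals: Dict[str, dict], decoders: Dict[type, Callable[[dict], Any]]):
        self._literals = literals
        self._decoders = decoders

    def get(self, key: str, as_type: Optional[Type] = None) -> Any:
        literal = self._literals[key]
        # Without a hint, the literal only says "schema", so fall back to the generic type.
        target = as_type if as_type is not None else Table
        return self._decoders[target](literal)


# One decoder per Python type, standing in for registered transformers.
decoders = {
    Table: lambda lit: Table(rows=lit["rows"]),
    Compounds: lambda lit: Compounds(table=Table(rows=lit["rows"])),
}
outputs = ToyResolver({"o0": {"kind": "schema", "rows": [["C6H6"], ["H2O"]]}}, decoders)

generic = outputs.get("o0")                     # generic Table, no hint given
custom = outputs.get("o0", as_type=Compounds)   # custom wrapper, hint given
```

In this toy model the same stored literal decodes to either type; only the caller's hint decides which one comes back.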
Sam Eckert
02/17/2023, 10:36 PM
Yee
Sam Eckert
02/17/2023, 10:38 PM
Ketan (kumare3)
Samhita Alla
as_type needn't be given when guess_python_type is defined in the type transformer, right?
Sam Eckert
02/21/2023, 5:12 PM
If the @workflow decorator preserved type annotations, we could do something like the following for local Flyte workflows:
from inspect import getfullargspec

@workflow
def foo() -> tuple[int, str]:
    pass

>>> rt = getfullargspec(foo).annotations["return"]
>>> rt
tuple[int, str]
And then use rt with as_type.
Samhita Alla
Ketan (kumare3)
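The return-annotation idea from the 02/21 message can be sketched on a plain, undecorated function using only the standard library. The helper `output_types` is hypothetical, and the positional names `o0`, `o1`, ... are an assumption based on the `o0` key seen earlier in the thread; whether a `@workflow`-decorated object exposes the same annotations is the open question raised above.

```python
from typing import Dict, Tuple, get_args, get_origin, get_type_hints


def output_types(fn) -> Dict[str, type]:
    """Map positional output names (o0, o1, ...) to the annotated return types."""
    ret = get_type_hints(fn).get("return")
    if ret is None:
        return {}
    # A tuple annotation means several outputs; anything else is a single output.
    parts = get_args(ret) if get_origin(ret) is tuple else (ret,)
    return {f"o{i}": t for i, t in enumerate(parts)}


def foo() -> Tuple[int, str]:
    return 1, "a"


types_by_name = output_types(foo)
```

Each entry could then be passed as the `as_type` argument when fetching the matching output, for example `execution.outputs.get(name, as_type=types_by_name[name])`.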