I am using a custom type for my tasks and objects ...
# flyte-support
c
I am using a custom type for my tasks, and objects are being correctly passed between tasks/workflows, as long as the container images have the custom type plugin installed. However, it starts to fail once I use launchplans instead of workflows. I see errors such as
failed to launch workflow, caused by: rpc error: code = InvalidArgument desc = invalid aoi input wrong type. Expected simple:BINARY, but got simple:STRUCT
Why is that? Do I need to install the type plugin somewhere else (i.e. in the propeller image used) so that I can use it with launchplans, too?
d
can you try upgrading to the latest flytepropeller, flyteadmin and flytekit?
which version are you using of each flyte component, and what python version are you using?
c
we are using the flyte-core helm chart for v1.13.2
for our containers we are using flytekit v1.13.13
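For anyone answering the same version question, the Python-side numbers can be collected inside the container with the standard library alone. This is a minimal sketch: `collect_versions` is a hypothetical helper name, and it only covers the Python interpreter and installed packages, not the helm chart version on the cluster.

```python
import platform
from importlib import metadata


def collect_versions(packages=("flytekit",)):
    """Return the Python version plus the installed version of each package."""
    report = {"python": platform.python_version()}
    for name in packages:
        try:
            report[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The package is not installed in this environment.
            report[name] = "not installed"
    return report


# Print one "name: version" line per entry.
for name, version in collect_versions().items():
    print(f"{name}: {version}")
```

Running this in both the task image and the local environment makes version skew between the two easy to spot.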
d
are you using dataclass?
c
no. It is a custom type. I convert it to a binary type and then back to python type
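For readers unfamiliar with this kind of binary round trip: the transformer posted later in the thread tags its payload as WKB (well-known binary) and relies on shapely for encoding and decoding. The sketch below only illustrates the idea for a single 2D point using the standard library; `point_to_wkb` and `point_from_wkb` are hypothetical names, and this is not the plugin's code.

```python
import struct

# WKB layout for a 2D point: 1-byte byte-order flag, uint32 geometry type,
# then two float64 coordinates. "<" means little-endian without padding.
_POINT_FORMAT = "<BIdd"
_LITTLE_ENDIAN = 1
_WKB_POINT = 1


def point_to_wkb(x: float, y: float) -> bytes:
    """Encode a 2D point as little-endian WKB."""
    return struct.pack(_POINT_FORMAT, _LITTLE_ENDIAN, _WKB_POINT, x, y)


def point_from_wkb(data: bytes) -> tuple[float, float]:
    """Decode little-endian WKB for a 2D point back into (x, y)."""
    byte_order, geom_type, x, y = struct.unpack(_POINT_FORMAT, data)
    if byte_order != _LITTLE_ENDIAN or geom_type != _WKB_POINT:
        raise ValueError("only little-endian 2D points are handled in this sketch")
    return x, y


payload = point_to_wkb(-76.0, 38.0)
assert point_from_wkb(payload) == (-76.0, 38.0)
```

The bytes in `payload` are what a BINARY literal would carry between tasks; the Python object is rebuilt on the receiving side.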
d
ok ok
so there's a function called get_literal_type
you have to learn how to do this
let me give you an example
c
in flytekit? The type conversion works just fine when I pass the type between tasks or workflows
it only fails when I use a launchplan
d
oh
I'm not sure why launch plan fails
do you use map task or something
is it reproducible for us?
if you can create a reproducible example for us
we can try to solve this
maybe there's a bug
or maybe there's some implementation error
c
yes, will post something in a bit
d
yes thank you
we've collaborated together before
I can help
c
ok, here is my type transformer. I install this as a plugin
from typing import Type

from flytekit import FlyteContext, LiteralType
from flytekit.core.type_engine import TypeTransformerFailedError
from flytekit.extend import TypeEngine, TypeTransformer
from flytekit.models.literals import (
    Binary,
    Literal,
    Scalar,
)
from flytekit.models.types import SimpleType
from shapely import (
    GeometryCollection,
    LinearRing,
    LineString,
    MultiLineString,
    MultiPoint,
    MultiPolygon,
    Point,
    Polygon,
    from_wkb,
)
from shapely.geometry.base import BaseGeometry


class GeometryTransformer(TypeTransformer[BaseGeometry]):
    def __init__(self) -> None:
        super(GeometryTransformer, self).__init__(
            name="custom-geometry-transform", t=BaseGeometry
        )

    def get_literal_type(self, t: Type[BaseGeometry]) -> LiteralType:
        return LiteralType(simple=SimpleType.BINARY)

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: BaseGeometry,
        python_type: Type[BaseGeometry],
        expected: LiteralType,
    ) -> Literal:
        if isinstance(python_val, BaseGeometry):
            return Literal(
                scalar=Scalar(binary=Binary(value=python_val.wkb, tag="WKB")),
                metadata={"type": "Geometry", "geometry_type": python_val.geom_type},
            )  # type: ignore
        else:
            raise TypeTransformerFailedError("Expected a Geometry or None")

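    def to_python_value(
        self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[BaseGeometry]
    ) -> BaseGeometry:
        # Fail early when the literal has no binary payload; otherwise
        # `geometry` would be unbound in the check below.
        if lv.scalar is None or lv.scalar.binary is None:
            raise TypeTransformerFailedError("Expected a binary literal holding WKB")
        geometry = from_wkb(lv.scalar.binary.value)

        if not isinstance(geometry, expected_python_type):
            raise ValueError("Not a geometry")
        return geometry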

    def guess_python_type(self, literal_type: LiteralType) -> Type[BaseGeometry]:
        if metadata := literal_type.metadata:
            if isinstance(metadata, dict) and metadata.get("type") == "Geometry":
                geometry_type = metadata["geometry_type"]
                match geometry_type:
                    case "Point":
                        return Point  # type:ignore
                    case "Polygon":
                        return Polygon  # type:ignore
                    case "LinearRing":
                        return LinearRing  # type:ignore
                    case "LineString":
                        return LineString  # type:ignore
                    case "MultiPoint":
                        return MultiPoint  # type:ignore
                    case "MultiLineString":
                        return MultiLineString  # type:ignore
                    case "MultiPolygon":
                        return MultiPolygon  # type:ignore
                    case "GeometryCollection":
                        return GeometryCollection  # type:ignore
                    case _:
                        return BaseGeometry  # type:ignore

        raise ValueError(f"Geometry transformer cannot reverse {literal_type}")


TypeEngine.register(GeometryTransformer())
Here is my test workflow
from flytekit import task, workflow, ImageSpec, LaunchPlan
from shapely import from_geojson, to_geojson
from shapely.geometry.base import BaseGeometry


@task(
    container_image=ImageSpec(
        name="flytekit-shapely",
        packages=["shapely", "my_type_plugin"],
        registry="ghcr.io/flyteorg",
    )
)
def geojson_to_geometry(value: str) -> BaseGeometry:
    return from_geojson(value)


@task(
    container_image=ImageSpec(
        name="flytekit-shapely",
        packages=["shapely", "my_type_plugin"],
        registry="ghcr.io/flyteorg",
    )
)
def geometry_to_geojson(value: BaseGeometry) -> str:
    return to_geojson(value)


@workflow
def round_robin_geojson(value: str) -> str:
    geom = geojson_to_geometry(value)
    geojson = geometry_to_geojson(geom)
    print(geojson)
    return geojson


@workflow
def round_robin_geometry(value: BaseGeometry) -> BaseGeometry:
    geojson = geometry_to_geojson(value)
    geom = geojson_to_geometry(geojson)

    print(geom)
    return geom


round_robin_geojson_lp = LaunchPlan.get_or_create(
    round_robin_geojson, "round_robin_geojson"
)
round_robin_geometry_lp = LaunchPlan.get_or_create(
    round_robin_geometry, "round_robin_geometry"
)

@workflow
def geojson_workflow(value: str) -> None:
    geom = geojson_to_geometry(value)
    round_robin_geometry(value=geom)

@workflow
def geojson_launchplan(value: str) -> None:
    geom = geojson_to_geometry(value)
    round_robin_geometry_lp(value=geom)


@workflow
def geometry_workflow(value: BaseGeometry) -> None:
    geojson = geometry_to_geojson(value)
    round_robin_geojson(value=geojson)

@workflow
def geometry_launchplan(value: BaseGeometry) -> None:
    geojson = geometry_to_geojson(value)
    round_robin_geojson_lp(value=geojson)
and here is how I call them after I registered them with the cluster
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote
from shapely import Point, to_geojson
from shapely.geometry.base import BaseGeometry


def test_type_transformer():

    remote = FlyteRemote(
        config=Config.auto(),
        default_project="project",
        default_domain="dev",
    )

    # Using a workflow that calls a workflow
    # This works
    geojson_workflow = remote.fetch_workflow(name="flyte.geojson_workflow")

    remote.execute(
        geojson_workflow,
        inputs={
            "value": to_geojson(Point(-76, 38)),
        },
        execution_name_prefix="geojson-workflow",
        wait=False,
        type_hints={"value": str},
    )


    # Using a workflow that calls a launchplan
    # this execution will fail with 
    # [UserError] failed to launch workflow, caused by: rpc error: code = InvalidArgument desc = invalid value input wrong type. Expected simple:BINARY, but got simple:STRUCT
    geojson_launchplan = remote.fetch_workflow(name="flyte.geojson_launchplan")
    remote.execute(
        geojson_launchplan,
        inputs={
            "value": to_geojson(Point(-76, 38)),
        },
        execution_name_prefix="geojson-launchplan",
        wait=False,
        type_hints={"value": str},
    )


    # Sending a geometry object to a remote workflow also doesn't work

    # # Fetch workflow
    # geometry_workflow = remote.fetch_workflow(name="flyte.geometry_workflow")

    # # Execute
    # remote.execute(
    #     geometry_workflow,
    #     inputs={
    #         "value": Point(-76, 38),
    #     },
    #     execution_name_prefix="geometry-workflow",
    #     wait=False,
    #     type_hints={"value": BaseGeometry},
    # )



    # # Fetch workflow
    # geometry_launchplan = remote.fetch_workflow(name="flyte.geometry_launchplan")

    # # Execute
    # remote.execute(
    #     geometry_launchplan,
    #     inputs={
    #         "value": Point(-76, 38),
    #     },
    #     execution_name_prefix="geometry-launchplan",
    #     wait=False,
    #     type_hints={"value": BaseGeometry},
    # )

if __name__ == "__main__":
    test_type_transformer()
d
I'll take a look next week
I have other priorities discussed with my teams this week
Hi, Thomas, I have 4 things to do this week
I will come here after I finished all of them
if I really can't finish them
I will take a look on my Friday
this is a bug written by me
flyte 1.14 will solve this
sorry Thomas
c
Thanks!
d
here!
right now this is the correct one
you can update your private fork in your company or wait for the flyte 1.14 release (it will be released early next month)
a
@damp-lion-88352 do you think it could affect other custom types with tags?
d
nope
this will not
it's backward compatible
in flyte 1.14
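To make the tag question concrete, here is a toy model of how a server-side check could produce the "Expected simple:BINARY, but got simple:STRUCT" message. Everything in it is an assumption pieced together from this thread: that the older check inferred STRUCT for any tagged binary literal, that the fix narrows this to a `msgpack` tag, and all class and function names. It is not Flyte's actual validation code.

```python
from dataclasses import dataclass

MSGPACK_TAG = "msgpack"  # assumed tag reserved for structured payloads


@dataclass(frozen=True)
class BinaryLiteral:
    """Hypothetical stand-in for a binary literal with an optional tag."""
    value: bytes
    tag: str = ""


def infer_type_any_tag(lit: BinaryLiteral) -> str:
    """Assumed older rule: any tagged binary is inferred as STRUCT."""
    return "STRUCT" if lit.tag else "BINARY"


def infer_type_msgpack_only(lit: BinaryLiteral) -> str:
    """Assumed fixed rule: only the msgpack tag is inferred as STRUCT."""
    return "STRUCT" if lit.tag == MSGPACK_TAG else "BINARY"


def validate_input(name: str, lit: BinaryLiteral, expected: str, infer) -> None:
    """Raise if the inferred type differs from the declared input type."""
    actual = infer(lit)
    if actual != expected:
        raise ValueError(
            f"invalid {name} input wrong type. "
            f"Expected simple:{expected}, but got simple:{actual}"
        )


geometry = BinaryLiteral(value=b"\x01", tag="WKB")

try:
    validate_input("aoi", geometry, "BINARY", infer_type_any_tag)
except ValueError as err:
    print(err)  # the tagged WKB payload is mistaken for a struct

# Under the narrower rule the same literal passes the check.
validate_input("aoi", geometry, "BINARY", infer_type_msgpack_only)
```

Under these assumptions, a custom type with its own tag is only misclassified by the older rule, which matches the answer above that the fix does not affect other tagged custom types.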
c
Thanks again for fixing that bug. This works now. However, I noticed that I run into a similar issue when I try to cache a task that returns such a binary type. The workflow hangs, and I see this error in the propeller logs
Failed to check Catalog for previous results: unexpected artifactData: [o0] type: [simple:STRUCT] does not match any task output type: [simple:BINARY]
a
@colossal-nightfall-74781 sorry for the delay, but I think we included in the release a fix for cached outputs produced by flytekit < 1.14 tasks. I guess this is not your case, right?
c
I updated to flytekit 1.14 after running into issues with custom binary types and launch plans. The new version fixed that, but the cached results still throw an error