colossal-nightfall-74781
11/20/2024, 1:50 PMfailed to launch workflow, caused by: rpc error: code = InvalidArgument desc = invalid aoi input wrong type. Expected simple:BINARY, but got simple:STRUCT
Why is that? Do I need to install the type plugin somewhere else (i.e. in the propeller image used) so that I can use it with launch plans, too?
damp-lion-88352
11/20/2024, 2:00 PMdamp-lion-88352
11/20/2024, 2:00 PMcolossal-nightfall-74781
11/20/2024, 3:56 PMv1.13.2
for our containers we are using flytekit v1.13.13
damp-lion-88352
11/20/2024, 3:57 PMcolossal-nightfall-74781
11/20/2024, 3:58 PMdamp-lion-88352
11/20/2024, 3:58 PMdamp-lion-88352
11/20/2024, 3:58 PMget literal type
damp-lion-88352
11/20/2024, 3:58 PMdamp-lion-88352
11/20/2024, 3:58 PMdamp-lion-88352
11/20/2024, 3:59 PMcolossal-nightfall-74781
11/20/2024, 3:59 PMcolossal-nightfall-74781
11/20/2024, 3:59 PMdamp-lion-88352
11/20/2024, 3:59 PMdamp-lion-88352
11/20/2024, 4:00 PMdamp-lion-88352
11/20/2024, 4:00 PMdamp-lion-88352
11/20/2024, 4:00 PMdamp-lion-88352
11/20/2024, 4:00 PMdamp-lion-88352
11/20/2024, 4:00 PMdamp-lion-88352
11/20/2024, 4:00 PMdamp-lion-88352
11/20/2024, 4:00 PMdamp-lion-88352
11/20/2024, 4:01 PMcolossal-nightfall-74781
11/20/2024, 4:01 PMdamp-lion-88352
11/20/2024, 4:01 PMdamp-lion-88352
11/20/2024, 4:01 PMdamp-lion-88352
11/20/2024, 4:01 PMcolossal-nightfall-74781
11/20/2024, 5:31 PM
from typing import Type
from flytekit import FlyteContext, LiteralType
from flytekit.core.type_engine import TypeTransformerFailedError
from flytekit.extend import TypeEngine, TypeTransformer
from flytekit.models.literals import (
Binary,
Literal,
Scalar,
)
from flytekit.models.types import SimpleType
from shapely import (
GeometryCollection,
LinearRing,
LineString,
MultiLineString,
MultiPoint,
MultiPolygon,
Point,
Polygon,
from_wkb,
)
from shapely.geometry.base import BaseGeometry
class GeometryTransformer(TypeTransformer[BaseGeometry]):
    """Flyte type transformer that carries shapely geometries as WKB binaries.

    Python values are serialised with ``BaseGeometry.wkb`` into a Flyte
    ``Binary`` scalar tagged ``"WKB"`` and restored with ``shapely.from_wkb``.

    NOTE(review): ``to_literal`` attaches the ``"type"``/``"geometry_type"``
    metadata to the *Literal*, while ``get_literal_type`` returns a
    ``LiteralType`` without metadata and ``guess_python_type`` reads
    ``LiteralType.metadata``. Literal types produced by this transformer
    therefore never satisfy ``guess_python_type`` -- confirm whether the
    metadata should also be set on the literal type.
    """

    # Maps the ``geometry_type`` metadata value (shapely's ``geom_type``
    # string, as written by ``to_literal``) to the concrete shapely class.
    _GEOMETRY_CLASSES: dict[str, Type[BaseGeometry]] = {
        "Point": Point,
        "Polygon": Polygon,
        "LinearRing": LinearRing,
        "LineString": LineString,
        "MultiPoint": MultiPoint,
        "MultiLineString": MultiLineString,
        "MultiPolygon": MultiPolygon,
        "GeometryCollection": GeometryCollection,
    }

    def __init__(self) -> None:
        super().__init__(name="custom-geometry-transform", t=BaseGeometry)

    def get_literal_type(self, t: Type[BaseGeometry]) -> LiteralType:
        """Return the Flyte literal type for any geometry: a simple BINARY."""
        return LiteralType(simple=SimpleType.BINARY)

    def to_literal(
        self,
        ctx: FlyteContext,
        python_val: BaseGeometry,
        python_type: Type[BaseGeometry],
        expected: LiteralType,
    ) -> Literal:
        """Serialise ``python_val`` to a WKB ``Binary`` literal.

        Raises:
            TypeTransformerFailedError: if ``python_val`` is not a
                ``BaseGeometry`` instance.
        """
        if not isinstance(python_val, BaseGeometry):
            raise TypeTransformerFailedError("Expected a Geometry or None")
        return Literal(
            scalar=Scalar(binary=Binary(value=python_val.wkb, tag="WKB")),
            metadata={"type": "Geometry", "geometry_type": python_val.geom_type},
        )  # type: ignore

    def to_python_value(
        self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[BaseGeometry]
    ) -> BaseGeometry:
        """Deserialise a WKB ``Binary`` literal back into a shapely geometry.

        Raises:
            TypeTransformerFailedError: if the literal carries no binary
                scalar, or the decoded geometry is not an instance of
                ``expected_python_type``.
        """
        scalar = lv.scalar
        binary = scalar.binary if scalar is not None else None
        if binary is None:
            # Previously this path fell through without producing a geometry;
            # fail loudly so a non-binary literal (e.g. a STRUCT) is reported
            # as a transformer error instead of surfacing later.
            raise TypeTransformerFailedError(
                f"Geometry transformer expected a binary scalar literal, got {lv}"
            )
        geometry = from_wkb(binary.value)
        if not isinstance(geometry, expected_python_type):
            # The decoded value may well be a geometry, just of another
            # subclass, so report both the expected and the actual type.
            raise TypeTransformerFailedError(
                f"Not a geometry of type {expected_python_type}: "
                f"got {type(geometry).__name__}"
            )
        return geometry

    def guess_python_type(self, literal_type: LiteralType) -> Type[BaseGeometry]:
        """Infer the shapely class from ``literal_type.metadata``.

        Returns the class named by the ``"geometry_type"`` entry, or
        ``BaseGeometry`` when that entry is missing or unrecognised.

        Raises:
            ValueError: if the metadata does not mark the type as
                ``"Geometry"``.
        """
        metadata = literal_type.metadata
        if isinstance(metadata, dict) and metadata.get("type") == "Geometry":
            geometry_type = metadata.get("geometry_type")
            # Only string names can be table keys; anything else falls back
            # to the generic base class, like an unknown name does.
            if isinstance(geometry_type, str):
                return self._GEOMETRY_CLASSES.get(geometry_type, BaseGeometry)  # type: ignore
            return BaseGeometry  # type: ignore
        raise ValueError(f"Geometry transformer cannot reverse {literal_type}")
# Register the transformer with flytekit's TypeEngine as a module-level side
# effect, i.e. it runs whenever this module is imported.
TypeEngine.register(GeometryTransformer())
Here is my test workflow:
from flytekit import task, workflow, ImageSpec, LaunchPlan
from shapely import from_geojson, to_geojson
from shapely.geometry.base import BaseGeometry
@task(
    container_image=ImageSpec(
        name="flytekit-shapely",
        packages=["shapely", "my_type_plugin"],
        # Plain registry host/namespace. The pasted snippet carried Slack's
        # "<url|label>" link markup here, which is not a usable registry.
        registry="ghcr.io/flyteorg",
    )
)
def geojson_to_geometry(value: str) -> BaseGeometry:
    """Parse a GeoJSON string into a shapely geometry."""
    return from_geojson(value)
@task(
    container_image=ImageSpec(
        name="flytekit-shapely",
        packages=["shapely", "my_type_plugin"],
        # Plain registry host/namespace. The pasted snippet carried Slack's
        # "<url|label>" link markup here, which is not a usable registry.
        registry="ghcr.io/flyteorg",
    )
)
def geometry_to_geojson(value: BaseGeometry) -> str:
    """Serialise a shapely geometry to a GeoJSON string."""
    return to_geojson(value)
@workflow
def round_robin_geojson(value: str) -> str:
    """Round-trip a GeoJSON string: GeoJSON -> geometry -> GeoJSON."""
    as_geometry = geojson_to_geometry(value)
    round_tripped = geometry_to_geojson(as_geometry)
    # NOTE(review): inside a @workflow this presumably prints a promise
    # placeholder rather than the runtime value -- confirm.
    print(round_tripped)
    return round_tripped
@workflow
def round_robin_geometry(value: BaseGeometry) -> BaseGeometry:
    """Round-trip a geometry: geometry -> GeoJSON -> geometry."""
    as_geojson = geometry_to_geojson(value)
    round_tripped = geojson_to_geometry(as_geojson)
    # NOTE(review): inside a @workflow this presumably prints a promise
    # placeholder rather than the runtime value -- confirm.
    print(round_tripped)
    return round_tripped
# Launch plans wrapping the two round-trip workflows, so other workflows can
# invoke them as launch plans instead of as sub-workflows.
round_robin_geojson_lp = LaunchPlan.get_or_create(
    workflow=round_robin_geojson,
    name="round_robin_geojson",
)
round_robin_geometry_lp = LaunchPlan.get_or_create(
    workflow=round_robin_geometry,
    name="round_robin_geometry",
)
@workflow
def geojson_workflow(value: str) -> None:
    """Parse GeoJSON in a task, then pass the geometry to a sub-workflow."""
    round_robin_geometry(value=geojson_to_geometry(value))
@workflow
def geojson_launchplan(value: str) -> None:
    """Parse GeoJSON in a task, then pass the geometry to a launch plan."""
    round_robin_geometry_lp(value=geojson_to_geometry(value))
@workflow
def geometry_workflow(value: BaseGeometry) -> None:
    """Serialise a geometry in a task, then pass the GeoJSON to a sub-workflow."""
    round_robin_geojson(value=geometry_to_geojson(value))
@workflow
def geometry_launchplan(value: BaseGeometry) -> None:
    """Serialise a geometry in a task, then pass the GeoJSON to a launch plan."""
    round_robin_geojson_lp(value=geometry_to_geojson(value))
And here is how I call them after registering them with the cluster:
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote
from shapely import Point, to_geojson
from shapely.geometry.base import BaseGeometry
def test_type_transformer():
    """Launch the registered GeoJSON workflows remotely without waiting.

    Two executions are started, in this order:

    1. ``flyte.geojson_workflow`` -- a workflow that calls a workflow.
       This works.
    2. ``flyte.geojson_launchplan`` -- a workflow that calls a launch plan.
       This execution fails with::

           [UserError] failed to launch workflow, caused by: rpc error: code = InvalidArgument desc = invalid value input wrong type. Expected simple:BINARY, but got simple:STRUCT
    """
    remote = FlyteRemote(
        config=Config.auto(),
        default_project="project",
        default_domain="dev",
    )

    # (registered workflow name, execution name prefix), run in listed order.
    cases = (
        ("flyte.geojson_workflow", "geojson-workflow"),
        ("flyte.geojson_launchplan", "geojson-launchplan"),
    )
    for workflow_name, name_prefix in cases:
        fetched = remote.fetch_workflow(name=workflow_name)
        remote.execute(
            fetched,
            inputs={"value": to_geojson(Point(-76, 38))},
            execution_name_prefix=name_prefix,
            wait=False,
            type_hints={"value": str},
        )

    # Sending a geometry object to a remote workflow also doesn't work
    # # Fetch workflow
    # geometry_workflow = remote.fetch_workflow(name="flyte.geometry_workflow")
    # # Execute
    # remote.execute(
    #     geometry_workflow,
    #     inputs={
    #         "value": Point(-76, 38),
    #     },
    #     execution_name_prefix="geometry-workflow",
    #     wait=False,
    #     type_hints={"value": BaseGeometry},
    # )
    # # Fetch workflow
    # geometry_launchplan = remote.fetch_workflow(name="flyte.geometry_launchplan")
    # # Execute
    # remote.execute(
    #     geometry_launchplan,
    #     inputs={
    #         "value": Point(-76, 38),
    #     },
    #     execution_name_prefix="geometry-launchplan",
    #     wait=False,
    #     type_hints={"value": BaseGeometry},
    # )
if __name__ == "__main__":
    # Run the remote smoke test when this file is executed as a script.
    test_type_transformer()
damp-lion-88352
11/21/2024, 3:15 AMdamp-lion-88352
11/21/2024, 3:15 AMdamp-lion-88352
11/25/2024, 1:38 AMdamp-lion-88352
11/25/2024, 1:38 AMdamp-lion-88352
11/25/2024, 1:38 AMdamp-lion-88352
11/25/2024, 1:38 AMdamp-lion-88352
11/25/2024, 2:33 PMdamp-lion-88352
11/25/2024, 2:33 PMdamp-lion-88352
11/25/2024, 2:33 PMcolossal-nightfall-74781
11/25/2024, 2:38 PMdamp-lion-88352
11/25/2024, 2:45 PMdamp-lion-88352
11/25/2024, 2:45 PMdamp-lion-88352
11/25/2024, 2:45 PMdamp-lion-88352
11/25/2024, 2:46 PMaverage-finland-92144
11/25/2024, 3:51 PMdamp-lion-88352
11/25/2024, 4:01 PMdamp-lion-88352
11/25/2024, 4:01 PMdamp-lion-88352
11/25/2024, 4:01 PMdamp-lion-88352
11/25/2024, 4:01 PMcolossal-nightfall-74781
12/20/2024, 9:21 PMFailed to check Catalog for previous results: unexpected artifactData: [o0] type: [simple:STRUCT] does not match any task output type: [simple:BINARY]
average-finland-92144
12/30/2024, 1:29 PMflytekit < 1.14
tasks. I guess this is not your case, right?
colossal-nightfall-74781
12/30/2024, 3:35 PM