# announcements
k
Hello all! A question about the right way to use dynamic workflows with mapped tasks inside them. I adapted the map_task_example to run within a dynamic workflow instead of the general workflow, like so:
import typing

from flytekit import dynamic, map_task, task, workflow


@dynamic
def wrapper_task(a: typing.List[int]) -> str:
    # Fan out one a_mappable_task execution per element of `a`.
    mapped_out = map_task(a_mappable_task)(a=a).with_overrides(
        retries=4,
    )
    # Pass the mapped outputs on to a regular task.
    coalesced = coalesce(b=mapped_out)
    return coalesced


@task
def a_mappable_task(a: int) -> str:
    inc = a + 2
    stringified = str(inc)
    return stringified


@task
def coalesce(b: typing.List[str]) -> str:
    coalesced = "".join(b)
    return coalesced


@workflow
def my_map_workflow(a: typing.List[int]) -> str:
    return wrapper_task(a=a)


# Module-level guard so the workflow can be run locally.
if __name__ == "__main__":
    result = my_map_workflow(a=[1, 2, 3, 4, 5])
    print(f"{result}")
However, I am seeing a type-mismatch compilation error at run time:
User] malformed dynamic workflow, caused by: Collected Errors: 2
	Error 0: Code: MismatchingTypes, Node Id: dn1, Description: Variable [o0] (type [collection_type:<collection_type:<simple:STRING > > ]) doesn't match expected type [collection_type:<simple:STRING > ].
Any ideas on why the return is a list of lists from the mapped task? And how do I correctly pass these outputs onto other tasks?
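For reference, the data flow the snippet above is aiming for can be sketched in plain Python. This is not flytekit code: the list comprehension is only a stand-in for map_task, and the name `wrapper` is hypothetical. It assumes, based on the type signatures, that mapping a task returning `str` over a `List[int]` should give a flat `List[str]`, which is what `coalesce` expects, rather than the list of lists reported in the error.

```python
import typing


def a_mappable_task(a: int) -> str:
    # Same body as the Flyte task, minus the @task decorator.
    return str(a + 2)


def coalesce(b: typing.List[str]) -> str:
    # Expects a flat list of strings, matching collection_type:<simple:STRING>.
    return "".join(b)


def wrapper(a: typing.List[int]) -> str:
    # Plain-Python stand-in for map_task (an assumption, not the flytekit API):
    # one call per input element, collected into a flat list of strings.
    mapped_out: typing.List[str] = [a_mappable_task(a=x) for x in a]
    return coalesce(b=mapped_out)


if __name__ == "__main__":
    print(wrapper([1, 2, 3, 4, 5]))
```

If the mapped output really were a list of lists, `"".join` would fail on the inner lists, which mirrors the mismatch the compiler reports for the dynamic node.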
y
still looking sorry
will update when we have more information
k
Thanks, @Yee!
y
yeah there’s a small bug in propeller we’re pretty sure.
will put in a PR tomorrow and hopefully make a patch release.
cc @Haytham Abuelfutuh who helped me debug this
k
Thanks, @Yee and @Haytham Abuelfutuh!
y
this was just merged btw https://github.com/flyteorg/flytepropeller/pull/456 - we’ll let you know when we bump the flyte release, but feel free to just pick up the propeller release if you need to
k
Thanks, @Yee!