#ask-the-community

Stephen

12/11/2023, 12:38 PM
Hi, I am occasionally getting this error:
RPC Failed, with Status: StatusCode.INVALID_ARGUMENT
        details: workflow with different structure already exists
I am actively developing the workflow and deploying+running it using pyflyte run. How is the "version" of the workflow being calculated?

Samhita Alla

12/12/2023, 8:42 AM
you may want to re-register your workflow only when there are changes in the code or parameters.

Brandon Harrison

12/12/2023, 2:43 PM
I ran into this same issue. Instead of doing pyflyte run I did pyflyte package and flytectl register (as described here). You can specify a version when you do flytectl register.

Ketan (kumare3)

12/12/2023, 3:22 PM
You can use pyflyte register
And that will do all of this
Also, with pyflyte run you can provide a version
It is odd that you are changing code but the version is not changing
We would love to understand more. As an escape hatch, pass a version to pyflyte run

Stephen

12/12/2023, 5:16 PM
Hi, I managed to find the issue. I was passing a dict with configuration to a task directly in the workflow definition function. This dict ended up being part of the "template" and serialised. This worked well while the configuration was static, but then I added a tag containing a timestamp, which effectively changes the template every time it is "compiled", even if the source does not change.
Thanks a lot for your help! But in the end, it was my fault. Thanks again.
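A minimal sketch of the failure mode described above, in plain Python without flytekit. It assumes the version is derived from the unchanged source while the serialized template is compared separately. ToyRegistry, compile_template, digest and SOURCE are hypothetical stand-ins for illustration, not Flyte APIs.

```python
import hashlib
import json
from datetime import datetime, timezone

# Hypothetical stand-in for the workflow source, which does not change between runs.
SOURCE = "def wf(): return train(config=make_config())"


def digest(text: str) -> str:
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def compile_template(now: datetime) -> dict:
    # The config dict is built while the workflow function body is evaluated,
    # so the timestamp tag ends up inside the serialized template.
    return {"task": "train", "config": {"lr": 0.01, "tag": now.isoformat()}}


class ToyRegistry:
    """Hypothetical registry that allows only one structure per version."""

    def __init__(self) -> None:
        self._structures = {}

    def register(self, version: str, template: dict) -> None:
        structure = digest(json.dumps(template, sort_keys=True))
        existing = self._structures.setdefault(version, structure)
        if existing != structure:
            raise ValueError("workflow with different structure already exists")


registry = ToyRegistry()
version = digest(SOURCE)  # same source on every run, so same version

first_run = datetime(2023, 12, 12, 17, 0, tzinfo=timezone.utc)
second_run = datetime(2023, 12, 12, 17, 5, tzinfo=timezone.utc)

registry.register(version, compile_template(first_run))
try:
    registry.register(version, compile_template(second_run))
    second_run_error = None
except ValueError as exc:
    second_run_error = str(exc)
```

In this toy model the second registration fails because the version is unchanged but the template differs. Moving the timestamp out of the workflow definition, or passing an explicit version, avoids the clash.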

Ketan (kumare3)

12/12/2023, 5:24 PM
thank you for understanding and sharing

Quentin Chenevier

01/24/2024, 8:54 AM
Hello 🙂 I'm piggybacking on this discussion as I'm noticing the same issue (workflow with different structure already exists) using pyflyte run or pyflyte register without changing the code. Looking at the "Input request" which makes the RPC fail, it seems that in my case it's due to the map_task names, which change each time I rerun the registration. E.g.:
• workflow_structure.map_task_reconstruction_export_ff143a7aae0d9318af407c6d809ad586
• workflow_structure.map_task_reconstruction_export_81fa1b95e5ddc8a7850383cf2c28ac0d
The hash at the end of the name property changes.
For those map_tasks I'm transmitting the inputs using a serialized dataclass. I guess that somewhere here something changes at each rerun of the registration: https://github.com/flyteorg/flytekit/blob/master/flytekit/core/map_task.py#L85-L87
When I actually print collection_interface.__str__(), which is hashed and appended to the name of the map_task function, I notice that the issue comes from the BatchSize object, which has a naive str representation. Somewhere in my function signature I'm using Annotated[FlyteDirectory, BatchSize(100)] (this is the output of each task), and this shows up in collection_interface.__str__() as:
• typing.Annotated[flytekit.types.directory.types.FlyteDirectory, <flytekit.core.type_engine.BatchSize object at 0x132e8fb50>]
The memory address (0x132e8fb50) in the str representation changes each time.
The quick & dirty solution I found to fix this is to implement my own subclass:
from flytekit.core.type_engine import BatchSize


class MyBatchSize(BatchSize):
    def __repr__(self) -> str:
        """Return the same naive str representation as any Python object, but without the memory address."""
        return f"<{self.__class__.__module__}.{self.__class__.__name__} object>"
and use this class to annotate the FlyteDirectory. Works like a charm.
@Ketan (kumare3) @Samhita Alla what do you think? Is it worth filing an issue?
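The address-dependent repr described above can be reproduced without flytekit. The sketch below uses two hypothetical stand-in classes (NaiveBatchSize, StableBatchSize) and a hypothetical name_suffix helper. It assumes the name suffix is an MD5 of the annotated type's string form, which is a guess based on the 32-character hashes in the names above rather than a confirmed detail of flytekit. It compares two live instances within one process, whereas the original problem shows up across separate registration runs.

```python
import hashlib
from typing import Annotated


class NaiveBatchSize:
    """Stand-in for an annotation object that keeps the default object repr."""

    def __init__(self, val: int) -> None:
        self.val = val


class StableBatchSize(NaiveBatchSize):
    """Same object, but with an address-free repr, like the subclass above."""

    def __repr__(self) -> str:
        return f"<{self.__class__.__module__}.{self.__class__.__name__} object>"


def name_suffix(annotated_type) -> str:
    # Assumption: the suffix is a hash of the type's string representation,
    # which embeds repr() of every piece of Annotated metadata.
    return hashlib.md5(str(annotated_type).encode("utf-8")).hexdigest()


# Keep all instances alive so no two of them can share a memory address.
naive_a, naive_b = NaiveBatchSize(100), NaiveBatchSize(100)
stable_a, stable_b = StableBatchSize(100), StableBatchSize(100)

naive_suffixes = {
    name_suffix(Annotated[str, naive_a]),
    name_suffix(Annotated[str, naive_b]),
}
stable_suffixes = {
    name_suffix(Annotated[str, stable_a]),
    name_suffix(Annotated[str, stable_b]),
}

print(len(naive_suffixes), len(stable_suffixes))
```

With the default repr, each instance yields a different suffix. With the address-free repr, equal annotations hash to the same suffix, so the generated name stays stable.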

Samhita Alla

01/24/2024, 3:40 PM
i believe so! please feel free to file an issue. great to hear that you figured it out!