Hi, I'm trying to use a combination of dynamic wor...
# flyte-support
f
Hi, I'm trying to use a combination of dynamic workflows + map_task to perform data processing operations on a list of custom data. The input to the main() workflow is a List[CustomDataclass]. This list is then batched into smaller lists, creating List[List[CustomDataclass]], and passed to a dynamic workflow level1_wf. The level1_wf workflow calls map_task() to let the process_data task operate on CustomDataclass. Here's how my code is structured:
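import tempfile
from typing import Dict, List

from flytekit import dynamic, map_task, task, workflow
from flytekit.types.file import JSONLFile

@task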
def process_data(input_data: CustomDataclass) -> Dict[str, str]:
    return processing(input_data)

@dynamic
def map_data_proc_tasks(data_chunks: List[List[CustomDataclass]]) -> List[List[Dict[str, str]]]:
    chunk_results = []
    for chunk in data_chunks:
        result = map_task(process_data, concurrency=16)(input_data=chunk)
        chunk_results.append(result)
    return chunk_results

@task
def merge_results(results_list: List[List[Dict[str, str]]]) -> JSONLFile:
    merged_data = []
    for result in results_list:
        merged_data.extend(result)
    _, merged_filepath = tempfile.mkstemp(suffix=".json")
    write_json_file(merged_filepath, merged_data)
    return merged_filepath

@workflow
def wf_transform_input_imgs(data_list: List[CustomDataclass]) -> JSONLFile:
    data_chunks = chunk_list(input_list=data_list, chunk_size=64)
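    # Keyword arguments are the safe choice when calling Flyte entities inside a workflow.
    chunk_results = map_data_proc_tasks(data_chunks=data_chunks)
    return merge_results(results_list=chunk_results)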
When I try to register the tasks and workflows using pyflyte register, I get this error:
Request rejected by the API, due to Invalid input.
RPC Failed, with Status: StatusCode.INVALID_ARGUMENT
        details: map_data_proc_tasks task with different structure already exists
I've tried a teardown and restart of the sandbox cluster, but that doesn't help. What am I doing wrong here?
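The chunk_list helper called in the workflow above is not shown in the thread. A minimal sketch of what it might look like, assuming it only splits a list into consecutive batches of at most chunk_size items (the body here is a guess, not the poster's code):

```python
from typing import List, TypeVar

T = TypeVar("T")


def chunk_list(input_list: List[T], chunk_size: int) -> List[List[T]]:
    """Split input_list into consecutive sublists of at most chunk_size items.

    Hypothetical stand-in for the helper of the same name in the thread.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # Step through the list in strides of chunk_size; the last slice may be shorter.
    return [
        input_list[start:start + chunk_size]
        for start in range(0, len(input_list), chunk_size)
    ]
```

To be callable from a workflow, a helper like this would presumably need to be a Flyte task with concrete type annotations (for example List[CustomDataclass] in place of the type variable).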
t
is that the whole error? doesn't it tell what caused the structure to be different from the previously registered version?
f
I just see a huge alphanumeric string. It doesn't mention what caused the difference.
map_data_proc_tasks task with different structure already exists:
                - /template/Target/Container/env/0/value: H4sIAAAAAAAC/+1VTW/bMAz9K4XPi5XY7ZIG2KEf21pkS7sO65IMgyHLtK1YllyJDuoU/e+TlGxNgfWwS3cJARsU+cRHPgHSQ8BrWkDClMx5EYwPHoIMctoKTHzCRyStnfM7E7w5CPI76SJMh7noEEKlC+K9P07FPRCpqxo0XRwO+r1BOIjDvotnvACDNiVbIR5twNMZG/jxOoRPLKPF6Gi+GJ1dl/B+UKJh38+rb8UOqVCMilIZHMd9a3+je7nGnnfP+7q8/avJdLguZsNrwT6nyfImy24+/CPvyzX2Ov8PnX+6K7LRagnMxQJIs15GbRGQHqxqyuXmylyBUE0N0tOsQBuufIZeHM9n4vBOd5nSaTWpznp9+cWBQK62PHZRcEw0NMptsb6gKUQh3GMowX6q4jRkqiaU14ZQSe1InBkCdYMdatsEl0VYbGZsOiyVTLhE0I0G+3dFiWqQrCwnSbkkG0zsVdvKk6y4xpYKC0m0UvhskwdSg4kBzanga4p2PrtCtMzGv1cgaSogsz7qFpw8VkkuN8iMb7rwlb3OBjVPW590J4ZbvUw8JqTueibupS2rAMmO6GRHZzI9ubqdn14OT+cfpydfbyfz6NN5NFtcHF2+80ZcwxBF2WGUpXk8iPM8jkYRTXNgb0fW2PA4DpHqsFgH7qCNajWDp+FLVQORvCq5oHL5XOzg8RfP4QooxAcAAA== -> H4sIAAAAAAAC/+2W32/TMBDH/5UpzzRuk412lXjYD2BToRtDjLYIRY5zSd04dmZfKtJp/zu2W1gn0Qde9hRLic535/v4vpYlPwa8ogUkTMmcF8H46DHIIKeNwMQHvEfSyhl/IsGboyB/kM7DdJiLFiFUuiDe+muU3CcidVWDuo3DQb83CAdx2Hf+jBdg0IZkI8STdXicsY4frwN8powWo5P5YnRxu4T3gyUa9v2y/FbsQYViVCyVwXHct+NfuMM1Om7HfV1u/2YyHW6K2fBWsM9psrrLsrsP/8k9XKPTudO543bc7h51Oh/W+ad70tVarYA5XwBp1suoLQLSJ6uKcrl94q1BqLoC6TFr0IYrH6FXp/OZOH7QbaZ0Wk7Ki15ffnFJINc7jp0UHBMNtXJLrC1oClEIvzCUYD9VchoyVRHKK0OopLYlzgyBqsYWtd0El0VYbHusW1wqmXCJoGsN9u+KElUjWVsmSbkk25zYq7aTJ1lzjQ0VNiXRSuGLRT6RGkwMaE4F31C0/dkZoiUb/74GSVMBmbVRN+DksUpyuc3M+HYXvrLX2aDmaeOD7sRwp5eJx4RUbc/EvbRhJSDZE53s6UymZzf38/Pr4fn84/Ts6/1kHn26jGaLq5Prd34Qt2GIouw4ytI8HsR5HkejiKY5sLcjO9jwNA6R6rDYBO6gjWo0g+fml6oCInm55ILK1Uuxg6ffINxUZHQMAAA=
With verbose mode:
FlyteInvalidInputException: USER:BadInputToAPI: error=None, cause=<_InactiveRpcError of RPC that terminated with:
        status = StatusCode.INVALID_ARGUMENT
        details = "map_data_proc_tasks task with different structure already exists:
                - /template/Target/Container/env/0/value: 
H4sIAAAAAAAC/81SUW/TMBD+K1Oeqd00HWsr8UBR4WEFptHBBELRxbmkbhw7sy9l7bT/ju0W1knwirCU6Hz3+fvuPvshkS3UmAujK1kns7OHpMQKekV5LMSMhjYEvyrJi7OkutMhIyyr1I6QGVvzGP0OGhmBBIE16XYZS4eDlKUZG4Z8KWt05Eu6V+rRJ6Kc84lv/0bwSWX48fLDxb6
+vbhS4n2Rb67L8vptfSKqjAC1No5m2dCvP8n9neP/0P0eLO6s2aAIuQSLclCCJ0EdwaYFqQ+Wb1GZrkUdZbZonTSxcju/rz43CnDcg138KL+um7vLKIV6e9Txm1pSbrEz4YiPFRQ4YnhPTKP/TCOBCdNykK3joMGPJIXj2Ha0I+ubkLpm9WHGbkdro3OpCW1n0f8DKTcd8a3X5IXU/I
DJomtHe/KttNSD8pDcGkPPDkUgOModWglK7oH8fH5H5JVdfO+ooVBY+phsj8Ee76TUB2QpD11E5uizIyuLPhbDjdHRL5fNOG93A5cNil40SPzEdH7iM795Oc6Wq/Mvy9W7xWo1v/q0WsyX569v3ixexcVDw2k1nUzTIYxHaSUQpzgSWBWYelKossmEEVhW75Nw0c70VuDT8GvTIteyW
UsFevPc7OTxJ1P2ADsEBAAA -> 
H4sIAAAAAAAC/+1TUW/TMBD+K1Oeqd00HVsr8UBR4WEFptHBBELRxbmkbhw7sy/d2mn/HdstrJPgFV6wlOh89/n7fJ/th0S2UGMujK5knUxPHpISK+gV5bEQMxraEPysJC9OkupWh4ywrFJbQmZszWP0K2hkBBIE1qTbZiwdDlKWZmwY8qWs0ZEv6V6pR5+Ics4nvv0dwSeV4ceLD2e
7+ubsUon3Rb6+Ksurt/WRqDIC1Mo4mmZDP34n92eO/7r/Qvd7uFKdNWsUIZdgUQ5K8CSoI9i0IPX+im1Qma5FHWU2aJ00sXIzu68+Nwpw3IOd35VfV83tRZRCvTno+EktKbfYmbDExwoKHDG8J6bRf6aRwIRpOcjWcdDgW5LCcWw72pL1m5C6ZvW+x25LK6NzqQltZ9H/Ayk3HfGN1+
SF1HyPyaJrB3vyjbTUg/KQ3BpDzxZFIDjKHVoJSu6AfH9+RuSVXXzfqKFQWPqYbI/BHu+k1HtkKfe7iMzRZ0dWFn0shhOjg18um3LebgcuGxS9aJD4ken8yGd+/XKcLZanXxbLd/Plcnb5aTmfLU5fX7+Zv4qDhw2n1eR8kg5hPEorgTjBkcCqwNSTQpWdnzMCy+pdEg7amd4KfGp+Z
VrkWjYrqUCvn5udPP4AJCFwpvQEAAA="
        debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"map_data_proc_tasks task with different structure already exists:\n\t\t- 
/template/Target/Container/env/0/value: 
H4sIAAAAAAAC/81SUW/TMBD+K1Oeqd00HWsr8UBR4WEFptHBBELRxbmkbhw7sy9l7bT/ju0W1knwirCU6Hz3+fvuPvshkS3UmAujK1kns7OHpMQKekV5LMSMhjYEvyrJi7OkutMhIyyr1I6QGVvzGP0OGhmBBIE16XYZS4eDlKUZG4Z8KWt05Eu6V+rRJ6Kc84lv/0bwSWX48fLDxb6
+vbhS4n2Rb67L8vptfSKqjAC1No5m2dCvP8n9neP/0P0eLO6s2aAIuQSLclCCJ0EdwaYFqQ+Wb1GZrkUdZbZonTSxcju/rz43CnDcg138KL+um7vLKIV6e9Txm1pSbrEz4YiPFRQ4YnhPTKP/TCOBCdNykK3joMGPJIXj2Ha0I+ubkLpm9WHGbkdro3OpCW1n0f8DKTcd8a3X5IXU/I
DJomtHe/KttNSD8pDcGkPPDkUgOModWglK7oH8fH5H5JVdfO+ooVBY+phsj8Ee76TUB2QpD11E5uizIyuLPhbDjdHRL5fNOG93A5cNil40SPzEdH7iM795Oc6Wq/Mvy9W7xWo1v/q0WsyX569v3ixexcVDw2k1nUzTIYxHaSUQpzgSWBWYelKossmEEVhW75Nw0c70VuDT8GvTIteyW
UsFevPc7OTxJ1P2ADsEBAAA -> 
H4sIAAAAAAAC/+1TUW/TMBD+K1Oeqd00HVsr8UBR4WEFptHBBELRxbmkbhw7sy/d2mn/HdstrJPgFV6wlOh89/n7fJ/th0S2UGMujK5knUxPHpISK+gV5bEQMxraEPysJC9OkupWh4ywrFJbQmZszWP0K2hkBBIE1qTbZiwdDlKWZmwY8qWs0ZEv6V6pR5+Ics4nvv0dwSeV4ceLD2e
7+ubsUon3Rb6+Ksurt/WRqDIC1Mo4mmZDP34n92eO/7r/Qvd7uFKdNWsUIZdgUQ5K8CSoI9i0IPX+im1Qma5FHWU2aJ00sXIzu68+Nwpw3IOd35VfV83tRZRCvTno+EktKbfYmbDExwoKHDG8J6bRf6aRwIRpOcjWcdDgW5LCcWw72pL1m5C6ZvW+x25LK6NzqQltZ9H/Ayk3HfGN1+
SF1HyPyaJrB3vyjbTUg/KQ3BpDzxZFIDjKHVoJSu6AfH9+RuSVXXzfqKFQWPqYbI/BHu+k1HtkKfe7iMzRZ0dWFn0shhOjg18um3LebgcuGxS9aJD4ken8yGd+/XKcLZanXxbLd/Plcnb5aTmfLU5fX7+Zv4qDhw2n1eR8kg5hPEorgTjBkcCqwNSTQpWdnzMCy+pdEg7amd4KfGp+Z
VrkWjYrqUCvn5udPP4AJCFwpvQEAAA=", grpc_status:3, created_time:"2024-07-25T09:57:57.647926171-04:00"}
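The two long strings in the error both start with H4sI, which is what the gzip magic bytes look like after base64 encoding, so they are probably base64-encoded gzip data rather than opaque hashes. A rough sketch for decoding the old and new values and listing what differs between them, assuming the payload is JSON (possibly the serialization settings that flytekit attaches to dynamic tasks; the thread does not confirm this). The function names are made up for illustration:

```python
import base64
import gzip
import json
from typing import Any, Dict, Tuple


def decode_blob(blob: str) -> Any:
    """Base64-decode and gunzip a blob, then parse it as JSON when possible."""
    # Remove whitespace and line breaks introduced by terminal wrapping.
    compact = "".join(blob.split())
    text = gzip.decompress(base64.b64decode(compact)).decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Assumption did not hold: return the raw text instead.
        return text


def flatten(value: Any, prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts and lists into {"/path/to/leaf": leaf_value}."""
    flat: Dict[str, Any] = {}
    if isinstance(value, dict):
        for key, item in value.items():
            flat.update(flatten(item, f"{prefix}/{key}"))
    elif isinstance(value, list):
        for index, item in enumerate(value):
            flat.update(flatten(item, f"{prefix}/{index}"))
    else:
        flat[prefix or "/"] = value
    return flat


def diff_blobs(old_blob: str, new_blob: str) -> Dict[str, Tuple[Any, Any]]:
    """Return {path: (old_value, new_value)} for every leaf that differs."""
    old = flatten(decode_blob(old_blob))
    new = flatten(decode_blob(new_blob))
    return {
        path: (old.get(path), new.get(path))
        for path in sorted(set(old) | set(new))
        if old.get(path) != new.get(path)
    }
```

Calling diff_blobs with the string before the "->" and the string after it (each pasted as one value) should show which fields changed between the registered task and the new one.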
t
hmm.. something must have changed in the backend. have you reinstalled any package by chance? to reregister, you can make a trivial change in the code.
f
"have you reinstalled any package by chance" -- Python packages? No, I don't think so. Even when reregistering, I get the same error with any dynamic workflow I create.
t
are you seeing the error occasionally?
f
Nope - every time
h
@future-room-59953, can you give more details? What version of flytekit are you running? How about flyte? I can't repro using https://gist.github.com/eapolinario/d5244963e4f2b7481589738fd736a047 running flyte 1.13 and flytekit 1.13.0.
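For the client-side half of that question, a small sketch that prints the installed versions using only the standard library. The choice of packages to check is an assumption, and the backend Flyte version has to come from the deployment itself:

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package: str) -> str:
    """Return the installed version of a package, or a marker if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return "not installed"


# flytekit is the SDK; flyteidl is listed on the assumption that it is relevant here.
for name in ("flytekit", "flyteidl"):
    print(f"{name}: {installed_version(name)}")
```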
f
@high-accountant-32689 I think we can park this issue for now. I worked around it by using map_task() on data_chunks, without the dynamic workflows. I've had to shift my focus away from our Flyte work for now.
h
ok. Still interested in knowing more about this error though.