# ask-the-community
j
Hi all, does Recover on a dynamic task work properly? We have a dynamic task that returns a Dict of the form `sample_name: sample_workflow_results`. When some samples fail and we recover, unfortunately the output dict is all shuffled up and the values no longer match their `sample_name` keys. For example, we expect a certain dynamic task to return a `filename -> filename-suffix` map like the one below:
```json
{
  "counter_1": 1,
  "counter_2": 2,
  "counter_3": 3,
  "counter_4": 4,
  "counter_5": 5,
  "counter_6": 6
}
```
I forcefully fail some tasks and let the rest complete successfully. Then, when I recover, I get this instead:
```json
{
  "counter_1": 5,
  "counter_2": 6,
  "counter_3": 3,
  "counter_4": 4,
  "counter_5": 5,
  "counter_6": 4
}
```
An example dynamic task is below:
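```python
from typing import Dict

from flytekit import dynamic
from flytekit.types.file import FlyteFile

# get_workflow_version() and get_file_suffix_task are project-specific helpers (not shown).


@dynamic(cache=True, cache_version=get_workflow_version())
def compute_average_for_chunks(
    chunks: Dict[str, FlyteFile], infile: FlyteFile
) -> Dict[str, int]:
    suffix_dict = {}

    # Each loop iteration adds one task node to the dynamic workflow.
    for chunk_name, chunk_path in chunks.items():
        infile_suffix = get_file_suffix_task(chunk_path=chunk_path, infile=infile)
        suffix_dict[chunk_name] = infile_suffix

    return suffix_dict
```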
I was hoping to get deterministic output here 😅 and would love for someone to try to replicate this issue. The `infile` input is a file in the cloud that we read to forcefully fail some of the tasks in `get_file_suffix_task`.
k
this looks wrong
j
Interestingly, if we expand it to the underlying tasks, they are all correct. I think it's the conversion from the Python literal to the Flyte-specific job that is causing this 🤔
k
hmm sounds like it
is this a map task? actually no, this is dynamic
cc @Dan Rammer (hamersaw) / @Haytham Abuelfutuh this looks odd and dangerous
j
Here is our cluster setup:
```yaml
- name: cr.flyte.org/flyteorg/datacatalog
  newTag: v1.0.1@sha256:b41b322e7a926f7e8efd351e6e51e246452477e28659de0c6152e2a11a7b7e36
- name: cr.flyte.org/flyteorg/flyteadmin
  newTag: v1.1.46@sha256:2359a58d756186992f7eba56743874aa8180fe8d71a7c3e1436141f59f117b31
- name: cr.flyte.org/flyteorg/flyteconsole
  newTag: v1.2.5@sha256:164d946afc39762c61b343b69255414deca12d2d0b14d2dbfb82fc0ac31a1da3
- name: cr.flyte.org/flyteorg/flytepropeller
  newTag: v1.1.40@sha256:db51f3ba890c4198a02f4299e68ec3177a5a42f5dee88bea6691d9db6e906074
```
and we are on Flytekit 1.1.1. Not sure if this issue is gone in the latest versions.
d
> Interestingly, if we expand it to the underlying tasks, they are all correct. I think it's the conversion from the Python literal to the Flyte-specific job that is causing this 🤔
@Jay Ganbat can you expand on this? I've done a bunch of testing locally and am unable to repro. When you check the output of the dynamic task in the recovered workflow, is it correct and only some downstream node is incorrect?
j
The final output of the dynamic task after recovering is all wrong, but if you expand the individual tasks, they have the correct outputs.
d
interesting. Since the dynamic task is cached in this case, if you just rerun the workflow rather than recovering it, is the cached dynamic task output read as expected? Or is it incorrect as well?
oh wow, I actually think I know exactly what is happening here now that I've dug into it more.
Using a Dict as input means the entries have no guaranteed order. So on the initial run, the mapping of `chunk_name` to Flyte node names could be something like:
```json
{
    "counter_1": "n0",
    "counter_2": "n1",
    "counter_3": "n2",
    "counter_4": "n3"
}
```
If this runs and you manually fail nodes `n2` and `n3`, then the nodes that succeeded (`n0` and `n1`) will not be re-run during the recovery; rather, their outputs will be reused. However, in the recovery we cannot guarantee that the Dict is iterated in the same order, so the recovery may now map `chunk_name` to Flyte node names like:
```json
{
    "counter_3": "n0",
    "counter_2": "n1",
    "counter_4": "n2",
    "counter_1": "n3"
}
```
In this case, the outputs of `n0` and `n1`, which previously succeeded, are now used for "counter_3" and "counter_2" rather than for "counter_1" and "counter_2" as they should be.
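To make the mechanism concrete, here is a minimal pure-Python simulation. It assumes, as described above, that the i-th loop iteration becomes node `n{i}` and that recovery reuses a previous output by node ID alone; `assign_node_ids`, `recover`, and `compute` are hypothetical helpers, not flytekit or Flyte APIs.

```python
from typing import Callable, Dict, List


def assign_node_ids(keys: List[str]) -> Dict[str, str]:
    # Hypothetical model: the i-th key visited by the loop becomes node "n{i}".
    return {key: f"n{i}" for i, key in enumerate(keys)}


def recover(
    previous_outputs: Dict[str, int],
    node_ids: Dict[str, str],
    compute: Callable[[str], int],
) -> Dict[str, int]:
    # Hypothetical model of recovery: a node that succeeded before is matched
    # by node ID only, so its old output is reused whatever key it now serves.
    result: Dict[str, int] = {}
    for key, node_id in node_ids.items():
        if node_id in previous_outputs:
            result[key] = previous_outputs[node_id]
        else:
            result[key] = compute(key)
    return result


def compute(key: str) -> int:
    # Stand-in for the real task: "counter_3" -> 3.
    return int(key.split("_")[1])


# First run: n0 and n1 succeed, n2 and n3 are forced to fail.
first = assign_node_ids(["counter_1", "counter_2", "counter_3", "counter_4"])
previous = {
    first["counter_1"]: compute("counter_1"),
    first["counter_2"]: compute("counter_2"),
}

# Recovery: the same keys are visited in a different order.
second = assign_node_ids(["counter_3", "counter_2", "counter_4", "counter_1"])
recovered = recover(previous, second, compute)
print(recovered)
```

Here `counter_3` comes back as 1 (the old output of `n0`) instead of 3, which is the same kind of shuffling reported at the top of the thread.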
j
yeah I think so; if I relaunch, everything works just fine.
It's just this part:
```python
for chunk_name, chunk_path in chunks.items():
    infile_suffix, average = compute_average_for_chunk(chunk_path=chunk_path, infile=infile)
```
that is somehow shuffled. Do you think it would work if we don't cache the dynamic task? 🤔
Nope, disabling the cache on the dynamic task didn't change the result.
d
yeah, sorry, I would not expect it to. I'm running a few tests here locally to make sure I understand this correctly. If so, I think this is a bug that we'll have to address with a code change.
@Jay Ganbat so it's as I feared. I was able to repro locally now. Recovery over dynamic workflows may not work in every scenario because the Flyte node IDs are not deterministic, since the order in which the Dict is iterated is not guaranteed to be the same between runs. In your scenario above, if you sorted the dict by key then they would be, but a general fix will require Flyte code changes. I will submit an issue shortly.
cc @Ketan (kumare3) @Haytham Abuelfutuh @Yee ^^^ recovery of dynamic workflows is broken because the Flyte node IDs are not deterministic when iterating over a Python Dict. We currently attempt to recover any subnodes that succeeded in a dynamic task, but since the Flyte node IDs may not match between runs, we can assign incorrect outputs. In this scenario the error was incorrect output values, but the outputs could just as easily have the wrong type (e.g. n0 outputs an int, n1 outputs a string, and they are swapped).
j
Thank you very much for looking into it. So in the meantime, I could just sort the results before returning them from the dynamic task, right?
d
I think you would have to sort before starting the iteration so something like:
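```python
from typing import Dict

from flytekit import dynamic
from flytekit.types.file import FlyteFile

# get_workflow_version() and get_file_suffix_task are the project helpers from the example above.


@dynamic(cache=True, cache_version=get_workflow_version())
def compute_average_for_chunks(
    chunks: Dict[str, FlyteFile], infile: FlyteFile
) -> Dict[str, int]:
    suffix_dict = {}

    # Sort by key so each chunk_name is visited (and gets its node ID) in the same order every run.
    for chunk_name, chunk_path in sorted(chunks.items(), key=lambda ele: ele[0]):
        infile_suffix = get_file_suffix_task(chunk_path=chunk_path, infile=infile)
        suffix_dict[chunk_name] = infile_suffix

    return suffix_dict
```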
This way Flyte will assign deterministic node IDs, e.g. "counter_1" will always be "n0", "counter_2" will always be "n1", etc.
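A quick way to see why the sort helps, using a hypothetical `node_ids_in_loop_order` helper that only models the "i-th iteration becomes `n{i}`" behaviour described in this thread (the real node IDs are assigned by Flyte):

```python
from typing import Dict


def node_ids_in_loop_order(chunks: Dict[str, str], sort_keys: bool) -> Dict[str, str]:
    # Hypothetical model of the dynamic task's loop: iteration i -> node "n{i}".
    items = sorted(chunks.items()) if sort_keys else list(chunks.items())
    return {name: f"n{i}" for i, (name, _path) in enumerate(items)}


# The same inputs, built in two different insertion orders (as can happen between runs).
run_1 = {"counter_1": "a.txt", "counter_2": "b.txt", "counter_3": "c.txt"}
run_2 = {"counter_3": "c.txt", "counter_1": "a.txt", "counter_2": "b.txt"}

unsorted_1 = node_ids_in_loop_order(run_1, sort_keys=False)
unsorted_2 = node_ids_in_loop_order(run_2, sort_keys=False)
sorted_1 = node_ids_in_loop_order(run_1, sort_keys=True)
sorted_2 = node_ids_in_loop_order(run_2, sort_keys=True)

print(unsorted_1 == unsorted_2)  # keys land on different node IDs
print(sorted_1 == sorted_2)      # same key -> same node ID in both runs
```

One caveat: `sorted` compares the keys as strings, so `"counter_10"` sorts before `"counter_2"`. The order is still identical on every run, which is all the workaround needs.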
j
ohhh I see, so it's actually the input dict that is shuffled? Even if it looks in order in the Flyte console?
d
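exactly. A Python dict iterates in insertion order, but the order in which the entries are inserted when the Dict input is rebuilt from its Flyte map literal is not guaranteed, so two runs can iterate over the same inputs in different orders.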
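A small illustration of that distinction (plain Python 3.7+, nothing Flyte-specific): a single dict iterates in insertion order every time, but two dicts with identical key/value pairs iterate differently if they were built in different orders, which is the situation when an input Dict is rebuilt between runs.

```python
a = {"counter_1": 1, "counter_2": 2}
b = {"counter_2": 2, "counter_1": 1}

# Dict equality ignores order, so these compare equal ...
print(a == b)  # True
# ... but iteration follows insertion order, so a loop visits the keys differently.
print(list(a), list(b))  # ['counter_1', 'counter_2'] ['counter_2', 'counter_1']
# Iterating the same dict object repeatedly always gives the same order.
print(all(list(a) == ["counter_1", "counter_2"] for _ in range(10)))  # True
```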
y
not following what’s happening here - can you elaborate, Dan? It’s true that Python map iteration order is not guaranteed, but I don’t follow why that matters. What is happening in the recovery scenario?
d
so Flyte is saying: in the previous run element 0 succeeded, so let's just reuse that result. But the input value for element 0 may be different in the second run.
j
oh wait, yeah, as long as each key:value pair is consistent it shouldn't matter, right?
d
@Yee hop on a call quick? Jay feel free to join.
j
sure 😄
d
To join the video meeting, click this link: https://meet.google.com/xnt-qfyn-yjh
j
can you share the issue here, if you created one 😅
d
@Jay Ganbat here is the issue - please feel free to add more context if you think it will help! We will try to get a fix out soon!
j
Hi @Dan Rammer (hamersaw), do you think we would have a similar issue with map tasks too? Is the order of inputs to a map task preserved? We have a map task like the one below, and I can't sort the inputs since they are `Promise` objects when they are handed over from `map_task_input` to `run_map_task`.
```python
# in a dynamic task we have the following

map_task_input = build_map_task_inputs  # this should return a list of inputs

run_map_task(map_task_input)
```
d
Hey @Jay Ganbat, there shouldn't be any issue with map tasks. Right now, map tasks do not recover completed subtasks on workflow recovery. Caching should still work, so if the first run succeeded and wrote to the cache, then recovering the workflow will read the results from the cache.
This is one of a few issues with the current implementation of map tasks. We have a proposal to introduce a new node type in the Flyte DAG, namely ArrayNode, which would remedy this issue. Additionally, it would allow map tasks over different task types (e.g. Spark, Ray, etc.).
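For contrast, a rough model of why caching is unaffected by input order: Flyte's cache lookups are keyed on the task's inputs (together with the task signature and `cache_version`), not on a subtask's position. The sketch below is a simplified stand-in with a hypothetical `run_with_cache` helper keyed on the input value alone.

```python
from typing import Callable, Dict, List


def run_with_cache(
    inputs: List[int],
    cache: Dict[int, int],
    compute: Callable[[int], int],
) -> List[int]:
    # Hypothetical model: look each subtask up by its input value and only
    # compute on a cache miss, so position in the list never matters.
    outputs = []
    for value in inputs:
        if value not in cache:
            cache[value] = compute(value)
        outputs.append(cache[value])
    return outputs


calls: List[int] = []


def square(x: int) -> int:
    calls.append(x)  # record which inputs were actually computed
    return x * x


cache: Dict[int, int] = {}
first = run_with_cache([0, 1, 2, 3, 4], cache, square)
# Re-run with the inputs in a different order: every lookup hits the cache,
# and each output still matches its own input.
second = run_with_cache([4, 3, 2, 1, 0], cache, square)
print(first, second, calls)
```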
j
Oh, I meant: would the order of inputs still cause an issue for map tasks?
d
@Jay Ganbat yeah, I stand corrected on my answer, based on testing for Laura's question. The order of inputs will only matter if the map task was successful. For example, suppose the inputs for the first run are `[0,1,2,3,4]` (ordered) and all but `4` are successful. Map task subtask successes currently cannot be recovered, so even though `[0,1,2,3]` all completed successfully, they will all be recomputed when recovering. However, if all of the subtasks succeeded and the recovery has an order of `[4,3,2,1,0]`, then we will run into the same issue with outputs being set incorrectly. Does this make sense?
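The failure mode in the fully successful case can be sketched the same way. This assumes, hypothetically, that a recovered map task reuses the i-th previous output for the i-th input; `recover_by_index` is an illustrative helper, not Flyte code.

```python
from typing import List


def recover_by_index(previous_outputs: List[int], new_inputs: List[int]) -> List[int]:
    # Hypothetical model: output i is reused for input i; the new input values
    # themselves are never consulted.
    return [previous_outputs[i] for i in range(len(new_inputs))]


def square(x: int) -> int:
    return x * x


first_inputs = [0, 1, 2, 3, 4]
previous = [square(x) for x in first_inputs]  # all subtasks succeeded

recovery_inputs = [4, 3, 2, 1, 0]
recovered = recover_by_index(previous, recovery_inputs)
expected = [square(x) for x in recovery_inputs]

# Inputs whose reused output does not match what they should have produced.
mismatched = [x for x, got, want in zip(recovery_inputs, recovered, expected) if got != want]
print(recovered, expected, mismatched)
```

Only input `2` sits at the same position in both orders, so it is the only one that happens to come back correct.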
j
Oh I see yeah that makes sense