Jay Ganbat
11/28/2022, 11:52 PM
sample_name: sample_workflow_results. When some samples fail and we recover, the output mapped dict is unfortunately all shuffled and no longer matches the sample_name key.
For example, we expect a certain dynamic task to return a filename -> filename-suffix map like below:
{
"counter_1": 1,
"counter_2": 2,
"counter_3": 3,
"counter_4": 4,
"counter_5": 5,
"counter_6": 6
}
I then forcefully fail some tasks while others complete successfully. When I recover, I get this instead:
{
"counter_1": 5,
"counter_2": 6,
"counter_3": 3,
"counter_4": 4,
"counter_5": 5,
"counter_6": 4
}
example dynamic task is below
@dynamic(cache=True, cache_version=get_workflow_version())
def compute_average_for_chunks(
    chunks: Dict[str, FlyteFile], infile: FlyteFile
) -> Dict[str, int]:
    suffix_dict = {}
    for chunk_name, chunk_path in chunks.items():
        infile_suffix = get_file_suffix_task(chunk_path=chunk_path, infile=infile)
        suffix_dict[chunk_name] = infile_suffix
    return suffix_dict
get_file_suffix_task
Ketan (kumare3)
11/29/2022, 1:08 AM
Jay Ganbat
11/29/2022, 1:12 AM
Ketan (kumare3)
11/29/2022, 1:17 AM
Jay Ganbat
11/29/2022, 1:50 AM
- name: cr.flyte.org/flyteorg/datacatalog
  newTag: v1.0.1@sha256:b41b322e7a926f7e8efd351e6e51e246452477e28659de0c6152e2a11a7b7e36
- name: cr.flyte.org/flyteorg/flyteadmin
  newTag: v1.1.46@sha256:2359a58d756186992f7eba56743874aa8180fe8d71a7c3e1436141f59f117b31
- name: cr.flyte.org/flyteorg/flyteconsole
  newTag: v1.2.5@sha256:164d946afc39762c61b343b69255414deca12d2d0b14d2dbfb82fc0ac31a1da3
- name: cr.flyte.org/flyteorg/flytepropeller
  newTag: v1.1.40@sha256:db51f3ba890c4198a02f4299e68ec3177a5a42f5dee88bea6691d9db6e906074
and we are on Flytekit 1.1.1
not sure if this issue is gone in the latest versions
Dan Rammer (hamersaw)
11/29/2022, 8:36 AM
"Interestingly if we expand it to the underlying tasks they are all correct I think it’s when we convert from python literal to flyte specific job that is causing this 🤔"
@Jay Ganbat can you expand on this? I've done a bunch of testing locally and am unable to repro. When you check the output of the dynamic in the recovered workflow it is correct, but some downstream node is incorrect?
Jay Ganbat
11/29/2022, 5:28 PM
Dan Rammer (hamersaw)
11/29/2022, 5:45 PM
chunk_name to Flyte node names could be something like:
{
"counter_1": "n0",
"counter_2": "n1",
"counter_3": "n2",
"counter_4": "n3",
}
If this runs and you manually fail nodes n2 and n3, then nodes n0 and n1, which succeeded, will not be run again during the recovery; rather, their outputs will be reused. However, in the recovery we cannot guarantee that the Dict is iterated in the same order, so the recovery may now map chunk_name to Flyte node names like:
{
"counter_3": "n0",
"counter_2": "n1",
"counter_4": "n2",
"counter_1": "n3",
}
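The remapping between the two dicts above can be simulated in plain Python. This is only a sketch under stated assumptions: it uses no Flyte APIs, the `assign_node_ids` helper and the `n<i>` naming are hypothetical, and it assumes recovery reuses outputs keyed purely by node ID, as described in this thread.

```python
def assign_node_ids(chunk_names):
    """Assign node IDs n0, n1, ... in iteration order (hypothetical scheme)."""
    return {name: f"n{i}" for i, name in enumerate(chunk_names)}

# Values each chunk should produce.
true_values = {"counter_1": 1, "counter_2": 2, "counter_3": 3, "counter_4": 4}

# First run: counter_1..counter_4 are visited in this order.
first_run = assign_node_ids(["counter_1", "counter_2", "counter_3", "counter_4"])

# n0 and n1 succeeded (n2 and n3 were failed), so only their outputs
# are available for reuse, stored by node ID.
recovered_outputs = {
    first_run[name]: true_values[name] for name in ("counter_1", "counter_2")
}

# Recovery: the same keys happen to be visited in a different order.
recovery_run = assign_node_ids(["counter_3", "counter_2", "counter_4", "counter_1"])

result = {}
for name, node_id in recovery_run.items():
    if node_id in recovered_outputs:
        result[name] = recovered_outputs[node_id]  # output reused by node ID
    else:
        result[name] = true_values[name]  # node is executed again

# Keys whose value no longer matches what the chunk should have produced.
mismatched = [name for name in result if result[name] != true_values[name]]
print(result)
print(mismatched)
```

In this simulated run, "counter_3" ends up with the value that was computed for "counter_1", because both were assigned node ID n0 in their respective runs.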
In this recovery mapping, the outputs of n0 and n1, which previously succeeded, are now used for "counter_3" and "counter_2" rather than for "counter_1" and "counter_2" as they should have been.
Jay Ganbat
11/29/2022, 6:11 PM
for chunk_name, chunk_path in chunks.items():
    infile_suffix, average = compute_average_for_chunk(chunk_path=chunk_path, infile=infile)
is somehow shuffled.
Do you think it would work if we do not attempt to cache the dynamic task 🤔
Dan Rammer (hamersaw)
11/29/2022, 6:31 PM
Jay Ganbat
11/29/2022, 6:44 PM
Dan Rammer (hamersaw)
11/29/2022, 6:47 PM
@dynamic(cache=True, cache_version=get_workflow_version())
def compute_average_for_chunks(
    chunks: Dict[str, FlyteFile], infile: FlyteFile
) -> Dict[str, int]:
    suffix_dict = {}
    # Iterate in sorted key order so node IDs are assigned deterministically.
    for chunk_name, chunk_path in sorted(chunks.items(), key=lambda ele: ele[0]):
        infile_suffix = get_file_suffix_task(chunk_path=chunk_path, infile=infile)
        suffix_dict[chunk_name] = infile_suffix
    return suffix_dict
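The effect of iterating in sorted key order can be checked outside Flyte. The sketch below is plain Python with a hypothetical helper (`assign_sorted_node_ids`) and an assumed `n<i>` node naming; it only shows that two dicts with the same keys but different insertion order yield the same name-to-ID assignment once the keys are sorted.

```python
def assign_sorted_node_ids(chunks):
    """Map chunk name -> node ID, visiting keys in sorted order (hypothetical)."""
    ordered = sorted(chunks.items(), key=lambda item: item[0])
    return {name: f"n{i}" for i, (name, _) in enumerate(ordered)}

# Same items, different insertion (and therefore iteration) order.
run_a = {"counter_1": "a.txt", "counter_2": "b.txt", "counter_3": "c.txt"}
run_b = {"counter_3": "c.txt", "counter_1": "a.txt", "counter_2": "b.txt"}

ids_a = assign_sorted_node_ids(run_a)
ids_b = assign_sorted_node_ids(run_b)
print(ids_a == ids_b)
```

Note that string sorting is lexicographic, so a key such as "counter_10" sorts before "counter_2"; that order is still stable across runs, which is what matters here.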
This way Flyte will assign deterministic node IDs, e.g. "counter_1" will always be "n0", "counter_2" will always be "n1", etc.
Jay Ganbat
11/29/2022, 6:49 PM
Dan Rammer (hamersaw)
11/29/2022, 6:54 PM
Yee
11/29/2022, 6:54 PM
Dan Rammer (hamersaw)
11/29/2022, 6:55 PM
Jay Ganbat
11/29/2022, 6:55 PM
Dan Rammer (hamersaw)
11/29/2022, 6:55 PM
Jay Ganbat
11/29/2022, 6:55 PM
Dan Rammer (hamersaw)
11/29/2022, 6:57 PM
Jay Ganbat
11/29/2022, 7:25 PM
Dan Rammer (hamersaw)
11/30/2022, 5:14 PM
Jay Ganbat
12/02/2022, 6:50 PM
Promise type during the hand over from map_task_input to run_map_task
# in a dynamic task we have the following
map_task_input = build_map_task_inputs # this should return a list of inputs
run_map_task(map_task_input)
Dan Rammer (hamersaw)
12/05/2022, 2:47 PM
Jay Ganbat
12/05/2022, 5:10 PM
Dan Rammer (hamersaw)
12/06/2022, 2:24 AM
[0,1,2,3,4] (ordered) and all but 4 is successful, currently map task subtask successes cannot be recovered. So even though [0,1,2,3] all completed successfully, they will all be recomputed when recovering. However, if all of the subtasks succeeded and the recovery has an order of [4,3,2,1,0] then we will run into the same issue with outputs being set incorrectly. Does this make sense?
Jay Ganbat
12/06/2022, 3:05 AM