magnificent-teacher-86590
11/28/2022, 11:52 PM
sample_name: sample_workflow_results
When some sample fails and we recover, unfortunately the output dict is all shuffled and no longer matches the sample_name key.
For example, we expect a certain dynamic task to return a filename -> filename-suffix map like the one below:
{
"counter_1": 1,
"counter_2": 2,
"counter_3": 3,
"counter_4": 4,
"counter_5": 5,
"counter_6": 6
}
I forcefully fail some tasks while others complete successfully. Then when I recover, I get this instead:
{
"counter_1": 5,
"counter_2": 6,
"counter_3": 3,
"counter_4": 4,
"counter_5": 5,
"counter_6": 4
}
An example dynamic task is below:
@dynamic(cache=True, cache_version=get_workflow_version())
def compute_average_for_chunks(
    chunks: Dict[str, FlyteFile], infile: FlyteFile
) -> Dict[str, int]:
    suffix_dict = {}
    for chunk_name, chunk_path in chunks.items():
        infile_suffix = get_file_suffix_task(chunk_path=chunk_path, infile=infile)
        suffix_dict[chunk_name] = infile_suffix
    return suffix_dict
magnificent-teacher-86590
11/28/2022, 11:53 PM
get_file_suffix_task
freezing-airport-6809
magnificent-teacher-86590
11/29/2022, 1:12 AM
freezing-airport-6809
magnificent-teacher-86590
11/29/2022, 1:50 AM
- name: cr.flyte.org/flyteorg/datacatalog
  newTag: v1.0.1@sha256:b41b322e7a926f7e8efd351e6e51e246452477e28659de0c6152e2a11a7b7e36
- name: cr.flyte.org/flyteorg/flyteadmin
  newTag: v1.1.46@sha256:2359a58d756186992f7eba56743874aa8180fe8d71a7c3e1436141f59f117b31
- name: cr.flyte.org/flyteorg/flyteconsole
  newTag: v1.2.5@sha256:164d946afc39762c61b343b69255414deca12d2d0b14d2dbfb82fc0ac31a1da3
- name: cr.flyte.org/flyteorg/flytepropeller
  newTag: v1.1.40@sha256:db51f3ba890c4198a02f4299e68ec3177a5a42f5dee88bea6691d9db6e906074
and we are on Flytekit 1.1.1
not sure if this issue is gone in the latest versions
hallowed-mouse-14616
11/29/2022, 8:36 AM
> Interestingly, if we expand it to the underlying tasks they are all correct. I think it's the conversion from the Python literal to the Flyte-specific job that is causing this 🤔
@magnificent-teacher-86590 can you expand on this? I've done a bunch of testing locally and am unable to repro. When you check the output of the dynamic in the recovered workflow it is correct, but some downstream node is incorrect?
magnificent-teacher-86590
11/29/2022, 5:28 PM
hallowed-mouse-14616
11/29/2022, 5:45 PM
hallowed-mouse-14616
11/29/2022, 5:57 PM
hallowed-mouse-14616
11/29/2022, 6:01 PM
The mapping of chunk_name to Flyte node names could be something like:
{
"counter_1": "n0",
"counter_2": "n1",
"counter_3": "n2",
"counter_4": "n3",
}
If this runs and you manually fail nodes n2 and n3, then during the recovery the previously successful nodes will not be re-run; their outputs will be reused instead. However, in the recovery we cannot guarantee that the Dict is iterated in the same order, so the recovery may now map chunk_name to Flyte node names like:
{
"counter_3": "n0",
"counter_2": "n1",
"counter_4": "n2",
"counter_1": "n3",
}
In this case we see that the outputs of n0 and n1, which previously succeeded, are now used for "counter_3" and "counter_2" rather than the "counter_1" and "counter_2" they should have been used for.
magnificent-teacher-86590
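To make the failure mode concrete, here is a plain-Python sketch (hypothetical helper names, not Flyte internals) of node IDs being assigned in iteration order and cached outputs then being reused by node ID after the order changes:

```python
# Hypothetical sketch, not Flyte code: node IDs n0, n1, ... are assigned
# in whatever order the dict happens to be iterated.

def assign_node_ids(chunk_names):
    return {name: f"n{i}" for i, name in enumerate(chunk_names)}

# First run: counter_1..counter_4 map to n0..n3, and each node's output
# is recorded against its node ID.
first_run = assign_node_ids(["counter_1", "counter_2", "counter_3", "counter_4"])
node_outputs = {"n0": 1, "n1": 2, "n2": 3, "n3": 4}

# Recovery iterates the dict in a different order, so the same node IDs
# now belong to different chunk names.
recovery = assign_node_ids(["counter_3", "counter_2", "counter_4", "counter_1"])

# Reusing cached outputs by node ID pairs the wrong value with each key:
recovered_values = {name: node_outputs[node] for name, node in recovery.items()}
# counter_3 receives n0's output, which actually belonged to counter_1.
```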
11/29/2022, 6:11 PM
magnificent-teacher-86590
11/29/2022, 6:11 PM
for chunk_name, chunk_path in chunks.items():
    infile_suffix, average = compute_average_for_chunk(chunk_path=chunk_path, infile=infile)
is somehow shuffled.
Do you think it would work if we do not attempt to cache the dynamic task? 🤔
magnificent-teacher-86590
11/29/2022, 6:30 PM
hallowed-mouse-14616
11/29/2022, 6:31 PM
hallowed-mouse-14616
11/29/2022, 6:39 PM
hallowed-mouse-14616
11/29/2022, 6:42 PM
magnificent-teacher-86590
11/29/2022, 6:44 PM
hallowed-mouse-14616
11/29/2022, 6:47 PM
@dynamic(cache=True, cache_version=get_workflow_version())
def compute_average_for_chunks(
    chunks: Dict[str, FlyteFile], infile: FlyteFile
) -> Dict[str, int]:
    suffix_dict = {}
    for chunk_name, chunk_path in sorted(chunks.items(), key=lambda ele: ele[0]):
        infile_suffix = get_file_suffix_task(chunk_path=chunk_path, infile=infile)
        suffix_dict[chunk_name] = infile_suffix
    return suffix_dict
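As a quick plain-Python sanity check (hypothetical helper, not Flyte internals), sorting the dict items by key makes the iteration order, and hence any order-based node-ID assignment, deterministic regardless of how the dict was built:

```python
# Hypothetical sketch, not Flyte code: node IDs assigned in sorted-key
# order are stable no matter the dict's insertion order.

def assign_node_ids_sorted(chunks):
    # Mirrors: sorted(chunks.items(), key=lambda ele: ele[0])
    ordered = sorted(chunks.items(), key=lambda ele: ele[0])
    return {name: f"n{i}" for i, (name, _path) in enumerate(ordered)}

# Same entries, different insertion order:
run1 = assign_node_ids_sorted({"counter_2": "b", "counter_1": "a", "counter_3": "c"})
run2 = assign_node_ids_sorted({"counter_3": "c", "counter_1": "a", "counter_2": "b"})
assert run1 == run2 == {"counter_1": "n0", "counter_2": "n1", "counter_3": "n2"}
```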
This way Flyte will assign deterministic node IDs, e.g. "counter_1" will always be "n0", "counter_2" will always be "n1", etc.
magnificent-teacher-86590
11/29/2022, 6:49 PM
hallowed-mouse-14616
11/29/2022, 6:54 PM
thankful-minister-83577
hallowed-mouse-14616
11/29/2022, 6:55 PM
magnificent-teacher-86590
11/29/2022, 6:55 PM
hallowed-mouse-14616
11/29/2022, 6:55 PM
magnificent-teacher-86590
11/29/2022, 6:55 PM
hallowed-mouse-14616
11/29/2022, 6:57 PM
magnificent-teacher-86590
11/29/2022, 7:25 PM
hallowed-mouse-14616
11/30/2022, 5:14 PM
magnificent-teacher-86590
12/02/2022, 6:50 PM
Promise type during the handover from map_task_input to run_map_task:
# in a dynamic task we have the following
map_task_input = build_map_task_inputs()  # this should return a list of inputs
run_map_task(map_task_input)
hallowed-mouse-14616
12/05/2022, 2:47 PM
hallowed-mouse-14616
12/05/2022, 2:51 PM
magnificent-teacher-86590
12/05/2022, 5:10 PM
hallowed-mouse-14616
12/06/2022, 2:24 AM
Say the map task inputs are [0,1,2,3,4] (ordered) and all but 4 are successful; currently map task subtask successes cannot be recovered. So even though [0,1,2,3] all completed successfully, they will all be recomputed when recovering. However, if all of the subtasks succeeded and the recovery has an order of [4,3,2,1,0], then we will run into the same issue with outputs being set incorrectly. Does this make sense?
magnificent-teacher-86590
12/06/2022, 3:05 AM
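The positional-recovery concern for map tasks described above can be sketched the same way in plain Python (hypothetical, not Flyte internals): if cached subtask outputs are matched up by position, recovering with the inputs in a different order pairs each cached output with the wrong input.

```python
# Hypothetical sketch, not Flyte code: map-task outputs cached by position
# get misattributed when recovery sees the inputs in another order.

def square(x):
    return x * x

inputs_first_run = [0, 1, 2, 3, 4]
cached_outputs = [square(x) for x in inputs_first_run]  # cached positionally

# Recovery presents the inputs reversed but reuses outputs by index:
inputs_recovery = [4, 3, 2, 1, 0]
recovered = dict(zip(inputs_recovery, cached_outputs))
# Input 4 is now paired with square(0), input 3 with square(1), and so on.
```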