acoustic-carpenter-78188
10/30/2023, 7:27 PM
We use the map_task.min_success_ratio functionality to allow some of our input files to fail without stopping workflow execution, and this works well remotely. While writing the unit test for the workflow, I realized that the min_success_ratio field was not respected during local execution: as soon as any map_task subtask failed, the workflow exited.
To be clear, this is not a blocking issue for us and is not a critical feature. We simply prefer parity between local and remote execution when possible.
Goal: What should the final outcome look like, ideally?
Local execution of a map_task should respect the min_success_ratio field when it is set below 1.0.
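To make the expected local behavior concrete, here is a plain-Python sketch of the semantics being asked for. It does not use flytekit and is not flytekit's implementation; `run_with_min_success_ratio` and `MapTaskFailure` are hypothetical names. It assumes, as the `list[Optional[int]]` input of `coalesce` below suggests, that a failed subtask surfaces as `None`, and that the whole map fails only when the fraction of successful subtasks drops below `min_success_ratio`.

```python
from typing import Callable, List, Optional, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class MapTaskFailure(Exception):
    """Hypothetical error raised when too few subtasks succeed."""


def run_with_min_success_ratio(
    fn: Callable[[T], R],
    items: Sequence[T],
    min_success_ratio: float = 1.0,
) -> List[Optional[R]]:
    # Run every item instead of aborting on the first failure.
    results: List[Optional[R]] = []
    successes = 0
    for item in items:
        try:
            results.append(fn(item))
            successes += 1
        except Exception:
            # A failed subtask contributes None (assumed, matching the
            # Optional output type used by the workflow).
            results.append(None)
    # Fail the whole map only if the success ratio is below the threshold.
    if items and successes / len(items) < min_success_ratio:
        raise MapTaskFailure(
            f"{successes}/{len(items)} subtasks succeeded; "
            f"required ratio is {min_success_ratio}"
        )
    return results


def double_unless_two(x: int) -> int:
    # Hypothetical subtask that fails for one input, like t1 below.
    if x == 2:
        raise ValueError("x was 2")
    return 2 * x


# 3 of 4 subtasks succeed (0.75 >= 0.5), so the map as a whole succeeds.
print(run_with_min_success_ratio(double_unless_two, [1, 2, 3, 4], 0.5))
# → [2, None, 6, 8]
```

With `min_success_ratio=1.0` (the default in this sketch), the same call raises `MapTaskFailure`, which matches the fail-fast behavior currently seen locally.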
Describe alternatives you've considered
This works remotely, so I skipped writing the unit test for the workflow.
Propose: Link/Inline OR Additional context
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from flytekit import workflow, task, map_task
import functools
from typing import Optional


@dataclass_json
@dataclass
class AData:
    num: Optional[int]


@dataclass_json
@dataclass
class BData:
    num: Optional[int]


@task
def t1(a: AData, b: BData, c: str) -> int:
    print(c)
    # Deliberately fail one subtask so that min_success_ratio is exercised.
    if a.num == 2:
        raise ValueError("'a' input was 2")
    return a.num * b.num


@task
def coalesce(nums: list[Optional[int]]) -> int:
    # Failed subtasks are expected to surface as None; count them as 0.
    total = 0
    for n in nums:
        total += n if n is not None else 0
    return total


@workflow
def wf(a: list[AData], b: list[BData], c: str) -> int:
    # Fix the scalar input c so that only a and b are mapped over.
    t1_fixed_c = functools.partial(t1, c=c)
    # Tolerate up to half of the subtasks failing.
    results = map_task(t1_fixed_c, min_success_ratio=0.5)(a=a, b=b)
    return coalesce(nums=results)


def test_map_wf():
    a_list = [AData(num=1), AData(num=2), AData(num=3), AData(num=4)]
    b_list = [BData(num=5), BData(num=6), BData(num=7), BData(num=8)]
    # Locally, the workflow exits as soon as the a=2 subtask fails.
    results = wf(a=a_list, b=b_list, c="hello")
    assert results == 58
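For reference, the 58 asserted in test_map_wf is the result once the ratio is respected: the a=2 subtask fails and contributes 0 in coalesce, 3 of 4 subtasks succeed (a ratio of 0.75, above the 0.5 threshold), and the remaining products sum as follows.

```latex
\frac{3}{4} = 0.75 \ge 0.5, \qquad
1 \cdot 5 + 0 + 3 \cdot 7 + 4 \cdot 8 = 5 + 0 + 21 + 32 = 58
```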
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte