<#4094 [Core feature] Add support for `min_success...
# flytekit
a
#4094 [Core feature] Add support for `min_success_ratio` for local map_task execution Issue created by nicklofaso Motivation: Why do you think this is important? Parity between local and remote execution is incredibly helpful for us so we can test out workflows/tasks locally before deployment. We recently leveraged the
map_task.min_success_ratio
functionality to allow for some failures of our input files without stopping workflow execution. This worked great remotely. As I wrote the unit test for the workflow I realized that
min_success_ratio
field was not respected for local execution. As soon as any
map_task
task failed the workflow exited. To be clear this is not a blocking issue for us, and is not a critical feature. We simply support parity between local and remote execution when possible Goal: What should the final outcome look like, ideally? local execution of a map_task should respect the
min_success_ratio
field if set below
1.0
Describe alternatives you've considered This works remotely, so I skipped writing the unit test for the workflow. Propose: Link/Inline OR Additional context
Copy code
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from flytekit import workflow, task, map_task, LaunchPlan
from flytekit.testing import task_mock
import functools
from typing import Optional


@dataclass_json
@dataclass
class AData:
    num: Optional[int]


@dataclass_json
@dataclass
class BData:
    num: Optional[int]


@task
def t1(a: AData, b: BData, c: str) -> int:
    print(c)
    if a.num == 2:
        raise ValueError("'a' input was 2")
    return a.num * b.num


@task
def coalesce(nums: list[Optional[int]]) -> int:
    sum = 0
    for n in nums:
        sum += n if n else 0
    return sum


@workflow
def wf(a: list[AData], b: list[BData], c: str) -> int:
    t1_fixed_c = functools.partial(t1, c=c)

    results = map_task(t1_fixed_c, min_success_ratio=0.5)(a=a, b=b)
    return coalesce(nums=results)


def test_map_wf():
    a_list = [AData(num=1), AData(num=2), AData(num=3), AData(num=4)]
    b_list = [BData(num=5), BData(num=6), BData(num=7), BData(num=8)]
    results = wf(a=a_list, b=b_list, c="hello")
    assert results == 58
Are you sure this issue hasn't been raised already? ☑︎ Yes Have you read the Code of Conduct? ☑︎ Yes flyteorg/flyte