#ask-the-community

Gopal Vashishtha

10/05/2023, 1:37 PM
Any tips to improve the responsiveness of the Flyte console when rendering a map task with 30K executions? We're using UI version 1.9.6. In the console I see requests to `/api/v1/data/node_executions` that are failing with 429: Too Many Requests. Eventually things do load, but it takes 1.3 minutes.

Ketan (kumare3)

10/05/2023, 2:26 PM
A map task with 30k executions is ridiculous; I thought we only allow 5k.
Sadly, there's no other way today. UI changes are planned.

Dan Farrell

10/05/2023, 3:09 PM
FWIW, in a biology context we regularly run simple map tasks (not in Flyte) with N = 1 million or more. Just saying that there is a need.

Ketan (kumare3)

10/05/2023, 3:35 PM
Yeah, I know, but those are short-lived.
We are working on certain things.
Would love real-world examples.

Dan Farrell

10/05/2023, 4:12 PM
At what computational level? A lot of things these days are GPU-based, so running one test would be quite expensive.
How would you define short-lived?

Gopal Vashishtha

10/05/2023, 4:40 PM
Yeah, we’re doing the same kind of thing as Dan: trying to use Flyte to run experiments. It would be nice if we could just disable rendering of all runs or make it lazy.

Ketan (kumare3)

10/06/2023, 3:39 AM
Short-lived refers to total runtime: does each task run for seconds or minutes? Flyte map tasks are designed for longer-running work ("more than a couple minutes of runtime").

Gopal Vashishtha

10/06/2023, 1:18 PM
In our case each one takes 5 minutes. Is there something else we should be using? We also started observing an issue where the map task initially renders in the UI, but once the number of successful executions gets above ~11,000, it stops rendering. Is that expected? In dev tools I notice some calls to `n1-0-n1?limit=10000`; wondering if there's a way to change that limit.

Ketan (kumare3)

10/06/2023, 1:48 PM
I would use hierarchical map tasks, i.e., use a dynamic to create multiple map tasks: for example, 6 map tasks with 5k each.
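To make the numbers concrete: splitting 30,000 inputs into chunks of at most 5,000 gives ceil(30000 / 5000) = 6 map tasks. Below is a minimal pure-Python sketch of that chunking, with no Flyte involved. `chunk`, `num_chunks`, and `MAX_MAP_SIZE` are hypothetical names, and the 5k figure is Ketan's recollection of the limit, not a value verified here.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")

# Hypothetical per-map-task size, taken from the "5k" mentioned in the thread.
MAX_MAP_SIZE = 5_000


def chunk(items: Sequence[T], size: int) -> List[List[T]]:
    """Split items into consecutive chunks of at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [list(items[i : i + size]) for i in range(0, len(items), size)]


def num_chunks(n: int, size: int) -> int:
    """Ceiling division: how many chunks of `size` are needed for n items."""
    return -(-n // size)


experiments = list(range(30_000))
batches = chunk(experiments, MAX_MAP_SIZE)
print(len(batches), num_chunks(len(experiments), MAX_MAP_SIZE))  # 6 6
print([len(b) for b in batches])
```

Inside a `@dynamic`, each of these batches would then go to its own `map_task(...)` call, which is the pattern the rest of the thread works toward.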
g

Gopal Vashishtha

10/06/2023, 4:37 PM
Tried doing:
```python
@dynamic(cache=True, cache_version="0.1", requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: list[Experiment]) -> list[CompletedExperiment]:
    i = 0
    completed_experiments = []
    while i < len(experiments):
        mini_batch = experiments[i : i + 500]
        completed_experiments.extend(map_task(fun, concurrency=CONCURRENCY_CONSTRAINT, experiment=mini_batch))
        i += 500
    return completed_experiments
```
But I'm getting:
```
Traceback (most recent call last):

      File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "<redacted>", line 73, in run_funs
        completed_experiments.extend(map_task(fun, concurrency=CONCURRENCY_CONSTRAINT, experiment=mini_batch))

Message:

    'MapPythonTask' object is not iterable

User error.
```

Ketan (kumare3)

10/06/2023, 4:38 PM
I think you need a reduce step.
There is no `extend` primitive for lists.
g

Gopal Vashishtha

10/06/2023, 4:39 PM
Is "reduce" a Python thing or a Flyte thing?

Ketan (kumare3)

10/06/2023, 4:40 PM
I mean a simple task that takes lists and concatenates them, haha.
g

Gopal Vashishtha

10/06/2023, 4:42 PM
Trying...
```python
def reduce(base_list: list[CompletedExperiment], new_list: list[CompletedExperiment]) -> list[CompletedExperiment]:
    base_list.extend(new_list)
    return base_list


@dynamic(cache=True, cache_version="0.1", requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: list[Experiment]) -> list[CompletedExperiment]:
    i = 0
    completed_experiments = []
    while i < len(experiments):
        mini_batch = experiments[i : i + 500]
        outputs = map_task(run_fun, concurrency=CONCURRENCY_CONSTRAINT)(experiment=mini_batch)

        completed_experiments = reduce(base_list=completed_experiments, new_list=outputs)
        i += 500
    return completed_experiments
```
which returns:
```
Traceback (most recent call last):

      File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "<redacted>", line 79, in run_funs
        completed_experiments = reduce(base_list=completed_experiments, new_list=outputs)
      File "<redacted>", line 67, in reduce
        base_list.extend(new_list)

Message:

    'Promise' object is not iterable

User error.
```
Any good examples of how to write this reducer?

Ketan (kumare3)

10/06/2023, 6:39 PM
Will share
AFK

Samhita Alla

10/07/2023, 11:26 AM
@Gopal Vashishtha, when Ketan mentioned 'reduce,' he meant that you need to define a separate Flyte task to materialize the map task outputs. If you refer to the map task example in the docs, `coalesce` serves as an example of a reduce task.

Gopal Vashishtha

10/10/2023, 2:30 AM
Got it, Samhita, that's exactly what I needed. Thanks for sending!
@Samhita Alla I tried doing:
```python
@task()
def coalesce(completed_experiments: List[List[CompletedExperiment]]) -> List[CompletedExperiment]:
    return list(itertools.chain(completed_experiments))


@dynamic(requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: list[Experiment]) -> list[CompletedExperiment]:
    i = 0
    completed_experiments = []
    while i < len(experiments):
        mini_batch = experiments[i : i + 500]
        outputs = map_task(run_fun, concurrency=CONCURRENCY_CONSTRAINT)(experiment=mini_batch)
        i += 500
completed_experiments = coalesce(base_list=completed_experiments, new_list=outputs)
return completed_experiments
```
But I'm getting:
```
Message:

    Failed to convert outputs of task 'coalesce' at position 0:
  must be called with a dataclass type or instance

SYSTEM ERROR! Contact platform administrators.
```
I'm guessing the object returned by `map_task` isn't actually a `List` of `CompletedExperiment`s but rather some kind of `Promise`? Is there any way to get this to work?
I'm trying to avoid running `coalesce` within the loop, because then I get one `coalesce` task per `run_fun` when I really only need one.
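One plausible explanation for the `must be called with a dataclass type or instance` error, offered as an assumption rather than a confirmed diagnosis: `itertools.chain(completed_experiments)` receives a single iterable (the outer list), so it yields the inner lists unchanged instead of flattening them. `coalesce` would then return a list of lists where Flyte expects a list of dataclass instances. The message matches the `TypeError` that the standard library's `dataclasses.fields()` raises for a non-dataclass. The sketch below uses a stand-in `CompletedExperiment` with a hypothetical `name` field to show the difference between `chain(...)` and `chain.from_iterable(...)`:

```python
import dataclasses
import itertools
from typing import List


@dataclasses.dataclass
class CompletedExperiment:
    # Stand-in for the thread's (redacted) dataclass; the field is hypothetical.
    name: str


batches: List[List[CompletedExperiment]] = [
    [CompletedExperiment("a"), CompletedExperiment("b")],
    [CompletedExperiment("c")],
]

# chain() treats each *argument* as an iterable; with one argument it just
# re-yields the inner lists, so nothing is flattened.
not_flat = list(itertools.chain(batches))

# chain.from_iterable() (equivalently chain(*batches)) iterates one level deeper.
flat = list(itertools.chain.from_iterable(batches))

print([type(x).__name__ for x in not_flat])  # ['list', 'list']
print([e.name for e in flat])  # ['a', 'b', 'c']

# Treating an inner list as a dataclass reproduces the message from the thread.
try:
    dataclasses.fields(not_flat[0])
except TypeError as err:
    print(err)  # must be called with a dataclass type or instance
```

If that is the cause, `return list(itertools.chain.from_iterable(completed_experiments))` inside the `@task` version of `coalesce` should produce the same flattened list as the `sum(..., [])` workaround that comes up later in the thread.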

Dan Farrell

10/10/2023, 7:49 PM
@Gopal Vashishtha You might just be making some weird copy/paste errors, but:
1. Assuming this line should be indented: `completed_experiments = coalesce(base_list=completed_experiments, new_list=outputs)`
   a. Your `coalesce` call doesn't match the implementation above: `coalesce(completed_experiments: List[List[CompletedExperiment]]) -> List[CompletedExperiment]:`
2. You should probably be appending the `outputs` of each `map_task` call to `completed_experiments`.
   a. Otherwise you're only coalescing the last result of the `map_task`.
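Dan's second point can be checked without Flyte. This plain-Python sketch uses `fake_map`, a hypothetical stand-in for calling a map task on one batch. It contrasts rebinding a variable on every loop iteration, which keeps only the last batch, with appending each batch's result and flattening once after the loop:

```python
from typing import Callable, List, Sequence


def fake_map(fn: Callable[[int], int], batch: Sequence[int]) -> List[int]:
    """Hypothetical stand-in for one map_task call over a single batch."""
    return [fn(x) for x in batch]


def run_overwriting(items: Sequence[int], size: int) -> List[int]:
    outputs: List[int] = []
    for i in range(0, len(items), size):
        # Rebinding discards the previous batch's results.
        outputs = fake_map(lambda x: x * 10, items[i : i + size])
    return outputs


def run_appending(items: Sequence[int], size: int) -> List[int]:
    per_batch: List[List[int]] = []
    for i in range(0, len(items), size):
        # Keep every batch's result; flatten once after the loop.
        per_batch.append(fake_map(lambda x: x * 10, items[i : i + size]))
    return [y for batch in per_batch for y in batch]


items = list(range(7))
print(run_overwriting(items, 3))  # [60]
print(run_appending(items, 3))  # [0, 10, 20, 30, 40, 50, 60]
```

In the real `@dynamic`, the appended values are promises rather than lists, so the final flattening has to happen inside a separate task such as `coalesce`. Gopal's working version below follows that shape.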

Gopal Vashishtha

10/10/2023, 8:48 PM
Ah, Dan, thanks for confirming all this. I am indeed copy-pasting badly (trying to obfuscate the exact function names here). It turned out that if I replace the implementation of `coalesce` with:
```python
@task()
def coalesce(completed_experiments: List[List[CompletedExperiment]]) -> List[CompletedExperiment]:
    # itertools.chain would be more idiomatic here but produces an error when running in Flyte
    return sum(completed_experiments, [])
```
then this code works:
```python
@dynamic(requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: List[Experiment]) -> List[CompletedExperiment]:
    i = 0
    completed_experiments = []
    while i < len(experiments):
        mini_batch = experiments[i : i + CONCURRENCY_CONSTRAINT]
        completed_experiments.append(map_task(run_fun, concurrency=CONCURRENCY_CONSTRAINT)(experiment=mini_batch))

        i += CONCURRENCY_CONSTRAINT
    completed_experiments = coalesce(completed_experiments=completed_experiments)
    return completed_experiments
```

Dan Farrell

10/10/2023, 9:10 PM
Wow, didn't know that `sum` could flatten lists! Seems... suspicious...
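It is less suspicious than it looks. `sum(iterable, start)` applies `+` from left to right beginning with `start`, and `+` on lists concatenates, so `sum(lists, [])` evaluates `[] + l1 + l2 + ...`. The catch is cost: each `+` builds a new list, so the accumulated prefix is copied once per inner list. With the ~60 batches that 500-item chunks of 30k experiments produce, that overhead is probably negligible next to the tasks themselves. This standard-library comparison uses made-up data:

```python
import functools
import itertools
import operator

batches = [[1, 2], [3], [4, 5, 6]]

# sum(..., []) is a left fold of list concatenation starting from [],
# i.e. the same thing functools.reduce(operator.add, ..., []) computes.
via_sum = sum(batches, [])
via_fold = functools.reduce(operator.add, batches, [])

# Linear-time alternative: iterate over every element of every inner list.
via_chain = list(itertools.chain.from_iterable(batches))

print(via_sum)  # [1, 2, 3, 4, 5, 6]
print(via_sum == via_fold == via_chain)  # True

# Without start=[], sum begins at 0, and 0 + [1, 2] is a TypeError.
try:
    sum(batches)
except TypeError as err:
    print("without start=[]:", err)
```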
g

Gopal Vashishtha

10/12/2023, 2:55 PM
@Ketan (kumare3) Since upgrading, I'm now seeing the error `You are not supposed to nest @Task/@Workflow inside a @Task!`. Is there a new recommended approach other than wrapping a map task in a dynamic?

Dan Farrell

10/12/2023, 3:16 PM
@Gopal Vashishtha That was fixed for the next version: https://github.com/flyteorg/flyte/issues/3995
g

Gopal Vashishtha

10/12/2023, 4:36 PM
Oh nice... so the warning is a false alarm? This approach should still work?
d

Dan Farrell

10/12/2023, 4:38 PM
I guess so ¯\_(ツ)_/¯