# ask-the-community
g
Any tips to improve the responsiveness of the Flyte console when rendering a map task with 30K executions? We're using UI version 1.9.6. In the console I see requests to `/api/v1/data/node_executions` that are failing with 429: Too Many Requests. Eventually things do load, but it takes 1.3 minutes.
k
A map task with 30K executions is ridiculous; I thought we only allowed 5K.
Sadly, there's no other way today. UI changes are planned.
d
FWIW, in a biology context we regularly run simple map tasks (not in Flyte) with N = 1 million or more. Just saying that there is a need.
k
Ya, I know, but those are short-lived.
We are working on certain things.
Would love real-world examples.
d
At what computational level? A lot of things these days are GPU-based, so running one test would be quite expensive.
How would you define short-lived?
g
Yeah, we're doing the same kind of thing as Dan: trying to use Flyte to run experiments. It would be nice if we could just disable rendering of all the runs, or make it lazy.
k
Short-lived refers to total runtime: does each task run for seconds or minutes? Flyte map tasks are designed for longer-running work ("more than a couple minutes of runtime").
g
In our case each one takes 5 minutes. Is there something else we should be using? I've also started observing an issue where the map task initially renders in the UI, but once the number of successful executions gets above ~11,000 it stops rendering. Is that expected? In dev tools I notice some calls to `n1-0-n1?limit=10000`, and I'm wondering if there's a way to change that limit.
k
I would use hierarchical map tasks, i.e. use a dynamic to create multiple map tasks. For 30K, that makes 6 map tasks with 5K each.
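The fan-out arithmetic behind this suggestion can be sketched in plain Python. This is a minimal sketch that assumes a cap of 5,000 items per map task, as mentioned above. `plan_batches` is a hypothetical helper, not a Flyte API. Each range it returns would become the input slice of one `map_task` call inside a `@dynamic`.

```python
import math
from typing import List, Tuple


def plan_batches(n_items: int, max_per_map: int) -> List[Tuple[int, int]]:
    """Split n_items into half-open [start, end) ranges of at most max_per_map items.

    Hypothetical helper: each range would feed one map_task inside a @dynamic.
    """
    if max_per_map <= 0:
        raise ValueError("max_per_map must be positive")
    # Round up so a leftover remainder gets its own (smaller) final batch.
    n_batches = math.ceil(n_items / max_per_map)
    return [
        (i * max_per_map, min((i + 1) * max_per_map, n_items))
        for i in range(n_batches)
    ]


batches = plan_batches(30_000, 5_000)
print(len(batches))  # 6
print(batches[-1])   # (25000, 30000)
```

Because the count is rounded up, an uneven total such as 30,001 items yields a seventh batch holding a single item, rather than an oversized sixth batch.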
g
tried doing:
```python
@dynamic(cache=True, cache_version="0.1", requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: list[Experiment]) -> list[CompletedExperiment]:
    i = 0
    completed_experiments = []
    while i < len(experiments):
        mini_batch = experiments[i : i + 500]
        completed_experiments.extend(map_task(fun, concurrency=CONCURRENCY_CONSTRAINT, experiment=mini_batch))
        i += 500
    return completed_experiments
```
but getting:
```
Traceback (most recent call last):

      File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "<redacted>", line 73, in run_funs
        completed_experiments.extend(map_task(fun, concurrency=CONCURRENCY_CONSTRAINT, experiment=mini_batch))

Message:

    'MapPythonTask' object is not iterable

User error.
```
k
I think you need a reduce step
There is no `extend` primitive for lists.
g
Is "reduce" a Python thing or a Flyte thing?
k
I mean a simple task that takes lists and concatenates them haha
g
trying...
```python
def reduce(base_list: list[CompletedExperiment], new_list: list[CompletedExperiment]) -> list[CompletedExperiment]:
    base_list.extend(new_list)
    return base_list


@dynamic(cache=True, cache_version="0.1", requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: list[Experiment]) -> list[CompletedExperiment]:
    i = 0
    completed_experiments = []
    while i < len(experiments):
        mini_batch = experiments[i : i + 500]
        outputs = map_task(run_fun, concurrency=CONCURRENCY_CONSTRAINT)(experiment=mini_batch)

        completed_experiments = reduce(base_list=completed_experiments, new_list=outputs)
        i += 500
    return completed_experiments
```
returns:
```
Traceback (most recent call last):

      File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "<redacted>", line 79, in run_funs
        completed_experiments = reduce(base_list=completed_experiments, new_list=outputs)
      File "<redacted>", line 67, in reduce
        base_list.extend(new_list)

Message:

    'Promise' object is not iterable

User error.
```
Any good examples of how to write this reducer?
k
Will share
AFK
s
@Gopal Vashishtha, when Ketan mentioned 'reduce,' he meant that you need to define a separate Flyte task to materialize the map task outputs. If you refer to the map task example in the docs, `coalesce` serves as an example of a reduce task.
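The reduce step runs as its own task, so it receives concrete, materialized lists rather than `Promise`s, and its body can be ordinary Python. Below is a minimal sketch of such a body. It assumes a hypothetical `CompletedExperiment` dataclass. In Flyte the function would also carry the `@task` decorator, which is omitted here so the sketch runs without flytekit installed.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class CompletedExperiment:
    # Hypothetical stand-in for the (obfuscated) output type in this thread.
    name: str
    score: float


def coalesce(batches: List[List[CompletedExperiment]]) -> List[CompletedExperiment]:
    """Reduce step: flatten the per-map-task output lists into one list.

    In a Flyte workflow this would be decorated with @task, so it executes
    after the map tasks finish and sees real values instead of Promises.
    """
    return [exp for batch in batches for exp in batch]


batches = [
    [CompletedExperiment("a", 0.1), CompletedExperiment("b", 0.2)],
    [CompletedExperiment("c", 0.3)],
]
print([e.name for e in coalesce(batches)])  # ['a', 'b', 'c']
```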
g
Got it, Samhita, that's exactly what I needed. Thanks for sending.
@Samhita Alla I tried doing:
```python
@task()
def coalesce(completed_experiments: List[List[CompletedExperiment]]) -> List[CompletedExperiment]:
    return list(itertools.chain(completed_experiments))


@dynamic(requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: list[Experiment]) -> list[CompletedExperiment]:
    i = 0
    completed_experiments = []
    while i < len(experiments):
        mini_batch = experiments[i : i + 500]
        outputs = map_task(run_fun, concurrency=CONCURRENCY_CONSTRAINT)(experiment=mini_batch)
        i += 500
completed_experiments = coalesce(base_list=completed_experiments, new_list=outputs)
return completed_experiments
```
but I'm getting
```
Message:

    Failed to convert outputs of task 'coalesce' at position 0:
  must be called with a dataclass type or instance

SYSTEM ERROR! Contact platform administrators.
```
I'm guessing the object returned by `map_task` isn't actually a `List` of `CompletedExperiment`s but rather some kind of `Promise`? Is there any way to get this to work?
I'm trying to avoid running `coalesce` within the loop, because then I get one `coalesce` task per `run_fun` when I really only need one.
d
@Gopal Vashishtha You might just be making some weird copy/paste errors, but:
1. Assuming this line should be indented: `completed_experiments = coalesce(base_list=completed_experiments, new_list=outputs)`
   a. Your `coalesce` call doesn't match the implementation above: `coalesce(completed_experiments: List[List[CompletedExperiment]]) -> List[CompletedExperiment]`
2. You should probably be appending the `outputs` of each `map_task` call to `completed_experiments`.
   a. Otherwise you're only coalescing the last result of the map_task.
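Dan's second point is about control flow. Keep every batch's output, then reduce once after the loop. Below is a plain-Python dry run of that shape. It assumes a hypothetical `local_map` that stands in for `map_task(fn)(...)`. In the real `@dynamic`, the appended items would be `Promise`s and the final `coalesce` call would be a task.

```python
from typing import Callable, List


def local_map(fn: Callable[[int], int], batch: List[int]) -> List[int]:
    # Hypothetical stand-in for map_task(fn)(...): applies fn to every item.
    return [fn(x) for x in batch]


def coalesce(batches: List[List[int]]) -> List[int]:
    # Reduce step, called exactly once after the loop.
    return [y for batch in batches for y in batch]


def run_all(items: List[int], fn: Callable[[int], int], batch_size: int) -> List[int]:
    per_batch: List[List[int]] = []
    for start in range(0, len(items), batch_size):
        # Append each batch's result. Reassigning here would keep only the last batch.
        per_batch.append(local_map(fn, items[start : start + batch_size]))
    return coalesce(per_batch)


print(run_all(list(range(7)), lambda x: x * 10, batch_size=3))
# [0, 10, 20, 30, 40, 50, 60]
```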
g
ah Dan thanks for confirming all this. I am indeed copy-pasting badly (trying to obfuscate the exact function names here). Turned out that if I replace the implementation of
coalesce
with:
Copy code
@task()
def coalesce(completed_experiments: List[List[CompletedExperiment]]) -> List[CompletedExperiment]:
    # itertools.chain would be more idiomatic here but produces an error when running in Flyte
    return sum(completed_experiments, [])
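then this code works:
```python
from typing import List

from flytekit import Resources, dynamic, map_task

# coalesce is the task above; run_fun, Experiment, CompletedExperiment and
# CONCURRENCY_CONSTRAINT are defined elsewhere (names obfuscated in this thread).


@dynamic(requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: List[Experiment]) -> List[CompletedExperiment]:
    i = 0
    completed_experiments = []
    while i < len(experiments):
        mini_batch = experiments[i : i + CONCURRENCY_CONSTRAINT]
        # Collect each map task's output Promise; they are coalesced once after the loop.
        completed_experiments.append(map_task(run_fun, concurrency=CONCURRENCY_CONSTRAINT)(experiment=mini_batch))

        i += CONCURRENCY_CONSTRAINT
    completed_experiments = coalesce(completed_experiments=completed_experiments)
    return completed_experiments
```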
d
Wow, didn't know that `sum` could flatten lists! Seems... suspicious...
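`sum(list_of_lists, [])` works because `+` on lists concatenates, starting from the `[]` start value. However, it builds a new accumulator list at every step, so its cost grows with the number of batches times the total number of items.

The earlier `itertools.chain` attempt most likely misbehaved because it passed a single argument. `chain(x)` just iterates over `x` itself, yielding the inner lists unchanged, so the task returned a list of lists instead of a flat list. It seems plausible that this nested list is what tripped the dataclass conversion error reported above, but the thread doesn't confirm it. By contrast, `chain.from_iterable` does flatten one level. A quick comparison in plain Python:

```python
from itertools import chain

batches = [[1, 2], [3], [4, 5, 6]]

# Single-argument chain() does NOT flatten: it yields the inner lists as-is.
not_flat = list(chain(batches))

# chain.from_iterable() (equivalently chain(*batches)) flattens one level in linear time.
flat = list(chain.from_iterable(batches))

# sum() with a [] start value also flattens, but copies the accumulator at each step.
flat_via_sum = sum(batches, [])

print(not_flat)      # [[1, 2], [3], [4, 5, 6]]
print(flat)          # [1, 2, 3, 4, 5, 6]
print(flat_via_sum)  # [1, 2, 3, 4, 5, 6]
```

For a few dozen batches the extra copying done by `sum` is negligible, so the working `sum` version above is fine in practice. `chain.from_iterable` is simply the more idiomatic and scalable spelling.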
g
@Ketan (kumare3) Since upgrading, I'm now seeing the error `You are not supposed to nest @Task/@Workflow inside a @Task!` Is there a new recommended approach, rather than wrapping a map task in a dynamic?
d
@Gopal Vashishtha That was fixed for the next version: https://github.com/flyteorg/flyte/issues/3995
g
Oh, nice... so the warning is a false alarm? This approach should still work?
d
I guess so ¯\_(ツ)_/¯