Gopal Vashishtha
10/05/2023, 1:37 PM/api/v1/data/node_executions
that are failing with 429: Too Many Requests
Eventually things do load but it takes 1.3 minutes.Ketan (kumare3)
Dan Farrell
10/05/2023, 3:09 PMKetan (kumare3)
Dan Farrell
10/05/2023, 4:12 PMGopal Vashishtha
10/05/2023, 4:40 PMKetan (kumare3)
Gopal Vashishtha
10/06/2023, 1:18 PMn1-0-n1?limit=10000
wondering if there's a way to change that limitKetan (kumare3)
Gopal Vashishtha
10/06/2023, 4:37 PM@dynamic(cache=True, cache_version="0.1", requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: list[Experiment]) -> list[CompletedExperiment]:
i = 0
completed_experiments = []
while i < len(experiments):
mini_batch = experiments[i : i + 500]
completed_experiments.extend(map_task(fun, concurrency=CONCURRENCY_CONSTRAINT, experiment=mini_batch))
i += 500
return completed_experiments
but getting:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
return wrapped(*args, **kwargs)
File "<redacted>", line 73, in run_funs
completed_experiments.extend(map_task(fun, concurrency=CONCURRENCY_CONSTRAINT, experiment=mini_batch))
Message:
'MapPythonTask' object is not iterable
User error.
Ketan (kumare3)
Gopal Vashishtha
10/06/2023, 4:39 PMKetan (kumare3)
Gopal Vashishtha
10/06/2023, 4:42 PMdef reduce(base_list: list[CompletedExperiment], new_list: list[CompletedExperiment]) -> list[CompletedExperiment]:
base_list.extend(new_list)
return base_list
@dynamic(cache=True, cache_version="0.1", requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: list[Experiment]) -> list[CompletedExperiment]:
i = 0
completed_experiments = []
while i < len(experiments):
mini_batch = experiments[i : i + 500]
outputs = map_task(run_fun, concurrency=CONCURRENCY_CONSTRAINT)(experiment=mini_batch)
completed_experiments = reduce(base_list=completed_experiments, new_list=outputs)
i += 500
return completed_experiments
returns:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
return wrapped(*args, **kwargs)
File "<redacted>", line 79, in run_funs
completed_experiments = reduce(base_list=completed_experiments, new_list=outputs)
File "<redacted>", line 67, in reduce
base_list.extend(new_list)
Message:
'Promise' object is not iterable
User error.
Ketan (kumare3)
Samhita Alla
coalesce
serves as an example of a reduce task.Gopal Vashishtha
10/10/2023, 2:30 AM@task()
def coalesce(completed_experiments: List[List[CompletedExperiment]]) -> List[CompletedExperiment]:
return list(itertools.chain(completed_experiments))
@dynamic(requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: list[Experiment]) -> list[CompletedExperiment]:
i = 0
completed_experiments = []
while i < len(experiments):
mini_batch = experiments[i : i + 500]
outputs = map_task(run_fun, concurrency=CONCURRENCY_CONSTRAINT)(experiment=mini_batch)
i += 500
completed_experiments = coalesce(base_list=completed_experiments, new_list=outputs)
return completed_experiments
but I'm getting
Message:
Failed to convert outputs of task 'coalesce' at position 0:
must be called with a dataclass type or instance
SYSTEM ERROR! Contact platform administrators.
map_task
isn't actually a List of CompletedExperiments but rather some kind of Promise? Is there any way to get this to work?coalesce
within the loop because then I get one coalesce task per run_fun when I really only need oneDan Farrell
10/10/2023, 7:49 PMcompleted_experiments = coalesce(base_list=completed_experiments, new_list=outputs)
should be indented
a. your coalesce
doesn't match the above implementation: coalesce(completed_experiments: List[List[CompletedExperiment]]) -> List[CompletedExperiment]:
2. You should probably be appending your outputs = map_task
to completed_experiments
a. Otherwise you're only coalescing the last result of the map_taskGopal Vashishtha
10/10/2023, 8:48 PMcoalesce
with:
@task()
def coalesce(completed_experiments: List[List[CompletedExperiment]]) -> List[CompletedExperiment]:
# itertools.chain would be more idiomatic here but produces an error when running in Flyte
return sum(completed_experiments, [])
then this code works:
@dynamic(requests=Resources(cpu="512m", mem="1Gi"))
def run_funs(experiments: List[Experiment]) -> List[CompletedExperiment]:
i = 0
completed_experiments = []
while i < len(experiments):
mini_batch = experiments[i : i + CONCURRENCY_CONSTRAINT]
completed_experiments.append(map_task(run_fun, concurrency=CONCURRENCY_CONSTRAINT)(experiment=mini_batch))
i += CONCURRENCY_CONSTRAINT
completed_experiments = coalesce(completed_experiments=completed_experiments)
return completed_experiments
Dan Farrell
10/10/2023, 9:10 PMGopal Vashishtha
10/12/2023, 2:55 PMYou are not supposed to nest @Task/@Workflow inside a @Task!\"
- is there a new recommended approach rather than wrapping a map task in a dynamic?Dan Farrell
10/12/2023, 3:16 PMGopal Vashishtha
10/12/2023, 4:36 PMDan Farrell
10/12/2023, 4:38 PM