# flytekit
#5164 [BUG] Infinite loop and memory growth in dynamic task
Issue created by dansola

Describe the bug
Two interesting outcomes occur when iterating over task output and calling sub-workflows within a dynamic task.

1. Iterating over task output in a dynamic task: Within a dynamic task, if we iterate over a list of FlyteFiles returned by another task, the dynamic task never appears to enter the first task, yet it seemingly iterates over the FlyteFile list forever, e.g.
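from typing import List

from flytekit import Resources, dynamic, task, workflow
from flytekit.types.file import FlyteFile

@task(requests=Resources(mem="5Gi"))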
def ff_list_task() -> List[FlyteFile]:
    print('entering ff_list_task')
    return [FlyteFile(path="file_1"), FlyteFile(path="file_2")]

@dynamic(requests=Resources(mem="5Gi"))
def dynamic_task() -> List[FlyteFile]:
    batched_input_files = ff_list_task()
    for file in batched_input_files:
        print('entering for loop')
        pass
    return batched_input_files

@workflow
def wf():
    dynamic_task()
Looking at the print statements when we run this example, we see that `ff_list_task` never gets executed, yet the for loop body executes repeatedly until we manually terminate the workflow.

2. Calling a sub-workflow in a dynamic task: In the above example, if we call a sub-workflow while iterating over the FlyteFile list, we see runaway memory growth, e.g.
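from typing import List

from flytekit import Resources, dynamic, task, workflow
from flytekit.types.file import FlyteFile

@task(requests=Resources(mem="5Gi"))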
def ff_list_task() -> List[FlyteFile]:
    return [FlyteFile(path="file_1"), FlyteFile(path="file_2")]

@workflow
def sub_wf(input_file: FlyteFile) -> FlyteFile:
    return input_file

@dynamic(requests=Resources(mem="5Gi"))
def dynamic_task() -> List[FlyteFile]:
    batched_input_files = ff_list_task()
    result_files = []
    for file in batched_input_files:
        batch_result_file = sub_wf(input_file=file)
        result_files.append(batch_result_file)
    return result_files

@workflow
def wf():
    files = dynamic_task()
Even though `sub_wf` shouldn't allocate much memory and should only be called twice, simply adding the `sub_wf` call causes memory to increase until an OOM error occurs. In example #1 the memory usage stayed flat.

Expected behavior
In example #1, we would expect `ff_list_task` to execute before the for loop. We would also expect that `for file in batched_input_files` may raise an error, as it is iterating over a promise. If the for loop does iterate, we would expect it to iterate only twice (as the list has only two files) rather than infinitely. In example #2, we would expect the same behavior as outlined above, and we would also expect memory not to grow until OOM.

Additional context to reproduce
These examples were run on Union Cloud, and resource usage was monitored there. Flytekit version 1.11.0 was used.

Screenshots
Usage screenshots: image

Are you sure this issue hasn't been raised already? ☑︎ Yes
Have you read the Code of Conduct? ☑︎ Yes

flyteorg/flyte
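
One plausible mechanism for both symptoms, offered as an assumption rather than something the issue confirms: in Python, an object that defines `__getitem__` but not `__iter__` is still iterable through the legacy sequence protocol, and the loop only stops when `__getitem__` raises `IndexError`. If a lazy placeholder for a task output always returns another placeholder when indexed, a `for` loop over it never ends, and appending on each pass grows memory without bound. The sketch below uses only the standard library; `PromiseLike` and `GuardedPromiseLike` are hypothetical stand-ins, not flytekit classes.

```python
from itertools import islice


class PromiseLike:
    """Hypothetical stand-in for a lazy task output; NOT flytekit's real class."""

    def __init__(self, name: str):
        self.name = name

    def __getitem__(self, index: int) -> "PromiseLike":
        # Indexing is deferred: it always succeeds and never raises IndexError,
        # because the real length is unknown until the upstream task has run.
        return PromiseLike(f"{self.name}[{index}]")


class GuardedPromiseLike(PromiseLike):
    """Same stand-in, but iteration is rejected explicitly."""

    def __iter__(self):
        raise TypeError(f"cannot iterate over unresolved output {self.name!r}")


outputs = PromiseLike("ff_list_task.o0")

# With only __getitem__ defined, `for` falls back to outputs[0], outputs[1], ...
# and waits for an IndexError that never comes. islice caps the loop for the demo.
seen = [item.name for item in islice(outputs, 5)]
print(seen)

# Appending inside such a loop grows the list without bound (capped here at 1000).
result_files = []
for item in islice(outputs, 1000):
    result_files.append(item)
print(len(result_files))

# Defining __iter__ to raise turns the silent infinite loop into an immediate error.
try:
    for _ in GuardedPromiseLike("ff_list_task.o0"):
        pass
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```

If this is indeed what happens, it would also account for `ff_list_task` never printing: the loop runs while the dynamic task is still being compiled, before any sub-task has been scheduled. The guarded variant matches the expectation stated above that iterating over a promise should raise an error.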