acoustic-carpenter-78188
04/10/2024, 6:30 PM

from typing import List

from flytekit import Resources, dynamic, task, workflow
from flytekit.types.file import FlyteFile


@task(requests=Resources(mem="5Gi"))
def ff_list_task() -> List[FlyteFile]:
    print('entering ff_list_task')
    return [FlyteFile(path="file_1"), FlyteFile(path="file_2")]


@dynamic(requests=Resources(mem="5Gi"))
def dynamic_task() -> List[FlyteFile]:
    # Inside @dynamic, calling a task returns a promise, not a Python list.
    batched_input_files = ff_list_task()
    for file in batched_input_files:
        print('entering for loop')
    return batched_input_files


@workflow
def wf():
    dynamic_task()
The print statements show that ff_list_task never executes, yet the for loop body runs repeatedly until we manually terminate the workflow.
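One plausible explanation for the endless loop, which this report does not confirm: if the promise returned by ff_list_task supports index access through __getitem__ but defines no __iter__, Python falls back to its legacy sequence-iteration protocol and calls __getitem__(0), __getitem__(1), ... until an IndexError is raised. A placeholder that never raises IndexError therefore iterates forever. The sketch below reproduces only that Python mechanism; FakePromise is a hypothetical stand-in, not a flytekit class, and islice is used just to stop the demo.

```python
from itertools import islice


class FakePromise:
    """Hypothetical stand-in for an unresolved task output (not flytekit's Promise).

    It supports index access but defines no __iter__, so iter() falls back to
    calling __getitem__(0), __getitem__(1), ... until IndexError is raised.
    """

    def __init__(self, ref: str) -> None:
        self.ref = ref

    def __getitem__(self, key: int) -> "FakePromise":
        # Index access always succeeds and just builds another placeholder,
        # so IndexError is never raised and iteration never ends on its own.
        return FakePromise(f"{self.ref}[{key}]")


batched_input_files = FakePromise("ff_list_task.o0")

# A bare `for file in batched_input_files:` would loop forever;
# islice stops the demo after five items.
seen = [file.ref for file in islice(batched_input_files, 5)]
print(seen)
```

Under the same assumption, this would also explain why ff_list_task never prints: the dynamic task never finishes building its workflow, so the task it calls is never scheduled.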
2. Calling a sub-workflow in a dynamic task:
If we modify the above example to call a sub-workflow while iterating over the FlyteFile list, memory grows without bound.
For example:
from typing import List

from flytekit import Resources, dynamic, task, workflow
from flytekit.types.file import FlyteFile


@task(requests=Resources(mem="5Gi"))
def ff_list_task() -> List[FlyteFile]:
    return [FlyteFile(path="file_1"), FlyteFile(path="file_2")]


@workflow
def sub_wf(input_file: FlyteFile) -> FlyteFile:
    return input_file


@dynamic(requests=Resources(mem="5Gi"))
def dynamic_task() -> List[FlyteFile]:
    batched_input_files = ff_list_task()
    result_files = []
    for file in batched_input_files:
        # Each iteration adds another sub_wf call to the workflow built by this dynamic task.
        batch_result_file = sub_wf(input_file=file)
        result_files.append(batch_result_file)
    return result_files


@workflow
def wf():
    files = dynamic_task()
Even though sub_wf should not allocate much memory and should only be called twice, simply adding the sub_wf call causes memory to increase until the task fails with an OOM error. In example #1, memory usage stayed flat.
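If the loop is endless in both examples, the difference in memory follows from the loop bodies: example #1 only prints, while example #2 calls sub_wf and appends the result to result_files on every pass, so whatever each pass retains accumulates for as long as the loop runs. The toy model below counts what each loop body keeps. FakePromise and sub_wf_stub are hypothetical stand-ins, and the model covers only the Python-side list, not whatever else flytekit allocates per sub-workflow call.

```python
from itertools import islice


class FakePromise:
    """Hypothetical placeholder whose index access never raises IndexError."""

    def __init__(self, ref: str) -> None:
        self.ref = ref

    def __getitem__(self, key: int) -> "FakePromise":
        return FakePromise(f"{self.ref}[{key}]")


def sub_wf_stub(input_file: FakePromise) -> dict:
    # Hypothetical stand-in for calling sub_wf: returns a record the caller
    # keeps, loosely analogous to one node per call in a dynamic workflow.
    return {"node": "sub_wf", "input": input_file.ref}


def retained_after(iterations: int, call_sub_wf: bool) -> int:
    """Run the loop for a fixed number of passes and count retained objects."""
    result_files = []
    # islice caps the otherwise endless loop so the demo terminates.
    for file in islice(FakePromise("ff_list_task.o0"), iterations):
        if call_sub_wf:
            result_files.append(sub_wf_stub(file))  # example #2's body
        # example #1's body (print only) keeps nothing
    return len(result_files)


for n in (10, 1_000, 100_000):
    print(n, retained_after(n, call_sub_wf=False), retained_after(n, call_sub_wf=True))
```

The retained count stays at zero for the print-only body and grows linearly with the number of passes when sub_wf results are kept, which matches the flat versus growing memory reported above.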
Expected behavior
In example #1, we would expect ff_list_task to execute before the for loop. We would also expect the loop "for file in batched_input_files" to possibly raise an error, since it iterates over a promise. If the loop does iterate, we would expect exactly two iterations (the list contains only two files) rather than an infinite loop.
In example #2, we would expect all of the same behavior, and additionally that memory would not grow until an OOM error.
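The behavior expected for example #1 (an error, or exactly two iterations, rather than an endless loop) can be sketched in plain Python: a class that defines __iter__ takes precedence over the __getitem__ fallback, so a placeholder that raises from __iter__ fails immediately, while a materialized list iterates exactly twice. StrictPromise and its error message are hypothetical and say nothing about how flytekit implements or fixes this.

```python
class StrictPromise:
    """Hypothetical placeholder that refuses iteration instead of looping forever."""

    def __init__(self, ref: str) -> None:
        self.ref = ref

    def __getitem__(self, key: int) -> "StrictPromise":
        # Index access still works for building references such as o0[0].
        return StrictPromise(f"{self.ref}[{key}]")

    def __iter__(self):
        # __iter__ is preferred over the __getitem__ fallback, so looping fails fast.
        raise TypeError(f"{self.ref} is an unresolved promise and cannot be iterated")


def count_iterations(values) -> int:
    count = 0
    for _ in values:
        count += 1
    return count


# Expected for a resolved output: exactly two iterations.
print(count_iterations(["file_1", "file_2"]))  # 2

# Expected for an unresolved promise: an immediate error, not an endless loop.
try:
    count_iterations(StrictPromise("ff_list_task.o0"))
except TypeError as err:
    print("error:", err)
```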
Additional context to reproduce
These examples were run on Union Cloud, and resource usage was monitored there. flytekit 1.11.0 was used.
Screenshots
Usage screenshots: (image not included)
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte