Sam Eckert
08/11/2023, 8:05 PM
[...] `with_overrides` of a downstream task? We have a dynamic function that calls n tasks in a row. Each task modifies a dataframe. As we go through the list of tasks, we want to size the next task dynamically, based on the size of the dataframe after the previous task.
I've looked into using conditionals, but since we are inside a dynamic task this doesn't work. We could write the size to some DB and then do a lookup in the caller, but that seems less than ideal. I'm hoping there's some way to call `.eval` on the resulting promise and then use that.
[...] `conditional` in dynamic tasks, as long as we manually check if we are operating on a Promise. So the following works:
# Atomwise
from atomwise.utils.flyte import aw_dynamic, aw_task, aw_workflow
from flytekit import Resources, conditional
from flytekit.extend import Promise

# Memory request for each size label
SETTINGS = {
    'small': '1000Mi',
    'medium': '2000Mi',
    'large': '3000Mi'
}


@aw_task
def example_task(input: str, idx: int) -> tuple[str, str]:
    # NOTE: this would be based on dataframe size
    if idx == 0:
        size = 'small'
    elif idx == 1:
        size = 'medium'
    elif idx == 2:
        size = 'large'
    else:
        # Guard so `size` is never unbound
        raise ValueError(f"unexpected idx: {idx}")
    return input, size


@aw_dynamic
def example_dynamic(input: str) -> str:
    size = 'small'
    for i in range(3):
        # After the first iteration, `size` is a Promise rather than a str
        if isinstance(size, Promise):
            result, size = (
                conditional("test")
                .if_(size == 'small')
                .then(example_task(input=input, idx=i).with_overrides(requests=Resources(mem=SETTINGS['small'])))
                .elif_(size == 'medium')
                .then(example_task(input=input, idx=i).with_overrides(requests=Resources(mem=SETTINGS['medium'])))
                .elif_(size == 'large')
                .then(example_task(input=input, idx=i).with_overrides(requests=Resources(mem=SETTINGS['large'])))
                .else_()
                .fail("Blah")
            )
        else:
            # First iteration: `size` is still the plain string set above
            result, size = example_task(input=input, idx=i).with_overrides(requests=Resources(mem=SETTINGS['small']))
    return result


@aw_workflow
def example_workflow(input: str) -> str:
    return example_dynamic(input=input)
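The NOTE in example_task says the size label would really come from the dataframe. A minimal sketch of that step in plain Python, assuming made-up byte thresholds: `THRESHOLDS`, `pick_size`, and `memory_request` are hypothetical names that do not appear in the thread, and the cutoffs are placeholders rather than recommendations. With pandas, the byte count could come from something like `df.memory_usage(deep=True).sum()`.

```python
# Size labels and memory requests, mirroring SETTINGS from the snippet above.
SETTINGS = {"small": "1000Mi", "medium": "2000Mi", "large": "3000Mi"}

MIB = 1024 * 1024

# Hypothetical cutoffs (upper bound in bytes, label); tune for real workloads.
THRESHOLDS = [
    (500 * MIB, "small"),
    (1500 * MIB, "medium"),
]


def pick_size(n_bytes: int) -> str:
    """Map a dataframe's in-memory size to one of the SETTINGS keys."""
    if n_bytes < 0:
        raise ValueError("n_bytes must be non-negative")
    for limit, label in THRESHOLDS:
        if n_bytes <= limit:
            return label
    # Anything above the last cutoff gets the largest tier.
    return "large"


def memory_request(n_bytes: int) -> str:
    """Return the memory request string for a dataframe of n_bytes."""
    return SETTINGS[pick_size(n_bytes)]
```

Inside the task, the second return value would then be `pick_size(...)` instead of the idx-based branches, so the labels compared in the conditional stay limited to the keys of SETTINGS.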
Samhita Alla
Eduardo Apolinario (eapolinario)
08/14/2023, 3:52 PM
Sam Eckert
08/14/2023, 11:03 PM