Is there a way to use the output of a task in the `...
# ask-the-community
s
Is there a way to use the output of a task in the `with_overrides` of a downstream task? We have a dynamic function that calls n tasks in a row, each of which modifies a dataframe. As we go through the list of tasks, we want to size each task based on the size of the dataframe after the previous one. I've looked into using conditionals, but since we are inside a dynamic task this doesn't work. We could write the size to some DB and then do a lookup in the caller, but that seems less than ideal. I'm hoping there's some way to call `.eval` on the resulting promise and then use that.
Actually, it looks like we can use `conditional` in dynamic tasks, as long as we manually check whether we are operating on a Promise. So the following works:
# Atomwise
from atomwise.utils.flyte import aw_dynamic, aw_task, aw_workflow

from flytekit import Resources, conditional
from flytekit.extend import Promise


SETTINGS = {
    'small': '1000Mi',
    'medium': '2000Mi',
    'large': '3000Mi'
}


@aw_task
def example_task(input: str, idx: int) -> tuple[str, str]:
    # NOTE: this would be based on dataframe size
    if idx == 0:
        size = 'small'
    elif idx == 1:
        size = 'medium'
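    elif idx == 2:
        size = 'large'
    else:
        # Guard against an unbound `size` for any other index
        raise ValueError(f"unexpected idx: {idx}")
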
    return input, size


@aw_dynamic
def example_dynamic(input: str) -> str:
    size = 'small'
    for i in range(3):
        # After the first iteration `size` is a Promise, so branch via conditional
        if isinstance(size, Promise):
            result, size = (conditional("test")
                .if_(size == 'small')
                .then(example_task(input=input, idx=i).with_overrides(requests=Resources(mem=SETTINGS['small'])))
                .elif_(size == 'medium')
                .then(example_task(input=input, idx=i).with_overrides(requests=Resources(mem=SETTINGS['medium'])))
                .elif_(size == 'large')
                .then(example_task(input=input, idx=i).with_overrides(requests=Resources(mem=SETTINGS['large'])))
                .else_()
                .fail("Blah")
            )
        else:
            result, size = example_task(input=input, idx=i).with_overrides(requests=Resources(mem=SETTINGS['small']))

    return result


@aw_workflow
def example_workflow(input: str) -> str:
    return example_dynamic(input=input)
It does mean, though, that the UI gets pretty cluttered (and would be more so if we had more fine-grained settings).
Is there a way to hide non-executed conditional branches?
s
I don't think you can hide the non-executed conditional nodes. cc @Eduardo Apolinario (eapolinario)
e
Correct, @Samhita Alla, we can't hide non-executed conditional nodes. That said, you might be interested in the work to extend the config overrides happening as part of https://github.com/flyteorg/flyte/pull/3553, @Sam Eckert.
s
Thanks, I'll take a look!
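For anyone adapting the snippet above: `example_task` hard-codes the `size` label by index, with a note that it would really be based on dataframe size. A minimal sketch of that step might look like the following. The helper name `size_bucket` and the byte thresholds are assumptions for illustration only, not something from the thread or from flytekit; the returned labels match the keys of `SETTINGS`.

```python
# Hypothetical helper: map a dataframe's in-memory size to a SETTINGS key.
# The thresholds below are illustrative assumptions, not tuned values.
MIB = 1024 ** 2

SIZE_THRESHOLDS = [
    (500 * MIB, "small"),    # up to and including 500 MiB
    (1500 * MIB, "medium"),  # up to and including 1500 MiB
]


def size_bucket(num_bytes: int) -> str:
    """Return 'small', 'medium', or 'large' for a payload of num_bytes."""
    if num_bytes < 0:
        raise ValueError("num_bytes must be non-negative")
    # Thresholds are sorted ascending, so the first match is the tightest fit.
    for limit, label in SIZE_THRESHOLDS:
        if num_bytes <= limit:
            return label
    # Anything above the last threshold falls into the largest bucket.
    return "large"
```

Inside the task, and assuming the dataframe is a pandas `DataFrame` named `df`, the label could then be computed as `size_bucket(int(df.memory_usage(deep=True).sum()))` and returned alongside the result, so the conditional in the dynamic workflow can branch on it.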