Hi all! I am trying to use conditional tasks in a ...
# ask-the-community
n
Hi all! I am trying to use conditional tasks in a workflow, but I want to control the execution order. When I use ">>", I get this error:
unsupported operand type(s) for >>: 'Condition' and 'Condition'
Is this supported? Am I not completing the conditional statement? Code:
@workflow
def wf(
        run_task_1: bool,
        run_task_2: bool
):


    task1_output = (
        conditional("task_one_condition")
        .if_(run_task_1.is_true())
        .then(
            run_task_one()
        )
    )

    task2_output = (
        conditional("task_two_condition")
        .if_(run_task_2.is_true())
        .then(
            run_task_two()
        )
    )

    task1_output >> task2_output
g
You have to have a full conditional; both branches must be present. So add an else branch to each of those. You can make a throwaway task for it to run if you don't need it to do anything in that branch.
"It’s important to note that any conditional statement in Flyte is expected to be complete, meaning that all possible branches must be accounted for."
k
Or simply add .fail; no need for a throwaway task. The idea is that conditionals have to be complete and always return.
g
I had assumed .fail would raise an exception. It will just fall through?
k
Yes, it will raise, but only if the fail branch is taken.
n
So there is no option to end the condition peacefully, without failing the task or creating a task just for the failure case?
k
Remember, this is a data flow, right?
You can always create an echo task that just returns some data.
n
alright, thanks
but wouldn't it make more sense to add a "skip" option instead of spawning a pod for a dummy do-nothing task?
k
Hmm, we can make a do-nothing task never spawn a pod.
Maybe we can add one in propeller, like echo or do-nothing.
The engine can return
Think of a conditional expression: X = if blah x else y
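As a rough analogy in plain Python (not flytekit code; `pick` and the variable names are made up for illustration), a conditional expression only has a value if both branches are present:

```python
def pick(flag: bool, x: str, y: str) -> str:
    # Both branches yield a value, so the result is always defined.
    return x if flag else y


# Python itself rejects a conditional expression with no else branch.
try:
    compile("value = x if flag", "<sketch>", "exec")
    incomplete_ok = True
except SyntaxError:
    incomplete_ok = False
```

Here `incomplete_ok` ends up `False`: without the else branch there would be nothing to bind to the name on the left when `flag` is false.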
n
task1_output = (
    conditional("task_one_condition")
    .if_(run_task_1.is_true())
    .then(
        run_task_one()
    )
    .end()  # proposed, hypothetical method
)
or after else()
k
What does end mean?
What is the lvalue in this expression?
How can you get the task_1 output? Is it None?
I guess you can return else None.
Did you try else None?
n
else doesn't accept inputs.
k
Let me tal
n
I am suggesting that conditions should have this .end() method (maybe .yield() or something with better naming) that instructs propeller to skip it, or to return None by default.
k
I think else None is more readable, no?
n
yeah, could be
k
@Nizar Hattab I checked; sadly, we do not allow a value return. Can you open an issue to support None or echo tasks as defaults in the Flyte backend? This is some work, but pretty simple.
Would you be open to creating an issue?
s
here's an issue to add support for noop in conditional else: https://github.com/flyteorg/flyte/issues/3533
k
Cool, I commented with a possible solution. @Nizar Hattab / @Yubo Wang / others:
contributions welcome.
y
I like the solution you proposed. So two tasks: 1. Implement a task plugin in propeller. 2. Implement echo in flytekit. Is that correct?
k
Yup
Should be fast
y
Circling back on this issue: our intern @Allison Liu is working on it. Internally, we are implementing it this way:
from typing import Dict, Type, Optional

from flytekit.core.base_task import PythonTask
from flytekit.core.interface import Interface


class EchoTask(PythonTask):
    """
    This is the simplest form of an Echo Task, that can be used even for tasks that do not produce any output.
    Requires task name, inputs is optional and defaulted to None
    """

    _TASK_TYPE = "echo"

    def __init__(
        self,
        name: str,
        inputs: Optional[Dict[str, Type]] = None,
        **kwargs,
    ):
        """
        To be used to run echo job. In order to skip the node, set given inputs as the inputs and
        given inputs as the outputs. 
        """
        super().__init__(
            name=name,
            task_config=None,
            inputs=inputs,
            interface=Interface(inputs=inputs, outputs=inputs),
            task_type=self._TASK_TYPE,
            **kwargs
        )
Then you use it as follows; basically, your inputs and outputs will share the same signature:
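from flytekit import conditional, workflow


@workflow
def wf(isRunning: bool) -> str:
    # Both branches echo their input "a" back as the output.
    task = EchoTask(name="skip", inputs={"a": str})
    return (
        conditional("skip")
        .if_(isRunning.is_true())
        .then(task(a="1"))
        .else_()
        .then(task(a="2"))
    )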
Not sure if this is something OSS would want.