I need to conditionally run some tasks based on an...
# ask-the-community
v
I need to conditionally run some tasks based on an output of a previous task. I saw that using
if
in my @workflow to conditionally run these tasks is not supported because of limitations related to DAG and/or serialization I tried with a conditional (
from flytekit import conditional
), when I do
conditional('somename').if_(mybool).then(mytask(myinputs=myinputs))
it ignores my conditional with no representation in the graph UI. When I add an
.else_().then(anytask())
it gets represented in graphs UI but errors with
AttributeError: 'FlyteBranchNode' object has no attribute 'interface'
after the run already begins I tried with
@task
and with
@dynamic
What is the correct way to conditionally run these multiple tasks based an a boolean output of another task? Note that these tasks also have dependencies on each other, one of them produces outputs and the rest receive it as input
j
have you tried this
.if_(mybool.is_true())
in the conditional it is stated you cannot exactly directly compare
we have a similar conditional WF that has similar logic as yours and it is working
v
I just assumed that
.is_true()
in the docs was a placeholder example for any boolean, now that I think of it it makes sense with the serialization and translation into flyte types that my boolean could now have additional properties or methods, because it’s not the same simple boolean it used to be I think that’s it, thanks! I’ll test and confirm
With is_true() and with changing my task/dynamic to a subflow, it works very well! It still requires me to use an “else”, otherwise the conditional seems to be ignored, but the “else” can be a minimal dummy task that immediately exits, and as soon as the conditional’s result is decided it expands the graph to show my subflow tasks. Working example (where
shape_fit_flow
is a sub @workflow):
Copy code
conditional("should_run_fit")
        .if_(run_fit.is_true())
        .then(shape_fit_flow(
            fitting_repo=fitting_repo,
            core_algo_repo=core_algo_repo,
            smplx_version=smplx_version,
            debug_fit=debug_fit,
            shape_input_path=parsed_session_info.shape_input_path,
            scan_bucket=parsed_session_info.scan_bucket,
            is_ipad=parsed_session_info.is_ipad
        ).else_().then(dummy_task())
Broken example (I removed the else then with the dummy task on the last line):
Copy code
conditional("should_run_fit")
        .if_(run_fit.is_true())
        .then(shape_fit_flow(
            fitting_repo=fitting_repo,
            core_algo_repo=core_algo_repo,
            smplx_version=smplx_version,
            debug_fit=debug_fit,
            shape_input_path=parsed_session_info.shape_input_path,
            scan_bucket=parsed_session_info.scan_bucket,
            is_ipad=parsed_session_info.is_ipad
        )
The docs didn’t show an example without else/then, and it’s not working for me without it, so I’ll assume it’s not supported. Not too much of a problem, because using dummy task is an acceptable workaround in my case. Thanks!
j
nice, glad it worked 👍
yeah probably not supported, but i think it works for dynamic task though
s
The docs didn’t show an example without else/then, and it’s not working for me without it
conditional
doesn't work without
else
. An issue was created some time ago to support no task execution in a conditional: https://github.com/flyteorg/flyte/issues/3533. Please upvote if this is something you require.
219 Views