My subworkflow inside a flyte conditional receives...
# ask-the-community
v
My subworkflow inside a flyte conditional receives inputs from these 5 tasks, but it doesn’t appear to depend on them on graph UI. I think that’s because the “conditional” itself doesn’t depend on them, but the workflow it calls does. Is it possible to configure dependencies for a conditional, or assign this dependency explicitly? This is mostly a visual issue, but I’m also getting an error locally which I suspect may be related
AttributeError: 'FlyteBranchNode' object has no attribute 'interface'
(it does not abort my run on flyte remote, so it is tolerable)
s
Would you mind sharing the code snippet? Also are you seeing that error when you run the
pyflyte run
command locally?
v
I run using flytekit’s register_script() and execute(), I will share the relevant code pieces
s
An issue has been created already to tackle this error: https://github.com/flyteorg/flyte/issues/3628
v
I am aware of this issue with wait, but the reason it has to wait in the first place is because the branch node is waiting for the other tasks to finish, so it can use their outputs, no?
Copy code
@workflow
def smplx_proccess_pipeline(
    ...
    ):

    run_fit = should_run_fit(...) #return boolean

    conditional("should_run_fit")
    .if_(run_fit.is_true())
    .then(shape_fit_flow(
        ... inputs ...
    ))
    .else_()
    .then(dummy_task())
where shape_fit_flow is a
@workflow
that has tasks that use these inputs I suspect that if the outputs were ready by the time the conditional happened this may not have happened, and the view on UI would be more logically correct (because the subworkflow does depend on these outputs but this is not reflected visually)
Another image to visually explain it If I could explicitly define dependencies for the conditional then it would probably wait until the inputs were ready and these sections here would not overlap, and probably there would be no issue with the branch node wait Or maybe there would still be issues, in that case I would have the dependencies on the task that decides the conditional (should_run_fit). Then the wait would be on the task and not a conditional and it should also work. But I don’t want to configure it as an input for this task because it takes a moment to download these files and it doesnt need them. I’d like to configure it to run after those tasks without relying on inputs/outputs Any way I still need to configure a dependency here
s
I am aware of this issue with wait, but the reason it has to wait in the first place is because the branch node is waiting for the other tasks to finish, so it can use their outputs, no?
FlyteRemote
wait
will wait for the whole execution to finish.
I suspect that if the outputs were ready by the time the conditional happened this may not have happened
That should be the case. Which flytekit version have you installed?
v
My flytekit is version 1.6.2 I had to add dynamic parallels later on in this subworkflow so I changed it from
@workflow
to
@dynamic
and the error went away update 2023/06/20: the error did not go away it was because of caching that i did not hit it. I updated my cache version and the outputs were not ready soon enough for the branch node, causing this error again
s
Oh interesting.
v
Solved using explicit dependencies with create_node:
Copy code
motion_capture_task = create_node(git_clone, repo_url=mo_cap_repo, branch=mo_cap_git_tag).with_overrides(name="Clone CGMocap")
    core_algo_task = create_node(git_clone, repo_url=core_algo_repo, branch=core_algo_git_tag).with_overrides(name="Clone CoreAlgo")
    evaluators_task = create_node(git_clone, repo_url=evaluators_repo, branch=evaluators_git_tag).with_overrides(name="Clone Evaluators")
    fitting_task = create_node(git_clone, repo_url=ts_data_repo, branch=fitting_git_tag).with_overrides(name="Clone Fitting")
    common_algo_task = create_node(git_clone, repo_url=common_algo_repo, branch=common_algo_tag).with_overrides(name="Clone CommonAlgo")

    motion_capture_repo = motion_capture_task.o0
    core_algo_repo = core_algo_task.o0
    evaluators_repo = evaluators_task.o0
    fitting_repo = fitting_task.o0
    common_algo_repo = common_algo_task.o0

    run_fit = create_node(should_run_fit,
        takes_path=parsed_session_info.expression_paths_concatenated,
        source_bucket_name=parsed_session_info.scan_bucket,
        data_version=fitting_git_tag,
        bucket_name=output_bucket_name,
        force_fit=force_run_fit,
    )

    motion_capture_task >> run_fit
    core_algo_task >> run_fit
    evaluators_task >> run_fit
    fitting_task >> run_fit
    common_algo_task >> run_fit


    run_fit_conditional = (conditional("should_run_fit").if_(run_fit.o0.is_true()
    ).then(shape_fit_flow(
        fitting_repo=fitting_repo,
        core_algo_repo=core_algo_repo,
        common_algo_repo=common_algo_repo,
        motion_capture_repo=motion_capture_repo,
        smplx_version=smplx_version,
        debug_fit=debug_fit,
        shape_input_path=parsed_session_info.shape_input_path,
        scan_bucket=parsed_session_info.scan_bucket,
        is_ipad=parsed_session_info.is_ipad,
        gender=parsed_session_info.gender,
        user_id=parsed_session_info.user_id,
        session_id=parsed_session_info.session_id,
        prefix_for_algo_bucket=parsed_session_info.prefix_for_algo_bucket,
        fitting_git_tag=fitting_git_tag,
        output_bucket_name=output_bucket_name,
        expression_paths=parsed_session_info.expression_paths,
    )).else_().then(skip()))
Hope it’s not too hacky. By setting the clone tasks to run before the task that decides the conditional’s boolean, using >>, the outputs are guaranteed to be ready in time, so the error does not happen and the graph view is more accurate to my expectations Thanks
s
Nice. You should be able to chain tasks without using
create_node
. https://docs.flyte.org/projects/cookbook/en/latest/auto/core/control_flow/chain_entities.html
v
This makes everything so much simpler:
Copy code
run_fit = should_run_fit(
        takes_path=parsed_session_info.expression_paths_concatenated,
        source_bucket_name=parsed_session_info.scan_bucket,
        data_version=fitting_git_tag,
        bucket_name=output_bucket_name,
        force_fit=force_run_fit,
    )

    (motion_capture_repo := git_clone(repo_url=mo_cap_repo, branch=mo_cap_git_tag).with_overrides(name="Clone CGMocap")) >> run_fit
    (core_algo_repo := git_clone(repo_url=core_algo_repo, branch=core_algo_git_tag).with_overrides(name="Clone CoreAlgo")) >> run_fit
    (evaluators_repo := git_clone(repo_url=evaluators_repo, branch=evaluators_git_tag).with_overrides(name="Clone Evaluators")) >> run_fit
    (fitting_repo := git_clone(repo_url=ts_data_repo, branch=fitting_git_tag).with_overrides(name="Clone Fitting")) >> run_fit
    (common_algo_repo := git_clone(repo_url=common_algo_repo, branch=common_algo_tag).with_overrides(name="Clone CommonAlgo")) >> run_fit

    conditional("should_run_fit").if_(run_fit.is_true()
    ).then(shape_fit_flow(
        fitting_repo=fitting_repo,
        core_algo_repo=core_algo_repo,
        common_algo_repo=common_algo_repo,
        motion_capture_repo=motion_capture_repo,
        smplx_version=smplx_version,
        debug_fit=debug_fit,
        shape_input_path=parsed_session_info.shape_input_path,
        scan_bucket=parsed_session_info.scan_bucket,
        is_ipad=parsed_session_info.is_ipad,
        gender=parsed_session_info.gender,
        user_id=parsed_session_info.user_id,
        session_id=parsed_session_info.session_id,
        prefix_for_algo_bucket=parsed_session_info.prefix_for_algo_bucket,
        fitting_git_tag=fitting_git_tag,
        output_bucket_name=output_bucket_name,
        expression_paths=parsed_session_info.expression_paths,
    )).else_().then(skip())
Works well Thanks for the tip!
116 Views