Victor Churikov
06/19/2023, 9:14 AM
AttributeError: 'FlyteBranchNode' object has no attribute 'interface'
(it does not abort my run on flyte remote, so it is tolerable)
Samhita Alla
pyflyte run command locally?
Victor Churikov
06/19/2023, 10:23 AM
Samhita Alla
Victor Churikov
06/19/2023, 10:25 AM
@workflow
def smplx_proccess_pipeline(
    ...
):
    run_fit = should_run_fit(...)  # returns a boolean
    # Parentheses let the chained calls span several lines
    (
        conditional("should_run_fit")
        .if_(run_fit.is_true())
        .then(shape_fit_flow(
            ... inputs ...
        ))
        .else_()
        .then(dummy_task())
    )
where shape_fit_flow is a @workflow that has tasks that use these inputs
I suspect that if the outputs were ready by the time the conditional happened this may not have happened, and the view on UI would be more logically correct (because the subworkflow does depend on these outputs but this is not reflected visually)
Samhita Alla
"I am aware of this issue with wait, but the reason it has to wait in the first place is because the branch node is waiting for the other tasks to finish, so it can use their outputs, no?"
FlyteRemote wait will wait for the whole execution to finish.
"I suspect that if the outputs were ready by the time the conditional happened this may not have happened"
That should be the case. Which flytekit version have you installed?
Victor Churikov
06/19/2023, 2:29 PM
@workflow to @dynamic and the error went away
update 2023/06/20: the error did not go away; I just did not hit it because of caching. I updated my cache version and the outputs were not ready soon enough for the branch node, causing this error again.
Samhita Alla
Victor Churikov
06/21/2023, 9:09 AM
motion_capture_task = create_node(git_clone, repo_url=mo_cap_repo, branch=mo_cap_git_tag).with_overrides(name="Clone CGMocap")
core_algo_task = create_node(git_clone, repo_url=core_algo_repo, branch=core_algo_git_tag).with_overrides(name="Clone CoreAlgo")
evaluators_task = create_node(git_clone, repo_url=evaluators_repo, branch=evaluators_git_tag).with_overrides(name="Clone Evaluators")
fitting_task = create_node(git_clone, repo_url=ts_data_repo, branch=fitting_git_tag).with_overrides(name="Clone Fitting")
common_algo_task = create_node(git_clone, repo_url=common_algo_repo, branch=common_algo_tag).with_overrides(name="Clone CommonAlgo")
motion_capture_repo = motion_capture_task.o0
core_algo_repo = core_algo_task.o0
evaluators_repo = evaluators_task.o0
fitting_repo = fitting_task.o0
common_algo_repo = common_algo_task.o0
run_fit = create_node(
    should_run_fit,
    takes_path=parsed_session_info.expression_paths_concatenated,
    source_bucket_name=parsed_session_info.scan_bucket,
    data_version=fitting_git_tag,
    bucket_name=output_bucket_name,
    force_fit=force_run_fit,
)
motion_capture_task >> run_fit
core_algo_task >> run_fit
evaluators_task >> run_fit
fitting_task >> run_fit
common_algo_task >> run_fit
run_fit_conditional = (
    conditional("should_run_fit")
    .if_(run_fit.o0.is_true())
    .then(shape_fit_flow(
        fitting_repo=fitting_repo,
        core_algo_repo=core_algo_repo,
        common_algo_repo=common_algo_repo,
        motion_capture_repo=motion_capture_repo,
        smplx_version=smplx_version,
        debug_fit=debug_fit,
        shape_input_path=parsed_session_info.shape_input_path,
        scan_bucket=parsed_session_info.scan_bucket,
        is_ipad=parsed_session_info.is_ipad,
        gender=parsed_session_info.gender,
        user_id=parsed_session_info.user_id,
        session_id=parsed_session_info.session_id,
        prefix_for_algo_bucket=parsed_session_info.prefix_for_algo_bucket,
        fitting_git_tag=fitting_git_tag,
        output_bucket_name=output_bucket_name,
        expression_paths=parsed_session_info.expression_paths,
    ))
    .else_()
    .then(skip())
)
Hope it's not too hacky. By using >> to make the clone tasks run before the task that decides the conditional's boolean, their outputs are guaranteed to be ready in time, so the error does not happen and the graph view matches my expectations more closely.
Thanks
Samhita Alla
create_node.
https://docs.flyte.org/projects/cookbook/en/latest/auto/core/control_flow/chain_entities.html
Victor Churikov
06/21/2023, 10:13 AM
run_fit = should_run_fit(
    takes_path=parsed_session_info.expression_paths_concatenated,
    source_bucket_name=parsed_session_info.scan_bucket,
    data_version=fitting_git_tag,
    bucket_name=output_bucket_name,
    force_fit=force_run_fit,
)
(motion_capture_repo := git_clone(repo_url=mo_cap_repo, branch=mo_cap_git_tag).with_overrides(name="Clone CGMocap")) >> run_fit
(core_algo_repo := git_clone(repo_url=core_algo_repo, branch=core_algo_git_tag).with_overrides(name="Clone CoreAlgo")) >> run_fit
(evaluators_repo := git_clone(repo_url=evaluators_repo, branch=evaluators_git_tag).with_overrides(name="Clone Evaluators")) >> run_fit
(fitting_repo := git_clone(repo_url=ts_data_repo, branch=fitting_git_tag).with_overrides(name="Clone Fitting")) >> run_fit
(common_algo_repo := git_clone(repo_url=common_algo_repo, branch=common_algo_tag).with_overrides(name="Clone CommonAlgo")) >> run_fit
(
    conditional("should_run_fit")
    .if_(run_fit.is_true())
    .then(shape_fit_flow(
        fitting_repo=fitting_repo,
        core_algo_repo=core_algo_repo,
        common_algo_repo=common_algo_repo,
        motion_capture_repo=motion_capture_repo,
        smplx_version=smplx_version,
        debug_fit=debug_fit,
        shape_input_path=parsed_session_info.shape_input_path,
        scan_bucket=parsed_session_info.scan_bucket,
        is_ipad=parsed_session_info.is_ipad,
        gender=parsed_session_info.gender,
        user_id=parsed_session_info.user_id,
        session_id=parsed_session_info.session_id,
        prefix_for_algo_bucket=parsed_session_info.prefix_for_algo_bucket,
        fitting_git_tag=fitting_git_tag,
        output_bucket_name=output_bucket_name,
        expression_paths=parsed_session_info.expression_paths,
    ))
    .else_()
    .then(skip())
)
Works well
Thanks for the tip!
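
The ordering idea behind the workaround in this thread (chaining the clone tasks ahead of the task that decides the conditional) can be illustrated with a toy dependency graph. This is only a sketch using Python's standard graphlib module. It is not flytekit code and does not reproduce how Flyte schedules nodes. The node names (clone_mocap, clone_core_algo, clone_fitting, branch) and the helper functions are hypothetical stand-ins for the tasks discussed above.

```python
from graphlib import TopologicalSorter


def ready_at_start(dependencies):
    """Return the nodes that can start immediately (no unfinished upstream nodes)."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    return set(sorter.get_ready())


def execution_order(dependencies):
    """Return one valid run order for a graph given as {node: upstream_nodes}."""
    return list(TopologicalSorter(dependencies).static_order())


# Hypothetical node names standing in for the clone tasks in the thread.
clone_nodes = ["clone_mocap", "clone_core_algo", "clone_fitting"]

# Data dependencies only: the branch consumes the clone outputs, but the
# deciding node (run_fit) has no edge from the clones, so it can start
# at the same time as they do.
data_only = {name: set() for name in clone_nodes}
data_only["run_fit"] = set()
data_only["branch"] = {"run_fit", *clone_nodes}

# Explicit ordering edges, the toy analogue of `clone >> run_fit`:
# run_fit now waits for every clone node.
chained = dict(data_only)
chained["run_fit"] = set(clone_nodes)

print(sorted(ready_at_start(data_only)))  # run_fit is ready alongside the clones
print(sorted(ready_at_start(chained)))    # only the clones are ready
print(execution_order(chained))           # clones, then run_fit, then branch
```

In the chained graph the deciding node cannot finish before the clones do, so by the time the branch is evaluated every input of the subworkflow already exists. That is the property the >> edges were added to obtain.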