varsha Parthasarathy
12/15/2022, 12:08 PM
Maarten de Jong
12/15/2022, 12:39 PM
@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
varsha Parthasarathy
12/15/2022, 4:16 PM
ground_truth_workflow = flytekit.LaunchPlan.get_or_create(
    name="ground_truth_workflow",
    workflow=GroundTruthOfflinePCPWorkFlow,
)
The dynamic task calls a launch plan.
@flytekit.dynamic
def run():
    # Figures out number of iterations
    # ground_truth_workflow(..)
    ...
aborted
TypeError: isinstance() arg 2 must be a type or tuple of types
Dan Rammer (hamersaw)
12/15/2022, 7:00 PM
Jay Ganbat
12/15/2022, 7:01 PM
varsha Parthasarathy
12/15/2022, 7:01 PM
@flytekit.workflow
def GroundTruthSchedulerWorkflows(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE):
    logger.info("scheduling ground truth workflows")
    scheduler_tasks.run()
This is the workflow that calls the dynamic task:
@flytekit.dynamic(
    cache=False,
    cache_version="0.12.10",
    retries=1,
    requests=flytekit.Resources(mem="16Gi", cpu="1"),
    limits=flytekit.Resources(mem="16Gi", cpu="1"),
)
def run():
    ….
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/com_lyft_avsoftware/src/perception/scene_workflows/ground_truth/workflows/ground_truth_scheduler_workflows.py", line 14, in <module>
    def GroundTruthSchedulerWorkflows(failure_policy=flytekit.WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE) :
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/workflow.py", line 739, in workflow
    return wrapper(_workflow_function)
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/workflow.py", line 734, in wrapper
    workflow_instance.compile()
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/workflow.py", line 604, in compile
    self._input_parameters = transform_inputs_to_parameters(ctx, self.python_interface)
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/interface.py", line 188, in transform_inputs_to_parameters
    default_lv = TypeEngine.to_literal(ctx, _default, python_type=interface.inputs[k], expected=v.type)
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/type_engine.py", line 680, in to_literal
    transformer.assert_type(python_type, python_val)
  File "/app/src/perception/scene_workflows/ground_truth/workflows/py_run_ground_truth_scheduler_workflows.binary.runfiles/pyreqs_requirements_py3_pypi__flytekit_1_0_1/flytekit/core/type_engine.py", line 96, in assert_type
    if not hasattr(t, "__origin__") and not isinstance(v, t):
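For context, the last frame above is where the `TypeError: isinstance() arg 2 must be a type or tuple of types` reported earlier gets raised. Below is a minimal plain-Python sketch of that check. `assert_type` here is a simplified stand-in, not flytekit's code, and `not_a_type` is a hypothetical value, since the log does not show what flytekit actually passed as `t`.

```python
def assert_type(t, v):
    # Simplified stand-in for the check on the last traceback line.
    # isinstance() itself raises TypeError when t is not a class.
    if not hasattr(t, "__origin__") and not isinstance(v, t):
        raise TypeError(f"{v!r} is not an instance of {t!r}")


# Hypothetical: any object that is neither a class nor a typing generic.
not_a_type = object()

try:
    assert_type(not_a_type, "some default value")
    raised = False
except TypeError as exc:
    raised = True
    print(f"TypeError: {exc}")

# A real class as t passes the same check without error.
assert_type(int, 1)
```

The point is only that a parameter default whose expected type does not resolve to a real class can trip this check; the exact wording of the message depends on the Python version.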
Dan Rammer (hamersaw)
12/15/2022, 7:07 PM
@flytekit.workflow(failure_policy=...)
And then, as I mentioned, it should be used on the top-level workflow to define how that workflow handles failures in the dynamic subtasks:
@flytekit.workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def foo():
    run()
ground_truth_workflow = flytekit.LaunchPlan.get_or_create(
    name="ground_truth_workflow",
    workflow=GroundTruthOfflinePCPWorkFlow,
)
@flytekit.dynamic(
    cache=False,
    cache_version="0.12.10",
    retries=1,
    requests=flytekit.Resources(mem="16Gi", cpu="1"),
    limits=flytekit.Resources(mem="16Gi", cpu="1"),
)
def run():
    for i in range(2):
        ground_truth_workflow()
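To see why the placement matters, here is a rough plain-Python sketch that runs without flytekit. The `workflow` decorator and `FailurePolicy` enum are hypothetical stand-ins, not flytekit's implementation. They only mimic the general pattern: keyword arguments given to the decorator configure the workflow, while parameters in the function signature are treated as workflow inputs.

```python
import enum
import inspect


class FailurePolicy(enum.Enum):
    # Hypothetical stand-in for flytekit.WorkflowFailurePolicy.
    FAIL_IMMEDIATELY = 0
    FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = 1


def workflow(_fn=None, *, failure_policy=FailurePolicy.FAIL_IMMEDIATELY):
    """Hypothetical decorator: records the policy and the function's inputs."""

    def wrapper(fn):
        fn.failure_policy = failure_policy
        # Every parameter in the signature counts as a workflow input.
        fn.inputs = list(inspect.signature(fn).parameters)
        return fn

    # Support both @workflow and @workflow(failure_policy=...).
    return wrapper(_fn) if _fn is not None else wrapper


# Policy written as a function parameter: it becomes an input,
# and the decorator never sees it.
@workflow
def broken(failure_policy=FailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE):
    pass


# Policy passed to the decorator: no inputs, policy applied.
@workflow(failure_policy=FailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def fixed():
    pass


print(broken.inputs, broken.failure_policy)
print(fixed.inputs, fixed.failure_policy)
```

In the sketch, `broken` ends up with an input named `failure_policy` and the default policy, which matches the traceback above, where flytekit was converting that default value into a workflow input when it failed.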
varsha Parthasarathy
12/16/2022, 4:32 PM
Dan Rammer (hamersaw)
12/16/2022, 4:38 PM