Jay Ganbat
04/04/2022, 11:25 PM
Is Mapped Task supported in local executions? I have a mapped task in one of my workflows and tried to run it locally, but I'm running into a Promise issue like the one below:
E TypeError: Failed to convert return value for var o0 for function balrog_pipeline.workflows.primer_qc.process_expected_primer_qc_workflow.mapper_calculate_spike_in_metrics_in_fastq_dir_2 with error <class 'TypeError'>: No automatic conversion found from type <class 'flytekit.core.promise.Promise'> to FlyteFile.Supported (os.PathLike, str, Flytefile)
The mapper task looks like this:
    out_spike_in_metrics_list = map_task(
        calculate_spike_in_metrics_in_fastq_dir,
        metadata=TaskMetadata(retries=DEFAULT_DYNAMIC_TASK_RETRY),
    )(task_params=task_params_list)
and calculate_spike_in_metrics_in_fastq_dir is the task with the following signature:
    @BASE_TASK_WORKER.run_task_with_args()
    def calculate_spike_in_metrics_in_fastq_dir(
        task_params: dict[str, FlyteFile]
    ) -> FlyteFile:
Yee
typing.Dict
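Yee's reply survives here only as typing.Dict. Assuming it suggests annotating the task with typing.Dict[str, FlyteFile] instead of the built-in dict[str, FlyteFile], here is a minimal standard-library sketch (no flytekit; int stands in for FlyteFile) of how the two spellings differ. They carry the same origin and arguments but are instances of different alias classes, so a type-inspecting library could treat them differently. Whether the flytekit version in use handled both is not confirmed in this thread.

```python
import types
from typing import Dict, get_args, get_origin

# int stands in for FlyteFile so the sketch runs without flytekit.
builtin_alias = dict[str, int]   # PEP 585 spelling (Python 3.9+)
typing_alias = Dict[str, int]    # typing-module spelling

# Both spellings describe the same container and element types.
same_origin = get_origin(builtin_alias) is get_origin(typing_alias) is dict
same_args = get_args(builtin_alias) == get_args(typing_alias) == (str, int)

# They are instances of different alias classes, so a library that
# checks for one class may not recognize the other.
builtin_is_generic_alias = isinstance(builtin_alias, types.GenericAlias)
typing_is_generic_alias = isinstance(typing_alias, types.GenericAlias)

print(same_origin, same_args)
print(builtin_is_generic_alias, typing_is_generic_alias)
```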
Jay Ganbat
04/04/2022, 11:50 PM
Yee
Jay Ganbat
04/04/2022, 11:51 PM
Yee
Jay Ganbat
04/04/2022, 11:51 PM
Yee
Jay Ganbat
04/04/2022, 11:52 PM
Yee
calculate_spike_in_metrics_in_fastq_dir task, not the mapped version
Jay Ganbat
04/04/2022, 11:52 PM
Yee
Jay Ganbat
04/05/2022, 12:56 AM
    @workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
    def process_expected_primer_qc_workflow(
        in_spec_path: FlyteFile,
    ) -> tuple[FlyteFile, FlyteFile, FlyteFile]:
        task_params_list, can_run_contamination_metric = build_task_params(
            in_spec_path=in_spec_path
        )
        out_spike_in_metrics_list = map_task(
            calculate_spike_in_metrics_in_fastq_dir,
            metadata=TaskMetadata(retries=DEFAULT_DYNAMIC_TASK_RETRY),
        )(task_params=task_params_list)
    @task
    def mappable_task(input: Dict[str, int]) -> Dict[str, int]:
        return {"number": input["number"] ** 2, "is_even": input["is_even"]}

    @task
    def get_numbers(input: int) -> List[Dict[str, int]]:
        return [
            {"number": i, "is_even": 1 if i % 2 == 0 else 0} for i in range(input)
        ]

    @workflow
    def mappable_workflow(limit: int) -> List[Dict]:
        mappable_inputs = get_numbers(input=limit)
        results_mapped_task = map_task(mappable_task, metadata=TaskMetadata(retries=1))(
            input=mappable_inputs
        )
        return results_mapped_task
This gives the following error:
    raise AssertionError(f"Failed to convert value of output {k}, expected type {v}.") from e
    AssertionError: Failed to convert value of output o0, expected type typing.List[typing.Dict].
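For comparison, here is a plain-Python sketch of what the repro workflow above is meant to compute once the mapped task runs: the task applied to each element of the list, in order. It uses no flytekit, so it says nothing about the conversion error itself. The names mappable_task_py, get_numbers_py and mappable_workflow_py are hypothetical stand-ins for the decorated functions.

```python
from typing import Dict, List


def mappable_task_py(input: Dict[str, int]) -> Dict[str, int]:
    # Same body as mappable_task, minus the @task decorator.
    return {"number": input["number"] ** 2, "is_even": input["is_even"]}


def get_numbers_py(input: int) -> List[Dict[str, int]]:
    # Same body as get_numbers, minus the @task decorator.
    return [{"number": i, "is_even": 1 if i % 2 == 0 else 0} for i in range(input)]


def mappable_workflow_py(limit: int) -> List[Dict[str, int]]:
    # Stand-in for map_task(...): apply the task to every element in order.
    return [mappable_task_py(input=item) for item in get_numbers_py(input=limit)]


result = mappable_workflow_py(limit=3)
print(result)
```

The sketch annotates the result as List[Dict[str, int]], matching the task's own return type. Whether the bare List[Dict] in the repro is what triggers the error is not established here.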
Yee
Jay Ganbat
04/06/2022, 4:39 PM
from typing import Dict, List