Is `Mapped Task` supported in local executions, ...
# flytekit
j
Is `Mapped Task` supported in local executions? I have a mapped task in one of my workflows and tried to run it locally, but I'm running into a Promise issue like the one below
E                   TypeError: Failed to convert return value for var o0 for function balrog_pipeline.workflows.primer_qc.process_expected_primer_qc_workflow.mapper_calculate_spike_in_metrics_in_fastq_dir_2 with error <class 'TypeError'>: No automatic conversion found from type <class 'flytekit.core.promise.Promise'> to FlyteFile.Supported (os.PathLike, str, Flytefile)
the mapped task looks like this
out_spike_in_metrics_list = map_task(
        calculate_spike_in_metrics_in_fastq_dir,
        metadata=TaskMetadata(retries=DEFAULT_DYNAMIC_TASK_RETRY),
    )(task_params=task_params_list)
and `calculate_spike_in_metrics_in_fastq_dir` is the task with the following signature
@BASE_TASK_WORKER.run_task_with_args()
def calculate_spike_in_metrics_in_fastq_dir(
    task_params: dict[str, FlyteFile]
) -> FlyteFile:
y
one thing, i think that should be `typing.Dict`
and what is base_task_worker?
j
oh we are on Python 3.9
so native type annotations like `dict[str, FlyteFile]` work without importing `typing`
y
got it
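For reference, a minimal sketch of what "native typing" means on Python 3.9+: the built-in `dict[...]` form and the older `typing.Dict[...]` alias report the same runtime origin and arguments. The names `native` and `legacy` are illustrative only, and this says nothing about how a particular flytekit release handles either form.

```python
import typing

# Python 3.9+ built-in generic and the older typing alias
native = dict[str, int]
legacy = typing.Dict[str, int]

# Both forms expose the same origin class and type arguments at runtime
print(typing.get_origin(native), typing.get_args(native))
print(typing.get_origin(legacy), typing.get_args(legacy))
```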
j
base_task_worker is a decorator
that just adds resource requests and cache versions to tasks/dynamic tasks
y
and just to be clear, running the calculate_spike not in a map task works right?
j
yup running remotely works
y
if you try with just one element of the task_params_list
no i mean locally
j
locally it doesn't work, I'm getting this Promise error
y
i mean just the `calculate_spike_in_metrics_in_fastq_dir` task, not the mapped version
it fails with the same error?
j
oh that i haven't tested but i imagine it works fine
since other regular Flyte tasks run just fine
y
let me try to repro this.
just look at the top file
does that kinda reflect what you were doing?
that works locally on my end
(the lp.py file is my attempt at debugging the other issue, ignore that for now)
j
Hi Yee I'll look into that in an hour currently not at my computer 😅
hmm yeah it is pretty much that, although the input to the map task was generated by another Python task, like this
@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def process_expected_primer_qc_workflow(
    in_spec_path: FlyteFile,
) -> tuple[FlyteFile, FlyteFile, FlyteFile]:
    task_params_list, can_run_contamination_metric = build_task_params(
        in_spec_path=in_spec_path
    )

    out_spike_in_metrics_list = map_task(
        calculate_spike_in_metrics_in_fastq_dir,
        metadata=TaskMetadata(retries=DEFAULT_DYNAMIC_TASK_RETRY),
    )(task_params=task_params_list)
hmm i'll try to replicate it in a sandbox to see if that works
ok i think i know the issue
but it's still weird that your example is working
i have a sandbox workflow like this
from typing import Dict, List

from flytekit import TaskMetadata, map_task, task, workflow


@task
def mappable_task(input: Dict[str, int]) -> Dict[str, int]:
    return {"number": input["number"] ** 2, "is_even": input["is_even"]}


@task
def get_numbers(input: int) -> List[Dict[str, int]]:
    return [
        {"number": i, "is_even": 1 if i % 2 == 0 else 0} for i in range(input)
    ]


@workflow
def mappable_workflow(limit: int) -> List[Dict]:
    mappable_inputs = get_numbers(input=limit)
    results_mapped_task = map_task(mappable_task, metadata=TaskMetadata(retries=1))(
        input=mappable_inputs
    )
    return results_mapped_task
this will give the following error
raise AssertionError(f"Failed to convert value of output {k}, expected type {v}.") from e
AssertionError: Failed to convert value of output o0, expected type typing.List[typing.Dict].
i think it's specific to Map types: this example used a generic, untyped Dict before and it was working, so a generic struct works, but once i add type parameters it fails
@Yee do you think it's because i am on py3.9 with native typing annotations
y
not sure… can you try with the typing.Dict?
or are you saying that doesn’t work
it should, we run our unit tests on 3.7-3.10
j
nvm, yeah it's not working, the example above is using `from typing import Dict, List`
hmm ok, the example above works on v0.31.0, i was on 0.30.3 in the sandbox 🤦 sorry for the false alarm 😅
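Since the difference turned out to be the installed flytekit version, a quick check like the sketch below can rule that out before debugging further. The helpers `release_tuple` and `installed_version` are hypothetical names written for this illustration, and the 0.31.0 threshold comes only from this thread (failed on 0.30.3, worked on 0.31.0); the thread does not identify which change fixed it.

```python
from importlib.metadata import PackageNotFoundError, version
from itertools import takewhile


def release_tuple(v: str) -> tuple:
    """Turn '0.30.3' into (0, 30, 3). Hypothetical helper; drops pre-release suffixes."""
    parts = []
    for piece in v.split(".")[:3]:
        lead = "".join(takewhile(str.isdigit, piece))
        parts.append(int(lead) if lead else 0)
    return tuple(parts)


def installed_version(package: str):
    """Return the installed version string, or None if the package is absent."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


found = installed_version("flytekit")
if found is None:
    print("flytekit is not installed in this environment")
elif release_tuple(found) < release_tuple("0.31.0"):
    print(f"flytekit {found} predates 0.31.0, where the sandbox example worked")
else:
    print(f"flytekit {found} is at or past 0.31.0")
```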