Hi all, i have seen this odd error in dynamic task...
# ask-the-community
j
Hi all, i have seen this odd error in dynamic task where • Dynamic task does not generate a node ◦ one of the condition is not met • Dynamic task returns a Map type • So we just return some default value Then we are getting hit with this error
Copy code
Traceback (most recent call last):
  File "/fn/lib/venv/lib/python3.10/site-packages/flytekit/core/base_task.py", line 479, in dispatch_execute
    native_outputs = self.execute(**native_inputs)
  File "/fn/lib/venv/lib/python3.10/site-packages/flytekit/core/python_function_task.py", line 163, in execute
    return self.dynamic_execute(self._task_function, **kwargs)
  File "/fn/lib/venv/lib/python3.10/site-packages/flytekit/core/python_function_task.py", line 268, in dynamic_execute
    return self.compile_into_workflow(ctx, task_function, **kwargs)
  File "/fn/lib/venv/lib/python3.10/site-packages/flytekit/core/python_function_task.py", line 204, in compile_into_workflow
    literals={
  File "/fn/lib/venv/lib/python3.10/site-packages/flytekit/core/python_function_task.py", line 205, in <dictcomp>
    binding.var: binding.binding.to_literal_model() for binding in workflow_spec.template.outputs
  File "/fn/lib/venv/lib/python3.10/site-packages/flytekit/models/literals.py", line 467, in to_literal_model
    return Literal(map=LiteralMap(literals={k: binding.to_literal_model() for k, binding in self.map.bindings}))
  File "/fn/lib/venv/lib/python3.10/site-packages/flytekit/models/literals.py", line 467, in <dictcomp>
    return Literal(map=LiteralMap(literals={k: binding.to_literal_model() for k, binding in self.map.bindings}))
ValueError: too many values to unpack (expected 2)
So we do construct the dictionary in the dynamic task and return it though like
Copy code
return in_fastq1, in_fastq2, {k: get_empty_flyte_file() for k in in_metrics_keys}
is this expected?
its crashing here
Copy code
# If no nodes were produced, let's just return the strict outputs
            if len(workflow_spec.template.nodes) == 0:
                return _literal_models.LiteralMap(
                    literals={
                        binding.var: binding.binding.to_literal_model() for binding in workflow_spec.template.outputs
                    }
                )
e
@Jay Ganbat, can you share the signature of the dynamic task?
j
Copy code
@BASE_TASK_WORKER.run_dynamic_task_with_args()
def execute_umi_extraction_task(
    in_spec_path: FlyteFile,
    in_fastq1: FlyteFile,
    in_fastq2: FlyteFile,
    in_stage_name: str,
    out_gcs_fastq_dir: str,
    out_gcs_metrics_dir: str,
) -> tuple[FlyteFile, FlyteFile, dict[str, FlyteFile]]:
so if some condition in
in_spec_path
dont exists we just return
Copy code
metrics_keys = [
        "out_umi_counts",
        "out_fuzzy_matching_umi_counts",
        "out_non_matching_umi_counts",
        "out_umi_conversion_metric",
    ]
return in_fastq1, in_fastq2, {k: get_empty_flyte_file() for k in metrics_keys}
e
@Jay Ganbat, I was able to find a minimal repro for this. flytekit has a bug right now where dictionaries cannot be returned by @dynamic workflows. I opened https://github.com/flyteorg/flyte/issues/3032 to track this and will prioritize this.
(worth noting that it works if the returned dictionary is empty)
j
Thank you for taking care of this. We added a dummy task that does nothing before returning and that circumvents the issue
e
of course. I'll ping here when we have a fix out.
159 Views