Hi all, Has anyone encountered dict[str, FlyteDir...
# flyte-support
h
Hi all, has anyone encountered issues with dict[str, FlyteDirectory]? I have a task that is passed a dictionary of FlyteDirectories initialized with FlyteDirectory(path="s3://..."), and it doesn't seem to work, while another input passed as a plain FlyteDirectory works fine. When I check the inputs in the console, the dictionary doesn't appear to be serialized properly.
{
  "cpus": 16,
  "mesh_result": {
    "instance_info": {
      "instance_id": "...",
      "public_ip": "..."
    },
    "mesh_log": {
      "path": "s3://flytecfd-bucket/task-data/mesh-dir-6d95f0d05e21457aa117451cbf4ffdfe/mesh/mesh.log"
    },
    "mesh_dir": {
      "path": "s3://flytecfd-bucket/task-data/mesh-dir-6d95f0d05e21457aa117451cbf4ffdfe/mesh/constant/polyMesh/"
    }
  },
  "cases": {
    "case_aoa_01.00": {
      "type": "multi-part blob",
      "uri": "s3://flytecfd-bucket/task-data/cases-dir-29b204be507a48e79a657657beb1e1f3/case_aoa_01.00/"
    }
  }
}
Here mesh_log and mesh_dir are part of a dataclass and work as expected, and the FlyteDirectory and FlyteFile there are initialized the same way.
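import dataclasses

from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

# InstanceInfo is defined elsewhere (not shown in this message).
@dataclasses.dataclass
class MeshResult:
    instance_info: InstanceInfo
    mesh_log: FlyteFile
    mesh_dir: FlyteDirectory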
def start_solvers_wf(
    cases: dict[str, FlyteDirectory], mesh_result: MeshResult, cpus: int
):
    ...  # body omitted in the original message
Any idea what could be causing this?
b
Hi Ronalds, can you share more of your code? Mainly the task function body producing the dict[str, FlyteDirectory] and the one consuming it.
h
So I was trying to reproduce the issue to show it, but it seems I had been using the wrong property on the FlyteDirectory. Not sure why .path worked in some cases, but after tidying up the workflow code, switching to remote_source helped.
import flytekit as fl
from flytekit import FlyteDirectory

@fl.task
def output_directory(name: str, dir: FlyteDirectory):
    print(f"Output Directory Name: {name}, Directory Path: {dir.path}")

@fl.dynamic
def test_subtask(
    test_dict: dict[str, FlyteDirectory],
    flyte_directory_path: FlyteDirectory,
    flyte_directory_from_source_path: FlyteDirectory,
)->None:
    output_directory(name="flyte_directory_path", dir=flyte_directory_path)
    output_directory(name="flyte_directory_from_source_path", dir=flyte_directory_from_source_path)
    for k, v in test_dict.items():
        output_directory(name=k, dir=v)

@fl.task
def get_dir_task()->dict[str, FlyteDirectory]:
    return {
        "path": FlyteDirectory(path="s3://flytecfd-bucket/ronny-input/"),
        "from_source": FlyteDirectory.from_source("s3://flytecfd-bucket/ronny-input/")
    }

@fl.dynamic
def test_task_wf() -> None:
    test_subtask(
        test_dict=get_dir_task(),
        flyte_directory_path=FlyteDirectory(path="s3://flytecfd-bucket/ronny-input/"),
        flyte_directory_from_source_path=FlyteDirectory.from_source("s3://flytecfd-bucket/ronny-input/")
    )
So it's kind of solved for now, thanks!
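For anyone landing on this thread later, here is a minimal sketch of the fix described above: prefer remote_source over .path when you need the original s3:// URI of a directory inside a consuming task. It assumes the object exposes remote_source and path attributes, as FlyteDirectory does in the thread. The helper name remote_uri is hypothetical, and the SimpleNamespace objects in the usage lines are stand-ins, so the sketch runs without flytekit installed.

```python
from types import SimpleNamespace


def remote_uri(directory) -> str:
    """Return the remote URI of a FlyteDirectory-like object.

    Assumption: `remote_source` holds the original remote location when the
    directory arrived as a task input, while `path` may be either a local
    path or the URI the object was constructed with.
    """
    source = getattr(directory, "remote_source", None)
    # Fall back to `path` only when no remote source is recorded.
    return str(source) if source else str(directory.path)


# Stand-in for a directory received as a task input (hypothetical values).
received = SimpleNamespace(remote_source="s3://bucket/input/", path="/tmp/local-copy")
# Stand-in for a directory constructed directly with path="s3://...".
constructed = SimpleNamespace(remote_source=None, path="s3://bucket/input/")

print(remote_uri(received))
print(remote_uri(constructed))
```

In the repro above, output_directory could print remote_uri(dir) instead of dir.path. Whether .path holds the URI or a local path seems to depend on how the directory reached the task, which may be why it worked in some cases and not others.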