Hey I was working with a set of flyte tasks in a w...
# announcements
b
Hey I was working with a set of flyte tasks in a workflow and was couldn't figure out an error related to why tasks downstream couldn't find a FlyteFile? My workflow passes around a dataclass
Copy code
@dataclass_json
@dataclass
class Output:
    manifest: FlyteFile
    catalog: typing.Optional[FlyteFile]
    run_results: typing.Optional[FlyteFile]
    sources: typing.Optional[FlyteFile]
My first task makes this object and passes it to the next task but the next task errors with an error that there is no file where the manifest file is. Would love to know if maybe I'm doing something I shouldn't be doing or if there is something I need to do to get this to work.
👍 1
k
Do you have code sample For the example
I think it might be optional support? Cc @Kevin Su
would be interested in seeing the example
b
Hey @Ketan (kumare3)! Sorry Let me write up an example
Here is an example of what we are doing with just the file writing and how the different tasks connect to each other. Assume that the artifact class is just a data class with some fields that are parsed from the json stored in the file
Copy code
@task(cache_version="1", cache=True)
def create_outputs_task(input_dir:FlyteDirectory)->Output:
    manifest_path = os.path.join(input_dir,"manifest.json")
    output = Output(
        manifest=FlyteFile(manifest_path),
        run_results=None,
        catalog=None,
        sources=None
    )
    open(manifest_path,"a").close()
    return output

@task(cache_version="1", cache=True)
def consume_outputs_task(outputs:Output)->Artifact:
    with open(output.manifest,"r") as f:
        raw_json = json.load(f)
    artifact = parse_artifact(raw_json)
    return artifact 

@workflow 
def the_workflow(input_dir:FlyteDirectory)->Artifact
    return consume_outputs_task(
        outputs=create_outputs_task(
            input_dir=input_dir
        )
    )
👀 1
k
@Brandon Segal I just tried it, and it works for me
could I see full error message?
what version of flytekit you are using? maybe you could try to upgrade flytekit
b
I am using flytekit 0.26.1. What I find weird is that it works fine locally but doesn't work on the cluster
Copy code
Traceback (most recent call last):

      File "/root/.venv/lib/python3.8/site-packages/flytekit/common/exceptions/scopes.py", line 203, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/.venv/lib/python3.8/site-packages/spotify_dbt_flytekit/tasks/parse_dbt_artifacts.py", line 22, in parse_dbt_artifacts_task
        return parse_dbt_artifacts(outputs)
      File "/root/.venv/lib/python3.8/site-packages/spotify_dbt_flytekit/tasks/parse_dbt_artifacts.py", line 10, in parse_dbt_artifacts
        manifest = parse_manifest(outputs)
      File "/root/.venv/lib/python3.8/site-packages/spotify_dbt_flytekit/tasks/parse_manifest.py", line 12, in parse_manifest
        with open(outputs.manifest, "r") as f:

Message:

    [Errno 2] No such file or directory: '/84/f95f818cb52bf410d9d6-n0-0/ae6a7be9d55ff1a7fe3014fea510a66b/manifest.json'

User error.
k
ohh, okay. let me try it
🙏 1
k
cc @Brandon Segal, what version of flytekit are you on?
k
0.26.1
k
ohh i missed that
185 Views