https://flyte.org logo
#ask-the-community
Title
# ask-the-community
e

Enrico Rotundo

08/18/2023, 4:26 PM
Heya! 👋 When trying to chain
task_1
output with
task_2
input I get the following error
TypeError: Object of type Promise is not JSON serializable
. I get
Promise
is only resolved at a deferred moment but how to pipe one’s output into a downstream task? Whole script in 🧵 👇
Copy code
from flytekit import workflow, task

from flytekitplugins.bacalhau import BacalhauTask
from flytekit import kwtypes


bacalhau_task = BacalhauTask(
        name="hello_world",
        inputs=kwtypes(
                spec=dict,
                api_version=str,
        )
    )

bacalhau_task_2 = BacalhauTask(
        name="second_hello_world",
        inputs=kwtypes(
                spec=dict,
                api_version=str,
        )
    )


@task
def print_cid(bac_task: str) -> str:
    print(f"Your Bacalhau's output CID: {bac_task}")
    return bac_task

@workflow
def wf():
    bac_task_1 = bacalhau_task(
        api_version="V1beta1",
        spec=dict(
            engine="Docker",
            verifier="Noop",
            PublisherSpec={"type": "IPFS"},
            docker={
                "image": "ubuntu",
                "entrypoint": ["echo", "Flyte is awesome!"],
            },
            language={"job_context": None},
            wasm=None,
            resources=None,
            timeout=1800,
            outputs=[{
                    "storage_source": "IPFS",
                    "name": "outputs",
                    "path": "/outputs",
            }],
            deal={"concurrency": 1},
        ),
    )

    bac_task_2 = bacalhau_task_2(
        api_version="V1beta1",
        spec=dict(
            engine="Docker",
            verifier="Noop",
            PublisherSpec={"type": "IPFS"},
            docker={
                "image": "ubuntu",
                "entrypoint": ["echo", "Flyte is awesome!"],
            },
            language={"job_context": None},
            wasm=None,
            resources=None,
            timeout=1800,
            outputs=[{
                    "storage_source": "IPFS",
                    "name": "outputs",
                    "path": "/outputs",
            }],
            inputs=[{
                    "storage_source": "IPFS",
                    "cid": bac_task_1,
                    "name": "myinputs",
                    "path": "/myinputs",
            }],
            deal={"concurrency": 1},
        ),
    )
    
    print_cid(bac_task=bac_task_2)


if __name__ == "__main__":
    wf()
basically in the second task I’m trying to pass
bac_task_1
in
spec.inputs.cid
k

Kevin Su

08/18/2023, 4:45 PM
you have to create a dynamic task
Copy code
@dynamic
def d1(task: str):
  bac_task_2 = bacalhau_task_2(
        cicd: task
   ...
e

Enrico Rotundo

08/21/2023, 12:28 PM
13 Views