Hi! I'm trying to paralallize a task using nested...
# flyte-support
f
Hi! I'm trying to paralallize a task using nested dynamic tasks but i get the following error when triying to use an image_spec on a dynamic task
Copy code
Exception: failed to run command envd build --path /tmp/flyte_k1dlt2c/local_flytekit/9fe1ed17ed098e7909aaa36aa4c77792  --platform linux/amd64 --output type=image,name=localhost:30000/ssns-requests:qhKHsBWnyc7pVe_RBxNtww,push=true with error b'segment 2024/06/10 21:21:01 ERROR: sending request - Post "<https://api.segment.io/v1/batch>": dial tcp 0.0.0.0:443: connect: connection refused\nsegment 2024/06/10 21:21:01 ERROR: sending request - Post "<https://api.segment.io/v1/batch>": dial tcp 0.0.0.0:443: connect: connection refused\nsegment 2024/06/10 21:21:06 ERROR: 1 messages dropped because they failed to be sent and the client was closed\nsegment 2024/06/10 21:21:06 ERROR: sending request - Post "<https://api.segment.io/v1/batch>": dial tcp 0.0.0.0:443: connect: connection refused\nsegment 2024/06/10 21:21:06 ERROR: 1 messages dropped because they failed to be sent and the client was closed\ntime="2024-06-10T21:21:06Z" level=fatal msg=exit app=envd error="failed to create the builder: failed to create buildkit client: failed to bootstrap the buildkitd: failed to connect to buildkitd <tcp://localhost:30003>: timeout 5s: cannot connect to buildkitd" version=v0.3.45\n'
This is the code , im getting the error on
get_declaration_installations
Copy code
requests_image = ImageSpec(
    name="ssns-requests",
    base_image="ghcr.io/flyteorg/flytekit:py3.12-latest",
    packages=["pydantic", "requests"],
    registry="localhost:30000",
    python_version="3.12",
)

image_spec = ImageSpec(
    base_image="ghcr.io/flyteorg/flytekit:py3.12-latest",
    registry="localhost:30000",
    name="ssns-scraper",
    python_version="3.12",
    requirements="requirements.txt",
)

@dynamic(
    container_image=requests_image,
    requests=Resources(cpu="0.1", mem="100Mi"),
    limits=Resources(cpu="0.5", mem="200Mi"),
)
def get_declaration_installations(
    snssns_credentials: dict, declaration: dict
) -> List[dict]:
    import json

    import requests
    from pydantic import BaseModel as BaseModel2

    class Installation(BaseModel2):
        name: str

    token = snssns_credentials["token"]
    cookies = snssns_credentials["cookies"]

    payload = {
        "args": json.dumps({"declaration_id": declaration["snssns_id"], "token": token})
    }
    url = "<url>"
    response = requests.post(url, data=payload, cookies=cookies, verify=False)

    snssns_installations = json.loads(response.text)["collection"]
    installations = []
    for installation in snssns_installations:
        installations.append(Installation(name=installation["nombre"]).model_dump())

    return installations


@dynamic(
    requests=Resources(cpu="0.1", mem="100Mi"),
    limits=Resources(cpu="0.5", mem="200Mi"),
    container_image=image_spec,
)
def declaration_installations(snssns_credentials: dict, declarations: List[dict]):
    installations = []
    for declaration in declarations:
        installations.append(
            get_declaration_installations(
                declaration=declaration, snssns_credentials=snssns_credentials
            )
        )

    return installations
g
which version of flytekit are you using
we just fixed the dynamic issue in flytekit v1.12.2
f
I was using v1.12.0, now I updated it but I'm still getting the same error
the first dynamic (declaration_installations) has no problem getting the image, the problem is when I go deeper