I'm not sure how to pass secrets through map_task....
# flytekit
j
I'm not sure how to pass secrets through map_task. I've tried putting the annotation on the task I pass to map_task in addition to passing them to the map_task call itself. Is this related to my issue above? This in not a blocker for me.
Copy code
import flytekit
from flytekit import Secret, map_task, task, workflow


@task
def get_names() -> list[str]:
    return ['JP', 'Ketan']


@task(
    secret_requests=[
        Secret(
            group='aws-access-key',
            key='AWS_ACCESS_KEY_ID',
            mount_requirement=Secret.MountType.ENV_VAR
        ),
        Secret(
            group='aws-access-key',
            key='AWS_SECRET_ACCESS_KEY',
            mount_requirement=Secret.MountType.ENV_VAR
        ),
    ],
)
def task_a(name: str) -> str:
    AWS_ACCESS_KEY_ID = flytekit.current_context().secrets.get(
        'aws-access-key', 'AWS_ACCESS_KEY_ID'
    )
    AWS_SECRET_ACCESS_KEY = flytekit.current_context().secrets.get(
        'aws-access-key', 'AWS_SECRET_ACCESS_KEY'
    )

    print(f'{AWS_ACCESS_KEY_ID}: {AWS_SECRET_ACCESS_KEY}')

    message = f"hello {name}"
    print(message)
    return message


@task
def task_b(name: str) -> str:
    AWS_ACCESS_KEY_ID = flytekit.current_context().secrets.get(
        'aws-access-key', 'AWS_ACCESS_KEY_ID'
    )
    AWS_SECRET_ACCESS_KEY = flytekit.current_context().secrets.get(
        'aws-access-key', 'AWS_SECRET_ACCESS_KEY'
    )

    print(f'{AWS_ACCESS_KEY_ID}: {AWS_SECRET_ACCESS_KEY}')

    message = f"hello world, {name}"
    print(message)
    return message


@workflow
def test_wf():
    names = get_names()

    a = map_task(task_a)(name=names)

    b = map_task(
        task_b,
        secret_requests=[
            Secret(
                group='aws-access-key',
                key='AWS_ACCESS_KEY_ID',
                mount_requirement=Secret.MountType.ENV_VAR
            ),
            Secret(
                group='aws-access-key',
                key='AWS_SECRET_ACCESS_KEY',
                mount_requirement=Secret.MountType.ENV_VAR
            ),
        ],
    )(name=names)
Error
Copy code
[1/1] currentAttempt done. Last Error: UNKNOWN::[0]: code:"USER:Unknown" message:"Traceback (most recent call last):\n\n      File \"/venv/lib/python3.10/site-packages/flytekit/common/exceptions/scopes.py\", line 203, in user_entry_point\n        return wrapped(*args, **kwargs)\n      File \"/venv/lib/python3.10/site-packages/flyte_workflows/workflows/test.py\", line 27, in task_a\n        AWS_ACCESS_KEY_ID = flytekit.current_context().secrets.get(\n      File \"/venv/lib/python3.10/site-packages/flytekit/common/tasks/sdk_runnable.py\", line 65, in get\n        raise ValueError(\n\nMessage:\n\n    Unable to find secret for key AWS_ACCESS_KEY_ID in group aws-access-key in Env Var:_FSEC_AWS-ACCESS-KEY_AWS_ACCESS_KEY_ID and FilePath: /etc/secrets/aws-access-key/aws_access_key_id\n\nUser error." kind:USER
k
cc @Haytham Abuelfutuh
h
Thank you for reporting this. Unfortunately, secrets injection is only implemented on PluginManager which array tasks, at the moment, do not go through… We will address this (along with the other issues reported for Map tasks) in Feb’s release! CC @Dan Rammer (hamersaw) We run this in plugin manager to serialize and inject required secret names as labels (that are later picked up and handled by the Webhook): https://github.com/flyteorg/flytepropeller/blob/master/pkg/controller/nodes/task/k8s/plugin_manager.go#L196-L199 We need to run similar code in k8s array plugin…
👍 1
175 Views