# ask-the-community
h
Hi! For some reason, I’m unable to figure out how to run a map_task with a custom container_image. I’ve tried .with_overrides(container_image='image:version') and setting the image in the task decorator, but neither approach seems to do anything (the default image is still used). What’s the appropriate way to do this? Thanks!
k
Hmm, this is interesting. @Samhita Alla, can you please try it out? I hope it’s not a bug.
s
Sure, I'll try to repro.
This works when I pass the image to the task decorator. with_overrides, however, shows the default flytekit image, not the custom one.
@Harmen van Rossum, could you give it a try again and let us know? Would you mind filing a bug, since with_overrides isn't working?
[flyte-bug]
h
I’ll try again!
I tried again with a few variations. This works:
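import subprocess

from flytekit import TaskMetadata, map_task, task, workflow

@task(container_image='{{.image.cowsay.fqn}}:{{.image.cowsay.version}}')
def say_hello(name: str):
    # capture_output/text are needed for result.stdout to hold cowsay's output
    result = subprocess.run(['cowsay', f'Hello {name}!'], check=True, capture_output=True, text=True)
    print(result.stdout)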

@workflow
def wf():
    map_task(say_hello, metadata=TaskMetadata(retries=1), concurrency=10)(
        name=['a', 'b', 'c']
    )
This doesn’t (No such file or directory: ‘cowsay’):
@task(container_image='{{.image.cowsay.fqn}}:{{.image.cowsay.version}}')
def say_hello(name: str):
    result = subprocess.run(['cowsay', f'Hello {name}!'], check=True, capture_output=True, text=True)
    print(result.stdout)

@task
def go_say_hello(values: list[str]):
    map_task(say_hello, metadata=TaskMetadata(retries=1), concurrency=10)(
        name=values
    )

@workflow
def wf():
    go_say_hello(values=['a', 'b', 'c'])
(This mimics my original problem; it wasn’t clear to me that map_task should be called from a workflow.) Then I tried changing go_say_hello to a workflow, and that did work:
@task(container_image='{{.image.cowsay.fqn}}:{{.image.cowsay.version}}')
def say_hello(name: str):
    result = subprocess.run(['cowsay', f'Hello {name}!'], check=True, capture_output=True, text=True)
    print(result.stdout)

@workflow
def go_say_hello(values: list[str]):
    map_task(say_hello, metadata=TaskMetadata(retries=1), concurrency=10)(
        name=values
    )

@workflow
def wf(): 
    go_say_hello(values=['a', 'b', 'c'])
Using with_overrides doesn’t work (No such file or directory: ‘cowsay’):
@task
def say_hello(name: str):
    result = subprocess.run(['cowsay', f'Hello {name}!'], check=True, capture_output=True, text=True)
    print(result.stdout)

@workflow
def go_say_hello(values: list[str]):
    map_task(say_hello, metadata=TaskMetadata(retries=1), concurrency=10)(
        name=values
    ).with_overrides(container_image='{{.image.cowsay.fqn}}:{{.image.cowsay.version}}')

@workflow
def wf():
    go_say_hello(values=['a', 'b', 'c'])
I’m happy to file a bug report for with_overrides. As for where map_task should be called from (a workflow vs. a task), is that something that should be mentioned in the documentation?
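The "No such file or directory: ‘cowsay’" failures above are what you'd expect if the task ran in the default image, where the binary isn't installed. Here is a minimal sketch of a pre-flight check that makes this easier to spot in the logs. The helper name say_hello_checked and its binary parameter are hypothetical, not part of flytekit, and it uses only the standard library, so it would sit inside the task body rather than replace it:

```python
import shutil
import subprocess


def say_hello_checked(name: str, binary: str = "cowsay") -> str:
    """Run `binary` with a greeting, failing with a clearer message if it is missing.

    Hypothetical helper for illustration only; not a flytekit API.
    """
    # shutil.which returns None when the executable is not on PATH,
    # which is the likely situation when the default image is used.
    path = shutil.which(binary)
    if path is None:
        raise RuntimeError(
            f"{binary!r} not found on PATH; "
            "the task is probably not running in the custom image"
        )
    # capture_output/text make result.stdout a str instead of None.
    result = subprocess.run(
        [path, f"Hello {name}!"], check=True, capture_output=True, text=True
    )
    return result.stdout
```

Calling this from say_hello in place of the bare subprocess.run would presumably turn the FileNotFoundError into a message that points at the image rather than at the command.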
k
Ohh, this should have failed: we cannot nest a map task in a task. This is a bug.