I tried again with a few variations:
This works:
import subprocess
from flytekit import TaskMetadata, map_task, task, workflow

@task(container_image='{{.image.cowsay.fqn}}:{{.image.cowsay.version}}')
def say_hello(name: str):
    result = subprocess.run(['cowsay', f'Hello {name}!'], check=True)
    print(result.stdout)

@workflow
def wf():
    map_task(say_hello, metadata=TaskMetadata(retries=1), concurrency=10)(
        name=['a', 'b', 'c']
    )
This doesn’t (No such file or directory: ‘cowsay’):
@task(container_image='{{.image.cowsay.fqn}}:{{.image.cowsay.version}}')
def say_hello(name: str):
    result = subprocess.run(['cowsay', f'Hello {name}!'], check=True)
    print(result.stdout)

@task
def go_say_hello(values: list[str]):
    map_task(say_hello, metadata=TaskMetadata(retries=1), concurrency=10)(
        name=values
    )

@workflow
def wf():
    go_say_hello(values=['a', 'b', 'c'])
(This mimics my original problem; it wasn’t clear to me that map_task should be called from a workflow.)
Then I tried changing go_say_hello to a workflow (that did work):
@task(container_image='{{.image.cowsay.fqn}}:{{.image.cowsay.version}}')
def say_hello(name: str):
    result = subprocess.run(['cowsay', f'Hello {name}!'], check=True)
    print(result.stdout)

@workflow
def go_say_hello(values: list[str]):
    map_task(say_hello, metadata=TaskMetadata(retries=1), concurrency=10)(
        name=values
    )

@workflow
def wf():
    go_say_hello(values=['a', 'b', 'c'])
Using with_overrides doesn’t work (No such file or directory: ‘cowsay’):
@task
def say_hello(name: str):
    result = subprocess.run(['cowsay', f'Hello {name}!'], check=True)
    print(result.stdout)

@workflow
def go_say_hello(values: list[str]):
    map_task(say_hello, metadata=TaskMetadata(retries=1), concurrency=10)(
        name=values
    ).with_overrides(container_image='{{.image.cowsay.fqn}}:{{.image.cowsay.version}}')

@workflow
def wf():
    go_say_hello(values=['a', 'b', 'c'])
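In case it helps anyone debugging the same "No such file or directory" error: a small stdlib-only helper like the one below might make it more obvious that the task is running in an image without the binary. This is only a sketch. The helper name run_binary is made up for illustration, and reading FLYTE_INTERNAL_IMAGE to report the current image is an assumption on my part; if that variable is not set in your environment, the message just says the image is unknown.

```python
import os
import shutil
import subprocess


def run_binary(binary: str, *args: str) -> str:
    """Run `binary` with `args` and return its stdout.

    Raises RuntimeError with a descriptive message when the binary is not
    on PATH, instead of the bare FileNotFoundError from subprocess.
    """
    if shutil.which(binary) is None:
        # Assumption: FLYTE_INTERNAL_IMAGE holds the name of the running image.
        # If it is unset, fall back to a placeholder.
        image = os.environ.get("FLYTE_INTERNAL_IMAGE", "<unknown image>")
        raise RuntimeError(f"{binary!r} not found on PATH (running in {image})")

    # capture_output=True is needed for result.stdout to be populated;
    # without it, stdout goes straight to the parent's stdout and the
    # attribute is None.
    result = subprocess.run(
        [binary, *args], check=True, capture_output=True, text=True
    )
    return result.stdout
```

Inside say_hello this would be called as print(run_binary('cowsay', f'Hello {name}!')). In the two failing variations above I would expect it to raise the RuntimeError rather than the FileNotFoundError, which at least confirms that the container_image setting did not take effect for that execution.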