Dan Butler
10/24/2023, 7:37 PM
list_images = ContainerTask(
name="list-images",
input_data_dir="/var/inputs",
output_data_dir="/var/outputs",
inputs=kwtypes(images=List[FlyteFile]),
outputs=kwtypes(result=FlyteFile),
image="ghcr.io/flyteorg/rawcontainers-shell:v2",
command=[
"/bin/sh",
"-c",
"ls /var/inputs > /var/outputs/result",
],
)
@workflow
def list_images_wf() -> FlyteFile:
    """Run ``list_images`` over two placeholder S3 paths.

    Each path is wrapped in a ``FlyteFile`` and the resulting list is passed
    as the ``images`` input of ``list_images``; that call's output is
    returned unchanged.
    """
    # Placeholder locations, kept exactly as they were posted.
    remote_paths = ("s3://...", "s3://...")
    return list_images(
        images=[FlyteFile(path=location) for location in remote_paths],
    )
When I look at the result file, it's empty.
Dan Farrell
10/24/2023, 8:06 PM
Dan Butler
10/24/2023, 10:04 PM
Jay Ganbat
10/24/2023, 11:38 PM
@workflow
Can you create a separate regular task and feed that task into your container task?
Dan Farrell
10/25/2023, 12:15 AM
import pandas as pd
from flytekit import task, workflow, ContainerTask, kwtypes
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory
@task()
def my_task(input_data: FlyteDirectory) -> pd.DataFrame:
    """Print the received directory and return a fixed two-row DataFrame.

    The returned frame is built from literal rows and does not depend on
    ``input_data``; the argument is only printed.
    """
    print(input_data)
    rows = [
        {"a": 1, "b": 2},
        {"a": 1, "b": -1},
    ]
    return pd.DataFrame(rows)
@task()
def create_files() -> list[FlyteFile]:
    """Create five empty text files and return them as ``FlyteFile`` objects.

    The files are named ``0.txt`` through ``4.txt`` and are opened by
    relative path, so they are created in the current working directory.
    """
    produced = []
    for x in range(5):
        filename = f"{x}.txt"
        # Writing an empty string leaves the file with no content.
        with open(filename, "w") as handle:
            handle.write("")
        produced.append(FlyteFile(filename))
    return produced
@task()
def create_file() -> FlyteFile:
    """Create five empty text files and return the second one.

    Files ``0.txt`` through ``4.txt`` are all written by relative path; only
    the ``FlyteFile`` for ``1.txt`` (index 1) is returned.
    """
    produced = []
    for x in range(5):
        filename = f"{x}.txt"
        # Writing an empty string leaves the file with no content.
        with open(filename, "w") as handle:
            handle.write("")
        produced.append(FlyteFile(filename))
    second = produced[1]
    return second
@workflow
def wf():
    """Feed a list of files, then a single file, through container tasks.

    Two ``ContainerTask`` objects are built with the same image, environment,
    data directories and shell command; they differ only in name and in the
    type of their ``val`` input (``FlyteFile`` vs ``list[FlyteFile]``). The
    list variant is invoked first, then the single-file variant, and each
    output directory is handed to ``my_task``.
    """
    # Shell script shared by both tasks: create the output directory, list
    # everything under the input directory, and copy that listing to a file.
    listing_script = "mkdir /var/outputs/out && ls -la /var/inputs/* | tee /var/outputs/out/stdout"

    single_input_task = ContainerTask(
        name="square_file",
        image="alpine",
        command=["sh", "-c", listing_script],
        environment={"a": "b"},
        inputs=kwtypes(val=FlyteFile),
        outputs=kwtypes(out=FlyteDirectory),
        input_data_dir="/var/inputs",
        output_data_dir="/var/outputs",
    )
    list_input_task = ContainerTask(
        name="square_files",
        image="alpine",
        command=["sh", "-c", listing_script],
        environment={"a": "b"},
        inputs=kwtypes(val=list[FlyteFile]),
        outputs=kwtypes(out=FlyteDirectory),
        input_data_dir="/var/inputs",
        output_data_dir="/var/outputs",
    )

    # List-of-files variant first, matching the original call order.
    many_files = create_files()
    many_listing = list_input_task(val=many_files)
    df = my_task(input_data=many_listing)

    # Single-file variant; its result replaces the previous ``df``.
    one_file = create_file()
    one_listing = single_input_task(val=one_file)
    df = my_task(input_data=one_listing)

    print(df)
ls: /var/inputs/*: No such file or directory
Dan Butler
10/25/2023, 12:37 AM
Samhita Alla
As a workaround, is it possible to create a persistent volume claim and have a plain Python task download the data to it?
It should be possible with pod templates: https://docs.flyte.org/en/latest/deployment/configuration/general.html#compile-time-podtemplates
Dan Butler
10/25/2023, 3:56 PM