acoustic-parrot-22802
04/30/2025, 6:25 PM
jolly-florist-66728
04/30/2025, 6:26 PM
acoustic-parrot-22802
04/30/2025, 6:30 PM
acoustic-parrot-22802
04/30/2025, 6:31 PM
jolly-florist-66728
04/30/2025, 6:31 PM
import os

from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory


@task
def task1() -> FlyteDirectory:
    # Create a directory and write some files
    local_dir = os.path.join(os.getcwd(), "my_data")
    os.makedirs(local_dir, exist_ok=True)
    with open(os.path.join(local_dir, "file1.txt"), "w") as f:
        f.write("This is file 1.")
    with open(os.path.join(local_dir, "file2.txt"), "w") as f:
        f.write("This is file 2.")
    return FlyteDirectory(local_dir)


@task
def task2(fd: FlyteDirectory):
    # Materialize the directory locally first; on a remote run the contents
    # are fetched lazily, so fd.path alone may not be populated yet
    local_path = fd.download()
    # List files in the directory and read contents
    for file_name in os.listdir(local_path):
        with open(os.path.join(local_path, file_name), "r") as f:
            print(f"Contents of {file_name}: {f.read()}")


@workflow
def my_workflow():
    fd = task1()
    task2(fd=fd)


if __name__ == "__main__":
    my_workflow()
In this example, task1 creates a directory with some files and returns it as a FlyteDirectory. task2 takes this FlyteDirectory as input, lists the files, and reads their contents. The workflow my_workflow orchestrates these tasks. Would you like more details on any specific part of this example?
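To check the file-handling logic separately from Flyte, the write and read steps can be exercised as plain functions. The sketch below is only an illustration using the standard library: write_files, read_files, and run_handoff are hypothetical helper names that do not appear in the example above, and a temporary directory stands in for the storage that Flyte would manage.

```python
import os
import tempfile
from typing import Dict


def write_files(target_dir: str) -> str:
    """Hypothetical stand-in for the body of task1: create the directory and two files."""
    os.makedirs(target_dir, exist_ok=True)
    for index in (1, 2):
        with open(os.path.join(target_dir, f"file{index}.txt"), "w") as f:
            f.write(f"This is file {index}.")
    return target_dir


def read_files(source_dir: str) -> Dict[str, str]:
    """Hypothetical stand-in for the body of task2: map each file name to its contents."""
    contents = {}
    for file_name in sorted(os.listdir(source_dir)):
        with open(os.path.join(source_dir, file_name), "r") as f:
            contents[file_name] = f.read()
    return contents


def run_handoff() -> Dict[str, str]:
    """Write into a temporary directory, then read it back, mirroring task1 -> task2."""
    with tempfile.TemporaryDirectory() as tmp:
        return read_files(write_files(os.path.join(tmp, "my_data")))


if __name__ == "__main__":
    for name, text in run_handoff().items():
        print(f"Contents of {name}: {text}")
```

Keeping the file logic in plain functions like these would let the task bodies stay thin wrappers, which makes the logic testable without a Flyte cluster.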
Sources:
- FlyteFile and FlyteDirectory
acoustic-parrot-22802
04/30/2025, 6:34 PM
jolly-florist-66728
04/30/2025, 6:34 PM