red-school-96573
07/26/2024, 10:16 AM
import os
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
@task
def t1() -> FlyteDirectory:
    """Write ``file1.txt`` into ``<cwd>/results`` and return that directory.

    NOTE: ``t2`` in this snippet writes into the same ``<cwd>/results`` path,
    so when both tasks run in the same working directory (e.g. a local run)
    the two returned directories refer to the same folder on disk.
    """
    out_dir = os.path.join(os.getcwd(), "results")
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, "file1.txt"), "w", encoding="utf-8") as fh:
        fh.write("Content 1\n")
    return FlyteDirectory(out_dir)
@task
def t2() -> FlyteDirectory:
    """Write ``file2.txt`` into ``<cwd>/results`` and return that directory.

    NOTE: ``t1`` in this snippet writes into the same ``<cwd>/results`` path,
    so when both tasks run in the same working directory (e.g. a local run)
    the two returned directories refer to the same folder on disk.
    """
    out_dir = os.path.join(os.getcwd(), "results")
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, "file2.txt"), "w", encoding="utf-8") as fh:
        fh.write("Content 2\n")
    return FlyteDirectory(out_dir)
@workflow
def wf() -> FlyteDirectory:
    """Run ``t1`` and ``t2`` and merge their output directories with ``combine``.

    ``combine`` is not defined in this snippet; it must be a task that accepts
    ``dir1`` and ``dir2`` and returns a FlyteDirectory.
    """
    dir1 = t1()
    dir2 = t2()
    # Pass task inputs by keyword: flytekit binds workflow node inputs by name,
    # and some flytekit versions reject positional arguments in task calls.
    return combine(dir1=dir1, dir2=dir2)
if __name__ == "__main__":
    # Execute the workflow locally and show the resulting directory.
    result = wf()
    print(result)
What should the combine function look like?
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
red-school-96573
07/26/2024, 2:54 PM
@task
def combine(dir1: FlyteDirectory, dir2: FlyteDirectory) -> FlyteDirectory:
    """Merge the contents of ``dir1`` and ``dir2`` into a new FlyteDirectory.

    Both inputs are downloaded and their trees are copied into a freshly
    created local directory, which is returned as the task output. Unlike
    returning ``dir1.path``, this does not depend on both inputs resolving to
    the same local path, so ``dir2``'s files are never silently dropped.
    If both inputs contain a file at the same relative path, the copy from
    ``dir2`` wins because it is copied last.
    """
    import shutil
    import tempfile

    merged = tempfile.mkdtemp(prefix="combined-")
    for source in (dir1, dir2):
        # Materialise the directory locally; afterwards ``.path`` is local.
        source.download()
        shutil.copytree(source.path, merged, dirs_exist_ok=True)
    return FlyteDirectory(merged)
I dislike that both directories are downloaded and uploaded again, and that it only works if the paths of both directories are identical.
freezing-airport-6809
freezing-airport-6809
white-chef-57887
07/26/2024, 3:40 PM
… `open` method on that class, or should we be telling people to use the builtin `open`?
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
import os
from flytekit import task, workflow, current_context, FlyteContextManager
from flytekit.configuration import Config
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
@task
def copy_file(ff: FlyteFile) -> FlyteFile:
    """Stream the contents of ``ff`` into a newly allocated remote FlyteFile.

    The copy is done in binary mode, so non-text content is preserved byte for
    byte (text mode could fail on or alter such content). ``shutil.copyfileobj``
    moves the data in chunks rather than holding the whole file in memory.

    Returns:
        The new remote FlyteFile holding the copied bytes.
    """
    import shutil

    # NOTE(review): ``ff.remote_path`` is passed as the name of the new remote
    # file; confirm that is intended (it may be None/False for local inputs).
    new_file = FlyteFile.new_remote_file(ff.remote_path)
    with ff.open("rb", cache_type="simplecache", cache_options={}) as src, new_file.open("wb") as dst:
        shutil.copyfileobj(src, dst)
    return new_file
@task
def process_folder(fd: FlyteDirectory) -> FlyteDirectory:
    """Copy every file found by ``fd.crawl()`` into a new remote FlyteDirectory.

    Each file is recreated in the output directory under the same relative
    name (via ``out_fd.new_file``). The data is streamed in chunks with
    ``shutil.copyfileobj`` instead of being read fully into memory.

    Returns:
        The newly allocated remote FlyteDirectory.
    """
    import shutil

    print(f"Remote dir {fd.path}, {fd.remote_directory}, {fd.remote_source}")
    out_fd = FlyteDirectory.new_remote()
    print(f"Writing folder to {out_fd}")
    for base, rel_path in fd.crawl():
        print(f"\t writing {rel_path}")
        src = os.path.join(base, rel_path)
        out_file = out_fd.new_file(rel_path)
        with FlyteFile(src).open("rb") as fin, out_file.open("wb") as fout:
            print(f"Writing file to {out_file}")
            shutil.copyfileobj(fin, fout)
    return out_fd
@workflow
def wf(fd: FlyteDirectory, ff: FlyteFile):
    """Run the file-copy and folder-copy streaming tasks on the given inputs.

    The workflow declares no outputs; the task results are not returned.
    """
    # Each call becomes a node in the workflow graph; inputs are bound by keyword.
    copy_file(ff=ff)
    process_folder(fd=fd)
if __name__ == "__main__":
    # Local driver: run the workflow with a FileAccessProvider whose raw output
    # prefix points at an S3 location, using the sandbox data config.
    ctx = FlyteContextManager.current_context()
    sandbox_data_config = Config.for_sandbox().data_config
    print(sandbox_data_config)
    new_f = FileAccessProvider(
        local_sandbox_dir=ctx.file_access.local_sandbox_dir,
        # Plain URI: the "<...>" around it in the chat paste was Slack link
        # markup, not part of the S3 path, and would make the prefix invalid.
        raw_output_prefix="s3://my-s3-bucket/stream-test",
        data_config=sandbox_data_config,
    )
    with FlyteContextManager.with_context(ctx.new_builder().with_file_access(new_f)) as ctx:
        print(ctx)
        print(f"Sample: {ctx.file_access.get_random_remote_path()}")
        # NOTE(review): /tmp/file_a and /tmp/test must exist locally before running.
        wf(
            ff=FlyteFile(path="/tmp/file_a", remote_path=False),
            fd=FlyteDirectory(path="/tmp/test", remote_directory=False),
        )
freezing-airport-6809
red-school-96573
07/27/2024, 4:59 PM
freezing-airport-6809
red-school-96573
07/27/2024, 5:20 PM
white-chef-57887
07/29/2024, 2:21 PM