# flyte-support
r
Something like:
import os

from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory


@task
def t1() -> FlyteDirectory:
    dir = os.path.join(os.getcwd(), "results")
    os.makedirs(dir, exist_ok=True)
    with open(os.path.join(dir, "file1.txt"), "w") as file:
        file.write("Content 1\n")
    return FlyteDirectory(dir)


@task
def t2() -> FlyteDirectory:
    dir = os.path.join(os.getcwd(), "results")
    os.makedirs(dir, exist_ok=True)
    with open(os.path.join(dir, "file2.txt"), "w") as file:
        file.write("Content 2\n")
    return FlyteDirectory(dir)


@workflow
def wf() -> FlyteDirectory:
    dir1 = t1()
    dir2 = t2()
    return combine(dir1, dir2)


if __name__ == "__main__":
    print(wf())
What should the combine function look like?
f
Sadly, you have to use a streaming write, i.e. the data will pass through your task.
If that's ok, I will send a sample.
For some reason the docs are not getting generated properly, but look at FlyteDirectory: https://docs.flyte.org/en/latest/api/flytekit/generated/flytekit.types.directory.FlyteDirectory.html#flytekit.types.directory.FlyteDirectory I like using crawl, opening a new directory, and writing FlyteFiles to it.
You can also use listdir
r
Thanks @freezing-airport-6809 for your answer. Streaming write sounds cool. If you have something on hand, it would be nice to see an example. I also have a very naive solution in mind, though I don't know if it works:
@task
def combine(dir1: FlyteDirectory, dir2: FlyteDirectory) -> FlyteDirectory:
    dir1.download()
    dir2.download()
    return FlyteDirectory(dir1.path)
I dislike that both directories are downloaded and uploaded again, and it only works if the paths of both directories are identical.
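The path-collision concern can be illustrated without Flyte. Below is a minimal local-filesystem sketch, standard library only, of merging several directory trees into a fresh one by streaming each file across. The name `merge_dirs` and its parameters are hypothetical; in a Flyte task the walk and the file opens would presumably be replaced by the crawl and open operations on FlyteDirectory and FlyteFile mentioned above, which is not shown or verified here.

```python
import os
import shutil


def merge_dirs(sources, dest, chunk_size=1024 * 1024):
    """Stream every file under each source directory into dest.

    Hypothetical helper for illustration only. Relative paths are preserved,
    and a relative path that already exists in dest raises FileExistsError
    instead of being silently overwritten.
    """
    os.makedirs(dest, exist_ok=True)
    written = []
    for src_root in sources:
        for base, _dirs, files in os.walk(src_root):
            for name in files:
                src = os.path.join(base, name)
                rel = os.path.relpath(src, src_root)
                dst = os.path.join(dest, rel)
                if os.path.exists(dst):
                    raise FileExistsError(f"{rel} already exists in {dest}")
                os.makedirs(os.path.dirname(dst), exist_ok=True)
                # Copy in fixed-size chunks so a large file is never held
                # in memory all at once.
                with open(src, "rb") as reader, open(dst, "wb") as writer:
                    shutil.copyfileobj(reader, writer, chunk_size)
                written.append(rel)
    return sorted(written)
```

Raising on a collision is a design choice for the sketch; a real merge might instead prefix each file with its source name or let the later directory win.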
f
I will, but I'm still away from keyboard.
👍 1
Will share. Cc @white-chef-57887: another request for the crawl and open operations on FlyteDirectory and FlyteFile. But the docs are not rendering correctly.
👀 1
w
@freezing-airport-6809 taking a look -- can you say more about what's not rendering correctly? Should there be an open method on that class, or should we be telling people to use the builtin open?
f
There is an open in the code.
Check FlyteDirectory and FlyteFile; weirdly, it does not render.
@red-school-96573 here you go - sorry for the delay
import os

from flytekit import task, workflow, current_context, FlyteContextManager
from flytekit.configuration import Config
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile


@task
def copy_file(ff: FlyteFile) -> FlyteFile:
    new_file = FlyteFile.new_remote_file(ff.remote_path)
    with ff.open("r", cache_type="simplecache", cache_options={}) as r:
        with new_file.open("w") as w:
            w.write(r.read())
    return new_file

@task
def process_folder(fd: FlyteDirectory) -> FlyteDirectory:
    print(f"Remote dir {fd.path}, {fd.remote_directory}, {fd.remote_source}")
    out_fd = FlyteDirectory.new_remote()
    print(f"Writing folder to {out_fd}")
    for base, x in fd.crawl():
        print(f"\t writing {x}")
        src = os.path.join(base, x)
        out_file = out_fd.new_file(x)
        with FlyteFile(src).open("rb") as f:
            with out_file.open("wb") as o:
                print(f"Writing file to {out_file}")
                o.write(f.read())
    return out_fd


@workflow
def wf(fd: FlyteDirectory, ff: FlyteFile):
    copy_file(ff=ff)
    process_folder(fd=fd)


if __name__ == "__main__":
    ctx = FlyteContextManager.current_context()
    print(Config.for_sandbox().data_config)
    new_f = FileAccessProvider(
        local_sandbox_dir=ctx.file_access.local_sandbox_dir,
        raw_output_prefix="s3://my-s3-bucket/stream-test",
        data_config=Config.for_sandbox().data_config,
    )
    with FlyteContextManager.with_context(ctx.new_builder().with_file_access(new_f)) as ctx:
        print(ctx)
        print(f"Sample: {ctx.file_access.get_random_remote_path()}")
        wf(ff=FlyteFile(path="/tmp/file_a", remote_path=False), fd=FlyteDirectory(path="/tmp/test", remote_directory=False))
These are streaming reads and writes. Cc @white-chef-57887 for examples, and cc @tall-lock-23197.
👍 1
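One note on the example above: `w.write(r.read())` and `o.write(f.read())` load each file fully into memory before writing it out. For large files, a chunked copy keeps memory bounded. The sketch below is a hedged illustration, assuming the handles returned by the open calls behave like ordinary Python file objects with `read(n)` and `write`; that assumption has not been checked against every storage backend, and `stream_copy` is a hypothetical helper name, not part of flytekit.

```python
import io


def stream_copy(reader, writer, chunk_size=1024 * 1024):
    """Copy reader into writer chunk by chunk; return the bytes copied.

    Hypothetical helper: works with any pair of binary file-like objects.
    """
    total = 0
    while True:
        chunk = reader.read(chunk_size)
        if not chunk:  # an empty read signals end of stream
            break
        writer.write(chunk)
        total += len(chunk)
    return total


if __name__ == "__main__":
    # In-memory stand-ins for the source and destination file handles.
    src = io.BytesIO(b"Content 1\n" * 3)
    dst = io.BytesIO()
    copied = stream_copy(src, dst, chunk_size=8)
    print(copied, dst.getvalue())
```

Under that assumption, the inner write in a copy loop would become `stream_copy(f, o)` with both files opened in binary mode.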
r
Thanks @freezing-airport-6809! This looks very interesting. I will try it out.
❤️ 1
f
If you can also help add it to the docs, or share an example in a blog, that would be amazing!
r
Absolutely, if it is successful, this could be feasible.
🙌🏽 1
❤️ 2
w
@powerful-gold-59386 for docs as well