# flyte-support
b
Hello there! 👋 Question on handling files/directories on S3/GCS for performance. In all the examples I've seen about reading files from a remote directory, we define workflows as multiple tasks. To my understanding, generally the first one yields a FlyteDirectory, and later tasks work on the content that was downloaded locally (tmp/...). For instance, when reading from a bucket containing two txt files (`foo.txt` and `bar.txt`):
```python
import os
from typing import Tuple

from flytekit import task, workflow, Resources
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory

@workflow
def wf(dirpath: str) -> str:

    # These two tasks work, as get_dir downloads the directory
    # content to the local disk.
    flytedir = get_dir(dirpath=dirpath)
    foo, bar = do_stuff_on_dir(flytedir=flytedir)

    hello = say_hello(foo=foo, bar=bar)

    return hello

@task()
def get_dir(dirpath: str) -> FlyteDirectory:
    fd = FlyteDirectory(path=dirpath)
    return fd

@task()
def do_stuff_on_dir(flytedir: FlyteDirectory) -> Tuple[str, str]:
    texts = []

    for fname in os.listdir(flytedir):
        filepath = os.path.join(flytedir, fname)

        with open(filepath, 'r') as f:
            content = f.read()
        texts.append(content)

    # Do stuff: swap the two texts
    return texts[1], texts[0]

@task(requests=Resources(cpu="1", mem="1000Mi", ephemeral_storage="2000Mi"),
      limits=Resources(cpu="2", mem="1000Mi", ephemeral_storage="3000Mi"))
def say_hello(foo: str, bar: str) -> str:
    return f"Hello, {foo} and {bar}!"
```
However, when run on a cluster (a single-cluster deployment) this comes with k8s overhead for spawning a temporary pod for each task. To speed up the workflow I was thinking about reducing the overhead by aggregating tasks, i.e. reading files from the bucket and doing stuff on them in a single task. Is there a preferred way to do it, effectively getting to the same local tmp directory structure before returning a FlyteDirectory as a task output? For instance, something like the following chunk (exploiting `FlyteDirectory.listdir(flytedir)`) works in sandbox but not on my single-cluster deployment:
```python
import os
from typing import Tuple

from flytekit import task, workflow, Resources
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory

@workflow
def wf(dirpath: str) -> str:

    foo, bar = get_dir_and_do_stuff(dirpath=dirpath)

    hello = say_hello(foo=foo, bar=bar)

    return hello

@task()
def get_dir_and_do_stuff(dirpath: str) -> Tuple[str, str]:

    flytedir = FlyteDirectory(path=dirpath)

    texts = []
    # for fname in os.listdir(flytedir):  # This works only locally (not even sandbox)
    for fname in FlyteDirectory.listdir(flytedir):  # This works on sandbox but not on cluster

        filepath = os.path.join(flytedir, fname)
        infile = FlyteFile(path=filepath)
        with open(infile, 'r') as f:
            content = f.read()
        texts.append(content)

    # Do stuff
    # ...

    return texts[0], texts[1]

@task()
def say_hello(foo: str, bar: str) -> str:
    return f"Hello, {foo} and {bar}!"
```
I'd like to avoid the alternative of having a single task which instead of exploiting FlyteDirectories just uses the GS client to read from a bucket. Any other tip for optimizing workflows for speed? Thanks!
h
This is an awesome question! There are a few things to unpack here.
#1 As I'm sure you are aware, Flyte tasks are always stateless (part of the system guarantee for safety, to ensure it can cache/retry, etc.). That means when a task returns a FlyteDirectory, what actually happens is that it uploads the contents of that local directory to GCS, and depending on how big the directory is, that might be an expensive operation. One thing I would optimize away is removing `get_dir` altogether:
```python
@workflow
def wf(flytedir: FlyteDirectory) -> str:

    foo, bar = do_stuff_on_dir(flytedir=flytedir)

    hello = say_hello(foo=foo, bar=bar)

    return hello
```
This can continue to be invoked locally with `--flytedir /my-local/path`, and it will behave correctly whether you run it locally (it will just operate on the local paths) or launch it on a cluster (it will automatically upload the local path to remote GCS, then pass the GCS path around).
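For example, the two invocations might look like this (a sketch with hypothetical file and directory names; `pyflyte run` resolves the `--flytedir` input either way):

```shell
# run locally; the task operates directly on the local directory
pyflyte run wf.py wf --flytedir ./my-local/path

# run on the cluster; flytekit uploads the local directory to blob
# storage first, then passes the remote path to the task
pyflyte run --remote wf.py wf --flytedir ./my-local/path
```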
#2 It looks like your task `do_stuff_on_dir` intends to read all files and take action across these files. Is that the case, or is it parallelizable (e.g. do action X on each file)? If it's the former, as in the sample code you posted, it's worth noting that each `f.read()` is a call through GCS to download/stream the file. As far as I can tell, if you run `flytedir.download()` it relies on fsspec, which doesn't do async downloads. I suggest you implement that in your code, something like:
```python
import asyncio

import aiofiles  # third-party: async file I/O

@task()
def do_stuff_on_dir(flytedir: FlyteDirectory) -> Tuple[str, str]:

    async def read_file(filepath: str) -> str:
        async with aiofiles.open(filepath, 'r') as f:
            return await f.read()

    async def read_all() -> list:
        coros = [
            read_file(os.path.join(flytedir, fname))
            for fname in os.listdir(flytedir)
        ]
        # gather preserves input order even though the reads overlap
        return list(await asyncio.gather(*coros))

    texts = asyncio.run(read_all())

    # Do stuff
    return texts[1], texts[0]
```
Note: treat this as a sketch rather than tested code. If you can, however, perform these actions independently on each file, I suggest you take a look at `map_task` to parallelize the work automatically.
#3 Pod overhead is real. One of the things we worked on is Pod reuse at Union.ai, but that's a proprietary feature that I'm happy to discuss further if it's of interest to you. As explained in #1 above, you don't need two separate tasks, so that might be enough for you? Do you have a sense of how Pod spin-up time compares to file download time? Are these directories small? Generally speaking, when you are processing a lot of data, Pod spin-up time won't be the biggest issue; it only crops up as a problem when you run tasks that take a few seconds to complete.
#4 I started this issue a while back to collect feedback and improvements for FlyteFile/FlyteDirectory usability: https://github.com/flyteorg/flyte/issues/4542. If you have other suggestions (e.g. listdir semantics), please add them there.
b
Thanks Haytham for your detailed answer! About #1: that makes sense if the files are in a local path from the start. But what about reading content which lives exclusively in a bucket? Defining the workflow input as a FlyteDirectory and providing a bucket URI instead results in something like `Invalid value for '--flytedir': parameter should be a valid directory path, <s3://my-s3-bucket/foobar>`, or am I missing something? Can we skip `get_dir` also without a local path to provide?
About #2: You got it right, in this example I'm performing some actions across all files in a directory (think of aggregating data coming from different data-fetching operations and then computing some statistics across all of them), so I don't think map tasks are viable here, but thank you for mentioning them. About the solution you propose (or even the sync download): again, would that be doable without a local copy of the files? Does `flytedir.download()` initiate the download into a local tmp? I tried playing with it some days ago but still wasn't able to read the files locally even after running `flytedir.download()`, unless I added an extra task above yielding the FlyteDirectory (yep, that's `get_dir`).
#3: This will depend on the output of a data pipeline we are building, so I still don't have much visibility into it. As you say, it will likely be far less impactful than in my current tests: I'm testing with small files, so pod spin-up time amounts to a significant part of the total time. I'll be able to reevaluate this in the near future, but as of now my take is that by refactoring tasks and removing some spin-up time I could get some tens of seconds back basically for free (I guess that will also depend on the cluster resources we'll allocate).
t
i can repro the `parameter should be a valid directory path`… agree that’s a bit odd, we’ll take a look
b
Thanks @thankful-minister-83577!
t
this was fixed in https://github.com/flyteorg/flytekit/pull/2547. it'll get pushed out with the next release.
it worked previously for files… somehow this was missed for directories
b
Thank you @thankful-minister-83577! 🙏🏻 I will test this again once released then. Is there a planned release calendar that you know of?
t
at the very least, we can get a beta release out for flytekit today.
a
@brash-autumn-99506 ICYMI this is the release: https://github.com/flyteorg/flytekit/releases/tag/v1.13.0
b
Thank you @average-finland-92144! Tested, it works 🙌🚀