brash-autumn-99506
07/01/2024, 1:09 PM
I have a workflow that reads text files from a directory (e.g. foo.txt and bar.txt):
import os
from typing import Tuple

from flytekit import task, workflow, Resources
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory


@workflow
def wf(dirpath: str) -> str:
    # These two tasks below work, as get_dir downloads the directory content to the local disk.
    flytedir = get_dir(dirpath=dirpath)
    foo, bar = do_stuff_on_dir(flytedir=flytedir)
    hello = say_hello(foo=foo, bar=bar)
    return hello


@task()
def get_dir(dirpath: str) -> FlyteDirectory:
    fd = FlyteDirectory(path=dirpath)
    return fd


@task()
def do_stuff_on_dir(flytedir: FlyteDirectory) -> Tuple[str, str]:
    texts = []
    for fname in os.listdir(flytedir):
        filepath = os.path.join(flytedir, fname)
        with open(filepath, 'r') as f:
            content = f.read()
        texts.append(content)
    # Do stuff
    return texts[1], texts[0]


@task(
    requests=Resources(cpu="1", mem="1000Mi", ephemeral_storage="2000Mi"),
    limits=Resources(cpu="2", mem="1000Mi", ephemeral_storage="3000Mi"),
)
def say_hello(foo: str, bar: str) -> str:
    return f"Hello, {foo} and {bar}!"
However, when run on a cluster (a single-cluster deployment), this comes with Kubernetes overhead for spawning a temporary pod for each task. To speed up the workflow, I was thinking about reducing the overhead by aggregating tasks: reading files from the bucket and doing stuff on them in a single task. Is there a preferred way to do it, effectively getting to the same local tmp directory structure before returning a FlyteDirectory as a task output?
For instance, something like the following chunk works in the sandbox but not on my single-cluster deployment (exploiting FlyteDirectory.listdir(flytedir)):
import os
from typing import Tuple

from flytekit import task, workflow, Resources
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory


@workflow
def wf(dirpath: str) -> str:
    foo, bar = get_dir_and_do_stuff(dirpath=dirpath)
    hello = say_hello(foo=foo, bar=bar)
    return hello


@task()
def get_dir_and_do_stuff(dirpath: str) -> Tuple[str, str]:
    flytedir = FlyteDirectory(path=dirpath)
    texts = []
    # for fname in os.listdir(flytedir):  # This works only locally (not even sandbox)
    for fname in FlyteDirectory.listdir(flytedir):  # This works on sandbox but not on cluster
        filepath = os.path.join(flytedir, fname)
        infile = FlyteFile(path=filepath)
        with open(infile, 'r') as f:
            content = f.read()
        texts.append(content)
    # Do stuff
    # ...
    return texts[0], texts[1]


@task()
def say_hello(foo: str, bar: str) -> str:
    return f"Hello, {foo} and {bar}!"
I'd like to avoid the alternative of having a single task which, instead of exploiting FlyteDirectory, just uses the GCS client to read from a bucket.
Any other tips for optimizing workflows for speed?
Thanks!

high-park-82026
You can skip get_dir altogether and pass the FlyteDirectory directly as a workflow input:
@workflow
def wf(flytedir: FlyteDirectory) -> str:
    foo, bar = do_stuff_on_dir(flytedir=flytedir)
    hello = say_hello(foo=foo, bar=bar)
    return hello
This can continue to be invoked locally with --flytedir /my-local/path, and it will behave correctly whether you run it locally (it will just operate on the local paths) or launch it on a cluster (it will automatically upload the local path to remote GCS and then pass the GCS path around).

high-park-82026
Does do_stuff_on_dir intend to read all the files and take action across them? Or is it parallelizable (e.g. do action X on each file)? If it's the former, as in the sample code you posted, it's worth noting that each f.read() is a call through GCS to download/stream the file. As far as I can tell, if you run flytedir.download() it relies on fsspec, which doesn't do async downloads. I suggest you implement that in your code, something like:
import asyncio
import aiofiles


@task()
def do_stuff_on_dir(flytedir: FlyteDirectory) -> Tuple[str, str]:
    texts = []

    async def read_one(filepath: str):
        async with aiofiles.open(filepath, 'r') as f:
            content = await f.read()
        texts.append(content)  # if ordering doesn't matter

    async def read_all():
        futures = []
        for fname in os.listdir(flytedir):
            filepath = os.path.join(flytedir, fname)
            futures.append(read_one(filepath))
        await asyncio.gather(*futures)

    asyncio.run(read_all())
    # Do stuff
    return texts[1], texts[0]
Note: this is a sketch rather than production-ready code; adapt it to your use case.
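For reference, the same concurrent-read pattern can be sketched with only the standard library, using asyncio.to_thread to offload the blocking reads to worker threads (the function names here are illustrative, not part of flytekit):

```python
import asyncio
import os


def read_files_concurrently(dirpath: str) -> list:
    """Read every file in dirpath concurrently; result order follows os.listdir."""

    def read_one(filepath: str) -> str:
        # Blocking read, offloaded to a worker thread by asyncio.to_thread.
        with open(filepath, "r") as f:
            return f.read()

    async def read_all() -> list:
        coros = [
            asyncio.to_thread(read_one, os.path.join(dirpath, fname))
            for fname in os.listdir(dirpath)
        ]
        return list(await asyncio.gather(*coros))

    return asyncio.run(read_all())
```

This avoids the aiofiles dependency; for reads coming through GCS, the thread pool simply overlaps the network waits.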
If you can, however, perform these actions independently on each file, I suggest you take a look at map_task to parallelize the work automatically.
high-park-82026
brash-autumn-99506
07/01/2024, 2:09 PM
When I pass a remote path I get: Invalid value for '--flytedir': parameter should be a valid directory path, <s3://my-s3-bucket/foobar>. Am I missing something? Can we skip get_dir also without a local path to provide?

brash-autumn-99506
07/01/2024, 2:15 PM
Does flytedir.download() initiate the download in a local tmp? I tried playing with it some days ago but still wasn't able to locally read the files even if I ran flytedir.download(), unless I added an extra task above yielding the FlyteDirectory (yep, that's get_dir).