brash-autumn-99506
07/01/2024, 1:09 PM
I have a workflow that reads two files from a directory (foo.txt and bar.txt):
import os
from flytekit import task, workflow, Resources
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory
from typing import Tuple

@workflow
def wf(dirpath: str) -> str:
    # These two tasks work, since get_dir downloads the directory contents to local disk.
    flytedir = get_dir(dirpath=dirpath)
    foo, bar = do_stuff_on_dir(flytedir=flytedir)
    hello = say_hello(foo=foo, bar=bar)
    return hello

@task()
def get_dir(dirpath: str) -> FlyteDirectory:
    fd = FlyteDirectory(path=dirpath)
    return fd

@task()
def do_stuff_on_dir(flytedir: FlyteDirectory) -> Tuple[str, str]:
    texts = []
    for fname in os.listdir(flytedir):
        filepath = os.path.join(flytedir, fname)
        with open(filepath, 'r') as f:
            content = f.read()
            texts.append(content)
    # Do stuff
    return texts[1], texts[0]

@task(
    requests=Resources(cpu="1", mem="1000Mi", ephemeral_storage="2000Mi"),
    limits=Resources(cpu="2", mem="1000Mi", ephemeral_storage="3000Mi"),
)
def say_hello(foo: str, bar: str) -> str:
    return f"Hello, {foo} and {bar}!"
However, when this runs on a cluster (a single-cluster deployment), it comes with Kubernetes overhead for spawning a temporary pod per task. To speed the workflow up, I was thinking of reducing that overhead by aggregating tasks: reading the files from the bucket and doing the work on them within a single task. Is there a preferred way to do this, i.e., to end up with the same local tmp directory structure before returning a FlyteDirectory as a task output?
For instance, something like the following chunk works in the sandbox but not on my single-cluster deployment (it exploits FlyteDirectory.listdir(flytedir)):
import os
from flytekit import task, workflow, Resources
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory
from typing import Tuple

@workflow
def wf(dirpath: str) -> str:
    foo, bar = get_dir_and_do_stuff(dirpath=dirpath)
    hello = say_hello(foo=foo, bar=bar)
    return hello

@task()
def get_dir_and_do_stuff(dirpath: str) -> Tuple[str, str]:
    flytedir = FlyteDirectory(path=dirpath)
    texts = []
    # for fname in os.listdir(flytedir):  # This works only locally (not even in sandbox)
    for fname in flytedir.listdir(flytedir):  # This works in sandbox but not on cluster
        filepath = os.path.join(flytedir, fname)
        infile = FlyteFile(path=filepath)
        with open(infile, 'r') as f:
            content = f.read()
            texts.append(content)
    # Do stuff
    # ...
    return texts[0], texts[1]

@task()
def say_hello(foo: str, bar: str) -> str:
    return f"Hello, {foo} and {bar}!"
I'd like to avoid the alternative of a single task which, instead of exploiting FlyteDirectory, just uses the GCS client to read from the bucket.
Any other tips for optimizing workflows for speed?
Thanks!

high-park-82026
You can take a FlyteDirectory as the workflow input and skip get_dir altogether:

@workflow
def wf(flytedir: FlyteDirectory) -> str:
    foo, bar = do_stuff_on_dir(flytedir=flytedir)
    hello = say_hello(foo=foo, bar=bar)
    return hello
This can continue to be invoked locally with --flytedir /my-local/path, and it will behave correctly whether you run it locally (it will just operate on the local paths) or launch it on a cluster (it will automatically upload the local path to remote GCS, then pass the GCS path around).

high-park-82026
It looks like do_stuff_on_dir intends to read all the files and then take action across them. Is that the case, or is the work parallelizable (e.g., do action X on each file)? If it's the former, as in the sample code you posted, it's worth noting that each f.read() is a call through GCS to download/stream the file. As far as I can tell, even if you run flytedir.download(), it relies on fsspec, which doesn't do async downloads. I suggest you implement that yourself in your code, something like:
import asyncio
import os
import aiofiles

@task()
def do_stuff_on_dir(flytedir: FlyteDirectory) -> Tuple[str, str]:
    async def read_file(filepath: str) -> str:
        async with aiofiles.open(filepath, 'r') as f:
            return await f.read()

    async def read_all() -> list:
        coros = []
        for fname in os.listdir(flytedir):
            filepath = os.path.join(flytedir, fname)
            coros.append(read_file(filepath))
        # gather runs the reads concurrently; results keep the input order
        return await asyncio.gather(*coros)

    texts = asyncio.run(read_all())
    # Do stuff
    return texts[1], texts[0]

Note: this is an untested sketch, adapt it to your needs...
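If pulling in aiofiles isn't desirable, the same overlap can be achieved with only the standard library by pushing the blocking reads onto worker threads. A minimal sketch (read_dir_concurrently is a hypothetical helper, not a Flyte API; it assumes the directory has already been materialized locally):

```python
import asyncio
import os

def read_dir_concurrently(dirpath: str) -> list:
    # Read every file in dirpath concurrently; asyncio.to_thread runs each
    # blocking open/read in a worker thread so slow reads overlap.
    def read_one(path: str) -> str:
        with open(path, "r") as f:
            return f.read()

    async def gather_reads() -> list:
        paths = [os.path.join(dirpath, name) for name in sorted(os.listdir(dirpath))]
        return await asyncio.gather(*(asyncio.to_thread(read_one, p) for p in paths))

    return asyncio.run(gather_reads())
```

Sorting the listing keeps the result order deterministic, which matters if downstream code indexes into the list as the sample tasks do.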
If you can, however, perform these actions independently on each file, I suggest you take a look at map_task to parallelize the work automatically.
high-park-82026
brash-autumn-99506
07/01/2024, 2:09 PM
If I pass a remote path I get Invalid value for '--flytedir': parameter should be a valid directory path, <s3://my-s3-bucket/foobar>, or am I missing something? Can we also skip get_dir without having a local path to provide?

brash-autumn-99506
07/01/2024, 2:15 PM
Does flytedir.download() initiate the download into a local tmp directory? I tried playing with it some days ago but still wasn't able to read the files locally even after running flytedir.download(), unless I added an extra task above yielding the FlyteDirectory (yep, that's get_dir).