We’re isolated that workloads under flytekit 1.5.0...
# flytekit
We’re isolated that workloads under flytekit 1.5.0 have much worse memory profiles than workloads under flytekit 1.4.2 Tasks which work just fine with
requests.memory=2Gi, limits.memory=2Gi
under flytekit 1.4.2 fail under 1.5.0 We bumped these tasks to have
requests.memory=64Gi, limits.memory=64Gi
and they succeed under 1.5.0. Here are two graphs that illustrate the difference in RAM usage. The k8s request differences are listed on the graphs. Everything else (inputs, etc) are the same. The only differences are one flytekit 1.4.2 vs 1.5.0. What changed?
An important observation here is that the data that is being fetched is approx. 12Gb - so the best I can come up with is that s3fs is allocating where in the past the fetch mechanism was simply downloading the data and making it available on the filesystem.
If that turns out to be the case it would be helpful to know whether there are controls on this decision. Our data is in the Tb’s but we’re able to process it by a vector that only requires we load Gb’s into RAM.
I’ve opened a bug on github: https://github.com/flyteorg/flyte/issues/3665
Hmm this is interesting
Can you try using flytefile streaming
Or write to a file
Cc @Kevin Su / @Yee
The devs that caught this were up until the wee hours of the morning investigating the issue - I’ll ask them to look into this when they’re online.
It’s unlikely to be a “quick” test for us due to the path we’ve taken to go from our legacy pipeline solution to flyte.
Sure but I don’t follow the problem- we have not seen it, will have to reproduce- happy to suggest a fix
@Ketan (kumare3) - for some context, the task that is consuming so much memory (and under 1.4.2 does not) is setting the folder on a FlyteDirectory. In a downstream task, this FlyteDirectory will be used to
the data to the compute node. But we aren't even getting to the downstream task. The setting of the folder on the FlyteDirectory appears to be causing something, presumably the large amount of data, to get loaded into memory.
Interesting it’s the directory
Ideally it should be consuming low memory, please add a code sample to the issue
Maybe it's clearest for me to create a super-simple repro case to isolate this to FlyteDirectory. It's just a theory at present. 🙂
Below is the workflow that employs two tasks. The only thing that is done: 1. In task1, create a FlyteDirectory object passing it a folder on a networked (EFS) filesystem 2. In task2, consume the FlyteDirectory, and download the folder. The question is, why does flyte require so much memory for task1? The folder in question is 19G of data. Creating the FlyteDirectory object consumes this much memory in flytekit 1.5. On the other hand, downloading the data only consumes ~2G.
Blue line is task 1 (only creates a FlyteDirectory object, causing the data to be uploaded to s3) purple is task 2 (download data to node)
I have a more specific conclusion to add here based on another test: 1. flytekit 1.4.2 will also consume more memory (not as much as above, but still ~12G in this example) if that memory is available to the POD as specified in a task-based Resource request. 2. However, on this same 19G dataset, flytekit 1.4.2 will happily create the FlyteDirectory object and upload this data even with a 2Gi memory request and limit (see attached) 3. Flytekit 1.50 will not -- it will OOM unless you have memory available to the POD on par with the size of the data folder you are trying to upload.
@Ketan (kumare3) Let me know if you can verify the above. We're planning to roll back to flytekit 1.4.2 until this is resolved.
hey @Thomas Blom can you confirm which versions of fsspec and s3fs you have?
also when you were on 1.4.2 did you have the flytekit-data-fsspec plugin installed as well?
we just tried locally (on an arm mac admittedly) uploading 10G and 1G files. we are unable to repro.
From the image/container I'm running this test in:
Copy code
>>> import fsspec
>>> fsspec.__version__
I'm not sure how to get the version of
, and I don't know if we are using
- I don't see this in our dependencies.
Copy code
root@078cd129a8a3:/app# pip list | grep -E '(fsspec|flytekit|s3fs)'
flytekit                 1.4.2
flytekitplugins-pod      1.4.2
fsspec                   2023.5.0
(same container that Thomas is in fwiw)
pip show s3fs too?
just to be sure
Copy code
root@078cd129a8a3:/app# pip show s3fs
WARNING: Package(s) not found: s3fs
can you
which aws
? (in the container)
here is from the 1.5.0 image:
Copy code
root@e28c57ef87a7:/app# pip list | grep -E '(fsspec|flytekit|s3fs)'
flytekit                 1.5.0
flytekitplugins-pod      1.5.0
fsspec                   2023.5.0
s3fs                     2023.5.0
1.4.2 image:
Copy code
root@078cd129a8a3:/app# which aws
root@078cd129a8a3:/app# aws --version
aws-cli/1.27.132 Python/3.9.16 Linux/5.10.178-162.673.amzn2.x86_64 botocore/1.29.132
so in 1.4.2, if you didn’t have the
plugin, you would default to the aws cli.
in 1.5, we defaulted to using fsspec (and added it as a dependency)
the reason for this was because the default flytekit image has had the flytekit-data-fsspec plugin for a while, so we’ve been testing it for quite some time
and we’ve not seen any issues.
yeah testing with 2023.5.0 isn’t showing more than 270MB of memory usage
but still on the arm machine. i think we need to move to amd
We can do some more testing here. I'm curious, though - you said you'd uploaded a 1G and 10G File -- is this using FlyteFile, or FlyteDirectory?
it was done with flytedirectory, and directly with fsspec
yeah i still can’t - now using amd/eks cluster
ran this workflow
Copy code
import time
import subprocess
from flytekit import task, workflow, Resources
from flytekit.types.directory import FlyteDirectory

@task(requests=Resources(mem="1Gi"), limits=Resources(mem="1Gi"))
def waiter_task(a: int) -> str:
    if a == 0:
    return "hello world"

@task(requests=Resources(mem="1Gi"), limits=Resources(mem="1Gi"))
def dd_and_upload() -> FlyteDirectory:
    command = ["dd", "if=/dev/random", "of=/root/temp_10GB_file", "bs=1", "count=0", "seek=10G"]
    return FlyteDirectory("/root/temp_10GB_file")

def waiter(a: int = 0) -> str:
    return waiter_task(a=a)

def uploader() -> FlyteDirectory:
    return dd_and_upload()
first time i exec’ed in, created the file, and uploaded it via a separate script
second attempt was with the second workflow - confirmed the 10gb file now sitting in s3
monitoring it on the side, memory usage never went above 250MB
Ok, thanks for your help. I'll see what more I can find out over here.
let us know
@Yee to be clear -- this last test you did with flytekit 1.5.0? Or with 1.4.2 with fsspec installed?
Thanks for testing @Yee 👍🏼
Hey @Yee, sadly I haven't come to any clear-cut demonstration of the problem after many hours of testing. It seems to be the case that pods running together on a node put memory pressure on each other and the result is hard to characterize. Mostly, I've had LOTS of file copies fail in my testing, but it has happened with both flytekit 1.4.2 and 1.50. The one common trend that I have noticed, when trying to replicate the pattern from our real workflows, is that trying to upload via a FlyteDirectory fails much more consistently due to OOM when operating on LOTS of subfolders/files. So instead of a single 16G file, I did tests with several subfolders containing 1000 files each. I'll let you know if we find anything further.
I think it might be parallelism in fsspec now, while in the older one we had it serial. This is speed vs memory maybe
This sounds right, thanks @Ketan (kumare3)
@Yee @Ketan (kumare3) I need to revive this thread because coming back to it after a couple months and several flytekit releases later, I still have the same problem. I have created a python module of ~200 lines to experiment and reproduce this problem (attached). TLDR: FlyteDirectory(), when initialized on a ctx.working_directory subfolder containing many files, consumes considerably more memory than expected, and this behavior started with flytekit 1.5 when fsspec became used by default under the hood. If a 2G single file is written/uploaded from a pod that requests 2G of memory, this works fine. If 1000 2M files in a few subfolders are uploaded, the pod will be killed with OOMKilled. See attached image from pod logs for the the task in question -- you can see resident memory reported at various steps, and see that the task appears to finish, but it is OOMKilled before the next task starts, presumably during the file-copy process ongoing in the background. This does not happen when a single file is copied, only when many files under several subfolders are copied. This log trace was produced running code in the attached example.
let me run your examples today.
@Yee great, thanks!
@Yee the log trace in question was run with config params as so:
Copy code
c = Config( folder='', size=1, mem=1, many_files=True )
When folder is blank, it just creates a 'test' folder under
. The other params mean we'll write/upload 1GB worth of files, the POD will request 1G memory, and we'll write many smaller files instead of one large one.
will continue investigating tomorrow
ran this code was able to repro
Copy code
import fsspec

target_bucket = "<s3://my-bucket/yt/memtest1>"

container_dir = "/tmp/flyte-ox9aa6ku/sandbox/local_flytekit/e69fd8f684d1e5f02eadd7f427aeb2d8/test"
fs = fsspec.filesystem("s3")

fs.put(container_dir, target_bucket, recursive=True)
so this is very much in the implementation of s3fs.
as for what to do about it, i’ll have to dig through s3fs code some
This is great @Yee, thanks! s3fs seems to have a history of memory issues; this one is a couple years old, but I the problem is quite similar.
not a bug, just how it works.
cc @Eduardo Apolinario (eapolinario) i think this is a valid use-case, we need to think about how to expose this. https://github.com/flyteorg/flytekit/pull/1737/files
this works.
so never went above 250mb
@Yee I'm not certain what you're indicating here -- that with the s3 batch_size param in place, the memory utilization stayed at 250mb when running that same test that was OOMKilled previously?
by default s3fs is reading in chunks of 50 Mbs?
and it reads all 1000 files at once
actually i don’t know what 50*2**20 is, is that bits or bytes?
should be bytes
2^20 is 1MB
So I thikn you're right: 50MB
so it’s 50 mbs at a time, times a thousand files. but i don’t think it’s a one to one mapping
but in any case, limiting the parallelism to only upload 100 at a time definitely lowers memory.
but this control is not currently exposed through the type engine. need to think of a friendly way to do that.
Ok, yeah that would be nice to have that control. Otherwise the caller needs to manually throttle s3fs by requesting subsets of things to upload, which is pretty ugly
i’ll reopen the original issue and add some notes on there.
Ok, sounds good. Let me know if there is anything else I can do to help this along; this is currently causing us to stay pinned at flytekit 1.4.2 because the resource requirements are more sane before fsspec starts getting used in 1.5.0+
you can bump
just add this to your code for now…
Copy code
import fsspec.config
fsspec.config.conf["gather_batch_size"] = 100
it’s a hack, but it does unblock this for now.
we’ll figure out a nice way to incorporate, expose fsspec controls into flytekit
Thanks @Yee, in my initial test of the repro case, this does fix the issue. I'm a little surprised by this, given the DEFAULT_BATCH_SIZE doesn't seem to be much higher, but maybe this code is not executed in the current use-case.
Actually, maybe we're seeing the soft_limit RLIM_INFINITY case.
good to hear. we’ll continue to try think of a way to expose this
i think this default number isn’t used, the default is no limit. it’s the gather batch size, like how many coroutines are run at the same time.
@Don Baccus @Alexander Kislukhin This is the thread I was referring to, and a problem we're still having, albeit less frequently.
@Yee - do you know if there has been any additional work in this area? We still experience this problem, albeit less frequently. TLDR: fsspec, s3fs, and the gather_batch_size hack. We keep having to make that batch size smaller, which makes OOM errors less frequent, but even at 32 they still occur - the issue is FlyteDirectory with LOTS of files across a few subfolders.. @Alexander Kislukhin @Don Baccus
@Thomas Blom did you try latest flytekit? you can add a batch size config to the flyte directory. here is an example. https://github.com/flyteorg/flytekit/pull/1806
It’s not in flyte org doc, sorry. I’ll update that.
Hey @Kevin Su thanks, we'll give this a look. I have a couple immediate questions though: 1. Is this expected to give different behavior than the previous (admittedly uglier) solution? E.g.
Copy code
import fsspec.config
fsspec.config.conf["gather_batch_size"] = 100 # or whatever, we keep reducing it!
2. Re: "s3fs will try to read all the files in the directory into memory by default" -- I still don't understand (and haven't studied the code) the need to load all files to memory, even in small batch sizes -- this is an odd pattern for a file copy, isn't it? What if you had huge files that exceed memory size? This is what has been so confusing about this issue all along -- the large memory requirement for just copying files.
It’s not all the files - but all parallel chunks in memory and the. Flush to disk
It can be too greedy