# ask-the-community

Rupsha Chaudhuri

12/14/2022, 6:09 PM
Hi team.. I’m exploring Flyte’s map task for a workflow I’m working on. If I have a very large data set (say 100 million rows)… how efficient is the map task in processing this row by row vs chunks of rows? Does Flyte spin up and tear down the task for every row or is it more like a streaming model?

Dan Rammer (hamersaw)

12/14/2022, 6:45 PM
Hey Rupsha! So map tasks start up a new Pod for each instance, meaning row-by-row processing would be very inefficient. Currently, the advantage of map tasks is that they maintain a very small collection of subtask metadata, which allows executing the same task thousands of times without the significant overhead of dynamic tasks or other approaches. We have explored reusing Pods between subtasks, because as you mentioned that would be much more efficient, but there are some difficulties in ensuring correctness. For example, if a task somehow modifies the environment, that will affect other tasks. If this is something you're interested in, we could certainly discuss it further!

Niels Bantilan

12/14/2022, 6:54 PM
agreed with @Dan Rammer (hamersaw) here, a map task that processes one row means a 100-million-way fan-out, which is quite a fan-out number! Chunks of rows would be more feasible. What is the underlying framework you’re using to process the dataset (spark?), and what is the serialization format? (parquet?)

Rupsha Chaudhuri

12/14/2022, 7:01 PM
Data format: parquet. At this point the initial code I have runs as a regular Python script and uses pandas… I was hoping to break this up and parallelize it using a map task. Generating the chunks for this step shouldn’t be a big problem.
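A minimal sketch of that chunking step in plain Python (the helper name and signature are illustrative, not a Flyte or pandas API):

```python
# Hypothetical helper: split N rows into fixed-size chunks so each
# map-task instance processes a whole chunk instead of a single row.
def make_chunks(n_rows: int, chunk_size: int) -> list[tuple[int, int]]:
    """Return half-open (start, stop) row ranges covering n_rows."""
    return [
        (start, min(start + chunk_size, n_rows))
        for start in range(0, n_rows, chunk_size)
    ]

# e.g. 100 million rows in 1-million-row chunks -> 100 subtasks,
# a fan-out that map tasks handle comfortably.
ranges = make_chunks(100_000_000, 1_000_000)
```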

Yee

12/14/2022, 7:17 PM
what is the output here? and roughly what’s the size?
one thing to keep in mind is that propeller ends up doing the reduce step in a map task (i.e. joining all the data together)

Rupsha Chaudhuri

12/14/2022, 7:22 PM
Input dataset ~100 million rows: for each row I need to do something that’s “slightly” CPU heavy… I’m trying to parallelize this specific step. Output dataset size ~ input dataset size in the worst case; best case, orders of magnitude smaller. The reducer simply has to combine the outputs into a single dataframe.

Yee

12/14/2022, 7:25 PM
got it, cool. 100 million rows is 10 GB if each row is 100 bytes, so yeah, should be fine in terms of memory in propeller, assuming it can scale up.
do you typically use any dataframe processing frameworks in your stack (like ray, dask, spark, etc.)?
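That sizing estimate as a quick back-of-envelope check:

```python
# 100 million rows at an assumed 100 bytes per row comes to 10 GB,
# which is the memory propeller's reduce step would need to handle.
rows = 100_000_000
bytes_per_row = 100  # assumption from the discussion, not a measured value
total_gb = rows * bytes_per_row / 10**9
```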

Rupsha Chaudhuri

12/14/2022, 7:26 PM
spark yes
do you recommend using flyte-spark instead of map-task?

Yee

12/14/2022, 7:27 PM
i’ll let niels chime in on that one. i think if the data is easily chunk-able and is slightly cpu intensive, i would opt for the map task approach, but haven’t used spark enough to confidently make this call

Rupsha Chaudhuri

12/14/2022, 7:27 PM
I didn’t use spark for my prototype which had around 750k rows.. but for 100 million records I want to now speed things up

Niels Bantilan

12/14/2022, 10:05 PM
> i think if the data is easily chunk-able and is slightly cpu intensive, i would opt for the map task approach

agreed! basically you’ll want the task that produces the data to output 2 things: (i) the `StructuredDataset` itself with the chunked parquet file and (ii) a list of filenames for each chunk. Then you’ll want to map over a dataclass that contains a reference to the `StructuredDataset` in addition to the filename of the chunk you want to process in a particular map task
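A rough plain-Python sketch of that shape (the `ChunkRef` dataclass, URIs, and function names here are illustrative; in Flyte the dataset reference would be a `StructuredDataset` and `process_chunk` would be a `@task` passed to `map_task`):

```python
from dataclasses import dataclass


@dataclass
class ChunkRef:
    # In Flyte this field would hold the StructuredDataset reference;
    # a plain URI string stands in for it here.
    dataset_uri: str
    filename: str  # which chunk file this particular subtask should process


def process_chunk(ref: ChunkRef) -> str:
    # Stand-in for the per-chunk work: each subtask only touches its own file,
    # so no subtask needs to download the whole dataset.
    return f"{ref.dataset_uri}/{ref.filename}"


refs = [ChunkRef("s3://bucket/data", f"part-{i:05d}.parquet") for i in range(3)]
# In Flyte this comprehension would be map_task(process_chunk) over `refs`.
outputs = [process_chunk(r) for r in refs]
```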

Rupsha Chaudhuri

12/14/2022, 10:08 PM
got it.. thanks.. let me try this out

Niels Bantilan

12/14/2022, 10:14 PM
actually, now that I think of it, you might need to output a list of `StructuredDataset`s… that might be easier to work with, so that each map task doesn’t have to download the entire dataset in each pod, only to process a single chunk of it… @Yee there might be a more clever way to do this… if I get a single partitioned `StructuredDataset` as input in a map task, is there a way for me to only download one of the chunks onto the map task pod?
I guess you can use `current_context()` to use the `file_access` API to download a specific chunk

Rupsha Chaudhuri

12/14/2022, 10:22 PM
I’m actually building the feeder task to spit out a list of chunked dataframes… and then planning to give each map task 1 such chunked dataframe
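That feeder → map → reduce shape, sketched with plain lists standing in for chunked dataframes (illustrative only; in Flyte the feeder and reducer would be `@task`s and the middle step a `map_task`):

```python
def feeder(rows: list, chunk_size: int) -> list[list]:
    # Split the input into chunked "dataframes" (plain lists here).
    return [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]


def per_chunk(chunk: list) -> list:
    # Stand-in for the slightly CPU-heavy per-row step, run once per chunk.
    return [row * 2 for row in chunk]


def reducer(results: list[list]) -> list:
    # Combine the per-chunk outputs back into a single result,
    # the step propeller performs after a map task.
    return [row for chunk in results for row in chunk]


chunks = feeder(list(range(10)), 4)
result = reducer([per_chunk(c) for c in chunks])
```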

Niels Bantilan

12/14/2022, 10:23 PM
yeah that sounds much simpler 🙃

Yee

12/14/2022, 10:25 PM
basically assuming the parquet files are partitioned along the map index, can we make it automatically only decode the portion needed?
i think this is fine.
this sounds like a neat idea. how does flytekit currently know it’s in a map task?