Hello everyone :wave: I have a question about the...
# ask-the-community
p
Hello everyone 👋 I have a question about the map_task. I want to use the map_task with a task that gives me a list of dataframes as output. That means that the result of the map_task will be a list of lists, and I want to flatten it into a single list. To avoid memory problems, I don't want to do the flattening in a single task due to the large amount of data. Any suggestions? Is it possible in another way? Thanks you in advance 🙏 Example code:
Copy code
@task
def generate_queries(configs: dict) -> List[str]:
    return generate_queries_from_configs(configs=configs)

@task
def get_data_for_query(query: str, configs: dict) -> pd.DataFrame:
    client = bigquery.Client(project=configs.get("gcpProject"))
    df = client.query(query).to_dataframe()
    return df

@task
def to_output(df: pd.DataFrame) -> List[AnomalyData]:
    data_list = []
    for name in df['name'].unique():
        data = df[df['name'] == name].copy()
        data_list.append(AnomalyData(name=name, data=data))

    return data_list

@workflow
def start_workflow()-> str:
     ...
    queries_list = generate_queries(configs=configs)

    data_list = map_task(
        partial(get_data_for_query, configs=configs),
        metadata=TaskMetadata(retries=0),
        concurrency=20,
    )(query=queries)

     # TODO how to flatten?
     list_of_list_data = map_task(
        to_output,
       metadata=TaskMetadata(retries=0),
       concurrency=20,
    )(df=data_list)

   ....
j
if its hard to flatten in a single task, is it possible to safely return them in the map task at all? i think it might be better to write a pickled file to some where on the cloud and then write a merge-sort style dynamic task that merges then 🤔
y
if these are all dataframes, then they should be offloaded to s3…
if you’re just flattening them, and not reading their contents, it should not matter.
(if it does then that’s a bug in flytekit that should be fixed)
p
oh indeed they are just references to a bucket! I wrote my own task that flattens 🙂 Thanks 🙏