# flyte-support
nutritious-mechanic-88579
Hi, we have the following workflow:
* We have a `task` that preprocesses data and returns a `list` of `dict`, serialized with `json.dumps` as a `FlyteFile`
* Then we want to use `map_task` to process each dict in the list that's in the `FlyteFile`
```python
import json
from pathlib import Path

import flytekit
import pandas as pd
from flytekit import map_task, task, workflow
from flytekit.types.file import FlyteFile


@task
def ingest_data() -> pd.DataFrame:
    return pd.DataFrame(...)  # some data


@task
def preprocess_data(data: pd.DataFrame) -> FlyteFile:
    rows = []
    for _, row in data.iterrows():
        # ... some processing
        rows.append(dict(row))  # plain dict so the row is JSON-serializable
    # Serialize the rows as JSON and hand the file over to Flyte.
    out_path = Path(flytekit.current_context().working_directory) / "preprocessed.json"
    with out_path.open(mode="w") as output_file:
        json.dump(rows, output_file)
    return FlyteFile(path=str(out_path))


@task
def generate_ai_response(row: ?) -> ?:
    ai_response = do_some_ai_stuff()
    return ai_response


@workflow
def ai_workflow():
    data = ingest_data()
    preprocessed_data = preprocess_data(data)
    predicted_responses: ? = map_task(generate_ai_response)(
        row=preprocessed_data
    )
```
We don't understand how to `map_task` over a `FlyteFile`, or if that's even possible. Can we deserialise the `preprocessed_data` inside the `workflow` prior to `map_task`? Or how else would we do this? Thank you!
wide-vegetable-51116
Hi Maarten, you can't map over `FlyteFile`s, but you could output a `list[dict]` (basically the rows in `data.iterrows()`) and map over that. You can use dataclasses if you want more structure.
```python
import pandas as pd
from flytekit import map_task, task, workflow


@task
def ingest_data() -> pd.DataFrame:
    return pd.DataFrame(...)  # some data


@task
def preprocess_data(data: pd.DataFrame) -> list[dict]:
    rows = []
    for _, row in data.iterrows():
        # ... some processing
        row = dict(...)
        rows.append(row)
    return rows


@task
def generate_ai_response(row: dict) -> dict:
    ai_response = do_some_ai_stuff()
    return ai_response


@workflow
def ai_workflow():
    data = ingest_data()
    preprocessed_data = preprocess_data(data)
    predicted_responses = map_task(generate_ai_response)(
        row=preprocessed_data
    )
```
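If you want the dataclass version, a minimal sketch could look like this (the `Row` fields are placeholders for whatever your rows contain; older flytekit versions may additionally need `dataclasses_json` on the class):
```python
from dataclasses import dataclass

import pandas as pd
from flytekit import task


# Hypothetical row schema; swap in your real fields.
@dataclass
class Row:
    text: str
    score: float


@task
def preprocess_data(data: pd.DataFrame) -> list[Row]:
    # One typed instance per DataFrame row.
    return [Row(text=r["text"], score=r["score"]) for _, r in data.iterrows()]


@task
def generate_ai_response(row: Row) -> str:
    return do_some_ai_stuff(row.text)
```
`map_task(generate_ai_response)` then fans out over the `list[Row]` exactly like the `list[dict]` version.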
nutritious-mechanic-88579
Hi @wide-vegetable-51116, we keep running into this issue:
```
Workflow[ai-project:development:main.ntrk_mutation_analysis_workflow] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: output file @[<s3://flyte/metadata/propeller/ai-project-development-f93f414e3c13c4a199e2/n2/data/0/outputs.pb>] is too large [50062009] bytes, max allowed [2097152] bytes
```
How can we solve that? Are there some `flyte-propeller` settings in Helm?
freezing-airport-6809
@nutritious-mechanic-88579 have you thought of using JSONL and JSONIterator?
Ohh wait, I don't think we can run map on that 😞
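For reference, the JSONL pattern would look roughly like this in recent flytekit versions (a sketch assuming `flytekit.types.iterator.JSON` is available; as said, you can't `map_task` over the iterator):
```python
from typing import Iterator

from flytekit import task
from flytekit.types.iterator import JSON


@task
def emit_rows(rows: list[dict]) -> Iterator[JSON]:
    # Each yielded object is written as one line of a JSONL file.
    for row in rows:
        yield row


@task
def consume_rows(rows: Iterator[JSON]) -> int:
    # Rows stream back one JSON object at a time.
    return sum(1 for _ in rows)
```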
We have released a big thing where you can return extremely large lists without hitting the max-allowed data limit. Union supports a limit of up to 10GB. What do you think about using Union?
nutritious-mechanic-88579
@freezing-airport-6809 Thanks, but we have PII data and SLAs require us to run workflows on our hosted k8s cluster.
freezing-airport-6809
PII data is OK, Union runs in your cloud.
It's BYOC, your cloud.
h
@nutritious-mechanic-88579, you can set `storage.limits.maxDownloadMBs` in your values file: https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/charts/flyte-core/values.yaml#L610-L611
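In a flyte-core values file, that override would sit roughly like this (a sketch based on the dotted path above; the 100 is a placeholder, size it to your outputs):
```yaml
storage:
  limits:
    # Placeholder value: max size in MB that Flyte components may download.
    maxDownloadMBs: 100
```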