nutritious-mechanic-88579
11/06/2024, 4:46 PM
* We have a task that preprocesses data and returns a list of dicts, which we serialise using json.dumps and store as a FlyteFile.
* Then we want to use map_task to process each dict in the list that's in the FlyteFile.
import json
from pathlib import Path

import flytekit
import pandas as pd
from flytekit import map_task, task, workflow
from flytekit.types.file import FlyteFile

@task
def ingest_data() -> pd.DataFrame:
    return pd.DataFrame(...)  # some data

@task
def preprocess_data(data: pd.DataFrame) -> FlyteFile:
    rows = []
    for _, row in data.iterrows():
        # ... some processing
        rows.append(row)
    out_path = Path(flytekit.current_context().working_directory) / "preprocessed.json"
    with out_path.open(mode="w") as output_file:
        json.dump(rows, output_file)  # was pickle.dumps(input_rows, output_file): wrong function and an undefined name
    return FlyteFile(path=str(out_path))

@task
def generate_ai_response(row: ?) -> ?:
    ai_response = do_some_ai_stuff()
    return ai_response

@workflow
def ai_workflow():
    data = ingest_data()
    preprocessed_data = preprocess_data(data)
    predicted_responses: ? = map_task(generate_ai_response)(
        row=preprocessed_data
    )
We don't understand how to map_task over a FlyteFile, or if that's even possible. Can we deserialise the preprocessed_data inside the workflow prior to map_task? Or how else would we do this? Thank you!

wide-vegetable-51116
11/06/2024, 6:09 PM
You can't map over FlyteFiles, but you could output a list[dict] (basically the rows in data.iterrows()) and map over that. You can use dataclasses if you want more structure.
@task
def ingest_data() -> pd.DataFrame:
    return pd.DataFrame(...)  # some data

@task
def preprocess_data(data: pd.DataFrame) -> list[dict]:
    rows = []
    for _, row in data.iterrows():
        # ... some processing
        row = dict(...)
        rows.append(row)
    return rows

@task
def generate_ai_response(row: dict) -> dict:
    ai_response = do_some_ai_stuff()
    return ai_response

@workflow
def ai_workflow():
    data = ingest_data()
    preprocessed_data = preprocess_data(data)
    predicted_responses = map_task(generate_ai_response)(
        row=preprocessed_data
    )
nutritious-mechanic-88579
11/11/2024, 5:08 PM
Workflow[ai-project:development:main.ntrk_mutation_analysis_workflow] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: output file @[s3://flyte/metadata/propeller/ai-project-development-f93f414e3c13c4a199e2/n2/data/0/outputs.pb] is too large [50062009] bytes, max allowed [2097152] bytes
How can we solve that? Are there flyte-propeller settings in Helm?

freezing-airport-6809
You can increase the max allowed. Union has a limit of up to 10 GB. What do you think about using Union?

nutritious-mechanic-88579
11/11/2024, 6:57 PM

freezing-airport-6809
high-accountant-32689
11/12/2024, 12:08 AM
You can set storage.limits.maxDownloadMBs in your values file: https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/charts/flyte-core/values.yaml#L610-L611
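For reference, the override would look something like this in a flyte-core values file. The key path follows the linked values.yaml; the 50 is an illustrative number chosen to clear the ~50 MB output in the error above, not a recommended setting:

```yaml
# flyte-core Helm values override (illustrative size).
storage:
  limits:
    # Max size in MB that components will read back from blob storage.
    maxDownloadMBs: 50
```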