flat-waiter-82487
04/01/2025, 2:26 PM@workflow
def dummy_workflow(n: int) -> int:
numbers = generate_numbers(n=n)
squared_numbers = map_task(square)(x=numbers)
total = sum_squares(squares=squared_numbers)
return total
class Data(BaseModel):
n: int = Field(ge=1, le=100)
@app.post("/workflow")
async def root(data: Data):
result = dummy_workflow(n=data.n)
return {
"message": "Workflow executed",
"result": result,
}
The reason I'm asking is that I want to have a single pipeline abstraction across our code basedamp-lion-88352
04/01/2025, 3:46 PMfreezing-airport-6809
freezing-airport-6809
flat-waiter-82487
04/02/2025, 5:49 AMdamp-lion-88352
04/02/2025, 5:50 AMdamp-lion-88352
04/02/2025, 5:50 AMflat-waiter-82487
04/02/2025, 5:50 AMflat-waiter-82487
04/02/2025, 11:30 AMdummy_workflow(params...)
serialize data passed between tasks?damp-lion-88352
04/02/2025, 11:31 AM