What do people currently do for more "stream-based" workflow topologies (e.g. hyperparameter tuning, iterative optimization, etc.)?
I'm thinking it might be possible to build a wrapper that creates a Redis queue, uses Flyte serialization and deserialization to move values onto and off the queue, and wraps Python generators, but I'm wondering if anyone else has tried something like this.
EDIT: in particular, it would be helpful to know whether there is a callable version of the Flyte serializer that could be used in something like rq.
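A minimal sketch of that wrapper idea, under stated assumptions: `pickle` stands in for Flyte's serializer/deserializer (which is not publicly exposed as a callable here), and an in-memory `deque` stands in for the Redis list; `produce` and `consume` are hypothetical helper names.

```python
import pickle
from collections import deque
from typing import Any, Callable, Iterable, Iterator

def produce(queue: deque, items: Iterable[Any],
            serialize: Callable[[Any], bytes] = pickle.dumps) -> None:
    """Serialize each item from a generator onto the queue."""
    for item in items:
        queue.append(serialize(item))

def consume(queue: deque,
            deserialize: Callable[[bytes], Any] = pickle.loads) -> Iterator[Any]:
    """Wrap the queue as a generator, deserializing items off it."""
    while queue:
        yield deserialize(queue.popleft())

q: deque = deque()
produce(q, (x * x for x in range(5)))
print(list(consume(q)))  # -> [0, 1, 4, 9, 16]
```

Swapping the `serialize`/`deserialize` callables for Flyte's own would be the missing piece the EDIT above asks about.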
Ketan (kumare3)
02/28/2023, 3:04 PM
❤️ @Eli Bixby you are right. In fact, the programming model for Flyte does not dictate batch or stream, and we might do something in this area in the future. I do have a spec for StreamLiterals, i.e. a stream of literals.
Also these PRs are landing soon - probably 1.5
https://github.com/flyteorg/flytekit/pull/1512
Complete data rewrite to support native streams
https://github.com/flyteorg/flytekit/pull/1518
And yes, there is a callable version of the Flyte serializer, but let's do one thing: make an issue and let's open up the API at the top level so that it gets added to the contract.
Eli Bixby
03/01/2023, 9:02 AM
Is there a doc somewhere about the data subsystem redesign?
As for stream literals, on the Python side I would expect them to work with generators and async generators, so something like:
```python
@task
async def foo(input_stream: AsyncGenerator[Literal, None]) -> AsyncGenerator[Literal, None]:
    async for item in input_stream:
        yield do_thing(item)
```
It would be nice to support async generators, as that will make interacting with multiple streams within a task very straightforward.
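The multi-stream point above could be sketched as follows; `stream` and `merge` are hypothetical helpers (not Flyte APIs), using an `asyncio.Queue` to interleave items from several async generators inside a single task:

```python
import asyncio
from typing import AsyncGenerator

async def stream(items) -> AsyncGenerator[int, None]:
    """A toy async source standing in for a Flyte input stream."""
    for item in items:
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield item

async def merge(*streams: AsyncGenerator[int, None]) -> AsyncGenerator[int, None]:
    """Interleave items from several async streams as they arrive."""
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()  # sentinel marking one exhausted stream

    async def pump(s):
        async for item in s:
            await queue.put(item)
        await queue.put(DONE)

    tasks = [asyncio.create_task(pump(s)) for s in streams]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is DONE:
            remaining -= 1
        else:
            yield item

async def main() -> list:
    return [x async for x in merge(stream([1, 2, 3]), stream([10, 20]))]

print(sorted(asyncio.run(main())))  # -> [1, 2, 3, 10, 20]
```

With plain (synchronous) generators, consuming two streams at once would force an explicit interleaving policy; `async for` lets the event loop handle it.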