# ask-the-community
e
What do people currently do for more "stream-based" workflow topologies (e.g. hyperparameter tuning, iterative optimization, etc.)? I'm thinking it might be possible to make a wrapper that creates a Redis queue, uses Flyte serialization and deserialization to move things onto and off the queue, and then wraps Python generators, but I'm wondering if anyone else has tried something like this. EDIT: in particular, it would be helpful to know if there is a callable version of the Flyte serializer that could be used in something like rq.
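A minimal sketch of the queue-plus-generator wrapper described in the question, under stated assumptions: a stdlib `queue.Queue` stands in for the Redis queue (with Redis, `RPUSH` and `BLPOP` on a list would play the roles of `put` and `get`), and JSON stands in for the Flyte serializer, which would be plugged in through the `serialize`/`deserialize` callables. `publish`, `consume`, `to_json`, `from_json`, and the end-of-stream marker are hypothetical names, not part of Flyte or rq.

```python
import json
import queue
from typing import Any, Callable, Iterable, Iterator

# End-of-stream marker (an assumed convention); a Redis-backed version needs one too.
END_OF_STREAM = b"__end_of_stream__"


def publish(items: Iterable[Any], q: queue.Queue, serialize: Callable[[Any], bytes]) -> None:
    """Drain a generator onto the queue, serializing each item, then mark the end."""
    for item in items:
        q.put(serialize(item))
    q.put(END_OF_STREAM)


def consume(q: queue.Queue, deserialize: Callable[[bytes], Any]) -> Iterator[Any]:
    """Expose the queue as a generator, deserializing items until the end marker."""
    while True:
        payload = q.get()  # blocks, like BLPOP on a Redis list
        if payload == END_OF_STREAM:
            return
        yield deserialize(payload)


def to_json(obj: Any) -> bytes:
    # Stand-in for a Flyte serializer: any callable that produces bytes works here.
    return json.dumps(obj).encode("utf-8")


def from_json(payload: bytes) -> Any:
    # Stand-in for the matching Flyte deserializer.
    return json.loads(payload)
```

A producer would call something like `publish(trial_params(), q, to_json)` and a consumer would iterate `for params in consume(q, from_json)`. Keeping the serializer as a parameter is the main design choice: a callable Flyte serializer could be dropped in without touching the queue plumbing.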
k
❤️ @Eli Bixby you are right. In fact, the programming model for Flyte does not dictate batch or stream, and we might do something here in the future. I do have a spec for StreamLiterals, i.e. streams of literals. Also, these PRs are landing soon, probably in 1.5, including a complete data rewrite to support native streams: https://github.com/flyteorg/flytekit/pull/1512 https://github.com/flyteorg/flytekit/pull/1518
And yes, there is a callable version of the Flyte serializer, but let's do one thing: make an issue, and let's open up the API at the top level so that it gets added to the contract.
e
Is there a doc somewhere about the data subsystem redesign? As for stream literals, on the Python side I would expect them to work with generators and async generators, something like:
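```python
from typing import AsyncGenerator
from flytekit import task
from flytekit.models.literals import Literal

@task  # proposed UX from this thread, not an existing flytekit feature
async def foo(input_stream: AsyncGenerator[Literal, None]) -> AsyncGenerator[Literal, None]:
    async for item in input_stream:
        yield do_thing(item)  # do_thing is a placeholder for per-item work
```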
It would be nice to support async generators, since that would make interacting with multiple streams within a task very straightforward.
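To illustrate the multiple-streams point, here is a plain-`asyncio` sketch with no Flyte involved: it pairs items from two async generators, the way a task body like `foo` might combine a stream of hyperparameters with a stream of scores. `zip_streams`, `params`, `scores`, and `collect` are hypothetical names, and the `asyncio.sleep(0)` calls only stand in for waiting on upstream producers.

```python
import asyncio
from typing import AsyncIterator, List, Tuple, TypeVar

A = TypeVar("A")
B = TypeVar("B")
T = TypeVar("T")


async def zip_streams(left: AsyncIterator[A], right: AsyncIterator[B]) -> AsyncIterator[Tuple[A, B]]:
    """Pair items from two async streams, stopping when either one is exhausted."""
    while True:
        try:
            a = await left.__anext__()
            b = await right.__anext__()
        except StopAsyncIteration:
            return
        yield a, b


async def params() -> AsyncIterator[float]:
    for lr in (0.1, 0.01, 0.001):
        await asyncio.sleep(0)  # stand-in for waiting on an upstream producer
        yield lr


async def scores() -> AsyncIterator[float]:
    for score in (0.5, 0.7, 0.9):
        await asyncio.sleep(0)
        yield score


async def collect(stream: AsyncIterator[T]) -> List[T]:
    """Drain an async stream into a list."""
    return [item async for item in stream]
```

Running `asyncio.run(collect(zip_streams(params(), scores())))` yields the pairs `(0.1, 0.5)`, `(0.01, 0.7)`, `(0.001, 0.9)`. Pulling items one at a time with `__anext__` keeps each stream lazy, so neither input has to be fully materialized before the task starts working.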
k
@Eli Bixby that’s a great UX
Yeah, let's collaborate on this once I open it up.