slipstream

Top level objects.

Submodules

Attributes

Classes

Conf

The application configuration singleton.

Functions

handle(*iterable[, sink])

Snaps function to stream.

stream(**kwargs)

Start the streams.

Package Contents

slipstream.VERSION
class slipstream.Conf(conf: dict[str, Any] = {})[source]

The application configuration singleton.

Register iterables (sources) and handlers (sinks): >>> from slipstream import handle

>>> async def messages():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> @handle(messages(), sink=[print])
... def handle_message(msg):
...     yield f'Hello {msg}!'

Set application kafka configuration (optional):

>>> Conf({'bootstrap_servers': 'localhost:29091'})
{'bootstrap_servers': 'localhost:29091'}

Provide exit hooks:

>>> async def exit_hook():
...     print('Shutting down application.')
>>> c = Conf()
>>> c.register_exit_hook(exit_hook)
pubsub
iterables: dict[str, PausableStream]
exit_hooks: set[slipstream.utils.AsyncCallable]
register_iterable(key: str, it: collections.abc.AsyncIterable[Any])[source]

Add iterable to global Conf.

register_handler(key: str, handler: slipstream.utils.AsyncCallable)[source]

Add handler to global Conf.

register_exit_hook(exit_hook: slipstream.utils.AsyncCallable)[source]

Add exit hook that’s called on shutdown.

async start(**kwargs: Any)[source]

Start processing registered iterables.

conf: dict[str, Any]
slipstream.handle(*iterable: collections.abc.AsyncIterable[Any], sink: Iterable[Callable | slipstream.utils.AsyncCallable] = [])[source]

Snaps function to stream.

Ex:
>>> topic = Topic('demo')                 
>>> cache = Cache('state/demo')           
>>> @handle(topic, sink=[print, cache])   
... def handler(msg, **kwargs):
...     return msg.key, msg.value
slipstream.stream(**kwargs: Any)[source]

Start the streams.

Ex:
>>> from asyncio import run
>>> args = {
...     'env': 'DEV',
... }
>>> run(stream(**args))