slipstream

Top-level objects.

Submodules

Classes

Conf

The application configuration singleton.

Functions

handle(...)

Bind sources and sinks to the handler function.

stream(...) → collections.abc.Coroutine[None, None, None]

Start processing the iterables bound by the handle function.

Package Contents

class slipstream.Conf(conf: dict[str, Any] | None = None)

The application configuration singleton.

Register iterables (sources) and handlers (sinks):

>>> from slipstream import handle
>>> async def messages():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> @handle(messages(), sink=[print])
... def handle_message(msg):
...     yield f'Hello {msg}!'

Set the application's Kafka configuration (optional):

>>> Conf({'bootstrap_servers': 'localhost:29091'})
{'bootstrap_servers': 'localhost:29091'}

Provide exit hooks:

>>> async def exit_hook():
...     print('Shutting down application.')
>>> c = Conf()
>>> c.register_exit_hook(exit_hook)

pubsub
iterables: ClassVar[dict[str, PausableStream]]
pipes: ClassVar[dict[slipstream.utils.AsyncCallable, tuple[str, tuple[slipstream.utils.Pipe, Ellipsis]]]]
exit_hooks: ClassVar[set[slipstream.utils.AsyncCallable]]
conf: dict[str, Any]
register_iterable(key: str, it: collections.abc.AsyncIterable[Any]) → None

Add iterable to global Conf.

register_handler(key: str, handler: slipstream.utils.AsyncCallable, *pipe: slipstream.utils.Pipe) → None

Add handler to global Conf.

register_exit_hook(exit_hook: slipstream.utils.AsyncCallable) → None

Add exit hook that’s called on shutdown.
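The exit-hook mechanism can be illustrated with a minimal, standard-library-only sketch. `HookRegistry` and its `shutdown` method are hypothetical stand-ins, not part of slipstream; the sketch only assumes what the reference states, namely that hooks are async callables kept in a set and called on shutdown.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical alias standing in for slipstream.utils.AsyncCallable.
AsyncCallable = Callable[[], Awaitable[None]]


class HookRegistry:
    """Hypothetical stand-in for the exit-hook part of Conf."""

    def __init__(self) -> None:
        # A set, mirroring the documented `exit_hooks` attribute type.
        self.exit_hooks: set[AsyncCallable] = set()

    def register_exit_hook(self, exit_hook: AsyncCallable) -> None:
        self.exit_hooks.add(exit_hook)

    async def shutdown(self) -> None:
        # Assumed behaviour: await every registered hook once.
        for hook in self.exit_hooks:
            await hook()


calls: list[str] = []


async def exit_hook() -> None:
    calls.append('Shutting down application.')


registry = HookRegistry()
registry.register_exit_hook(exit_hook)
registry.register_exit_hook(exit_hook)  # duplicate: a set stores it once
asyncio.run(registry.shutdown())
```

Because the hooks live in a set, registering the same function twice has no additional effect, and no call order between different hooks should be relied upon.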

async start(**kwargs: Any) → None

Start processing registered iterables.

slipstream.handle(*iterable: collections.abc.AsyncIterable[Any], pipe: collections.abc.Iterable[slipstream.utils.Pipe] = [], sink: collections.abc.Iterable[collections.abc.Callable | slipstream.utils.AsyncCallable] = []) → collections.abc.Callable[[slipstream.utils.AsyncCallable], collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]]

Bind sources and sinks to the handler function.

Example:
>>> topic = Topic('demo')
>>> cache = Cache('state/demo')
>>> @handle(topic, sink=[print, cache])
... def handler(msg, **kwargs):
...     return msg.key, msg.value
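To make the binding idea concrete, here is a toy sketch that uses only the standard library. It is not slipstream's implementation: `bind`, `bindings`, and `drain` are hypothetical names, and the sketch simply assumes that a decorator records each source together with its handler and sinks, and that a handler may either return one value or yield several.

```python
import asyncio
import inspect
from typing import Any

# Hypothetical registry of (source, handler, sinks) triples.
bindings: list[tuple[Any, Any, list[Any]]] = []


def bind(*sources, sink=()):
    """Hypothetical decorator: record each source with its handler and sinks."""
    def decorator(handler):
        for source in sources:
            bindings.append((source, handler, list(sink)))
        return handler
    return decorator


async def drain(source, handler, sinks):
    """Pass every message from one source through the handler to all sinks."""
    async for msg in source:
        result = handler(msg)
        # A handler may return a single value or be a generator of values.
        outputs = result if inspect.isgenerator(result) else [result]
        for out in outputs:
            for s in sinks:
                maybe = s(out)
                if inspect.isawaitable(maybe):  # allow async sinks too
                    await maybe


async def messages():
    for word in ('a', 'b'):
        yield word


received: list[str] = []


@bind(messages(), sink=[received.append])
def handle_message(msg):
    yield f'Hello {msg}!'


async def main():
    for source, handler, sinks in bindings:
        await drain(source, handler, sinks)


asyncio.run(main())
```

After the run, `received` holds one greeting per message. Accepting both plain and awaitable sinks mirrors the documented `sink` type, which allows `Callable` as well as `AsyncCallable`.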
slipstream.stream(**kwargs: Any) → collections.abc.Coroutine[None, None, None]

Start processing the iterables bound by the handle function.

Example:
>>> from asyncio import run
>>> kwargs = {
...     'env': 'DEV',
... }
>>> run(stream(**kwargs))
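Since `stream` returns a coroutine, it is driven by an event loop as in the example above. The following standard-library sketch shows one way several async iterables can be consumed concurrently inside a single coroutine. Whether slipstream schedules its sources exactly like this is an assumption; `ticker`, `consume`, and `run_all` are hypothetical names.

```python
import asyncio


async def ticker(name, count, delay):
    """Hypothetical source: yield `count` labelled items, `delay` seconds apart."""
    for i in range(count):
        await asyncio.sleep(delay)
        yield f'{name}-{i}'


async def consume(source, out):
    """Append every item from one source to a shared list."""
    async for item in source:
        out.append(item)


async def run_all():
    out = []
    # gather() runs both consumers concurrently on the same event loop,
    # so a slow source does not block a fast one.
    await asyncio.gather(
        consume(ticker('fast', 2, 0.01), out),
        consume(ticker('slow', 1, 0.05), out),
    )
    return out


result = asyncio.run(run_all())
```

The items from both sources end up interleaved in `result` according to when each one became available, rather than one source being drained before the other starts.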