slipstream
Top-level objects.
Submodules
Attributes
Classes
Conf | The application configuration singleton.
Functions
Package Contents
- slipstream.VERSION
- class slipstream.Conf(conf: dict[str, Any] = {})
The application configuration singleton.
Register iterables (sources) and handlers (sinks):

>>> from slipstream import Conf, handle

>>> async def messages():
...     for emoji in '🏆📞🐟👌':
...         yield emoji

>>> @handle(messages(), sink=[print])
... def handle_message(msg):
...     yield f'Hello {msg}!'

Set the application's Kafka configuration (optional):

>>> Conf({'bootstrap_servers': 'localhost:29091'})
{'bootstrap_servers': 'localhost:29091'}

Provide exit hooks:

>>> async def exit_hook():
...     print('Shutting down application.')

>>> c = Conf()
>>> c.register_exit_hook(exit_hook)
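The examples above call `Conf(...)` more than once and rely on every call referring to the same configuration object. One common way to get that behavior is a metaclass that caches a single instance. The sketch below illustrates the pattern only; `SingletonMeta` and `SketchConf` are hypothetical names, and nothing here is taken from slipstream's actual implementation.

```python
from __future__ import annotations

from typing import Any


class SingletonMeta(type):
    """Metaclass that hands back one shared instance per class."""

    _instances: dict[type, Any] = {}

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        if cls not in cls._instances:
            # First call: build the single instance (runs __init__ once).
            cls._instances[cls] = super().__call__()
        instance = cls._instances[cls]
        # Every call may contribute settings to the shared instance.
        instance.update(*args, **kwargs)
        return instance


class SketchConf(metaclass=SingletonMeta):
    """Hypothetical stand-in for a configuration singleton."""

    def __init__(self) -> None:
        self.conf: dict[str, Any] = {}

    def update(self, conf: dict[str, Any] | None = None) -> None:
        # None instead of a mutable default argument.
        self.conf.update(conf or {})


first = SketchConf({'bootstrap_servers': 'localhost:29091'})
second = SketchConf()

print(first is second)  # True
print(second.conf)      # {'bootstrap_servers': 'localhost:29091'}
```

Because the second call passes no settings, it simply returns the instance created by the first call, with its configuration intact.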
- pubsub
- iterables: dict[str, PausableStream]
- exit_hooks: set[slipstream.utils.AsyncCallable]
- register_iterable(key: str, it: collections.abc.AsyncIterable[Any])
Add an iterable to the global Conf.
- register_handler(key: str, handler: slipstream.utils.AsyncCallable)
Add a handler to the global Conf.
- register_exit_hook(exit_hook: slipstream.utils.AsyncCallable)
Add an exit hook that is called on shutdown.
- conf: dict[str, Any]
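To make the role of `exit_hooks` and `register_exit_hook` more concrete, here is a minimal, self-contained sketch of a registry that stores async hooks in a set and awaits them when the main task finishes. It assumes nothing about how slipstream itself schedules its hooks; `HookRegistry`, `AsyncHook`, and `run` are hypothetical names introduced for illustration.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical alias for "async function taking no arguments".
AsyncHook = Callable[[], Awaitable[None]]


class HookRegistry:
    """Hypothetical registry mirroring the exit_hooks attribute."""

    def __init__(self) -> None:
        self.exit_hooks: set[AsyncHook] = set()

    def register_exit_hook(self, hook: AsyncHook) -> None:
        # A set ignores repeated registration of the same hook.
        self.exit_hooks.add(hook)

    async def run(self, main: AsyncHook) -> None:
        try:
            await main()
        finally:
            # Hooks run on normal completion and on errors alike.
            for hook in self.exit_hooks:
                await hook()


events: list[str] = []


async def main_task() -> None:
    events.append('working')


async def exit_hook() -> None:
    events.append('Shutting down application.')


registry = HookRegistry()
registry.register_exit_hook(exit_hook)
registry.register_exit_hook(exit_hook)  # duplicate, stored once
asyncio.run(registry.run(main_task))

print(events)  # ['working', 'Shutting down application.']
```

Storing hooks in a set means that registering the same function twice still runs it once, which matches the `set[...]` type shown for `exit_hooks`.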
- slipstream.handle(*iterable: collections.abc.AsyncIterable[Any], sink: Iterable[Callable | slipstream.utils.AsyncCallable] = [])
Snaps a function to a stream.
- Example:

>>> topic = Topic('demo')
>>> cache = Cache('state/demo')

>>> @handle(topic, sink=[print, cache])
... def handler(msg, **kwargs):
...     return msg.key, msg.value
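The idea of snapping a function to a stream can be sketched without the library: a decorator records the (iterable, function, sinks) triple, and a runner later feeds each message through the function and passes the result to every sink. This is a simplified illustration under stated assumptions, not slipstream's implementation; `sketch_handle`, `registry`, and `run_all` are hypothetical names, and the sketch only supports synchronous sinks and handlers that `return` a value (not generators or async handlers).

```python
import asyncio
from typing import Any

# Hypothetical registry of (iterable, function, sinks) triples.
registry: list[tuple[Any, Any, tuple]] = []


def sketch_handle(*iterables, sink=()):
    """Bind the decorated function to each async iterable."""
    def decorator(func):
        for it in iterables:
            registry.append((it, func, tuple(sink)))
        return func  # the function itself stays callable as usual
    return decorator


async def consume(it, func, sinks) -> None:
    # Pass every message through the handler, then on to each sink.
    async for msg in it:
        result = func(msg)
        for s in sinks:
            s(result)


async def run_all() -> None:
    # Consume all registered streams concurrently.
    await asyncio.gather(*(consume(*entry) for entry in registry))


async def messages():
    for word in ('a', 'b'):
        yield word


received: list[str] = []


@sketch_handle(messages(), sink=[received.append])
def handler(msg):
    return f'Hello {msg}!'


asyncio.run(run_all())
print(received)  # ['Hello a!', 'Hello b!']
```

Keeping registration separate from consumption lets the decorator run at import time, before any event loop exists, while the actual streaming starts only when the runner is awaited.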