slipstream
Top level objects.
Submodules
Classes
Conf | The application configuration singleton.
Functions
handle | Bind sources and sinks to the handler function.
Package Contents
- class slipstream.Conf(conf: dict[str, Any] | None = None)
The application configuration singleton.
Register iterables (sources) and handlers (sinks):
>>> from slipstream import handle
>>> async def messages():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> @handle(messages(), sink=[print])
... def handle_message(msg):
...     yield f'Hello {msg}!'
Set application Kafka configuration (optional):
>>> Conf({'bootstrap_servers': 'localhost:29091'})
{'bootstrap_servers': 'localhost:29091'}
Provide exit hooks:
>>> async def exit_hook():
...     print('Shutting down application.')
>>> c = Conf()
>>> c.register_exit_hook(exit_hook)
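The singleton behaviour and exit hooks described above can be illustrated with a minimal, self-contained sketch. It does not use slipstream and is not its implementation: `MiniConf` and its `shutdown` helper are hypothetical names introduced only for this illustration, and merging repeated configuration with `dict.update` is an assumption.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, ClassVar

# Stand-in alias for an async callable (assumed shape, not slipstream's type).
AsyncCallable = Callable[..., Awaitable[Any]]


class MiniConf:
    """Hypothetical configuration singleton, for illustration only."""

    _instance: ClassVar[MiniConf | None] = None
    exit_hooks: ClassVar[set[AsyncCallable]] = set()

    def __new__(cls, conf: dict[str, Any] | None = None) -> MiniConf:
        # Create the single instance on first use and reuse it afterwards.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.conf = {}
        return cls._instance

    def __init__(self, conf: dict[str, Any] | None = None) -> None:
        # Assumption: later settings are merged into the shared dictionary.
        if conf:
            self.conf.update(conf)

    def register_exit_hook(self, hook: AsyncCallable) -> None:
        self.exit_hooks.add(hook)

    async def shutdown(self) -> None:
        # Hypothetical helper: await every registered hook once.
        for hook in self.exit_hooks:
            await hook()

    def __repr__(self) -> str:
        return repr(self.conf)


closed: list[str] = []


async def exit_hook() -> None:
    closed.append('Shutting down application.')


first = MiniConf({'bootstrap_servers': 'localhost:29091'})
second = MiniConf()  # same object as `first`
second.register_exit_hook(exit_hook)
asyncio.run(first.shutdown())
```

Because `__new__` always hands back the same object, configuration set in one place is visible wherever `MiniConf()` is called again, which is the property a configuration singleton relies on.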
- pubsub
- iterables: ClassVar[dict[str, PausableStream]]
- pipes: ClassVar[dict[slipstream.utils.AsyncCallable, tuple[str, tuple[slipstream.utils.Pipe, Ellipsis]]]]
- exit_hooks: ClassVar[set[slipstream.utils.AsyncCallable]]
- conf: dict[str, Any]
- register_iterable(key: str, it: collections.abc.AsyncIterable[Any]) → None
Add iterable to global Conf.
- register_handler(key: str, handler: slipstream.utils.AsyncCallable, *pipe: slipstream.utils.Pipe) → None
Add handler to global Conf.
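The attribute types above suggest a class-level registry in which a handler is linked to its source by a shared key. The following is a minimal sketch of that idea under stated assumptions, not slipstream's code: `Registry`, `run_once`, and `only_odd` are hypothetical names, and a pipe is assumed to be a function that maps one async iterable to another.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterable, Awaitable, Callable
from typing import Any, ClassVar

# Stand-in aliases (assumed shapes, not slipstream's definitions).
AsyncCallable = Callable[..., Awaitable[Any]]
Pipe = Callable[[AsyncIterable[Any]], AsyncIterable[Any]]


class Registry:
    """Hypothetical registry whose state lives on the class, not the instance."""

    iterables: ClassVar[dict[str, AsyncIterable[Any]]] = {}
    pipes: ClassVar[dict[AsyncCallable, tuple[str, tuple[Pipe, ...]]]] = {}

    def register_iterable(self, key: str, it: AsyncIterable[Any]) -> None:
        self.iterables[key] = it

    def register_handler(self, key: str, handler: AsyncCallable, *pipe: Pipe) -> None:
        # The handler remembers the key of its source and its pipes.
        self.pipes[handler] = (key, pipe)


async def run_once(handler: AsyncCallable) -> list[Any]:
    # Look up the source by key, apply each pipe in order, then feed the handler.
    key, pipe = Registry.pipes[handler]
    stream = Registry.iterables[key]
    for step in pipe:
        stream = step(stream)
    return [await handler(msg) async for msg in stream]


async def numbers():
    for n in (1, 2, 3):
        yield n


async def only_odd(stream):
    # Example pipe: drop even numbers.
    async for n in stream:
        if n % 2:
            yield n


async def double(msg):
    return msg * 2


Registry().register_iterable('numbers', numbers())
Registry().register_handler('numbers', double, only_odd)
results = asyncio.run(run_once(double))
```

Each call to `Registry()` creates a new instance, yet every instance reads and writes the same dictionaries because they are class attributes, so registrations made anywhere are globally visible.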
- slipstream.handle(*iterable: collections.abc.AsyncIterable[Any], pipe: collections.abc.Iterable[slipstream.utils.Pipe] = [], sink: collections.abc.Iterable[collections.abc.Callable | slipstream.utils.AsyncCallable] = []) → collections.abc.Callable[[slipstream.utils.AsyncCallable], collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]]
Bind sources and sinks to the handler function.
- Ex:
>>> topic = Topic('demo')
>>> cache = Cache('state/demo')
>>> @handle(topic, sink=[print, cache])
... def handler(msg, **kwargs):
...     return msg.key, msg.value
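To make the idea of binding sources and sinks to a handler concrete, here is a rough, self-contained sketch of such a decorator. It is not slipstream's `handle`: `mini_handle`, `runners`, and `run_all` are hypothetical names, sources are assumed to be consumed one after another, and each sink is assumed to receive the handler's return value as a single argument.

```python
from __future__ import annotations

import asyncio
import inspect
from collections.abc import AsyncIterable, Callable, Iterable
from typing import Any

# Hypothetical module-level list of coroutine functions that drive the handlers.
runners: list[Callable[[], Any]] = []


def mini_handle(*iterable: AsyncIterable[Any], sink: Iterable[Callable[..., Any]] = ()):
    """Hypothetical decorator: feed each message to the handler, then to the sinks."""
    sinks = tuple(sink)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        async def runner() -> None:
            for source in iterable:
                async for msg in source:
                    result = func(msg)
                    # Support both plain and async handlers.
                    if inspect.isawaitable(result):
                        result = await result
                    for target in sinks:
                        sent = target(result)
                        # Support both plain and async sinks.
                        if inspect.isawaitable(sent):
                            await sent

        runners.append(runner)
        return func  # the handler itself stays directly callable

    return decorator


async def run_all() -> None:
    # Drive every registered handler concurrently.
    await asyncio.gather(*(runner() for runner in runners))


async def messages():
    for word in ('a', 'b'):
        yield word


received: list[str] = []


@mini_handle(messages(), sink=[received.append])
def shout(msg: str) -> str:
    return msg.upper()


asyncio.run(run_all())
```

Returning the undecorated function keeps the handler easy to unit test on its own, while the registered runner is what actually connects it to its sources and sinks.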