slipstream
==========

.. py:module:: slipstream

.. autoapi-nested-parse::

   Top level objects.


Submodules
----------

.. toctree::
   :maxdepth: 1

   /autoapi/slipstream/caching/index
   /autoapi/slipstream/checkpointing/index
   /autoapi/slipstream/codecs/index
   /autoapi/slipstream/core/index
   /autoapi/slipstream/interfaces/index
   /autoapi/slipstream/utils/index


Classes
-------

.. autoapisummary::

   slipstream.Conf


Functions
---------

.. autoapisummary::

   slipstream.handle
   slipstream.stream


Package Contents
----------------

.. py:class:: Conf(conf: dict[str, Any] | None = None)

   The application configuration singleton.

   Register iterables (sources) and handlers (sinks):

   >>> from slipstream import handle

   >>> async def messages():
   ...     for emoji in '🏆📞🐟👌':
   ...         yield emoji

   >>> @handle(messages(), sink=[print])
   ... def handle_message(msg):
   ...     yield f'Hello {msg}!'

   Set application kafka configuration (optional):

   >>> Conf({'bootstrap_servers': 'localhost:29091'})
   {'bootstrap_servers': 'localhost:29091'}

   Provide exit hooks:

   >>> async def exit_hook():
   ...     print('Shutting down application.')

   >>> c = Conf()
   >>> c.register_exit_hook(exit_hook)


   .. py:attribute:: pubsub


   .. py:attribute:: iterables
      :type: ClassVar[dict[str, PausableStream]]


   .. py:attribute:: pipes
      :type: ClassVar[dict[slipstream.utils.AsyncCallable, tuple[str, tuple[slipstream.utils.Pipe, Ellipsis]]]]


   .. py:attribute:: exit_hooks
      :type: ClassVar[set[slipstream.utils.AsyncCallable]]


   .. py:attribute:: conf
      :type: dict[str, Any]


   .. py:method:: register_iterable(key: str, it: collections.abc.AsyncIterable[Any]) -> None

      Add iterable to global Conf.


   .. py:method:: register_handler(key: str, handler: slipstream.utils.AsyncCallable, *pipe: slipstream.utils.Pipe) -> None

      Add handler to global Conf.


   .. py:method:: register_exit_hook(exit_hook: slipstream.utils.AsyncCallable) -> None

      Add exit hook that's called on shutdown.


   .. py:method:: start(**kwargs: Any) -> None
      :async:

      Start processing registered iterables.


.. py:function:: handle(*iterable: collections.abc.AsyncIterable[Any], pipe: collections.abc.Iterable[slipstream.utils.Pipe] = [], sink: collections.abc.Iterable[collections.abc.Callable | slipstream.utils.AsyncCallable] = []) -> collections.abc.Callable[[slipstream.utils.AsyncCallable], collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]]

   Bind sources and sinks to the handler function.

   Ex:

   >>> topic = Topic('demo')                 # doctest: +SKIP
   >>> cache = Cache('state/demo')           # doctest: +SKIP

   >>> @handle(topic, sink=[print, cache])   # doctest: +SKIP
   ... def handler(msg, **kwargs):
   ...     return msg.key, msg.value


.. py:function:: stream(**kwargs: Any) -> collections.abc.Coroutine[None, None, None]

   Start processing iterables bound by `handle` function.

   Ex:

   >>> from asyncio import run
   >>> kwargs = {
   ...     'env': 'DEV',
   ... }
   >>> run(stream(**kwargs))                 # doctest: +SKIP