slipstream
==========

.. py:module:: slipstream

.. autoapi-nested-parse::

   Top level objects.


Submodules
----------

.. toctree::
   :maxdepth: 1

   /autoapi/slipstream/caching/index
   /autoapi/slipstream/checkpointing/index
   /autoapi/slipstream/codecs/index
   /autoapi/slipstream/core/index
   /autoapi/slipstream/interfaces/index
   /autoapi/slipstream/utils/index


Attributes
----------

.. autoapisummary::

   slipstream.VERSION


Classes
-------

.. autoapisummary::

   slipstream.Conf


Functions
---------

.. autoapisummary::

   slipstream.handle
   slipstream.stream


Package Contents
----------------

.. py:data:: VERSION

.. py:class:: Conf(conf: dict[str, Any] = {})

   The application configuration singleton.

   Register iterables (sources) and handlers (sinks):

   >>> from slipstream import handle

   >>> async def messages():
   ...     for emoji in '🏆📞🐟👌':
   ...         yield emoji

   >>> @handle(messages(), sink=[print])
   ... def handle_message(msg):
   ...     yield f'Hello {msg}!'

   Set application kafka configuration (optional):

   >>> Conf({'bootstrap_servers': 'localhost:29091'})
   {'bootstrap_servers': 'localhost:29091'}

   Provide exit hooks:

   >>> async def exit_hook():
   ...     print('Shutting down application.')

   >>> c = Conf()
   >>> c.register_exit_hook(exit_hook)


   .. py:attribute:: pubsub

   .. py:attribute:: iterables
      :type: dict[str, PausableStream]

   .. py:attribute:: exit_hooks
      :type: set[slipstream.utils.AsyncCallable]

   .. py:method:: register_iterable(key: str, it: collections.abc.AsyncIterable[Any])

      Add iterable to global Conf.

   .. py:method:: register_handler(key: str, handler: slipstream.utils.AsyncCallable)

      Add handler to global Conf.

   .. py:method:: register_exit_hook(exit_hook: slipstream.utils.AsyncCallable)

      Add exit hook that's called on shutdown.

   .. py:method:: start(**kwargs: Any)
      :async:

      Start processing registered iterables.

   .. py:attribute:: conf
      :type: dict[str, Any]

.. py:function:: handle(*iterable: collections.abc.AsyncIterable[Any], sink: Iterable[Callable | slipstream.utils.AsyncCallable] = [])

   Snaps function to stream.

   Ex:
       >>> topic = Topic('demo')                 # doctest: +SKIP
       >>> cache = Cache('state/demo')           # doctest: +SKIP

       >>> @handle(topic, sink=[print, cache])   # doctest: +SKIP
       ... def handler(msg, **kwargs):
       ...     return msg.key, msg.value

.. py:function:: stream(**kwargs: Any)

   Start the streams.

   Ex:
       >>> from asyncio import run
       >>> args = {
       ...     'env': 'DEV',
       ... }
       >>> run(stream(**args))   # doctest: +SKIP