slipstream.utils ================ .. py:module:: slipstream.utils .. autoapi-nested-parse:: Slipstream utilities. Attributes ---------- .. autoapisummary:: slipstream.utils.AsyncCallable Classes ------- .. autoapisummary:: slipstream.utils.Singleton slipstream.utils.PubSub Functions --------- .. autoapisummary:: slipstream.utils.iscoroutinecallable slipstream.utils.get_param_names Module Contents --------------- .. py:type:: AsyncCallable :canonical: Callable[..., Awaitable[Any]] | Callable[..., Any] .. py:function:: iscoroutinecallable(o: Any) -> bool Check whether function is coroutine. .. py:function:: get_param_names(o: Any) Return function parameter names. .. py:class:: Singleton Bases: :py:obj:`type` Maintain a single instance of a class. .. py:class:: PubSub Singleton publish subscribe pattern class. .. py:method:: subscribe(topic: str, listener: AsyncCallable) -> None Subscribe callable to topic. .. py:method:: unsubscribe(topic: str, listener: AsyncCallable) -> None Unsubscribe callable from topic. .. py:method:: publish(topic: str, *args: Any, **kwargs: Any) -> None Publish message to subscribers of topic. .. py:method:: apublish(topic: str, *args: Any, **kwargs: Any) -> None :async: Publish message to subscribers of topic. .. py:method:: iter_topic(topic: str) -> AsyncIterator[Any] :async: Asynchronously iterate over messages published to a topic.