slipstream.utils

Slipstream utilities.

Attributes

    AsyncCallable

Classes

    Singleton    Maintain a single instance of a class.
    PubSub       Singleton publish subscribe pattern class.

Functions

    iscoroutinecallable(→ bool)    Check whether function is coroutine.
    get_param_names(o)             Return function parameter names.

Module Contents

type slipstream.utils.AsyncCallable = Callable[..., Awaitable[Any]] | Callable[..., Any]

slipstream.utils.iscoroutinecallable(o: Any) → bool
    Check whether function is coroutine.

slipstream.utils.get_param_names(o: Any)
    Return function parameter names.

class slipstream.utils.Singleton
    Bases: type

    Maintain a single instance of a class.

class slipstream.utils.PubSub
    Singleton publish subscribe pattern class.

    subscribe(topic: str, listener: AsyncCallable) → None
        Subscribe callable to topic.

    unsubscribe(topic: str, listener: AsyncCallable) → None
        Unsubscribe callable from topic.

    publish(topic: str, *args: Any, **kwargs: Any) → None
        Publish message to subscribers of topic.

    async apublish(topic: str, *args: Any, **kwargs: Any) → None
        Publish message to subscribers of topic.

    async iter_topic(topic: str) → AsyncIterator[Any]
        Asynchronously iterate over messages published to a topic.
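
The following is a minimal, self-contained sketch of how the pieces listed in this reference could fit together. It is not slipstream's source code: only the names and signatures are taken from the reference, while the function bodies, the tuple return type of `get_param_names`, and the demo names (`demo`, `on_sync`, `on_async`, the `'greetings'` topic) are assumptions made for illustration. `iter_topic` is left out to keep the sketch short.

```python
import asyncio
import inspect
from collections import defaultdict
from typing import Any, Callable


class Singleton(type):
    """Metaclass sketch: every call to the class returns the same instance."""

    _instances: dict[type, Any] = {}

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        # Create the instance on first use, then reuse it.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


def iscoroutinecallable(o: Any) -> bool:
    """Assumed behavior: true for async functions and for objects
    whose class defines an async ``__call__``."""
    return bool(
        inspect.iscoroutinefunction(o)
        or inspect.iscoroutinefunction(getattr(type(o), '__call__', None))
    )


def get_param_names(o: Any) -> tuple[str, ...]:
    """Assumed behavior: parameter names in declaration order."""
    return tuple(inspect.signature(o).parameters)


class PubSub(metaclass=Singleton):
    """Hypothetical stand-in for the documented PubSub class."""

    def __init__(self) -> None:
        self._topics: dict[str, list[Callable[..., Any]]] = defaultdict(list)

    def subscribe(self, topic: str, listener: Callable[..., Any]) -> None:
        self._topics[topic].append(listener)

    def unsubscribe(self, topic: str, listener: Callable[..., Any]) -> None:
        listeners = self._topics.get(topic, [])
        if listener in listeners:
            listeners.remove(listener)

    def publish(self, topic: str, *args: Any, **kwargs: Any) -> None:
        # Synchronous publish: call each listener directly.
        for listener in list(self._topics.get(topic, [])):
            listener(*args, **kwargs)

    async def apublish(self, topic: str, *args: Any, **kwargs: Any) -> None:
        # Asynchronous publish: await coroutine listeners, call the rest.
        for listener in list(self._topics.get(topic, [])):
            if iscoroutinecallable(listener):
                await listener(*args, **kwargs)
            else:
                listener(*args, **kwargs)


async def demo() -> list[str]:
    received: list[str] = []

    def on_sync(msg: str) -> None:
        received.append(f'sync:{msg}')

    async def on_async(msg: str) -> None:
        received.append(f'async:{msg}')

    bus = PubSub()  # the same object as any other PubSub() call
    bus.subscribe('greetings', on_sync)
    bus.subscribe('greetings', on_async)
    await bus.apublish('greetings', 'hello')  # reaches both listeners
    bus.unsubscribe('greetings', on_async)
    bus.publish('greetings', 'world')  # reaches only on_sync
    bus.unsubscribe('greetings', on_sync)  # leave the shared instance clean
    return received


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

In this sketch, `publish` suits listeners that are plain callables, while `apublish` also awaits coroutine listeners. Because `PubSub` uses the `Singleton` metaclass, subscriptions made in one part of a program are visible wherever `PubSub()` is called again, which is why the demo unsubscribes its listeners before returning.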