slipstream.utils

Slipstream utilities.

Attributes

Classes

Signal

Signals can be exchanged with streams.

Singleton

Maintain a single instance of a class.

PubSub

Singleton publish subscribe pattern class.

AsyncSynchronizedGenerator

Async generator that synchronizes values across copies.

Functions

awaitable(→ Any)

Convert into awaitable.

iscoroutinecallable(→ bool)

Check whether object is a coroutine callable.

get_param_names(→ tuple[str, ...])

Return function parameter names.

Module Contents

slipstream.utils.T
slipstream.utils.P
type slipstream.utils.AsyncCallable = Callable[P, T | Awaitable[T]]
type slipstream.utils.Pipe = Callable[[AsyncIterable[Any]], AsyncIterable[Any]]
class slipstream.utils.Signal

Bases: enum.Enum

Signals can be exchanged with streams.

SENTINEL represents an absent yield value.
PAUSE represents the signal to pause the stream.
RESUME represents the signal to resume the stream.

SENTINEL = 0
PAUSE = 1
RESUME = 2
STOP = 3
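
To illustrate how such signals might travel alongside ordinary values, the sketch below uses a local mirror of the enum (`SignalSketch`) and a hypothetical `consume` helper. The policy shown (drop data while paused, end on STOP, ignore SENTINEL) is an assumption made for illustration, not documented library behavior.

```python
from enum import Enum
from typing import Any, Iterable


class SignalSketch(Enum):
    """Local mirror of the documented members and values."""

    SENTINEL = 0
    PAUSE = 1
    RESUME = 2
    STOP = 3


def consume(items: Iterable[Any]) -> list[Any]:
    """Hypothetical consumer: drop data while paused, end on STOP."""
    paused = False
    seen: list[Any] = []
    for item in items:
        if item is SignalSketch.STOP:
            break
        if item is SignalSketch.PAUSE:
            paused = True
        elif item is SignalSketch.RESUME:
            paused = False
        elif item is not SignalSketch.SENTINEL and not paused:
            # Ordinary data is only collected while the stream is running.
            seen.append(item)
    return seen


out = consume(
    ["a", SignalSketch.PAUSE, "b", SignalSketch.RESUME, "c", SignalSketch.STOP, "d"]
)
```

Comparing with `is` works here because enum members are singletons.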
async slipstream.utils.awaitable(x: Any) → Any

Convert into awaitable.
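
A helper like this can let calling code `await` a value without first checking whether it came from a synchronous or an asynchronous source. The sketch below defines a local stand-in, `awaitable_sketch` (a hypothetical name, not the library's source), that simply returns its argument from a coroutine.

```python
import asyncio
from typing import Any


async def awaitable_sketch(x: Any) -> Any:
    """Hypothetical stand-in: wrap a plain value so it can be awaited."""
    return x


async def main() -> int:
    # Calling the function produces a coroutine; awaiting it yields the value.
    return await awaitable_sketch(42)


result = asyncio.run(main())
```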

slipstream.utils.iscoroutinecallable(o: Any) → bool

Check whether object is a coroutine callable.
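
One way such a check could be written with the standard library is sketched below. `iscoroutinecallable_sketch` is a hypothetical stand-in, and treating objects with an async `__call__` method as coroutine callables is an assumption about the intended behavior.

```python
import inspect
from typing import Any


def iscoroutinecallable_sketch(o: Any) -> bool:
    """Hypothetical stand-in: True for async functions and for objects
    whose class defines an async __call__ method."""
    return inspect.iscoroutinefunction(o) or (
        callable(o) and inspect.iscoroutinefunction(type(o).__call__)
    )


async def fetch() -> str:
    return "data"


def compute() -> int:
    return 1


class AsyncHandler:
    async def __call__(self) -> None:
        return None


checks = (
    iscoroutinecallable_sketch(fetch),
    iscoroutinecallable_sketch(compute),
    iscoroutinecallable_sketch(AsyncHandler()),
)
```

Looking up `__call__` on `type(o)` rather than on `o` avoids reporting the class `AsyncHandler` itself as a coroutine callable, since calling the class only constructs an instance.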

slipstream.utils.get_param_names(o: Any) → tuple[str, ...]

Return function parameter names.
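
A comparable result can be obtained from `inspect.signature`. The sketch below is a hypothetical stand-in (`get_param_names_sketch`), not the library's implementation, and `handler` is an example function invented for illustration.

```python
import inspect
from typing import Any


def get_param_names_sketch(o: Any) -> tuple[str, ...]:
    """Hypothetical stand-in: parameter names in declaration order."""
    return tuple(inspect.signature(o).parameters)


def handler(msg: Any, key: Any = None, **kwargs: Any) -> None:
    """Example callable used only for this illustration."""


names = get_param_names_sketch(handler)
```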

class slipstream.utils.Singleton

Bases: type

Maintain a single instance of a class.
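
Because the class derives from `type`, it is presumably used as a metaclass. The sketch below shows the common metaclass-based singleton pattern under that assumption; `SingletonSketch` and `Registry` are hypothetical names, and the library's own implementation may differ.

```python
from typing import Any


class SingletonSketch(type):
    """Hypothetical metaclass that caches one instance per class."""

    _instances: dict[type, Any] = {}

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        # Construct the instance on first use, then always return the cache.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Registry(metaclass=SingletonSketch):
    def __init__(self) -> None:
        self.items: list[str] = []


a = Registry()
b = Registry()
a.items.append("x")
```

Both names refer to the same object, so state written through `a` is visible through `b`.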

class slipstream.utils.PubSub

Singleton publish subscribe pattern class.

subscribe(topic: str, listener: AsyncCallable) → None

Subscribe callable to topic.

unsubscribe(topic: str, listener: AsyncCallable) → None

Unsubscribe callable from topic.

publish(topic: str, *args: Any, **kwargs: Any) → None

Publish message to subscribers of topic.

async apublish(topic: str, *args: Any, **kwargs: Any) → None

Asynchronously publish message to subscribers of topic.

async iter_topic(topic: str) → collections.abc.AsyncIterator[Any]

Asynchronously iterate over messages published to a topic.
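
The subscribe, unsubscribe, and publish flow can be pictured with a small in-memory stand-in. `PubSubSketch` below is hypothetical: it is not a singleton, it covers only the synchronous methods, and its internals are assumptions rather than the library's code.

```python
from typing import Any, Callable

Listener = Callable[..., Any]


class PubSubSketch:
    """Hypothetical in-memory topic registry (not the library's class)."""

    def __init__(self) -> None:
        self._topics: dict[str, list[Listener]] = {}

    def subscribe(self, topic: str, listener: Listener) -> None:
        self._topics.setdefault(topic, []).append(listener)

    def unsubscribe(self, topic: str, listener: Listener) -> None:
        listeners = self._topics.get(topic, [])
        if listener in listeners:
            listeners.remove(listener)

    def publish(self, topic: str, *args: Any, **kwargs: Any) -> None:
        # Iterate over a copy so listeners may unsubscribe while being called.
        for listener in list(self._topics.get(topic, [])):
            listener(*args, **kwargs)


received: list[str] = []


def on_greeting(message: str) -> None:
    received.append(message)


bus = PubSubSketch()
bus.subscribe("greetings", on_greeting)
bus.publish("greetings", "hello")
bus.unsubscribe("greetings", on_greeting)
bus.publish("greetings", "ignored")
```

Only the first message reaches the listener, because it is unsubscribed before the second publish.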

class slipstream.utils.AsyncSynchronizedGenerator(gen: collections.abc.AsyncIterable[Any])

Async generator that synchronizes values across copies.

property value: Any

Get the current value the generator is holding.

copy() → _GeneratorCopy

Create a synchronized copy of this generator.