slipstream.core

Core module.

Attributes

Classes

PausableStream

Can signal source stream to pause.

Topic

Act as a consumer and producer.

Module Contents

slipstream.core.READ_FROM_START = -2[source]
slipstream.core.READ_FROM_END = -1[source]
class slipstream.core.PausableStream(it: collections.abc.AsyncIterable[Any])[source]

Can signal source stream to pause.

If it is of type AsyncGenerator, it will receive the signal through the yield send syntax in order to handle the state change appropriately. Alternatively, the signal property can be used directly.

For example, the Topic class uses the signal to pause the Consumer.

Any value can be sent as a Signal, but only Signal.PAUSE will trigger a pause in consumption of the iterable in PausableStream. Any other value will resume the PausableStream.

property iterable: collections.abc.AsyncIterable[Any][source]

Get iterable.

signal: slipstream.utils.Signal | Any = None[source]
running: asyncio.Event[source]
send_signal(signal: slipstream.utils.Signal | Any) None[source]

Send signal to stream.

class slipstream.core.Topic(name: str, conf: dict[str, Any] | None = None, offset: int | dict[int, int] | None = None, codec: slipstream.interfaces.ICodec | None = None, dry: bool = False)[source]

Act as a consumer and producer.

>>> topic = Topic(
...     'emoji',
...     {
...         'bootstrap_servers': 'localhost:29091',
...         'auto_offset_reset': 'earliest',
...         'group_id': 'demo',
...     },
... )

Loop over topic (iterable) to consume from it:

>>> async for msg in topic:  
...     print(msg.value)

Call topic (callable) with data to produce to it:

>>> await topic({'msg': 'Hello World!'})  
name[source]
conf[source]
starting_offset = None[source]
codec = None[source]
dry = False[source]
consumer: aiokafka.AIOKafkaConsumer | None = None[source]
producer: aiokafka.AIOKafkaProducer | None = None[source]
property admin: aiokafka.AIOKafkaClient[source]

Get started instance of Kafka admin client.

async seek(offset: int | dict[int, int], consumer: aiokafka.AIOKafkaConsumer | None = None, timeout: float = 30.0) None[source]

Seek to offset.

async get_consumer() aiokafka.AIOKafkaConsumer[source]

Get started instance of Kafka consumer.

async get_producer() aiokafka.AIOKafkaProducer[source]

Get started instance of Kafka producer.

async init_generator() collections.abc.AsyncGenerator[Literal[slipstream.utils.Signal.SENTINEL] | aiokafka.ConsumerRecord[Any, Any], bool | None][source]

Initialize generator.

async asend(value: Any) aiokafka.ConsumerRecord[Any, Any][source]

Send data to generator.

async exit_hook() None[source]

Cleanup and finalization.