slipstream.core
Core module.
Attributes
Classes
- PausableStream: Can signal source stream to pause.
- Topic: Act as a consumer and producer.
Module Contents
- class slipstream.core.PausableStream(it: collections.abc.AsyncIterable[Any])
Can signal source stream to pause.
If the wrapped iterable (the it argument) is an AsyncGenerator, it receives the signal as the value of its yield expression (Python's generator send mechanism), so that it can handle the state change appropriately. Alternatively, the signal property can be used directly.
For example, the Topic class uses the signal to pause the Consumer.
Any value can be sent as a Signal, but only Signal.PAUSE will trigger a pause in consumption of the iterable in PausableStream. Any other value will resume the PausableStream.
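The send mechanism described above is plain Python and can be illustrated without slipstream. The following is a minimal standalone sketch, not PausableStream's implementation: the Signal enum is a hypothetical stand-in for slipstream.utils.Signal (only PAUSE is named in this reference, so RESUME is an assumption), and numbers and main are illustrative names.

```python
import asyncio
from collections.abc import AsyncGenerator
from enum import Enum


class Signal(Enum):
    """Hypothetical stand-in for slipstream.utils.Signal."""

    PAUSE = 'pause'
    RESUME = 'resume'  # assumed member, used only for illustration


async def numbers(log: list[str]) -> AsyncGenerator[int, object]:
    """Source generator that reacts to the value sent into its yield."""
    paused = False
    for n in range(3):
        # The yield expression evaluates to whatever the caller
        # passed to asend() when requesting the next item.
        signal = yield n
        if signal is Signal.PAUSE and not paused:
            paused = True
            log.append(f'paused after {n}')
        elif signal is not Signal.PAUSE and paused:
            # Any value other than PAUSE is treated as a resume.
            paused = False
            log.append(f'resumed after {n}')


async def main() -> tuple[list[int], list[str]]:
    log: list[str] = []
    gen = numbers(log)
    # The first asend() must pass None to start the generator.
    values = [await gen.asend(None)]
    # Each later asend() delivers a signal and returns the next item.
    values.append(await gen.asend(Signal.PAUSE))
    values.append(await gen.asend(None))
    await gen.aclose()
    return values, log


values, log = asyncio.run(main())
```

In this sketch the generator only records the state change. A real source, such as one wrapping a Kafka consumer, would presumably pause or resume fetching at that point.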
- signal: slipstream.utils.Signal | Any = None
- send_signal(signal: slipstream.utils.Signal | Any) -> None
Send signal to stream.
- class slipstream.core.Topic(name: str, conf: dict[str, Any] | None = None, offset: int | dict[int, int] | None = None, codec: slipstream.interfaces.ICodec | None = None, dry: bool = False)
Act as a consumer and producer.
>>> topic = Topic(
...     'emoji',
...     {
...         'bootstrap_servers': 'localhost:29091',
...         'auto_offset_reset': 'earliest',
...         'group_id': 'demo',
...     },
... )
Loop over topic (iterable) to consume from it:
>>> async for msg in topic:
...     print(msg.value)
Call topic (callable) with data to produce to it:
>>> await topic({'msg': 'Hello World!'})
- async seek(offset: int | dict[int, int], consumer: aiokafka.AIOKafkaConsumer | None = None, timeout: float = 30.0) -> None
Seek to offset.