slipstream.core =============== .. py:module:: slipstream.core .. autoapi-nested-parse:: Core module. Attributes ---------- .. autoapisummary:: slipstream.core.READ_FROM_START slipstream.core.READ_FROM_END Classes ------- .. autoapisummary:: slipstream.core.PausableStream slipstream.core.Topic Module Contents --------------- .. py:data:: READ_FROM_START :value: -2 .. py:data:: READ_FROM_END :value: -1 .. py:class:: PausableStream(it: collections.abc.AsyncIterable[Any]) Can signal source stream to pause. If `it` is of type `AsyncGenerator`, it will receive the signal through the yield send syntax in order to handle the state change appropriately. Alternatively, the `signal` property can be used directly. For example, the Topic class uses the signal to pause the Consumer. Any value can be sent as a Signal, but only Signal.PAUSE will trigger a pause in consumption of the iterable in PausableStream. Any other value will resume the PausableStream. .. py:property:: iterable :type: collections.abc.AsyncIterable[Any] Get iterable. .. py:attribute:: signal :type: slipstream.utils.Signal | Any :value: None .. py:attribute:: running :type: asyncio.Event .. py:method:: send_signal(signal: slipstream.utils.Signal | Any) -> None Send signal to stream. .. py:class:: Topic(name: str, conf: dict[str, Any] | None = None, offset: int | dict[int, int] | None = None, codec: slipstream.interfaces.ICodec | None = None, dry: bool = False) Act as a consumer and producer. >>> topic = Topic( ... 'emoji', ... { ... 'bootstrap_servers': 'localhost:29091', ... 'auto_offset_reset': 'earliest', ... 'group_id': 'demo', ... }, ... ) Loop over topic (iterable) to consume from it: >>> async for msg in topic: # doctest: +SKIP ... print(msg.value) Call topic (callable) with data to produce to it: >>> await topic({'msg': 'Hello World!'}) # doctest: +SKIP .. py:attribute:: name .. py:attribute:: conf .. py:attribute:: starting_offset :value: None .. py:attribute:: codec :value: None .. py:attribute:: dry :value: False .. py:attribute:: consumer :type: aiokafka.AIOKafkaConsumer | None :value: None .. py:attribute:: producer :type: aiokafka.AIOKafkaProducer | None :value: None .. py:property:: admin :type: aiokafka.AIOKafkaClient Get started instance of Kafka admin client. .. py:method:: seek(offset: int | dict[int, int], consumer: aiokafka.AIOKafkaConsumer | None = None, timeout: float = 30.0) -> None :async: Seek to offset. .. py:method:: get_consumer() -> aiokafka.AIOKafkaConsumer :async: Get started instance of Kafka consumer. .. py:method:: get_producer() -> aiokafka.AIOKafkaProducer :async: Get started instance of Kafka producer. .. py:method:: init_generator() -> collections.abc.AsyncGenerator[Literal[slipstream.utils.Signal.SENTINEL] | aiokafka.ConsumerRecord[Any, Any], bool | None] :async: Initialize generator. .. py:method:: asend(value: Any) -> aiokafka.ConsumerRecord[Any, Any] :async: Send data to generator. .. py:method:: exit_hook() -> None :async: Cleanup and finalization.