slipstream package
Top level objects.
- class slipstream.Cache(path: str, options: Options | None = None, column_families: Dict[str, Options] | None = None, access_type: AccessType = <builtins.AccessType object>, target_table_size: int = 26214400, number_of_locks: int = 16)
Bases: ICache
Create a RocksDB database in the specified folder.
>>> cache = Cache('db/mycache')
The cache instance acts as a callable to store data:
>>> cache('key', {'msg': 'Hello World!'})
>>> cache['key']
{'msg': 'Hello World!'}
- compact_range(begin: str | int | float | bytes | bool | None, end: str | int | float | bytes | bool | None, compact_opt: CompactOptions = <builtins.CompactOptions object>) -> None
Run manual compaction on range for the current column family.
- create_column_family(name: str, options: Options = <builtins.Options object>) -> Rdict
Create column family.
- delete(key: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) -> None
Delete item from database.
- delete_range(begin: str | int | float | bytes | bool, end: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) -> None
Delete database items from begin up to, but excluding, end.
- get(key: str | int | float | bytes | bool | list[str | int | float | bytes | bool], default: Any | None = None, read_opt: ReadOptions | None = None) -> Any | None
Get item from database by key.
- ingest_external_file(paths: List[str], opts: IngestExternalFileOptions = <builtins.IngestExternalFileOptions object>) -> None
Load list of SST files into current column family.
- items(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) -> RdictItems
Get tuples of key-value pairs.
- key_may_exist(key: str | int | float | bytes | bool, fetch: bool = False, read_opt: ReadOptions | None = None) -> bool | Tuple[bool, Any]
Check if a key may exist without performing IO operations.
- keys(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) -> RdictKeys
Get keys.
- live_files() -> List[Dict[str, Any]]
Get list of all table files with their level, start/end key.
- property_int_value(name: str) -> int | None
Get property as int by name from current column family.
- put(key: str | int | float | bytes | bool, value: Any, write_opt: WriteOptions | None = None) -> None
Put item in database using key.
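The callable-plus-subscript pattern and the half-open range used by delete_range can be illustrated without RocksDB. The following is only a sketch: DictCache is a hypothetical in-memory stand-in, not the library's Cache, and the assumption that backwards simply reverses key order is ours.

```python
class DictCache:
    """Hypothetical in-memory stand-in; NOT slipstream's Cache."""

    def __init__(self):
        self._data = {}

    def __call__(self, key, value):
        # Calling the instance stores a value, like cache('key', value).
        self._data[key] = value

    def __getitem__(self, key):
        # Subscript access reads a value back, like cache['key'].
        return self._data[key]

    def get(self, key, default=None):
        return self._data.get(key, default)

    def delete_range(self, begin, end):
        # Half-open interval [begin, end): the end key itself is kept.
        for key in [k for k in self._data if begin <= k < end]:
            del self._data[key]

    def keys(self, backwards=False):
        # Assumption: backwards=True iterates keys in reverse sorted order.
        return sorted(self._data, reverse=backwards)


cache = DictCache()
for k in ('a', 'b', 'c', 'd'):
    cache(k, {'msg': k})

cache.delete_range('b', 'd')  # removes 'b' and 'c', keeps 'd'
remaining = cache.keys()
```

Here 'd' survives the call because the end of the range is excluded, which matches the wording of delete_range above.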
- class slipstream.Conf(*args: Any, **kwargs: Any)
Bases: object
The application configuration singleton.
Register iterables (sources) and handlers (sinks):
>>> from slipstream import handle
>>> async def messages():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> @handle(messages(), sink=[print])
... def handle_message(msg):
...     yield f'Hello {msg}!'
Set application Kafka configuration (optional):
>>> Conf({'bootstrap_servers': 'localhost:29091'})
{'bootstrap_servers': 'localhost:29091'}
Provide exit hooks:
>>> async def exit_hook():
...     print('Shutting down application.')
>>> c = Conf()
>>> c.register_exit_hook(exit_hook)
- exit_hooks: set[Callable[[...], Awaitable[Any]] | Callable[[...], Any]] = {}
- iterables: dict[str, PausableStream] = {}
- pubsub = <slipstream.utils.PubSub object>
- register_exit_hook(exit_hook: Callable[[...], Awaitable[Any]] | Callable[[...], Any])
Add exit hook that’s called on shutdown.
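The exit_hook type accepts both plain and async callables. One way such a mix can be invoked is sketched below; run_exit_hooks is a hypothetical helper written for illustration, and the library's actual shutdown logic is not shown in this reference. A list is used here instead of a set only to keep the call order deterministic.

```python
import asyncio
import inspect

calls = []


def sync_hook():
    calls.append('sync')


async def async_hook():
    await asyncio.sleep(0)
    calls.append('async')


async def run_exit_hooks(hooks):
    """Call each hook and await the result only when it is awaitable."""
    for hook in hooks:
        result = hook()
        if inspect.isawaitable(result):
            await result


asyncio.run(run_exit_hooks([sync_hook, async_hook]))
```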
- class slipstream.Topic(name: str, conf: dict[str, Any] = {}, offset: int | None = None, codec: ICodec | None = None, dry: bool = False)
Bases: object
Act as a consumer and producer.
>>> topic = Topic('emoji', {
...     'bootstrap_servers': 'localhost:29091',
...     'auto_offset_reset': 'earliest',
...     'group_id': 'demo',
... })
Loop over topic (iterable) to consume from it:
>>> async for msg in topic:
...     print(msg.value)
Call topic (callable) with data to produce to it:
>>> await topic({'msg': 'Hello World!'})
- property admin: AIOKafkaClient
Get a started instance of the Kafka admin client.
- slipstream.handle(*iterable: AsyncIterable[Any], sink: Iterable[Callable[[...], Awaitable[Any]] | Callable[[...], Any]] = [])
Snap function to stream.
- Ex:
>>> topic = Topic('demo')
>>> cache = Cache('state/demo')
>>> @handle(topic, sink=[print, cache])
... def handler(msg, **kwargs):
...     return msg.key, msg.value
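The data flow that handle sets up (source, then handler, then every sink) can be sketched with the standard library alone. This is not how handle is implemented; pipe, numbers and double are hypothetical names used only to show the flow.

```python
import asyncio


async def numbers():
    # Stand-in source: any async iterable would do.
    for n in (1, 2, 3):
        yield n


def double(n):
    # Stand-in handler: transforms one incoming item.
    return n * 2


async def pipe(source, handler, sinks):
    """Feed every item from source through handler into each sink."""
    async for item in source:
        result = handler(item)
        for sink in sinks:
            sink(result)


collected = []
asyncio.run(pipe(numbers(), double, [collected.append]))
```

In the library example above, print and cache play the role that collected.append plays here.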
- slipstream.stream(**kwargs: Any)
Start the streams.
- Ex:
>>> from asyncio import run
>>> args = {
...     'env': 'DEV',
... }
>>> run(stream(**args))
Submodules
slipstream.caching module
Slipstream caching.
- class slipstream.caching.Cache(path: str, options: Options | None = None, column_families: Dict[str, Options] | None = None, access_type: AccessType = <builtins.AccessType object>, target_table_size: int = 26214400, number_of_locks: int = 16)
Bases: ICache
Create a RocksDB database in the specified folder.
>>> cache = Cache('db/mycache')
The cache instance acts as a callable to store data:
>>> cache('key', {'msg': 'Hello World!'})
>>> cache['key']
{'msg': 'Hello World!'}
- compact_range(begin: str | int | float | bytes | bool | None, end: str | int | float | bytes | bool | None, compact_opt: CompactOptions = <builtins.CompactOptions object>) -> None
Run manual compaction on range for the current column family.
- create_column_family(name: str, options: Options = <builtins.Options object>) -> Rdict
Create column family.
- delete(key: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) -> None
Delete item from database.
- delete_range(begin: str | int | float | bytes | bool, end: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) -> None
Delete database items from begin up to, but excluding, end.
- get(key: str | int | float | bytes | bool | list[str | int | float | bytes | bool], default: Any | None = None, read_opt: ReadOptions | None = None) -> Any | None
Get item from database by key.
- ingest_external_file(paths: List[str], opts: IngestExternalFileOptions = <builtins.IngestExternalFileOptions object>) -> None
Load list of SST files into current column family.
- items(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) -> RdictItems
Get tuples of key-value pairs.
- key_may_exist(key: str | int | float | bytes | bool, fetch: bool = False, read_opt: ReadOptions | None = None) -> bool | Tuple[bool, Any]
Check if a key may exist without performing IO operations.
- keys(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None) -> RdictKeys
Get keys.
- live_files() -> List[Dict[str, Any]]
Get list of all table files with their level, start/end key.
- property_int_value(name: str) -> int | None
Get property as int by name from current column family.
- put(key: str | int | float | bytes | bool, value: Any, write_opt: WriteOptions | None = None) -> None
Put item in database using key.
slipstream.checkpointing module
Slipstream checkpointing.
- class slipstream.checkpointing.Checkpoint(name: str, dependent: AsyncIterable[Any], dependencies: list[Dependency], downtime_callback: Callable[[Checkpoint, Dependency], Any] | None = None, recovery_callback: Callable[[Checkpoint, Dependency], Any] | None = None, cache: ICache | None = None, cache_key_prefix: str = '_')
Bases: object
Pulse the heartbeat of dependency streams to handle downtimes.
A checkpoint consists of a dependent stream and dependency streams.
>>> async def emoji():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> dependent, dependency = emoji(), emoji()
>>> c = Checkpoint(
...     'dependent', dependent=dependent,
...     dependencies=[Dependency('dependency', dependency)]
... )
Checkpoints automatically handle pausing of dependent streams when they are bound to user handler functions (using handle):
>>> from slipstream import handle
>>> @handle(dependent)
... async def dependent_handler(msg):
...     key, val, offset = msg.key, msg.value, msg.offset
...     await c.check_pulse(marker=val['event_timestamp'], offset=offset)
...     yield key, msg
>>> @handle(dependency)
... async def dependency_handler(msg):
...     key, val = msg.key, msg.value
...     await c.heartbeat(val['event_timestamp'])
...     yield key, val
On the first pulse check, no message might have been received from dependency yet. Therefore the dependency checkpoint is updated with the initial state and marker of the dependent stream:
>>> from asyncio import run
>>> from datetime import datetime
>>> run(c.check_pulse(marker=datetime(2025, 1, 1, 10), offset=8))
>>> c['dependency'].checkpoint_marker
datetime.datetime(2025, 1, 1, 10, 0)
When a message is received in dependency, send a heartbeat with its event time, which can be compared with the dependent event times to check for downtime:
>>> run(c.heartbeat(datetime(2025, 1, 1, 10, 30)))
When the pulse is checked after a while, it’s apparent that no dependency messages have been received for 30 minutes:
>>> run(c.check_pulse(marker=datetime(2025, 1, 1, 11), offset=9))
datetime.timedelta(seconds=1800)
Because the downtime surpasses the default downtime_threshold, the dependent stream will be paused (and resumed when the recovery check succeeds). Callbacks can be provided for additional custom behavior.
If no cache is provided, the checkpoint lifespan will be limited to that of the application runtime.
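The arithmetic in the walkthrough above can be reproduced with plain datetime objects. This is a sketch that assumes a dependency counts as down when the gap between the dependent marker and the last heartbeat marker exceeds downtime_threshold. The function name downtime is hypothetical, and the library's actual default check is not shown in this reference.

```python
from datetime import datetime, timedelta

# Default taken from the Dependency signature: timedelta(seconds=600).
DOWNTIME_THRESHOLD = timedelta(seconds=600)


def downtime(dependent_marker, dependency_marker, threshold=DOWNTIME_THRESHOLD):
    """Return the gap when it exceeds the threshold, otherwise None."""
    diff = dependent_marker - dependency_marker
    return diff if diff > threshold else None


# Last heartbeat at 10:30, pulse checked at 11:00: 30 minutes of silence.
gap = downtime(datetime(2025, 1, 1, 11), datetime(2025, 1, 1, 10, 30))

# Pulse checked at 10:35: only 5 minutes, below the 10 minute threshold.
no_gap = downtime(datetime(2025, 1, 1, 10, 35), datetime(2025, 1, 1, 10, 30))
```

The first call yields the same 1800 second timedelta as the check_pulse example, which is why the dependent stream would be paused in that scenario.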
- async check_pulse(marker: datetime | Any, **kwargs: Any) -> Any | None
Update state that can be used as checkpoint.
- Args:
- marker (datetime | Any): Typically the event timestamp that is compared to the event timestamp of a dependency stream.
- **kwargs (Any): State, passed as keyword arguments: any information that can be used for reprocessing incorrect data that was sent out during downtime of a dependency stream (offsets, for example).
- Returns:
- Any: Typically the timedelta between the last state_marker and the checkpoint_marker since the stream went down.
- async heartbeat(marker: datetime | Any, dependency_name: str | None = None) -> None
Update checkpoint to latest state.
- Args:
- marker (datetime | Any): Typically the event timestamp that is compared to the event timestamp of a dependent stream.
- dependency_name (str, optional): Required when there are multiple dependencies to specify which one the heartbeat is for.
- class slipstream.checkpointing.Dependency(name: str, dependency: AsyncIterable[Any], downtime_threshold: Any = datetime.timedelta(seconds=600), downtime_check: Callable[[Checkpoint, Dependency], Any] | None = None, recovery_check: Callable[[Checkpoint, Dependency], bool] | None = None)
Bases: object
Track the dependent stream state to recover from downtime.
>>> async def emoji():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> Dependency('emoji', emoji())
{'checkpoint_state': None, 'checkpoint_marker': None}
slipstream.codecs module
Slipstream codecs.
slipstream.core module
Core module.
- class slipstream.core.Conf(*args: Any, **kwargs: Any)
Bases: object
The application configuration singleton.
Register iterables (sources) and handlers (sinks):
>>> from slipstream import handle
>>> async def messages():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> @handle(messages(), sink=[print])
... def handle_message(msg):
...     yield f'Hello {msg}!'
Set application Kafka configuration (optional):
>>> Conf({'bootstrap_servers': 'localhost:29091'})
{'bootstrap_servers': 'localhost:29091'}
Provide exit hooks:
>>> async def exit_hook():
...     print('Shutting down application.')
>>> c = Conf()
>>> c.register_exit_hook(exit_hook)
- conf: dict[str, Any]
- exit_hooks: set[Callable[[...], Awaitable[Any]] | Callable[[...], Any]] = {}
- iterables: dict[str, PausableStream] = {}
- pubsub = <slipstream.utils.PubSub object>
- register_exit_hook(exit_hook: Callable[[...], Awaitable[Any]] | Callable[[...], Any])
Add exit hook that’s called on shutdown.
- class slipstream.core.PausableStream(it: AsyncIterable[Any])
Bases: object
Can signal source stream to pause.
If the wrapped iterable (it) is of type AsyncGenerator, it receives the signal through the yield/send syntax so that it can handle the state change appropriately. Alternatively, the signal property can be used directly.
For example, the Topic class uses the signal to pause the Consumer.
Any value can be sent as a Signal, but only Signal.PAUSE will trigger a pause in consumption of the iterable in PausableStream. Any other value will resume the PausableStream.
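How an async generator receives a value through the yield/send syntax can be shown with the standard library. The sketch below re-declares a Signal enum locally with the values listed in this reference; source and main are hypothetical names, and PausableStream itself is not used, so this only illustrates the mechanism the class relies on.

```python
import asyncio
from enum import Enum


class Signal(Enum):
    # Local copy of the documented values, for illustration only.
    SENTINEL = 0
    PAUSE = 1
    RESUME = 2


async def source(log):
    n = 0
    while n < 3:
        # The value passed to asend() becomes the result of this yield.
        signal = yield n
        if signal is Signal.PAUSE:
            log.append(f'paused after {n}')
        n += 1


async def main():
    log = []
    gen = source(log)
    first = await gen.__anext__()            # starts the generator
    second = await gen.asend(Signal.PAUSE)   # generator sees PAUSE
    third = await gen.asend(None)            # no signal this time
    await gen.aclose()
    return [first, second, third], log


values, log = asyncio.run(main())
```

A real source would react to PAUSE by pausing its upstream (the Topic class is described as pausing its consumer), where this sketch only records that the signal arrived.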
- class slipstream.core.Signal(value)
Bases: Enum
Signals can be exchanged with streams.
SENTINEL represents an absent yield value.
PAUSE represents the signal to pause the stream.
RESUME represents the signal to resume the stream.
- PAUSE = 1
- RESUME = 2
- SENTINEL = 0
- class slipstream.core.Topic(name: str, conf: dict[str, Any] = {}, offset: int | None = None, codec: ICodec | None = None, dry: bool = False)
Bases: object
Act as a consumer and producer.
>>> topic = Topic('emoji', {
...     'bootstrap_servers': 'localhost:29091',
...     'auto_offset_reset': 'earliest',
...     'group_id': 'demo',
... })
Loop over topic (iterable) to consume from it:
>>> async for msg in topic:
...     print(msg.value)
Call topic (callable) with data to produce to it:
>>> await topic({'msg': 'Hello World!'})
- property admin: AIOKafkaClient
Get a started instance of the Kafka admin client.
- slipstream.core.handle(*iterable: AsyncIterable[Any], sink: Iterable[Callable[[...], Awaitable[Any]] | Callable[[...], Any]] = [])
Snap function to stream.
- Ex:
>>> topic = Topic('demo')
>>> cache = Cache('state/demo')
>>> @handle(topic, sink=[print, cache])
... def handler(msg, **kwargs):
...     return msg.key, msg.value
slipstream.interfaces module
Slipstream interfaces.
slipstream.utils module
Slipstream utilities.
- class slipstream.utils.PubSub(*args: Any, **kwargs: Any)
Bases: object
Singleton publish-subscribe pattern class.
- async apublish(topic: str, *args: Any, **kwargs: Any) -> None
Asynchronously publish message to subscribers of topic.
- async iter_topic(topic: str) -> AsyncIterator[Any]
Asynchronously iterate over messages published to a topic.
- publish(topic: str, *args: Any, **kwargs: Any) -> None
Publish message to subscribers of topic.
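The publish-subscribe pattern itself can be sketched in a few lines. MiniPubSub and its subscribe method are hypothetical and exist only for illustration; the reference above documents publish, apublish and iter_topic, and does not show how the library registers subscribers.

```python
from collections import defaultdict


class MiniPubSub:
    """Hypothetical minimal pub/sub; NOT slipstream.utils.PubSub."""

    def __init__(self):
        # Maps a topic name to the listeners registered for it.
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, listener):
        # Assumed registration method, not part of the documented API.
        self._subscribers[topic].append(listener)

    def publish(self, topic, *args, **kwargs):
        # Forward the message to every listener of this topic only.
        for listener in self._subscribers[topic]:
            listener(*args, **kwargs)


received = []
bus = MiniPubSub()
bus.subscribe('greetings', received.append)
bus.publish('greetings', 'hello')
bus.publish('other', 'ignored')  # no listeners on this topic
```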