slipstream package
Top level objects.
- class slipstream.Cache(*args: Any, **kwargs: Any)[source]¶
Bases: ICache
Create a RocksDB database in the specified folder.
>>> cache = Cache('db/mycache')
The cache instance acts as a callable to store data:
>>> cache('key', {'msg': 'Hello World!'})
>>> cache['key']
{'msg': 'Hello World!'}
- columns(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[list[tuple[Any, Any]]][source]¶
Get values as wide columns.
- compact_range(begin: str | int | float | bytes | bool | None, end: str | int | float | bytes | bool | None, compact_opt: CompactOptions | None = None) None[source]¶
Run manual compaction on range for the current column family.
- create_column_family(name: str, options: Options | None = None) Rdict[source]¶
Create column family.
- delete(key: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]¶
Delete item from database.
- delete_range(begin: str | int | float | bytes | bool, end: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]¶
Delete database items, excluding end.
- entities(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[tuple[str | int | float | bytes | bool, list[tuple[Any, Any]]]][source]¶
Get keys and entities.
- get(key: str | int | float | bytes | bool | list[str | int | float | bytes | bool], default: T | None = None, read_opt: ReadOptions | None = None) Any | T[source]¶
Get item from database by key.
- get_entity(key: str | int | float | bytes | bool | list[str | int | float | bytes | bool], default: Any | None = None, read_opt: ReadOptions | None = None) list[tuple[Any, Any]] | None[source]¶
Get wide-column from database by key.
>>> cache.get_entity('key')
[('a', 1), ('b', 2)]
- ingest_external_file(paths: list[str], opts: IngestExternalFileOptions | None = None) None[source]¶
Load list of SST files into current column family.
- items(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[tuple[str | int | float | bytes | bool, Any]][source]¶
Get tuples of key-value pairs.
- key_may_exist(key: str | int | float | bytes | bool, fetch: bool = False, read_opt: ReadOptions | None = None) bool | tuple[bool, Any][source]¶
Check if a key exists without performing I/O operations.
- keys(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[str | int | float | bytes | bool][source]¶
Get keys.
- live_files() list[dict[str, Any]][source]¶
Get list of all table files with their level, start/end key.
- property_int_value(name: str) int | None[source]¶
Get property as int by name from current column family.
- put(key: str | int | float | bytes | bool, value: Any, write_opt: WriteOptions | None = None) None[source]¶
Put item in database using key.
- put_entity(key: str | int | float | bytes | bool, names: list[Any], values: list[Any], write_opt: WriteOptions | None = None) None[source]¶
Put wide-column in database using key.
>>> cache.put_entity('key', ['a', 'b'], [1, 2])
- transaction(key: str | int | float | bytes | bool) AsyncGenerator[Cache, None][source]¶
Lock the db entry while using the context manager.
>>> async with cache.transaction('fish'):
...     cache['fish'] = '🐟'
This works for asynchronous code (not multi-threading/processing).
While locked, other transactions on the same key will block.
Actions outside of transaction blocks ignore ongoing transactions.
Reads aren’t limited by ongoing transactions.
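The locking behaviour described above can be illustrated without RocksDB. The following is a minimal sketch, assuming one asyncio.Lock per key; `LockedStore`, `writer` and `demo` are hypothetical names used only for illustration and are not part of slipstream, and this is not necessarily how `Cache.transaction` is implemented.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager


class LockedStore:
    """Dict-backed stand-in (hypothetical) for a cache with per-key transactions."""

    def __init__(self):
        self.db = {}
        # One asyncio.Lock per key, created lazily on first use.
        self._locks = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def transaction(self, key):
        # Other transactions on the same key wait here until the lock is released.
        async with self._locks[key]:
            yield self


async def demo():
    store = LockedStore()
    order = []

    async def writer(name, value, delay):
        async with store.transaction('fish'):
            order.append(f'{name} start')
            await asyncio.sleep(delay)
            store.db['fish'] = value
            order.append(f'{name} end')

    # Both writers target the same key, so they run one after the other.
    await asyncio.gather(writer('a', 1, 0.02), writer('b', 2, 0.0))
    return order, store.db['fish']


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

In this sketch, code that touches `store.db` outside a `transaction` block is not locked at all, which mirrors the note that actions outside of transaction blocks ignore ongoing transactions.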
- class slipstream.Conf(*args: Any, **kwargs: Any)[source]¶
Bases: object
The application configuration singleton.
Register iterables (sources) and handlers (sinks):
>>> from slipstream import handle
>>> async def messages():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> @handle(messages(), sink=[print])
... def handle_message(msg):
...     yield f'Hello {msg}!'
Set application kafka configuration (optional):
>>> Conf({'bootstrap_servers': 'localhost:29091'})
{'bootstrap_servers': 'localhost:29091'}
Provide exit hooks:
>>> async def exit_hook():
...     print('Shutting down application.')
>>> c = Conf()
>>> c.register_exit_hook(exit_hook)
- exit_hooks: ClassVar[set[Callable[[P], T | Awaitable[T]]]] = {}¶
- iterables: ClassVar[dict[str, PausableStream]] = {}¶
- pipes: ClassVar[dict[Callable[[P], T | Awaitable[T]], tuple[str, tuple[Callable[[AsyncIterable[Any]], AsyncIterable[Any]], ...]]]] = {}¶
- pubsub = <slipstream.utils.PubSub object>¶
- register_exit_hook(exit_hook: Callable[[P], T | Awaitable[T]]) None[source]¶
Add exit hook that’s called on shutdown.
- class slipstream.Topic(name: str, conf: dict[str, Any] | None = None, offset: int | dict[int, int] | None = None, codec: ICodec | None = None, dry: bool = False)[source]¶
Bases: object
Act as a consumer and producer.
>>> topic = Topic(
...     'emoji',
...     {
...         'bootstrap_servers': 'localhost:29091',
...         'auto_offset_reset': 'earliest',
...         'group_id': 'demo',
...     },
... )
Loop over topic (iterable) to consume from it:
>>> async for msg in topic:
...     print(msg.value)
Call topic (callable) with data to produce to it:
>>> await topic({'msg': 'Hello World!'})
- property admin: AIOKafkaClient¶
Get started instance of Kafka admin client.
- slipstream.handle(*iterable: AsyncIterable[Any], pipe: Iterable[Callable[[AsyncIterable[Any]], AsyncIterable[Any]]] = [], sink: Iterable[Callable | Callable[[P], T | Awaitable[T]]] = []) Callable[[Callable[[P], T | Awaitable[T]]], Callable[[...], Awaitable[Any]]][source]¶
Bind sources and sinks to the handler function.
Example:
>>> topic = Topic('demo')
>>> cache = Cache('state/demo')
>>> @handle(topic, sink=[print, cache])
... def handler(msg, **kwargs):
...     return msg.key, msg.value
- slipstream.stream(**kwargs: Any) Coroutine[None, None, None][source]¶
Start processing iterables bound by handle function.
Example:
>>> from asyncio import run
>>> kwargs = {
...     'env': 'DEV',
... }
>>> run(stream(**kwargs))
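To make the source, handler and sink roles concrete, here is a minimal sketch in plain asyncio. It only illustrates the data flow that `handle` and `stream` are described as setting up; `run_pipeline` is a hypothetical helper, not a slipstream function, and the real implementation may differ.

```python
import asyncio


async def messages():
    # Source: any async iterable.
    for word in ('prize', 'phone', 'fish'):
        yield word


def handler(msg):
    # Handler: turns each incoming message into an output value.
    return f'Hello {msg}!'


async def run_pipeline(source, handler, sinks):
    # Feed every message through the handler and pass the result to each sink.
    async for msg in source:
        result = handler(msg)
        for sink in sinks:
            sink(result)


received = []
# Any callable can act as a sink; here the results are collected in a list.
asyncio.run(run_pipeline(messages(), handler, [received.append]))
```

In this sketch a sink is any callable that accepts the handler output, which is the same shape as passing `print` or a cache instance in the examples above.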
Submodules
slipstream.caching module
Slipstream caching.
- class slipstream.caching.Cache(*args: Any, **kwargs: Any)[source]¶
Bases: ICache
Create a RocksDB database in the specified folder.
>>> cache = Cache('db/mycache')
The cache instance acts as a callable to store data:
>>> cache('key', {'msg': 'Hello World!'})
>>> cache['key']
{'msg': 'Hello World!'}
- columns(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[list[tuple[Any, Any]]][source]¶
Get values as wide columns.
- compact_range(begin: str | int | float | bytes | bool | None, end: str | int | float | bytes | bool | None, compact_opt: CompactOptions | None = None) None[source]¶
Run manual compaction on range for the current column family.
- create_column_family(name: str, options: Options | None = None) Rdict[source]¶
Create column family.
- delete(key: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]¶
Delete item from database.
- delete_range(begin: str | int | float | bytes | bool, end: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]¶
Delete database items, excluding end.
- entities(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[tuple[str | int | float | bytes | bool, list[tuple[Any, Any]]]][source]¶
Get keys and entities.
- get(key: str | int | float | bytes | bool | list[str | int | float | bytes | bool], default: T | None = None, read_opt: ReadOptions | None = None) Any | T[source]¶
Get item from database by key.
- get_entity(key: str | int | float | bytes | bool | list[str | int | float | bytes | bool], default: Any | None = None, read_opt: ReadOptions | None = None) list[tuple[Any, Any]] | None[source]¶
Get wide-column from database by key.
>>> cache.get_entity('key')
[('a', 1), ('b', 2)]
- ingest_external_file(paths: list[str], opts: IngestExternalFileOptions | None = None) None[source]¶
Load list of SST files into current column family.
- items(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[tuple[str | int | float | bytes | bool, Any]][source]¶
Get tuples of key-value pairs.
- key_may_exist(key: str | int | float | bytes | bool, fetch: bool = False, read_opt: ReadOptions | None = None) bool | tuple[bool, Any][source]¶
Check if a key exists without performing I/O operations.
- keys(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[str | int | float | bytes | bool][source]¶
Get keys.
- live_files() list[dict[str, Any]][source]¶
Get list of all table files with their level, start/end key.
- property_int_value(name: str) int | None[source]¶
Get property as int by name from current column family.
- put(key: str | int | float | bytes | bool, value: Any, write_opt: WriteOptions | None = None) None[source]¶
Put item in database using key.
- put_entity(key: str | int | float | bytes | bool, names: list[Any], values: list[Any], write_opt: WriteOptions | None = None) None[source]¶
Put wide-column in database using key.
>>> cache.put_entity('key', ['a', 'b'], [1, 2])
- transaction(key: str | int | float | bytes | bool) AsyncGenerator[Cache, None][source]¶
Lock the db entry while using the context manager.
>>> async with cache.transaction('fish'):
...     cache['fish'] = '🐟'
This works for asynchronous code (not multi-threading/processing).
While locked, other transactions on the same key will block.
Actions outside of transaction blocks ignore ongoing transactions.
Reads aren’t limited by ongoing transactions.
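The `delete_range` entry above states that the end key is excluded. As a minimal sketch of that half-open range, assuming string keys compared lexicographically and using a plain dict instead of RocksDB (the `delete_range` function below is a hypothetical stand-in, not the method itself):

```python
def delete_range(db, begin, end):
    """Delete every key k with begin <= k < end from a plain dict."""
    # Collect the matching keys first so the dict is not mutated while iterating.
    for key in [k for k in db if begin <= k < end]:
        del db[key]


db = {'a': 1, 'b': 2, 'c': 3, 'd': 4}
delete_range(db, 'b', 'd')  # removes 'b' and 'c'; 'd' itself is kept
```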
slipstream.checkpointing module
Slipstream checkpointing.
- class slipstream.checkpointing.Checkpoint(name: str, dependent: AsyncIterable[Any], dependencies: list[Dependency], downtime_callback: Callable[[Checkpoint, Dependency], Any] | None = None, recovery_callback: Callable[[Checkpoint, Dependency], Any] | None = None, cache: ICache | None = None, cache_key_prefix: str = '_', pause_dependent: bool = True)[source]¶
Bases: object
Pulse the heartbeat of dependency streams to handle downtimes.
A checkpoint consists of a dependent stream and dependency streams.
>>> async def emoji():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> dependent, dependency = emoji(), emoji()
The checkpoint and dependency names should not be changed once created, because they are used to persist the checkpoint in the cache.
>>> c = Checkpoint(
...     'dependent',
...     dependent=dependent,
...     dependencies=[Dependency('dependency', dependency)],
... )
Checkpoints automatically handle pausing of dependent streams when they are bound to user handler functions (using handle):
>>> from slipstream import handle
>>> @handle(dependent)
... async def dependent_handler(msg):
...     await c.check_pulse(marker=msg.value['event_timestamp'])
...     yield msg.key, msg.value
>>> @handle(dependency)
... async def dependency_handler(msg):
...     await c.heartbeat(msg.value['event_timestamp'])
...     yield msg.key, msg.value
On the first pulse check, no message might have been received from dependency yet. Therefore the dependency checkpoint is updated with the initial state and marker of the dependent stream:
>>> from asyncio import run
>>> from datetime import datetime
>>> run(c.check_pulse(marker=datetime(2025, 1, 1, 10), offset=8))
>>> c['dependency'].checkpoint_marker
datetime.datetime(2025, 1, 1, 10, 0)
When a message is received in dependency, send a heartbeat with its event time, which can be compared with the dependent event times to check for downtime:
>>> run(c.heartbeat(datetime(2025, 1, 1, 10, 30)))
{'is_late': False, ...}
When the pulse is checked after a while, it’s apparent that no dependency messages have been received for 30 minutes:
>>> run(c.check_pulse(marker=datetime(2025, 1, 1, 11), offset=9))
datetime.timedelta(seconds=1800)
Because the downtime surpasses the default downtime_threshold, the dependent stream will be paused (and resumed when the recovery check succeeds). Callbacks can be provided for additional custom behavior.
As the dependency stream recovers, it has to “catch up” with the dependent stream first. Until then, the dependent stream stays paused, and the dependency stream is marked as down.
>>> run(c.heartbeat(datetime(2025, 1, 1, 10, 45)))
{'is_late': True, ...}
>>> run(c.heartbeat(datetime(2025, 1, 1, 11, 1)))
{'is_late': False, ...}
If no cache is provided, the checkpoint lifespan will be limited to that of the application runtime.
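The arithmetic behind the walkthrough above can be reproduced with plain datetime objects. This is a minimal sketch under stated assumptions: `PulseSketch` is a hypothetical class, it tracks a single dependency, and it assumes that a heartbeat counts as late when its marker lies before the latest marker of the dependent stream. The actual `Checkpoint` logic may differ.

```python
from datetime import datetime, timedelta

# Mirrors the documented default downtime_threshold of 600 seconds.
DOWNTIME_THRESHOLD = timedelta(seconds=600)


class PulseSketch:
    """Hypothetical single-dependency model of check_pulse and heartbeat."""

    def __init__(self):
        self.state_marker = None       # latest marker of the dependent stream
        self.checkpoint_marker = None  # latest marker of the dependency stream
        self.is_down = False

    def check_pulse(self, marker):
        self.state_marker = marker
        if self.checkpoint_marker is None:
            # First pulse: seed the checkpoint with the dependent's marker.
            self.checkpoint_marker = marker
            return None
        downtime = self.state_marker - self.checkpoint_marker
        if downtime > DOWNTIME_THRESHOLD:
            self.is_down = True
            return downtime
        return None

    def heartbeat(self, marker):
        self.checkpoint_marker = marker
        # Assumption: late means the dependency is still behind the dependent.
        is_late = self.state_marker is not None and marker < self.state_marker
        if self.is_down and not is_late:
            self.is_down = False  # the dependency has caught up
        return {'is_late': is_late}


c = PulseSketch()
c.check_pulse(datetime(2025, 1, 1, 10))
c.heartbeat(datetime(2025, 1, 1, 10, 30))
downtime = c.check_pulse(datetime(2025, 1, 1, 11))  # 11:00 minus 10:30
```

Here `downtime` is 30 minutes, which exceeds the 10 minute threshold, so the sketch marks the dependency as down until a heartbeat arrives with a marker at or after 11:00.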
- async check_pulse(marker: datetime | Any, **kwargs: Any) Any | None[source]¶
Update state that can be used as checkpoint.
Args:
    marker (datetime | Any): Typically the event timestamp that is compared to the event timestamp of a dependency stream.
    kwargs (Any): Any information that can be used for reprocessing any incorrect data that was sent out during downtime of a dependency stream, stored in state.
Returns:
    Any: Typically the timedelta between the last state_marker and the checkpoint_marker since the stream went down.
- async heartbeat(marker: datetime | Any, dependency_name: str | None = None) dict[source]¶
Update checkpoint to latest state.
Args:
    marker (datetime | Any): Typically the event timestamp that is compared to the event timestamp of a dependent stream.
    dependency_name (str, optional): Required when there are multiple dependencies to specify which one the heartbeat is for.
- class slipstream.checkpointing.Dependency(name: str, dependency: AsyncIterable[Any], downtime_threshold: Any = datetime.timedelta(seconds=600), downtime_check: Callable[[Checkpoint, Dependency], Any | Awaitable[Any]] | None = None, recovery_check: Callable[[Checkpoint, Dependency], bool | Awaitable[bool]] | None = None)[source]¶
Bases: object
Track the dependent stream state to recover from downtime.
The dependency name should not be changed once created, because it is used to persist the dependency in the cache.
>>> async def emoji():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> Dependency('emoji', emoji())
{'checkpoint_state': None, 'checkpoint_marker': None}
- property downtime_check: Callable[[Checkpoint, Dependency], Any | Awaitable[Any]][source]¶
Is called when downtime is detected.
- property recovery_check: Callable[[Checkpoint, Dependency], bool | Awaitable[bool]][source]¶
Is called when downtime is resolved.
slipstream.codecs module
Slipstream codecs.
slipstream.core module
Core module.
- class slipstream.core.PausableStream(it: AsyncIterable[Any])[source]¶
Bases: object
Can signal source stream to pause.
If the wrapped iterable (it) is of type AsyncGenerator, it receives the signal through the yield send syntax so that it can handle the state change appropriately. Alternatively, the signal property can be used directly.
For example, the Topic class uses the signal to pause the Consumer.
Any value can be sent as a Signal, but only Signal.PAUSE will trigger a pause in consumption of the iterable in PausableStream. Any other value will resume the PausableStream.
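The "yield send syntax" mentioned above is the standard async generator mechanism where a value passed to `asend()` becomes the result of the `yield` expression. The following is a minimal sketch of that mechanism only; the `Signal` enum here is a local stand-in (the text above names only `Signal.PAUSE`), and `source` and `demo` are hypothetical names rather than slipstream code.

```python
import asyncio
from enum import Enum


class Signal(Enum):
    # Local stand-in for illustration; not imported from slipstream.
    PAUSE = 'pause'


async def source(log):
    n = 0
    while n < 3:
        # The value passed to asend() arrives here as the result of `yield`.
        signal = yield n
        log.append((n, signal))
        n += 1


async def demo():
    log = []
    gen = source(log)
    first = await gen.asend(None)           # starts the generator, yields 0
    second = await gen.asend(Signal.PAUSE)  # generator sees PAUSE, yields 1
    third = await gen.asend(None)           # generator sees None, yields 2
    await gen.aclose()
    return [first, second, third], log


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

A real source would react to the received signal, for example by pausing its consumer, instead of only recording it as this sketch does.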
- class slipstream.core.Topic(name: str, conf: dict[str, Any] | None = None, offset: int | dict[int, int] | None = None, codec: ICodec | None = None, dry: bool = False)[source]¶
Bases: object
Act as a consumer and producer.
>>> topic = Topic(
...     'emoji',
...     {
...         'bootstrap_servers': 'localhost:29091',
...         'auto_offset_reset': 'earliest',
...         'group_id': 'demo',
...     },
... )
Loop over topic (iterable) to consume from it:
>>> async for msg in topic:
...     print(msg.value)
Call topic (callable) with data to produce to it:
>>> await topic({'msg': 'Hello World!'})
slipstream.interfaces module
Slipstream interfaces.
- class slipstream.interfaces.ICache(*args: Any, **kwargs: Any)[source]¶
Bases: object
Base class for cache implementations.
>>> class MyCache(ICache):
...     def __init__(self):
...         self.db = {}
...
...     def __contains__(self, key: Key) -> bool:
...         return key in self.db
...
...     def __delitem__(self, key: Key) -> None:
...         del self.db[key]
...
...     def __getitem__(self, key: Key | list[Key]) -> Any:
...         return self.db.get(key, None)
...
...     def __setitem__(self, key: Key, val: Any) -> None:
...         self.db[key] = val
>>> cache = MyCache()
>>> cache['prize'] = '🏆'
>>> cache['prize']
'🏆'
>>> del cache['prize']
>>> 'prize' in cache
False
slipstream.utils module
Slipstream utilities.
- class slipstream.utils.AsyncSynchronizedGenerator(gen: AsyncIterable[Any])[source]¶
Bases: object
Async generator that synchronizes values across copies.
- class slipstream.utils.PubSub(*args: Any, **kwargs: Any)[source]¶
Bases: object
Singleton publish-subscribe pattern class.
- async apublish(topic: str, *args: Any, **kwargs: Any) None[source]¶
Publish message to subscribers of topic.
- async iter_topic(topic: str) AsyncIterator[Any][source]¶
Asynchronously iterate over messages published to a topic.
- publish(topic: str, *args: Any, **kwargs: Any) None[source]¶
Publish message to subscribers of topic.
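For readers unfamiliar with the pattern, here is a minimal sketch of a singleton publish-subscribe object. `MiniPubSub` and its `subscribe` method are hypothetical and written only for illustration; this reference does not show how `PubSub` registers subscribers, so the sketch should not be read as its API.

```python
from collections import defaultdict


class MiniPubSub:
    """Hypothetical singleton that forwards published messages to callbacks."""

    _instance = None

    def __new__(cls):
        # Singleton: every construction returns the same object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.subscribers = defaultdict(list)
        return cls._instance

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, *args, **kwargs):
        # Call every callback registered for the topic with the message.
        for callback in self.subscribers[topic]:
            callback(*args, **kwargs)


received = []
MiniPubSub().subscribe('greetings', received.append)
MiniPubSub().publish('greetings', 'Hello World!')
```

Because both calls to `MiniPubSub()` return the same instance, the subscriber registered in the first call receives the message published through the second.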