slipstream package

Top level objects.

class slipstream.Cache(*args: Any, **kwargs: Any)[source]

Bases: ICache

Create a RocksDB database in the specified folder.

>>> cache = Cache('db/mycache')  

The cache instance acts as a callable to store data:

>>> cache('key', {'msg': 'Hello World!'})  
>>> cache['key']  
{'msg': 'Hello World!'}

cancel_all_background(wait: bool = True) None[source]

Request stopping background work.

close() None[source]

Flush memory to disk, and drop the current column family.

columns(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[list[tuple[Any, Any]]][source]

Get values as wide columns.

compact_range(begin: str | int | float | bytes | bool | None, end: str | int | float | bytes | bool | None, compact_opt: CompactOptions | None = None) None[source]

Run manual compaction on range for the current column family.

create_column_family(name: str, options: Options | None = None) Rdict[source]

Create column family.

delete(key: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]

Delete item from database.

delete_range(begin: str | int | float | bytes | bool, end: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]

Delete items with keys from begin (inclusive) up to end (exclusive).
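As a rough illustration of the half-open range, the sketch below applies the same rule to a plain dict. The helper delete_range_sketch is hypothetical and not part of slipstream; RocksDB orders keys by its comparator over the encoded key bytes, so for mixed key types the ordering may differ from the Python comparison used here.

```python
from typing import Any, Dict


def delete_range_sketch(db: Dict[Any, Any], begin: Any, end: Any) -> None:
    """Hypothetical: remove every key k with begin <= k < end (end is kept)."""
    doomed = [k for k in db if begin <= k < end]  # collect first, then mutate
    for key in doomed:
        del db[key]


db = {'a': 1, 'b': 2, 'c': 3, 'd': 4}
delete_range_sketch(db, 'b', 'd')
print(db)  # {'a': 1, 'd': 4}
```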

destroy(options: Options | None = None) None[source]

Delete the database.

drop_column_family(name: str) None[source]

Drop column family by name.

entities(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[tuple[str | int | float | bytes | bool, list[tuple[Any, Any]]]][source]

Get keys and entities.

flush(wait: bool = True) None[source]

Manually flush the current column family.

flush_wal(sync: bool = True) None[source]

Manually flush the WAL buffer.

get(key: str | int | float | bytes | bool | list[str | int | float | bytes | bool], default: T | None = None, read_opt: ReadOptions | None = None) Any | T[source]

Get item from database by key.

get_column_family(name: str) Rdict[source]

Get column family by name.

get_column_family_handle(name: str) ColumnFamily[source]

Get column family handle by name.

get_entity(key: str | int | float | bytes | bool | list[str | int | float | bytes | bool], default: Any | None = None, read_opt: ReadOptions | None = None) list[tuple[Any, Any]] | None[source]

Get wide-column from database by key.

>>> cache.get_entity('key')  
[('a', 1), ('b', 2)]

ingest_external_file(paths: list[str], opts: IngestExternalFileOptions | None = None) None[source]

Load list of SST files into current column family.

items(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[tuple[str | int | float | bytes | bool, Any]][source]

Get tuples of key-value pairs.

iter(read_opt: ReadOptions | None = None) RdictIter[source]

Get iterable.

key_may_exist(key: str | int | float | bytes | bool, fetch: bool = False, read_opt: ReadOptions | None = None) bool | tuple[bool, Any][source]

Check whether a key may exist without performing I/O operations.

keys(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[str | int | float | bytes | bool][source]

Get keys.

latest_sequence_number() int[source]

Get sequence number of the most recent transaction.

list_cf(path: str, options: Options | None = None) list[str][source]

List column families.

live_files() list[dict[str, Any]][source]

Get list of all table files with their level, start/end key.

path() str[source]

Get current database path.

property_int_value(name: str) int | None[source]

Get property as int by name from current column family.

property_value(name: str) str | None[source]

Get property by name from current column family.

put(key: str | int | float | bytes | bool, value: Any, write_opt: WriteOptions | None = None) None[source]

Put item in database using key.

put_entity(key: str | int | float | bytes | bool, names: list[Any], values: list[Any], write_opt: WriteOptions | None = None) None[source]

Put wide-column in database using key.

>>> cache.put_entity('key', ['a', 'b'], [1, 2])

repair(path: str, options: Options | None = None) None[source]

Repair the database.

set_dumps(dumps: Callable[[Any], bytes]) None[source]

Set custom dumps function.

set_loads(dumps: Callable[[bytes], Any]) None[source]

Set custom loads function.

set_options(options: dict[str, str]) None[source]

Set options for current column family.

set_read_options(read_opt: ReadOptions) None[source]

Set custom read options.

set_write_options(write_opt: WriteOptions) None[source]

Set custom write options.

snapshot() Snapshot[source]

Create snapshot of current column family.

transaction(key: str | int | float | bytes | bool) AsyncGenerator[Cache, None][source]

Lock the db entry while using the context manager.

>>> async with cache.transaction('fish'):  
...     cache['fish'] = '🐟'

  • This works for asynchronous code (not multi-threading/processing)

  • While locked, other transactions on the same key will block

  • Actions outside of transaction blocks ignore ongoing transactions

  • Reads aren’t limited by ongoing transactions
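The locking rules listed above can be approximated with one asyncio.Lock per key. This is a minimal sketch under that assumption (KeyLocks and demo are hypothetical names, not slipstream's implementation): two coroutines entering a transaction on the same key run one after the other, while a plain write outside a transaction would never touch the lock.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager


class KeyLocks:
    """Hypothetical helper: one asyncio.Lock per key, created on first use."""

    def __init__(self) -> None:
        self._locks: defaultdict = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def transaction(self, key):
        # Other transactions on the same key wait here until the lock is released.
        async with self._locks[key]:
            yield self


async def demo() -> tuple:
    locks, db, log = KeyLocks(), {}, []

    async def worker(name: str, delay: float) -> None:
        async with locks.transaction('fish'):
            log.append(f'{name} start')
            await asyncio.sleep(delay)  # yield control while holding the lock
            db['fish'] = name
            log.append(f'{name} end')

    await asyncio.gather(worker('a', 0.01), worker('b', 0))
    return log, db['fish']


print(asyncio.run(demo()))  # (['a start', 'a end', 'b start', 'b end'], 'b')
```

Even though worker 'b' has no delay, it cannot start until 'a' leaves its transaction block.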

try_catch_up_with_primary() None[source]

Try to catch up with the primary by reading log files.

values(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[Any][source]

Get values.

write(write_batch: WriteBatch, write_opt: WriteOptions | None = None) None[source]

Write a batch.

class slipstream.Conf(*args: Any, **kwargs: Any)[source]

Bases: object

The application configuration singleton.

Register iterables (sources) and handlers (sinks):

>>> from slipstream import handle

>>> async def messages():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> @handle(messages(), sink=[print])
... def handle_message(msg):
...     yield f'Hello {msg}!'

Set application kafka configuration (optional):

>>> Conf({'bootstrap_servers': 'localhost:29091'})
{'bootstrap_servers': 'localhost:29091'}

Provide exit hooks:

>>> async def exit_hook():
...     print('Shutting down application.')
>>> c = Conf()
>>> c.register_exit_hook(exit_hook)

exit_hooks: ClassVar[set[Callable[[P], T | Awaitable[T]]]] = {}
iterables: ClassVar[dict[str, PausableStream]] = {}
pipes: ClassVar[dict[Callable[[P], T | Awaitable[T]], tuple[str, tuple[Callable[[AsyncIterable[Any]], AsyncIterable[Any]], ...]]]] = {}
pubsub = <slipstream.utils.PubSub object>
register_exit_hook(exit_hook: Callable[[P], T | Awaitable[T]]) None[source]

Add exit hook that’s called on shutdown.

register_handler(key: str, handler: Callable[[P], T | Awaitable[T]], *pipe: Callable[[AsyncIterable[Any]], AsyncIterable[Any]]) None[source]

Add handler to global Conf.

register_iterable(key: str, it: AsyncIterable[Any]) None[source]

Add iterable to global Conf.

async start(**kwargs: Any) None[source]

Start processing registered iterables.

class slipstream.Topic(name: str, conf: dict[str, Any] | None = None, offset: int | dict[int, int] | None = None, codec: ICodec | None = None, dry: bool = False)[source]

Bases: object

Act as a consumer and producer.

>>> topic = Topic(
...     'emoji',
...     {
...         'bootstrap_servers': 'localhost:29091',
...         'auto_offset_reset': 'earliest',
...         'group_id': 'demo',
...     },
... )

Loop over topic (iterable) to consume from it:

>>> async for msg in topic:  
...     print(msg.value)

Call topic (callable) with data to produce to it:

>>> await topic({'msg': 'Hello World!'})

property admin: AIOKafkaClient

Get started instance of Kafka admin client.

async asend(value: Any) ConsumerRecord[Any, Any][source]

Send data to generator.

async exit_hook() None[source]

Cleanup and finalization.

async get_consumer() AIOKafkaConsumer[source]

Get started instance of Kafka consumer.

async get_producer() AIOKafkaProducer[source]

Get started instance of Kafka producer.

async init_generator() AsyncGenerator[Literal[Signal.SENTINEL] | ConsumerRecord[Any, Any], bool | None][source]

Initialize generator.

async seek(offset: int | dict[int, int], consumer: AIOKafkaConsumer | None = None, timeout: float = 30.0) None[source]

Seek to offset.

slipstream.handle(*iterable: AsyncIterable[Any], pipe: Iterable[Callable[[AsyncIterable[Any]], AsyncIterable[Any]]] = [], sink: Iterable[Callable | Callable[[P], T | Awaitable[T]]] = []) Callable[[Callable[[P], T | Awaitable[T]]], Callable[[...], Awaitable[Any]]][source]

Bind sources and sinks to the handler function.

Ex:
>>> topic = Topic('demo')  
>>> cache = Cache('state/demo')  
>>> @handle(topic, sink=[print, cache])  
... def handler(msg, **kwargs):
...     return msg.key, msg.value

slipstream.stream(**kwargs: Any) Coroutine[None, None, None][source]

Start processing iterables bound by the handle function.

Ex:
>>> from asyncio import run
>>> kwargs = {
...     'env': 'DEV',
... }
>>> run(stream(**kwargs))  
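A simplified, hypothetical model of what binding a source, a handler and sinks amounts to: each message from the source is passed to the handler, and every value the handler produces is passed to each sink. run_handler is an illustrative name, not part of slipstream; pipes, pausing and Kafka are left out, and each output is handed to the sinks as a single argument, which may differ from how slipstream calls sinks such as Cache.

```python
import asyncio
import inspect
from typing import Any, AsyncIterable, Callable, Iterable, List


async def run_handler(
    source: AsyncIterable[Any],
    handler: Callable[..., Any],
    sinks: Iterable[Callable[..., Any]],
) -> None:
    """Hypothetical sketch: route source -> handler -> every sink."""
    sinks = list(sinks)
    async for msg in source:
        result = handler(msg)
        # Normalize generator, async generator, coroutine or plain return values.
        if inspect.isasyncgen(result):
            outputs: List[Any] = [out async for out in result]
        elif inspect.isgenerator(result):
            outputs = list(result)
        elif inspect.isawaitable(result):
            outputs = [await result]
        else:
            outputs = [result]
        for out in outputs:
            for sink in sinks:
                ret = sink(out)
                if inspect.isawaitable(ret):  # support async sinks too
                    await ret


async def messages():
    for emoji in '🏆📞🐟👌':
        yield emoji


def handle_message(msg):
    yield f'Hello {msg}!'


received: List[str] = []
asyncio.run(run_handler(messages(), handle_message, [received.append]))
print(received)  # ['Hello 🏆!', 'Hello 📞!', 'Hello 🐟!', 'Hello 👌!']
```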

Submodules

slipstream.caching module

Slipstream caching.

class slipstream.caching.Cache(*args: Any, **kwargs: Any)[source]

Bases: ICache

Create a RocksDB database in the specified folder.

>>> cache = Cache('db/mycache')  

The cache instance acts as a callable to store data:

>>> cache('key', {'msg': 'Hello World!'})  
>>> cache['key']  
{'msg': 'Hello World!'}

cancel_all_background(wait: bool = True) None[source]

Request stopping background work.

close() None[source]

Flush memory to disk, and drop the current column family.

columns(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[list[tuple[Any, Any]]][source]

Get values as wide columns.

compact_range(begin: str | int | float | bytes | bool | None, end: str | int | float | bytes | bool | None, compact_opt: CompactOptions | None = None) None[source]

Run manual compaction on range for the current column family.

create_column_family(name: str, options: Options | None = None) Rdict[source]

Create column family.

delete(key: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]

Delete item from database.

delete_range(begin: str | int | float | bytes | bool, end: str | int | float | bytes | bool, write_opt: WriteOptions | None = None) None[source]

Delete items with keys from begin (inclusive) up to end (exclusive).

destroy(options: Options | None = None) None[source]

Delete the database.

drop_column_family(name: str) None[source]

Drop column family by name.

entities(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[tuple[str | int | float | bytes | bool, list[tuple[Any, Any]]]][source]

Get keys and entities.

flush(wait: bool = True) None[source]

Manually flush the current column family.

flush_wal(sync: bool = True) None[source]

Manually flush the WAL buffer.

get(key: str | int | float | bytes | bool | list[str | int | float | bytes | bool], default: T | None = None, read_opt: ReadOptions | None = None) Any | T[source]

Get item from database by key.

get_column_family(name: str) Rdict[source]

Get column family by name.

get_column_family_handle(name: str) ColumnFamily[source]

Get column family handle by name.

get_entity(key: str | int | float | bytes | bool | list[str | int | float | bytes | bool], default: Any | None = None, read_opt: ReadOptions | None = None) list[tuple[Any, Any]] | None[source]

Get wide-column from database by key.

>>> cache.get_entity('key')  
[('a', 1), ('b', 2)]

ingest_external_file(paths: list[str], opts: IngestExternalFileOptions | None = None) None[source]

Load list of SST files into current column family.

items(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[tuple[str | int | float | bytes | bool, Any]][source]

Get tuples of key-value pairs.

iter(read_opt: ReadOptions | None = None) RdictIter[source]

Get iterable.

key_may_exist(key: str | int | float | bytes | bool, fetch: bool = False, read_opt: ReadOptions | None = None) bool | tuple[bool, Any][source]

Check whether a key may exist without performing I/O operations.

keys(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[str | int | float | bytes | bool][source]

Get keys.

latest_sequence_number() int[source]

Get sequence number of the most recent transaction.

list_cf(path: str, options: Options | None = None) list[str][source]

List column families.

live_files() list[dict[str, Any]][source]

Get list of all table files with their level, start/end key.

path() str[source]

Get current database path.

property_int_value(name: str) int | None[source]

Get property as int by name from current column family.

property_value(name: str) str | None[source]

Get property by name from current column family.

put(key: str | int | float | bytes | bool, value: Any, write_opt: WriteOptions | None = None) None[source]

Put item in database using key.

put_entity(key: str | int | float | bytes | bool, names: list[Any], values: list[Any], write_opt: WriteOptions | None = None) None[source]

Put wide-column in database using key.

>>> cache.put_entity('key', ['a', 'b'], [1, 2])

repair(path: str, options: Options | None = None) None[source]

Repair the database.

set_dumps(dumps: Callable[[Any], bytes]) None[source]

Set custom dumps function.

set_loads(dumps: Callable[[bytes], Any]) None[source]

Set custom loads function.

set_options(options: dict[str, str]) None[source]

Set options for current column family.

set_read_options(read_opt: ReadOptions) None[source]

Set custom read options.

set_write_options(write_opt: WriteOptions) None[source]

Set custom write options.

snapshot() Snapshot[source]

Create snapshot of current column family.

transaction(key: str | int | float | bytes | bool) AsyncGenerator[Cache, None][source]

Lock the db entry while using the context manager.

>>> async with cache.transaction('fish'):  
...     cache['fish'] = '🐟'

  • This works for asynchronous code (not multi-threading/processing)

  • While locked, other transactions on the same key will block

  • Actions outside of transaction blocks ignore ongoing transactions

  • Reads aren’t limited by ongoing transactions

try_catch_up_with_primary() None[source]

Try to catch up with the primary by reading log files.

values(backwards: bool = False, from_key: str | int | float | bytes | bool | None = None, read_opt: ReadOptions | None = None, prefix: str | int | float | bytes | bool | None = None) Iterator[Any][source]

Get values.

write(write_batch: WriteBatch, write_opt: WriteOptions | None = None) None[source]

Write a batch.

slipstream.checkpointing module

Slipstream checkpointing.

class slipstream.checkpointing.Checkpoint(name: str, dependent: AsyncIterable[Any], dependencies: list[Dependency], downtime_callback: Callable[[Checkpoint, Dependency], Any] | None = None, recovery_callback: Callable[[Checkpoint, Dependency], Any] | None = None, cache: ICache | None = None, cache_key_prefix: str = '_', pause_dependent: bool = True)[source]

Bases: object

Pulse the heartbeat of dependency streams to handle downtimes.

A checkpoint consists of a dependent stream and dependency streams.

>>> async def emoji():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> dependent, dependency = emoji(), emoji()

The checkpoint and dependency names should not be changed once created; they are used to persist the checkpoint in the cache.

>>> c = Checkpoint(
...     'dependent',
...     dependent=dependent,
...     dependencies=[Dependency('dependency', dependency)],
... )

Checkpoints automatically handle pausing of dependent streams when they are bound to user handler functions (using handle):

>>> from slipstream import handle
>>> @handle(dependent)
... async def dependent_handler(msg):
...     await c.check_pulse(marker=msg.value['event_timestamp'])
...     yield msg.key, msg.value
>>> @handle(dependency)
... async def dependency_handler(msg):
...     await c.heartbeat(msg.value['event_timestamp'])
...     yield msg.key, msg.value

On the first pulse check, no message may have been received from the dependency yet. Therefore, the dependency checkpoint is updated with the initial state and marker of the dependent stream:

>>> from asyncio import run
>>> from datetime import datetime
>>> run(c.check_pulse(marker=datetime(2025, 1, 1, 10), offset=8))
>>> c['dependency'].checkpoint_marker
datetime.datetime(2025, 1, 1, 10, 0)

When a message is received from the dependency, send a heartbeat with its event time, which can be compared with the dependent event times to check for downtime:

>>> run(c.heartbeat(datetime(2025, 1, 1, 10, 30)))
{'is_late': False, ...}

When the pulse is checked after a while, it’s apparent that no dependency messages have been received for 30 minutes:

>>> run(c.check_pulse(marker=datetime(2025, 1, 1, 11), offset=9))
datetime.timedelta(seconds=1800)

Because the downtime exceeds the default downtime_threshold (10 minutes), the dependent stream will be paused (and resumed when the recovery check succeeds). Callbacks can be provided for additional custom behavior.

As the dependency stream recovers, it has to “catch up” with the dependent stream first. Until then, the dependent stream stays paused, and the dependency stream is marked as down.

>>> run(c.heartbeat(datetime(2025, 1, 1, 10, 45)))
{'is_late': True, ...}
>>> run(c.heartbeat(datetime(2025, 1, 1, 11, 1)))
{'is_late': False, ...}
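The pulse and heartbeat walkthrough above reduces to a few comparisons of event times. The following stdlib-only sketch reproduces the numbers in that example; DowntimeTracker and its fields are hypothetical, a single dependency is assumed, and the real Checkpoint additionally persists state, pauses the dependent stream and runs callbacks. Returning None when downtime stays under the threshold is also an assumption of this sketch.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class DowntimeTracker:
    """Hypothetical single-dependency model of check_pulse/heartbeat."""

    threshold: timedelta = timedelta(seconds=600)  # same default as Dependency
    last_marker: Optional[datetime] = None  # latest dependency event time
    paused_at: Optional[datetime] = None  # dependent marker when downtime hit

    @property
    def paused(self) -> bool:
        return self.paused_at is not None

    def check_pulse(self, marker: datetime) -> Optional[timedelta]:
        if self.last_marker is None:
            # No heartbeat yet: start from the dependent stream's marker.
            self.last_marker = marker
            return None
        downtime = marker - self.last_marker
        if downtime > self.threshold:
            self.paused_at = marker  # the dependent stream would be paused here
            return downtime
        return None

    def heartbeat(self, marker: datetime) -> dict:
        self.last_marker = marker
        # Late while the dependency is still behind the point where we paused.
        is_late = self.paused_at is not None and marker < self.paused_at
        if not is_late:
            self.paused_at = None  # caught up: the dependent stream may resume
        return {'is_late': is_late}


t = DowntimeTracker()
t.check_pulse(datetime(2025, 1, 1, 10))
t.heartbeat(datetime(2025, 1, 1, 10, 30))
print(t.check_pulse(datetime(2025, 1, 1, 11)))  # 0:30:00
print(t.heartbeat(datetime(2025, 1, 1, 10, 45)))  # {'is_late': True}
print(t.heartbeat(datetime(2025, 1, 1, 11, 1)))  # {'is_late': False}
```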

If no cache is provided, the checkpoint lifespan will be limited to that of the application runtime.

async check_pulse(marker: datetime | Any, **kwargs: Any) Any | None[source]

Update state that can be used as a checkpoint.

Args:
    marker (datetime | Any): Typically the event timestamp that is compared to the event timestamp of a dependency stream.
    kwargs (Any): Any information, stored in state, that can be used for reprocessing incorrect data that was sent out during downtime of a dependency stream.

Returns:
    Any: Typically the timedelta between the last state_marker and the checkpoint_marker since the stream went down.

async heartbeat(marker: datetime | Any, dependency_name: str | None = None) dict[source]

Update checkpoint to the latest state.

Args:
    marker (datetime | Any): Typically the event timestamp that is compared to the event timestamp of a dependent stream.
    dependency_name (str, optional): Required when there are multiple dependencies, to specify which one the heartbeat is for.

class slipstream.checkpointing.Dependency(name: str, dependency: AsyncIterable[Any], downtime_threshold: Any = datetime.timedelta(seconds=600), downtime_check: Callable[[Checkpoint, Dependency], Any | Awaitable[Any]] | None = None, recovery_check: Callable[[Checkpoint, Dependency], bool | Awaitable[bool]] | None = None)[source]

Bases: object

Track the dependent stream state to recover from downtime.

The dependency name should not be changed once created; it is used to persist the dependency in the cache.

>>> async def emoji():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> Dependency('emoji', emoji())
{'checkpoint_state': None, 'checkpoint_marker': None}

property downtime_check: Callable[[Checkpoint, Dependency], Any | Awaitable[Any]][source]

Is called when downtime is detected.

load(cache: ICache, cache_key_prefix: str) None[source]

Load checkpoint state from cache.

property recovery_check: Callable[[Checkpoint, Dependency], bool | Awaitable[bool]][source]

Is called when downtime is resolved.

save(cache: ICache, cache_key_prefix: str, checkpoint_state: Any, checkpoint_marker: datetime) None[source]

Save checkpoint state to cache.

slipstream.codecs module

Slipstream codecs.

class slipstream.codecs.JsonCodec[source]

Bases: ICodec

Serialize/deserialize json messages.

decode(s: bytes) object[source]

Deserialize message.

>>> c = JsonCodec()
>>> c.decode(b'{"key": 1}')
{'key': 1}

encode(obj: Any) bytes[source]

Serialize message.

>>> c = JsonCodec()
>>> c.encode({'key': 1})
b'{"key": 1}'

slipstream.core module

Core module.

class slipstream.core.PausableStream(it: AsyncIterable[Any])[source]

Bases: object

Can signal the source stream to pause.

If the wrapped iterable (it) is an AsyncGenerator, it receives the signal through the generator send protocol (as the value of its yield expression), so that it can handle the state change appropriately. Alternatively, the signal property can be used directly.

For example, the Topic class uses the signal to pause the Consumer.

Any value can be sent as a Signal, but only Signal.PAUSE will trigger a pause in consumption of the iterable in PausableStream. Any other value will resume the PausableStream.
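A minimal sketch of the pause/resume rule, assuming an asyncio.Event as the gate. PausableSketch and the local Signal copy are stand-ins so the example runs without slipstream installed; unlike the real PausableStream, this version does not forward signals into an AsyncGenerator, it only blocks the consumer.

```python
import asyncio
from enum import Enum
from typing import Any, AsyncIterable, AsyncIterator


class Signal(Enum):
    """Local stand-in mirroring the documented slipstream.utils.Signal values."""

    SENTINEL = 0
    PAUSE = 1
    RESUME = 2
    STOP = 3


class PausableSketch:
    """Hypothetical: wraps an async iterable and gates it on an Event."""

    def __init__(self, it: AsyncIterable[Any]) -> None:
        self._it = it
        self._running = asyncio.Event()
        self._running.set()  # start in the resumed state

    def send_signal(self, signal: Any) -> None:
        # Only Signal.PAUSE pauses; any other value resumes.
        if signal is Signal.PAUSE:
            self._running.clear()
        else:
            self._running.set()

    async def __aiter__(self) -> AsyncIterator[Any]:
        async for item in self._it:
            await self._running.wait()  # pull the next item, then block while paused
            yield item


async def demo() -> list:
    async def numbers():
        for i in range(4):
            yield i

    stream = PausableSketch(numbers())
    seen = []

    async def consume() -> None:
        async for item in stream:
            seen.append(item)
            if item == 1:
                stream.send_signal(Signal.PAUSE)

    task = asyncio.create_task(consume())
    await asyncio.sleep(0.01)
    snapshot = list(seen)  # consumption is blocked after item 1
    stream.send_signal(Signal.RESUME)
    await task
    return [snapshot, seen]


print(asyncio.run(demo()))  # [[0, 1], [0, 1, 2, 3]]
```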

property iterable: AsyncIterable[Any][source]

Get iterable.

send_signal(signal: Signal | Any) None[source]

Send signal to stream.

class slipstream.core.Topic(name: str, conf: dict[str, Any] | None = None, offset: int | dict[int, int] | None = None, codec: ICodec | None = None, dry: bool = False)[source]

Bases: object

Act as a consumer and producer.

>>> topic = Topic(
...     'emoji',
...     {
...         'bootstrap_servers': 'localhost:29091',
...         'auto_offset_reset': 'earliest',
...         'group_id': 'demo',
...     },
... )

Loop over topic (iterable) to consume from it:

>>> async for msg in topic:  
...     print(msg.value)

Call topic (callable) with data to produce to it:

>>> await topic({'msg': 'Hello World!'})

property admin: AIOKafkaClient[source]

Get started instance of Kafka admin client.

async asend(value: Any) ConsumerRecord[Any, Any][source]

Send data to generator.

async exit_hook() None[source]

Cleanup and finalization.

async get_consumer() AIOKafkaConsumer[source]

Get started instance of Kafka consumer.

async get_producer() AIOKafkaProducer[source]

Get started instance of Kafka producer.

async init_generator() AsyncGenerator[Literal[Signal.SENTINEL] | ConsumerRecord[Any, Any], bool | None][source]

Initialize generator.

async seek(offset: int | dict[int, int], consumer: AIOKafkaConsumer | None = None, timeout: float = 30.0) None[source]

Seek to offset.

slipstream.interfaces module

Slipstream interfaces.

class slipstream.interfaces.ICache(*args: Any, **kwargs: Any)[source]

Bases: object

Base class for cache implementations.

>>> class MyCache(ICache):
...     def __init__(self):
...         self.db = {}
...
...     def __contains__(self, key: Key) -> bool:
...         return key in self.db
...
...     def __delitem__(self, key: Key) -> None:
...         del self.db[key]
...
...     def __getitem__(self, key: Key | list[Key]) -> Any:
...         return self.db.get(key, None)
...
...     def __setitem__(self, key: Key, val: Any) -> None:
...         self.db[key] = val
>>> cache = MyCache()
>>> cache['prize'] = '🏆'
>>> cache['prize']
'🏆'
>>> del cache['prize']
>>> 'prize' in cache
False

class slipstream.interfaces.ICodec[source]

Bases: object

Base class for codecs.

abstract decode(s: bytes) object[source]

Deserialize object.

abstract encode(obj: Any) bytes[source]

Serialize object.
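Custom codecs implement the two abstract methods above. In the sketch below, CodecBase is a local stand-in with the same method names so the example runs without slipstream installed, and CompactJsonCodec is a hypothetical codec that drops JSON whitespace. With slipstream you would subclass ICodec instead and pass an instance through the codec parameter of Topic.

```python
import json
from abc import ABC, abstractmethod
from typing import Any


class CodecBase(ABC):
    """Local stand-in with the same abstract methods as ICodec."""

    @abstractmethod
    def encode(self, obj: Any) -> bytes: ...

    @abstractmethod
    def decode(self, s: bytes) -> object: ...


class CompactJsonCodec(CodecBase):
    """Hypothetical codec: JSON without whitespace, UTF-8 encoded."""

    def encode(self, obj: Any) -> bytes:
        return json.dumps(obj, separators=(',', ':')).encode('utf-8')

    def decode(self, s: bytes) -> object:
        return json.loads(s.decode('utf-8'))


codec = CompactJsonCodec()
print(codec.encode({'key': 1}))  # b'{"key":1}'
print(codec.decode(b'{"key": 1}'))  # {'key': 1}
```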

class slipstream.interfaces.SourceSinkMeta(name, bases, namespace, **kwargs)[source]

Bases: ABCMeta

Metaclass that adds default source/sink functionality.

slipstream.utils module

Slipstream utilities.

class slipstream.utils.AsyncSynchronizedGenerator(gen: AsyncIterable[Any])[source]

Bases: object

Async generator that synchronizes values across copies.

copy() _GeneratorCopy[source]

Create a synchronized copy of this generator.

property value: Any[source]

Get current value the generator is holding.

class slipstream.utils.PubSub(*args: Any, **kwargs: Any)[source]

Bases: object

Singleton publish-subscribe pattern class.

async apublish(topic: str, *args: Any, **kwargs: Any) None[source]

Publish message to subscribers of topic.

async iter_topic(topic: str) AsyncIterator[Any][source]

Asynchronously iterate over messages published to a topic.

publish(topic: str, *args: Any, **kwargs: Any) None[source]

Publish message to subscribers of topic.

subscribe(topic: str, listener: Callable[[P], T | Awaitable[T]]) None[source]

Subscribe callable to topic.

unsubscribe(topic: str, listener: Callable[[P], T | Awaitable[T]]) None[source]

Unsubscribe callable from topic.
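The subscribe, publish and unsubscribe methods follow the usual publish-subscribe pattern. MiniPubSub below is a hypothetical, synchronous, non-singleton sketch of that pattern; the real PubSub is a singleton and also offers the async apublish and iter_topic methods.

```python
from collections import defaultdict
from typing import Any, Callable


class MiniPubSub:
    """Hypothetical synchronous publish-subscribe sketch (not slipstream's code)."""

    def __init__(self) -> None:
        self._subs = defaultdict(list)  # topic -> list of listeners

    def subscribe(self, topic: str, listener: Callable[..., Any]) -> None:
        self._subs[topic].append(listener)

    def unsubscribe(self, topic: str, listener: Callable[..., Any]) -> None:
        self._subs[topic].remove(listener)

    def publish(self, topic: str, *args: Any, **kwargs: Any) -> None:
        # Iterate over a copy so listeners may unsubscribe while being called.
        for listener in list(self._subs[topic]):
            listener(*args, **kwargs)


bus = MiniPubSub()
received = []


def on_greeting(msg: str) -> None:
    received.append(msg)


bus.subscribe('greetings', on_greeting)
bus.publish('greetings', 'Hello 🐟')
bus.unsubscribe('greetings', on_greeting)
bus.publish('greetings', 'ignored')  # no subscribers left
print(received)  # ['Hello 🐟']
```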

class slipstream.utils.Signal(value)[source]

Bases: Enum

Signals can be exchanged with streams.

  • SENTINEL represents an absent yield value

  • PAUSE represents the signal to pause the stream

  • RESUME represents the signal to resume the stream

PAUSE = 1[source]
RESUME = 2[source]
SENTINEL = 0[source]
STOP = 3[source]

class slipstream.utils.Singleton[source]

Bases: type

Maintain a single instance of a class.
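A common way to write such a metaclass is to cache the first instance per class in __call__. The sketch below assumes that approach; it is not necessarily how slipstream.utils.Singleton is written, and Settings is a hypothetical class. In this version, later calls return the stored instance and ignore their arguments.

```python
from typing import Any, Dict


class Singleton(type):
    """Sketch of a singleton metaclass: one instance per class."""

    _instances: Dict[type, Any] = {}

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        # Create the instance on the first call only, then reuse it.
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class Settings(metaclass=Singleton):
    def __init__(self, env: str = 'DEV') -> None:
        self.env = env


a = Settings('PROD')
b = Settings()  # returns the existing instance; __init__ does not run again
print(a is b, b.env)  # True PROD
```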

async slipstream.utils.awaitable(x: Any) Any[source]

Convert into an awaitable.

slipstream.utils.get_param_names(o: Any) tuple[str, ...][source]

Return function parameter names.

slipstream.utils.iscoroutinecallable(o: Any) bool[source]

Check whether the object is a coroutine callable (calling it returns a coroutine).
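These three helpers can be approximated with the standard inspect module. The versions below are sketches under that assumption, not slipstream's source; in particular, treating objects with an async __call__ as coroutine callables is an assumption.

```python
import asyncio
import inspect
from typing import Any


async def awaitable(x: Any) -> Any:
    """Await x if it is awaitable, otherwise return it unchanged."""
    if inspect.isawaitable(x):
        return await x
    return x


def get_param_names(o: Any) -> tuple:
    """Names of the callable's parameters, in declaration order."""
    return tuple(inspect.signature(o).parameters)


def iscoroutinecallable(o: Any) -> bool:
    """True for async functions and for objects whose __call__ is async."""
    return inspect.iscoroutinefunction(o) or inspect.iscoroutinefunction(
        getattr(o, '__call__', None)
    )


async def double(x: int) -> int:
    return 2 * x


async def main() -> list:
    # A coroutine is awaited; a plain value passes through.
    return [await awaitable(double(2)), await awaitable(5)]


print(asyncio.run(main()))  # [4, 5]
print(get_param_names(double))  # ('x',)
print(iscoroutinecallable(double), iscoroutinecallable(print))  # True False
```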