slipstream.checkpointing

Slipstream checkpointing.

Attributes

STATE_NAME
STATE_MARKER_NAME
CHECKPOINT_STATE_NAME
CHECKPOINT_MARKER_NAME
CHECKPOINTS_NAME

Classes

Dependency

Track the dependent stream state to recover from downtime.

Checkpoint

Pulse the heartbeat of dependency streams to handle downtimes.

Module Contents

slipstream.checkpointing.STATE_NAME = 'state'
slipstream.checkpointing.STATE_MARKER_NAME = 'state_marker'
slipstream.checkpointing.CHECKPOINT_STATE_NAME = 'checkpoint_state'
slipstream.checkpointing.CHECKPOINT_MARKER_NAME = 'checkpoint_marker'
slipstream.checkpointing.CHECKPOINTS_NAME = 'checkpoints'
class slipstream.checkpointing.Dependency(name: str, dependency: collections.abc.AsyncIterable[Any], downtime_threshold: Any = timedelta(minutes=10), downtime_check: slipstream.utils.AsyncCallable[[Checkpoint, Dependency], Any] | None = None, recovery_check: slipstream.utils.AsyncCallable[[Checkpoint, Dependency], bool] | None = None)

Track the dependent stream state to recover from downtime.

The dependency name should not be changed once created, because it is used to persist the dependency in the cache.

>>> async def emoji():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> Dependency('emoji', emoji())
{'checkpoint_state': None, 'checkpoint_marker': None}
property downtime_check: slipstream.utils.AsyncCallable[[Checkpoint, Dependency], Any]

Is called when downtime is detected.

property recovery_check: slipstream.utils.AsyncCallable[[Checkpoint, Dependency], bool]

Is called when downtime is resolved.
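As a sketch of the call shape these checks take, assuming only what the signatures above state (an async callable that receives the checkpoint and the dependency), a user could supply a lenient custom policy like the one below. The function bodies are hypothetical and are not the library's default behavior; `SimpleNamespace` objects stand in for real Checkpoint and Dependency instances, using the documented `state_marker`, `checkpoint_marker` and `downtime_threshold` attributes.

```python
import asyncio
from datetime import datetime, timedelta
from types import SimpleNamespace
from typing import Any


async def downtime_check(checkpoint: Any, dependency: Any) -> Any:
    # Hypothetical policy: report the lag only when it exceeds the threshold.
    lag = checkpoint.state_marker - dependency.checkpoint_marker
    return lag if lag > dependency.downtime_threshold else None


async def recovery_check(checkpoint: Any, dependency: Any) -> bool:
    # Hypothetical lenient policy: treat the dependency as recovered once its
    # lag is back within the threshold, instead of requiring a full catch-up.
    lag = checkpoint.state_marker - dependency.checkpoint_marker
    return lag <= dependency.downtime_threshold


# Stand-ins for real Checkpoint and Dependency objects.
checkpoint = SimpleNamespace(state_marker=datetime(2025, 1, 1, 11))
lagging = SimpleNamespace(
    checkpoint_marker=datetime(2025, 1, 1, 10, 30),
    downtime_threshold=timedelta(minutes=10),
)
close = SimpleNamespace(
    checkpoint_marker=datetime(2025, 1, 1, 10, 55),
    downtime_threshold=timedelta(minutes=10),
)

print(asyncio.run(downtime_check(checkpoint, lagging)))  # 0:30:00
print(asyncio.run(recovery_check(checkpoint, lagging)))  # False
print(asyncio.run(recovery_check(checkpoint, close)))    # True
```

Under these assumptions, the functions would be passed as Dependency('dependency', dependency, downtime_check=downtime_check, recovery_check=recovery_check).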

name
dependency
checkpoint_state = None
checkpoint_marker = None
downtime_threshold
is_down = False
save(cache: slipstream.interfaces.ICache, cache_key_prefix: str, checkpoint_state: Any, checkpoint_marker: datetime.datetime) -> None

Save checkpoint state to cache.

load(cache: slipstream.interfaces.ICache, cache_key_prefix: str) -> None

Load checkpoint state from cache.
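The sketch below illustrates, under stated assumptions, why the dependency name has to stay stable across restarts. A plain dict stands in for an ICache, and both the key layout (prefix followed by the dependency name) and the stored field names (taken from the module attributes) are guesses, not the library's actual storage format. DependencySketch is a hypothetical class.

```python
from __future__ import annotations

from datetime import datetime
from typing import Any

# Values of the module attributes documented above.
CHECKPOINT_STATE_NAME = 'checkpoint_state'
CHECKPOINT_MARKER_NAME = 'checkpoint_marker'


class DependencySketch:
    """Hypothetical stand-in for Dependency; a plain dict plays the cache."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.checkpoint_state: Any = None
        self.checkpoint_marker: datetime | None = None

    def _key(self, cache_key_prefix: str) -> str:
        # Assumed key layout: the prefix followed by the dependency name.
        return f'{cache_key_prefix}{self.name}'

    def save(self, cache: dict, cache_key_prefix: str,
             checkpoint_state: Any, checkpoint_marker: datetime) -> None:
        # Update the in-memory checkpoint and persist it under the name-based key.
        self.checkpoint_state = checkpoint_state
        self.checkpoint_marker = checkpoint_marker
        cache[self._key(cache_key_prefix)] = {
            CHECKPOINT_STATE_NAME: checkpoint_state,
            CHECKPOINT_MARKER_NAME: checkpoint_marker,
        }

    def load(self, cache: dict, cache_key_prefix: str) -> None:
        # Restore the checkpoint if one was persisted under this name.
        stored = cache.get(self._key(cache_key_prefix))
        if stored is not None:
            self.checkpoint_state = stored[CHECKPOINT_STATE_NAME]
            self.checkpoint_marker = stored[CHECKPOINT_MARKER_NAME]


cache: dict = {}
DependencySketch('dependency').save(cache, '_', {'offset': 8}, datetime(2025, 1, 1, 10))

restored = DependencySketch('dependency')
restored.load(cache, '_')  # same name: the checkpoint is recovered
renamed = DependencySketch('renamed')
renamed.load(cache, '_')   # new name: nothing is found, state starts empty
```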

class slipstream.checkpointing.Checkpoint(name: str, dependent: collections.abc.AsyncIterable[Any], dependencies: list[Dependency], downtime_callback: collections.abc.Callable[[Checkpoint, Dependency], Any] | None = None, recovery_callback: collections.abc.Callable[[Checkpoint, Dependency], Any] | None = None, cache: slipstream.interfaces.ICache | None = None, cache_key_prefix: str = '_', pause_dependent: bool = True)

Pulse the heartbeat of dependency streams to handle downtimes.

A checkpoint consists of a dependent stream and dependency streams.

>>> async def emoji():
...     for emoji in '🏆📞🐟👌':
...         yield emoji
>>> dependent, dependency = emoji(), emoji()

The checkpoint and dependency names should not be changed once created, because they are used to persist the checkpoint in the cache.

>>> c = Checkpoint(
...     'dependent',
...     dependent=dependent,
...     dependencies=[Dependency('dependency', dependency)],
... )

Checkpoints automatically handle pausing of dependent streams when they are bound to user handler functions (using handle):

>>> from slipstream import handle
>>> @handle(dependent)
... async def dependent_handler(msg):
...     await c.check_pulse(marker=msg.value['event_timestamp'])
...     yield msg.key, msg.value
>>> @handle(dependency)
... async def dependency_handler(msg):
...     await c.heartbeat(msg.value['event_timestamp'])
...     yield msg.key, msg.value

On the first pulse check, no message may have been received from the dependency yet. Therefore, the dependency checkpoint is initialized with the current state and marker of the dependent stream:

>>> from asyncio import run
>>> from datetime import datetime
>>> run(c.check_pulse(marker=datetime(2025, 1, 1, 10), offset=8))
>>> c['dependency'].checkpoint_marker
datetime.datetime(2025, 1, 1, 10, 0)
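A minimal sketch of this initialization rule, assuming a dependency checkpoint is only seeded while it is still empty, and that the keyword arguments passed to check_pulse (here offset=8) make up the state. The seed_dependency function and the dict representation are hypothetical.

```python
from datetime import datetime
from typing import Any


def seed_dependency(dependency: dict, marker: datetime, state: Any) -> None:
    # Hypothetical rule: seed a dependency that has no checkpoint yet with the
    # dependent stream's current marker and state; never overwrite an existing one.
    if dependency['checkpoint_marker'] is None:
        dependency['checkpoint_marker'] = marker
        dependency['checkpoint_state'] = state


dependency = {'checkpoint_state': None, 'checkpoint_marker': None}
seed_dependency(dependency, datetime(2025, 1, 1, 10), {'offset': 8})
seed_dependency(dependency, datetime(2025, 1, 1, 11), {'offset': 9})  # ignored
print(dependency['checkpoint_marker'])  # 2025-01-01 10:00:00
```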

When a message is received from the dependency stream, send a heartbeat with its event time. This marker is compared with the event times of the dependent stream to check for downtime:

>>> run(c.heartbeat(datetime(2025, 1, 1, 10, 30)))
{'is_late': False, ...}

When the pulse is checked after a while, it’s apparent that no dependency messages have been received for 30 minutes:

>>> run(c.check_pulse(marker=datetime(2025, 1, 1, 11), offset=9))
datetime.timedelta(seconds=1800)
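The reported value matches plain datetime arithmetic between the two markers in this example, and comparing it with the default threshold of 10 minutes shows why the dependency counts as down. This only reproduces the numbers above; it assumes the comparison is a simple greater-than, which the library may implement differently.

```python
from datetime import datetime, timedelta

state_marker = datetime(2025, 1, 1, 11)           # marker of the latest check_pulse
checkpoint_marker = datetime(2025, 1, 1, 10, 30)  # marker of the last heartbeat
downtime_threshold = timedelta(minutes=10)        # Dependency default

downtime = state_marker - checkpoint_marker
print(downtime)                       # 0:30:00
print(downtime.total_seconds())       # 1800.0
print(downtime > downtime_threshold)  # True
```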

Because the downtime exceeds the default downtime_threshold (10 minutes), the dependent stream is paused (and resumed once the recovery check succeeds). The downtime_callback and recovery_callback arguments can be provided for additional custom behavior.
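One way such pausing can work is an asyncio.Event acting as a gate in front of the dependent stream's iteration. The sketch below is hypothetical (PausableStream is not part of slipstream) and only illustrates the pause-until-recovered behavior, not how the library implements it.

```python
import asyncio
from collections.abc import AsyncIterator, Iterable
from typing import Any


class PausableStream:
    """Hypothetical wrapper: items only flow while the event is set."""

    def __init__(self, items: Iterable[Any]) -> None:
        self._items = items
        self._running = asyncio.Event()
        self._running.set()

    def pause(self) -> None:
        self._running.clear()

    def resume(self) -> None:
        self._running.set()

    async def iterate(self) -> AsyncIterator[Any]:
        for item in self._items:
            await self._running.wait()  # blocks while paused
            yield item


async def demo() -> tuple[int, list[Any]]:
    stream = PausableStream(['a', 'b', 'c'])
    received: list[Any] = []

    async def consume() -> None:
        async for item in stream.iterate():
            received.append(item)

    stream.pause()                        # e.g. downtime detected
    task = asyncio.create_task(consume())
    await asyncio.sleep(0.01)
    while_paused = len(received)          # nothing consumed yet
    stream.resume()                       # e.g. recovery check succeeded
    await task
    return while_paused, received


print(asyncio.run(demo()))  # (0, ['a', 'b', 'c'])
```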

As the dependency stream recovers, it first has to “catch up” with the dependent stream. Until then, the dependent stream stays paused and the dependency stream remains marked as down:

>>> run(c.heartbeat(datetime(2025, 1, 1, 10, 45)))
{'is_late': True, ...}
>>> run(c.heartbeat(datetime(2025, 1, 1, 11, 1)))
{'is_late': False, ...}
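The is_late values in this walkthrough are consistent with a simple comparison between the heartbeat marker and the dependent stream's latest marker. The sketch below assumes exactly that rule (is_late is a hypothetical helper here); the library may apply additional conditions.

```python
from datetime import datetime
from typing import Optional


def is_late(heartbeat_marker: datetime, state_marker: Optional[datetime]) -> bool:
    # Hypothetical rule: a heartbeat is late while it is behind the
    # dependent stream's latest marker.
    return state_marker is not None and heartbeat_marker < state_marker


state_marker = datetime(2025, 1, 1, 11)  # latest dependent marker in the example
print(is_late(datetime(2025, 1, 1, 10, 45), state_marker))  # True
print(is_late(datetime(2025, 1, 1, 11, 1), state_marker))   # False
```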

If no cache is provided, the checkpoint lifespan will be limited to that of the application runtime.

name
dependent
dependencies: dict[str, Dependency]
pause_dependent = True
state
state_marker = None
async heartbeat(marker: datetime.datetime | Any, dependency_name: str | None = None) -> dict

Update checkpoint to latest state.

Args:
    marker (datetime | Any): Typically the event timestamp that is compared to the event timestamp of a dependent stream.
    dependency_name (str, optional): Required when there are multiple dependencies, to specify which one the heartbeat is for.
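A sketch of how the dependency_name requirement could be enforced, assuming dependencies are kept in a dict keyed by name (as the dependencies: dict[str, Dependency] attribute suggests). resolve_dependency is a hypothetical helper, the names 'orders' and 'payments' are made up, and the string values stand in for Dependency objects.

```python
from typing import Any, Optional


def resolve_dependency(dependencies: dict[str, Any],
                       dependency_name: Optional[str] = None) -> Any:
    # Hypothetical lookup: the name may be omitted only when the
    # checkpoint has exactly one dependency.
    if dependency_name is None:
        if len(dependencies) != 1:
            raise ValueError('dependency_name is required with multiple dependencies')
        return next(iter(dependencies.values()))
    return dependencies[dependency_name]


single = {'dependency': 'only'}
multiple = {'orders': 'a', 'payments': 'b'}
print(resolve_dependency(single))                # only
print(resolve_dependency(multiple, 'payments'))  # b
```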

async check_pulse(marker: datetime.datetime | Any, **kwargs: Any) -> Any | None

Update the state that can be used as a checkpoint.

Args:
    marker (datetime | Any): Typically the event timestamp that is compared to the event timestamp of a dependency stream.
    kwargs (Any): Any information, stored in state, that can be used to reprocess incorrect data that was sent out while a dependency stream was down.

Returns:
    Any: Typically the timedelta between the last state_marker and the checkpoint_marker, that is, how long the dependency stream has been down.
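As a sketch of how the keyword arguments can serve reprocessing, suppose the dependent stream passes its message offset (as in offset=8 in the example above) and that offset ends up in the dependency's checkpoint state. A consumer could then replay everything from that offset once the dependency recovers. The helper and the in-memory log are hypothetical; with a real message broker this would typically be a seek to the stored offset.

```python
from typing import Any


def messages_to_reprocess(log: list[Any], checkpoint_state: dict[str, Any]) -> list[Any]:
    # Hypothetical: 'offset' was passed to check_pulse as a keyword argument
    # and stored in the dependency's checkpoint_state; messages from that
    # offset onward may have been processed with incomplete dependency data.
    return log[checkpoint_state['offset']:]


log = [f'message-{offset}' for offset in range(10)]  # offsets 0..9
print(messages_to_reprocess(log, {'offset': 8}))  # ['message-8', 'message-9']
```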