Features
Slipstream missing a feature? Create a new issue.
Topic
Topic can be used to interact with Kafka.
Depends on: aiokafka.
from asyncio import run
from slipstream import Topic
topic = Topic('emoji', {
    'bootstrap_servers': 'localhost:29091',
    'auto_offset_reset': 'earliest',
    'group_instance_id': 'demo',
    'group_id': 'demo',
})
async def main():
    await topic(key='trophy', value='🏆')
    await topic(key='fish', value='🐟')
    async for msg in topic:
        print(msg.key, msg.value)
run(main())
trophy 🏆
fish 🐟
Cache
Cache can be used to persist data.
Depends on: rocksdict.
from slipstream import Cache
cache = Cache('db')
cache['prize'] = '🏆'
cache['phone'] = '📞'
for x, y in cache.items():
    print(x, y)
phone 📞
prize 🏆
Data is persisted to disk and automatically loaded upon restart, adding to application resilience.
By default, it retains a window of 25 MB using FIFO compaction. This can be configured by passing options to slipstream.caching.Cache.
Transaction
To prevent race conditions, Cache's transaction context manager can be used:
async with cache.transaction('fish'):
    cache['fish'] = '🐟'
This only works for asynchronous code (not for multithreading or multiprocessing code).
Until a transaction is finished, other transactions for the same key will block.
All actions outside of transaction blocks will ignore ongoing transactions (risk of race conditions).
Reads won't be limited by ongoing transactions.
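The blocking rules above can be illustrated with plain asyncio primitives. The sketch below is not Slipstream's implementation: it assumes a hypothetical KeyLocks helper that keeps one asyncio.Lock per key, which is enough to show why a second transaction on the same key waits while an unguarded read does not.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager


class KeyLocks:
    """Hypothetical helper (not part of Slipstream): one asyncio.Lock per key."""

    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def transaction(self, key):
        # Transactions on the same key wait here; other keys are unaffected.
        async with self._locks[key]:
            yield


async def demo():
    store, locks, events = {'fish': 0}, KeyLocks(), []

    async def increment(name):
        async with locks.transaction('fish'):
            current = store['fish']
            await asyncio.sleep(0.01)  # other tasks get a chance to run here
            store['fish'] = current + 1
            events.append(name)

    async def read():
        # Reads do not take the lock, so they are never blocked.
        events.append(f"read {store['fish']}")

    await asyncio.gather(increment('a'), increment('b'), read())
    return store['fish'], events


result, events = asyncio.run(demo())
print(result, events)
```

Both increments are applied because the second one waits for the first to finish. Without the lock, both tasks would read the same starting value and one update would be lost. The read runs while the first transaction is still open, which mirrors the note that reads are not limited by ongoing transactions.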
Proxy
Proxy enables passing messages between handlers.
from asyncio import run
from slipstream import handle, stream
from slipstream.caching import Proxy
proxy = Proxy()
async def messages():
    for emoji in '🏆📞🐟👌':
        yield emoji
@handle(messages(), sink=[proxy])
def handler(emoji):
    yield f'Proxied {emoji}!'
@handle(proxy, sink=[print])
def handler(msg):
    yield msg
run(stream())
Proxied 🏆!
Proxied 📞!
Proxied 🐟!
Proxied 👌!
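Conceptually, a proxy is an object that can be called like a sink and iterated like a source. The following is a minimal standard-library sketch of that idea, assuming a hypothetical QueueProxy class built on asyncio.Queue. It is not how slipstream.caching.Proxy is implemented, and it leaves out handle and stream entirely.

```python
import asyncio


class QueueProxy:
    """Hypothetical stand-in: callable as a sink, iterable as a source."""

    _DONE = object()  # sentinel that marks the end of the stream

    def __init__(self):
        self._queue = asyncio.Queue()

    def __call__(self, msg):
        # Acting as a sink: accept a message from an upstream handler.
        self._queue.put_nowait(msg)

    def close(self):
        self._queue.put_nowait(self._DONE)

    async def __aiter__(self):
        # Acting as a source: hand messages to a downstream handler.
        while (msg := await self._queue.get()) is not self._DONE:
            yield msg


async def main():
    proxy = QueueProxy()
    received = []

    async def producer():
        for word in ('trophy', 'phone', 'fish'):
            proxy(f'Proxied {word}!')
        proxy.close()

    async def consumer():
        async for msg in proxy:
            received.append(msg)

    await asyncio.gather(producer(), consumer())
    return received


received = asyncio.run(main())
print(received)
```

The explicit close call exists only so that this standalone sketch terminates. A long-running stream would normally keep the proxy open.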
Conf
Conf can be used to set default Kafka configurations.
from slipstream import Conf, Topic
Conf({
    'bootstrap_servers': 'localhost:29091',
    'group_id': 'default-demo',
})
topic1 = Topic('emoji', {'bootstrap_servers': 'localhost:29092'})
Conf({
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'myuser',
    'sasl_plain_password': 'mypass',
})
topic2 = Topic('conf', {'group_id': 'demo'})
print(topic1.conf)
print(topic2.conf)
{'bootstrap_servers': 'localhost:29092', 'group_id': 'default-demo'}
{'bootstrap_servers': 'localhost:29091', 'group_id': 'demo', 'security_protocol': 'SASL_SSL', 'sasl_mechanism': 'PLAIN', 'sasl_plain_username': 'myuser', 'sasl_plain_password': 'mypass'}
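The printed configurations suggest a simple rule: defaults accumulate across Conf calls, each topic takes a snapshot of the defaults when it is created, and the topic's own settings win on conflict. The sketch below reproduces that rule with plain dictionaries. The set_defaults and topic_conf functions are hypothetical stand-ins for illustration, not Slipstream functions, and the merge order is an assumption inferred from the output above.

```python
defaults = {}


def set_defaults(conf):
    """Hypothetical stand-in for Conf(...): merge into the shared defaults."""
    defaults.update(conf)


def topic_conf(overrides):
    """Hypothetical stand-in for Topic(...).conf: defaults first, then overrides."""
    return {**defaults, **overrides}


set_defaults({'bootstrap_servers': 'localhost:29091', 'group_id': 'default-demo'})
conf1 = topic_conf({'bootstrap_servers': 'localhost:29092'})

set_defaults({'security_protocol': 'SASL_SSL', 'sasl_mechanism': 'PLAIN'})
conf2 = topic_conf({'group_id': 'demo'})

print(conf1)  # created before the SASL defaults, so it does not include them
print(conf2)  # inherits all defaults and overrides only group_id
```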
Yield
When your handler function returns zero or more values, use yield instead of return.
from asyncio import run
from slipstream import handle, stream
async def numbers():
    for x in range(5):
        yield x
@handle(numbers(), sink=[print])
def handler(n):
    if n == 0:
        yield f'zero: {n}'
    if n % 2 == 0:
        yield f'even: {n}'
run(stream())
zero: 0
even: 0
even: 2
even: 4
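To see why yield suits handlers that emit a variable number of values, the same handler can be called directly as an ordinary Python generator, without Slipstream. This sketch only illustrates generator behavior. How handle forwards the yielded values to sinks is not shown here.

```python
def handler(n):
    if n == 0:
        yield f'zero: {n}'
    if n % 2 == 0:
        yield f'even: {n}'


# Collect everything the handler yields for each input.
outputs = {n: list(handler(n)) for n in range(5)}

for n, values in outputs.items():
    print(n, values)
```

The input 0 produces two values, even inputs produce one, and odd inputs produce none, which a single return statement could not express as naturally.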
Codec
Codecs are used for serializing and deserializing data.
from asyncio import run
from slipstream import Topic
from slipstream.codecs import JsonCodec
topic = Topic('emoji', {
    'bootstrap_servers': 'localhost:29091',
    'auto_offset_reset': 'earliest',
    'group_instance_id': 'demo',
    'group_id': 'demo',
}, codec=JsonCodec())
async def main():
    await topic(key='fish', value={'msg': '🐟'})
    async for msg in topic:
        print(msg.value)
run(main())
{'msg': '🐟'}
You can define your own codecs using slipstream.interfaces.ICodec; see AvroCodec for an example.
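As a rough illustration of what a custom codec might look like, here is a standalone sketch of a codec that stores JSON compressed with zlib. It assumes that a codec exposes an encode method returning bytes and a decode method taking bytes. Those method names are an assumption and should be checked against slipstream.interfaces.ICodec, which a real codec would subclass. CompressedJsonCodec is a hypothetical name.

```python
import json
import zlib


class CompressedJsonCodec:
    """Hypothetical codec: JSON compressed with zlib.

    Assumption: a Slipstream codec provides encode(obj) -> bytes and
    decode(bytes) -> obj. In real code this class would subclass
    slipstream.interfaces.ICodec.
    """

    def encode(self, obj):
        # Serialize to JSON text, then compress the UTF-8 bytes.
        return zlib.compress(json.dumps(obj).encode('utf-8'))

    def decode(self, data):
        # Reverse the steps of encode.
        return json.loads(zlib.decompress(data).decode('utf-8'))


codec = CompressedJsonCodec()
payload = {'msg': 'fish', 'count': 3}
encoded = codec.encode(payload)
decoded = codec.decode(encoded)
print(type(encoded).__name__, decoded)
```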
Checkpoint
Checkpoints can be used to detect late data:
Example - Downtime recovery
Example - Downtime reprocessing
A checkpoint consists of one dependent stream and one or more dependency streams:
from slipstream import Cache
from slipstream.checkpointing import Checkpoint, Dependency
async def emoji():
    for emoji in '🏆📞🐟👌':
        yield emoji
dependent, dependency = emoji(), emoji()
# Cache for persisting one or more Checkpoints
checkpoints_cache = Cache('state/checkpoints', target_table_size=10000)
c = Checkpoint(
    'dependent', dependent=dependent,
    dependencies=[Dependency('dependency', dependency)],
    cache=checkpoints_cache
)
By default, datetimes are compared to detect late data (preferably event times).
When handle is used, the dependent stream is automatically paused whenever a dependency stream is down or falls more than 10 minutes behind.
This can be configured in slipstream.checkpointing.Dependency. The pausing behavior can be disabled in slipstream.checkpointing.Checkpoint.
@handle(dependency)
async def dependency_handler(msg):
    await c.heartbeat(msg.value['event_timestamp'])
    yield msg.key, msg.value
@handle(dependent)
async def dependent_handler(msg):
    await c.check_pulse(marker=msg.value['event_timestamp'])
    yield msg.key, msg.value
When the dependency stream recovers, it might have to process a backlog of messages. So the dependent stream will remain paused until the dependency stream has caught up.
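The comparison behind this pausing behavior can be sketched with plain datetimes. The function below is a hypothetical illustration, not the check that Slipstream runs: it assumes that "late" means the dependency's most recent marker trails the dependent's marker by more than an allowed lag, using the 10 minute default mentioned above.

```python
from datetime import datetime, timedelta

MAX_LAG = timedelta(minutes=10)  # mirrors the 10 minute default described above


def is_behind(dependent_marker, dependency_marker, max_lag=MAX_LAG):
    """Hypothetical check: True when the dependency trails by more than max_lag."""
    return dependent_marker - dependency_marker > max_lag


dependent_ts = datetime(2025, 1, 1, 10)

# Dependency is one hour behind: the dependent stream would stay paused.
paused = is_behind(dependent_ts, datetime(2025, 1, 1, 9))

# Dependency has worked through its backlog and is five minutes behind.
caught_up = not is_behind(dependent_ts, datetime(2025, 1, 1, 9, 55))

print(paused, caught_up)
```

Re-evaluating the same check as the dependency processes its backlog is what allows the dependent stream to resume only once the gap has closed.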
Heartbeat returns latency info which can be used to handle late data differently (e.g. buffer or drop):
latency = await c.heartbeat(msg.value['event_timestamp'])
latency
.. {
..     'is_late': True,
..     'dependent_marker': datetime(2025, 1, 1, 10),
..     'dependency_marker': datetime(2025, 1, 1, 9),
.. }
Check pulse returns whatever the downtime_check of the dependency returns. By default it's a timedelta object.
It also takes arbitrary additional state (kwargs), which can be used to reprocess data. In the "Downtime reprocessing" example above, the offset was stored per partition, making it possible to seek back to the time at which the dependency went down.
if downtime := await checkpoint.check_pulse(ts, **{
    str(partition): offset
}):
    print(f'Downtime detected: {downtime}')