Features
Can't find what you seek? Create a new issue.
Topic
Topic can be used to interact with Kafka: calling the topic produces a message, and iterating over it consumes messages.
from asyncio import run
from slipstream import Topic
topic = Topic('emoji', {
    'bootstrap_servers': 'localhost:29091',
    'auto_offset_reset': 'earliest',
    'group_instance_id': 'demo',
    'group_id': 'demo',
})
async def main():
    await topic(key='trophy', value='🏆')
    await topic(key='fish', value='🐟')
    async for msg in topic:
        print(msg.key, msg.value)
run(main())
trophy 🏆
fish 🐟
Topic uses aiokafka under the hood.
Cache
Cache can be used to persist data.
Install rocksdict separately or along with slipstream (unpinned):
pip install slipstream-async[cache]
from slipstream import Cache
cache = Cache('db')
cache['prize'] = '🏆'
cache['phone'] = '📞'
for x, y in cache.items():
    print(x, y)
phone 📞
prize 🏆
Cache is a basic wrapper around rocksdict.
To prevent race conditions, the transaction context manager can be used:
with cache.transaction('fish'):
    cache['fish'] = '🐟'
This only works for asynchronous code (not for multithreaded or multiprocess code).
Until a transaction is finished, other transactions for the same key will block.
All actions outside of transaction blocks ignore ongoing transactions (risk of race conditions).
Reads are not limited by ongoing transactions.
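The blocking rules above can be pictured with one asyncio.Lock per key. The following is only a minimal sketch of that idea using the standard library, not slipstream's actual implementation; KeyLocks, worker and store are hypothetical names introduced for illustration.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager


class KeyLocks:
    """Hypothetical helper: one asyncio.Lock per key."""

    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def transaction(self, key):
        # Transactions on the same key wait for each other;
        # transactions on different keys do not.
        async with self._locks[key]:
            yield


async def worker(locks, store, key, log):
    # Read-modify-write inside a transaction for `key`.
    async with locks.transaction(key):
        current = store.get(key, 0)
        log.append(('read', key, current))
        await asyncio.sleep(0.01)  # other tasks get a chance to run here
        store[key] = current + 1
        log.append(('write', key, store[key]))


async def main():
    locks, store, log = KeyLocks(), {}, []
    await asyncio.gather(*(worker(locks, store, 'fish', log) for _ in range(3)))
    return store, log


store, log = asyncio.run(main())
print(store)  # {'fish': 3}
```

Without the lock, all three workers would read 0 before any of them writes, and the final value would be 1 instead of 3.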
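Conf
Conf can be used to set default Kafka configurations. Successive Conf calls add to the defaults, options passed to a Topic take precedence over them, and a topic only sees the defaults that were set before it was created.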
from slipstream import Conf, Topic
Conf({
    'bootstrap_servers': 'localhost:29091',
    'group_id': 'default-demo',
})
topic1 = Topic('emoji', {'bootstrap_servers': 'localhost:29092'})
Conf({
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'myuser',
    'sasl_plain_password': 'mypass',
})
topic2 = Topic('conf', {'group_id': 'demo'})
print(topic1.conf)
print(topic2.conf)
{'bootstrap_servers': 'localhost:29092', 'group_id': 'default-demo'}
{'bootstrap_servers': 'localhost:29091', 'group_id': 'demo', 'security_protocol': 'SASL_SSL', 'sasl_mechanism': 'PLAIN', 'sasl_plain_username': 'myuser', 'sasl_plain_password': 'mypass'}
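The two printed configurations are consistent with a plain dictionary merge in which topic options win over the defaults accumulated so far. The sketch below reproduces that precedence with ordinary dicts; set_defaults and topic_conf are hypothetical stand-ins, and this is an assumption about the observable behaviour, not a description of how Conf is implemented.

```python
defaults = {}


def set_defaults(options):
    """Hypothetical stand-in for Conf(...): add to the shared defaults."""
    defaults.update(options)


def topic_conf(overrides):
    """Hypothetical stand-in for Topic(...).conf: defaults first, overrides win."""
    return {**defaults, **overrides}


set_defaults({
    'bootstrap_servers': 'localhost:29091',
    'group_id': 'default-demo',
})
conf1 = topic_conf({'bootstrap_servers': 'localhost:29092'})

set_defaults({
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'myuser',
    'sasl_plain_password': 'mypass',
})
conf2 = topic_conf({'group_id': 'demo'})

print(conf1)
print(conf2)
```

Because conf1 is computed before the second set_defaults call, it does not contain the SASL settings, which mirrors the output shown for topic1.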
Yield
When a handler function may produce zero, one, or several values per message, use yield instead of return; every yielded value is passed to the sinks.
from asyncio import run
from slipstream import handle, stream
async def numbers():
    for x in range(5):
        yield x
@handle(numbers(), sink=[print])
def handler(n):
    if n == 0:
        yield f'zero: {n}'
    if n % 2 == 0:
        yield f'even: {n}'
run(stream())
zero: 0
even: 0
even: 2
even: 4
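To see why yield fits this case, it can help to drive the same generator handler by hand. The dispatch function below is a hypothetical, synchronous stand-in for what handle and stream do with the yielded values; it is a sketch for illustration and says nothing about slipstream's internals.

```python
def handler(n):
    # A generator may yield nothing, one value, or several values per input.
    if n == 0:
        yield f'zero: {n}'
    if n % 2 == 0:
        yield f'even: {n}'


def dispatch(source, handler, sinks):
    """Hypothetical driver: pass every yielded value to every sink."""
    for item in source:
        for result in handler(item):
            for sink in sinks:
                sink(result)


received = []
dispatch(range(5), handler, sinks=[received.append, print])
```

Odd inputs yield nothing, so the sinks are never called for them, while the input 0 reaches the sinks twice.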
Timer
Async generators can be used to trigger handler functions.
from asyncio import run, sleep
from time import localtime, strftime
from slipstream import handle, stream
async def timer(interval=1.0):
    # Yield immediately, then once per interval, forever.
    while True:
        yield
        await sleep(interval)
@handle(timer())
def handler():
    print(strftime('%H:%M:%S', localtime()))
run(stream())
23:25:10
23:25:11
23:25:12
...
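The timer above never stops, which is inconvenient when trying it out in a script or a test. A bounded variant can be sketched with the standard library alone; the ticks parameter is a hypothetical addition and is not part of the example above.

```python
import asyncio


async def timer(interval=1.0, ticks=3):
    """Yield `ticks` times, sleeping `interval` seconds after each yield."""
    for _ in range(ticks):
        yield
        await asyncio.sleep(interval)


async def main():
    count = 0
    # Each iteration corresponds to one trigger of a handler.
    async for _ in timer(interval=0.01, ticks=3):
        count += 1
    return count


count = asyncio.run(main())
print(count)  # 3
```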
Codec
Codecs are used for serializing and deserializing data.
from asyncio import run
from slipstream import Topic
from slipstream.codecs import JsonCodec
topic = Topic('emoji', {
    'bootstrap_servers': 'localhost:29091',
    'auto_offset_reset': 'earliest',
    'group_instance_id': 'demo',
    'group_id': 'demo',
}, codec=JsonCodec())
async def main():
    await topic(key='fish', value={'msg': '🐟'})
    async for msg in topic:
        print(msg.value)
run(main())
{'msg': '🐟'}
Custom codecs can be created using ICodec:
from io import BytesIO
from typing import Any, cast
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from avro.schema import Schema, parse
from slipstream.codecs import ICodec
class AvroCodec(ICodec):
    """Serializes/deserializes avro messages using schema."""
    def __init__(self, path: str):
        # Load the Avro schema from a file once, at construction time.
        with open(path) as a:
            self.schema = parse(a.read())
    def encode(self, obj: Any) -> bytes:
        writer = DatumWriter(self.schema)
        bytes_writer = BytesIO()
        encoder = BinaryEncoder(bytes_writer)
        writer.write(obj, encoder)
        return cast(bytes, bytes_writer.getvalue())
    def decode(self, s: bytes) -> object:
        bytes_reader = BytesIO(s)
        decoder = BinaryDecoder(bytes_reader)
        reader = DatumReader(self.schema)
        return cast(object, reader.read(decoder))
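The same encode/decode pair can be tried without Avro or Kafka. The sketch below is a hypothetical CompressedJsonCodec built only on the standard library; it is written as a plain class so that it runs without slipstream installed, and the assumption is that it would subclass ICodec, like AvroCodec above, before being passed to a Topic.

```python
import json
import zlib
from typing import Any


class CompressedJsonCodec:
    """Hypothetical codec: JSON text, UTF-8 encoded, zlib-compressed.

    Assumption: to be used with slipstream it would subclass ICodec.
    """

    def encode(self, obj: Any) -> bytes:
        # Python object -> JSON string -> bytes -> compressed bytes
        return zlib.compress(json.dumps(obj).encode('utf-8'))

    def decode(self, s: bytes) -> object:
        # Reverse of encode: decompress, decode UTF-8, parse JSON
        return json.loads(zlib.decompress(s).decode('utf-8'))


codec = CompressedJsonCodec()
payload = codec.encode({'msg': 'fish'})
roundtrip = codec.decode(payload)
print(roundtrip)  # {'msg': 'fish'}
```

A round trip through encode and decode is a cheap check worth running on any custom codec before wiring it into a topic.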
Endpoint
We can install fastapi to add API endpoints; the example below also uses uvicorn to serve the app.
from asyncio import gather, run, sleep
from time import strftime
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from uvicorn import Config, Server
from slipstream import Cache, handle, stream
app, cache = FastAPI(), Cache('db')
async def timer(interval=1.0):
    while True:
        yield
        await sleep(interval)
@handle(timer(), sink=[cache, print])
def tick_tock():
    yield 'time', strftime('%H:%M:%S')
async def cache_value_updates():
    async for _, v in cache:
        yield v + '\n'
@app.get('/updates')
async def updates():
    return StreamingResponse(
        cache_value_updates(),
        media_type='text/event-stream'
    )
async def main():
    config = Config(app=app, host='0.0.0.0', port=8000)
    server = Server(config)
    await gather(stream(), server.serve())
if __name__ == '__main__':
    run(main())
In this example we're creating a streaming endpoint that emits cache changes:
An update is emitted only when the cache is called as a function (cache(key, val)).
The cache can be used as an AsyncIterator (async for k, v in cache).
The cache_value_updates function formats values that have been updated.
The updates endpoint returns the emitted updates through a StreamingResponse.
When we run the application and call the endpoint, we'll receive the cache value updates:
curl -N http://127.0.0.1:8000/updates
00:16:57
00:16:58
00:16:59
00:17:00
...
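The distinction between calling the cache and assigning to it can be illustrated with a small standard-library stand-in. EmittingStore below is a hypothetical class that mimics only the behaviour described above (calls emit, plain writes do not) using a single asyncio.Queue; it is a sketch with one consumer and is not how slipstream's Cache is implemented.

```python
import asyncio


class EmittingStore:
    """Hypothetical stand-in: a dict that emits updates only when called."""

    def __init__(self):
        self.data = {}
        self._updates = asyncio.Queue()

    def __setitem__(self, key, val):
        self.data[key] = val  # plain write: stored, not emitted

    def __call__(self, key, val):
        self.data[key] = val
        self._updates.put_nowait((key, val))  # call: stored and emitted

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self._updates.get()


async def main():
    store = EmittingStore()  # created inside the running event loop
    store['silent'] = 'not emitted'
    store('time', '00:16:57')
    store('time', '00:16:58')
    seen = []
    async for _, v in store:
        seen.append(v + '\n')
        if len(seen) == 2:
            break
    return store.data, seen


data, seen = asyncio.run(main())
print(seen)
```

The value written with item assignment is stored but never reaches the iterator, which is why the handler in the example uses the cache as a sink (a call) rather than assigning to it.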
Checkpoint
A Checkpoint tracks the heartbeat of dependency streams so that a dependent stream can detect and handle their downtime.
The easiest way to grasp the concept is by looking at the output of these examples:
A checkpoint consists of a dependent stream and dependency streams:
async def emoji():
    for emoji in 'ππππ':
        yield emoji
dependent, dependency = emoji(), emoji()
c = Checkpoint(
    'dependent', dependent=dependent,
    dependencies=[Dependency('dependency', dependency)]
)
Checkpoints automatically handle pausing of dependent streams when they are bound to user handler functions (using handle):
@handle(dependency)
async def dependency_handler(msg):
    key, val = msg.key, msg.value
    # Report the event time of the latest dependency message.
    await c.heartbeat(val['event_timestamp'])
    yield key, val
@handle(dependent)
async def dependent_handler(msg):
    key, val, offset = msg.key, msg.value, msg.offset
    # Compare the dependent event time with the dependency heartbeats.
    await c.check_pulse(marker=val['event_timestamp'], offset=offset)
    yield key, val
On the first pulse check, no message might have been received from dependency yet. Therefore the dependency checkpoint is updated with the initial state and marker of the dependent stream:
from asyncio import run
from datetime import datetime
run(c.check_pulse(marker=datetime(2025, 1, 1, 10), offset=8))
c['dependency'].checkpoint_marker
datetime.datetime(2025, 1, 1, 10, 0)
When a message is received in dependency, send a heartbeat with its event time, which can be compared with the dependent event times to check for downtime:
run(c.heartbeat(datetime(2025, 1, 1, 10, 30)))
When the pulse is checked after a while, it's apparent that no dependency messages have been received for 30 minutes:
run(c.check_pulse(marker=datetime(2025, 1, 1, 11), offset=9))
datetime.timedelta(seconds=1800)
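The reported value matches the difference between the dependent marker and the last dependency heartbeat. The arithmetic can be checked in isolation as below; the variable names are hypothetical, and the subtraction is an assumption inferred from the numbers in the example, not taken from the Checkpoint source.

```python
from datetime import datetime, timedelta

# Event time of the last message seen on the dependency stream.
last_heartbeat = datetime(2025, 1, 1, 10, 30)

# Event time of the message currently being handled on the dependent stream.
dependent_marker = datetime(2025, 1, 1, 11)

# How far the dependency lags behind the dependent stream.
downtime = dependent_marker - last_heartbeat
is_behind = downtime > timedelta(0)

print(repr(downtime))  # datetime.timedelta(seconds=1800)
```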