Features

Can’t find what you seek? Create a new issue.

Topic

A Topic can be used to produce messages to and consume messages from Kafka.

from asyncio import run

from slipstream import Topic

topic = Topic('emoji', {
    'bootstrap_servers': 'localhost:29091',
    'auto_offset_reset': 'earliest',
    'group_instance_id': 'demo',
    'group_id': 'demo',
})

async def main():
    await topic(key='trophy', value='🏆')
    await topic(key='fish', value='🐟')

    async for msg in topic:
        print(msg.key, msg.value)

run(main())
trophy 🏆
fish 🐟

Topic uses aiokafka under the hood.

Cache

Cache can be used to persist data.

Install rocksdict separately or along with slipstream (unpinned):

pip install slipstream-async[cache]
from slipstream import Cache

cache = Cache('db')

cache['prize'] = '🏆'
cache['phone'] = '📞'

for x, y in cache.items():
    print(x, y)
phone 📞
prize 🏆

Cache is a basic wrapper around rocksdict.

To prevent race conditions, the transaction context manager can be used:

with cache.transaction('fish'):
    cache['fish'] = '🐟'
  • This only works for asynchronous code (not for multithreading or multiprocessing code)

  • Until a transaction is finished, other transactions for the same key will block

  • All actions outside of transaction blocks will ignore ongoing transactions (risk of race conditions)

  • Reads won’t be limited by ongoing transactions
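The blocking behaviour described above can be illustrated with a stand-alone sketch. It assumes one asyncio.Lock per key. LockedStore is a hypothetical class, not slipstream's Cache, whose internals may differ. An asyncio.Lock only coordinates coroutines on a single event loop, which matches the note that transactions do not protect multithreaded or multiprocessing code.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager


class LockedStore:
    """Hypothetical dict wrapper with per-key asyncio locks (illustration only)."""

    def __init__(self):
        self.data = {}
        # One asyncio.Lock per key, created on first use.
        self._locks = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def transaction(self, key):
        # Other transactions on the same key wait here until this one exits.
        async with self._locks[key]:
            yield self


async def main():
    store = LockedStore()
    store.data['fish'] = 0
    order = []

    async def increment(name):
        async with store.transaction('fish'):
            order.append(f'{name} start')
            value = store.data['fish']
            await asyncio.sleep(0.01)  # hand control to the other coroutine
            store.data['fish'] = value + 1
            order.append(f'{name} end')

    await asyncio.gather(increment('a'), increment('b'))
    return store.data['fish'], order


result, order = asyncio.run(main())
print(result, order)
```

Without the lock, both coroutines would read 0 before either writes, and the final value would be 1 instead of 2.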

Conf

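Conf can be used to set default Kafka configurations. Each call to Conf merges its options into the existing defaults, and options passed to a Topic take precedence over them. In the output below, topic1.conf does not include the SASL settings that were added after topic1 was created.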

from slipstream import Conf, Topic

Conf({
    'bootstrap_servers': 'localhost:29091',
    'group_id': 'default-demo',
})

topic1 = Topic('emoji', {'bootstrap_servers': 'localhost:29092'})

Conf({
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'myuser',
    'sasl_plain_password': 'mypass',
})

topic2 = Topic('conf', {'group_id': 'demo'})

print(topic1.conf)
print(topic2.conf)
{'bootstrap_servers': 'localhost:29092', 'group_id': 'default-demo'}
{'bootstrap_servers': 'localhost:29091', 'group_id': 'demo', 'security_protocol': 'SASL_SSL', 'sasl_mechanism': 'PLAIN', 'sasl_plain_username': 'myuser', 'sasl_plain_password': 'mypass'}
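The precedence visible in this output can be modelled with plain dictionaries. This sketch assumes that a Topic takes a snapshot of the defaults when it is created, which is consistent with topic1 lacking the SASL options. set_defaults and topic_conf are hypothetical helpers, not slipstream functions.

```python
DEFAULTS = {}


def set_defaults(conf):
    """Hypothetical stand-in for Conf(...): merge new options into the defaults."""
    DEFAULTS.update(conf)


def topic_conf(overrides):
    """Copy the current defaults, then let per-topic options win."""
    return {**DEFAULTS, **overrides}


set_defaults({'bootstrap_servers': 'localhost:29091', 'group_id': 'default-demo'})
conf1 = topic_conf({'bootstrap_servers': 'localhost:29092'})

# Defaults added later do not affect conf1, which was already copied.
set_defaults({'security_protocol': 'SASL_SSL'})
conf2 = topic_conf({'group_id': 'demo'})

print(conf1)
print(conf2)
```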

Yield

When your handler function should emit zero or more values per message, use yield instead of return.

from asyncio import run

from slipstream import handle, stream

async def numbers():
    for x in range(5):
        yield x

@handle(numbers(), sink=[print])
def handler(n):
    if n == 0:
        yield f'zero: {n}'
    if n % 2 == 0:
        yield f'even: {n}'

run(stream())
zero: 0
even: 0
even: 2
even: 4
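Why yield fits here can be seen by mimicking the fan-out in plain asyncio. run_handler is a hypothetical driver, not slipstream's implementation. It passes every value the generator yields to each sink, so an item for which the handler yields nothing produces no output. Forwarding a plain return value once is an assumption made for contrast.

```python
import asyncio
import inspect


async def numbers():
    for x in range(5):
        yield x


def handler(n):
    if n == 0:
        yield f'zero: {n}'
    if n % 2 == 0:
        yield f'even: {n}'


async def run_handler(source, func, sinks):
    """Hypothetical driver: call func per item and forward what it produces."""
    async for item in source:
        result = func(item)
        if inspect.isgenerator(result):
            # A generator handler may emit zero, one, or many values.
            for value in result:
                for sink in sinks:
                    sink(value)
        elif result is not None:
            # Assumption: a plain return value is forwarded once.
            for sink in sinks:
                sink(result)


collected = []
asyncio.run(run_handler(numbers(), handler, [collected.append]))
print(collected)
```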

Timer

Async generators can be used to trigger handler functions.

from asyncio import run, sleep
from time import localtime, strftime

from slipstream import handle, stream

async def timer(interval=1.0):
    while True:
        yield
        await sleep(interval)

@handle(timer())
def handler():
    print(strftime('%H:%M:%S', localtime()))

run(stream())
23:25:10
23:25:11
23:25:12
...
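The timer above runs forever, which is awkward in tests. The sketch below adds a bounded variant and drives an argument-less handler without slipstream. The ticks parameter and the drive helper are hypothetical additions for illustration.

```python
import asyncio
from time import localtime, strftime


async def timer(interval=1.0, ticks=None):
    """Yield every `interval` seconds; stop after `ticks` yields when given."""
    count = 0
    while ticks is None or count < ticks:
        yield
        count += 1
        await asyncio.sleep(interval)


async def drive(source, func):
    """Hypothetical driver: call an argument-less handler once per tick."""
    async for _ in source:
        func()


stamps = []


def handler():
    stamps.append(strftime('%H:%M:%S', localtime()))


asyncio.run(drive(timer(interval=0.01, ticks=3), handler))
print(stamps)
```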

Codec

Codecs are used for serializing and deserializing data.

from asyncio import run

from slipstream import Topic
from slipstream.codecs import JsonCodec

topic = Topic('emoji', {
    'bootstrap_servers': 'localhost:29091',
    'auto_offset_reset': 'earliest',
    'group_instance_id': 'demo',
    'group_id': 'demo',
}, codec=JsonCodec())

async def main():
    await topic(key='fish', value={'msg': '🐟'})

    async for msg in topic:
        print(msg.value)

run(main())
{'msg': '🐟'}

Custom codecs can be created by subclassing ICodec:

from io import BytesIO
from typing import Any, cast

from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from avro.schema import Schema, parse

from slipstream.codecs import ICodec

class AvroCodec(ICodec):
    """Serializes/deserializes avro messages using schema."""

    def __init__(self, path: str):
        with open(path) as a:
            self.schema: Schema = parse(a.read())

    def encode(self, obj: Any) -> bytes:
        writer = DatumWriter(self.schema)
        bytes_writer = BytesIO()
        encoder = BinaryEncoder(bytes_writer)
        writer.write(obj, encoder)
        return cast(bytes, bytes_writer.getvalue())

    def decode(self, s: bytes) -> object:
        bytes_reader = BytesIO(s)
        decoder = BinaryDecoder(bytes_reader)
        reader = DatumReader(self.schema)
        return cast(object, reader.read(decoder))
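A lighter custom codec can follow the same two-method shape as AvroCodec. The sketch below runs without slipstream; in a project you would subclass ICodec as AvroCodec does. Utf8JsonCodec is a hypothetical name and is not slipstream's built-in JsonCodec.

```python
import json
from typing import Any


class Utf8JsonCodec:
    """Hypothetical codec with the same encode/decode methods as AvroCodec."""

    def encode(self, obj: Any) -> bytes:
        # ensure_ascii=False keeps emoji as UTF-8 instead of \u escapes.
        return json.dumps(obj, ensure_ascii=False).encode('utf-8')

    def decode(self, s: bytes) -> object:
        return json.loads(s.decode('utf-8'))


codec = Utf8JsonCodec()
payload = codec.encode({'msg': '🐟'})
print(payload)
print(codec.decode(payload))
```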

Endpoint

We can install fastapi (served here with uvicorn) to add API endpoints.

from asyncio import gather, run, sleep
from time import strftime

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from uvicorn import Config, Server

from slipstream import Cache, handle, stream

app, cache = FastAPI(), Cache('db')

async def timer(interval=1.0):
    while True:
        yield
        await sleep(interval)

@handle(timer(), sink=[cache, print])
def tick_tock():
    yield 'time', strftime('%H:%M:%S')

async def cache_value_updates():
    async for _, v in cache:
        yield v + '\n'

@app.get('/updates')
async def updates():
    return StreamingResponse(
        cache_value_updates(),
        media_type='text/event-stream'
    )

async def main():
    config = Config(app=app, host='0.0.0.0', port=8000)
    server = Server(config)
    await gather(stream(), server.serve())

if __name__ == '__main__':
    run(main())

In this example we’re creating a streaming endpoint that emits cache changes:

  • An update is emitted only when the cache is called as a function (cache(key, val))

  • The cache can be used as an AsyncIterator (async for k, v in cache)

  • The cache_value_updates function formats values that have been updated

  • The updates endpoint returns the emitted updates through a StreamingResponse
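The update mechanism in these bullets can be sketched without slipstream. The sketch assumes each async iterator subscribes with its own asyncio.Queue. NotifyingStore is a hypothetical class, and slipstream's Cache may implement this differently. Item assignment stores the value silently, while calling the store also notifies every active iterator.

```python
import asyncio


class NotifyingStore:
    """Hypothetical store: item assignment is silent, calling it emits an update."""

    def __init__(self):
        self.data = {}
        self._subscribers = []

    def __setitem__(self, key, value):
        self.data[key] = value  # stored, but no update is emitted

    def __call__(self, key, value):
        self.data[key] = value
        for queue in self._subscribers:
            queue.put_nowait((key, value))

    async def __aiter__(self):
        # Each `async for` gets its own queue of (key, value) updates.
        queue = asyncio.Queue()
        self._subscribers.append(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            self._subscribers.remove(queue)


async def main():
    store = NotifyingStore()
    received = []

    async def consume():
        async for key, value in store:
            received.append((key, value))
            if len(received) == 2:
                break

    task = asyncio.create_task(consume())
    await asyncio.sleep(0)  # let the consumer subscribe first
    store['ignored'] = 'x'
    store('time', '00:16:57')
    store('time', '00:16:58')
    await task
    return store, received


store, received = asyncio.run(main())
print(received)
```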

When we run the application and call the endpoint, we’ll receive the cache value updates:

curl -N http://127.0.0.1:8000/updates
00:16:57
00:16:58
00:16:59
00:17:00
...

Checkpoint

A Checkpoint can be used to monitor the heartbeat of dependency streams, so that downtime in those streams can be detected and handled.

The easiest way to grasp the concept is by looking at the output of these examples:

  1. Downtime recovery

  2. Downtime reprocessing

A checkpoint consists of a dependent stream and dependency streams:

async def emoji():
    for emoji in '🏆📞🐟👌':
        yield emoji

dependent, dependency = emoji(), emoji()

c = Checkpoint(
    'dependent', dependent=dependent,
    dependencies=[Dependency('dependency', dependency)]
)

Checkpoints automatically handle pausing of dependent streams when they are bound to user handler functions (using handle):

@handle(dependency)
async def dependency_handler(msg):
    key, val = msg.key, msg.value
    await c.heartbeat(val['event_timestamp'])
    yield key, val

@handle(dependent)
async def dependent_handler(msg):
    key, val, offset = msg.key, msg.value, msg.offset
    await c.check_pulse(marker=val['event_timestamp'], offset=offset)
    yield key, val

On the first pulse check, no message may have been received from dependency yet. Therefore, the dependency checkpoint is initialized with the current state and marker of the dependent stream:

from asyncio import run
from datetime import datetime

run(c.check_pulse(marker=datetime(2025, 1, 1, 10), offset=8))
c['dependency'].checkpoint_marker
datetime.datetime(2025, 1, 1, 10, 0)

When dependency receives a message, send a heartbeat with its event time. This is compared with the event times of the dependent stream to check for downtime:

run(c.heartbeat(datetime(2025, 1, 1, 10, 30)))

When the pulse is checked after a while, it’s apparent that no dependency messages have been received for 30 minutes:

run(c.check_pulse(marker=datetime(2025, 1, 1, 11), offset=9))
datetime.timedelta(seconds=1800)
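The bookkeeping in this walkthrough can be modelled synchronously. The sketch below is a simplified assumption, not slipstream's Checkpoint. PulseMonitor and its tolerance parameter are hypothetical, and the real class is asynchronous and also handles pausing. The first check initializes the marker from the dependent stream, heartbeats advance it, and later checks report the gap between the two event times.

```python
from datetime import datetime, timedelta


class PulseMonitor:
    """Hypothetical, synchronous model of heartbeat/check_pulse bookkeeping."""

    def __init__(self, tolerance=timedelta(0)):
        self.tolerance = tolerance     # gaps up to this size are not downtime
        self.checkpoint_marker = None  # latest dependency event time
        self.dependent_offset = None   # dependent offset seen at the last check

    def heartbeat(self, marker):
        # A dependency message arrived: remember its event time.
        self.checkpoint_marker = marker

    def check_pulse(self, marker, offset):
        self.dependent_offset = offset
        if self.checkpoint_marker is None:
            # No dependency message yet: start from the dependent stream's marker.
            self.checkpoint_marker = marker
            return None
        downtime = marker - self.checkpoint_marker
        return downtime if downtime > self.tolerance else None


monitor = PulseMonitor()
first = monitor.check_pulse(marker=datetime(2025, 1, 1, 10), offset=8)
initial_marker = monitor.checkpoint_marker
monitor.heartbeat(datetime(2025, 1, 1, 10, 30))
gap = monitor.check_pulse(marker=datetime(2025, 1, 1, 11), offset=9)
print(first, initial_marker, gap)
```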