Cookbook

Slipstream favors user freedom over rigid abstractions, letting you craft framework features in just a few lines. Share your artistry!

Timer

Async generators can be used to trigger handler functions.

from asyncio import run, sleep
from time import localtime, strftime

from slipstream import handle, stream

async def timer(interval=1.0):
    while True:
        await sleep(interval)
        yield

@handle(timer())
def handler():
    print(strftime('%H:%M:%S', localtime()))

run(stream())
23:25:10
23:25:11
23:25:12
...

Iterable

A regular (non-async) iterable never yields control to the event loop, so other handlers cannot run concurrently while it is being consumed.

Adding asyncio.sleep allows other coroutines to run during the delay.

from asyncio import sleep

async def async_iterable(it):
    for msg in it:
        await sleep(0.01)
        yield msg
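
The effect of the added sleep can be checked without slipstream. The sketch below is plain asyncio; consume and demo are hypothetical helper names used only for this illustration. Two consumers wrap their own list and run side by side, and the log shows that they take turns instead of one finishing before the other starts.

```python
import asyncio

async def async_iterable(it, delay=0.01):
    """Wrap a regular iterable and yield control to the event loop per item."""
    for msg in it:
        await asyncio.sleep(delay)
        yield msg

async def consume(name, it, log):
    # Hypothetical helper: record which consumer received which item.
    async for msg in async_iterable(it):
        log.append((name, msg))

async def demo():
    log = []
    # Both consumers run concurrently on the same event loop.
    await asyncio.gather(
        consume('a', [1, 2], log),
        consume('b', [1, 2], log),
    )
    return log

if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The first two log entries belong to different consumers, which shows that each sleep hands control to the other coroutine.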

Source

Any data source that can be turned into an AsyncIterable can be bound to a handler function.

Depends on: aiohttp.

For example, the Wikipedia recent changes streaming API:

from asyncio import run

from aiohttp import ClientSession

from slipstream import handle, stream

URL = 'https://stream.wikimedia.org/v2/stream/recentchange'

async def read_streaming_api(url):
    async with ClientSession(raise_for_status=True) as session:
        async with session.get(url) as r:
            async for line in r.content:
                yield line

@handle(read_streaming_api(URL), sink=[print])
def handler(msg):
    yield f'Wiki change {msg}!'

run(stream())
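
Sources that only offer a blocking interface (a file, a synchronous client library) can be adapted in a similar way. The following is a minimal sketch, assuming Python 3.9+ for asyncio.to_thread; from_blocking is a hypothetical helper and not part of slipstream.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the blocking iterator

async def from_blocking(iterable):
    """Expose a blocking iterable as an async generator (hypothetical helper).

    Each next() call runs in a worker thread, so a slow source
    does not stall the event loop.
    """
    it = iter(iterable)
    while True:
        item = await asyncio.to_thread(next, it, _DONE)
        if item is _DONE:
            return
        yield item

async def demo():
    # A list stands in for a blocking source such as an open file.
    return [line async for line in from_blocking(['a\n', 'b\n', 'c\n'])]

if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The resulting async generator could then be passed to handle in the same position as read_streaming_api(URL) above.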

Sharing a source between handlers is efficient, but a processing delay in one handler may delay consumption for both:

wiki_changes = read_streaming_api(URL)

@handle(wiki_changes)
def first_handler(msg):
    pass

@handle(wiki_changes)
def second_handler(msg):
    pass

run(stream())

Pipe

Use the pipe parameter in slipstream.handle to transform the stream itself.

Depends on: asyncstdlib.

from asyncio import run, sleep

from asyncstdlib import pairwise, accumulate

from slipstream import handle, stream

async def numbers(n=100):
    for i in range(n):
        await sleep(0.1)
        yield i

@handle(numbers(), pipe=[pairwise], sink=[print])
def handler(pair):
    yield pair

run(stream())
# pipe=[pairwise]    # pipe=[accumulate]    # pipe=[accumulate, pairwise]
(0, 1)               0                      (0, 1)
(1, 2)               1                      (1, 3)
(2, 3)               3                      (3, 6)
(3, 4)               6                      (6, 10)
...                  ...                    ...
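
To see what these pipes do without installing asyncstdlib, the transformations can be written as plain async generators. This is a sketch only: the accumulate and pairwise functions below are simplified stand-ins for the asyncstdlib versions, and apply_pipes is a hypothetical helper that applies the pipes left to right, which is the order the output table suggests.

```python
import asyncio

_MISSING = object()  # marks "no previous item yet"

async def numbers(n=5):
    for i in range(n):
        await asyncio.sleep(0)
        yield i

async def accumulate(source):
    """Running sum over the stream (simplified stand-in)."""
    total = _MISSING
    async for item in source:
        total = item if total is _MISSING else total + item
        yield total

async def pairwise(source):
    """Overlapping pairs (a, b), (b, c), ... of consecutive items."""
    previous = _MISSING
    async for item in source:
        if previous is not _MISSING:
            yield previous, item
        previous = item

async def apply_pipes(source, pipes):
    """Hypothetical helper: wrap the source in each pipe, left to right."""
    for pipe in pipes:
        source = pipe(source)
    return [item async for item in source]

if __name__ == '__main__':
    print(asyncio.run(apply_pipes(numbers(), [accumulate, pairwise])))
```

With five input numbers, the combined pipe reproduces the first four rows of the right-hand column in the table above.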

Sink

Any data sink (such as Redis) that can be turned into a Callable can be used in slipstream.handle.

Depends on: redis.

from asyncio import run

from redis import Redis

from slipstream import handle, stream

async def messages():
    for _ in range(2):
        for emoji in '🏆📞🐟👌':
            yield emoji

r = Redis(host='localhost', port=6379, encoding='utf-8', decode_responses=True)

def cache(pair: tuple):
    r.set(*pair)

@handle(messages(), sink=[cache])
def handler(msg):
    count = int(r.get(msg)) + 1 if msg in r else 1
    yield msg, count

run(stream())

print({k: int(r[k]) for k in r.keys('*')})
{'👌': 2, '🏆': 2, '📞': 2, '🐟': 2}

Alternatively, slipstream.interfaces.ICache can be used.
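
Because a sink is just a callable that receives whatever the handler yields, the counting logic can be tried without a Redis server. The sketch below is plain Python; DictSink and demo are hypothetical names, and the inner loop only imitates how each yielded pair is passed to the sink.

```python
class DictSink:
    """Hypothetical in-memory sink: stores (key, value) pairs in a dict."""

    def __init__(self):
        self.data = {}

    def __call__(self, pair):
        key, value = pair
        self.data[key] = value

def handler(msg, store):
    """Same counting logic as the Redis handler, reading from a dict."""
    yield msg, store.get(msg, 0) + 1

def demo():
    sink = DictSink()
    for _ in range(2):
        for emoji in '🏆📞🐟👌':
            # Pass every yielded pair to the sink, as a sink would receive it.
            for output in handler(emoji, sink.data):
                sink(output)
    return sink.data

if __name__ == '__main__':
    print(demo())
```

Each emoji ends up with a count of 2, matching the Redis result above.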

AvroCodec

Custom codecs can be created using slipstream.interfaces.ICodec:

Depends on: avro.

from io import BytesIO
from typing import Any, cast

from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from avro.schema import Schema, parse

from slipstream.codecs import ICodec

class AvroCodec(ICodec):
    """Serializes/deserializes avro messages using schema."""

    def __init__(self, path: str):
        with open(path) as a:
            self.schema = parse(a.read())

    def encode(self, obj: Any) -> bytes:
        writer = DatumWriter(self.schema)
        bytes_writer = BytesIO()
        encoder = BinaryEncoder(bytes_writer)
        writer.write(obj, encoder)
        return cast(bytes, bytes_writer.getvalue())

    def decode(self, s: bytes) -> object:
        bytes_reader = BytesIO(s)
        decoder = BinaryDecoder(bytes_reader)
        reader = DatumReader(self.schema)
        return cast(object, reader.read(decoder))

Aggregations

Streaming aggregations typically don’t rely on the whole data’s history but are either:

  • Fold or reduce operations: incremental updates to a state like count or sum over all data, like the code snippet in Getting Started

  • Window operations: the same folds applied only to data within a window of time (event-time or wall-clock based)

Here are some of the well-known window types:

  • Tumbling: fixed-size, non-overlapping, on fixed time interval

  • Hopping: fixed-size, overlapping, on fixed time interval

  • Sliding: fixed-size, overlapping, re-evaluated whenever the content of the window changes

  • Session: dynamic-size, overlapping, on some condition being met
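
For the first two window types, assigning an event to its window comes down to arithmetic on the timestamp. The sketch below is plain Python and independent of slipstream; tumbling_start, hopping_starts, and tumbling_counts are hypothetical helper names, and windows are assumed to be half-open intervals [start, start + size).

```python
from collections import Counter, defaultdict
from math import floor

EVENTS = [
    ('🏆', 0.0), ('📞', 0.5), ('🐟', 1.0), ('👌', 2.0),
    ('🏆', 3.5), ('📞', 4.0), ('🐟', 5.0), ('👌', 5.5),
]

def tumbling_start(ts, size):
    """Start of the single tumbling window that contains ts."""
    return floor(ts / size) * size

def hopping_starts(ts, size, hop):
    """Starts of all hopping windows that contain ts."""
    start = floor(ts / hop) * hop
    starts = []
    # Walk backwards one hop at a time while the window still covers ts.
    while start > ts - size:
        starts.append(start)
        start -= hop
    return sorted(starts)

def tumbling_counts(events, size):
    """Count emojis per tumbling window, keyed by window start."""
    windows = defaultdict(Counter)
    for emoji, ts in events:
        windows[tumbling_start(ts, size)][emoji] += 1
    return {start: dict(counts) for start, counts in windows.items()}

if __name__ == '__main__':
    print(tumbling_counts(EVENTS, 3.0))
    print(hopping_starts(3.5, size=3.0, hop=1.0))
```

With a size of 3 seconds, the eight events fall into two tumbling windows starting at 0.0 and 3.0, while the event at 3.5 belongs to three hopping windows at once.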

Let’s look at the Sliding window using these emojis, each with a timestamp in seconds:

from asyncio import run, sleep

from slipstream import Cache, handle, stream

cache = Cache('state/emoji')

async def messages():
    events = [
        ('🏆', 0.0), ('📞', 0.5), ('🐟', 1.0), ('👌', 2.0),
        ('🏆', 3.5), ('📞', 4.0), ('🐟', 5.0), ('👌', 5.5)
    ]
    for emoji, ts in events:
        await sleep(0.1)
        yield emoji, ts

Fixed-size window sliding with each event:

from collections import Counter

window_size_seconds = 3.0

@handle(messages(), sink=[print])
async def sliding_handler(event):
    _, event_time = event

    events = cache.get('sliding_events', [])
    events.append(event)

    # Keep events within window_size_seconds of current event_time
    events = [
        (e, t) for e, t in events
        if event_time - t <= window_size_seconds
    ]
    cache.put('sliding_events', events)

    counts = Counter(emoji for emoji, _ in events)
    return f'Sliding window ending at {event_time}: {dict(counts)}'

run(stream())
Sliding window ending at 0.0: {'🏆': 1}
Sliding window ending at 0.5: {'🏆': 1, '📞': 1}
Sliding window ending at 1.0: {'🏆': 1, '📞': 1, '🐟': 1}
Sliding window ending at 2.0: {'🏆': 1, '📞': 1, '🐟': 1, '👌': 1}
Sliding window ending at 3.5: {'📞': 1, '🐟': 1, '👌': 1, '🏆': 1}
Sliding window ending at 4.0: {'🐟': 1, '👌': 1, '🏆': 1, '📞': 1}
Sliding window ending at 5.0: {'👌': 1, '🏆': 1, '📞': 1, '🐟': 1}
Sliding window ending at 5.5: {'🏆': 1, '📞': 1, '🐟': 1, '👌': 1}

For production-readiness, you’d add:

  • Watermarks: to determine when a window is “complete” despite late events

  • Late event handling: drop, reassign, or buffer late events

To handle late data or stream downtimes, see Synchronization.
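
As an illustration of the two production concerns listed above, a watermark can be modeled as the highest event time seen so far minus an allowed lateness. This is a minimal sketch in plain Python; the Watermark class and its method names are hypothetical and not part of slipstream.

```python
class Watermark:
    """Hypothetical watermark: max event time seen minus allowed lateness."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.max_event_time = float('-inf')

    @property
    def value(self):
        return self.max_event_time - self.allowed_lateness

    def observe(self, event_time):
        """Record an event; return True if on time, False if late."""
        on_time = event_time >= self.value
        self.max_event_time = max(self.max_event_time, event_time)
        return on_time

    def is_complete(self, window_end):
        """A window is complete once the watermark has reached its end."""
        return self.value >= window_end

if __name__ == '__main__':
    wm = Watermark(allowed_lateness=1.0)
    for ts in (0.0, 2.0, 0.5, 1.5):
        print(ts, 'on time' if wm.observe(ts) else 'late')
```

What happens to an event flagged as late (drop, reassign, or buffer) is a separate policy decision that the caller makes based on the return value of observe.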

Joins

Cross-stream stateful operations such as joins can be achieved using Cache.

Using the messages below, we’ll use a temporal-join to find the weather at the time of each activity:

from datetime import datetime as dt

weather_messages = iter([
    {'timestamp': dt(2023, 1, 1, 10), 'value': '🌞'},
    {'timestamp': dt(2023, 1, 1, 11), 'value': '⛅'},
    {'timestamp': dt(2023, 1, 1, 12), 'value': '🌦️'},
    {'timestamp': dt(2023, 1, 1, 13), 'value': '🌧'},
])
activity_messages = iter([
    {'timestamp': dt(2023, 1, 1, 10, 30), 'value': 'swimming'},  # 🌞
    {'timestamp': dt(2023, 1, 1, 11, 30), 'value': 'walking home'},  # ⛅
    {'timestamp': dt(2023, 1, 1, 12, 30), 'value': 'shopping'},  # 🌦️
    {'timestamp': dt(2023, 1, 1, 13, 10), 'value': 'lunch'},  # 🌧
])

By caching the weather updates using their (POSIX) event-time as a key, we can find the nearest timestamp value. This type of join is often called a temporal-join, nearby-join, or merge-as-of:

from asyncio import run, sleep

from slipstream import Cache, handle, stream

weather_cache = Cache('state/weather')

async def async_iterable(it):
    for msg in it:
        await sleep(0.01)
        yield msg

@handle(async_iterable(weather_messages), sink=[weather_cache])
def weather_handler(w):
    unix_ts = w['timestamp'].timestamp()
    yield unix_ts, w

@handle(async_iterable(activity_messages), sink=[print])
def activity_handler(a):
    unix_ts = a['timestamp'].timestamp()

    for w in weather_cache.values(backwards=True, from_key=unix_ts):
        yield f'The weather during {a["value"]} was {w["value"]}'
        return

    yield a['value'], '?'

run(stream())
The weather during swimming was 🌞
The weather during walking home was ⛅
The weather during shopping was 🌦️
The weather during lunch was 🌧

This approach works when the weather updates are guaranteed to be received in time. If the weather stream goes down, the activity stream will be enriched with stale data.

To manage late data, see Synchronization below.
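
The lookup at the heart of this join, finding the latest weather update at or before an activity, can also be sketched without a cache. The example below uses bisect from the standard library on a list sorted by timestamp; as_of is a hypothetical helper name, and treating an exact timestamp match as a hit is an assumption of this sketch.

```python
from bisect import bisect_right
from datetime import datetime as dt

# Weather updates sorted by timestamp, as in the messages above.
WEATHER = [
    (dt(2023, 1, 1, 10), '🌞'),
    (dt(2023, 1, 1, 11), '⛅'),
    (dt(2023, 1, 1, 12), '🌦️'),
    (dt(2023, 1, 1, 13), '🌧'),
]

def as_of(sorted_rows, ts):
    """Value of the latest row whose timestamp is <= ts, or None."""
    keys = [t for t, _ in sorted_rows]
    i = bisect_right(keys, ts)
    return sorted_rows[i - 1][1] if i else None

if __name__ == '__main__':
    print(as_of(WEATHER, dt(2023, 1, 1, 10, 30)))
    print(as_of(WEATHER, dt(2023, 1, 1, 9)))
```

An activity that precedes every weather update yields None, which corresponds to the '?' fallback in activity_handler.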

Synchronization

Using Checkpoint, we can detect and act on stream downtimes by pausing the dependent stream and, optionally, sending out corrections.

from datetime import datetime as dt

weather_messages = iter([
    {'timestamp': dt(2023, 1, 1, 10), 'value': '🌞'},
    {'timestamp': dt(2023, 1, 1, 11), 'value': '⛅'},
    {'timestamp': dt(2023, 1, 1, 12), 'value': '🌦️'},
    {'timestamp': dt(2023, 1, 1, 13), 'value': '🌧'},
])
activity_messages = iter([
    {'timestamp': dt(2023, 1, 1, 10, 30), 'value': 'swimming'},  # 🌞
    {'timestamp': dt(2023, 1, 1, 11, 30), 'value': 'walking home'},  # ⛅
    {'timestamp': dt(2023, 1, 1, 12, 30), 'value': 'shopping'},  # 🌦️
    {'timestamp': dt(2023, 1, 1, 13, 10), 'value': 'lunch'},  # 🌧
])

Some changes in our setup are required:

  • Adding a Cache for storing the Checkpoint

  • Storing the AsyncIterables in variables for later reference in the Checkpoint

from asyncio import run, sleep
from datetime import timedelta
from typing import cast

from slipstream import Cache, Topic, handle, stream
from slipstream.checkpointing import Checkpoint, Dependency
from slipstream.codecs import JsonCodec
from slipstream.core import READ_FROM_END

async def async_iterable(it):
    for msg in it:
        await sleep(1)
        yield msg

weather_stream = async_iterable(weather_messages)
activity_stream = async_iterable(activity_messages)

activity = Topic('activity', {
    'bootstrap_servers': 'localhost:29091',
    'auto_offset_reset': 'earliest',
    'group_instance_id': 'activity',
    'group_id': 'activity',
}, codec=JsonCodec(), offset=READ_FROM_END)
checkpoints_cache = Cache('state/checkpoints', target_table_size=10000)
weather_cache = Cache('state/weather')

The Checkpoint defines the relationship between streams:

  • The activity Topic depends on the weather_stream AsyncIterable

  • The dependency counts as down once it lags more than 1 hour behind the dependent in event time (downtime_threshold)

  • The downtime_callback function is called when a downtime is detected

  • The recovery_callback function is called when the dependency has caught up again

async def downtime_callback(c: Checkpoint, d: Dependency) -> None:
    print('\tThe stream is automatically paused.')

async def recovery_callback(c: Checkpoint, d: Dependency) -> None:
    offsets = cast(dict[str, int], d.checkpoint_state)
    print(
        '\tDowntime resolved, '
        f'going back to offset {offsets} for reprocessing.'
    )
    await activity.seek({
        int(p): o for p, o in offsets.items()
    })

checkpoint = Checkpoint(
    'activity',
    dependent=activity,
    dependencies=[Dependency(
        'weather_stream',
        weather_stream,
        downtime_threshold=timedelta(hours=1)
    )],
    downtime_callback=downtime_callback,
    recovery_callback=recovery_callback,
    cache=checkpoints_cache
)

In the handle_weather handler, we will “kill” the stream for 5 seconds:

@handle(weather_stream, sink=[weather_cache, print])
async def handle_weather(w):
    """Process weather message."""
    ts = w['timestamp']
    unix_ts = ts.timestamp()
    await checkpoint.heartbeat(ts)
    yield unix_ts, w

    if w['value'] == '⛅':
        print('\tKilling weather stream on purpose')
        await sleep(5)
        print('\tRecovering the weather stream')

@handle(activity_stream, sink=[activity])
def producer(val):
    """Send data to activity topic."""
    yield None, val

@handle(activity, sink=[print])
async def handle_activity(msg):
    """Process activity message."""
    a = msg.value
    ts = dt.strptime(a['timestamp'], '%Y-%m-%d %H:%M:%S')
    unix_ts = ts.timestamp()

    if downtime := await checkpoint.check_pulse(ts, **{
        str(msg.partition): msg.offset
    }):
        print(
            f'\tDowntime detected: {downtime}, '
            '(could cause faulty enrichment)'
        )

    for w in weather_cache.values(backwards=True, from_key=unix_ts):
        yield f'The weather during {a["value"]} was {w["value"]}'
        return

    yield a["value"], '?'

run(stream())

During the 5 seconds, the activity messages still flow in. This triggers the downtime detection, because the activity event times run ahead of the last seen weather event time. Breakdown:

  • checkpoint.heartbeat registers the weather event time in the checkpoint

  • checkpoint.check_pulse registers the activity event time, checking the pulse of its dependencies

  • It also passes some state to the checkpoint, in this case the Kafka offsets

The weather during swimming was 🌞
    Killing weather stream on purpose
The weather during walking home was ⛅
    The stream is automatically paused.
    Downtime detected: 1:30:00, (could cause faulty enrichment)
The weather during shopping was ⛅
    Recovering the weather stream
    Downtime resolved, going back to offset {'0': 2} for reprocessing.
The weather during shopping was 🌦️
The weather during lunch was 🌧

  • One faulty enrichment took place: "The weather during shopping was ⛅" was emitted before the activity stream was paused (waiting for the weather_stream to recover).

  • When the weather_stream recovered, the user-defined recovery_callback was called.

  • The callback seeks the activity topic back to the offset before the weather_stream went down, causing the activity events that were sent out with stale data to be reprocessed.

  • The faulty enrichment was corrected: The weather during shopping was 🌦️
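
The detection step in this run can be modeled in a few lines. The sketch below is a toy model for intuition only and is not how Checkpoint is implemented: PulseMonitor is a hypothetical class that compares the dependent event time with the last heartbeat of the dependency.

```python
from datetime import datetime as dt, timedelta

class PulseMonitor:
    """Toy model of heartbeat/check_pulse (hypothetical, not slipstream)."""

    def __init__(self, downtime_threshold):
        self.downtime_threshold = downtime_threshold
        self.last_heartbeat = None

    def heartbeat(self, event_time):
        """Dependency side: remember the most recent event time seen."""
        if self.last_heartbeat is None or event_time > self.last_heartbeat:
            self.last_heartbeat = event_time

    def check_pulse(self, event_time):
        """Dependent side: return the lag if it exceeds the threshold."""
        if self.last_heartbeat is None:
            return None
        lag = event_time - self.last_heartbeat
        return lag if lag > self.downtime_threshold else None

if __name__ == '__main__':
    monitor = PulseMonitor(timedelta(hours=1))
    monitor.heartbeat(dt(2023, 1, 1, 10))
    monitor.heartbeat(dt(2023, 1, 1, 11))  # last weather event before the pause
    print(monitor.check_pulse(dt(2023, 1, 1, 11, 30)))
    print(monitor.check_pulse(dt(2023, 1, 1, 12, 30)))
```

In this model the 11:30 activity is within the threshold, while the 12:30 activity is 1:30:00 ahead of the last weather event, the same lag reported in the output above.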

Notice that when sending out corrections is required (using slipstream.Topic.seek for example), data flows through the handler function again. This must be handled appropriately when dealing with stateful aggregations (prevent counting/summing an event twice). All consumers of the data must also be capable of dealing with corrections, by compacting/deduplicating the data by some key.
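
One way to keep a stateful aggregation correct under reprocessing is to remember which messages were already counted. The following is a minimal sketch in plain Python, assuming each message carries a Kafka partition and offset as in handle_activity; IdempotentCounter is a hypothetical class, not a slipstream feature.

```python
from collections import Counter

class IdempotentCounter:
    """Hypothetical counter that counts each (partition, offset) once."""

    def __init__(self):
        self.counts = Counter()
        self.seen = set()

    def add(self, partition, offset, value):
        """Count value unless this message was processed before."""
        key = (partition, offset)
        if key in self.seen:
            return False  # replayed message, already counted
        self.seen.add(key)
        self.counts[value] += 1
        return True

if __name__ == '__main__':
    counter = IdempotentCounter()
    counter.add(0, 1, 'walking home')
    counter.add(0, 2, 'shopping')
    counter.add(0, 2, 'shopping')  # replay after seeking back
    print(dict(counter.counts))
```

The set of seen keys grows without bound in this sketch; a longer-running job would need to bound it, for example by tracking only the highest processed offset per partition.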

Endpoint

We can add API endpoints using fastapi.

Depends on: fastapi, uvicorn.

This streaming endpoint emits cache updates:

from asyncio import gather, run, sleep
from time import strftime

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from uvicorn import Config, Server

from slipstream import Cache, handle, stream

app, cache = FastAPI(), Cache('db')

async def timer(interval=1.0):
    while True:
        yield
        await sleep(interval)

@handle(timer(), sink=[cache, print])
def tick_tock():
    yield 'time', strftime('%H:%M:%S')

async def cache_value_updates():
    async for _, v in cache:
        yield v + '\n'

@app.get('/updates')
async def updates():
    return StreamingResponse(
        cache_value_updates(),
        media_type='text/event-stream'
    )

async def main():
    config = Config(app=app, host='0.0.0.0', port=8000)
    server = Server(config)
    await gather(stream(), server.serve())

if __name__ == '__main__':
    run(main())

  • An update is emitted only when the cache is called as a function (cache(key, val))

  • The cache can be used as an AsyncIterator (async for k, v in cache)

  • The updates endpoint returns the emitted updates through a StreamingResponse

curl -N http://127.0.0.1:8000/updates
00:16:57
00:16:58
00:16:59
00:17:00
...

When we call the endpoint, we’ll receive each update to the cache.
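
The pattern of an object that is both callable and async-iterable can be illustrated with a toy model in plain asyncio. This sketch is not how slipstream.Cache is implemented; ObservableStore and demo are hypothetical names, and each subscriber is assumed to get its own queue of updates.

```python
import asyncio

class ObservableStore:
    """Toy store: calling it saves a pair and notifies all subscribers."""

    def __init__(self):
        self.data = {}
        self._subscribers = set()

    def __call__(self, key, val):
        self.data[key] = val
        for queue in self._subscribers:
            queue.put_nowait((key, val))

    async def __aiter__(self):
        # Each iteration registers its own queue and yields updates from it.
        queue = asyncio.Queue()
        self._subscribers.add(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            self._subscribers.discard(queue)

async def demo():
    store = ObservableStore()
    received = []

    async def consumer():
        async for key, val in store:
            received.append((key, val))
            if len(received) == 2:
                break

    task = asyncio.create_task(consumer())
    await asyncio.sleep(0)  # let the consumer subscribe before updates arrive
    store('time', '00:16:57')
    store('time', '00:16:58')
    await task
    return received

if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Only updates made after a consumer has subscribed are delivered in this model, which is why the demo lets the consumer start before calling the store.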