Source code for slipstream.codecs
"""Slipstream codecs."""
from json import dumps, loads
from typing import Any
from slipstream.interfaces import ICodec
[docs]
class JsonCodec(ICodec):
"""Serialize/deserialize json messages."""
[docs]
def encode(self, obj: Any) -> bytes:
"""Serialize message.
>>> c = JsonCodec()
>>> c.encode({'key': 1})
b'{"key": 1}'
"""
return dumps(obj, default=str).encode()
[docs]
def decode(self, s: bytes) -> object:
"""Deserialize message.
>>> c = JsonCodec()
>>> c.decode(b'{"key": 1}')
{'key': 1}
"""
return loads(s.decode())