[docs]classICache(metaclass=CacheMeta):"""Base class for cache implementations. >>> class MyCache(ICache): ... def __init__(self): ... self.db = {} ... ... def __contains__(self, key: Key) -> bool: ... return key in self.db ... ... def __delitem__(self, key: Key) -> None: ... del self.db[key] ... ... def __getitem__(self, key: Key | list[Key]) -> Any: ... return self.db.get(key, None) ... ... def __setitem__(self, key: Key, val: Any) -> None: ... self.db[key] = val >>> cache = MyCache() >>> cache['prize'] = '🏆' >>> cache['prize'] '🏆' >>> del cache['prize'] >>> 'prize' in cache False """__slots__=('_iterable_key','_pubsub')@abstractmethoddef__contains__(self,key:Key)->bool:"""Key exists in db."""raiseNotImplementedError@abstractmethoddef__delitem__(self,key:Key)->None:"""Delete item from db."""raiseNotImplementedError@abstractmethoddef__getitem__(self,key:Key|list[Key])->Any:"""Get item from db or None. Important: - This method should **not** raise a `KeyError` if key does not exist. - Instead, return None. """raiseNotImplementedError@abstractmethoddef__setitem__(self,key:Key,val:Any)->None:"""Set item in db."""raiseNotImplementedErrorasyncdef__call__(self,key:Key,val:Any)->None:"""Set item in db while also publishing to pubsub."""self.__setitem__(key,val)awaitself._pubsub.apublish(self._iterable_key,(key,val),)asyncdef__aiter__(self)->AsyncIterator[Any]:"""Consume published updates to cache."""asyncformsginself._pubsub.iter_topic(self._iterable_key,):yieldmsg