[docs]defiscoroutinecallable(o:Any)->bool:"""Check whether function is coroutine."""returniscoroutinefunction(o)or(hasattr(o,'__call__')andiscoroutinefunction(o.__call__))
[docs]defget_param_names(o:Any):"""Return function parameter names."""params=signature(o).parametersreturntuple(params.keys())
[docs]classSingleton(type):"""Maintain a single instance of a class."""_instances:dict['Singleton',Any]={}def__call__(cls,*args:Any,**kwargs:Any):"""Apply metaclass singleton action."""ifclsnotincls._instances:cls._instances[cls]=super(Singleton,cls).__call__(*args,**kwargs)instance=cls._instances[cls]ifhasattr(instance,'__update__'):instance.__update__(*args,**kwargs)returninstance
[docs]defsubscribe(self,topic:str,listener:AsyncCallable)->None:"""Subscribe callable to topic."""iftopicnotinself._topics:self._topics[topic]=[]self._topics[topic].append(listener)
[docs]defunsubscribe(self,topic:str,listener:AsyncCallable)->None:"""Unsubscribe callable from topic."""iftopicinself._topics:self._topics[topic].remove(listener)ifnotself._topics[topic]:delself._topics[topic]
[docs]defpublish(self,topic:str,*args:Any,**kwargs:Any)->None:"""Publish message to subscribers of topic."""iftopicnotinself._topics:returnforlistenerinself._topics[topic]:listener(*args,**kwargs)
[docs]asyncdefapublish(self,topic:str,*args:Any,**kwargs:Any)->None:"""Publish message to subscribers of topic."""iftopicnotinself._topics:returnforlistenerinself._topics[topic]:ifiscoroutinecallable(listener):awaitlistener(*args,**kwargs)else:listener(*args,**kwargs)
[docs]asyncdefiter_topic(self,topic:str)->AsyncIterator[Any]:"""Asynchronously iterate over messages published to a topic."""queue:Queue[Any]=Queue()self.subscribe(topic,queue.put_nowait)try:whileTrue:yieldawaitqueue.get()finally:self.unsubscribe(topic,queue.put_nowait)