Module csw.RedisConnector
Classes
class RedisConnector (loc: Location)
-
Expand source code
class RedisConnector: def __init__(self, loc: Location): """ Events are posted to Redis. This is internal class used to access Redis. Args: loc (Location): EventServer location """ # XXX TODO Check why only localhost works! # sentinel = Sentinel([(uri.hostname, uri.port)], socket_timeout=0.1) # print(f"XXX Sentinel({uri.port})") uri = urlparse(loc.uri) sentinel = Sentinel([("localhost", uri.port)]) self._redis = sentinel.master_for('eventServer') self._pubsub = self._redis.pubsub() @classmethod def make(cls) -> Self: prefix = Prefix(Subsystem.CSW, "EventServer") conn = ConnectionInfo.make(prefix, ComponentType.Service, ConnectionType.TcpType) loc = LocationServiceSync().find(conn) return RedisConnector(loc) async def close(self): await self._redis.aclose() await self._pubsub.aclose() async def subscribe(self, keyList: List[str], callback: Callable[[any], Awaitable]) -> Task: """ Set up a Redis subscription on specified keys with specified callback on value changes. Args: keyList (List[str]): list of keys to subscribe to callback (function): callback called when item changes. Should take a Redis message type. Returns: PubSubWorkerThread subscription thread. Use .stop() method to stop subscription """ d = dict.fromkeys(keyList, callback) await self._pubsub.subscribe(**d) return asyncio.create_task(self._pubsub.run()) async def unsubscribe(self, keyList: List[str]): """ Unsubscribe to the list of event keys Args: keyList (List[str]): list of keys to unsubscribe from """ await self._pubsub.unsubscribe(keyList) # XXX Commented out due to Event Service performance concerns when using psubscribe # def pSubscribe(self, keyList: List[str], callback): # """ # Set up a Redis subscription on specified keys with specified callback on value changes. # In this case the keys are treated as glob-style patterns. # # Args: # keyList (List[str]): list of key patterns to subscribe to # callback (function): callback called when item changes. Should take a Redis message type. # # Returns: PubSubWorkerThread # subscription thread. Use .stop() method to stop subscription # """ # d = dict.fromkeys(keyList, callback) # self._pubsub.psubscribe(**d) # return self._pubsub.run_in_thread(sleep_time=0.001) # def pUnsubscribe(self, keyList: List[str]): # """ # Unsubscribe to the list of event key patterns # # Args: # keyList (List[str]): list of key patterns to unsubscribe from # """ # self._pubsub.punsubscribe(keyList) async def publish(self, key: str, encodedValue: bytes): """ Publish CBOR encoded event string to Redis Args: key: String specifying Redis key for event. Should be source prefix + "." + event name. encodedValue: CBOR encoded value for the event (in the form [className, dict]) """ await self._redis.set(key, encodedValue) await self._redis.publish(key, encodedValue) async def get(self, key: str) -> str: """ Get value from Redis using specified key Args: key (str): String specifying Redis key for event. Should be source prefix + "." + event name. Returns: str Raw Redis string for event, typically in some encoding """ return await self._redis.get(key)
Events are posted to Redis. This is internal class used to access Redis.
Args
loc
:Location
- EventServer location
Static methods
def make() ‑> Self
Methods
async def close(self)
-
Expand source code
async def close(self): await self._redis.aclose() await self._pubsub.aclose()
async def get(self, key: str) ‑> str
-
Expand source code
async def get(self, key: str) -> str: """ Get value from Redis using specified key Args: key (str): String specifying Redis key for event. Should be source prefix + "." + event name. Returns: str Raw Redis string for event, typically in some encoding """ return await self._redis.get(key)
Get value from Redis using specified key
Args
key
:str
- String specifying Redis key for event. Should be source prefix + "." + event name.
Returns: str Raw Redis string for event, typically in some encoding
async def publish(self, key: str, encodedValue: bytes)
-
Expand source code
async def publish(self, key: str, encodedValue: bytes): """ Publish CBOR encoded event string to Redis Args: key: String specifying Redis key for event. Should be source prefix + "." + event name. encodedValue: CBOR encoded value for the event (in the form [className, dict]) """ await self._redis.set(key, encodedValue) await self._redis.publish(key, encodedValue)
Publish CBOR encoded event string to Redis
Args
key
- String specifying Redis key for event. Should be source prefix + "." + event name.
encodedValue
- CBOR encoded value for the event (in the form [className, dict])
async def subscribe(self,
keyList: List[str],
callback: Callable[[], Awaitable]) ‑> _asyncio.Task -
Expand source code
async def subscribe(self, keyList: List[str], callback: Callable[[any], Awaitable]) -> Task: """ Set up a Redis subscription on specified keys with specified callback on value changes. Args: keyList (List[str]): list of keys to subscribe to callback (function): callback called when item changes. Should take a Redis message type. Returns: PubSubWorkerThread subscription thread. Use .stop() method to stop subscription """ d = dict.fromkeys(keyList, callback) await self._pubsub.subscribe(**d) return asyncio.create_task(self._pubsub.run())
Set up a Redis subscription on specified keys with specified callback on value changes.
Args
keyList
:List[str]
- list of keys to subscribe to
callback
:function
- callback called when item changes. Should take a Redis message type.
Returns: PubSubWorkerThread subscription thread. Use .stop() method to stop subscription
async def unsubscribe(self, keyList: List[str])
-
Expand source code
async def unsubscribe(self, keyList: List[str]): """ Unsubscribe to the list of event keys Args: keyList (List[str]): list of keys to unsubscribe from """ await self._pubsub.unsubscribe(keyList)
Unsubscribe to the list of event keys
Args
keyList
:List[str]
- list of keys to unsubscribe from