Module csw.EventPublisher
Classes
class EventPublisher (redis: RedisConnector)
class EventPublisher:
    def __init__(self, redis: RedisConnector):
        self._redis = redis

    @classmethod
    def make(cls) -> Self:
        return cls(RedisConnector.make())

    async def publish(self, event: Event):
        """
        Publish an event to the Event Service

        Args:
            event (Event): Event to be published
        """
        event_key = str(event.source) + "." + event.eventName.name
        obj = cbor2.dumps(event._asDict())
        await self._redis.publish(event_key, obj)

    async def close(self):
        await self._redis.close()
Static methods
def make() -> Self
Methods
async def close(self)
async def close(self):
    await self._redis.close()
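Because close() is a coroutine that releases the underlying Redis connection, callers will usually want it to run even when a publish fails. The following is a minimal sketch of that pattern, not part of the csw package: `StubPublisher` and `publish_all` are hypothetical names, and the stub only mimics the `publish`/`close` method names so the example runs without a Redis server.

```python
import asyncio


class StubPublisher:
    """Hypothetical stand-in exposing the same publish/close method names."""

    def __init__(self):
        self.published = []
        self.closed = False

    async def publish(self, event):
        # Simulate a failure for invalid input so the cleanup path is exercised.
        if event is None:
            raise ValueError("event must not be None")
        self.published.append(event)

    async def close(self):
        self.closed = True


async def publish_all(publisher, events):
    """Publish every event, then close the publisher even if one publish raises."""
    try:
        for event in events:
            await publisher.publish(event)
    finally:
        # Runs on success and on error, so the connection is always released.
        await publisher.close()


if __name__ == "__main__":
    demo = StubPublisher()
    asyncio.run(publish_all(demo, ["first", "second"]))
    print(demo.published, demo.closed)
```

With the real class, the same try/finally shape would presumably wrap an instance obtained from `EventPublisher.make()`.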
async def publish(self, event: Event)
async def publish(self, event: Event):
    """
    Publish an event to the Event Service

    Args:
        event (Event): Event to be published
    """
    event_key = str(event.source) + "." + event.eventName.name
    obj = cbor2.dumps(event._asDict())
    await self._redis.publish(event_key, obj)
Publish an event to the Event Service
Args
    event (Event): Event to be published
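The source listing for publish() shows that the channel key is the event's source and event name joined by a dot, and that the serialized event is passed to the connector under that key. The sketch below illustrates that flow under stated assumptions: `DemoEvent`, `EventName`, `RecordingConnector`, and `publish_demo` are hypothetical stand-ins rather than csw types, the source string `CSW.testassembly` is made up for illustration, and `json` is swapped in for `cbor2.dumps` so the example needs only the standard library.

```python
import asyncio
import json
from dataclasses import dataclass, field


@dataclass
class EventName:
    name: str


@dataclass
class DemoEvent:
    """Hypothetical event exposing only the attributes publish() reads."""
    source: str
    eventName: EventName
    values: dict = field(default_factory=dict)

    def _asDict(self) -> dict:
        return {
            "source": self.source,
            "eventName": self.eventName.name,
            "values": self.values,
        }


class RecordingConnector:
    """Hypothetical in-memory stand-in for RedisConnector."""

    def __init__(self):
        self.sent = []

    async def publish(self, key, data):
        # Record the call instead of talking to Redis.
        self.sent.append((key, data))


async def publish_demo(connector, event):
    # Same key rule as the listing: "<source>.<event name>".
    event_key = str(event.source) + "." + event.eventName.name
    # Assumption: json replaces cbor2.dumps here; the real payload is CBOR bytes.
    payload = json.dumps(event._asDict()).encode("utf-8")
    await connector.publish(event_key, payload)
    return event_key


if __name__ == "__main__":
    connector = RecordingConnector()
    event = DemoEvent("CSW.testassembly", EventName("myEvent"), {"count": 1})
    print(asyncio.run(publish_demo(connector, event)))
```

A subscriber would need to listen on the same dotted key and decode the payload with the matching codec, which in the real class is CBOR.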