Module csw.EventPublisher

Classes

class EventPublisher (redis: RedisConnector)
Expand source code
class EventPublisher:
    """
    Publishes events through a RedisConnector.

    The connector is stored on the instance and used for both
    publishing and closing.
    """

    def __init__(self, redis: RedisConnector):
        """
        Args:
            redis (RedisConnector): connector used to publish events
        """
        self._redis = redis

    @classmethod
    def make(cls) -> Self:
        """
        Build a publisher around a connector obtained from ``RedisConnector.make()``.

        Returns:
            Self: a new publisher instance
        """
        connector = RedisConnector.make()
        return cls(connector)

    async def publish(self, event: Event):
        """
        Publish an event to the Event Service

        The channel is the event source and the event name joined by a
        dot; the payload is the CBOR dump of ``event._asDict()``.

        Args:
            event (Event): Event to be published
        """
        channel = ".".join((str(event.source), event.eventName.name))
        payload = cbor2.dumps(event._asDict())
        await self._redis.publish(channel, payload)

    async def close(self):
        """
        Await ``close()`` on the connector held by this publisher.
        """
        connector = self._redis
        await connector.close()

Static methods

def make() ‑> Self

Methods

async def close(self)
Expand source code
async def close(self):
    """
    Await ``close()`` on the object stored in ``self._redis``.
    """
    connector = self._redis
    await connector.close()
async def publish(self, event: Event)
Expand source code
async def publish(self, event: Event):
    """
    Publish an event to the Event Service

    The channel is the event source and the event name joined by a
    dot; the payload is the CBOR dump of ``event._asDict()``.

    Args:
        event (Event): Event to be published
    """
    channel = ".".join((str(event.source), event.eventName.name))
    payload = cbor2.dumps(event._asDict())
    await self._redis.publish(channel, payload)

Publish an event to the Event Service

Args

event : Event
Event to be published