Module csw.EventSubscriber

Classes

class EventSubscriber (redis: RedisConnector)
Expand source code
class EventSubscriber:

    def __init__(self, redis: RedisConnector):
        self._redis = redis

    @classmethod
    def make(cls) -> Self:
        return cls(RedisConnector.make())

    async def close(self):
        await self._redis.close()

    @staticmethod
    async def _handleCallback(message: dict, callback: Callable[[Event], Awaitable]):
        data = message['data']
        event = Event._fromDict(cbor2.loads(data))
        await callback(event)

    async def subscribe(self, eventKeyList: list[EventKey],
                        callback: Callable[[Event], Awaitable]) -> EventSubscription:
        """
        Start a subscription to system events in event service, specifying a callback
        to be called when an event in the list has its value updated.

        Args:
            eventKeyList (list[EventKey]): list of event EventKey to subscribe to
            callback (Callable[[Event], None]): function to be called when event updates. Should take Event and return void

        Returns:
            an object that can be used to unsubscribe
        """
        keyList = list(map(lambda k: str(k), eventKeyList))

        async def f(message):
            await self._handleCallback(message, callback)

        t = await self._redis.subscribe(keyList, f)
        async def unsub():
            await self._redis.unsubscribe(keyList)
        return EventSubscription(t, unsub)

    async def unsubscribe(self, eventKeyList: list[EventKey]):
        """
        Unsubscribes to the given list of event keys (or all keys, if eventKeyList is empty)

        Args:
            eventKeyList (list[EventKey]): list of EventKeys to unsubscribe from
        """
        keyList = list(map(lambda k: str(k), eventKeyList))
        return await self._redis.unsubscribe(keyList)

    # XXX Commented out due to Event Service performance concerns when using psubscribe
    # def pSubscribe(self, eventKeyList: list, callback):
    #     """
    #     Start a subscription to system events in event service, specifying a callback
    #     to be called when an event in the list has its value updated.
    #     In this case the keys are treated as glob-style patterns:
    #     h?llo subscribes to hello, hallo and hxllo,
    #     h*llo subscribes to hllo and heeeello,
    #     h[ae]llo subscribes to hello and hallo, but not hillo.
    #
    #     Args:
    #         eventKeyList (list): list of event key (string patterns) to subscribe to
    #         callback (function): function to be called when event updates. Should take Event and return void
    #
    #     Returns: PubSubWorkerThread
    #         subscription thread. Use .stop() method to stop subscription.
    #     """
    #     return self._redis.pSubscribe(eventKeyList, lambda message: self._handleCallback(message, callback))

    # def pUnsubscribe(self, eventKeyList: list):
    #     """
    #     Unsubscribes to the given list of event key patterns (or all keys, if eventKeyList is empty)
    #
    #     Args:
    #         eventKeyList (list): list of event key patterns (Strings) to unsubscribe from
    #     """
    #     return self._redis.pUnsubscribe(eventKeyList)

    async def gets(self, eventKeys: Set[EventKey]) -> Set[Event]:
        """
        Get latest events for multiple Event Keys. The latest events available for the given Event Keys will be received first.
        If event is not published for one or more event keys, `invalid event` will be received for those Event Keys.

        In case the underlying server is not available, the future fails with [[csw.event.api.exceptions.EventServerNotAvailable]] exception.
        In all other cases of exception, the future fails with the respective exception

        Args:
            eventKeys: a set of [[csw.params.events.EventKey]] to subscribe to
        Returns:
            a set of latest Event for the provided Event Keys
        """
        fList = list(map(lambda k: self.get(k), eventKeys))
        events: List[Event] = await asyncio.gather(*fList)
        return set(events)

    async def get(self, eventKey: EventKey) -> Event:
        """
        Get an event from the Event Service

        Args:
            eventKey (EventKey): String specifying Redis key for event.  Should be source prefix + "." + event name.

        Returns: Event obtained from Event Service, decoded into a Event
        """
        data = await self._redis.get(str(eventKey))
        if data:
            event = Event._fromDict(cbor2.loads(data))
            return event
        return SystemEvent.invalidEvent(eventKey)

Static methods

def make() ‑> Self

Methods

async def close(self)
Expand source code
async def close(self):
    await self._redis.close()
async def get(self,
eventKey: EventKey) ‑> Event
Expand source code
async def get(self, eventKey: EventKey) -> Event:
    """
    Get an event from the Event Service

    Args:
        eventKey (EventKey): String specifying Redis key for event.  Should be source prefix + "." + event name.

    Returns: Event obtained from Event Service, decoded into a Event
    """
    data = await self._redis.get(str(eventKey))
    if data:
        event = Event._fromDict(cbor2.loads(data))
        return event
    return SystemEvent.invalidEvent(eventKey)

Get an event from the Event Service

Args

eventKey : EventKey
String specifying Redis key for event. Should be source prefix + "." + event name.

Returns: Event obtained from Event Service, decoded into a Event

async def gets(self,
eventKeys: Set[EventKey]) ‑> Set[Event]
Expand source code
async def gets(self, eventKeys: Set[EventKey]) -> Set[Event]:
    """
    Get latest events for multiple Event Keys. The latest events available for the given Event Keys will be received first.
    If event is not published for one or more event keys, `invalid event` will be received for those Event Keys.

    In case the underlying server is not available, the future fails with [[csw.event.api.exceptions.EventServerNotAvailable]] exception.
    In all other cases of exception, the future fails with the respective exception

    Args:
        eventKeys: a set of [[csw.params.events.EventKey]] to subscribe to
    Returns:
        a set of latest Event for the provided Event Keys
    """
    fList = list(map(lambda k: self.get(k), eventKeys))
    events: List[Event] = await asyncio.gather(*fList)
    return set(events)

Get latest events for multiple Event Keys. The latest events available for the given Event Keys will be received first. If event is not published for one or more event keys, invalid event will be received for those Event Keys.

In case the underlying server is not available, the future fails with [[csw.event.api.exceptions.EventServerNotAvailable]] exception. In all other cases of exception, the future fails with the respective exception

Args

eventKeys
a set of [[csw.params.events.EventKey]] to subscribe to

Returns

a set of latest Event for the provided Event Keys

async def subscribe(self,
eventKeyList: list[EventKey],
callback: Callable[[Event], Awaitable]) ‑> EventSubscription
Expand source code
async def subscribe(self, eventKeyList: list[EventKey],
                    callback: Callable[[Event], Awaitable]) -> EventSubscription:
    """
    Start a subscription to system events in event service, specifying a callback
    to be called when an event in the list has its value updated.

    Args:
        eventKeyList (list[EventKey]): list of event EventKey to subscribe to
        callback (Callable[[Event], None]): function to be called when event updates. Should take Event and return void

    Returns:
        an object that can be used to unsubscribe
    """
    keyList = list(map(lambda k: str(k), eventKeyList))

    async def f(message):
        await self._handleCallback(message, callback)

    t = await self._redis.subscribe(keyList, f)
    async def unsub():
        await self._redis.unsubscribe(keyList)
    return EventSubscription(t, unsub)

Start a subscription to system events in event service, specifying a callback to be called when an event in the list has its value updated.

Args

eventKeyList : list[EventKey]
list of event EventKey to subscribe to
callback : Callable[[Event], None]
function to be called when event updates. Should take Event and return void

Returns

an object that can be used to unsubscribe

async def unsubscribe(self,
eventKeyList: list[EventKey])
Expand source code
async def unsubscribe(self, eventKeyList: list[EventKey]):
    """
    Unsubscribes to the given list of event keys (or all keys, if eventKeyList is empty)

    Args:
        eventKeyList (list[EventKey]): list of EventKeys to unsubscribe from
    """
    keyList = list(map(lambda k: str(k), eventKeyList))
    return await self._redis.unsubscribe(keyList)

Unsubscribes to the given list of event keys (or all keys, if eventKeyList is empty)

Args

eventKeyList : list[EventKey]
list of EventKeys to unsubscribe from