Event Service

The Event Service implements the publish/subscribe messaging paradigm, in which one component publishes an event and all clients that have subscribed receive it. In CSW, the events published are described in the messages documentation. One advantage of this type of messaging is that publishers and subscribers are decoupled, which allows for greater scalability and a more dynamic network topology. Publishers can publish regardless of whether there are subscribers, and subscribers can subscribe even if there are no publishers. The relationship between publishers and subscribers can be one-to-one, one-to-many, many-to-one, or many-to-many.

The Event Service is optimized for the high-performance requirements of demand events published at varying rates (e.g. 100 Hz, 50 Hz), but it can also be used for events that are published infrequently or only when values change. In the TMT control system, events may be created as the output of a calculation in one component and used as the input to a calculation in one or more other components. Demand events are often published at a specific rate.

The Event Service also stores the most recently published event for every unique event, identified by prefix and name. This allows a component that needs to check the state of another component to do so without the overhead of subscribing.
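To make the decoupling and the "latest event" lookup concrete, here is a minimal in-memory sketch in plain Java. It is not the CSW API: the class `LatestEventBus`, its methods, and the use of strings as event payloads are all hypothetical, and unlike the real service it does not replay anything to new subscribers.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for an event server; not the CSW API.
final class LatestEventBus {
    // Callbacks registered per event key ("prefix.name").
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();
    // Most recent payload published for each event key.
    private final Map<String, String> latest = new ConcurrentHashMap<>();

    static String key(String prefix, String name) {
        return prefix + "." + name;
    }

    // Publishing succeeds even when nobody has subscribed.
    void publish(String prefix, String name, String value) {
        String k = key(prefix, name);
        latest.put(k, value);
        for (Consumer<String> callback : subscribers.getOrDefault(k, List.of())) {
            callback.accept(value);
        }
    }

    // Subscribing succeeds even when nothing has been published yet.
    void subscribe(String prefix, String name, Consumer<String> callback) {
        subscribers.computeIfAbsent(key(prefix, name), k -> new CopyOnWriteArrayList<>()).add(callback);
    }

    // Reads the latest payload without creating a subscription.
    Optional<String> get(String prefix, String name) {
        return Optional.ofNullable(latest.get(key(prefix, name)));
    }

    public static void main(String[] args) {
        LatestEventBus bus = new LatestEventBus();
        bus.publish("wfos.blue.filter", "filter_wheel", "position=1"); // no subscribers yet
        bus.subscribe("wfos.blue.filter", "filter_wheel", v -> System.out.println("received " + v));
        bus.publish("wfos.blue.filter", "filter_wheel", "position=2");
        System.out.println(bus.get("wfos.blue.filter", "filter_wheel"));
    }
}
```

The `get` call is the part that corresponds to checking another component's state without subscribing: it only reads the stored value for the key.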

Dependencies

If you already have a dependency on csw-framework in your build.sbt, you can skip this step, because csw-framework depends on csw-event-client. Otherwise, add the dependency below to your build.sbt.

sbt
libraryDependencies += "com.github.tmtsoftware.csw" %% "csw-event-client" % "5.0.1"

Accessing Event Service

When you create component handlers using csw-framework as explained here, you get a handle to an EventService created by csw-framework. It provides the following features:

Access to defaultPublisher: Using the defaultPublisher in EventService, you can publish a single event or a stream of demand events to the Event Service. In most cases, you should use the defaultPublisher, because you can then pass the instance of EventService to worker actors or other parts of your code and call eventService.defaultPublisher to access the publisher. Each EventPublisher has its own TCP connection to the Event Service. When you reuse the defaultPublisher instance of EventPublisher, all published events go through the same TCP connection.

Access to defaultSubscriber: Using the defaultSubscriber, you can subscribe to specific event keys. You can share the defaultSubscriber in the same way as defaultPublisher by passing an instance of EventService to different parts of your code. Unlike defaultPublisher, each subscription with defaultSubscriber.subscribe creates a new TCP connection for just that subscription. This behavior is the same whether you use the defaultSubscriber or the makeNewSubscriber call on EventService.

Each EventSubscriber also has one TCP connection that is used to provide the latest event from the event server when subscribing and to serve explicit get calls. That means that with defaultSubscriber, you share one connection for getting the latest events and create a new connection for each subscribe call. The underlying event server can handle many connections, but it is good to understand how connections are used and reused.

Creating a new Publisher or Subscriber: The makeNewPublisher API of Event Service can be used to create a new publisher, which internally creates a new TCP connection to the Event Store. One use case for this API is publishing high-frequency event streams: it dedicates a separate connection to a demanding stream without affecting the performance of all other low-frequency (e.g. 1 Hz, 20 Hz) event streams.

However, the makeNewSubscriber API does not really have any specific use case. The defaultSubscriber and makeNewSubscriber APIs behave almost identically, since the subscribe API itself creates a new connection for every subscription. Prefer defaultSubscriber over makeNewSubscriber.
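The connection rules above reduce to simple arithmetic. The sketch below is a hypothetical helper, not part of CSW. It assumes that every publisher and every subscriber instance has already opened its own connection, and that each active subscription holds one more.

```java
// Hypothetical helper that applies the connection rules described above.
final class ConnectionEstimate {

    // One connection per publisher, one per subscriber (latest-event and get calls),
    // and one additional connection per active subscription.
    static int connections(int publishers, int subscribers, int subscriptions) {
        if (publishers < 0 || subscribers < 0 || subscriptions < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        return publishers + subscribers + subscriptions;
    }

    public static void main(String[] args) {
        // defaultPublisher plus one dedicated publisher, defaultSubscriber with three subscriptions
        System.out.println(connections(2, 1, 3)); // prints 6
    }
}
```

Under these assumptions, adding a dedicated publisher costs exactly one connection, while replacing defaultSubscriber with a new subscriber does not reduce the per-subscription connections at all.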

Usage of EventPublisher

The examples below demonstrate several variations of the publish API.

For Single Event

This is the simplest API to publish a single event. It returns a Future which will complete successfully if the event is published or fail immediately with a PublishFailure exception if the component cannot publish the event.

Scala
{
  val publisher = eventService.defaultPublisher
  val event     = SystemEvent(componentInfo.prefix, EventName("filter_wheel"))
  publisher.publish(event)
}
Java
Event event = new SystemEvent(componentInfo.prefix(), new EventName("filter_wheel"));
eventService.defaultPublisher().publish(event);

With Generator

A generator is useful when component code needs to publish events at a specific frequency. The following example demonstrates the publish API with an event generator, which publishes one event at each interval. eventGenerator is a function responsible for generating events. It can hold domain-specific logic for generating new events based on certain conditions.

Scala
{
  val publisher        = eventService.defaultPublisher
  val baseEvent: Event = SystemEvent(componentInfo.prefix, EventName("filter_wheel"))
  val interval         = 100.millis

  // this holds the logic for event generation, could be based on some computation or current state of HCD
  def eventGenerator(): Option[Event] =
    baseEvent match {
      case e: SystemEvent  => Some(SystemEvent(e.source, e.eventName, e.paramSet))
      case e: ObserveEvent => Some(IRDetectorEvent.observeStart(e.source, ObsId("2020A-001-123")))
    }

  publisher.publish(eventGenerator(), interval)
}
Java
private Cancellable startPublishingEvents(ComponentInfo componentInfo) {
    Event baseEvent = new SystemEvent(componentInfo.prefix(), new EventName("filter_wheel"));
    return eventService.defaultPublisher().publish(() -> eventGenerator(baseEvent), Duration.ofMillis(100));
}

// this holds the logic for event generation, could be based on some computation or current state of HCD
private Optional<Event> eventGenerator(Event baseEvent) {
    // add logic here to create a new event and return the same
    return Optional.ofNullable(baseEvent);
}
Note

Callbacks like eventGenerator are not thread-safe on the JVM. If you are doing side effects/mutations inside the callback, you should ensure that it is done in a thread-safe way inside an actor. Here is an example of how it can be done.
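As an illustration of the hazard, the sketch below keeps the generator's only mutable state in an `AtomicLong`, so that concurrent calls cannot lose updates. This is a swapped-in technique, not the actor-based approach recommended above, and it only suits simple counters. `CountingGenerator` and the string payload are hypothetical stand-ins for a real generator returning `Optional<Event>`.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical generator whose only mutable state is an atomic sequence number.
final class CountingGenerator {
    private final AtomicLong sequence = new AtomicLong();

    // Safe to call from any thread: incrementAndGet is a single atomic update.
    Optional<String> next() {
        long id = sequence.incrementAndGet();
        return Optional.of("filter_wheel#" + id);
    }

    // Number of events generated so far.
    long generated() {
        return sequence.get();
    }

    public static void main(String[] args) {
        CountingGenerator generator = new CountingGenerator();
        System.out.println(generator.next().orElseThrow()); // prints filter_wheel#1
    }
}
```

For state that spans several fields or needs multi-step updates, confining it to an actor remains the safer choice.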

With Event Stream

This stream-based API can be used to publish a continuous stream of events. If an infinite stream is provided, the user is responsible for shutting the stream down. (Note that the streams discussed here are an Akka feature that is supported in the event publisher and subscriber APIs. See the Akka stream documentation.)

Scala
def onError(publishFailure: PublishFailure): Unit =
  log.error(s"Publish failed for event: [${publishFailure.event}]", ex = publishFailure.cause)

val publisher = eventService.defaultPublisher
val eventStream: Source[Event, Future[Done]] = Source(1 to n)
  .map(id => makeEvent(id, componentInfo.prefix, EventName("filter_wheel")))
  .watchTermination()(Keep.right)

publisher.publish(eventStream, failure => onError(failure))
Java
Source<Event, CompletionStage<Done>> eventStream = Source
        .range(1, n)
        .map(id -> makeEvent(id, componentInfo.prefix(), new EventName("filter_wheel")))
        .watchTermination(Keep.right());

return eventService.defaultPublisher().<CompletionStage<Done>>publish(eventStream, failure -> { /*do something*/ });

This API also demonstrates the usage of an onError callback which can be used to handle events that failed while being published.

See the EventPublisher and IEventPublisher API documentation for the complete list of supported APIs, with a detailed description of each.

Usage of EventSubscriber

The EventSubscriber API has several options available that are useful in different situations. Examples below demonstrate the usage of multiple variations available in the subscribe API.

With Callback

The example shown below takes a set of event keys to subscribe to and a callback function which will be called on each event received by the event stream. This is the simplest and most commonly used API. The example below uses an inline function, but that is not necessary.

Scala
{
  val subscriber = eventService.defaultSubscriber

  subscriber.subscribeCallback(
    Set(EventKey(hcd.prefix, EventName("filter_wheel"))),
    event => { /*do something*/ }
  )
}
Java
IEventSubscriber subscriber = eventService.defaultSubscriber();

EventKey filterWheelEventKey = new EventKey(hcdLocation.prefix(), new EventName("filter_wheel"));
return subscriber.subscribeCallback(Set.of(filterWheelEventKey), event -> { /*do something*/ });
Note

Callbacks are not thread-safe on the JVM. If you need to do side effects/mutations, prefer using subscribeActorRef API.

With Asynchronous Callback

This API is useful when you want to subscribe to events with a callback that has an asynchronous behavior. The callback is of type Event => Future and it ensures that the event callbacks are called sequentially in such a way that the subsequent execution will start only after the prior one finishes. This API gives the guarantee of ordered execution of the asynchronous callbacks.

Scala
def subscribe(): EventSubscription = {
  val subscriber = eventService.defaultSubscriber
  subscriber.subscribeAsync(Set(EventKey(hcd.prefix, EventName("filter_wheel"))), callback)
}

private def callback(event: Event): Future[String] = {
  /* do something */
  Future.successful("some value")
}
Java
public IEventSubscription subscribe() {
    IEventSubscriber subscriber = eventService.defaultSubscriber();

    EventKey filterWheelEventKey = new EventKey(hcdLocation.prefix(), new EventName("filter_wheel"));
    return subscriber.subscribeAsync(Set.of(filterWheelEventKey), this::callback);
}

private CompletableFuture<String> callback(Event event) {
    /* do something */
    return CompletableFuture.completedFuture("some value");
}
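
The ordering guarantee of the asynchronous callback API can be pictured as chaining each callback onto the completion of the previous one. The following is a minimal sketch of that idea in plain Java, not the CSW implementation. `SequentialAsyncRunner` is a hypothetical name, and swallowing a failed callback so that later events still run is a design choice of this sketch only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper: runs asynchronous callbacks strictly one after another.
final class SequentialAsyncRunner<T> {
    // Completes when the most recently scheduled callback has finished.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    // Schedules the callback for this event after all previously scheduled callbacks.
    synchronized CompletableFuture<Void> onEvent(T event, Function<T, CompletableFuture<Void>> callback) {
        tail = tail
                .thenCompose(ignored -> callback.apply(event))
                // Assumption of this sketch: a failed callback must not block later events.
                .exceptionally(error -> null);
        return tail;
    }

    public static void main(String[] args) {
        SequentialAsyncRunner<String> runner = new SequentialAsyncRunner<>();
        Function<String, CompletableFuture<Void>> callback =
                event -> CompletableFuture.runAsync(() -> System.out.println("handled " + event));
        runner.onEvent("first", callback);
        runner.onEvent("second", callback).join(); // "handled first" is printed before "handled second"
    }
}
```

Because each callback starts only when the previous future completes, a slow callback delays every event behind it, which is the trade-off for ordered execution.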

With ActorRef

If you need to mutate state on receiving each event, it is recommended to use this API and send a message to an actor. To use this API, you create an actor that accepts events; you can then safely keep mutable state inside this actor. In the example shown below, eventHandler is the actorRef which accepts events.

Scala
def subscribe(ctx: ActorContext[TopLevelActorMessage]): EventSubscription = {
  val subscriber                    = eventService.defaultSubscriber
  val eventHandler: ActorRef[Event] = ctx.spawnAnonymous(EventHandler.behavior)

  subscriber.subscribeActorRef(Set(EventKey(hcd.prefix, EventName("filter_wheel"))), eventHandler)
}

object EventHandler {
  val behavior: Behavior[Event] = Behaviors.setup { ctx =>
    // setup required for the actor

    Behaviors.receiveMessage { case _ => // handle messages and return new behavior with changed state
      Behaviors.same
    }
  }
}
Java
public IEventSubscription subscribe(ActorContext<TopLevelActorMessage> ctx) {

    IEventSubscriber subscriber = eventService.defaultSubscriber();
    ActorRef<Event> eventHandler = ctx.spawnAnonymous(JEventHandler.behavior());

    EventKey filterWheelEventKey = new EventKey(hcdLocation.prefix(), new EventName("filter_wheel"));
    return subscriber.subscribeActorRef(Set.of(filterWheelEventKey), eventHandler);
}

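public static class JEventHandler {

    public static Behavior<Event> behavior() {
        // handle each event here and keep any mutable state inside the actor
        // (Behaviors is akka.actor.typed.javadsl.Behaviors)
        return Behaviors.receiveMessage(event -> Behaviors.same());
    }
}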
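
The reason an actor makes mutable state safe is that it processes one message at a time. The sketch below imitates that property with a single-threaded executor acting as a mailbox. It is a plain-Java illustration rather than Akka code, and `EventCounterMailbox` with its `tell`, `ask` and `stop` methods is hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical actor-like wrapper: all state changes run on one mailbox thread.
final class EventCounterMailbox {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private int eventCount; // only read or written on the mailbox thread

    // Enqueues the event; it is handled later, one event at a time.
    void tell(String event) {
        mailbox.execute(() -> eventCount++);
    }

    // Reads the state through the mailbox, after all previously enqueued events.
    int ask() throws InterruptedException, ExecutionException {
        return mailbox.submit(() -> eventCount).get();
    }

    // Stops the mailbox thread once the queued events have been handled.
    void stop() {
        mailbox.shutdown();
    }

    public static void main(String[] args) throws Exception {
        EventCounterMailbox counter = new EventCounterMailbox();
        counter.tell("filter_wheel");
        counter.tell("filter_wheel");
        System.out.println(counter.ask()); // prints 2
        counter.stop();
    }
}
```

Callers on any thread may invoke `tell`, but the counter itself is never touched concurrently, which is the same confinement an actor gives you.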

Receive Event Stream

This API takes a set of Event keys to subscribe to and returns a Source of events (see Akka stream documentation). This API gives more control to the user to customize the behavior of an event stream.

Scala
{
  val subscriber = eventService.defaultSubscriber

  subscriber
    .subscribe(Set(EventKey(hcd.prefix, EventName("filter_wheel"))))
    .to(Sink.foreach { event => /*do something*/ })
    .run()
}
Java
IEventSubscriber subscriber = eventService.defaultSubscriber();

EventKey filterWheelEventKey = new EventKey(hcdLocation.prefix(), new EventName("filter_wheel"));
return subscriber.subscribe(Set.of(filterWheelEventKey)).to(Sink.foreach(event -> { /*do something*/ })).run(system);

Controlling Subscription Rate

In all the examples shown above, events are received by the subscriber as soon as they are published. There will be scenarios where you would like to control the rate of events received by your code. For instance, slow subscribers can receive events at their own specified speed rather than being overloaded with events to catch up with the publisher’s speed.

All the APIs in EventSubscriber accept an optional interval and SubscriptionMode to control the subscription rate. The following example demonstrates this with the subscribeCallback API.

Scala
{
  val subscriber = eventService.defaultSubscriber

  subscriber.subscribeCallback(
    Set(EventKey(hcd.prefix, EventName("filter_wheel"))),
    event => { /*do something*/ },
    1.seconds,
    SubscriptionModes.RateAdapterMode
  )
}
Java
IEventSubscriber subscriber = eventService.defaultSubscriber();

EventKey filterWheelEventKey = new EventKey(hcdLocation.prefix(), new EventName("filter_wheel"));
return subscriber.subscribeCallback(Set.of(filterWheelEventKey), event -> { /* do something*/ }, Duration.ofMillis(1000), SubscriptionModes.jRateAdapterMode());

There are two types of Subscription modes:

  • RateAdapterMode which ensures that an event is received exactly at each tick of the specified interval.
  • RateLimiterMode which ensures that events are received as they are published along with the guarantee that no more than one event is delivered within a given interval.

Read more about Subscription Mode here
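
The difference between the two modes is easier to see on a concrete timeline. The sketch below is a simplified model written for this page, not the CSW implementation: events are timestamped strings, the adapter samples the latest event at fixed ticks, and the limiter drops any event that arrives less than one interval after the last delivered one. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the two subscription modes.
final class SubscriptionModeModel {

    // A published event: publication time in milliseconds and a payload.
    static final class Timed {
        final long atMillis;
        final String value;

        Timed(long atMillis, String value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    // Adapter-style: at every tick deliver the latest event published so far.
    // Intermediate events are dropped; the latest one repeats if nothing new arrived.
    static List<String> rateAdapter(List<Timed> events, long intervalMillis, long untilMillis) {
        if (intervalMillis <= 0) throw new IllegalArgumentException("interval must be positive");
        List<String> delivered = new ArrayList<>();
        String latest = null;
        int next = 0;
        for (long tick = intervalMillis; tick <= untilMillis; tick += intervalMillis) {
            while (next < events.size() && events.get(next).atMillis <= tick) {
                latest = events.get(next).value;
                next++;
            }
            if (latest != null) delivered.add(latest);
        }
        return delivered;
    }

    // Limiter-style: deliver events as they arrive, at most one per interval.
    static List<String> rateLimiter(List<Timed> events, long intervalMillis) {
        List<String> delivered = new ArrayList<>();
        boolean deliveredAny = false;
        long lastDeliveredAt = 0;
        for (Timed event : events) {
            if (!deliveredAny || event.atMillis - lastDeliveredAt >= intervalMillis) {
                delivered.add(event.value);
                lastDeliveredAt = event.atMillis;
                deliveredAny = true;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Timed> events = List.of(
                new Timed(0, "a"), new Timed(300, "b"), new Timed(900, "c"), new Timed(2500, "d"));
        System.out.println(rateAdapter(events, 1000, 3000)); // prints [c, c, d]
        System.out.println(rateLimiter(events, 1000));       // prints [a, d]
    }
}
```

In this model the adapter delivers on a regular schedule even when the publisher is silent, while the limiter preserves arrival timing and stays quiet when nothing is published.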

Pattern Subscription

The following example demonstrates the usage of the pattern subscribe API with a callback. Events with keys that match the specified pattern and belong to the specified subsystem are received by the subscriber. The callback function provided is called on each event received.

Scala
{
  val subscriber = eventService.defaultSubscriber
  subscriber.pSubscribeCallback(subsystem, "*", event => { /*do something*/ })
}
Java
IEventSubscriber subscriber = eventService.defaultSubscriber();
subscriber.pSubscribeCallback(subsystem, "*", event -> { /* do something*/ });
Warning

DO NOT include the subsystem in the provided pattern. The final pattern is the provided pattern prefixed with the subsystem. For example, pSubscribe(Subsystem.WFOS, "*") subscribes to event keys matching the pattern wfos.*
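
The sketch below shows why repeating the subsystem in the pattern matches nothing. It is a hypothetical illustration, not CSW code: it assumes the subsystem is lower-cased and joined with a dot (as in the wfos.* example above) and supports only the `*` and `?` glob characters.

```java
import java.util.Locale;
import java.util.regex.Pattern;

// Hypothetical illustration of how the final subscription pattern is formed.
final class SubsystemPattern {

    // Assumption: the subsystem is lower-cased and joined to the pattern with a dot.
    static String fullPattern(String subsystem, String pattern) {
        return subsystem.toLowerCase(Locale.ROOT) + "." + pattern;
    }

    // Converts a glob with '*' and '?' into a regular expression; other characters are literal.
    static Pattern globToRegex(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString());
    }

    static boolean matches(String subsystem, String pattern, String eventKey) {
        return globToRegex(fullPattern(subsystem, pattern)).matcher(eventKey).matches();
    }

    public static void main(String[] args) {
        String key = "wfos.blue.filter.filter_wheel";
        System.out.println(matches("WFOS", "*", key));      // prints true
        System.out.println(matches("WFOS", "wfos.*", key)); // prints false: pattern becomes wfos.wfos.*
    }
}
```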

Warning

The pattern-based subscribe API is provided because it is useful in testing, but it should not be used in production code. Certain patterns, and large numbers of pattern-based subscriptions, can impact the overall performance of the Event Service.

Event Subscription

On subscription to event keys, you receive an EventSubscription which provides the following APIs:

  • unsubscribe: unsubscribes by destroying the event stream and releasing the connection to the Event Server.

  • ready: checks whether the event subscription was successful.

See the EventSubscriber and IEventSubscriber API documentation for the complete list of supported APIs, with a detailed description of each.

Create Event Service

If you are not using csw-framework, you can create the EventService using an EventServiceFactory.

Scala
// create event service using location service
val eventService1: EventService = new EventServiceFactory().make(locationService)

// create event service using host and port of event server.
val eventService2: EventService = new EventServiceFactory().make("localhost", 26379)
Java
// create event service using location service
IEventService eventService1 = new EventServiceFactory().jMake(locationService, actorSystem);

// create event service using host and port of event server.
IEventService eventService2 = new EventServiceFactory().jMake("localhost", 26379, actorSystem);

The provided implementation of Event Service is backed by Redis. The above example creates the Event Service with default Redis client options. You can optionally supply your own RedisClient to the EventStore, which allows you to customize the behavior of the RedisClient used by the Event Service. This is usually only required in testing.

RedisClient is an expensive resource. Reuse this instance as much as possible.

Note that it is the responsibility of the consumer of this API to shut down the RedisClient when it is no longer in use.

Scala
val clientOptions = ClientOptions.builder().disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS).build
val redisClient   = RedisClient.create()
redisClient.setOptions(clientOptions)

// create event service using location service
val eventService1: EventService = new EventServiceFactory(RedisStore(redisClient)).make(locationService)

// create event service using host and port of event server.
val eventService2: EventService = new EventServiceFactory(RedisStore(redisClient)).make("localhost", 26379)
Java
ClientOptions clientOptions = ClientOptions.builder().disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS).build();
RedisClient redisClient   = RedisClient.create();
redisClient.setOptions(clientOptions);

EventStores.RedisStore redisStore = new EventStores.RedisStore(redisClient);
// create event service using location service
IEventService eventService1 = new EventServiceFactory(redisStore).jMake(locationService, actorSystem);

// create event service using host and port of event server.
IEventService eventService2 = new EventServiceFactory(redisStore).jMake("localhost", 26379, actorSystem);

Technical Description

See Event Service Technical Description.

Source code for examples of creation

Source code for examples of publishing

Source code for examples of subscribing