Event Service

The Event Service implements the publish/subscribe messaging paradigm where one component publishes an event and all components that have subscribed receive the event. In CSW, the events published are described under the messages documentation here. One advantage of this type of message system and Event Service also is that publishers and subscribers are decoupled. This decoupling of publishers and subscribers can allow for greater scalability and a more dynamic network topology. Publishers can publish regardless of whether there are subscribers, and subscribers can subscribe even if there are no publishers. The relationship between publishers and subscribers can be one-to-one, one-to-many, many to one, or even many-to-many.

Event Service is optimized for the high performance requirements of events as demands with varying rates, for ex. 100 Hz, 50 Hz etc., but can also be used with events that are published infrequently or when values change. In the TMT control system, events may be created as the output of a calculation by one component for the input to a calculation in one or more other components. Demand events often consist of events that are published at a specific rate.

The Event Service provides an API that allows events to be published and also allows clients to subscribe and unsubscribe to specific events and call developer code when events are received.

Event Service also stores the most recent published event for every unique event by prefix and name. This is useful for publishing components when an event contains state information that changes. Other components needing to check the state can do so without the overhead of subscribing.

Dependencies

If you already have a dependency on csw-framework in your build.sbt, then you can skip this as csw-framework depends on csw-event-client Otherwise add below dependency in your build.sbt

sbt
libraryDependencies += "com.github.tmtsoftware.csw" %% "csw-event-client" % "0.6.0-RC1"

Accessing Event Service

When you create component handlers using csw-framework as explained here, you get a handle to EventService which is created by csw-framework

Using EventService you can start publishing or subscribing to events. EventService is injected in component handlers via csw-framwework and provides the following features:

Access to defaultPublisher: Using defaultPublisher in EventService, you can publish a single event or a stream of demand events to Event Service. In most cases, you should use defaultPublisher, because you can then pass the instance of EventService in worker actors or different places in your code and call eventService.defaultPublisher to access publisher. Each EventPublisher has its own TCP connection to the Event Service. When you reuse the defaultPublisher instance of EventPublisher, all events published go through same TCP connection.

Access to defaultSubscriber: Using defaultSubscriber, you can subscribe to specific event keys. You can share defaultSubscriber in the same way as defaultPublisher by passing an instance of EventService to different parts of your code. Unlike defaultPublisher, each subscription with defaultSubscriber.subscribe creates a new TCP connection for just that subscription. This behavior is the same whether you use defaultSubscriber or makeNewSubscriber call on EventService.

Each EventSubscriber also has one TCP connection that is used to provide the latest event from the event server when subscribing and also for the explicit get calls. That means, with defaultSubscriber, you are sharing same connection for getting latest events and creating a new connection for each subscribe call. The underlying event server can handle many connections, but it is good to understand how connections are used and reused.

Creating a new Publisher or Subscriber: The makeNewPublisher API of Event Service can be used to create a new publisher which would internally create a new TCP connection to the Event Store. One of the use cases of this API could be to publish high frequency event streams in order to dedicate a separate connection to demanding streams without affecting the performance of all other low frequency (for ex. 1Hz, 20Hz etc.) event streams.

However, makeNewSubscriber API does not really have any specific use cases. Both defaultSubscriber and makeNewSubscriber APIs behave almost similar since the subscribe API of EventService itself creates a new connection for every subscription. Prefer using defaultSubscriber over makeNewSubscriber.

Usage of EventPublisher

Below examples demonstrate the usage of multiple variations of publish API.

For Single Event

This is the simplest API to publish a single event. It returns a Future which will complete successfully if the event is published or fail immediately with a PublishFailure exception if the component cannot publish the event.

Scala
{
  val publisher = eventService.defaultPublisher
  val event     = SystemEvent(componentInfo.prefix, EventName("filter_wheel"))
  publisher.publish(event)
}
Java
Event event = new SystemEvent(componentInfo.prefix(), new EventName("filter_wheel"));
eventService.defaultPublisher().publish(event);

With Generator

A generator is useful when component code needs to publish events with a specific frequency. The following example demonstrates the usage of publish API with event generator which will publish one event at each interval. eventGenerator is a function responsible for generating events. It can hold domain specific logic of generating new events based on certain conditions.

Scala
{
  val publisher        = eventService.defaultPublisher
  val baseEvent: Event = SystemEvent(componentInfo.prefix, EventName("filter_wheel"))
  val interval         = 100.millis

  // this holds the logic for event generation, could be based on some computation or current state of HCD
  def eventGenerator(): Event = baseEvent match {
    case e: SystemEvent  ⇒ e.copy(eventId = Id(), eventTime = EventTime())
    case e: ObserveEvent ⇒ e.copy(eventId = Id(), eventTime = EventTime())
  }

  publisher.publish(eventGenerator(), interval)
}
Java
private Cancellable startPublishingEvents(ComponentInfo componentInfo) {
    Event baseEvent = new SystemEvent(componentInfo.prefix(), new EventName("filter_wheel"));
    return eventService.defaultPublisher().publish(() -> eventGenerator(baseEvent), Duration.ofMillis(100));
}

// this holds the logic for event generation, could be based on some computation or current state of HCD
private Event eventGenerator(Event baseEvent) {
    // add logic here to create a new event and return the same
    return baseEvent;
}

With Event Stream

In order to publish a continuous stream of events, this stream-based API can also be used. If an infinite stream is provided, shutdown of the stream needs to be taken care by the users. (Note that streams discussed here are an Akka feature that is supported in event publisher and subscriber APIs. See Akka stream documentation.)

Scala
def onError(publishFailure: PublishFailure): Unit =
  log.error(s"Publish failed for event: [${publishFailure.event}]", ex = publishFailure.cause)

val publisher = eventService.defaultPublisher
val eventStream: Source[Event, Future[Done]] = Source(1 to n)
  .map(id ⇒ makeEvent(id, componentInfo.prefix, EventName("filter_wheel")))
  .watchTermination()(Keep.right)

publisher.publish(eventStream, failure ⇒ onError(failure))
Java
Source<Event, CompletionStage<Done>> eventStream = Source
        .range(1, n)
        .map(id -> makeEvent(id, componentInfo.prefix(), new EventName("filter_wheel")))
        .watchTermination(Keep.right());

return eventService.defaultPublisher().<CompletionStage<Done>>publish(eventStream, failure -> { /*do something*/ });

This API also demonstrates the usage of onError callback which can be used to be alerted to and handle events that failed while being published. The eventGenerator API showed just above also demonstrates the use of the onError callback.

You can find complete list of APIs supported by EventPublisher and IEventPublisher with detailed description of each API here:

Usage of EventSubscriber

The EventSubscriber API has several options available that are useful in different situations. Examples below demonstrate the usage of multiple variations available in the subscribe API.

With Callback

The example shown below takes a set of event keys to subscribe to and a callback function which will be called on each event received by the event stream. This is the simplest and most commonly used API. The example below uses an inline function, but that is not necessary.

Scala
{
  val subscriber = eventService.defaultSubscriber

  subscriber.subscribeCallback(
    Set(EventKey(hcd.prefix, EventName("filter_wheel"))),
    event ⇒ { /*do something*/ }
  )
}
Java

IEventSubscriber subscriber = eventService.defaultSubscriber(); EventKey filterWheelEventKey = new EventKey(hcdLocation.prefix(), new EventName("filter_wheel")); return subscriber.subscribeCallback(Collections.singleton(filterWheelEventKey), event -> { /*do something*/ });

With Asynchronous Callback

The above example will run into concurrency issues if the callback has an shared state or asynchronous behavior. To avoid that use the following API which will give the guarantee of ordered execution of these asynchronous callbacks. In this case, no further processing occurs until the Future completes.

Scala
def subscribe(): EventSubscription = {
  val subscriber = eventService.defaultSubscriber
  subscriber.subscribeAsync(Set(EventKey(hcd.prefix, EventName("filter_wheel"))), callback)
}

private def callback(event: Event): Future[String] = {
  /* do something */
  Future.successful("some value")
}
Java
public IEventSubscription subscribe() {
    IEventSubscriber subscriber = eventService.defaultSubscriber();

    EventKey filterWheelEventKey = new EventKey(hcdLocation.prefix(), new EventName("filter_wheel"));
    return subscriber.subscribeAsync(Collections.singleton(filterWheelEventKey), this::callback);
}

private CompletableFuture<String> callback(Event event) {
    /* do something */
    return CompletableFuture.completedFuture("some value");
}

With ActorRef

If there is a need to mutate state on receiving each event, then it is recommended to use this API and send a message to an actor. To use this API, you have to create an actor which takes event and then you can safely keep mutable state inside this actor. In the example shown below, eventHandler is the actorRef which accepts events.

Scala
def subscribe(ctx: ActorContext[TopLevelActorMessage]): EventSubscription = {
  val subscriber                    = eventService.defaultSubscriber
  val eventHandler: ActorRef[Event] = ctx.spawnAnonymous(EventHandler.make())

  subscriber.subscribeActorRef(Set(EventKey(hcd.prefix, EventName("filter_wheel"))), eventHandler)
}

object EventHandler {
  def make(): Behavior[Event] = Behaviors.setup(ctx ⇒ new EventHandler(ctx))
}

class EventHandler(ctx: ActorContext[Event]) extends MutableBehavior[Event] {
  override def onMessage(msg: Event): Behavior[Event] = {
    // handle messages
    Behaviors.same
  }
}
Java
public IEventSubscription subscribe(ActorContext<TopLevelActorMessage> ctx) {

    IEventSubscriber subscriber = eventService.defaultSubscriber();
    ActorRef<Event> eventHandler = ctx.spawnAnonymous(new JEventHandlerFactory().make());

    EventKey filterWheelEventKey = new EventKey(hcdLocation.prefix(), new EventName("filter_wheel"));
    return subscriber.subscribeActorRef(Collections.singleton(filterWheelEventKey), eventHandler);
}

public class JEventHandlerFactory {
    public Behavior<Event> make() {
        return Behaviors.setup(JEventHandler::new);
    }
}

public class JEventHandler extends MutableBehavior<Event> {
    private ActorContext<Event> ctx;

    JEventHandler(ActorContext<Event> context) {
        ctx = context;
    }

    @Override
    public Behaviors.Receive<Event> createReceive() {
        // handle messages
        return null;
    }
}

Receive Event Stream

This API takes a set of Event keys to subscribe to and returns a Source of events (see Akka stream documentation). This API gives more control to the user to customize behavior of an event stream.

Scala
{
  val subscriber = eventService.defaultSubscriber

  subscriber
    .subscribe(Set(EventKey(hcd.prefix, EventName("filter_wheel"))))
    .to(Sink.foreach { event => /*do something*/
    })
    .run()
}
Java

IEventSubscriber subscriber = eventService.defaultSubscriber(); EventKey filterWheelEventKey = new EventKey(hcdLocation.prefix(), new EventName("filter_wheel")); return subscriber.subscribe(Collections.singleton(filterWheelEventKey)).to(Sink.foreach(event -> { /*do something*/ })).run(mat);

Controlling Subscription Rate

In all the examples shown above, events are received by the subscriber as soon as they are published. There will be scenarios where you would like to control the rate of events received by your code. For instance, slow subscribers can receive events at their own specified speed rather than being overloaded with events to catch up with the publisher’s speed.

All the APIs in EventSubscriber can be provided with interval and SubscriptionMode to control the subscription rate. Following example demonstrates this with the subscribeCallback API.

Scala
{
  val subscriber = eventService.defaultSubscriber

  subscriber.subscribeCallback(
    Set(EventKey(hcd.prefix, EventName("filter_wheel"))),
    event ⇒ { /*do something*/ },
    1.seconds,
    SubscriptionModes.RateAdapterMode
  )
}
Java

IEventSubscriber subscriber = eventService.defaultSubscriber(); EventKey filterWheelEventKey = new EventKey(hcdLocation.prefix(), new EventName("filter_wheel")); return subscriber.subscribeCallback(Collections.singleton(filterWheelEventKey), event -> { /* do something*/ }, Duration.ofMillis(1000), SubscriptionModes.jRateAdapterMode());

There are two types of Subscription modes:

  • RateAdapterMode which ensures that an event is received exactly at each tick of the specified interval.
  • RateLimiterMode which ensures that events are received as they are published along with the guarantee that no more than one event is delivered within a given interval.

Read more about Subscription Mode here

Pattern Subscription

The following example demonstrates the usage of pattern subscribe API with callback. Events with keys that match the specified pattern and belong to the given subsystem are received by the subscriber. The callback function provided is called on each event received.

Scala
{
  val subscriber = eventService.defaultSubscriber
  subscriber.pSubscribeCallback(subsystem, "*", event ⇒ { /*do something*/ })
}
Java

IEventSubscriber subscriber = eventService.defaultSubscriber(); subscriber.pSubscribeCallback(subsystem, "*", event -> { /* do something*/ });
Warning

DO NOT include subsystem in the provided pattern. Final pattern generated will be provided pattern prepended with subsystem. For Ex. pSubscribe(Subsytem.WFOS, *) will subscribe to event keys matching pattern : wfos.*

Warning

The pattern-based subscribe API is provided because it is useful in testing, but should not be used in production code. The use of certain patterns and many pattern-based subscriptions can impact the overall-performance of the Event Service.

Event Subscription

On subscription to event keys, you receive an EventSubscription which provides following APIs:

  • unsubscribe: On un-subscribing, the event stream is destroyed and the connection created to event server while subscription is released.

  • ready: check if event subscription is successful or not.

You can find complete list of API’s supported by EventSubscriber and IEventSubscriber with detailed description of each API here:

Create Event Service

If you are not using csw-framework, you can create EventService using EventServiceFactory.

Scala
// create event service using location service
val eventService1: EventService = new EventServiceFactory().make(locationService)

// create event service using host and port of event server.
val eventService2: EventService = new EventServiceFactory().make("localhost", 26379)
Java
// create event service using host and port of event server.
IEventService eventService1 = new EventServiceFactory().jMake(locationService, actorSystem);

// create event service using host and port of event server.
IEventService eventService2 = new EventServiceFactory().jMake("localhost", 26379, actorSystem);

The provided implementation of Event Service is backed up by Redis. The above example demonstrates creation of Event Service with default Redis client options. You can optionally supply a RedisClient to the EventStore from outside which allows you to customize the behaviour of RedisClient used by Event Service, which in most often be required in test scope only.

RedisClient is an expensive resource. Reuse this instance as much as possible.

Note that it is the responsibility of consumer of this API to shutdown Redis Client when it is no longer in use.

Scala

val clientOptions = ClientOptions.builder().disconnectedBehavior(DisconnectedBehavior.REJECT_COMMANDS).build val redisClient = RedisClient.create() redisClient.setOptions(clientOptions) // create event service using location service val eventService1: EventService = new EventServiceFactory(RedisStore(redisClient)).make(locationService) // create event service using host and port of event server. val eventService2: EventService = new EventServiceFactory(RedisStore(redisClient)).make("localhost", 26379)
Java
ClientOptions clientOptions = ClientOptions.builder().disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS).build();
RedisClient redisClient   = RedisClient.create();
redisClient.setOptions(clientOptions);

EventStores.RedisStore redisStore = new EventStores.RedisStore(redisClient);
// create event service using location service
IEventService eventService1 = new EventServiceFactory(redisStore).jMake(locationService, actorSystem);

// create event service using host and port of event server.
IEventService eventService2 = new EventServiceFactory(redisStore).jMake("localhost", 26379, actorSystem);