Component Handlers
A component developer creates a Top Level Actor (TLA) by inheriting from an abstract class ComponentHandlers or JComponentHandlers for Scala or Java, respectively. Each of these abstract classes provides several handler methods that can be overridden by the developer to provide component-specific code as described below.
Component Lifecycle
For each component, the CSW framework creates a Supervisor that creates the TLA. Together with the abstract behavior class provided by the framework, the Supervisor starts up and initializes the component in a standardized way. Once startup completes, the component is ready to receive commands from the outside world. The following figure describes the startup lifecycle interactions between the framework and the TLA.
initialize
As described in Creating a Component, a Supervisor is created based on the contents of the ComponentInfo file. The figure shows that the Supervisor in the framework creates the specified TLA. Once the TLA is created, the framework calls the initialize handler. This is the opportunity for the component to perform any initialization needed before it is ready to receive commands.
The implementation of the initialize handler is up to the developer. A common task will be for the component to fetch a configuration from the Configuration Service. It may also determine the location of components or services it needs from the Location Service.
The TLA indicates a successful initialize by returning normally. If it cannot initialize, the handler should throw an exception, which will be caught and logged. The Supervisor will retry the creation and initialization of the TLA three times. If it fails after three times, the Supervisor will log a message and stop.
When initialize succeeds, the Supervisor in the framework and the component itself enter the Running state. When in the Running state, commands received from outside the component are passed to the TLA (see below).
- Assembly/Scala
-
    override def initialize(): Future[Unit] = async {
      // Initialization could include the following steps:

      // 1. fetch config (preferably from configuration service)
      val calculationConfig = await(getAssemblyConfig)

      // 2. create a worker actor which is used by this assembly
      val worker: ActorRef[WorkerActorMsg] = ctx.spawnAnonymous(WorkerActor.behavior(calculationConfig))

      // 3. find a Hcd connection from the connections provided in componentInfo
      val maybeConnection =
        componentInfo.connections.find(connection => connection.componentId.componentType == ComponentType.HCD)

      // 4. If an Hcd is found as a connection, resolve its location from location service and create other
      // required worker actors required by this assembly
      maybeConnection match {
        case Some(_) =>
          resolveHcd().map {
            case Some(hcd) =>
              runningHcds = runningHcds.updated(maybeConnection.get, Some(CommandServiceFactory.make(hcd)(ctx.system)))
              diagnosticsPublisher = ctx.spawnAnonymous(DiagnosticsPublisher.behavior(runningHcds(maybeConnection.get).get, worker))
              commandHandler = ctx.spawnAnonymous(CommandHandler.behavior(calculationConfig, runningHcds(maybeConnection.get)))
            case None => // do something
          }
        case None => Future.successful(())
      }
    }
- Assembly/Java
-
    @Override
    public CompletableFuture<Void> jInitialize() {
        // fetch config (preferably from configuration service)
        CompletableFuture<ConfigData> configDataCompletableFuture = getAssemblyConfig();

        // create a worker actor which is used by this assembly
        CompletableFuture<ActorRef<WorkerActorMsg>> worker =
                configDataCompletableFuture.thenApply(config -> ctx.spawnAnonymous(WorkerActor.behavior(config)));

        // find a Hcd connection from the connections provided in componentInfo
        Optional<Connection> mayBeConnection = componentInfo.getConnections().stream()
                .filter(connection -> connection.componentId().componentType() == JComponentType.HCD)
                .findFirst();

        // If an Hcd is found as a connection, resolve its location from location service and create other
        // required worker actors required by this assembly, also subscribe to HCD's filter wheel event stream
        return mayBeConnection.map(connection ->
                worker.thenAcceptBoth(resolveHcd(), (workerActor, hcdLocation) -> {
                    if (!hcdLocation.isPresent())
                        throw new HcdNotFoundException();
                    else {
                        runningHcds.put(connection, Optional.of(CommandServiceFactory.jMake(hcdLocation.orElseThrow(), ctx.getSystem())));
                    }
                    diagnosticPublisher = ctx.spawnAnonymous(JDiagnosticsPublisher.behavior(CommandServiceFactory.jMake(hcdLocation.orElseThrow(), ctx.getSystem()), workerActor));
                })).orElseThrow();
    }
- Hcd/Scala
-
    override def initialize(): Future[Unit] = async {
      // fetch config (preferably from configuration service)
      val hcdConfig = await(getHcdConfig)

      // create a worker actor which is used by this hcd
      val worker: ActorRef[WorkerActorMsg] = ctx.spawnAnonymous(WorkerActor.behavior(hcdConfig))

      // initialise some state by using the worker actor created above
      current = await(worker ? InitialState)
      stats = await(worker ? GetStatistics)
    }
- Hcd/Java
-
    @Override
    public CompletableFuture<Void> jInitialize() {
        // fetch config (preferably from configuration service) and continue only once it is available,
        // so that the worker is never created with an unset hcdConfig
        return getConfig().thenCompose(config -> {
            hcdConfig = config;

            // create a worker actor which is used by this hcd
            worker = ctx.spawnAnonymous(WorkerActor.behavior(hcdConfig));

            // initialise some state by using the worker actor created above
            CompletionStage<Integer> askCurrent = AskPattern.ask(worker, WorkerActorMsgs.JInitialState::new, Duration.ofSeconds(5), ctx.getSystem().scheduler());
            CompletableFuture<Void> currentFuture = askCurrent.thenAccept(c -> current = c).toCompletableFuture();

            // this example reuses the JInitialState message for the statistics request
            CompletionStage<Integer> askStats = AskPattern.ask(worker, WorkerActorMsgs.JInitialState::new, Duration.ofSeconds(5), ctx.getSystem().scheduler());
            CompletableFuture<Void> statsFuture = askStats.thenAccept(s -> stats = s).toCompletableFuture();

            // complete only when both pieces of state have been received
            return CompletableFuture.allOf(currentFuture, statsFuture);
        }).toCompletableFuture();
    }
Creation Timeout
The Supervisor waits for initialize to complete. If it times out, the Supervisor retries the creation of the TLA three times, in the same way as for initialize failures. The TLA can configure the timeout by setting the initializeTimeout value in ComponentInfo.
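The retry and timeout behavior described above can be modeled in plain Java. This is a minimal sketch and not the framework's Supervisor: the class InitSupervisorSketch, the method startWithRetries, and the maxAttempts parameter are hypothetical names introduced for illustration, and how the framework counts its three retries is an assumption left to the caller here.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class InitSupervisorSketch {

    /**
     * Calls a (hypothetical) initialize handler up to maxAttempts times.
     * Returns the 1-based number of the attempt that succeeded, or -1 if every
     * attempt threw, completed exceptionally, or exceeded the timeout.
     */
    static int startWithRetries(Supplier<CompletableFuture<Void>> initialize,
                                Duration initializeTimeout,
                                int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // In the real framework a fresh TLA would be created before each attempt.
                initialize.get().get(initializeTimeout.toMillis(), TimeUnit.MILLISECONDS);
                return attempt; // success: the component would now enter Running
            } catch (TimeoutException e) {
                System.out.println("attempt " + attempt + " timed out");
            } catch (ExecutionException | RuntimeException e) {
                System.out.println("attempt " + attempt + " failed: " + e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return -1;
            }
        }
        System.out.println("giving up after " + maxAttempts + " attempts");
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical handler that throws twice and then succeeds.
        Supplier<CompletableFuture<Void>> flaky = () -> {
            if (++calls[0] < 3) throw new IllegalStateException("config not available");
            return CompletableFuture.<Void>completedFuture(null);
        };
        System.out.println(startWithRetries(flaky, Duration.ofMillis(200), 3));

        // A handler whose future never completes hits the timeout on every attempt.
        Supplier<CompletableFuture<Void>> hanging = () -> new CompletableFuture<Void>();
        System.out.println(startWithRetries(hanging, Duration.ofMillis(50), 3));
    }
}
```

In this model a timeout and a thrown exception are handled the same way, which mirrors the statement that timeouts are retried like initialize failures.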
Location Service Interactions
Once the Supervisor and TLA are in the Running state, the Supervisor registers the component with the Location Service. This allows the component to be located so it can be contacted. Registration with the Location Service happens only if locationServiceUsage in ComponentInfo is not set to DoNotRegister.
If the component has connections and locationServiceUsage in ComponentInfo is set to RegisterAndTrackServices, the framework will resolve the components and deliver TrackingEvents to the TLA through the onTrackingEvent handler.
Shutting Down
A component may be shut down by an external administrative program whether it is deployed in a container or standalone. Shutting down may occur when the component is in the Running state, either online or offline (see below).
onShutdown
The TLA provides a handler called onShutdown that is called by the Supervisor when shutting down to give the TLA an opportunity to perform any clean up it may require, such as freeing resources.
As with initialize, there is a timeout that the framework will wait for the component to return from onShutdown. This is currently set to 10 seconds and cannot be overridden. If the handler does not return within that time, the TLA is assumed to be damaged and is destroyed immediately. After a successful return from onShutdown, the Supervisor deletes the component.
- Assembly/Scala
-
    override def onShutdown(): Future[Unit] = async {
      // clean up resources
    }
- Assembly/Java
-
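    @Override
    public CompletableFuture<Void> jOnShutdown() {
        // clean up resources
        // return an already completed future; a bare new CompletableFuture<>() would never complete
        return CompletableFuture.completedFuture(null);
    }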
- Hcd/Scala
-
    override def onShutdown(): Future[Unit] = async {
      // clean up resources
    }
- Hcd/Java
-
    @Override
    public CompletableFuture<Void> jOnShutdown() {
        return CompletableFuture.runAsync(() -> {
            // clean up resources
        });
    }
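The effect of the shutdown timeout can be illustrated with a small standalone model. This is only a sketch under stated assumptions: ShutdownTimeoutSketch, awaitShutdown, and the Outcome values are hypothetical names, and treating a failed onShutdown like a timed-out one is an assumption that the text above does not confirm.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ShutdownTimeoutSketch {

    enum Outcome { CLEAN_SHUTDOWN, FORCED_AFTER_TIMEOUT, FORCED_AFTER_FAILURE }

    /** Waits for the future returned by a (hypothetical) onShutdown handler, up to the given timeout. */
    static Outcome awaitShutdown(CompletableFuture<Void> onShutdownResult, Duration timeout) {
        try {
            onShutdownResult.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return Outcome.CLEAN_SHUTDOWN;
        } catch (TimeoutException e) {
            return Outcome.FORCED_AFTER_TIMEOUT;   // handler did not return in time
        } catch (ExecutionException e) {
            return Outcome.FORCED_AFTER_FAILURE;   // assumption: failures are also forced
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.FORCED_AFTER_FAILURE;
        }
    }

    public static void main(String[] args) {
        // The documented timeout is 10 seconds; a short value keeps the demo fast.
        Duration timeout = Duration.ofMillis(100);

        // A handler that finishes its clean up completes the future.
        System.out.println(awaitShutdown(CompletableFuture.<Void>completedFuture(null), timeout));

        // A future that is created but never completed runs into the timeout.
        System.out.println(awaitShutdown(new CompletableFuture<Void>(), timeout));
    }
}
```

The second call shows why a handler should always complete the future it returns: a future that is never completed is indistinguishable from a hung component.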
Restarting
A component may be restarted by an external administrative program whether it is deployed in a container or standalone. A restart may occur when the component is in the Running state, either online or offline (see below).
A restart causes the component to be destroyed and re-created with a new TLA. The onShutdown handler is called to allow the component to tidy up before it is destroyed. Then the Supervisor creates a new TLA and the startup proceeds as with initialize above.
Component Online and Offline
Online describes a component that is currently part of the observing system that is in use. When a component enters the Running state it is also “online”.
A component is offline when it is operating and available for active observing but is not currently in use.
If a component is to transition from the online state to the offline state, the onGoOffline handler is called. The component should make any changes in its operation needed for offline use.
If a component is to transition from the offline state to the online state, the onGoOnline handler is called. The component should make any changes in its operation needed for online use.
Unless the developer implements these handlers, a component behaves the same in either state. The two states provide a standard way to run code on the transition from one state to the other, allowing the component to prepare itself to be online (ready for operations) or offline (stowed or dormant). A request to transition to the online or offline state when the component is already in that state is a no-op.
isOnline
A component has access to the isOnline boolean flag, which can be used to determine if the component is in the online or offline state.
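The online and offline transitions, including the no-op rule for repeated requests, can be sketched as a tiny state model. Everything here is illustrative: OnlineStateSketch, requestOnline, requestOffline, and handlerCalls are hypothetical names, and whether the framework updates the flag before or after calling the handler is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class OnlineStateSketch {

    // A component is online when it enters the Running state.
    private boolean online = true;

    // Records which handlers were invoked, for demonstration only.
    final List<String> handlerCalls = new ArrayList<>();

    boolean isOnline() { return online; }

    /** Request a transition to offline; does nothing if already offline. */
    void requestOffline() {
        if (!online) return;
        online = false;
        onGoOffline();
    }

    /** Request a transition to online; does nothing if already online. */
    void requestOnline() {
        if (online) return;
        online = true;
        onGoOnline();
    }

    // Stand-ins for the component's own handlers.
    void onGoOffline() { handlerCalls.add("onGoOffline"); }
    void onGoOnline()  { handlerCalls.add("onGoOnline"); }

    public static void main(String[] args) {
        OnlineStateSketch component = new OnlineStateSketch();
        component.requestOnline();   // already online: no handler call
        component.requestOffline();  // calls onGoOffline
        component.requestOffline();  // already offline: no handler call
        component.requestOnline();   // calls onGoOnline
        System.out.println(component.handlerCalls + " online=" + component.isOnline());
    }
}
```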
onGoOffline
A component can be notified to run in offline mode in case it is not in use. The component can change its behavior if needed as a part of this handler.
- Assembly/Scala
-
    override def onGoOffline(): Unit = {
      // do something when going offline
    }
- Assembly/Java
-
    @Override
    public void onGoOffline() {
        // do something when going offline
    }
- Hcd/Scala
-
    override def onGoOffline(): Unit = {
      // do something when going offline
    }
- Hcd/Java
-
    @Override
    public void onGoOffline() {
        // do something when going offline
    }
onGoOnline
A component that was put into offline mode can be notified to run in online mode again. The component can change its behavior if needed as a part of this handler.
- Assembly/Scala
-
    override def onGoOnline(): Unit = {
      // do something when going online
    }
- Assembly/Java
-
    @Override
    public void onGoOnline() {
        // do something when going online
    }
- Hcd/Scala
-
    override def onGoOnline(): Unit = {
      // do something when going online
    }
- Hcd/Java
-
    @Override
    public void onGoOnline() {
        // do something when going online
    }
Handling commands
The remaining handlers are associated with handling incoming commands. There is a handler for submit commands called onSubmit and a handler for oneway commands called onOneway.
This section gives an introduction to the command handlers. For more information on how to send and monitor commands, see the Communication using Commands page.
validateCommand
The validateCommand handler allows the component to inspect a command and its parameters to determine if the actions related to the command can be executed or started. If it is okay, an Accepted response is returned. If not, Invalid is returned. Validation may also take into consideration the state of the component. For instance, if an Assembly or HCD can only handle one command at a time, validateCommand should return Invalid if a second command is received while the first is still executing.
The handler is called whenever a command is sent as a Submit or Oneway message to the component. If the handler returns Accepted, the corresponding onSubmit or onOneway handler is called. This handler can also be called when the Command Service method validateCommand is used, to preview the acceptance of a command before it is sent using submit or oneway. In this case, the onSubmit or onOneway handler is not called.
- Assembly/Scala
-
    override def validateCommand(controlCommand: ControlCommand): ValidateCommandResponse = controlCommand match {
      case _: Setup   => Accepted(controlCommand.runId) // validation for setup goes here
      case _: Observe => Accepted(controlCommand.runId) // validation for observe goes here
    }
- Assembly/Java
-
    @Override
    public CommandResponse.ValidateCommandResponse validateCommand(ControlCommand controlCommand) {
        if (controlCommand instanceof Setup) {
            // validation for setup goes here
            return new CommandResponse.Accepted(controlCommand.runId());
        } else if (controlCommand instanceof Observe) {
            // validation for observe goes here
            return new CommandResponse.Accepted(controlCommand.runId());
        } else {
            // an unknown command type is reported as unsupported rather than as a busy assembly
            return new CommandResponse.Invalid(controlCommand.runId(), new CommandIssue.UnsupportedCommandIssue(controlCommand.commandName().name()));
        }
    }
- Hcd/Scala
-
    override def validateCommand(controlCommand: ControlCommand): ValidateCommandResponse = controlCommand match {
      case _: Setup   => Accepted(controlCommand.runId) // validation for setup goes here
      case _: Observe => Accepted(controlCommand.runId) // validation for observe goes here
    }
- Hcd/Java
-
    @Override
    public CommandResponse.ValidateCommandResponse validateCommand(ControlCommand controlCommand) {
        if (controlCommand instanceof Setup) {
            // validation for setup goes here
            return new CommandResponse.Accepted(controlCommand.runId());
        } else if (controlCommand instanceof Observe) {
            // validation for observe goes here
            return new CommandResponse.Accepted(controlCommand.runId());
        } else {
            return new CommandResponse.Invalid(controlCommand.runId(), new CommandIssue.UnsupportedCommandIssue(controlCommand.commandName().name()));
        }
    }
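The examples above accept every Setup and Observe. Validation that depends on component state, such as rejecting a second command while one is still executing, might look like the following standalone model. It deliberately avoids the CSW types: BusyValidationSketch, the Validation enum, commandInProgress, and commandCompleted are hypothetical names used only for this sketch.

```java
public class BusyValidationSketch {

    enum Validation { ACCEPTED, INVALID }

    // True while a previously accepted command is still executing.
    private boolean commandInProgress = false;

    /** Rejects malformed commands and any command that arrives while another is running. */
    Validation validateCommand(String commandName) {
        if (commandName == null || commandName.isEmpty()) return Validation.INVALID;
        if (commandInProgress) return Validation.INVALID;
        return Validation.ACCEPTED;
    }

    /** Stand-in for onSubmit: starting a command marks the component as busy. */
    void onSubmit(String commandName) { commandInProgress = true; }

    /** Called by the component's own logic when the running command finishes. */
    void commandCompleted() { commandInProgress = false; }

    public static void main(String[] args) {
        BusyValidationSketch component = new BusyValidationSketch();
        System.out.println(component.validateCommand("move")); // ACCEPTED
        component.onSubmit("move");
        System.out.println(component.validateCommand("move")); // INVALID
        component.commandCompleted();
        System.out.println(component.validateCommand("move")); // ACCEPTED
    }
}
```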
onSubmit
On receiving a command sent using the submit message, the onSubmit handler is invoked for a component only if the validateCommand handler returns Accepted. The onSubmit handler returns a SubmitResponse indicating if the command is completed immediately, or if it is long-running by returning a Started response. Completion of a long-running command is then tracked using the CommandResponseManager, described in more detail in the Managing Command State page.
The example shows one way to process Setup and Observe commands separately.
- Assembly/Scala
-
    override def onSubmit(controlCommand: ControlCommand): SubmitResponse = controlCommand match {
      case setup: Setup     => submitSetup(setup)     // includes logic to handle Submit with Setup config command
      case observe: Observe => submitObserve(observe) // includes logic to handle Submit with Observe config command
    }
- Assembly/Java
-
    @Override
    public CommandResponse.SubmitResponse onSubmit(ControlCommand controlCommand) {
        if (controlCommand instanceof Setup)
            return submitSetup((Setup) controlCommand); // includes logic to handle Submit with Setup config command
        else if (controlCommand instanceof Observe)
            return submitObserve((Observe) controlCommand); // includes logic to handle Submit with Observe config command
        else
            return new CommandResponse.Error(controlCommand.runId(), "Submitted command not supported: " + controlCommand.commandName().name());
    }
onOneway
On receiving a command as oneway, the onOneway handler is invoked for a component only if the validateCommand handler returns Accepted. The onOneway handler does not return a value, and a command sent with oneway does not track completion of actions.
- Assembly/Scala
-
    override def onOneway(controlCommand: ControlCommand): Unit = controlCommand match {
      case setup: Setup     => onewaySetup(setup)     // includes logic to handle Oneway with Setup config command
      case observe: Observe => onewayObserve(observe) // includes logic to handle Oneway with Observe config command
    }
- Assembly/Java
-
    @Override
    public void onOneway(ControlCommand controlCommand) {
        if (controlCommand instanceof Setup)
            onewaySetup((Setup) controlCommand); // includes logic to handle Oneway with Setup config command
        else if (controlCommand instanceof Observe)
            onewayObserve((Observe) controlCommand); // includes logic to handle Oneway with Observe config command
    }
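The order in which the three command handlers are invoked can be summarized in a small standalone model. This is a sketch of the flow described in this section, not framework code: CommandDispatchSketch, the string-based commands and responses, the "move" prefix rule, and the choice to hand the validation result back to the sender of a oneway are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class CommandDispatchSketch {

    enum Validation { ACCEPTED, INVALID }

    // Records the handlers invoked, in order, for demonstration only.
    final List<String> calls = new ArrayList<>();

    // --- stand-ins for the component's handlers ---

    Validation validateCommand(String command) {
        calls.add("validateCommand");
        // Hypothetical rule: only commands starting with "move" are valid.
        return command.startsWith("move") ? Validation.ACCEPTED : Validation.INVALID;
    }

    String onSubmit(String command) {
        calls.add("onSubmit");
        return "Completed";
    }

    void onOneway(String command) {
        calls.add("onOneway");
    }

    // --- stand-ins for what the framework does with incoming messages ---

    /** submit: validate first, call onSubmit only when the command is accepted. */
    String submit(String command) {
        if (validateCommand(command) == Validation.INVALID) return "Invalid";
        return onSubmit(command);
    }

    /** oneway: validate first, call onOneway only when accepted; completion is not tracked. */
    Validation oneway(String command) {
        Validation result = validateCommand(command);
        if (result == Validation.ACCEPTED) onOneway(command);
        return result;
    }

    /** validate only: previews acceptance without calling onSubmit or onOneway. */
    Validation validateOnly(String command) {
        return validateCommand(command);
    }

    public static void main(String[] args) {
        CommandDispatchSketch component = new CommandDispatchSketch();
        System.out.println(component.submit("move-filter"));
        System.out.println(component.submit("unknown"));
        System.out.println(component.oneway("move-filter"));
        System.out.println(component.validateOnly("move-filter"));
        System.out.println(component.calls);
    }
}
```

Keeping validation as a separate first step is what lets a sender preview acceptance without triggering any action in the component.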