Creating a Component
This walkthrough shows how to create a CSW component in Scala or Java. CSW components depend on the csw-framework package, which can be found here. This section covers constructing an HCD, but the principles apply to an Assembly as well. We will construct the Assembly in the next section, Working with Multiple Components.
Tutorial: Developing an HCD
This tutorial shows code written in Scala and Java, based on code generated by the giter8 templates with the default values.
Anatomy of Component
A component consists of a Supervisor actor, a Top Level Actor, a component handler and one or more worker actors. Of these, csw-framework provides the Supervisor actor, the Top Level Actor and an abstract class of handlers. Component developers are expected to implement these handlers, which also act as the gateway from the framework to component code.
Supervisor
A Supervisor actor is the first actor started for any component. The main responsibilities of the Supervisor are as follows:
- Implement and manage the component lifecycle for the TLA and for the rest of the system (see Lifecycle below).
- Register itself with location service.
- Provide an administrative interface to the component to the rest of the system. For instance, the Container can perform certain administrative communication with the Supervisor such as restart or shutdown.
- Allow components outside of the Supervisor and TLA to monitor the lifecycle state of the TLA. This is particularly useful for testing. The test needs to know that the component is ready before it starts its test actions.
Because the Supervisor registers itself with location service, it serves as the gateway for all incoming communications from external components/entities.
The source code of supervisor actor can be found here
Top level actor
While the Supervisor works as the external interface for the component and the manager of Lifecycle, the functional implementation of a component lives in a Top Level Actor (TLA), spawned by the Supervisor actor for any component. However, the developer is not expected to implement TLA code directly. Instead, the functionality of the TLA is added by implementing the ComponentHandlers abstract class, which consists of a list of methods, or hooks, called by the TLA during specific lifecycle and command events (see Handlers). The ComponentHandlers implementation is specified during construction using a factory (see Constructing The Component).
The source code of the Top Level Actor can be found here.
Handlers
The following hooks should be overridden in your ComponentHandlers implementation class:
- initialize: called when the component is starting up, prior to being put into the Running state.
- validateCommand: called when the component receives a command (see Validation).
- onSubmit: called on a Submit command if validateCommand returns Accepted.
- onOneway: called on a Oneway command if validateCommand returns Accepted.
- onGoOffline: called when the component receives an external message from an administrative client to go offline.
- onGoOnline: called when the component receives an external message from an administrative client to go online.
- onLocationTrackingEvent: called when a tracked dependency changes location state (see Tracking Dependencies).
- onShutdown: called when the component is shutting down.
The source code of ComponentHandlers
can be found here.
More details about handler significance and invocation can be found here.
If the component developer wishes to write the handler implementation in Java, they need to implement the Java version of ComponentHandlers, which is JComponentHandlers. The source code of JComponentHandlers can be found here. Any further reference to ComponentHandlers should implicitly also apply to JComponentHandlers.
Tutorial: Developing an HCD
As seen in the Getting Started page, if you are using the giter8 template, handler classes for both the HCD and Assembly are written for you, with handler implementations stubbed out. We will walk through filling them in below.
Constructing the Component
After writing the handlers, the component developer needs to wire them up with the framework. To do this, the developer implements a ComponentBehaviorFactory. This factory must be configured in the configuration file for the component (see example below). The csw-framework then picks up the fully qualified class name of the ComponentBehaviorFactory from the configuration file and spawns the component handlers using this factory as part of booting the component. The factory is instantiated using Java reflection.
Additional sample code to implement the ComponentBehaviorFactory
can be found here
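To make the reflection step concrete, here is a minimal, self-contained sketch of instantiating a factory from a class name. The `BehaviorFactory` interface and `DemoFactory` class are hypothetical stand-ins invented for this illustration; they are not the csw-framework types, and the framework's actual loading code may differ.

```java
class ReflectiveFactoryDemo {
    // Hypothetical stand-in for ComponentBehaviorFactory (not the CSW type).
    interface BehaviorFactory {
        String describe();
    }

    // Hypothetical concrete factory with a public no-argument constructor.
    public static class DemoFactory implements BehaviorFactory {
        public DemoFactory() {}

        @Override
        public String describe() {
            return "handlers created by DemoFactory";
        }
    }

    // Load the class named in the configuration and instantiate it reflectively.
    static BehaviorFactory load(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            return (BehaviorFactory) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // In a real component the name would come from behaviorFactoryClassName.
        BehaviorFactory factory = load(DemoFactory.class.getName());
        System.out.println(factory.describe());
    }
}
```

Because the class is looked up by name at runtime, a typo in the configured name only surfaces when the component boots, so it is worth checking that value first when a component fails to start.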
Tutorial: Developing an HCD
As seen in the Getting Started page, if using the template, this factory class will be implemented for you.
Component Configuration (ComponentInfo)
Component configuration contains details needed to spawn a component. This configuration resides in a configuration file for a particular component. The template creates one for our sample HCD as follows:
- Scala
-
name = "SampleHcd"
componentType = hcd
behaviorFactoryClassName = "nfiraos.samplehcd.SampleHcdBehaviorFactory"
prefix = "nfiraos.samplehcd"
locationServiceUsage = RegisterOnly
- Java
-
name = "JSampleHcd"
componentType = hcd
behaviorFactoryClassName = "nfiraos.samplehcd.JSampleHcdBehaviorFactory"
prefix = "nfiraos.samplehcd"
locationServiceUsage = RegisterOnly
behaviorFactoryClassName refers to the class name of the concrete implementation of ComponentBehaviorFactory, which is SampleHcdBehaviorFactory for Scala in the above example and JSampleHcdBehaviorFactory for Java.
The name and componentType are used to create the ComponentId representing a unique component in location service.
The locationServiceUsage
is used by the Supervisor actor to decide whether to only register a component with location service or register and track other components.
The configuration file is parsed to a ComponentInfo
object and injected in the Supervisor actor. It is then injected in ComponentHandlers
while spawning a component.
The configuration can also contain a list of components and services it wishes to track as dependencies. See Tracking Dependencies.
More details about ComponentInfo
can be found here.
An additional sample configuration file can be found here.
Lifecycle
The Supervisor of a component manages its lifecycle state, which can be one of the following:
- Idle
- Running
- RunningOffline
- Restart
- Shutdown
- Lock
The state the component is in dictates the actions it can take when it receives a message or command, and how those actions are carried out.
Idle
The component initializes in the Idle state. The Top Level Actor calls the initialize hook of ComponentHandlers as the first thing on boot-up. Component developers write their initialization logic in this hook. The logic could also do things like accessing the configuration service to fetch the hardware configurations and set the hardware to default positions.
After the initialization, if the component has RegisterAndTrack configured for locationServiceUsage, the Top Level Actor will start tracking the connections configured for that component. This use case is mostly applicable to Sequencers and Assemblies. HCDs will mostly have RegisterOnly configured for locationServiceUsage.
The Supervisor actor will now register itself with location service. Registering with location service will notify other components tracking this component with a LocationUpdated
event containing a Location
with a reference to the Supervisor actor.
After successful registration, the component will transition to Running
state.
Running
When the supervisor actor receives Initialized
message from the Top Level Actor after successful initialization, it registers itself with location service and transitions the component to Running
state. Running state signifies that the component is accessible via location service, which allows other entities to communicate with it by sending commands via messages. Any commands received by supervisor actor will be forwarded to the Top Level Actor for processing.
RunningOffline
When the Supervisor actor receives GoOffline
message, it transitions the component to RunningOffline
state and forwards it to the Top Level Actor. The Top Level Actor then calls onGoOffline
hook of ComponentHandlers
.
If a GoOnline message is received by the Supervisor actor, it transitions the component back to the Running state and forwards the message to the Top Level Actor. The Top Level Actor then calls the onGoOnline hook of ComponentHandlers.
In RunningOffline
state, if any command is received, it is forwarded to underlying component hook through the Top Level Actor. It is then the responsibility of the component developer to check the isOnline
flag provided by csw-framework
and process the command accordingly.
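As a rough sketch of what checking the online flag inside a command hook might look like, the plain-Java model below rejects commands while offline. The class name, the string responses, and the policy of rejecting offline commands are all assumptions made for illustration; in a real component the isOnline flag is provided by csw-framework and the hook returns a CommandResponse.

```java
class OfflineAwareHandlerSketch {
    // In a real component this flag is provided by csw-framework; it is
    // toggled here only so that the sketch is self-contained.
    private boolean isOnline = true;

    void onGoOffline() {
        isOnline = false;
    }

    void onGoOnline() {
        isOnline = true;
    }

    // Hypothetical policy: reject work while offline, otherwise start it.
    String onSubmit(String commandName) {
        if (!isOnline) {
            return "Error: " + commandName + " rejected while offline";
        }
        return "Started: " + commandName;
    }

    public static void main(String[] args) {
        OfflineAwareHandlerSketch handler = new OfflineAwareHandlerSketch();
        handler.onGoOffline();
        System.out.println(handler.onSubmit("sleep")); // prints "Error: sleep rejected while offline"
        handler.onGoOnline();
        System.out.println(handler.onSubmit("sleep")); // prints "Started: sleep"
    }
}
```

Other policies are equally valid, for example accepting read-only commands while offline; the point is only that the decision belongs to the component developer.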
Restart
When the Supervisor actor receives a Restart message, it transitions the component to the Restart state. It then unregisters itself from location service so that other components tracking this component are notified and no commands are received while the restart is in progress.
Then, the Top Level Actor is stopped and postStop hook of the Top Level Actor will call the onShutdown
hook of ComponentHandlers
. Component developers are expected to write any cleanup of resources or logic that should be executed for graceful shutdown of component in this hook.
After successful shutdown of component, the Supervisor actor will create the Top Level Actor again from scratch. This will cause the initialize
hook of ComponentHandlers
to be called again. After successful initialization of component, the Supervisor actor will register itself with location service.
Shutdown
When the Supervisor actor receives a Shutdown
message, it transitions the component to the Shutdown
state. Any commands received while shutdown is in progress will be ignored. Then, it will stop the Top Level Actor. The postStop hook of the Top Level Actor will call the onShutdown
hook of ComponentHandlers
. Component developers are expected to write any cleanup of resources or logic that should be executed for graceful shutdown of component in this hook.
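The transitions described so far can be summarised as a small state machine. The sketch below is a simplified model written for this tutorial, not the Supervisor's implementation: the enum and method names are hypothetical, the Lock state is left out, and which states ignore which messages is an assumption where the text above does not say.

```java
class LifecycleSketch {
    enum State { IDLE, RUNNING, RUNNING_OFFLINE, RESTART, SHUTDOWN }

    enum Message { INITIALIZED, GO_OFFLINE, GO_ONLINE, RESTART, SHUTDOWN }

    // Returns the next state, or the current state when the message does not apply.
    static State next(State current, Message message) {
        switch (message) {
            case INITIALIZED:
                // After (re)initialization and registration the component is Running.
                return (current == State.IDLE || current == State.RESTART)
                        ? State.RUNNING : current;
            case GO_OFFLINE:
                return current == State.RUNNING ? State.RUNNING_OFFLINE : current;
            case GO_ONLINE:
                return current == State.RUNNING_OFFLINE ? State.RUNNING : current;
            case RESTART:
                // Assumption: a restart is only honoured once the component is running.
                return (current == State.RUNNING || current == State.RUNNING_OFFLINE)
                        ? State.RESTART : current;
            case SHUTDOWN:
                return State.SHUTDOWN;
            default:
                return current;
        }
    }

    public static void main(String[] args) {
        State state = State.IDLE;
        state = next(state, Message.INITIALIZED); // RUNNING
        state = next(state, Message.GO_OFFLINE);  // RUNNING_OFFLINE
        state = next(state, Message.GO_ONLINE);   // RUNNING
        System.out.println(state);                // prints "RUNNING"
    }
}
```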
Lock
When the Supervisor actor receives a Lock
message, it transitions the component to the Lock
state. Upon locking, the Supervisor will only accept the commands received from the component that locked the component and ignore all others.
In the Lock
state, messages like Shutdown
and Restart
will also be ignored. A component must first be unlocked to accept these commands.
Lock
messages are constructed with a duration value specified. When this duration expires, the component will automatically be unlocked. A component can be manually unlocked by sending an Unlock
message.
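The locking rules can be modelled in a few lines. This is a sketch under stated assumptions: `LockSketch`, its method names, and the prefix strings are hypothetical, and the current time is passed in explicitly to keep the example deterministic, whereas the real Supervisor presumably relies on a timer.

```java
import java.time.Duration;
import java.time.Instant;

class LockSketch {
    private String lockedBy;   // prefix of the locking component, null when unlocked
    private Instant expiresAt; // moment at which the lock lapses

    // Lock on behalf of `source` for `leaseDuration`, starting at `now`.
    void lock(String source, Duration leaseDuration, Instant now) {
        lockedBy = source;
        expiresAt = now.plus(leaseDuration);
    }

    // Manual unlock, corresponding to an Unlock message.
    void unlock() {
        lockedBy = null;
        expiresAt = null;
    }

    boolean isLocked(Instant now) {
        if (lockedBy != null && !now.isBefore(expiresAt)) {
            unlock(); // the duration expired, so the lock is released automatically
        }
        return lockedBy != null;
    }

    // A command is accepted when the component is unlocked or the sender holds the lock.
    boolean accepts(String source, Instant now) {
        return !isLocked(now) || lockedBy.equals(source);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2020-01-01T00:00:00Z");
        LockSketch lock = new LockSketch();
        lock.lock("nfiraos.sampleassembly", Duration.ofSeconds(10), start);
        System.out.println(lock.accepts("nfiraos.sampleassembly", start.plusSeconds(1))); // prints "true"
        System.out.println(lock.accepts("other.client", start.plusSeconds(1)));           // prints "false"
        System.out.println(lock.accepts("other.client", start.plusSeconds(10)));          // prints "true"
    }
}
```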
Logging
csw-framework provides a LoggerFactory, injected through the constructor of ComponentHandlers. The LoggerFactory has the component’s name predefined in it. The component developer is expected to use this factory to log statements.
Logging works much like other popular loggers such as log4j. However, with the development of log management tools such as logstash, the emphasis on log message formatting has been to write messages in JSON format, so that they can easily be ingested. Plain text writing to stdout is still supported. More details on how to use logging can be found here.
Tutorial: Developing an HCD
Let’s use logging to flesh out some of our command handlers. Add some simple log messages in the initialize
and onShutdown
hooks, and to the onLocationTrackingEvent
hook as well, although we won’t be using it for this HCD:
- Scala
-
var maybePublishingGenerator: Option[Cancellable] = None

override def initialize(): Future[Unit] = {
  log.info("In HCD initialize")
  maybePublishingGenerator = Some(publishCounter())
  Future.unit
}

override def onLocationTrackingEvent(trackingEvent: TrackingEvent): Unit = {
  log.debug(s"TrackingEvent received: ${trackingEvent.connection.name}")
}

override def onShutdown(): Future[Unit] = {
  log.info("HCD is shutting down")
  Future.unit
}
- Java
-
private Optional<Cancellable> maybePublishingGenerator = Optional.empty();

@Override
public CompletableFuture<Void> jInitialize() {
    return CompletableFuture.runAsync(() -> {
        log.info("In HCD initialize");
        maybePublishingGenerator = Optional.of(publishCounter());
    });
}

@Override
public void onLocationTrackingEvent(TrackingEvent trackingEvent) {
    log.debug(() -> "TrackingEvent received: " + trackingEvent.connection().name());
}

@Override
public CompletableFuture<Void> jOnShutdown() {
    return CompletableFuture.runAsync(() -> log.info("HCD is shutting down"));
}
In the example code, you’ll notice we have added some functionality to start publishing events. We will cover the Event Service later. You can leave that code out for now.
Next we’ll add some command handling.
Receiving Commands
A command is something that carries some metadata and a set of parameters. A component sends messages to other components using commands. The kinds of commands are as follows:
- Setup: Contains goal, command, or demand information to be used to configure the target OMOA component.
- Observe: Contains goal or demand information to be used by a detector system. Properties and their value types will be standardized by the ESW for the ESW PDR.
- Wait: Sequencer only. Instructs a sequencer to pause until told to continue.
More details about creating commands can be found here.
Whenever a command is sent to a component it is wrapped inside a command wrapper. There are two kinds of command wrapper:
- Submit: A command is wrapped in submit when the completion result is expected from receiver component
- Oneway: A command is wrapped in oneway when the completion of command is not expected from receiver component but is determined by sender component by subscribing to receiver component’s state
Validation
When a command is received by a component, the Top Level Actor will call the validateCommand
hook of ComponentHandlers
. Component developers are expected to perform appropriate validation of command, whether it is valid to execute, and return a CommandResponse
. The CommandResponse
returned from this hook will be sent back to sender directly by csw-framework
.
The logic in validateCommand
hook can be used to handle commands of various durations. If the command can be executed immediately, then the component developer can return a final response directly in the validation step using a CompletedWithResult
command response.
This should be only used for commands that require a very small amount of time to execute. If the command will take longer, then component developer should return an intermediate response Accepted
or Invalid
specifying whether the command is valid to be executed or not, and process the command in the onSubmit
or onOneway
handlers (see Command Response).
Different types of command responses and their significance can be found here.
Tutorial: Developing an HCD
Let’s add some command validation to our HCD. For our sample HCD, we will only handle one command, sleep
, in which we will cause the HCD to sleep for the time specified in a parameter of the command. This will simulate a long running command.
Add some code to ensure the command we receive is the sleep
command, and return an Invalid
response if not. You could imagine much more checking could be added, such as checking the types and values of the parameters of our sleep
command, but we will keep it simple for our demonstration.
- Scala
-
override def validateCommand(controlCommand: ControlCommand): ValidateCommandResponse = {
  log.info(s"Validating command: ${controlCommand.commandName.name}")
  controlCommand.commandName.name match {
    case "sleep" => Accepted(controlCommand.runId)
    case x       => Invalid(controlCommand.runId, CommandIssue.UnsupportedCommandIssue(s"Command $x. not supported."))
  }
}
- Java
-
@Override
public CommandResponse.ValidateCommandResponse validateCommand(ControlCommand controlCommand) {
    String commandName = controlCommand.commandName().name();
    log.info(() -> "Validating command: " + commandName);
    if (commandName.equals("sleep")) {
        return new CommandResponse.Accepted(controlCommand.runId());
    }
    return new CommandResponse.Invalid(controlCommand.runId(), new CommandIssue.UnsupportedCommandIssue("Command " + commandName + ". not supported."));
}
Command Response
The response returned from the validateCommand hook of ComponentHandlers will be received by the Top Level Actor, which then sends the response back to the sender. If the response returned was Accepted, it then calls either the onSubmit hook or the onOneway hook of ComponentHandlers, depending on the wrapper (Submit or Oneway) in which the command was received.
If the command was received as a Submit
, then the Top Level Actor adds the response returned from the validateCommand
hook in the CommandResponseManager
. If the response was Accepted
, the TLA then calls the onSubmit
hook of ComponentHandlers
.
If the command was received as a Oneway, the response is not added to the CommandResponseManager, and the onOneway hook of ComponentHandlers is called.
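The dispatch logic just described can be sketched in plain Java as follows. This is a simplified model, not the Top Level Actor's code: the class, enum, and field names are hypothetical, run ids and statuses are plain strings, and the hook invocations are merely recorded in a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CommandFlowSketch {
    enum Wrapper { SUBMIT, ONEWAY }

    enum Validation { ACCEPTED, INVALID }

    // Stand-in for the CommandResponseManager: run id -> latest recorded status.
    final Map<String, String> responseManager = new HashMap<>();
    // Records which handler hooks were invoked, in order.
    final List<String> hookCalls = new ArrayList<>();

    // Stand-in for the validateCommand hook: only "sleep" is supported.
    Validation validateCommand(String commandName) {
        return "sleep".equals(commandName) ? Validation.ACCEPTED : Validation.INVALID;
    }

    // Models what happens when a wrapped command arrives.
    Validation receive(Wrapper wrapper, String runId, String commandName) {
        Validation validation = validateCommand(commandName);
        if (wrapper == Wrapper.SUBMIT) {
            // Only Submit commands are tracked in the response manager.
            responseManager.put(runId, validation.name());
        }
        if (validation == Validation.ACCEPTED) {
            hookCalls.add(wrapper == Wrapper.SUBMIT ? "onSubmit" : "onOneway");
        }
        return validation; // the validation response goes back to the sender
    }

    public static void main(String[] args) {
        CommandFlowSketch tla = new CommandFlowSketch();
        tla.receive(Wrapper.SUBMIT, "run-1", "sleep");
        tla.receive(Wrapper.ONEWAY, "run-2", "sleep");
        System.out.println(tla.hookCalls);       // prints "[onSubmit, onOneway]"
        System.out.println(tla.responseManager); // prints "{run-1=ACCEPTED}"
    }
}
```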
The CommandResponseManager
is responsible for managing and bookkeeping the command status of long running submit commands. The sender of the command (and any component, really) can query the command statuses or subscribe to changes in command statuses using CommandService
.
The CommandService
class provides helper methods for communicating with other components, and should be a component’s primary means of sending commands to other components. This will be described in the next tutorial section, Sending Commands.
When the onSubmit
hook is called, it is the responsibility of component developers to update the status of the received command in the CommandResponseManager
as it changes. The instance of commandResponseManager is provided in ComponentHandlers
which should be injected in any worker actor or other actor/class created for the component.
More details on methods available in CommandResponseManager
can be found here.
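To illustrate the bookkeeping role described above, here is a toy model of a response manager that supports updating, querying, and subscribing. It is an assumption-laden sketch: `ResponseManagerSketch` is hypothetical, statuses are plain strings, the "Unknown" default is invented, and the real CommandResponseManager API works with CommandResponse objects and differs in its signatures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class ResponseManagerSketch {
    private final Map<String, String> statusByRunId = new HashMap<>();
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Record the latest status of a command and notify anyone watching it.
    void addOrUpdateCommand(String runId, String status) {
        statusByRunId.put(runId, status);
        List<Consumer<String>> watchers =
                subscribers.getOrDefault(runId, Collections.emptyList());
        for (Consumer<String> watcher : watchers) {
            watcher.accept(status);
        }
    }

    // Return the current status, or a hypothetical "Unknown" marker.
    String query(String runId) {
        return statusByRunId.getOrDefault(runId, "Unknown");
    }

    // Register a callback invoked on every later status change of `runId`.
    void subscribe(String runId, Consumer<String> callback) {
        subscribers.computeIfAbsent(runId, key -> new ArrayList<>()).add(callback);
    }

    public static void main(String[] args) {
        ResponseManagerSketch manager = new ResponseManagerSketch();
        manager.subscribe("run-1", status -> System.out.println("run-1 is now " + status));
        manager.addOrUpdateCommand("run-1", "Started");
        manager.addOrUpdateCommand("run-1", "Completed");
        System.out.println(manager.query("run-1")); // prints "Completed"
    }
}
```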
Tutorial: Developing an HCD
We will implement command handling in the onSubmit
hook. Note that this hook actually receives a ControlCommand
as an argument, which can be either a Setup
or an Observe
. We will use pattern matching to handle the command if it is a Setup
and forward to an onSetup
handling method. Observe
commands will be ignored.
- Scala
-
override def onSubmit(controlCommand: ControlCommand): SubmitResponse = {
  log.info(s"Handling command: ${controlCommand.commandName}")

  controlCommand match {
    case setupCommand: Setup => onSetup(setupCommand)
    case observeCommand: Observe =>
      // implement (or not)
      Error(controlCommand.runId, "Observe not supported")
  }
}

def onSetup(setup: Setup): SubmitResponse = {
  val sleepTimeKey: Key[Long] = KeyType.LongKey.make("SleepTime")

  // get param from the Parameter Set in the Setup
  val sleepTimeParam: Parameter[Long] = setup(sleepTimeKey)

  // values of parameters are arrays. Get the first one (the only one in our case) using `head` method available as a convenience method on `Parameter`.
  val sleepTimeInMillis: Long = sleepTimeParam.head

  log.info(s"command payload: ${sleepTimeParam.keyName} = $sleepTimeInMillis")

  workerActor ! Sleep(setup.runId, sleepTimeInMillis)

  Started(setup.runId)
}
- Java
-
@Override
public CommandResponse.SubmitResponse onSubmit(ControlCommand controlCommand) {
    log.info(() -> "Handling command: " + controlCommand.commandName());

    if (controlCommand instanceof Setup) {
        onSetup((Setup) controlCommand);
        return new CommandResponse.Started(controlCommand.runId());
    } else if (controlCommand instanceof Observe) {
        // implement (or not)
    }
    return new CommandResponse.Error(controlCommand.runId(), "Observe command not supported");
}

private void onSetup(Setup setup) {
    Key<Long> sleepTimeKey = JKeyType.LongKey().make("SleepTime");

    // get param from the Parameter Set in the Setup
    Optional<Parameter<Long>> sleepTimeParamOption = setup.jGet(sleepTimeKey);

    // values of parameters are arrays. Get the first one (the only one in our case) using `head` method available as a convenience method on `Parameter`.
    if (sleepTimeParamOption.isPresent()) {
        Parameter<Long> sleepTimeParam = sleepTimeParamOption.get();
        long sleepTimeInMillis = sleepTimeParam.head();

        log.info(() -> "command payload: " + sleepTimeParam.keyName() + " = " + sleepTimeInMillis);

        workerActor.tell(new Sleep(setup.runId(), sleepTimeInMillis));
    }
}
In our example, the sleep command has one parameter called SleepTime. We retrieve this parameter from the Setup by creating a Key to this parameter using the name and type, and then calling an apply method on the Setup (the setup(sleepTimeKey) shorthand), which finds the matching Parameter in the Setup’s ParameterSet (use the Setup.jGet() method in Java). By doing this, the Parameter is returned with the proper typing, and so the values retrieved from the Parameter are typed as well. Note that all values are stored as an array, so we get our single value for SleepTime by using the head method, available as a convenience method on Parameter.
At this point, to prevent our HCD from blocking while handling the command, we pass it off to a worker actor, which we will specify somewhere in this class. This could be defined in a separate class, but writing it as an internal class allows us to use the logging facility and CommandResponseManager
without having to inject them into our new Actor class.
- Scala
-
sealed trait WorkerCommand
case class Sleep(runId: Id, timeInMillis: Long) extends WorkerCommand

private val workerActor =
  ctx.spawn(
    Behaviors.receiveMessage[WorkerCommand](msg => {
      msg match {
        case sleep: Sleep =>
          log.trace(s"WorkerActor received sleep command with time of ${sleep.timeInMillis} ms")
          // simulate long running command
          Thread.sleep(sleep.timeInMillis)
          commandResponseManager.addOrUpdateCommand(sleep.runId, CommandResponse.Completed(sleep.runId))
        case _ => log.error("Unsupported message type")
      }
      Behaviors.same
    }),
    "WorkerActor"
  )
- Java
-
private interface WorkerCommand {
}

private static final class Sleep implements WorkerCommand {
    private final Id runId;
    private final long timeInMillis;

    private Sleep(Id runId, long timeInMillis) {
        this.runId = runId;
        this.timeInMillis = timeInMillis;
    }
}

private ActorRef<WorkerCommand> createWorkerActor() {
    return actorContext.spawn(
            Behaviors.receiveMessage(msg -> {
                if (msg instanceof Sleep) {
                    Sleep sleep = (Sleep) msg;
                    log.trace(() -> "WorkerActor received sleep command with time of " + sleep.timeInMillis + " ms");
                    // simulate long running command
                    Thread.sleep(sleep.timeInMillis);
                    commandResponseManager.addOrUpdateCommand(sleep.runId, new CommandResponse.Completed(sleep.runId));
                } else {
                    log.error("Unsupported message type");
                }
                return Behaviors.same();
            }),
            "WorkerActor"
    );
}
This worker actor simply takes the time passed in the message, sleeps that amount, and then updates the CommandResponseManager
that the command is complete.
If the component developer creates a new set of messages for the worker actor, those messages must extend TMTSerializable, which enables them to be serialized on the wire.
Events
CSW Events have a similar structure to commands: along with a name and a prefix (used to represent the source of the event), an Event carries its data in a set of parameters.
More details about events can be found here.
Access to the Event Service is passed in to the handlers class in the constructor. The Event Service provides a factory method to create a “default” publisher and subscriber, which can be accessed in various parts of your code to reuse a single connection to the service. In most cases, reusing this connection will provide the performance needed.
But if you prefer to create new connections, custom publishers and subscribers can be constructed. See the manual on the Event Service for more information.
Publishers have an API that allows the publishing of a single event, a stream of events, or periodic events created by an EventGenerator
, which is simply a function that returns an event.
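The idea of a generator-driven periodic publisher can be sketched with the JDK scheduler alone. Everything here is a stand-in chosen for illustration: `PeriodicPublisherSketch` is hypothetical, events are plain strings collected in a list rather than sent to the Event Service, and a `ScheduledFuture` plays the role that a Cancellable plays in the tutorial code.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class PeriodicPublisherSketch {
    // Events "published" so far; a real publisher would send them to the Event Service.
    final List<String> published = new CopyOnWriteArrayList<>();

    // Daemon thread so that a forgotten publisher cannot keep the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "publisher-sketch");
                thread.setDaemon(true);
                return thread;
            });

    // Call the generator once per period and publish whatever it returns.
    ScheduledFuture<?> publish(Supplier<String> eventGenerator, Duration period) {
        return scheduler.scheduleAtFixedRate(
                () -> published.add(eventGenerator.get()),
                0, period.toMillis(), TimeUnit.MILLISECONDS);
    }

    // Wait until at least `count` events were published or the timeout elapses.
    boolean awaitPublished(int count, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (published.size() < count && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return published.size() >= count;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        PeriodicPublisherSketch publisher = new PeriodicPublisherSketch();
        AtomicInteger counter = new AtomicInteger();
        ScheduledFuture<?> handle = publisher.publish(
                () -> "HcdCounter=" + counter.incrementAndGet(), Duration.ofMillis(50));
        publisher.awaitPublished(3, 2000);
        handle.cancel(false); // stop the generator, as a Cancellable would
        publisher.shutdown();
        System.out.println(publisher.published.get(0)); // prints "HcdCounter=1"
    }
}
```

Keeping the handle returned by `publish` is what makes it possible to stop the stream later, which is the same reason the tutorial stores its Cancellable.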
Tutorial: Developing an HCD
Let’s add a publisher to our component. We will use the default publisher that will periodically publish events generated by an EventGenerator
.
- Scala
-
private def publishCounter(): Cancellable = {
  var counter = 0
  def incrementCounterEvent() = {
    counter += 1
    val param: Parameter[Int] = KeyType.IntKey.make("counter").set(counter)
    SystemEvent(componentInfo.prefix, EventName("HcdCounter")).add(param)
  }

  log.info("Starting publish stream.")
  eventService.defaultPublisher.publish(incrementCounterEvent(), 5.second, err => log.error(err.getMessage, ex = err))
}

private def stopPublishingGenerator(): Unit = {
  log.info("Stopping publish stream")
  maybePublishingGenerator.foreach(_.cancel)
}
- Java
-
private int counter = 0;

private Event incrementCounterEvent() {
    counter += 1;
    Parameter<Integer> param = JKeyType.IntKey().make("counter").set(counter);
    return new SystemEvent(componentInfo.prefix(), new EventName("HcdCounter")).add(param);
}

private Cancellable publishCounter() {
    log.info("Starting publish stream.");
    return eventService.defaultPublisher().publish(this::incrementCounterEvent, Duration.ofSeconds(5));
}

private void stopPublishingGenerator() {
    log.info("Stopping publish stream");
    maybePublishingGenerator.ifPresent(Cancellable::cancel);
}
In Java, you must save a reference to the IEventService
passed into the constructor as a member variable. We use the name eventService
(not shown here).
We encapsulate the starting of the publishing in our method publishCounter
. Our EventGenerator
is the incrementCounterEvent
method which increments our integer variable counter
and stores it in the ParameterSet
of a new SystemEvent
and returns it. Once our defaultPublisher
is resolved, we pass in a reference to incrementCounterEvent
and specify a period of 5 seconds. We log a message when publishing the event so that it can be observed when running the component.
As the example code shows, the publish method returns a Cancellable. Once publishing is set up, the Cancellable can be used to stop the event generator. We demonstrate its usage in the stopPublishingGenerator method, although this method is not called in our tutorial.
We will start this publishing when our component initializes, so we return to our initialize
method and add a call to our publishCounter
method. We save a reference to the Cancellable
object for future use in our stopPublishingGenerator
method.
- Scala
-
var maybePublishingGenerator: Option[Cancellable] = None

override def initialize(): Future[Unit] = {
  log.info("In HCD initialize")
  maybePublishingGenerator = Some(publishCounter())
  Future.unit
}

override def onLocationTrackingEvent(trackingEvent: TrackingEvent): Unit = {
  log.debug(s"TrackingEvent received: ${trackingEvent.connection.name}")
}

override def onShutdown(): Future[Unit] = {
  log.info("HCD is shutting down")
  Future.unit
}
- Java
-
private Optional<Cancellable> maybePublishingGenerator = Optional.empty();

@Override
public CompletableFuture<Void> jInitialize() {
    return CompletableFuture.runAsync(() -> {
        log.info("In HCD initialize");
        maybePublishingGenerator = Optional.of(publishCounter());
    });
}

@Override
public void onLocationTrackingEvent(TrackingEvent trackingEvent) {
    log.debug(() -> "TrackingEvent received: " + trackingEvent.connection().name());
}

@Override
public CompletableFuture<Void> jOnShutdown() {
    return CompletableFuture.runAsync(() -> log.info("HCD is shutting down"));
}
Building and Running component in standalone mode
Once the component is ready, it is started using the ContainerCmd
object in standalone mode. The details about starting the ContainerCmd
in standalone mode can be found here.
There are various ways to build and run the project. A simple way during development is to use sbt to run it. The sbt command runMain can be used to specify an application with a main method and run it with arguments specified at the command line. When this command is executed, sbt will take care of any downloading of dependencies, compiling, or building necessary to run your application.
Our template includes a wrapper application around ContainerCmd that we can use in the deployment module. To run our HCD in standalone mode, go to the project root directory and type sbt "<deploy-module>/runMain <mainClass> --local --standalone <path-to-config-file>", where
- <deploy-module> is the name of the deployment module created by the template (sample-deploy if using defaults).
- <mainClass> is the full class name of our ContainerCmd application, which the template names <prefix>.<name>deploy.<Name>ContainerCmdApp. If you accept the defaults for the template, it will be nfiraos.sampledeploy.SampleContainerCmdApp. If you are having problems determining the class name, use sbt run and it will prompt you with the possibilities.
- <path-to-config-file> is the filename, which can be an absolute path or relative to the directory of the deployment module. If using defaults, this would be src/main/resources/SampleHcdStandalone.conf for Scala, and src/main/resources/JSampleHcdStandalone.conf for Java.
So if using the template defaults, the full command would be
- Scala
-
sbt "sample-deploy/runMain nfiraos.sampledeploy.SampleContainerCmdApp --local --standalone src/main/resources/SampleHcdStandalone.conf"
- Java
-
sbt "sample-deploy/runMain nfiraos.sampledeploy.SampleContainerCmdApp --local --standalone src/main/resources/JSampleHcdStandalone.conf"
To run the component using the deployment package, perform the following steps:
- Run sbt <project>/universal:packageBin, where <project> is your deployment module (e.g. sample-deploy). This will create a self-contained zip in the <project>/target/universal directory.
- Unzip the generated zip file and enter the bin directory.
- Run ./<project>-cmd-app --local --standalone <path-to-local-config-file-to-start-the-component>
Alternatively, you can run sbt stage
, which installs the application under target/universal/stage/bin.
CSW HTTP Location server must be running, and appropriate environment variables set to run apps. See CSW Location Server.