LSCS Segments Assembly
The project includes a simplified Segments Assembly. As discussed in the overview, the Segments Assembly job is to receive Assembly Setups, convert them to HCD Setups, and forward them to the Segments HCD. There are no events produced by the Assembly and no Setup commands that are handled within the Assembly itself.
Top Level Actor
The Top Level Actor for Segments Assembly can be found in the lscsComps
project in the file named SegmentsAssemblyHandlers.scala
, which contains the entire Assembly. As with all CSW Assemblies, the TLA for Segments Assembly implements handlers as needed. In this Assembly, only the validateCommand
and onSubmit
handlers are implemented.
Assembly Command Validation
The validateCommand
handler is shown below.
- Scala
-
source
/** * This is the validate handler. This should perform all validation needed so that * the command can execute, or it should return a validation error. * Here we return an error for an Observe or pass to the Setup validation. * @param runId command runId * @param controlCommand either a Setup or Observe * @return a [ValidateCommandResponse] */ override def validateCommand(runId: Id, controlCommand: ControlCommand): ValidateCommandResponse = { controlCommand match { case setup: Setup => handleValidation(runId, setup) case observe => Invalid(runId, UnsupportedCommandIssue(s"$observe command not supported.")) } } // All Setup validation is performed here private def handleValidation(runId: Id, setup: Setup): ValidateCommandResponse = { if (Common.ALL_COMMANDS.contains(setup.commandName) || setup.commandName.equals(HcdShutdown.shutdownCommand)) { Accepted(runId) } else { Invalid( runId, CommandIssue.UnsupportedCommandIssue(s"Segment Assembly does not support the `${setup.commandName}` command.") ) } }
The validateCommand
checks that the command is a Setup
and returns an error if an Observe
is received; otherwise it passes on validation of the Setup to handleValidation
. Remember that each Setup from the outside has a Setup with the command name equal to the name of the segment command. Therefore, the validation code looks up the command name in the list of implemented commands in Common. This code is shown below:
- Scala
-
source
/** * This map is used by the Assembly to access the correct toCommand for an incoming command Setup * It maps command name to a function that returns the formatted command */ val CommandMap: Map[CommandName, Setup => String] = Map( ACTUATOR.COMMAND_NAME -> ACTUATOR.toCommand, TARG_GEN_ACT.COMMAND_NAME -> TARG_GEN_ACT.toCommand, CFG_CUR_LOOP.COMMAND_NAME -> CFG_CUR_LOOP.toCommand, CFG_ACT_VC.COMMAND_NAME -> CFG_ACT_VC.toCommand, CFG_ACT_OFFLD.COMMAND_NAME -> CFG_ACT_OFFLD.toCommand, CFG_ACT_SNUB.COMMAND_NAME -> CFG_ACT_SNUB.toCommand, SET_LIMIT_ACT.COMMAND_NAME -> SET_LIMIT_ACT.toCommand, SET_PARAM_ACT.COMMAND_NAME -> SET_PARAM_ACT.toCommand, CAL_WH_DEADBANDWH.COMMAND_NAME -> CAL_WH_DEADBANDWH.toCommand, MOVE_WH.COMMAND_NAME -> MOVE_WH.toCommand ) // This is used by validation of Assembly and HCD to verify that the received command is currently supported. Could // be removed when all commands are supported val ALL_COMMANDS: List[CommandName] = CommandMap.keys.toList
If the command name is not present, validation fails with an UnsupportedCommandIssue. Otherwise, the command is Accepted. Validation could be improved, but we assume that if the command name is within our list, then the Setup contains all the correct parameters. An error is returned during execution if the handling fails.
Assembly Command Execution
Once validated, the onSubmit
handler is called. The submit-related code for the Segments Assembly is shown below:
- Scala
-
source
/** * The Assembly receives a Setup command with the name of the low-level command. * It transforms it into an HCD command, which is just the String command to all or one segment. * Ranges aren't yet supported. */ override def onSubmit(runId: Id, controlCommand: ControlCommand): SubmitResponse = { controlCommand match { case setup: Setup => handleSetup(runId, setup) case observe => Invalid(runId, UnsupportedCommandIssue(s"$observe command not supported.")) } } /** * Processes commands as Setups * @param runId command runId * @param assemblySetup the [[Setup]] to execute * @return [[SubmitResponse]] response from the command. All commands are started currently. */ private def handleSetup(runId: Id, assemblySetup: Setup): SubmitResponse = { assemblySetup.commandName match { case HcdShutdown.shutdownCommand => log.debug(s"Segments Assembly received shutdown request: $runId and $assemblySetup") val hcdSetup = Setup(assemblyPrefix, HcdShutdown.shutdownCommand) submitAndWaitHCD(runId, hcdSetup) map { sr => cswCtx.commandResponseManager.updateCommand(sr.withRunId(runId)) } Started(runId) case cmd => log.info(s"Segments Assembly received a command: '$cmd', runId: $runId, setup: $assemblySetup") // This simulates what the Assembly does to send to HCD - has received above Setup try { val hcdSetup: Setup = HcdDirectCommand.toHcdDirectCommand(assemblyPrefix, assemblySetup) // Assembly sends the Setup and updates submitAndWaitHCD(runId, hcdSetup) map { sr => log.info(s"Assembly command completed from HCD: $sr") cswCtx.commandResponseManager.updateCommand(sr.withRunId(runId)) } Started(runId) } catch { case _: Exception => CommandResponse.Error(runId, s"An exception was thrown while processing setup: ${assemblySetup.commandName}") } } } /** * This is a convenience routine to check the availability of HCD prior to sending * @param runId command runId * @param setup the Setup to send * @return command response as a SubmitResponse */ private def submitAndWaitHCD(runId: Id, setup: Setup): Future[SubmitResponse] = hcdCS match { case Some(cs) => // Added a delay here because segment commands take an unknown amount of time. // Can be made an implicit for all calls in file for a more complex situation with different timeouts. cs.submitAndWait(setup)(timeout = 15.seconds) case None => Future(CommandResponse.Error(runId, s"The Segment HCD is not currently available: ${hcdConnection.componentId}")) }
As before, the Observes
return an Error (even through we can not get to this code in this example.) The handleSetup
command must handle any named commands first. In this case, there is a command called shutdownCommand
that when sent to the HCD, it causes all the segment connections to be closed and the HCD shutdown. This is useful for testing.
The second case matches on any other command. In this case, the function HcdDirectCommand.toHcdDirectCommand
is called. If it does not fail (it is wrapped in a try clause), a Setup formatted for the HCD is returned and passed to the submitAndWaitHCD
function. All this is doing is calling Command Service submitAndWait, but it is checking that the CommandService instance created for the HCD is valid before sending. (This will be covered later.) It also provides a custom timeout, which has been arbitrarily set to 15 seconds for all the segments to complete any command. If this time is exceeded, the submitAndWait will time out and return an error. Note that if the Command Service is not available, because the HCD is not available, an Error is also returned.
The function HcdDirectCommand.toHcdDirectCommand is shown below:
- Scala
-
source
/** * This object contains the parameter keys for the command sent to the HCD to process a single * segment command. This is used by the segment Assembly and test code to produce an HCD Setup from * an Assembly Setup. */ case object HcdDirectCommand { val lscsDirectCommand: CommandName = CommandName("lscsDirectCommand") // This key is used to store the command to be executed val lscsCommandKey: Key[String] = KeyType.StringKey.make(name = "lscsCommand") val lscsCommandNameKey: Key[String] = KeyType.StringKey.make(name = "lscsCommandName") /** * This helper function returns a direct command Setup for the * @param assemblyPrefix prefix of the Assembly as source * @param assemblySetup the Setup received by the Assembly -contains segmentIdKey and command name * @param obsId optional ObsId, defaults to None * @return Setup ready for sending to HCD */ def toHcdDirectCommand(assemblyPrefix: Prefix, assemblySetup: Setup, obsId: Option[ObsId] = None): Setup = { val segmentIdExists = assemblySetup.exists(segmentIdKey) val segmentRangeExists = assemblySetup.exists(segmentRangeKey) // Can't go on without one of these set require(segmentIdExists || segmentRangeExists, s"Bad segment info in the Assembly Setup: ${assemblySetup.commandName}") // Convert setup to a String command - note that we know this will work because we validated val commandAsString = CommandMap(assemblySetup.commandName)(assemblySetup) // Grab the command name from the first part of require(commandAsString.nonEmpty, "The command to the HCD must not be empty, fool!") Setup(assemblyPrefix, lscsDirectCommand, obsId).madd( lscsCommandKey.set(commandAsString), lscsCommandNameKey.set(assemblySetup.commandName.name), assemblySetup(segmentIdKey) ) } }
First this function ensures that some required parameters are present in the Setup received by the Assembly. A require
will throw an IllegalArgument exception if the condition is false. It then uses the CommandMap structure from Common (shown as part of validation) to extract the toCommand
function for the command. This returns the String segment command as discussed in input. It checks that the command String is not empty.
Then the HCD setup is constructed using parameters from the Assembly Setup and new ones from HcdDirectCommand. The command string is passed with the lscsCommandKey parameter, and the command name is within lscsCommandNameKey. The last entry pulls the segmentIdKey from the Assembly Setup and inserts it into the HCD setup.
The HCD Setup for the following Setup received at the Assembly:
import csw.prefix.models.Prefix
import csw.params.commands.Setup
import m1cs.segments.segcommands.SegmentId
import m1cs.segments.segcommands.ACTUATOR.*
import m1cs.segments.segcommands.ACTUATOR.ActuatorModes.*
import m1cs.segments.shared.HcdDirectCommand
val clientPrefix = Prefix("ESW.testClient")
// clientPrefix: Prefix = Prefix(subsystem = ESW, componentName = "testClient")
val assemblySetup = toActuator(clientPrefix, Set(1,3)).withMode(TRACK).withTarget(22.34).toSegment(SegmentId("A23")).asSetup
// assemblySetup: Setup = Setup(
// source = Prefix(subsystem = ESW, componentName = "testClient"),
// commandName = CommandName(name = "ACTUATOR"),
// maybeObsId = None,
// paramSet = Set(
// Parameter(
// keyName = "ACT_ID",
// keyType = IntKey,
// items = ArraySeq(1, 3),
// units = none
// ),
// Parameter(
// keyName = "MODE",
// keyType = ChoiceKey,
// items = ArraySeq(Choice(name = "TRACK")),
// units = none
// ),
// Parameter(
// keyName = "TARGET",
// keyType = FloatKey,
// items = ArraySeq(22.34F),
// units = none
// ),
// Parameter(
// keyName = "SegmentId",
// keyType = StringKey,
// items = ArraySeq("A23"),
// units = none
// )
// )
// )
is the following Setup for the segments HCD:
val assemblyPrefix = Prefix("M1CS.segmentsAssembly")
// assemblyPrefix: Prefix = Prefix(
// subsystem = M1CS,
// componentName = "segmentsAssembly"
// )
val hcdSetup: Setup = HcdDirectCommand.toHcdDirectCommand(assemblyPrefix, assemblySetup)
// hcdSetup: Setup = Setup(
// source = Prefix(subsystem = M1CS, componentName = "segmentsAssembly"),
// commandName = CommandName(name = "lscsDirectCommand"),
// maybeObsId = None,
// paramSet = Set(
// Parameter(
// keyName = "lscsCommand",
// keyType = StringKey,
// items = ArraySeq("ACTUATOR ACT_ID=(1,3), MODE=TRACK, TARGET=22.34"),
// units = none
// ),
// Parameter(
// keyName = "lscsCommandName",
// keyType = StringKey,
// items = ArraySeq("ACTUATOR"),
// units = none
// ),
// Parameter(
// keyName = "SegmentId",
// keyType = StringKey,
// items = ArraySeq("A23"),
// units = none
// )
// )
// )
Handling SubmitResponse from the Segments HCD
One last thing is that the Assembly must handle the SubmitResponse from the HCD. When the Assembly sends the HCD Setup to the HCD, a new runId
is created for the command. When the HCD command completes, the Assembly needs to pass an appropriate response back to the caller. This is handled by the following piece of code that is repeated from above:
submitAndWaitHCD(runId, hcdSetup) map { sr =>
cswCtx.commandResponseManager.updateCommand(sr.withRunId(runId))
}
Started(runId)
The onSubmit handler sends the command using submitAndWaitHCD and then returns Started
to the caller. This indicates a CSW long-running command (as opposed to an immediate-completion command). The HCD command command runs asynchronously and returns a value in the future. When that occurs, the result is mapped to the closure shown, which calls the Assembly’s Command Response Manager with the SubmitResponse from the HCD, but it replaces the HCD runId with the Assembly Setup’s runId using withRunId
. That’s all that is needed to handle the response from the HCD to the Assembly caller.
That’s the extent of the Setup processing in the Assembly.
Tracking the HCD and Creation of Command Service
The last bit of interesting code in the SegmentsAssemblyHandlers is how the SegmentsAssembly gets connection information about the Segments HCD, which it must have to send it commands.
As a reminder, connections (i.e. hosts and ports) are not hard-coded in CSW. When a component starts up, its Supervisor registers itself with the Location Service on behalf of the TLA and that location information includes enough information so that one component can create an appropriate connection to the other. CSW supports Akka-based connections and HTTP-based connections.
When the Segments Assembly starts up, its Component Configuration File contains an entry that indicates to the Supervisor that it wants to track
the HCD. The “SegmentsAssemblyStandalone.conf” conf file is shown here.
prefix = "m1cs.segmentsAssembly"
componentType = assembly
behaviorFactoryClassName = "m1cs.segments.assembly.SegmentsAssemblyBehaviorFactory"
locationServiceUsage = RegisterAndTrackServices
connections = [
{
prefix: "m1cs.segmentsHCD"
componentType: hcd
connectionType: akka
}
]
This file is discussed in the CSW documentation. The key in this discussion is that the connections
array has an entry for the an HCD with prefix m1cs.segmentsHCD
and connectionType: Akka. This indicates to CSW and the Supervisor of the Assembly that it should track the Segments HCD and deliver events to the Assembly when the Segments HCD is available and also when/if it shuts down or crashes. To receive tracking events, the assembly overrides the onLocationTrackingEvent
handler as shown here.
- Scala
-
source
/** * This is overriding tracking events to gain events for Segments HCD. The Assembly should be started * with a Component Configuration file that includes tracking and the info for the Segments HCD. * This is done in the test files for reference. * When the LocationUpdated event is received, a CommandService is created. When the * connection goes down, the CommandService is set to None. When None an error is issued in onSubmit. * @param trackingEvent CSW TrackingEvent. */ override def onLocationTrackingEvent(trackingEvent: TrackingEvent): Unit = { log.debug(s"onLocationTrackingEvent called: $trackingEvent") trackingEvent match { case LocationUpdated(location) => log.debug(s"Assembly received HCD location: $location") // Should be safe here since we are tracking only Akka location val hcdLocation = location.asInstanceOf[AkkaLocation] hcdCS = Some(CommandServiceFactory.make(hcdLocation)(ctx.system)) case LocationRemoved(connection) => if (connection == hcdConnection) { hcdCS = None } } }
This code shows that the Assembly is handling two events delivered by the Assembly’s Supervisor: locationUpdated
and locationRemoved
. The locationUpdated
is delivered when the HCD is registered and running. When this happens, the Assembly creates a CommandService instance for the HCD.
In the constructor of the Assembly is the following variable declaration:
import csw.command.api.scaladsl.CommandService
private var hcdCS: Option[CommandService] = None // Initially, there is no CommandService for HCD
Initially the HCD CommandService is set to None, meaning there is no Command Service (i.e. the HCD is not available). When the HCD is available, a CommandService instance is created and this variable is set to its value as shown.
Then, when a command is sent and processed by the submitAndWaitHCD
call way up in the Assembly Command Execution section, it checks the value of this Option. If present, the command is sent to the HCD. If None, an Error is returned to the caller.
This is an excellent way to track the availability of an Assembly or HCD using the builtin CSW functionality.