LSCS Segments Assembly

The project includes a simplified Segments Assembly. As discussed in the overview, the Segments Assembly's job is to receive Assembly Setups, convert them to HCD Setups, and forward them to the Segments HCD. The Assembly produces no events, and no Setup commands are handled within the Assembly itself.

Top Level Actor

The Top Level Actor for Segments Assembly can be found in the lscsComps project in the file named SegmentsAssemblyHandlers.scala, which contains the entire Assembly. As with all CSW Assemblies, the TLA for Segments Assembly implements handlers as needed. In this Assembly, only the validateCommand and onSubmit handlers are implemented.

Assembly Command Validation

The validateCommand handler is shown below.

Scala
/**
 * This is the validate handler. This should perform all validation needed so that
 * the command can execute, or it should return a validation error.
 * Here we return an error for an Observe or pass to the Setup validation.
 * @param runId command runId
 * @param controlCommand either a Setup or Observe
 * @return a [ValidateCommandResponse]
 */
override def validateCommand(runId: Id, controlCommand: ControlCommand): ValidateCommandResponse = {
  controlCommand match {
    case setup: Setup => handleValidation(runId, setup)
    case observe      => Invalid(runId, UnsupportedCommandIssue(s"$observe command not supported."))
  }
}

// All Setup validation is performed here
private def handleValidation(runId: Id, setup: Setup): ValidateCommandResponse = {
  if (Common.ALL_COMMANDS.contains(setup.commandName) || setup.commandName.equals(HcdShutdown.shutdownCommand)) {
    Accepted(runId)
  }
  else {
    Invalid(
      runId,
      CommandIssue.UnsupportedCommandIssue(s"Segment Assembly does not support the `${setup.commandName}` command.")
    )
  }
}

The validateCommand handler checks that the command is a Setup and returns an error if an Observe is received; otherwise it passes validation of the Setup on to handleValidation. Remember that each Setup arriving from the outside has a command name equal to the name of the segment command. Therefore, the validation code looks up the command name in the list of implemented commands in Common. This code is shown below:

Scala
/**
 * This map is used by the Assembly to access the correct toCommand for an incoming command Setup
 * It maps command name to a function that returns the formatted command
 */
val CommandMap: Map[CommandName, Setup => String] = Map(
  ACTUATOR.COMMAND_NAME          -> ACTUATOR.toCommand,
  TARG_GEN_ACT.COMMAND_NAME      -> TARG_GEN_ACT.toCommand,
  CFG_CUR_LOOP.COMMAND_NAME      -> CFG_CUR_LOOP.toCommand,
  CFG_ACT_VC.COMMAND_NAME        -> CFG_ACT_VC.toCommand,
  CFG_ACT_OFFLD.COMMAND_NAME     -> CFG_ACT_OFFLD.toCommand,
  CFG_ACT_SNUB.COMMAND_NAME      -> CFG_ACT_SNUB.toCommand,
  SET_LIMIT_ACT.COMMAND_NAME     -> SET_LIMIT_ACT.toCommand,
  SET_PARAM_ACT.COMMAND_NAME     -> SET_PARAM_ACT.toCommand,
  CAL_WH_DEADBANDWH.COMMAND_NAME -> CAL_WH_DEADBANDWH.toCommand,
  MOVE_WH.COMMAND_NAME           -> MOVE_WH.toCommand
)

// This is used by validation of Assembly and HCD to verify that the received command is currently supported. Could
// be removed when all commands are supported
val ALL_COMMANDS: List[CommandName] = CommandMap.keys.toList

If the command name is not present, validation fails with an UnsupportedCommandIssue. Otherwise, the command is Accepted. Validation could be improved, but we assume that if the command name is within our list, then the Setup contains all the correct parameters. An error is returned during execution if the handling fails.
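The name-based validation described above can be sketched without any CSW dependencies. This is a minimal illustration, not the project's code: the types, the two-entry map, and the shutdown command name "ShutdownHCD" are simplified, hypothetical stand-ins for ValidateCommandResponse, CommandMap, and HcdShutdown.shutdownCommand.

```scala
// Minimal sketch of name-based validation. All names in this object are
// hypothetical stand-ins for the CSW and project types, not the real API.
object ValidationSketch {
  sealed trait ValidateResponse
  final case class Accepted(runId: String)                extends ValidateResponse
  final case class Invalid(runId: String, reason: String) extends ValidateResponse

  // Maps a command name to a function that formats the segment command string,
  // mirroring the role of CommandMap in Common.
  val commandMap: Map[String, String => String] = Map(
    "ACTUATOR" -> ((args: String) => s"ACTUATOR $args"),
    "MOVE_WH"  -> ((args: String) => s"MOVE_WH $args")
  )

  // The supported command names are simply the keys of the map.
  val allCommands: List[String] = commandMap.keys.toList

  // Assumed name; the real value lives in HcdShutdown.shutdownCommand.
  val shutdownCommand = "ShutdownHCD"

  def validate(runId: String, commandName: String): ValidateResponse =
    if (allCommands.contains(commandName) || commandName == shutdownCommand) Accepted(runId)
    else Invalid(runId, s"Segment Assembly does not support the `${commandName}` command.")
}
```

Because the list of supported names is derived from the map keys, adding a new entry to the map makes the command pass validation without any other change, which appears to be the intent of ALL_COMMANDS in the code above.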

Assembly Command Execution

Once validated, the onSubmit handler is called. The submit-related code for the Segments Assembly is shown below:

Scala
/**
 * The Assembly receives a Setup command with the name of the low-level command.
 * It transforms it into an HCD command, which is just the String command to all or one segment.
 * Ranges aren't yet supported.
 */
override def onSubmit(runId: Id, controlCommand: ControlCommand): SubmitResponse = {
  controlCommand match {
    case setup: Setup => handleSetup(runId, setup)
    case observe      => Invalid(runId, UnsupportedCommandIssue(s"$observe command not supported."))
  }
}

/**
 * Processes commands as Setups
 * @param runId command runId
 * @param assemblySetup the [[Setup]] to execute
 * @return [[SubmitResponse]] response from the command. All commands are started currently.
 */
private def handleSetup(runId: Id, assemblySetup: Setup): SubmitResponse = {
  assemblySetup.commandName match {
    case HcdShutdown.shutdownCommand =>
      log.debug(s"Segments Assembly received shutdown request: $runId and $assemblySetup")
      val hcdSetup = Setup(assemblyPrefix, HcdShutdown.shutdownCommand)

      submitAndWaitHCD(runId, hcdSetup) map { sr =>
        cswCtx.commandResponseManager.updateCommand(sr.withRunId(runId))
      }
      Started(runId)
    case cmd =>
      log.info(s"Segments Assembly received a command: '$cmd',  runId: $runId, setup: $assemblySetup")

      // This simulates what the Assembly does to send to HCD - has received above Setup
      try {
        val hcdSetup: Setup = HcdDirectCommand.toHcdDirectCommand(assemblyPrefix, assemblySetup)
        // Assembly sends the Setup and updates
        submitAndWaitHCD(runId, hcdSetup) map { sr =>
          log.info(s"Assembly command completed from HCD: $sr")
          cswCtx.commandResponseManager.updateCommand(sr.withRunId(runId))
        }
        Started(runId)
      }
      catch {
        case _: Exception =>
          CommandResponse.Error(runId, s"An exception was thrown while processing setup: ${assemblySetup.commandName}")
      }
  }
}

/**
 * This is a convenience routine to check the availability of HCD prior to sending
 * @param runId command runId
 * @param setup the Setup to send
 * @return command response as a SubmitResponse
 */
private def submitAndWaitHCD(runId: Id, setup: Setup): Future[SubmitResponse] =
  hcdCS match {
    case Some(cs) =>
      // Added a delay here because segment commands take an unknown amount of time.
      // Can be made an implicit for all calls in file for a more complex situation with different timeouts.
      cs.submitAndWait(setup)(timeout = 15.seconds)
    case None =>
      Future(CommandResponse.Error(runId, s"The Segment HCD is not currently available: ${hcdConnection.componentId}"))
  }

As before, an Observe is rejected with an Invalid response (even though this code cannot be reached in this example, because validation has already rejected Observes). The handleSetup function must handle any named commands first. In this case, there is a command called shutdownCommand that, when sent to the HCD, causes all the segment connections to be closed and the HCD to shut down. This is useful for testing.

The second case matches any other command. Here, the function HcdDirectCommand.toHcdDirectCommand is called. If it does not fail (it is wrapped in a try clause), a Setup formatted for the HCD is returned and passed to the submitAndWaitHCD function. That function simply calls the Command Service submitAndWait, but first checks that the CommandService instance created for the HCD is valid. (This will be covered later.) It also provides a custom timeout, which has been arbitrarily set to 15 seconds for all the segments to complete any command. If this time is exceeded, submitAndWait times out and returns an error. Note that if the Command Service is not available because the HCD is not available, an Error is also returned.
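The availability check in submitAndWaitHCD can be illustrated with plain Scala futures. This is a rough sketch under stated assumptions: HcdService is a hypothetical stand-in for the CSW CommandService, and the response types are simplified. It uses Future.successful for the unavailable case, which avoids needing an execution context; the Assembly code above uses Future(...) instead.

```scala
import scala.concurrent.Future
import scala.concurrent.duration._

// Sketch of "check availability, then submit with a timeout".
// HcdService and the response types are hypothetical, not the CSW API.
object SubmitSketch {
  sealed trait Response
  final case class Completed(runId: String)              extends Response
  final case class Error(runId: String, message: String) extends Response

  trait HcdService {
    def submitAndWait(command: String, timeout: FiniteDuration): Future[Response]
  }

  def submitAndWaitHcd(runId: String, command: String, service: Option[HcdService]): Future[Response] =
    service match {
      case Some(cs) =>
        // A single fixed timeout for every segment command, as in the text above
        cs.submitAndWait(command, 15.seconds)
      case None =>
        // No client means the HCD is not available: fail fast with an Error
        Future.successful(Error(runId, "The Segment HCD is not currently available"))
    }
}
```

Returning a Future in both branches keeps the caller's code uniform: it can map over the result without caring whether the HCD was reachable.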

The function HcdDirectCommand.toHcdDirectCommand is shown below:

Scala
/**
 * This object contains the parameter keys for the command sent to the HCD to process a single
 * segment command. This is used by the segment Assembly and test code to produce an HCD Setup from
 * an Assembly Setup.
 */
case object HcdDirectCommand {

  val lscsDirectCommand: CommandName = CommandName("lscsDirectCommand")
  // This key is used to store the command to be executed
  val lscsCommandKey: Key[String]     = KeyType.StringKey.make(name = "lscsCommand")
  val lscsCommandNameKey: Key[String] = KeyType.StringKey.make(name = "lscsCommandName")

  /**
   * This helper function returns a direct command Setup for the
   * @param assemblyPrefix prefix of the Assembly as source
   * @param assemblySetup the Setup received by the Assembly -contains segmentIdKey and command name
   * @param obsId optional ObsId, defaults to None
   * @return Setup ready for sending to HCD
   */
  def toHcdDirectCommand(assemblyPrefix: Prefix, assemblySetup: Setup, obsId: Option[ObsId] = None): Setup = {
    val segmentIdExists    = assemblySetup.exists(segmentIdKey)
    val segmentRangeExists = assemblySetup.exists(segmentRangeKey)
    // Can't go on without one of these set
    require(segmentIdExists || segmentRangeExists, s"Bad segment info in the Assembly Setup: ${assemblySetup.commandName}")

    // Convert setup to a String command - note that we know this will work because we validated
    val commandAsString = CommandMap(assemblySetup.commandName)(assemblySetup)

    // Grab the command name from the first part of
    require(commandAsString.nonEmpty, "The command to the HCD must not be empty, fool!")

    Setup(assemblyPrefix, lscsDirectCommand, obsId).madd(
      lscsCommandKey.set(commandAsString),
      lscsCommandNameKey.set(assemblySetup.commandName.name),
      assemblySetup(segmentIdKey)
    )
  }
}

First, this function ensures that the required parameters are present in the Setup received by the Assembly. A require throws an IllegalArgumentException if its condition is false. The function then uses the CommandMap structure from Common (shown as part of validation) to look up the toCommand function for the command and applies it to the Setup. This returns the String segment command as discussed in input. Finally, it checks that the command String is not empty.

Then the HCD setup is constructed using parameters from the Assembly Setup and new ones from HcdDirectCommand. The command string is passed with the lscsCommandKey parameter, and the command name is within lscsCommandNameKey. The last entry pulls the segmentIdKey from the Assembly Setup and inserts it into the HCD setup.
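To make the role of require concrete, the following sketch mimics the conversion with a simplified, hypothetical SimpleSetup type (a command name plus a string map) in place of the CSW Setup. It is an illustration of the pattern only and does not reproduce the real parameter handling.

```scala
// Sketch of the require-then-build pattern used by toHcdDirectCommand.
// SimpleSetup is a hypothetical stand-in for the CSW Setup.
object DirectCommandSketch {
  final case class SimpleSetup(commandName: String, params: Map[String, String])

  def toDirect(assemblySetup: SimpleSetup, format: SimpleSetup => String): SimpleSetup = {
    // A failed require throws IllegalArgumentException
    require(
      assemblySetup.params.contains("SegmentId"),
      s"Bad segment info in the Assembly Setup: ${assemblySetup.commandName}"
    )

    val commandAsString = format(assemblySetup)
    require(commandAsString.nonEmpty, "The command to the HCD must not be empty")

    // New keys carry the command string and name; SegmentId is copied across
    SimpleSetup(
      "lscsDirectCommand",
      Map(
        "lscsCommand"     -> commandAsString,
        "lscsCommandName" -> assemblySetup.commandName,
        "SegmentId"       -> assemblySetup.params("SegmentId")
      )
    )
  }
}
```

A caller can wrap the call in scala.util.Try (or a try clause, as handleSetup does) to turn the exception into an error response instead of letting it propagate.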

The HCD Setup for the following Setup received at the Assembly:

import csw.prefix.models.Prefix
import csw.params.commands.Setup

import m1cs.segments.segcommands.SegmentId
import m1cs.segments.segcommands.ACTUATOR.*
import m1cs.segments.segcommands.ACTUATOR.ActuatorModes.*
import m1cs.segments.shared.HcdDirectCommand

val clientPrefix = Prefix("ESW.testClient")
// clientPrefix: Prefix = Prefix(subsystem = ESW, componentName = "testClient")
val assemblySetup = toActuator(clientPrefix, Set(1,3)).withMode(TRACK).withTarget(22.34).toSegment(SegmentId("A23")).asSetup
// assemblySetup: Setup = Setup(
//   source = Prefix(subsystem = ESW, componentName = "testClient"),
//   commandName = CommandName(name = "ACTUATOR"),
//   maybeObsId = None,
//   paramSet = Set(
//     Parameter(
//       keyName = "ACT_ID",
//       keyType = IntKey,
//       items = ArraySeq(1, 3),
//       units = none
//     ),
//     Parameter(
//       keyName = "MODE",
//       keyType = ChoiceKey,
//       items = ArraySeq(Choice(name = "TRACK")),
//       units = none
//     ),
//     Parameter(
//       keyName = "TARGET",
//       keyType = FloatKey,
//       items = ArraySeq(22.34F),
//       units = none
//     ),
//     Parameter(
//       keyName = "SegmentId",
//       keyType = StringKey,
//       items = ArraySeq("A23"),
//       units = none
//     )
//   )
// )

is the following Setup for the Segments HCD:

val assemblyPrefix = Prefix("M1CS.segmentsAssembly")
// assemblyPrefix: Prefix = Prefix(
//   subsystem = M1CS,
//   componentName = "segmentsAssembly"
// )
val hcdSetup: Setup = HcdDirectCommand.toHcdDirectCommand(assemblyPrefix, assemblySetup)
// hcdSetup: Setup = Setup(
//   source = Prefix(subsystem = M1CS, componentName = "segmentsAssembly"),
//   commandName = CommandName(name = "lscsDirectCommand"),
//   maybeObsId = None,
//   paramSet = Set(
//     Parameter(
//       keyName = "lscsCommand",
//       keyType = StringKey,
//       items = ArraySeq("ACTUATOR ACT_ID=(1,3), MODE=TRACK, TARGET=22.34"),
//       units = none
//     ),
//     Parameter(
//       keyName = "lscsCommandName",
//       keyType = StringKey,
//       items = ArraySeq("ACTUATOR"),
//       units = none
//     ),
//     Parameter(
//       keyName = "SegmentId",
//       keyType = StringKey,
//       items = ArraySeq("A23"),
//       units = none
//     )
//   )
// )

Handling SubmitResponse from the Segments HCD

Finally, the Assembly must handle the SubmitResponse from the HCD. When the Assembly sends the HCD Setup to the HCD, a new runId is created for the command. When the HCD command completes, the Assembly needs to pass an appropriate response back to the caller. This is handled by the following piece of code, repeated from above:

submitAndWaitHCD(runId, hcdSetup) map { sr =>
  cswCtx.commandResponseManager.updateCommand(sr.withRunId(runId))
}
Started(runId)

The onSubmit handler sends the command using submitAndWaitHCD and then returns Started to the caller. This indicates a CSW long-running command (as opposed to an immediate-completion command). The HCD command runs asynchronously and returns a value in the future. When that occurs, the result is mapped to the closure shown, which calls the Assembly’s Command Response Manager with the SubmitResponse from the HCD, replacing the HCD runId with the Assembly Setup’s runId using withRunId. That’s all that is needed to handle the response from the HCD to the Assembly caller.
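The runId substitution can be shown in isolation with plain Scala futures. In this sketch, Completed and ResponseManager are hypothetical, simplified stand-ins for the CSW SubmitResponse and Command Response Manager; only the mapping step mirrors the Assembly code.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

// Sketch of forwarding an HCD response under the Assembly's runId.
// Completed and ResponseManager are hypothetical stand-ins for CSW types.
object RunIdSketch {
  final case class Completed(runId: String) {
    def withRunId(id: String): Completed = copy(runId = id)
  }

  // Records the final response for each runId
  final class ResponseManager {
    private val results = TrieMap.empty[String, Completed]
    def updateCommand(response: Completed): Unit     = results.update(response.runId, response)
    def result(runId: String): Option[Completed]     = results.get(runId)
  }

  // When the HCD future completes, re-label its response with the Assembly's
  // runId and publish it, so the original caller sees its own command finish.
  def forward(assemblyRunId: String, hcdResult: Future[Completed], crm: ResponseManager)(implicit
      ec: ExecutionContext
  ): Future[Unit] =
    hcdResult.map(sr => crm.updateCommand(sr.withRunId(assemblyRunId)))
}
```

Without the withRunId step, the response would be recorded under the HCD's runId, which the Assembly's caller has never seen.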

That’s the extent of the Setup processing in the Assembly.

Tracking the HCD and Creation of Command Service

The last bit of interesting code in the SegmentsAssemblyHandlers is how the SegmentsAssembly gets connection information about the Segments HCD, which it must have to send it commands.

As a reminder, connections (i.e. hosts and ports) are not hard-coded in CSW. When a component starts up, its Supervisor registers itself with the Location Service on behalf of the TLA and that location information includes enough information so that one component can create an appropriate connection to the other. CSW supports Akka-based connections and HTTP-based connections.

When the Segments Assembly starts up, its Component Configuration File contains an entry that indicates to the Supervisor that it wants to track the HCD. The SegmentsAssemblyStandalone.conf file is shown here.

prefix = "m1cs.segmentsAssembly"
componentType = assembly
behaviorFactoryClassName = "m1cs.segments.assembly.SegmentsAssemblyBehaviorFactory"
locationServiceUsage = RegisterAndTrackServices
connections = [
  {
    prefix: "m1cs.segmentsHCD"
    componentType: hcd
    connectionType: akka
  }
]

This file is discussed in the CSW documentation. The key point here is that the connections array has an entry for an HCD with prefix m1cs.segmentsHCD and connectionType: akka. This indicates to CSW and the Supervisor of the Assembly that it should track the Segments HCD and deliver events to the Assembly when the Segments HCD is available and also when/if it shuts down or crashes. To receive tracking events, the Assembly overrides the onLocationTrackingEvent handler as shown here.

Scala
/**
 * This is overriding tracking events to gain events for Segments HCD. The Assembly should be started
 * with a Component Configuration file that includes tracking and the info for the Segments HCD.
 * This is done in the test files for reference.
 * When the LocationUpdated event is received, a CommandService is created. When the
 * connection goes down, the CommandService is set to None. When None an error is issued in onSubmit.
 * @param trackingEvent CSW TrackingEvent.
 */
override def onLocationTrackingEvent(trackingEvent: TrackingEvent): Unit = {
  log.debug(s"onLocationTrackingEvent called: $trackingEvent")
  trackingEvent match {
    case LocationUpdated(location) =>
      log.debug(s"Assembly received HCD location: $location")
      // Should be safe here since we are tracking only Akka location
      val hcdLocation = location.asInstanceOf[AkkaLocation]
      hcdCS = Some(CommandServiceFactory.make(hcdLocation)(ctx.system))
    case LocationRemoved(connection) =>
      if (connection == hcdConnection) {
        hcdCS = None
      }
  }
}

This code shows that the Assembly handles two events delivered by the Assembly’s Supervisor: LocationUpdated and LocationRemoved. LocationUpdated is delivered when the HCD is registered and running. When this happens, the Assembly creates a CommandService instance for the HCD. LocationRemoved is delivered when the HCD connection goes down, and the Assembly sets the CommandService back to None.

In the constructor of the Assembly is the following variable declaration:

import csw.command.api.scaladsl.CommandService

private var hcdCS: Option[CommandService] = None // Initially, there is no CommandService for HCD

Initially the HCD CommandService is set to None, meaning there is no Command Service (i.e. the HCD is not available). When the HCD is available, a CommandService instance is created and this variable is set to its value as shown.

Then, when a command is sent and processed by the submitAndWaitHCD call way up in the Assembly Command Execution section, it checks the value of this Option. If present, the command is sent to the HCD. If None, an Error is returned to the caller.

This is an excellent way to track the availability of an Assembly or HCD using the built-in CSW functionality.
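The tracking pattern reduces to a small state machine around an Option. The sketch below uses hypothetical event and client types (plain strings instead of CSW connections and CommandService) to show how availability follows the tracking events. Unlike the handler above, it also checks the connection on LocationUpdated, which is an assumption made here for symmetry.

```scala
// Sketch of availability tracking with an Option. The event types and the
// string-based "client" are hypothetical stand-ins for the CSW types.
object TrackingSketch {
  sealed trait TrackingEvent
  final case class LocationUpdated(connection: String) extends TrackingEvent
  final case class LocationRemoved(connection: String) extends TrackingEvent

  final class HcdTracker(hcdConnection: String) {
    // Initially there is no client for the HCD
    private var hcdClient: Option[String] = None

    def onEvent(event: TrackingEvent): Unit = event match {
      case LocationUpdated(c) if c == hcdConnection => hcdClient = Some(s"client-for-$c")
      case LocationRemoved(c) if c == hcdConnection => hcdClient = None
      case _                                        => () // ignore other connections
    }

    // Submitting succeeds only while a client is present
    def submit(command: String): Either[String, String] = hcdClient match {
      case Some(client) => Right(s"$client accepted $command")
      case None         => Left("The Segment HCD is not currently available")
    }
  }
}
```

A short usage example: a new HcdTracker("m1cs.segmentsHCD") rejects submit until it receives LocationUpdated("m1cs.segmentsHCD"), and rejects again after LocationRemoved("m1cs.segmentsHCD").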
