LSCS Segments HCD

The project includes a Segments HCD that processes segment commands. As discussed in the overview, the Segments HCD receives HCD Setups from the Assembly (or other sources), validates them, and then sends them to one or more segments. It monitors each segment for a response, which is either success or an error. Once all the segments complete successfully, or as soon as any segment completes with an Error, the HCD command is completed and a response is passed back to the caller. The HCD does not publish events of any kind.

Top Level Actor

The Top Level Actor (TLA) for the Segments HCD can be found in the lscsComps project in the file named SegmentsHcdHandlers.scala, which contains the majority of the HCD code. As with all CSW components, the TLA for the Segments HCD implements handlers as needed. In this HCD, only the initialize, validateCommand, and onSubmit handlers are implemented. Unlike the Assembly, the HCD does not use the onLocationTrackingEvent handler.

Initialization

Initialization in the Segments HCD does the important job of creating the segments. How the segments themselves work is discussed later; only the initialize handler is covered here.

Scala
// Set this during initialization to be a Segments instance
private var createdSegments: Segments = _

/**
 * The TLA initialize reads the number of segments from the reference.conf file.  This is convenient for
 * testing. It creates that number of segments in each sector.  At some point this could be removed and
 * replaced with MAX_SEGMENT_NUMBER.
 */
override def initialize(): Unit = {
  val segPath = "m1cs.segments"
  val maxSegments =
    if (ctx.system.settings.config.hasPath(segPath))
      ctx.system.settings.config.getInt(segPath)
    else SegmentId.MAX_SEGMENT_NUMBER
  val segmentRange = 1 to maxSegments //SegmentId.MAX_SEGMENT_NUMBER
  log.info(
    s"Initializing Segments HCD with ${segmentRange.max} segments in each sector for a total of ${segmentRange.max * SegmentId.ALL_SECTORS.size} segments."
  )
  createdSegments = SegmentManager.createSegments(creator, segmentRange, log)
}

Any state that is initialized in the initialize handler must be global to the TLA. In this case, there is a variable called createdSegments of type Segments that is set during initialize. The segments value is managed by the SegmentManager object, but it is essentially a Map of SegmentId values to SegmentActor references. The variable createdSegments is used by the onSubmit handler.

As shown, the code gets the maxSegments value from the reference.conf file and hands it to the SegmentManager to create segments. As mentioned before, the Segment Manager will create that number of segments in each sector. Therefore, the maximum value for maxSegments is MAX_SEGMENT_NUMBER and once one is happy with the simulator, the check can either be eliminated or set to 82 in the reference.conf file.
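The fallback and the resulting segment count can be illustrated without CSW or Akka. The following is a minimal sketch, assuming a plain Map stands in for the reference.conf settings, 82 segments per sector, and 6 sectors (which gives the 492 segments used in the tests later on this page). The names InitSketch, segmentsPerSector, and totalSegments are hypothetical and are not part of the project.

```scala
object InitSketch {
  // Assumed values: 82 segments per sector and 6 sectors give 492 segments in total.
  val MaxSegmentNumber = 82
  val SectorCount      = 6

  // A plain Map stands in for the configuration (hypothetical simplification).
  // If the m1cs.segments entry is missing, fall back to the maximum.
  def segmentsPerSector(config: Map[String, Int]): Int =
    config.getOrElse("m1cs.segments", MaxSegmentNumber)

  // The same number of segments is created in every sector.
  def totalSegments(config: Map[String, Int]): Int =
    segmentsPerSector(config) * SectorCount
}
```

With an empty configuration the sketch yields the full mirror, while a small value such as 2 yields 12 segments, which is convenient for testing.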

HCD Command Validation

As in the Assembly, the validateCommand handler rejects an Observe and passes a Setup to another function for validation. That function, handleValidation, is shown below.

Scala
/*
 * All Setup validation is performed here. Three checks are done for a lscsDirectCommand:
 * 1. is there a valid LSCS command
 * 2. is there a segmentId key (one or all)
 * 3. If one segment, is the segment available?
 * The HCD shutdown command is also accepted.  All others are rejected.
 */
private def handleValidation(runId: Id, setup: Setup): ValidateCommandResponse = {
  setup.commandName match {
    case HcdDirectCommand.lscsDirectCommand =>
      // The checks are chained with else so that the first failing check becomes the returned result.
      // Without the else branches, the first two Invalid values would be discarded.
      if (!setup.exists(lscsCommandKey))
        Invalid(runId, CommandIssue.MissingKeyIssue(s"Setup must include the ${lscsCommandKey.keyName} parameter."))
      else if (!setup.exists(segmentIdKey))
        Invalid(runId, CommandIssue.MissingKeyIssue(s"Setup must include the ${segmentIdKey.keyName} parameter."))
      else {
        // Verify that if one segment, that the segment is online in the list
        val segmentIdValue = setup(segmentIdKey).head
        if (segmentIdValue != ALL_SEGMENTS && !createdSegments.segmentExists(SegmentId(segmentIdValue))) {
          Invalid(
            runId,
            CommandIssue.ParameterValueOutOfRangeIssue(s"The segmentId: $segmentIdValue is not currently available.")
          )
        }
        else {
          Accepted(runId)
        }
      }
    case HcdShutdown.shutdownCommand =>
      Accepted(runId)
    case other =>
      Invalid(runId, CommandIssue.UnsupportedCommandIssue(s"HCD does not accept the command: $other"))
  }
}

The HCD can process two commands: lscsDirectCommand and shutdownCommand. For lscsDirectCommand, handleValidation performs three checks:

  1. Is the lscsCommandKey present? If not, return Invalid.
  2. Is a segmentIdKey included? If not, return Invalid.
  3. Finally, if the destination is a single segment, check to see if the destination segment is “online”. Since we can configure the number of segments to create, it’s possible to send a command to a segment that does not exist. If this check fails, return Invalid.

If all three checks pass, the Setup is Accepted. More could probably be done, but that is all for now. We assume that the input/output library is used to construct the Setups, so no checks for required parameters are done.

The HCD also accepts the HcdShutdown.shutdownCommand. This command is always accepted.

If any other command is received by the HCD, it is rejected and Invalid is returned.
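The three checks can be modelled in isolation. The following is a minimal sketch, assuming a Setup is reduced to a Map of key names to values and the set of created segments to a Set of IDs. The object ValidationSketch, the key names "lscsCommand" and "segmentId", the "ALL" marker, and the example segment ID are hypothetical stand-ins, not the project's types.

```scala
object ValidationSketch {
  sealed trait Result
  case object Accepted                     extends Result
  final case class Invalid(reason: String) extends Result

  // Hypothetical stand-in for the ALL_SEGMENTS marker value
  val AllSegments = "ALL"

  // params: simplified Setup parameters; online: IDs of the segments that were created
  def validate(params: Map[String, String], online: Set[String]): Result =
    if (!params.contains("lscsCommand")) Invalid("missing lscsCommand") // check 1
    else
      params.get("segmentId") match {
        case None => Invalid("missing segmentId") // check 2
        case Some(id) if id != AllSegments && !online.contains(id) =>
          Invalid(s"segment $id is not available") // check 3
        case Some(_) => Accepted
      }
}
```

Each branch produces the final result, so the first failing check decides the outcome and later checks are never evaluated against missing data.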

Segments HCD Command Execution

Once the command is validated, the onSubmit handler is called. Like the Assembly, the HCD rejects Observes and hands off execution to handleSetup. Checking for Observe again isn’t strictly needed since it is done in validation. The handleSetup function is reproduced below. This is the crux of the HCD.

Scala
/**
 * Processes commands as Setups for the HCD.
 * @param runId command runId
 * @param setup the [[Setup]] to execute
 * @return [[SubmitResponse]] response from the command. All commands are started currently.
 */
private def handleSetup(runId: Id, setup: Setup): SubmitResponse = {
  setup.commandName match {
    case HcdDirectCommand.lscsDirectCommand =>
      // We know all these params are present at this point
      val command         = setup(lscsCommandKey).head
      val commandName     = setup(lscsCommandNameKey).head
      val segmentKeyValue = setup(segmentIdKey).head
      // The sendList, at this point, is either one segment or all segments.  The same execution approach
      // is used regardless of one or all
      val sendList = if (segmentKeyValue == ALL_SEGMENTS) {
        createdSegments.getAllSegments
      }
      else {
        createdSegments.getSegment(segmentId = SegmentId(segmentKeyValue))
      }

      // Here a SegComMonitor is created to send and watch for completion of the command. The sendList is the
      // list of SegmentActors that will receive the command.  The function passed as the 5th argument will
      // be executed when all the segment commands complete either successfully or with an error.
      val mon1 = ctx.spawnAnonymous(
        hcd.SegComMonitor(
          commandName,
          command,
          sendList.segments,
          runId,
          (sr: SubmitResponse) => cswCtx.commandResponseManager.updateCommand(sr),
          log
        )
      )
      // Start the command and return Started to the Assembly
      mon1 ! SegComMonitor.Start
      Started(runId)
    case HcdShutdown.shutdownCommand =>
      //This just sends shutdown to all the online segments
      createdSegments.shutdownAll()
      Completed(runId)
    case other =>
      Error(runId, s"This HCD does not handle this command: $other")
  }
}

There are two commands, lscsDirectCommand, and shutdownCommand.

lscsDirectCommand Implementation

First, the command name, the segment destination, and the command String are extracted from the Setup. Then segmentKeyValue, which is either ALL_SEGMENTS or a specific segment ID, is used to obtain the list of segments for the command. The Segments instance created by SegmentManager returns either one segment or all of them.

Then a SegComMonitor actor is created, which takes the command String, the list of segments, the runId of the request, and a function to execute when all the segments have completed. The SegComMonitor.Start message is sent to the monitor, which causes the sending of the command to all the segments.

Note that the lscsDirectCommands executed by the HCD are fully asynchronous. As the code shows, once a monitor is started, the onSubmit handler returns Started, indicating to the caller that a long-running command has started. The HCD is then available to accept and process another HCD/segment command, which starts its own SegComMonitor. The tests demonstrate that this works.
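The "return Started now, report completion later" pattern can be sketched with plain Scala Futures. This is only an illustration under stated assumptions: the real HCD uses a SegComMonitor actor and the CSW command response manager, whereas here a Future runs the simulated work and a callback plays the role of the replyTo function. AsyncSubmitSketch and its members are hypothetical names.

```scala
import scala.concurrent.{ExecutionContext, Future}

object AsyncSubmitSketch {
  sealed trait Response
  final case class Started(runId: Int)   extends Response
  final case class Completed(runId: Int) extends Response

  // Starts the work in the background and returns Started immediately.
  // onDone is called later with Completed when the work finishes successfully.
  def submit(runId: Int, work: () => Unit, onDone: Response => Unit)(implicit
      ec: ExecutionContext
  ): Response = {
    Future(work()).foreach(_ => onDone(Completed(runId)))
    Started(runId)
  }
}
```

Because submit returns before the work finishes, a second call can start while the first is still running, which mirrors how each HCD command gets its own monitor.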

Segment Monitor

Segment Monitor is the longest piece of code in the project. It is an actor implementing a two-state finite state machine with the job of executing a segment command and waiting for responses. The code is long but is included here.

Scala
/**
 * These are the commands for the Command Sequence Monitor. The WrappedSegmentResponse is needed to receive
 * the SegmentActor.Response and transform it into a SegComMonitor.Command.
 * CommandTimeout is issued and received if the Command Monitor times out before receiving all segment responses.
 */
sealed trait Command
final private case class WrappedSegmentResponse(response: SegmentActor.Response) extends Command
case object Start                                                                extends Command
final private case object CommandTimeout                                         extends Command

/**
 * This private class implements the Segment Command Monitor.
 * The class is an actor with a two state FSM. The monitor is created with a list of segment actors and a
 * segment command that is to be sent to each of the segment actors in the list. It waits for the Start message
 * in the `starting` state. Once Start is received, it sends the command sequentially to each segment actor and
 * waits for the responses in the `waiting` state.
 *
 * It determines completion by counting responses and also checking that the responses are Completed rather than Error.
 * Once the correct number of responses have been received, it executes the replyTo function, which is usually a
 * function to update the CSW CRM, which notifies the caller that the command is completed.
 *
 * There is one more feature.  A timeout can be provided that will allow the command to timeout and reply to the
 * caller with an Error.
 *
 * A new Segment Command Monitor is created for every command executed, after all the responses have been received or
 * the monitor is otherwise completed, the monitor returns Behaviors.stopped, which causes the actor to be ended.
 */
class SegMonitor private[SegComMonitor] (
    commandName: String,
    fullCommand: String,
    segments: List[ActorRef[SegmentActor.Command]],
    runId: Id,
    replyTo: SubmitResponse => Unit, // This is here so that handler can provide the CRM but tests can do something else
    log: Logger,
    segmentResponseMapper: ActorRef[SegmentActor.Response],
    timeout: FiniteDuration
) {
  def starting(): Behavior[Command] =
    Behaviors.receiveMessage {
      case Start =>
        log.debug(s"Sending $commandName to ${segments.size} segments.")
        segments.foreach(_ ! SegmentActor.Send(commandName, fullCommand, segmentResponseMapper))
        waiting(segments.size, responsesReceived = 0)
      case _ =>
        Behaviors.unhandled
    }

  /**
   * The waiting state is entered once the command is sent to the segments. Every time a response from a segment
   * is received, the Behavior calls itself with updated state. This is not a recursive call in Akka.
   * @param totalSegments the number of expected responses
   * @param responsesReceived number of responses received so far
   * @return a typed Behavior
   */
  def waiting(totalSegments: Int, responsesReceived: Int): Behavior[Command] =
    Behaviors.withTimers { timers =>
      Behaviors
        .receiveMessage[Command] {
          case wrapped: WrappedSegmentResponse =>
            wrapped.response match {
              case SegmentActor.Started(commandName, commandId, segmentId) =>
                timers.startSingleTimer(TIMEOUT_KEY, CommandTimeout, timeout)
                log.debug(s"Started: $segmentId:$commandName:$commandId with $timeout timeout.")
                Behaviors.same
              case SegmentActor.Completed(commandName, commandId, segmentId) =>
                val updatedResponsesReceived = responsesReceived + 1
                if (totalSegments == updatedResponsesReceived) {
                  log.info(
                    s"$commandName completed successfully for **$updatedResponsesReceived** segments. Sending Completed($runId)"
                  )
                  log.debug(s"Cancelling Timeout Timer")
                  timers.cancel(TIMEOUT_KEY)
                  replyTo(Completed(runId))
                  Behaviors.stopped
                }
                else {
                  if (Math.floorMod(responsesReceived, 20) == 0)
                    log.debug(s"Completed: $segmentId:$commandName:$commandId  Total completed: $responsesReceived")
                  waiting(totalSegments, updatedResponsesReceived)
                }
              case SegmentActor.Processing(commandName, commandId, segmentId) =>
                log.debug(s"Processing: $segmentId:$commandName:$commandId")
                Behaviors.same
              case SegmentActor.Error(commandName, commandId, segmentId, message) =>
                val updatedResponsesReceived = responsesReceived + 1
                log.error(
                  s"Error: $segmentId:$commandName:$commandId--STOPPING, $updatedResponsesReceived responses received."
                )
                // Cancel the timeout timer
                timers.cancel(TIMEOUT_KEY)
                replyTo(Error(runId, message))
                Behaviors.stopped
            }
          case Start =>
            Behaviors.unhandled
          case CommandTimeout =>
            replyTo(
              Error(
                runId,
                s"A segment command timed out after receiving: $responsesReceived responses of expected: $totalSegments."
              )
            )
            Behaviors.stopped
          case other =>
            log.error(s"SegComMonitor received some other message.  Just FYI: $other")
            Behaviors.same
        }
        .receiveSignal { case (_, PostStop) =>
          log.debug(s">>>SegComMonitor for $runId STOPPED<<<")
          Behaviors.same
        }
    }
}

SegComMonitor is created in the starting state, awaiting the start message from the HCD TLA. When received, it sends the full segment command to each of the SegmentActors on the list passed into the monitor from the TLA (it will be 1 segment or a list of all segments). Once the commands are sent, it moves to the waiting state.

The state is called waiting because it waits for the responses from the SegmentActors. The waiting state supports a timeout timer so that if something goes wrong in a SegmentActor, the CommandTimeout message is sent to the monitor itself after the timeout period, which causes an Error SubmitResponse to be returned for the runId through the replyTo function. In the code shown, the timer is started when a SegmentActor.Started response is received.

The monitor is written to assume the SegmentActor will return Completed or Error responses as described in the M1CS docs (Started and Processing responses are logged but do not count toward completion). The happy case is the SegmentActor.Completed message. The monitor simply counts successful responses until it has received the expected number, and then sends Completed for the runId through the replyTo function.

If a single Error is returned, the monitor stops immediately. One failing segment out of all the segments means the whole command fails, and an Error is returned to the client for the runId through the replyTo function.

The important part for completion is the code around the SegmentActor.Completed message. Whenever a segment returns a positive response, the count is incremented and waiting is called again (which appears recursive, but actually isn’t). Once all the responses are received, the Completed SubmitResponse is sent to the caller through the replyTo function. The Behavior of the monitor becomes Behaviors.stopped, which causes the monitor actor to stop.
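The counting logic can be modelled as a pure state transition function, which also shows why calling waiting again is a state change rather than a recursive call. This is a minimal sketch and not the project's actor: MonitorSketch, SegCompleted, SegError, Waiting, Done, and Failed are hypothetical names, and the timeout and the Started/Processing responses are left out.

```scala
object MonitorSketch {
  // Simplified segment responses (stand-ins for the SegmentActor responses)
  sealed trait SegmentResponse
  case object SegCompleted                   extends SegmentResponse
  final case class SegError(message: String) extends SegmentResponse

  // Monitor state: still waiting, finished successfully, or failed
  sealed trait State
  final case class Waiting(total: Int, received: Int) extends State
  case object Done                                    extends State
  final case class Failed(message: String)            extends State

  // One transition: returns the next state instead of mutating a counter
  def step(state: State, response: SegmentResponse): State =
    (state, response) match {
      case (Waiting(total, received), SegCompleted) =>
        if (received + 1 == total) Done else Waiting(total, received + 1)
      case (Waiting(_, _), SegError(message)) => Failed(message) // first error ends the command
      case (finished, _)                      => finished        // later responses are ignored
    }

  // Feeds a sequence of responses through the state machine
  def run(total: Int, responses: List[SegmentResponse]): State =
    responses.foldLeft[State](Waiting(total, 0))(step)
}
```

In the sketch, three Completed responses for three segments end in Done, while an Error anywhere in the sequence ends in Failed regardless of what follows.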

Note

The SegComMonitor is written in the functional typed actor style. See akka.io.

Warning

Neither simulator implements the part of the protocol that includes the Processing and Started messages. We suggest that they not be implemented, to keep the low-level protocol simple.

SegComMonitor tests are a good way to see how testing works and how we validate the M1CS requirement to complete all segment commands and complete a CSW command.

The following test shows sending a command to 492 segments and waiting for completion. First the SegmentManager is used to create a full mirror configuration. This is what the HCD does during initialization. The tester is a function that notifies the com1Response TestProbe when the command completes.

Then a SegComMonitor is created with a test command and the list of 492 segments. It is then started, which causes the command to be sent to all the segments. The test code then waits for the Completed message. Once it is received, the test waits briefly as a precaution to see if any other messages arrive, then shuts down all the segment connections and quits.

Scala
test("492 segments - all sectors - send 1") {
  // Note: This test fails on Mac, server must be running on Linux due to open file issue?

  val range = 1 to SegmentId.MAX_SEGMENT_NUMBER
  // Create segments
  val segments      = SegmentManager.createSegments(testCreator, range, log)
  val segmentActors = segments.getAllSegments.segments
  segmentActors.size shouldBe 492

  val com1Response = TestProbe[SubmitResponse]()
  val tester       = makeTester(com1Response)

  val runId1 = Id()
  val mon    = testKit.spawn(hcd.SegComMonitor(cn1, cn1full, segmentActors, runId1, tester, log))
  mon ! SegComMonitor.Start

  // This verifies that the command finished successfully
  com1Response.expectMessage(10.seconds, Completed(runId1))

  // Wait for final logs
  com1Response.expectNoMessage(100.milli)
  segments.shutdownAll()

  testKit.stop(mon, 5.seconds)
}

The following code is similar but in this case, the test starts and executes two overlapping segment commands and waits for them both to asynchronously complete either successfully or with an error.

Scala
test("492 segments - all sectors - overlap") {
  // Note: This test fails on Mac, server must be running on Linux due to open file issue?

  val range = 1 to SegmentId.MAX_SEGMENT_NUMBER
  // Create segments
  val segments      = SegmentManager.createSegments(testCreator, range, log)
  val segmentActors = segments.getAllSegments.segments
  segmentActors.size shouldBe 492

  val com1Response = TestProbe[SubmitResponse]()
  val tester       = makeTester(com1Response)

  val runId1 = Id()
  val mon1   = testKit.spawn(hcd.SegComMonitor(cn1, cn1full, segmentActors, runId1, tester, log))
  mon1 ! SegComMonitor.Start

  val runId2 = Id()
  val mon2   = testKit.spawn(hcd.SegComMonitor(cn2, cn2full, segmentActors, runId2, tester, log))
  mon2 ! SegComMonitor.Start

  // This verifies that both commands finished successfully
  val messages = com1Response.receiveMessages(2, 15.seconds)
  messages.size shouldBe 2
  val resultRunIds = Set(messages.head.runId, messages(1).runId)
  resultRunIds.contains(runId1) shouldBe true
  resultRunIds.contains(runId2) shouldBe true

  // Wait for final logs or shutdown
  com1Response.expectNoMessage(100.milli)
  segments.shutdownAll()

  testKit.stop(mon1, 5.seconds)
  testKit.stop(mon2, 5.seconds)
}

At least with the JVM simulator, it is not known which command will complete first because of the randomized delays, so the test waits for 2 messages and then checks that they are the correct messages.
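The order-independent check used by the overlap test can be expressed as a small helper. This is a minimal sketch, assuming responses are reduced to a case class carrying only a runId. OverlapCheckSketch and allCompleted are hypothetical names and are not part of the test suite.

```scala
object OverlapCheckSketch {
  // Simplified response carrying only the runId of the finished command
  final case class Completed(runId: String)

  // True when the received responses cover exactly the expected runIds, in any order
  def allCompleted(expected: Set[String], received: Seq[Completed]): Boolean =
    received.size == expected.size && received.map(_.runId).toSet == expected
}
```

Comparing sets rather than positions keeps the check valid whichever command finishes first.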

Segment Actor

The Segments HCD creates a SegmentActor instance for each configured segment. The SegmentActor is a wrapper for the class called SocketClientStream, which is the low-level communication with the segment and which implements the JPL-library protocol. The code below is the interesting part of SegmentActor.

Scala
object SegmentActor {

  // The following is temp hack for testing errors
  val ERROR_SEG_ID: Int                = 6
  val ERROR_COMMAND_NAME: String       = "ERROR"
  val FAKE_ERROR_MESSAGE: String       = "Error received from simulator."
  private implicit val timout: Timeout = Timeout(5.seconds)

  def apply(segmentId: SegmentId, log: Logger): Behavior[Command] = {
    Behaviors.setup[Command] { ctx =>
      // Here we check to see if there is a property called simulatorHost
      val simulatorHost          = Properties.propOrElse("simulatorHost", "localhost")
      log.debug(s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Connecting to: $simulatorHost")
      val io: SocketClientStream = SocketClientStream(ctx, segmentId.toString, host = simulatorHost)
      handle(io, segmentId, log)
    }
  }

  private def handle(io: SocketClientStream, segmentId: SegmentId, log: Logger): Behavior[Command] =
    Behaviors.receive[Command] { (ctx, m) =>
      // This is an error sequence number to return when send fails
      val errorSeqNum = -1
      import ctx.executionContext
      m match {
        case Send(commandName, command, replyTo) =>
          log.debug(s"Sending segment $segmentId => $command")
          // This branch is here to test error only. TMT simulator will delay a random time before replying with ERROR
          // For testing, it is possible to send ERROR and TMT simulator will reply with ERROR rather than COMPLETED
          // This will only happen if the error command is sent to error segment
          val simCommand = if (commandName == ERROR_COMMAND_NAME && segmentId.number == ERROR_SEG_ID) {
            s"ERROR $command"
          } else {
            command
          }
          io.send(simCommand).onComplete {
            case Success(m) =>
              // This is the sequence number used by the low level socket code to send the command
              // It is created by the SocketServerStream code
              val seqNum = m.hdr.seqNo
              if (m.cmd.toLowerCase.contains("completed")) {
                replyTo ! Completed(commandName, seqNum, segmentId)
              } else {
                replyTo ! Error(commandName, seqNum, segmentId, "Error received from simulator.")
              }
              log.debug(s"Segment $segmentId: ${m.cmd}")
            case Failure(exception) =>
              // This branch occurs when for instance, the socket send fails
              log.error(s"Socket send failed: $exception", ex = exception)
              replyTo ! Error(commandName, errorSeqNum, segmentId, "Error failure received from simulator.")
          }
          handle(io, segmentId, log)

        case SendWithTime(commandName, _, delay, replyTo) =>
          // If DELAY command is sent, TMT simulator waits for specified time, this is for testing overlaps
          val simCommand = s"DELAY ${delay.toMillis.toString}"
          log.debug(s"Sending segment: $segmentId => $simCommand")

          io.send(simCommand).onComplete {
            case Success(m) =>
              // The value returned from the simulator for DELAY is not used at this point.  That could change.
              // Message from simulator not used at this point - cannot handle ERROR
              val seqNum = m.hdr.seqNo
              log.debug(s"Received: ${m.cmd} $seqNum")
              replyTo ! Completed(commandName, seqNum, segmentId)
            case Failure(exception) =>
              log.error(s"Socket send failed: $exception", ex = exception)
              replyTo ! Error(commandName, errorSeqNum, segmentId, "Error received from simulator.")
          }
          handle(io, segmentId, log)

First, the SegmentActor is created in the apply function. This is where the lower-level SocketClientStream instance is created and where the socket connection to the LSCS occurs.

SegmentActor has two messages for sending a command to the LSCS simulator. The first is Send, which is the message meant for the operating use case. It sends a command String to the segment LSCS. The second is SendWithTime, which sends a specified delay to the simulator, allowing predictable testing of things like multiple commands and overlapping commands.

At this time both segment commands are implemented as a variable delay on the segment side of the socket. With the Send message, the command String is sent to the simulator, which in the JVM simulator returns the Completed response after a random delay. With the SendWithTime message, the command DELAY MILLIS, such as DELAY 1234, is sent through the socket client. The desired delay is an argument of the message. The JVM simulator waits for the specified time before returning Completed.

The send method of the socket client sends the command to the segment socket. This is an asynchronous call using a Future. When the response is received, the SegmentActor sends the response to the caller, which is a SegComMonitor instance, to be counted.
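The way a socket result turns into a reply can be sketched independently of Akka and the socket client. The following is a minimal illustration, assuming the send operation is any function returning a Future[String]. SegmentReplySketch, classify, delayCommand, and sendAndReply are hypothetical names, while the "completed" substring rule and the DELAY format follow the excerpt above.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}

object SegmentReplySketch {
  sealed trait Reply
  case object Completed                  extends Reply
  final case class Error(message: String) extends Reply

  // A response containing "completed" (case-insensitive) is a success,
  // any other response or a failed send becomes an Error.
  def classify(outcome: Try[String]): Reply =
    outcome match {
      case Success(text) if text.toLowerCase.contains("completed") => Completed
      case Success(_)                                                => Error("Error received from simulator.")
      case Failure(ex)                                               => Error(s"Socket send failed: ${ex.getMessage}")
    }

  // Formats the simulator delay command, for example "DELAY 1234"
  def delayCommand(delay: FiniteDuration): String = s"DELAY ${delay.toMillis}"

  // Sends asynchronously; the reply is delivered only when the Future completes
  def sendAndReply(send: String => Future[String], command: String, replyTo: Reply => Unit)(implicit
      ec: ExecutionContext
  ): Unit =
    send(command).onComplete(outcome => replyTo(classify(outcome)))
}
```

Keeping classify separate from sendAndReply makes the mapping from simulator text to reply testable without opening a socket.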

The ShutdownSegment command is also handled. When this occurs, the client is terminated, which closes the socket connection to the segment. These shutdown commands are present so that tests always work correctly. In the operations case, the sockets would probably stay open and would probably not need to be closed. If commands are relatively infrequent, it might be worthwhile to investigate creating the sockets during a command and closing them afterward.
