LSCS Segments HCD
The project includes a Segments HCD that processes segment commands. As discussed in the overview, the Segments HCD receives HCD Setups from the Assembly (or other sources), validates them, and then sends them to one or more segments. It monitors each segment for a response, which is hopefully success but may be an error. Once all the segments complete successfully, or as soon as any segment completes with an Error, the HCD command is completed and a response is passed back to the caller. The HCD does not produce any events.
Top Level Actor
The Top Level Actor (TLA) for the Segments HCD can be found in the lscsComps project in the file named SegmentsHcdHandlers.scala, which contains most of the HCD code. As with all CSW components, the TLA for the Segments HCD implements handlers as needed. In this HCD, only the initialize, validateCommand, and onSubmit handlers are implemented. Unlike the Assembly, the HCD does not use the onLocationTrackingEvent handler.
Initialization
Initialization in the Segments HCD does the important job of creating the segments. How the segments work is discussed later; here only their creation during initialization is covered.
```scala
// Set this during initialization to be a Segments instance
private var createdSegments: Segments = _

/**
 * The TLA initialize reads the number of segments from the reference.conf file. This is convenient for
 * testing. It creates that number of segments in each sector. At some point this could be removed and
 * replaced with MAX_SEGMENT_NUMBER.
 */
override def initialize(): Unit = {
  val segPath = "m1cs.segments"
  val maxSegments =
    if (ctx.system.settings.config.hasPath(segPath)) ctx.system.settings.config.getInt(segPath)
    else SegmentId.MAX_SEGMENT_NUMBER
  val segmentRange = 1 to maxSegments //SegmentId.MAX_SEGMENT_NUMBER
  log.info(
    s"Initializing Segments HCD with ${segmentRange.max} segments in each sector for a total of ${segmentRange.max * SegmentId.ALL_SECTORS.size} segments."
  )
  createdSegments = SegmentManager.createSegments(creator, segmentRange, log)
}
```
Any state that is initialized in the initialize handler must be held at the level of the TLA so that the other handlers can use it. In this case, there is a variable called createdSegments of type Segments that is set during initialize. The Segments value is managed by the SegmentManager object, but it is essentially a Map of SegmentId values to SegmentActor references. The createdSegments variable is used by the validation code and by the onSubmit handler.
As shown, the code reads the maxSegments value from the m1cs.segments entry in the reference.conf file (falling back to SegmentId.MAX_SEGMENT_NUMBER if the entry is absent) and passes the range 1 to maxSegments to the SegmentManager to create the segments. As mentioned before, the SegmentManager creates that number of segments in each sector. Therefore, the maximum value for maxSegments is MAX_SEGMENT_NUMBER (82). Once one is happy with the simulator, the configuration check can either be eliminated or m1cs.segments can be set to 82 in the reference.conf file.
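The arithmetic behind this setting can be checked with a small stand-alone model: with six sectors (492 / 82), 82 segments per sector gives the 492 segments used in the tests later in this section. The sketch assumes sector names A to F and IDs of the form A12, and uses Strings in place of SegmentId values and SegmentActor references. SegmentLayoutSketch and its members are hypothetical names for illustration; they are not the project's SegmentId or SegmentManager API.

```scala
object SegmentLayoutSketch {
  // Assumption: six M1 sectors, named A to F here for illustration only
  val sectors: List[Char] = List('A', 'B', 'C', 'D', 'E', 'F')

  // Segments per sector; 82 matches the MAX_SEGMENT_NUMBER value discussed in the text
  val maxSegmentNumber: Int = 82

  // Hypothetical ID format: sector letter followed by segment number, e.g. "A12"
  def segmentIds(maxSegments: Int): List[String] = {
    require(maxSegments >= 1 && maxSegments <= maxSegmentNumber, s"maxSegments must be in 1..$maxSegmentNumber")
    for {
      sector <- sectors
      n      <- (1 to maxSegments).toList
    } yield s"$sector$n"
  }

  // Stand-in for the Segments map: segment ID -> placeholder for a SegmentActor reference
  def createSegments(maxSegments: Int): Map[String, String] =
    segmentIds(maxSegments).map(id => id -> s"actor-$id").toMap
}
```

For example, createSegments(2) yields 12 entries (A1, A2, B1, ... F2), which is the kind of small configuration that is convenient while testing against a simulator.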
HCD Command Validation
As in the Assembly, validateCommand rejects an Observe and passes a Setup to another function, handleValidation, which is shown below.
```scala
/*
 * All Setup validation is performed here. Three checks are done for a lscsDirectCommand:
 * 1. is there a valid LSCS command
 * 2. is there a segmentId key (one or all)
 * 3. If one segment, is the segment available?
 * The HCD shutdown command is also accepted. All others are rejected.
 */
private def handleValidation(runId: Id, setup: Setup): ValidateCommandResponse = {
  setup.commandName match {
    case HcdDirectCommand.lscsDirectCommand =>
      // The checks form one if/else chain so that each Invalid is actually returned,
      // rather than being discarded as a statement
      if (!setup.exists(lscsCommandKey))
        Invalid(runId, CommandIssue.MissingKeyIssue(s"Setup must include the ${lscsCommandKey.keyName} parameter."))
      else if (!setup.exists(segmentIdKey))
        Invalid(runId, CommandIssue.MissingKeyIssue(s"Setup must include the ${segmentIdKey.keyName} parameter."))
      else {
        // Verify that if one segment, that the segment is online in the list
        val segmentIdValue = setup(segmentIdKey).head
        if (segmentIdValue != ALL_SEGMENTS && !createdSegments.segmentExists(SegmentId(segmentIdValue))) {
          Invalid(
            runId,
            CommandIssue.ParameterValueOutOfRangeIssue(s"The segmentId: $segmentIdValue is not currently available.")
          )
        } else {
          Accepted(runId)
        }
      }
    case HcdShutdown.shutdownCommand =>
      Accepted(runId)
    case other =>
      Invalid(runId, CommandIssue.UnsupportedCommandIssue(s"HCD does not accept the command: $other"))
  }
}
```
The HCD can process two commands: lscsDirectCommand and shutdownCommand. For lscsDirectCommand, handleValidation does three things:
- Is the lscsCommandKey present? If not, return Invalid.
- Is a segmentIdKey included? If not, return Invalid.
- Finally, if the destination is a single segment, check whether the destination segment is “online”. Since the number of segments to create is configurable, it’s possible to send a command to a segment that does not exist. If this check fails, return Invalid.
If all three checks pass, the Setup is Accepted. More could probably be checked, but that is all for now. We assume that the input/output library is used to construct the Setups, so no further parameter checks are done.
The HCD also accepts the HcdShutdown.shutdownCommand. This command is always accepted.
If any other command is received by the HCD, it is rejected and Invalid is returned.
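The validation rules can be modelled without CSW types. In the sketch below the checks form a single if/else chain, so the first failing check is the value the method returns. Accepted, Invalid, the parameter names "lscsCommand" and "segmentId", the string command names, and the "ALL" marker standing in for ALL_SEGMENTS are all simplified, hypothetical stand-ins; they are not the CSW or project types.

```scala
object ValidationSketch {
  // Simplified stand-ins for the CSW ValidateCommandResponse types
  sealed trait Validation
  case object Accepted extends Validation
  final case class Invalid(reason: String) extends Validation

  // Hypothetical marker standing in for ALL_SEGMENTS
  val AllSegments = "ALL"

  // params models the Setup's parameters; onlineSegments models createdSegments
  def validate(commandName: String, params: Map[String, String], onlineSegments: Set[String]): Validation =
    commandName match {
      case "lscsDirectCommand" =>
        // One if/else chain: the first failing check determines the result
        if (!params.contains("lscsCommand")) Invalid("Setup must include the lscsCommand parameter.")
        else if (!params.contains("segmentId")) Invalid("Setup must include the segmentId parameter.")
        else {
          val segmentId = params("segmentId")
          if (segmentId != AllSegments && !onlineSegments.contains(segmentId))
            Invalid(s"The segmentId: $segmentId is not currently available.")
          else Accepted
        }
      case "shutdownCommand" => Accepted // always accepted
      case other             => Invalid(s"HCD does not accept the command: $other")
    }
}
```

Keeping the rules in a pure function like this makes each rejection path easy to unit test without starting a component.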
Segments HCD Command Execution
Once a command is validated, the onSubmit handler is called. Like the Assembly, the HCD rejects Observes and hands off execution of Setups to handleSetup. Checking for Observe again isn’t strictly needed, since validation has already rejected it. handleSetup, reproduced below, is the crux of the HCD.
```scala
/**
 * Processes commands as Setups for the HCD.
 * @param runId command runId
 * @param setup the [[Setup]] to execute
 * @return [[SubmitResponse]] response from the command. All commands are started currently.
 */
private def handleSetup(runId: Id, setup: Setup): SubmitResponse = {
  setup.commandName match {
    case HcdDirectCommand.lscsDirectCommand =>
      // We know all these params are present at this point
      val command         = setup(lscsCommandKey).head
      val commandName     = setup(lscsCommandNameKey).head
      val segmentKeyValue = setup(segmentIdKey).head
      // The sendList, at this point, is either one segment or all segments. The same execution approach
      // is used regardless of one or all
      val sendList = if (segmentKeyValue == ALL_SEGMENTS) {
        createdSegments.getAllSegments
      } else {
        createdSegments.getSegment(segmentId = SegmentId(segmentKeyValue))
      }
      // Here a SegComMonitor is created to send and watch for completion of the command. The sendList is the
      // list of SegmentActors that will receive the command. The function passed as the 5th argument will
      // be executed when all the segment commands complete either successfully or with an error.
      val mon1 = ctx.spawnAnonymous(
        hcd.SegComMonitor(
          commandName,
          command,
          sendList.segments,
          runId,
          (sr: SubmitResponse) => cswCtx.commandResponseManager.updateCommand(sr),
          log
        )
      )
      // Start the command and return Started to the Assembly
      mon1 ! SegComMonitor.Start
      Started(runId)
    case HcdShutdown.shutdownCommand =>
      //This just sends shutdown to all the online segments
      createdSegments.shutdownAll()
      Completed(runId)
    case other =>
      Error(runId, s"This HCD does not handle this command: $other")
  }
}
```
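There are two commands, lscsDirectCommand and shutdownCommand. The shutdownCommand simply calls shutdownAll on the created segments and returns Completed; the rest of this section covers lscsDirectCommand.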
lscsDirectCommand Implementation
First, the command name, the segment destination, and the command String are extracted from the Setup. Then segmentKeyValue, which is either ALL_SEGMENTS or a specific segment ID, is tested and used to get the list of destination segments: createdSegments.getAllSegments returns all of them, and createdSegments.getSegment returns the single requested segment.
Then a SegComMonitor actor is created. It takes the command name, the command String, the list of segments, the runId of the request, a function to execute when all the segments have completed, and the logger. The SegComMonitor.Start message is then sent to the monitor, which causes it to send the command to all the segments in the list.
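Because the one-or-all decision only chooses a list, everything after it is identical for one segment or 492. A minimal sketch of that selection, with String IDs, placeholder actor references, and an "ALL" marker standing in (as assumptions) for SegmentId, SegmentActor references, and ALL_SEGMENTS; SendListSketch is a hypothetical name:

```scala
object SendListSketch {
  // Hypothetical stand-in for ALL_SEGMENTS
  val AllSegments = "ALL"

  // created models createdSegments: segment ID -> placeholder SegmentActor reference
  def sendList(segmentKeyValue: String, created: Map[String, String]): List[String] =
    if (segmentKeyValue == AllSegments) created.values.toList.sorted // sorted only to make the result deterministic
    else created.get(segmentKeyValue).toList                          // zero or one target; validation already checked the ID
}
```

The monitor's expected response count is then simply the size of the returned list.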
Note that the lscsDirectCommands executed by the HCD are completely asynchronous. As the code shows, once a monitor is started, the submit handler returns Started, indicating to the caller that a long-running command has started. The HCD is then available to accept and process another HCD/segment command, which starts its own SegComMonitor. The tests demonstrate that this works.
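The "return Started now, report the final result later" pattern can be illustrated with plain Scala Futures instead of actors. In this sketch, submit, runOverlapping, Started, Completed, and the sleep durations are hypothetical stand-ins; the sleep models waiting for segment responses, and a single shared replyTo plays the role of the CRM update (or the test probe).

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.jdk.CollectionConverters._

object AsyncSubmitSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Simplified stand-ins for the CSW SubmitResponse types
  sealed trait SubmitResponse { def runId: Int }
  final case class Started(runId: Int)   extends SubmitResponse
  final case class Completed(runId: Int) extends SubmitResponse

  // Models handleSetup: start the work, deliver the final response via replyTo later, return Started at once
  def submit(runId: Int, workMillis: Long, replyTo: SubmitResponse => Unit): SubmitResponse = {
    Future {
      blocking(Thread.sleep(workMillis)) // stands in for waiting on all segment responses
      replyTo(Completed(runId))
    }
    Started(runId)
  }

  // Like the overlap test: one shared replyTo receives both final responses in whatever order they finish
  def runOverlapping(): (List[SubmitResponse], Set[Int]) = {
    val received = new ConcurrentLinkedQueue[SubmitResponse]()
    val done     = new CountDownLatch(2)
    val tester: SubmitResponse => Unit = { r =>
      received.add(r)
      done.countDown()
    }

    // Both calls return immediately; the longer command is submitted first
    val immediate = List(submit(1, 200, tester), submit(2, 50, tester))
    require(done.await(5, TimeUnit.SECONDS), "commands did not complete in time")
    (immediate, received.asScala.map(_.runId).toSet)
  }
}
```

Both submissions return Started before either command finishes, and the final responses are compared as a set because their arrival order is not fixed.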
Segment Monitor
The Segment Monitor is the longest piece of code in the project. It is an actor implementing a two-state finite state machine whose job is to execute a segment command and wait for the responses. The code is long, but it is included here.
```scala
/**
 * These are the commands for the Command Sequence Monitor. The WrappedSegmentResponse is needed to receive
 * the SegmentActor.Response and transform it into a SegComMonitor.Command.
 * CommandTimeout is issued and received if the Command Monitor times out before receiving all segment responses.
 */
sealed trait Command
final private case class WrappedSegmentResponse(response: SegmentActor.Response) extends Command
case object Start                                                                  extends Command
final private case object CommandTimeout                                           extends Command

/**
 * This private class implements the Segment Command Monitor.
 * The class is an actor with a two state FSM. The monitor is created with a list of segment actors and a
 * segment command that is to be sent to each of the segment actors in the list. It waits for the Start message
 * in the `starting` state. Once Start is received, it sends the command sequentially to each segment actor and
 * waits for the responses in the `waiting` state.
 *
 * It determines completion by counting responses and also checking that the responses are Completed rather than Error.
 * Once the correct number of responses have been received, it executes the replyTo function, which is usually a
 * function to update the CSW CRM, which notifies the caller that the command is completed.
 *
 * There is one more feature. A timeout can be provided that will allow the command to timeout and reply to the
 * caller with an Error.
 *
 * A new Segment Command Monitor is created for every command executed. After all the responses have been received or
 * the monitor is otherwise completed, the monitor returns Behaviors.stopped, which causes the actor to be ended.
 */
class SegMonitor private[SegComMonitor] (
    commandName: String,
    fullCommand: String,
    segments: List[ActorRef[SegmentActor.Command]],
    runId: Id,
    replyTo: SubmitResponse => Unit, // This is here so that handler can provide the CRM but tests can do something else
    log: Logger,
    segmentResponseMapper: ActorRef[SegmentActor.Response],
    timeout: FiniteDuration
) {

  def starting(): Behavior[Command] =
    Behaviors.receiveMessage {
      case Start =>
        log.debug(s"Sending $commandName to ${segments.size} segments.")
        segments.foreach(_ ! SegmentActor.Send(commandName, fullCommand, segmentResponseMapper))
        waiting(segments.size, responsesReceived = 0)
      case _ =>
        Behaviors.unhandled
    }

  /**
   * The waiting state is entered once the command is sent to the segments. Every time a response from a segment
   * is received, the Behavior calls itself with updated state. This is not a recursive call in Akka.
   * @param totalSegments the number of expected responses
   * @param responsesReceived number of responses received so far
   * @return a typed Behavior
   */
  def waiting(totalSegments: Int, responsesReceived: Int): Behavior[Command] =
    Behaviors.withTimers { timers =>
      Behaviors
        .receiveMessage[Command] {
          case wrapped: WrappedSegmentResponse =>
            wrapped.response match {
              case SegmentActor.Started(commandName, commandId, segmentId) =>
                timers.startSingleTimer(TIMEOUT_KEY, CommandTimeout, timeout)
                log.debug(s"Started: $segmentId:$commandName:$commandId with $timeout timeout.")
                Behaviors.same
              case SegmentActor.Completed(commandName, commandId, segmentId) =>
                val updatedResponsesReceived = responsesReceived + 1
                if (totalSegments == updatedResponsesReceived) {
                  log.info(
                    s"$commandName completed successfully for **$updatedResponsesReceived** segments. Sending Completed($runId)"
                  )
                  log.debug(s"Cancelling Timeout Timer")
                  timers.cancel(TIMEOUT_KEY)
                  replyTo(Completed(runId))
                  Behaviors.stopped
                } else {
                  // Log progress every 20 responses, using the count that includes this response
                  if (Math.floorMod(updatedResponsesReceived, 20) == 0)
                    log.debug(s"Completed: $segmentId:$commandName:$commandId Total completed: $updatedResponsesReceived")
                  waiting(totalSegments, updatedResponsesReceived)
                }
              case SegmentActor.Processing(commandName, commandId, segmentId) =>
                log.debug(s"Processing: $segmentId:$commandName:$commandId")
                Behaviors.same
              case SegmentActor.Error(commandName, commandId, segmentId, message) =>
                val updatedResponsesReceived = responsesReceived + 1
                log.error(
                  s"Error: $segmentId:$commandName:$commandId--STOPPING, $updatedResponsesReceived responses received."
                )
                // Cancel the timeout timer
                timers.cancel(TIMEOUT_KEY)
                replyTo(Error(runId, message))
                Behaviors.stopped
            }
          case Start =>
            Behaviors.unhandled
          case CommandTimeout =>
            replyTo(
              Error(
                runId,
                s"A segment command timed out after receiving: $responsesReceived responses of expected: $totalSegments."
              )
            )
            Behaviors.stopped
          case other =>
            log.error(s"SegComMonitor received some other message. Just FYI: $other")
            Behaviors.same
        }
        .receiveSignal { case (_, PostStop) =>
          log.debug(s">>>SegComMonitor for $runId STOPPED<<<")
          Behaviors.same
        }
    }
}
```
SegComMonitor is created in the starting state, awaiting the Start message from the HCD TLA. When Start is received, the monitor sends the full segment command to each of the SegmentActors in the list passed in by the TLA (either one segment or all segments). Once the commands are sent, it moves to the waiting state.
The state is called waiting because it is waiting for responses from the SegmentActors. The state also manages a single-shot timer so that, if something goes wrong in a SegmentActor, the CommandTimeout message is sent to the monitor itself after the timeout period, which causes an Error SubmitResponse for the runId to be returned through the replyTo function. Note that in the code shown, the timer is started when a SegmentActor reports Started, so the timeout is only armed for segments that send Started.
The monitor assumes that each SegmentActor returns a Completed or Error response, as described in the M1CS docs; Started and Processing responses do not count toward completion. The happy case is the SegmentActor.Completed message. All the monitor does is count successful responses until it receives the expected number; it then sends Completed for the runId through the replyTo function.
If a single Error is returned, the monitor stops immediately: an error from any one segment means the whole command fails, and an Error is returned to the client for the runId through the replyTo function.
The important part for completion is the code around the SegmentActor.Completed message. Whenever a segment returns a positive response, the count is incremented and waiting is called again. This appears recursive but actually isn’t, since waiting only returns the next Behavior for the actor to use. Once all the responses are received, the Completed SubmitResponse is sent to the caller through the replyTo function. The Behavior of the monitor becomes Behaviors.stopped, which causes the monitor actor to stop.
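Stripped of Akka, the waiting state is a fold over incoming responses. The sketch below is a pure-Scala model of that logic, not the actor itself: Event, State, step, and run are hypothetical names, Started and Processing are omitted because they do not change the count, and the timeout arrives as an ordinary event rather than from a timer.

```scala
object MonitorSketch {
  // Responses and signals the waiting state reacts to (Started and Processing omitted: they do not change the count)
  sealed trait Event
  case object SegmentCompleted                   extends Event
  final case class SegmentError(message: String) extends Event
  case object Timeout                            extends Event

  // Either still waiting with a running count, or finished with a final result
  sealed trait State
  final case class Waiting(total: Int, received: Int) extends State
  final case class Done(result: Either[String, Unit]) extends State

  // One transition: count successes, finish on the last one, stop at the first error or on timeout
  def step(state: State, event: Event): State = state match {
    case done: Done => done // in this model, a finished monitor ignores anything that arrives later
    case Waiting(total, received) =>
      event match {
        case SegmentCompleted =>
          val updated = received + 1
          if (updated == total) Done(Right(())) else Waiting(total, updated)
        case SegmentError(message) => Done(Left(message))
        case Timeout =>
          Done(Left(s"A segment command timed out after receiving: $received responses of expected: $total."))
      }
  }

  // Feeds a sequence of events to a fresh monitor expecting `total` responses
  def run(total: Int, events: List[Event]): State =
    events.foldLeft(Waiting(total, 0): State)(step)
}
```

Writing the transition as a pure function makes the counting rules easy to unit test without spawning actors; the actor version achieves the same effect by returning a new waiting behavior with the updated count.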
The SegComMonitor is written in the functional typed actor style. See akka.io.
Neither simulator implements the Started and Processing messages of the protocol. We suggest that they not be implemented, to keep the low-level protocol simple.
The SegComMonitor tests are a good way to see how testing works and how we validate the M1CS requirement to complete all segment commands before completing the CSW command.
The following test sends a command to 492 segments and waits for completion. First, the SegmentManager is used to create a full mirror configuration, which is what the HCD does during initialization. The tester is a function that notifies the com1Response TestProbe when the command completes.
Then a SegComMonitor is created with a test command and the list of 492 segments. It is then started, which causes the command to be sent to all the segments, and the test code waits for the Completed message. Once it is received, the test waits briefly as a precaution to confirm that no other messages arrive, then shuts down all the segment connections and quits.
```scala
test("492 segments - all sectors - send 1") {
  // Note: This test fails on Mac, server must be running on Linux due to open file issue?
  val range = 1 to SegmentId.MAX_SEGMENT_NUMBER
  // Create segments
  val segments      = SegmentManager.createSegments(testCreator, range, log)
  val segmentActors = segments.getAllSegments.segments
  segmentActors.size shouldBe 492

  val com1Response = TestProbe[SubmitResponse]()
  val tester       = makeTester(com1Response)

  val runId1 = Id()
  val mon    = testKit.spawn(hcd.SegComMonitor(cn1, cn1full, segmentActors, runId1, tester, log))
  mon ! SegComMonitor.Start

  // This verifies that the command finished successfully
  com1Response.expectMessage(10.seconds, Completed(runId1))
  // Wait for final logs
  com1Response.expectNoMessage(100.milli)
  segments.shutdownAll()
  testKit.stop(mon, 5.seconds)
}
```
The following test is similar, but in this case it starts two overlapping segment commands and waits for both to complete asynchronously, either successfully or with an error.
```scala
test("492 segments - all sectors - overlap") {
  // Note: This test fails on Mac, server must be running on Linux due to open file issue?
  val range = 1 to SegmentId.MAX_SEGMENT_NUMBER
  // Create segments
  val segments      = SegmentManager.createSegments(testCreator, range, log)
  val segmentActors = segments.getAllSegments.segments
  segmentActors.size shouldBe 492

  val com1Response = TestProbe[SubmitResponse]()
  val tester       = makeTester(com1Response)

  val runId1 = Id()
  val mon1   = testKit.spawn(hcd.SegComMonitor(cn1, cn1full, segmentActors, runId1, tester, log))
  mon1 ! SegComMonitor.Start

  val runId2 = Id()
  val mon2   = testKit.spawn(hcd.SegComMonitor(cn2, cn2full, segmentActors, runId2, tester, log))
  mon2 ! SegComMonitor.Start

  // This verifies that both commands finished successfully
  val messages = com1Response.receiveMessages(2, 15.seconds)
  messages.size shouldBe 2
  val resultRunIds = Set(messages.head.runId, messages(1).runId)
  resultRunIds.contains(runId1) shouldBe true
  resultRunIds.contains(runId2) shouldBe true

  // Wait for final logs or shutdown
  com1Response.expectNoMessage(100.milli)
  segments.shutdownAll()
  testKit.stop(mon1, 5.seconds)
  testKit.stop(mon2, 5.seconds)
}
```
At least with the JVM simulator, it is not known which command will complete first because of the randomized delays, so the test waits for two messages and then checks that their runIds match the two commands that were sent.
Segment Actor
The Segments HCD creates a SegmentActor instance for each configured segment. The SegmentActor is a wrapper for the class called SocketClientStream, which handles the low-level communication with the segment and implements the JPL-library protocol. The code below is the interesting part of SegmentActor.
```scala
object SegmentActor {
  // The following is temp hack for testing errors
  val ERROR_SEG_ID: Int          = 6
  val ERROR_COMMAND_NAME: String = "ERROR"
  val FAKE_ERROR_MESSAGE: String = "Error received from simulator."

  private implicit val timout: Timeout = Timeout(5.seconds)

  def apply(segmentId: SegmentId, log: Logger): Behavior[Command] = {
    Behaviors.setup[Command] { ctx =>
      // Here we check to see if there is a property called simulatorHost
      val simulatorHost = Properties.propOrElse("simulatorHost", "localhost")
      log.debug(s">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Connecting to: $simulatorHost")
      val io: SocketClientStream = SocketClientStream(ctx, segmentId.toString, host = simulatorHost)
      handle(io, segmentId, log)
    }
  }

  private def handle(io: SocketClientStream, segmentId: SegmentId, log: Logger): Behavior[Command] =
    Behaviors.receive[Command] { (ctx, m) =>
      // This is an error sequence number to return when send fails
      val errorSeqNum = -1
      import ctx.executionContext

      m match {
        case Send(commandName, command, replyTo) =>
          log.debug(s"Sending segment $segmentId => $command")
          // This branch is here to test error only. TMT simulator will delay a random time before replying with ERROR
          // For testing, it is possible to send ERROR and TMT simulator will reply with ERROR rather than COMPLETED
          // This will only happen if the error command is sent to error segment
          val simCommand = if (commandName == ERROR_COMMAND_NAME && segmentId.number == ERROR_SEG_ID) {
            s"ERROR $command"
          } else {
            command
          }
          io.send(simCommand).onComplete {
            case Success(m) =>
              // This is the sequence number used by the low level socket code to send the command
              // It is created by the SocketServerStream code
              val seqNum = m.hdr.seqNo
              if (m.cmd.toLowerCase.contains("completed")) {
                replyTo ! Completed(commandName, seqNum, segmentId)
              } else {
                replyTo ! Error(commandName, seqNum, segmentId, "Error received from simulator.")
              }
              log.debug(s"Segment $segmentId: ${m.cmd}")
            case Failure(exception) =>
              // This branch occurs when for instance, the socket send fails
              log.error(s"Socket send failed: $exception", ex = exception)
              replyTo ! Error(commandName, errorSeqNum, segmentId, "Error failure received from simulator.")
          }
          handle(io, segmentId, log)

        case SendWithTime(commandName, _, delay, replyTo) =>
          // If DELAY command is sent, TMT simulator waits for specified time, this is for testing overlaps
          val simCommand = s"DELAY ${delay.toMillis.toString}"
          log.debug(s"Sending segment: $segmentId => $simCommand")
          io.send(simCommand).onComplete {
            case Success(m) =>
              // The value returned from the simulator for DELAY is not used at this point. That could change.
              // Message from simulator not used at this point - cannot handle ERROR
              val seqNum = m.hdr.seqNo
              log.debug(s"Received: ${m.cmd} $seqNum")
              replyTo ! Completed(commandName, seqNum, segmentId)
            case Failure(exception) =>
              log.error(s"Socket send failed: $exception", ex = exception)
              replyTo ! Error(commandName, errorSeqNum, segmentId, "Error received from simulator.")
          }
          handle(io, segmentId, log)

        // ... the remaining cases, including ShutdownSegment, are not shown in this excerpt
      }
    }
}
```
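First, consider what happens when the SegmentActor is created in its apply method. This is where the lower-level SocketClientStream is created and where the socket connection to the LSCS is made. The simulator host is taken from the simulatorHost system property and defaults to localhost.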
SegmentActor has two messages for sending a command to the LSCS simulator. The first is Send, which is the message meant for the operational use case; it sends a command String to the segment LSCS. The second is SendWithTime, which sends a specified delay to the simulator, allowing predictable testing of things like multiple commands and overlapping commands.
At this time, both segment commands are implemented as a variable delay on the segment side of the socket. With the Send message, the command String is sent to the simulator, which, in the JVM simulator, returns the Completed response after a random delay. With the SendWithTime message, the command DELAY MILLIS (for example, DELAY 1234) is sent to the socket client, where the desired delay is an argument of the message. The JVM simulator waits for the specified time before returning Completed.
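The two messages differ only in the string written to the socket. The sketch below isolates that string construction as pure functions. The values 6 and "ERROR" mirror ERROR_SEG_ID and ERROR_COMMAND_NAME in the SegmentActor code above; the object and method names, and the example command strings used with it, are hypothetical.

```scala
import scala.concurrent.duration._

object SimCommandSketch {
  // Test hooks mirroring SegmentActor: ERROR_SEG_ID and ERROR_COMMAND_NAME
  val ErrorSegId: Int          = 6
  val ErrorCommandName: String = "ERROR"

  // Send: pass the command through, except that an ERROR command aimed at the error segment gets an "ERROR " prefix
  def sendCommand(commandName: String, command: String, segmentNumber: Int): String =
    if (commandName == ErrorCommandName && segmentNumber == ErrorSegId) s"ERROR $command" else command

  // SendWithTime: ask the simulator to wait for the given time before replying
  def delayCommand(delay: FiniteDuration): String = s"DELAY ${delay.toMillis}"
}
```

For example, delayCommand(1234.millis) produces DELAY 1234, and sendCommand("ERROR", "CMD_A", 6) produces ERROR CMD_A, while the same command to any other segment is passed through unchanged.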
The send method of the socket client sends the command to the segment socket. This is an asynchronous call that returns a Future. When the response is received, the SegmentActor sends it to the caller, which is a SegComMonitor instance, to be counted.
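The reply handling in the Send branch can be sketched with a plain Future: the actor attaches a callback and returns immediately, and the callback later classifies the outcome. Reply, classify, and sendAndReply are hypothetical simplifications of the socket client's response and the SegmentActor code; the "completed" rule, the -1 error sequence number, and the error texts follow the code above.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object ReplySketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Simplified stand-ins for the SegmentActor.Response types
  sealed trait Response
  final case class Completed(seqNum: Int)              extends Response
  final case class Error(seqNum: Int, message: String) extends Response

  // Simplified stand-in for the socket client's reply: header sequence number and reply text
  final case class Reply(seqNo: Int, cmd: String)

  // Sequence number reported when the send itself fails
  val ErrorSeqNum: Int = -1

  // Same rule as the Send branch: any reply containing "completed" (case-insensitive) counts as success
  def classify(result: Try[Reply]): Response = result match {
    case Success(m) if m.cmd.toLowerCase.contains("completed") => Completed(m.seqNo)
    case Success(m)                                            => Error(m.seqNo, "Error received from simulator.")
    case Failure(_)                                            => Error(ErrorSeqNum, "Error failure received from simulator.")
  }

  // Non-blocking: the callback runs when the Future completes, after this method has returned
  def sendAndReply(send: Future[Reply], replyTo: Response => Unit): Unit =
    send.onComplete(result => replyTo(classify(result)))
}
```

Separating classify from the callback keeps the decision logic testable on its own, while sendAndReply shows why the actor can return its next behavior before the segment has answered.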
The ShutdownSegment command is also handled. When it is received, the client is terminated, which closes the socket connection to the segment. These shutdown commands exist so that the tests always work correctly. In operations, the sockets would probably stay open and would not need to be closed. If commands are relatively infrequent, it might be worthwhile to investigate creating the sockets for each command and closing them afterward.