Communication using Commands

The csw-command library provides support for command-based communication between components.

This section describes how to communicate with any other component using commands. To learn how to manage received commands, see Managing Command State.

Dependencies

sbt
libraryDependencies += "org.tmt" %% "csw-command" % "0.5.0"

Command-based Communication Between Components

A component can send commands to other components. A command can be sent as one of the following two types of message:

  • submit - A command is sent as Submit when the result of completion is desired.
  • oneway - A command is sent as Oneway when the result of completion is not desired.

A oneway is meant to be used between an Assembly and an HCD. It is also used when tracking completion using a Matcher and current state values (see below).

Feedback Needed on Future of Oneway

In the FDR prototype version of CSW, the oneway message was used between an Assembly and an HCD to “fire and forget”. A oneway command was sent to the destination, and no validation or command completion was provided, in order to be as efficient as possible. In this case, completion information was provided through Command Service pubsub and current state values.

In this release version of CSW, oneway does provide validation to the caller, and this version of CSW Command Service also reinstates immediate completion, which was dropped in the CSW prototype release. The only feature left from the FDR version is that oneway tells the sender that there is no completion information provided.

Given these two changes, the use of oneway is not sufficiently different from submit. With this in mind, we anticipate one of two possible futures for oneway:

  1. Oneway should provide a clear difference with submit and go back to its original features with no validation or command completion.
  2. Oneway could be removed simplifying the command API to just submit.

The reason for oneway case 1 is to provide the best possible performance. A scenario is an actor that is subscribed to a 20 Hz demand event and sends out motion commands to one or more HCDs based on a calculation using data in this event. At 20 Hz, it’s probably not useful to get validation information. Nothing can be done anyway. The receiver could log a message.

However, maybe even in this case validation is useful and can be used by the sender to understand problems in the receiver, so providing only submit is adequate.

We are looking for feedback. Do you see a continued role for oneway? Maybe there is another scenario for oneway? Please write to us with your recommendation for oneway, and be warned that its behavior may change, or it may disappear, in the next release based on feedback.

The following responses can be received as a CommandResponse after sending a command with Submit or Oneway:

  • Accepted : The command has been validated and will be executed. This is returned for a long-running action.
  • Completed : The command has been executed successfully.
  • CompletedWithResult : The command is executed successfully and generated some result as a parameter set.
  • Invalid : The command is not valid and will not be executed. A reason is provided.
  • NoLongerValid : The command can no longer be executed (will be deprecated).
  • Error : The command has failed in execution. A reason is provided.
  • Cancelled : The command was cancelled.
  • CommandNotAvailable : A queried command is not available.
  • NotAllowed : The command cannot be executed at present because of the current state of the destination component. For example, another command may be executing, during which the component cannot accept any other command.
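
The list above contains one intermediate response and several final ones. As a rough sketch, the grouping could be modeled with a hypothetical ResponseKind enum, which is not part of csw-command. Treating every response other than Accepted as final, and only Completed and CompletedWithResult as positive, is an assumption made for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the response names listed above; this is not the csw-command API.
enum ResponseKind {
    ACCEPTED, COMPLETED, COMPLETED_WITH_RESULT, INVALID, NO_LONGER_VALID,
    ERROR, CANCELLED, COMMAND_NOT_AVAILABLE, NOT_ALLOWED;

    // Assumption: only these two responses report successful execution.
    private static final Set<ResponseKind> POSITIVE =
            EnumSet.of(COMPLETED, COMPLETED_WITH_RESULT);

    // Accepted only acknowledges validation; a final response follows later.
    boolean isIntermediate() {
        return this == ACCEPTED;
    }

    boolean isFinal() {
        return !isIntermediate();
    }

    boolean isPositive() {
        return POSITIVE.contains(this);
    }

    // Every final response that is not positive is treated as negative.
    boolean isNegative() {
        return isFinal() && !isPositive();
    }
}
```

A sender could use such a grouping to decide whether to keep waiting (intermediate), continue (positive), or report a problem (negative).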

A command sent as Submit or Oneway is validated by the receiving component before actual execution. If the validation is successful, the actual execution can happen in two ways:

  • Immediate Completion - The component receiving the command can determine if the command can be executed immediately and thus provide the final execution response directly without sending a response for validation. This should be reserved for actions that do not take long to complete.
Scala
val eventualResponse: Future[CommandResponse] = async {
  await(assemblyComponent.submit(Setup(prefix, immediateCmd, obsId))) match {
    case response: Completed ⇒
      //do something with completed result
      response
    case otherResponse ⇒
      // do something with other response which is not expected
      otherResponse
  }
}
Java
CompletableFuture<CommandResponse> eventualCommandResponse =
        hcdCmdService
                .submit(imdInvalidCommand, timeout)
                .thenApply(
                        response -> {
                            if (response instanceof Completed) {
                                //do something with completed result
                            }
                            return response;
                        }
                );
  • Long Running Actions - The component receiving the command may determine that the command cannot be executed immediately. In this case, the component provides an Accepted response as an acknowledgement and maintains the state of the command. The sender can query the state of a particular command at a later time or use the subscribe method to get the final response when the execution is completed.

The sender component can use the following with the command id (RunId) of an executing command to get the current status, completion response and/or result of the command:

  • Query - Query the current state of an executing command
Scala
val setupForQuery = Setup(prefix, longRunning, Some(obsId))
assemblyCommandService.submit(setupForQuery)

//do some work before querying for the result of above command as needed

val eventualResponse: Future[CommandResponse] = assemblyCommandService.query(setupForQuery.runId)
Java
hcdCmdService.submit(controlCommand, timeout);

// do some work before querying for the result of above command as needed

CompletableFuture<CommandResponse> queryResponse = hcdCmdService.query(controlCommand.runId(), timeout);
  • Subscribe - It is also possible to subscribe to asynchronously get command response updates for an executing command. At least one response is always delivered.
Scala
val eventualCommandResponse = assemblyCommandService.submit(setup).flatMap {
  case _: Accepted ⇒ assemblyCommandService.subscribe(setup.runId)
  case _           ⇒ Future(CommandResponse.Error(setup.runId, ""))
}
Java
CompletableFuture<CommandResponse> testCommandResponse =
        hcdCmdService
                .submit(controlCommand, timeout)
                .thenCompose(commandResponse -> {
                    if (commandResponse instanceof CommandResponse.Accepted)
                        return hcdCmdService.subscribe(commandResponse.runId(), timeout);
                    else
                        return CompletableFuture.completedFuture(new CommandResponse.Error(commandResponse.runId(), "test error"));
                });
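
To make the difference between Query and Subscribe concrete, here is a minimal sketch of the bookkeeping a receiving component might keep for long-running commands. CommandStatusStore and its methods are hypothetical names, responses are plain strings, and none of this is the csw-command implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical receiver-side bookkeeping; names are illustrative, not the csw-command API.
final class CommandStatusStore {
    // Latest known response per run id.
    private final Map<String, String> latest = new ConcurrentHashMap<>();
    // One future per run id, completed when the final response is known.
    private final Map<String, CompletableFuture<String>> finals = new ConcurrentHashMap<>();

    // Called once validation succeeds for a long-running command.
    void accept(String runId) {
        latest.put(runId, "Accepted");
        finals.putIfAbsent(runId, new CompletableFuture<>());
    }

    // Called when execution ends, for example with "Completed" or "Error".
    void finish(String runId, String finalResponse) {
        latest.put(runId, finalResponse);
        finals.computeIfAbsent(runId, id -> new CompletableFuture<>()).complete(finalResponse);
    }

    // Query: a snapshot of the current state, answered immediately.
    String query(String runId) {
        return latest.getOrDefault(runId, "CommandNotAvailable");
    }

    // Subscribe: a future that completes with the final response.
    CompletableFuture<String> subscribe(String runId) {
        return finals.computeIfAbsent(runId, id -> new CompletableFuture<>());
    }
}
```

In this sketch, query returns whatever is known right now (Accepted while the command is still running), whereas subscribe hands back a future that completes only when finish is called.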

CommandService

A helper/wrapper called CommandService provides a convenient way to use the Command Service with a component discovered using the Location Service. A CommandService instance is created using the value from the Location Service. This instance has methods for communicating with the component.

The API can be exercised as follows for different scenarios of command-based communication:

submit

Submit a command and get a CommandResponse as a Future. The CommandResponse can be a response from validation (Accepted, Invalid) or a final Response in case of immediate completion.

Scala/immediate-response
val eventualResponse: Future[CommandResponse] = async {
  await(assemblyComponent.submit(Setup(prefix, immediateCmd, obsId))) match {
    case response: Completed ⇒
      //do something with completed result
      response
    case otherResponse ⇒
      // do something with other response which is not expected
      otherResponse
  }
}
Java/immediate-response
CompletableFuture<CommandResponse> eventualCommandResponse =
        hcdCmdService
                .submit(imdInvalidCommand, timeout)
                .thenApply(
                        response -> {
                            if (response instanceof Completed) {
                                //do something with completed result
                            }
                            return response;
                        }
                );
Scala/validation-response
// `setupWithTimeoutMatcher` is a sample setup payload intended to be used when command response is not determined
// using matcher
val submitCommandResponseF: Future[Unit] = async {
  val initialResponse: CommandResponse = await(assemblyComponent.submit(setupWithTimeoutMatcher))
  initialResponse match {
    case accepted: Accepted ⇒
    // do Something
    case invalid: Invalid ⇒
    // do Something
    case x ⇒
    // do Something
  }
}
Java/validation-response
CompletableFuture submitCommandResponseF = hcdCmdService
        .submit(setup, timeout)
        .thenAccept(initialCommandResponse -> {
            if (initialCommandResponse instanceof CommandResponse.Accepted) {
                //do something
            } else if (initialCommandResponse instanceof CommandResponse.Invalid) {
                //do something
            } else {
                //do something
            }
        });

oneway

Send a command as a Oneway and get a CommandResponse as a Future. The CommandResponse can be a response of validation (Accepted, Invalid) or a final Response.

Scala
// `setupWithTimeoutMatcher` is a sample setup payload intended to be used when command response is not determined
// using matcher
val onewayCommandResponseF: Future[Unit] = async {
  val initialResponse: CommandResponse = await(assemblyComponent.oneway(setupWithTimeoutMatcher))
  initialResponse match {
    case accepted: Accepted ⇒
    // do Something
    case invalid: Invalid ⇒
    // do Something
    case x ⇒
    // do Something
  }
}
Java
CompletableFuture onewayCommandResponseF = hcdCmdService
        .oneway(setup, timeout)
        .thenAccept(initialCommandResponse -> {
            if (initialCommandResponse instanceof CommandResponse.Accepted) {
                //do something
            } else if (initialCommandResponse instanceof CommandResponse.Invalid) {
                //do something
            } else {
                //do something
            }
        });

subscribe

Subscribe for the result of a long-running command which was sent as Submit to get a CommandResponse as a Future.

Scala
val eventualCommandResponse = assemblyCommandService.submit(setup).flatMap {
  case _: Accepted ⇒ assemblyCommandService.subscribe(setup.runId)
  case _           ⇒ Future(CommandResponse.Error(setup.runId, ""))
}
Java
CompletableFuture<CommandResponse> testCommandResponse =
        hcdCmdService
                .submit(controlCommand, timeout)
                .thenCompose(commandResponse -> {
                    if (commandResponse instanceof CommandResponse.Accepted)
                        return hcdCmdService.subscribe(commandResponse.runId(), timeout);
                    else
                        return CompletableFuture.completedFuture(new CommandResponse.Error(commandResponse.runId(), "test error"));
                });

query

Query for the result of a long-running command which was sent as Submit to get a CommandResponse as a Future.

Scala
val setupForQuery = Setup(prefix, longRunning, Some(obsId))
assemblyCommandService.submit(setupForQuery)

//do some work before querying for the result of above command as needed

val eventualResponse: Future[CommandResponse] = assemblyCommandService.query(setupForQuery.runId)
Java
hcdCmdService.submit(controlCommand, timeout);

// do some work before querying for the result of above command as needed

CompletableFuture<CommandResponse> queryResponse = hcdCmdService.query(controlCommand.runId(), timeout);

submitAndSubscribe

Submit a command and, if it is successfully validated as Accepted, subscribe for the result to get the final CommandResponse as a Future.

Scala
val setupForSubscribe = Setup(prefix, longRunning, Some(obsId))
val response          = assemblyCommandService.submitAndSubscribe(setupForSubscribe)
Java
CompletableFuture<CommandResponse> finalResponseCompletableFuture = hcdCmdService.submitAndSubscribe(failureResCommand1, timeout);
CommandResponse actualValidationResponse = finalResponseCompletableFuture.get();
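
The composition behind a call like this can be sketched with plain CompletableFuture chaining. In the sketch below, responses are plain strings and the submit and subscribe parameters are hypothetical stand-ins for a component's calls. Passing a non-Accepted response straight through is an assumption, not documented behavior.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative only: the two functions stand in for a component's submit and subscribe calls.
final class SubmitThenSubscribe {
    static CompletableFuture<String> submitAndSubscribe(
            String runId,
            Function<String, CompletableFuture<String>> submit,
            Function<String, CompletableFuture<String>> subscribe) {
        return submit.apply(runId).thenCompose(initial -> {
            if ("Accepted".equals(initial)) {
                // Validation succeeded for a long-running command: wait for the final response.
                return subscribe.apply(runId);
            }
            // Assumption: any other initial response is already final and is passed through.
            return CompletableFuture.completedFuture(initial);
        });
    }
}
```

The caller sees a single future regardless of whether the command completed immediately or ran for a long time.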

onewayAndMatch

Send a command and match the published state from the component using a StateMatcher. If the match is successful, a Completed response is provided as a Future. In case of a failure or unmatched state, an Error CommandResponse is provided as a Future.

Scala
val eventualResponse1: Future[CommandResponse] = assemblyComponent.onewayAndMatch(setupWithMatcher, demandMatcher)
Java

// create a DemandMatcher which specifies the desired state to be matched.
StateMatcher stateMatcher = new DemandMatcher(new DemandState(prefix().prefix(), new StateName("testStateName")).add(param), false, timeout);

// create matcher instance
Matcher matcher1 = new Matcher(hcdLocation.componentRef().narrow(), demandMatcher, ec, mat);

// start the matcher so that it is ready to receive state published by the source
CompletableFuture<MatcherResponse> matcherResponse = matcher1.jStart();

CompletableFuture<CommandResponse> matchedCommandResponse =
        hcdCmdService.onewayAndMatch(setup, stateMatcher, timeout);

submitAllAndGetResponse

Submit multiple commands and get a single CommandResponse for all of them as a Future. If all the commands were successful, a CommandResponse of Completed will be returned. If any one of the commands fails, an Error will be returned.

Scala

val responseOfMultipleCommands = hcdComponent.submitAllAndGetResponse(Set(setupHcd1, setupHcd2))
Java
Setup setupHcd1 = new Setup(prefix(), shortRunning(), Optional.empty()).add(encoderParam);
Setup setupHcd2 = new Setup(prefix(), mediumRunning(), Optional.empty()).add(encoderParam);

HashMap<JCommandService, Set<ControlCommand>> componentsToCommands = new HashMap<JCommandService, Set<ControlCommand>>() {
    {
        put(hcdCmdService, new HashSet<ControlCommand>(Arrays.asList(setupHcd1, setupHcd2)));
    }
};

CompletableFuture<CommandResponse> commandResponse = hcdCmdService
        .submitAllAndGetResponse(
                new HashSet<ControlCommand>(Arrays.asList(setupHcd1, setupHcd2)),
                timeout
        );

submitAllAndGetFinalResponse

Submit multiple commands and get the final CommandResponse for all of them as one CommandResponse. If all the commands were successful, a CommandResponse of Completed will be returned. If any one of the commands fails, an Error will be returned. For long-running commands, it will subscribe for the result of those which were successfully validated as Accepted and get the final CommandResponse.

Scala

val finalResponseOfMultipleCommands = hcdComponent.submitAllAndGetFinalResponse(Set(setupHcd1, setupHcd2))
Java
Setup setupHcd1 = new Setup(prefix(), shortRunning(), Optional.empty()).add(encoderParam);
Setup setupHcd2 = new Setup(prefix(), mediumRunning(), Optional.empty()).add(encoderParam);

HashMap<JCommandService, Set<ControlCommand>> componentsToCommands = new HashMap<JCommandService, Set<ControlCommand>>() {
    {
        put(hcdCmdService, new HashSet<ControlCommand>(Arrays.asList(setupHcd1, setupHcd2)));
    }
};

CompletableFuture<CommandResponse> finalCommandResponse = hcdCmdService
        .submitAllAndGetFinalResponse(
                new HashSet<ControlCommand>(Arrays.asList(setupHcd1, setupHcd2)),
                timeout
        );
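
The "Completed if all succeed, Error otherwise" rule described above can be sketched as follows. ResponseAggregator is a hypothetical helper and responses are plain strings. It shows the aggregation idea only, not how csw-command implements it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper illustrating the aggregation rule; not the csw-command API.
final class ResponseAggregator {
    // Completes with "Completed" only if every individual response is "Completed";
    // any other response, or a failed future, yields "Error".
    static CompletableFuture<String> aggregate(List<CompletableFuture<String>> responses) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(responses.toArray(new CompletableFuture[0]));
        return all.handle((ignored, failure) -> {
            if (failure != null) {
                return "Error";
            }
            boolean allCompleted = responses.stream()
                    .map(CompletableFuture::join)   // does not block: allOf has already completed
                    .allMatch("Completed"::equals);
            return allCompleted ? "Completed" : "Error";
        });
    }
}
```

A caller would pass one future per submitted command and inspect only the single aggregated result.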

subscribeCurrentState

This method can be used to subscribe to the CurrentState of the component by providing a callback. Subscribing returns a CurrentStateSubscription handle, which can be used to unsubscribe.

Scala
// subscribe to the current state of an assembly component and use a callback which forwards each received
// element to a test probe actor
assemblyCommandService.subscribeCurrentState(probe.ref ! _)
Java
// subscribe to the current state of an assembly component and use a callback which forwards each received
// element to a test probe actor
CurrentStateSubscription subscription = hcdCmdService.subscribeCurrentState(currentState -> probe.ref().tell(currentState));
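
The callback-plus-handle pattern used here can be illustrated with a small self-contained sketch. StatePublisher and its nested Subscription interface are hypothetical, and states are plain strings. The handle only mirrors the role that CurrentStateSubscription plays above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical publisher illustrating the callback-plus-handle pattern; not the csw-command API.
final class StatePublisher {
    // The handle returned to a subscriber.
    interface Subscription {
        void unsubscribe();
    }

    private final List<Consumer<String>> callbacks = new CopyOnWriteArrayList<>();

    // Register a callback and return a handle that removes it again.
    Subscription subscribe(Consumer<String> callback) {
        callbacks.add(callback);
        return () -> callbacks.remove(callback);
    }

    // Deliver a published state to every registered callback.
    void publish(String state) {
        callbacks.forEach(cb -> cb.accept(state));
    }
}
```

After unsubscribe is called on the handle, later published states no longer reach that callback.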

Matching state for command completion

A Matcher is provided for matching state against a desired state. The matcher is created with a source of state, identified by its ActorRef, and an instance of StateMatcher, which defines the state and criteria for matching. Several implementations of StateMatcher are available for common use: DemandMatcherAll for matching the entire DemandState against the current state, DemandMatcher for matching state with or without units against the current state, and PresenceMatcher, which checks if a matching state is found with a provided prefix.

Scala

// create a DemandMatcher which specifies the desired state to be matched.
val demandMatcher = DemandMatcher(DemandState(prefix, StateName("testStateName"), Set(param)), withUnits = false, timeout)

// create matcher instance
val matcher = new Matcher(assemblyLocation.componentRef, demandMatcher)

// start the matcher so that it is ready to receive state published by the source
val matcherResponseF: Future[MatcherResponse] = matcher.start

// submit command and if the command is successfully validated, check for matching of demand state against current state
val eventualCommandResponse: Future[CommandResponse] = async {
  val initialResponse = await(assemblyComponent.oneway(setupWithMatcher))
  initialResponse match {
    case _: Accepted ⇒
      val matcherResponse = await(matcherResponseF)
      // create appropriate response if demand state was matched from among the published state or otherwise
      matcherResponse match {
        case MatchCompleted  ⇒ Completed(setupWithMatcher.runId)
        case MatchFailed(ex) ⇒ Error(setupWithMatcher.runId, ex.getMessage)
      }
    case invalid: Invalid ⇒
      matcher.stop()
      invalid
    case x ⇒ x
  }
}

val commandResponse = Await.result(eventualCommandResponse, timeout.duration)
Java

// create a DemandMatcher which specifies the desired state to be matched.
DemandMatcher demandMatcher = new DemandMatcher(new DemandState(prefix().prefix(), new StateName("testStateName")).add(param), false, timeout);

// create matcher instance
Matcher matcher = new Matcher(hcdLocation.componentRef().narrow(), demandMatcher, ec, mat);

// start the matcher so that it is ready to receive state published by the source
CompletableFuture<MatcherResponse> matcherResponseFuture = matcher.jStart();

// submit command and if the command is successfully validated, check for matching of demand state against current state
CompletableFuture<CommandResponse> commandResponseToBeMatched = hcdCmdService
        .oneway(setup, timeout)
        .thenCompose(initialCommandResponse -> {
            if (initialCommandResponse instanceof CommandResponse.Accepted) {
                return matcherResponseFuture.thenApply(matcherResponse -> {
                    // create appropriate response if demand state was matched from among the published state or otherwise
                    if (matcherResponse.getClass().isAssignableFrom(MatcherResponses.jMatchCompleted().getClass()))
                        return new Completed(initialCommandResponse.runId());
                    else
                        return new CommandResponse.Error(initialCommandResponse.runId(), "Match not completed");
                });
            } else {
                matcher.stop();
                return CompletableFuture.completedFuture(initialCommandResponse);
            }
        });

CommandResponse actualResponse = commandResponseToBeMatched.get();
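
Stripped of actors and streams, the idea of matching published state against a demand can be sketched as below. DemandMatchSketch is hypothetical: a state is simplified to a map from parameter name to integer value, units and timeouts are left out, and a match means the published state contains every demanded parameter with the demanded value.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified matcher; not the csw-command Matcher.
final class DemandMatchSketch {
    private final Map<String, Integer> demand;
    private final CompletableFuture<Boolean> matched = new CompletableFuture<>();

    DemandMatchSketch(Map<String, Integer> demand) {
        this.demand = demand;
    }

    // Feed each published current state; the first state containing every demanded
    // parameter with the demanded value completes the match.
    void onCurrentState(Map<String, Integer> current) {
        if (current.entrySet().containsAll(demand.entrySet())) {
            matched.complete(true);
        }
    }

    // Stop waiting, for example when the command was rejected as Invalid.
    void stop() {
        matched.complete(false);
    }

    // Completes with true on a match, or false if the matcher was stopped first.
    CompletableFuture<Boolean> result() {
        return matched;
    }
}
```

As in the examples above, the matcher is created before the command is sent, so that no published state is missed between sending the command and starting to listen.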

Distributing commands

CommandDistributor is a utility for distributing commands to multiple components and getting an aggregated response.

aggregated validation response

A component can send one or more commands to one or more components using a Map[ComponentRef, Set[ControlCommand]], and get an aggregated validation response of Accepted if all the commands were successfully validated. An Error response is returned otherwise.

Scala
val aggregatedValidationResponse = CommandDistributor(
  Map(assemblyCommandService → Set(setupAssembly1, setupAssembly2), hcdComponent → Set(setupHcd1, setupHcd2))
).aggregatedValidationResponse()
Java
CompletableFuture<CommandResponse> cmdValidationResponseF =
        new JCommandDistributor(componentsToCommands).
                aggregatedValidationResponse(timeout, ec, mat);

aggregated completion response

A component can send one or more commands to one or more components using a Map[ComponentRef, Set[ControlCommand]]. After successful validation, the utility subscribes for the final completion result of every command and returns an aggregated response of Completed if all the commands completed successfully. An Error response is returned otherwise.

Scala
val aggregatedResponse = CommandDistributor(
  Map(assemblyCommandService → Set(setupAssembly1, setupAssembly2), hcdComponent → Set(setupHcd1, setupHcd2))
).aggregatedCompletionResponse()
Java
CompletableFuture<CommandResponse> cmdCompletionResponseF =
        new JCommandDistributor(componentsToCommands).
                aggregatedCompletionResponse(timeout, ec, mat);