Communication using Commands

csw-command library provides support for command based communication between components.

This section describes how to communicate with any other component using commands. To check how to manage commands received, please visit Managing Command State

Dependencies

sbt
libraryDependencies += "com.github.tmtsoftware.csw" %% "csw-command" % "0.6.0-RC1"

Command-based Communication Between Components

A component can send Commands to other components. The commands can be sent as following two types of messages:

  • submit - A command is sent as Submit when the result of completion is desired.
  • oneway - A command is sent as Oneway when the result of completion is not desired.

A oneway is meant to be used between an Assembly and an HCD. It is also used when tracking completion using a Matcher and current state values (see below).

Feedback Needed on Future of Oneway

In the FDR prototype version of CSW, the oneway message was used between an Assembly and HCD to “fire and forget”. A oneway command was sent to the destination and no validation or command completion is provided in order to be as efficient as possible. In this case completion information is provided through Command Service pubsub and current state values.

In this release version of CSW, oneway does provide validation to the caller, and this version of CSW Command Service also reinstates immediate completion, which was dropped in the CSW prototype release. The only feature left from the FDR version is that oneway tells the sender that there is no completion information provided.

Given these two changes, the use of oneway is not sufficiently different from submit. With this in mind we anticipate one of the two possibly futures for oneway:

  1. Oneway should provide a clear difference with submit and go back to its original features with no validation or command completion.
  2. Oneway could be removed simplifying the command API to just submit.

The reason for oneway case 1 is to provide the best possible performance. A scenario is an actor that is subscribed to a 20 Hz demand event and sends out motion commands to one or more HCDs based on a calculation using data in this event. At 20 Hz, it’s probably not useful to get validation information. Nothing can be done anyway. The receiver could log a message.

However, maybe even in this case validation is useful and can be used by the sender to understand problems in the receiver, so providing only submit is adequate.

We are looking for feedback. Do you see a continued role for oneway? Maybe there is another scenario for oneway? Please write us if with your recommendation for oneway – and be warned that it may change behavior or disappear in the next release based on feedback.

The following responses can be received as a CommandResponse after sending a command with Submit or Oneway:

  • Accepted : The command is validated and will be executed, this is returned for a long-running action.
  • Completed : The command has been executed successfully.
  • CompletedWithResult : The command is executed successfully and generated some result as a parameter set.
  • Invalid : The command is not valid and will not be executed. A reason is provided.
  • NoLongerValid : The command can no longer be executed (will be deprecated)
  • Error : The command has failed in execution. A reason is provided.
  • Cancelled : The command was cancelled.
  • CommandNotAvailable : A queried command is not available.
  • NotAllowed : The command cannot be executed currently because of the current state of the destination component. Eg. another command is in execution in the presence of which it cannot accept any other command to execute or some other reason.

A command sent as Submit or Oneway is validated by the receiving component before actual execution. If the validation is successful, the actual execution can happen in two ways :

  • Immediate Completion - The component receiving the command can determine if the command can be executed immediately and thus provide the final execution response directly without sending a response for validation. This should be reserved for actions that do not take long to complete.
Scala
val eventualResponse: Future[SubmitResponse] = async {
  await(assemblyComponent.submit(Setup(prefix, immediateCmd, obsId))) match {
    case response: Completed ⇒
      //do something with completed result
      response
    case otherResponse ⇒
      // do something with other response which is not expected
      otherResponse
  }
}
Java
CompletableFuture<CommandResponse.SubmitResponse> eventualCommandResponse =
        hcdCmdService
                .submit(imdInvalidCommand, timeout)
                .thenApply(
                        response -> {
                            if (response instanceof CommandResponse.Completed) {
                                //do something with completed result
                            }
                            return response;
                        }
                );
  • Long Running Actions - The component receiving the command may determine that the command cannot be executed immediately. In this case, the component provides a Accepted response as an acknowledgement and maintains the state of the command. The sender can query the state of a particular command at a later time or use the subscribe method to get the final response when the execution is completed.

The sender component can use the following with the command id (RunId) of an executing command to get the current status, completion response and/or result of the command

  • Query - Query the current state of an executing command
Scala
val setupForQuery = Setup(prefix, longRunning, Some(obsId))
val finalResponse = assemblyCommandService.submit(setupForQuery)

//do some work before querying for the result of above command as needed
val eventualResponse: Future[QueryResponse] = assemblyCommandService.query(setupForQuery.runId)
Java
CompletableFuture<CommandResponse.SubmitResponse> submitResponseFuture = hcdCmdService.submit(controlCommand, timeout);

// do some work before querying for the result of above command as needed
CompletableFuture<CommandResponse.QueryResponse> queryResponseFuture = hcdCmdService.query(controlCommand.runId(), timeout);
  • Subscribe - It is also possible to subscribe to asynchronously get command response updates for an executing command. At least one response is always delivered.
Scala
val eventualCommandResponse = assemblyCommandService.submit(assemblyLongSetup).map {
  case Invalid(runId, _) ⇒ Error(runId, "")
  case x: SubmitResponse ⇒ x
}
Java
CompletableFuture<CommandResponse.SubmitResponse> testCommandResponse =
        hcdCmdService
                .submit(controlCommand, timeout)
                .thenCompose(commandResponse -> {
                    if (commandResponse instanceof CommandResponse.Invalid) {
                        return CompletableFuture.completedFuture(new CommandResponse.Error(commandResponse.runId(), "tests error"));
                    } else {
                        return CompletableFuture.completedFuture(commandResponse);
                    }
                });

CommandService

A helper/wrapper is provided called CommandService that provides a convenient way to use the Command Service with a component discovered using Location Service. A CommandService instance is created using the value from the Location Service. This CommandService instance will has methods for communicating with the component.

The API can be exercised as follows for different scenarios of command-based communication:

submit

Submit a command and get a CommandResponse as a Future. The CommandResponse can be a response from validation (Accepted, Invalid) or a final Response in case of immediate completion.

Scala/immediate-response
val eventualResponse: Future[SubmitResponse] = async {
  await(assemblyComponent.submit(Setup(prefix, immediateCmd, obsId))) match {
    case response: Completed ⇒
      //do something with completed result
      response
    case otherResponse ⇒
      // do something with other response which is not expected
      otherResponse
  }
}
Java/immediate-response
CompletableFuture<CommandResponse.SubmitResponse> eventualCommandResponse =
        hcdCmdService
                .submit(imdInvalidCommand, timeout)
                .thenApply(
                        response -> {
                            if (response instanceof CommandResponse.Completed) {
                                //do something with completed result
                            }
                            return response;
                        }
                );
Scala/validation-response
// `setupWithTimeoutMatcher` is a sample setup payload intended to be used when command response is not determined
// using matcher
val submitCommandResponseF: Future[Unit] = async {
  val initialResponse: SubmitResponse = await(assemblyComponent.submit(setupWithTimeoutMatcher))
  initialResponse match {
    case accepted: Accepted ⇒
    // do Something
    case invalid: Invalid ⇒
    // do Something
    case x ⇒
    // do Something
  }
}
Java/validation-response
CompletableFuture submitCommandResponseF = hcdCmdService
        .submit(setup, timeout)
        .thenAccept(submitResponse -> {
            if (submitResponse instanceof CommandResponse.Completed) {
                //do something
            } else if (submitResponse instanceof CommandResponse.Error) {
                //do something
            } else if (submitResponse instanceof CommandResponse.Invalid){
                //do something
            }
        });

CompletableFuture<CommandResponse.SubmitResponse> finalResponseCompletableFuture = hcdCmdService.submit(failureResCommand1, timeout);
CommandResponse.SubmitResponse actualValidationResponse = finalResponseCompletableFuture.get();

oneway

Send a command as a Oneway and get a CommandResponse as a Future. The CommandResponse can be a response of validation (Accepted, Invalid) or a final Response.

Scala
// `setupWithTimeoutMatcher` is a sample setup payload intended to be used when command response is not determined
// using matcher
// Note this and the next two don't do anything useful
val onewayCommandResponseF: Future[Unit] = async {
  val initialResponse: OnewayResponse = await(assemblyComponent.oneway(setupWithTimeoutMatcher))
  initialResponse match {
    case accepted: Accepted ⇒
    // do Something
    case invalid: Invalid ⇒
    // do Something
    case x ⇒
    // do Something
  }
}
Java
CompletableFuture onewayCommandResponseF = hcdCmdService
        .oneway(setup, timeout)
        .thenAccept(initialCommandResponse -> {
            if (initialCommandResponse instanceof CommandResponse.Accepted) {
                //do something
            } else if (initialCommandResponse instanceof CommandResponse.Invalid) {
                //do something
            } else {
                //do something
            }
        });

subscribe

Subscribe for the result of a long-running command which was sent as Submit to get a CommandResponse as a Future.

Scala
val eventualCommandResponse = assemblyCommandService.submit(assemblyLongSetup).map {
  case Invalid(runId, _) ⇒ Error(runId, "")
  case x: SubmitResponse ⇒ x
}
Java
CompletableFuture<CommandResponse.SubmitResponse> testCommandResponse =
        hcdCmdService
                .submit(controlCommand, timeout)
                .thenCompose(commandResponse -> {
                    if (commandResponse instanceof CommandResponse.Invalid) {
                        return CompletableFuture.completedFuture(new CommandResponse.Error(commandResponse.runId(), "tests error"));
                    } else {
                        return CompletableFuture.completedFuture(commandResponse);
                    }
                });

query

Query for the result of a long-running command which was sent as Submit to get a CommandResponse as a Future.

Scala
val setupForQuery = Setup(prefix, longRunning, Some(obsId))
val finalResponse = assemblyCommandService.submit(setupForQuery)

//do some work before querying for the result of above command as needed
val eventualResponse: Future[QueryResponse] = assemblyCommandService.query(setupForQuery.runId)
Java
CompletableFuture<CommandResponse.SubmitResponse> submitResponseFuture = hcdCmdService.submit(controlCommand, timeout);

// do some work before querying for the result of above command as needed
CompletableFuture<CommandResponse.QueryResponse> queryResponseFuture = hcdCmdService.query(controlCommand.runId(), timeout);

submit

Submit a command and Subscribe for the result if it was successfully validated as Accepted to get a final CommandResponse as a Future.

Scala
val setupForSubscribe = Setup(prefix, longRunning, Some(obsId))
val response          = assemblyCommandService.submit(setupForSubscribe)
Java
CompletableFuture submitCommandResponseF = hcdCmdService
        .submit(setup, timeout)
        .thenAccept(submitResponse -> {
            if (submitResponse instanceof CommandResponse.Completed) {
                //do something
            } else if (submitResponse instanceof CommandResponse.Error) {
                //do something
            } else if (submitResponse instanceof CommandResponse.Invalid){
                //do something
            }
        });

CompletableFuture<CommandResponse.SubmitResponse> finalResponseCompletableFuture = hcdCmdService.submit(failureResCommand1, timeout);
CommandResponse.SubmitResponse actualValidationResponse = finalResponseCompletableFuture.get();

onewayAndMatch

Send a command and match the published state from the component using a StateMatcher. If the match is successful a Completed response is provided as a future. In case of a failure or unmatched state, Error CommandResponse is provided as a Future.

Scala
val eventualResponse1: Future[MatchingResponse] = assemblyComponent.onewayAndMatch(setupWithMatcher, demandMatcher)
Java

// create a DemandMatcher which specifies the desired state to be matched. StateMatcher stateMatcher = new DemandMatcher(new DemandState(prefix().prefix(), new StateName("testStateName")).add(param), false, timeout); // create matcher instance Matcher matcher1 = new Matcher(AkkaLocationExt.RichAkkaLocation(hcdLocation).componentRef().narrow(), demandMatcher, ec, mat); // start the matcher so that it is ready to receive state published by the source CompletableFuture<MatcherResponse> matcherResponse = matcher1.jStart(); CompletableFuture<CommandResponse.MatchingResponse> matchedCommandResponse = hcdCmdService.onewayAndMatch(setup, stateMatcher, timeout);

submitAll

Submit multiple commands and get list of SubmitResponse for each of the submitted command.

Scala
val assemblyInitSetup = Setup(prefix, initCmd, Some(obsId))
val assemblyMoveSetup = Setup(prefix, moveCmd, Some(obsId))

val multiResponse1: Future[List[SubmitResponse]] =
  assemblyCommandService.submitAll(List(assemblyInitSetup, assemblyMoveSetup))
Java
Setup setupHcd1 = new Setup(prefix(), shortRunning(), Optional.empty()).add(encoderParam);
Setup setupHcd2 = new Setup(prefix(), mediumRunning(), Optional.empty()).add(encoderParam);

CompletableFuture<List<CommandResponse.SubmitResponse>> finalCommandResponse = hcdCmdService
        .submitAll(
                Arrays.asList(setupHcd1, setupHcd2),
                timeout
        );

subscribeCurrentState

This method can be used to subscribe to the CurrentState of the component by providing a callback. Subscribing results into a handle of CurrentStateSubscription which can be used to unsubscribe the subscription.

Scala
// subscribe to the current state of an assembly component and use a callback which forwards each received
// element to a test probe actor
assemblyCommandService.subscribeCurrentState(probe.ref ! _)
Java
// subscribe to the current state of an assembly component and use a callback which forwards each received
// element to a test probe actor
CurrentStateSubscription subscription = hcdCmdService.subscribeCurrentState(currentState -> probe.ref().tell(currentState));

Matching state for command completion

A Matcher is provided for matching state against a desired state. The matcher is created with a source of state identified by its ActorRef and an instance of StateMatcher which defines the state and criteria for matching. Several instances of StateMatcher are available for common use. These are DemandMatcherAll for matching the entire DemandState against the current state, DemandMatcher for matching state with or without units against the current state and PresenceMatcher which checks if a matching state is found with a provided prefix.

Scala

// create a DemandMatcher which specifies the desired state to be matched. val demandMatcher = DemandMatcher(DemandState(prefix, StateName("testStateName"), Set(param)), withUnits = false, timeout) // create matcher instance val matcher = new Matcher(assemblyLocation.componentRef, demandMatcher) // start the matcher so that it is ready to receive state published by the source val matcherResponseF: Future[MatcherResponse] = matcher.start // submit command and if the command is successfully validated, check for matching of demand state against current state val eventualCommandResponse: Future[MatchingResponse] = async { val initialResponse = await(assemblyComponent.oneway(setupWithMatcher)) initialResponse match { case _: Accepted ⇒ val matcherResponse = await(matcherResponseF) // create appropriate response if demand state was matched from among the published state or otherwise matcherResponse match { case MatchCompleted => Completed(setupWithMatcher.runId) case mf: MatchFailed => Error(setupWithMatcher.runId, mf.throwable.getMessage) } case invalid: Invalid ⇒ matcher.stop() invalid case locked: Locked => matcher.stop() locked } } val commandResponse = Await.result(eventualCommandResponse, timeout.duration)
Java

// create a DemandMatcher which specifies the desired state to be matched. DemandMatcher demandMatcher = new DemandMatcher(new DemandState(prefix().prefix(), new StateName("testStateName")).add(param), false, timeout); // create matcher instance Matcher matcher = new Matcher(AkkaLocationExt.RichAkkaLocation(hcdLocation).componentRef().narrow(), demandMatcher, ec, mat); // start the matcher so that it is ready to receive state published by the source CompletableFuture<MatcherResponse> matcherResponseFuture = matcher.jStart(); // submit command and if the command is successfully validated, check for matching of demand state against current state CompletableFuture<CommandResponse.MatchingResponse> commandResponseToBeMatched = hcdCmdService .oneway(setup, timeout) .thenCompose(initialCommandResponse -> { if (initialCommandResponse instanceof CommandResponse.Accepted) { return matcherResponseFuture.thenApply(matcherResponse -> { if (matcherResponse.getClass().isAssignableFrom(MatcherResponses.jMatchCompleted().getClass())) return new CommandResponse.Completed(initialCommandResponse.runId()); else return new CommandResponse.Error(initialCommandResponse.runId(), "Match not completed"); }); } else { matcher.stop(); // Need a better error message return CompletableFuture.completedFuture(new CommandResponse.Error(initialCommandResponse.runId(), "Matcher failed")); } }); CommandResponse.MatchingResponse actualResponse = commandResponseToBeMatched.get();