Communication using Commands
The csw-command library provides support for command-based communication between components.
This section describes how to communicate with any other component using commands. To learn how to manage received commands, see Managing Command State.
Dependencies
- sbt
-
libraryDependencies += "com.github.tmtsoftware.csw" %% "csw-command" % "0.6.0-RC1"
Command-based Communication Between Components
A component can send commands to other components. A command can be sent as one of the following two types of messages:
- submit - A command is sent as Submit when the result of completion is desired.
- oneway - A command is sent as Oneway when the result of completion is not desired.
A oneway is meant to be used between an Assembly and an HCD. It is also used when tracking completion using a Matcher and current state values (see below).
In the FDR prototype version of CSW, the oneway message was used between an Assembly and an HCD to “fire and forget”. A oneway command was sent to the destination and, to be as efficient as possible, no validation or command completion was provided. In that case, completion information was provided through Command Service pubsub and current state values.
In this release version of CSW, oneway does provide validation to the caller, and this version of CSW Command Service also reinstates immediate completion, which was dropped in the CSW prototype release. The only feature left from the FDR version is that oneway tells the sender that there is no completion information provided.
Given these two changes, the use of oneway is not sufficiently different from submit. With this in mind, we anticipate one of two possible futures for oneway:
1. Oneway should provide a clear difference with submit and go back to its original features with no validation or command completion.
2. Oneway could be removed, simplifying the command API to just submit.
The reason for case 1 is to provide the best possible performance. One scenario is an actor that subscribes to a 20 Hz demand event and sends motion commands to one or more HCDs based on a calculation using data in this event. At 20 Hz, validation information is probably not useful, because the sender cannot act on it anyway; the receiver could simply log a message.
However, validation may be useful even in this case, because the sender can use it to understand problems in the receiver. If so, providing only submit is adequate.
We are looking for feedback. Do you see a continued role for oneway? Is there another scenario for oneway? Please write to us with your recommendation for oneway, and be warned that its behavior may change or it may disappear in the next release based on feedback.
The following responses can be received as a CommandResponse after sending a command with Submit or Oneway:
- Accepted : The command is validated and will be executed. This is returned for a long-running action.
- Completed : The command has been executed successfully.
- CompletedWithResult : The command has been executed successfully and generated a result as a parameter set.
- Invalid : The command is not valid and will not be executed. A reason is provided.
- NoLongerValid : The command can no longer be executed (will be deprecated).
- Error : The command has failed in execution. A reason is provided.
- Cancelled : The command was cancelled.
- CommandNotAvailable : A queried command is not available.
- NotAllowed : The command cannot be executed now because of the current state of the destination component. For example, another command is executing and the component cannot accept any other command until it finishes.
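A sender often only needs to know whether a response ends the command or leaves it in progress. The following is a minimal, self-contained sketch of that distinction in plain Java. It is not the csw-command API: the `ResponseKind` enum and the `ResponseKinds` helper are hypothetical names, and treating `Accepted` as the only non-final response is an assumption drawn from the descriptions above.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical helper; models the response names listed above, not the csw-command types.
final class ResponseKinds {
    // Assumption: Accepted is the only response after which further updates are expected.
    private static final Set<ResponseKind> INTERMEDIATE = EnumSet.of(ResponseKind.ACCEPTED);

    // Responses that mean the command was refused before execution started.
    private static final Set<ResponseKind> REJECTED =
        EnumSet.of(ResponseKind.INVALID, ResponseKind.NOT_ALLOWED);

    private ResponseKinds() {}

    static boolean isFinal(ResponseKind kind) {
        return !INTERMEDIATE.contains(kind);
    }

    static boolean isRejected(ResponseKind kind) {
        return REJECTED.contains(kind);
    }

    public static void main(String[] args) {
        for (ResponseKind kind : ResponseKind.values()) {
            System.out.println(kind + " final=" + isFinal(kind) + " rejected=" + isRejected(kind));
        }
    }
}

// Hypothetical enum mirroring the response names in the list above.
enum ResponseKind {
    ACCEPTED, COMPLETED, COMPLETED_WITH_RESULT, INVALID, NO_LONGER_VALID,
    ERROR, CANCELLED, COMMAND_NOT_AVAILABLE, NOT_ALLOWED
}
```

A sender written this way would keep waiting (by querying or subscribing) only while `isFinal` returns false.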
A command sent as Submit or Oneway is validated by the receiving component before actual execution. If the validation is successful, the actual execution can happen in two ways:
- Immediate Completion - The component receiving the command can determine if the command can be executed immediately and thus provide the final execution response directly without sending a response for validation. This should be reserved for actions that do not take long to complete.
- Scala
-
val eventualResponse: Future[SubmitResponse] = async {
  await(assemblyComponent.submit(Setup(prefix, immediateCmd, obsId))) match {
    case response: Completed ⇒
      // do something with completed result
      response
    case otherResponse ⇒
      // do something with other response which is not expected
      otherResponse
  }
}
- Java
-
CompletableFuture<CommandResponse.SubmitResponse> eventualCommandResponse =
    hcdCmdService
        .submit(imdInvalidCommand, timeout)
        .thenApply(response -> {
            if (response instanceof CommandResponse.Completed) {
                // do something with completed result
            }
            return response;
        });
- Long Running Actions - The component receiving the command may determine that the command cannot be executed immediately. In this case, the component provides an Accepted response as an acknowledgement and maintains the state of the command. The sender can query the state of a particular command at a later time or use the subscribe method to get the final response when the execution is completed.
The sender component can use the following with the command id (RunId) of an executing command to get the current status, completion response and/or result of the command:
- Query - Query the current state of an executing command
- Scala
-
val setupForQuery = Setup(prefix, longRunning, Some(obsId))
val finalResponse = assemblyCommandService.submit(setupForQuery)

// do some work before querying for the result of above command as needed

val eventualResponse: Future[QueryResponse] = assemblyCommandService.query(setupForQuery.runId)
- Java
-
CompletableFuture<CommandResponse.SubmitResponse> submitResponseFuture =
    hcdCmdService.submit(controlCommand, timeout);

// do some work before querying for the result of above command as needed

CompletableFuture<CommandResponse.QueryResponse> queryResponseFuture =
    hcdCmdService.query(controlCommand.runId(), timeout);
- Subscribe - It is also possible to subscribe to asynchronously get command response updates for an executing command. At least one response is always delivered.
- Scala
-
val eventualCommandResponse = assemblyCommandService.submit(assemblyLongSetup).map {
  case Invalid(runId, _) ⇒ Error(runId, "")
  case x: SubmitResponse ⇒ x
}
- Java
-
CompletableFuture<CommandResponse.SubmitResponse> testCommandResponse =
    hcdCmdService
        .submit(controlCommand, timeout)
        .thenCompose(commandResponse -> {
            if (commandResponse instanceof CommandResponse.Invalid) {
                return CompletableFuture.completedFuture(
                    new CommandResponse.Error(commandResponse.runId(), "tests error"));
            } else {
                return CompletableFuture.completedFuture(commandResponse);
            }
        });
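To make the difference between querying and subscribing concrete, here is a minimal sketch of how a receiver might track a long-running command by run id. It is a plain-Java model under stated assumptions, not how CSW implements command state: the `CommandStatusSketch` class, its method names, and the use of strings for run ids and responses are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker; run ids and responses are plain strings for illustration only.
final class CommandStatusSketch {
    private final Map<String, String> status = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<String>> finals = new ConcurrentHashMap<>();

    // The receiver acknowledges a long-running command and starts tracking it.
    String accept(String runId) {
        status.put(runId, "Accepted");
        finals.computeIfAbsent(runId, id -> new CompletableFuture<>());
        return "Accepted";
    }

    // Query: report whatever is known about the command right now.
    String query(String runId) {
        return status.getOrDefault(runId, "CommandNotAvailable");
    }

    // Subscribe: a future that completes once with the final response.
    CompletableFuture<String> subscribe(String runId) {
        return finals.computeIfAbsent(runId, id -> new CompletableFuture<>());
    }

    // Called by the receiver when execution ends.
    void finish(String runId, String finalResponse) {
        status.put(runId, finalResponse);
        subscribe(runId).complete(finalResponse);
    }

    // Walks through accept, query, subscribe, and finish for one command.
    static String demo() {
        CommandStatusSketch tracker = new CommandStatusSketch();
        tracker.accept("run-1");
        String before = tracker.query("run-1");
        CompletableFuture<String> finalResponse = tracker.subscribe("run-1");
        tracker.finish("run-1", "Completed");
        return before + " -> " + finalResponse.join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model a query returns a snapshot and may have to be repeated, while a subscription delivers the final response once without polling.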
CommandService
A helper/wrapper called CommandService provides a convenient way to use the Command Service with a component discovered using Location Service. A CommandService instance is created using the value from the Location Service. This CommandService instance has methods for communicating with the component.
The API can be exercised as follows for different scenarios of command-based communication:
submit
Submit a command and get a CommandResponse as a Future. The CommandResponse can be a response from validation (Accepted, Invalid) or a final response in case of immediate completion.
- Scala/immediate-response
-
val eventualResponse: Future[SubmitResponse] = async {
  await(assemblyComponent.submit(Setup(prefix, immediateCmd, obsId))) match {
    case response: Completed ⇒
      // do something with completed result
      response
    case otherResponse ⇒
      // do something with other response which is not expected
      otherResponse
  }
}
- Java/immediate-response
-
CompletableFuture<CommandResponse.SubmitResponse> eventualCommandResponse =
    hcdCmdService
        .submit(imdInvalidCommand, timeout)
        .thenApply(response -> {
            if (response instanceof CommandResponse.Completed) {
                // do something with completed result
            }
            return response;
        });
- Scala/validation-response
-
// `setupWithTimeoutMatcher` is a sample setup payload intended to be used when command response is not determined
// using matcher
val submitCommandResponseF: Future[Unit] = async {
  val initialResponse: SubmitResponse = await(assemblyComponent.submit(setupWithTimeoutMatcher))
  initialResponse match {
    case accepted: Accepted ⇒ // do Something
    case invalid: Invalid ⇒ // do Something
    case x ⇒ // do Something
  }
}
- Java/validation-response
-
CompletableFuture<Void> submitCommandResponseF =
    hcdCmdService
        .submit(setup, timeout)
        .thenAccept(submitResponse -> {
            if (submitResponse instanceof CommandResponse.Completed) {
                // do something
            } else if (submitResponse instanceof CommandResponse.Error) {
                // do something
            } else if (submitResponse instanceof CommandResponse.Invalid) {
                // do something
            }
        });

CompletableFuture<CommandResponse.SubmitResponse> finalResponseCompletableFuture =
    hcdCmdService.submit(failureResCommand1, timeout);
CommandResponse.SubmitResponse actualValidationResponse = finalResponseCompletableFuture.get();
oneway
Send a command as a Oneway and get a CommandResponse as a Future. The CommandResponse can be a response of validation (Accepted, Invalid) or a final response.
- Scala
-
// `setupWithTimeoutMatcher` is a sample setup payload intended to be used when command response is not determined
// using matcher
// Note this and the next two don't do anything useful
val onewayCommandResponseF: Future[Unit] = async {
  val initialResponse: OnewayResponse = await(assemblyComponent.oneway(setupWithTimeoutMatcher))
  initialResponse match {
    case accepted: Accepted ⇒ // do Something
    case invalid: Invalid ⇒ // do Something
    case x ⇒ // do Something
  }
}
- Java
-
CompletableFuture<Void> onewayCommandResponseF =
    hcdCmdService
        .oneway(setup, timeout)
        .thenAccept(initialCommandResponse -> {
            if (initialCommandResponse instanceof CommandResponse.Accepted) {
                // do something
            } else if (initialCommandResponse instanceof CommandResponse.Invalid) {
                // do something
            } else {
                // do something
            }
        });
subscribe
Subscribe for the result of a long-running command which was sent as Submit to get a CommandResponse as a Future.
- Scala
-
val eventualCommandResponse = assemblyCommandService.submit(assemblyLongSetup).map {
  case Invalid(runId, _) ⇒ Error(runId, "")
  case x: SubmitResponse ⇒ x
}
- Java
-
CompletableFuture<CommandResponse.SubmitResponse> testCommandResponse =
    hcdCmdService
        .submit(controlCommand, timeout)
        .thenCompose(commandResponse -> {
            if (commandResponse instanceof CommandResponse.Invalid) {
                return CompletableFuture.completedFuture(
                    new CommandResponse.Error(commandResponse.runId(), "tests error"));
            } else {
                return CompletableFuture.completedFuture(commandResponse);
            }
        });
query
Query for the result of a long-running command which was sent as Submit to get a CommandResponse as a Future.
- Scala
-
val setupForQuery = Setup(prefix, longRunning, Some(obsId))
val finalResponse = assemblyCommandService.submit(setupForQuery)

// do some work before querying for the result of above command as needed

val eventualResponse: Future[QueryResponse] = assemblyCommandService.query(setupForQuery.runId)
- Java
-
CompletableFuture<CommandResponse.SubmitResponse> submitResponseFuture =
    hcdCmdService.submit(controlCommand, timeout);

// do some work before querying for the result of above command as needed

CompletableFuture<CommandResponse.QueryResponse> queryResponseFuture =
    hcdCmdService.query(controlCommand.runId(), timeout);
submit
Submit a command and subscribe for the result if it was successfully validated as Accepted, to get a final CommandResponse as a Future.
- Scala
-
val setupForSubscribe = Setup(prefix, longRunning, Some(obsId))
val response = assemblyCommandService.submit(setupForSubscribe)
- Java
-
CompletableFuture<Void> submitCommandResponseF =
    hcdCmdService
        .submit(setup, timeout)
        .thenAccept(submitResponse -> {
            if (submitResponse instanceof CommandResponse.Completed) {
                // do something
            } else if (submitResponse instanceof CommandResponse.Error) {
                // do something
            } else if (submitResponse instanceof CommandResponse.Invalid) {
                // do something
            }
        });

CompletableFuture<CommandResponse.SubmitResponse> finalResponseCompletableFuture =
    hcdCmdService.submit(failureResCommand1, timeout);
CommandResponse.SubmitResponse actualValidationResponse = finalResponseCompletableFuture.get();
onewayAndMatch
Send a command and match the published state from the component using a StateMatcher. If the match is successful, a Completed response is provided as a Future. In case of a failure or unmatched state, an Error CommandResponse is provided as a Future.
- Scala
-
val eventualResponse1: Future[MatchingResponse] = assemblyComponent.onewayAndMatch(setupWithMatcher, demandMatcher)
- Java
-
// create a DemandMatcher which specifies the desired state to be matched.
StateMatcher stateMatcher = new DemandMatcher(
    new DemandState(prefix().prefix(), new StateName("testStateName")).add(param), false, timeout);

// create matcher instance
Matcher matcher1 = new Matcher(
    AkkaLocationExt.RichAkkaLocation(hcdLocation).componentRef().narrow(), demandMatcher, ec, mat);

// start the matcher so that it is ready to receive state published by the source
CompletableFuture<MatcherResponse> matcherResponse = matcher1.jStart();

CompletableFuture<CommandResponse.MatchingResponse> matchedCommandResponse =
    hcdCmdService.onewayAndMatch(setup, stateMatcher, timeout);
submitAll
Submit multiple commands and get a list containing a SubmitResponse for each submitted command.
- Scala
-
val assemblyInitSetup = Setup(prefix, initCmd, Some(obsId))
val assemblyMoveSetup = Setup(prefix, moveCmd, Some(obsId))

val multiResponse1: Future[List[SubmitResponse]] =
  assemblyCommandService.submitAll(List(assemblyInitSetup, assemblyMoveSetup))
- Java
-
Setup setupHcd1 = new Setup(prefix(), shortRunning(), Optional.empty()).add(encoderParam);
Setup setupHcd2 = new Setup(prefix(), mediumRunning(), Optional.empty()).add(encoderParam);

CompletableFuture<List<CommandResponse.SubmitResponse>> finalCommandResponse =
    hcdCmdService.submitAll(Arrays.asList(setupHcd1, setupHcd2), timeout);
subscribeCurrentState
This method can be used to subscribe to the CurrentState of the component by providing a callback. Subscribing returns a CurrentStateSubscription handle, which can be used to unsubscribe.
- Scala
-
// subscribe to the current state of an assembly component and use a callback which forwards each received
// element to a test probe actor
assemblyCommandService.subscribeCurrentState(probe.ref ! _)
- Java
-
// subscribe to the current state of an assembly component and use a callback which forwards each received
// element to a test probe actor
CurrentStateSubscription subscription =
    hcdCmdService.subscribeCurrentState(currentState -> probe.ref().tell(currentState));
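The callback-plus-handle pattern described above can be illustrated without the CSW classes. The sketch below is a hypothetical plain-Java model: `StatePublisher`, its nested `Subscription` interface, and the string-valued states are assumptions made for illustration, and the real CurrentStateSubscription may behave differently (for example, it delivers state asynchronously).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical publisher; stands in for a component that publishes its current state.
final class StatePublisher {
    // Hypothetical handle returned by subscribe, used to stop receiving updates.
    interface Subscription {
        void unsubscribe();
    }

    private final List<Consumer<String>> callbacks = new CopyOnWriteArrayList<>();

    // Register a callback and hand back a handle that removes it again.
    Subscription subscribe(Consumer<String> callback) {
        callbacks.add(callback);
        return () -> callbacks.remove(callback);
    }

    // Deliver a state update to every registered callback.
    void publish(String state) {
        for (Consumer<String> callback : callbacks) {
            callback.accept(state);
        }
    }

    // Subscribes, receives one update, unsubscribes, and misses the next update.
    static List<String> demo() {
        StatePublisher publisher = new StatePublisher();
        List<String> received = new ArrayList<>();
        Subscription subscription = publisher.subscribe(received::add);
        publisher.publish("position=10");
        subscription.unsubscribe();
        publisher.publish("position=20");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the handle is what lets the subscriber stop the callback later; discarding it leaves the callback registered for the lifetime of the publisher.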
Matching state for command completion
A Matcher is provided for matching state against a desired state. The matcher is created with a source of state identified by its ActorRef and an instance of StateMatcher which defines the state and criteria for matching. Several instances of StateMatcher are available for common use. These are DemandMatcherAll for matching the entire DemandState against the current state, DemandMatcher for matching state with or without units against the current state, and PresenceMatcher which checks if a matching state is found with a provided prefix.
- Scala
-
// create a DemandMatcher which specifies the desired state to be matched.
val demandMatcher = DemandMatcher(DemandState(prefix, StateName("testStateName"), Set(param)), withUnits = false, timeout)

// create matcher instance
val matcher = new Matcher(assemblyLocation.componentRef, demandMatcher)

// start the matcher so that it is ready to receive state published by the source
val matcherResponseF: Future[MatcherResponse] = matcher.start

// submit command and if the command is successfully validated, check for matching of demand state against current state
val eventualCommandResponse: Future[MatchingResponse] = async {
  val initialResponse = await(assemblyComponent.oneway(setupWithMatcher))
  initialResponse match {
    case _: Accepted ⇒
      val matcherResponse = await(matcherResponseF)
      // create appropriate response if demand state was matched from among the published state or otherwise
      matcherResponse match {
        case MatchCompleted  => Completed(setupWithMatcher.runId)
        case mf: MatchFailed => Error(setupWithMatcher.runId, mf.throwable.getMessage)
      }
    case invalid: Invalid ⇒
      matcher.stop()
      invalid
    case locked: Locked =>
      matcher.stop()
      locked
  }
}

val commandResponse = Await.result(eventualCommandResponse, timeout.duration)
- Java
-
// create a DemandMatcher which specifies the desired state to be matched.
DemandMatcher demandMatcher = new DemandMatcher(
    new DemandState(prefix().prefix(), new StateName("testStateName")).add(param), false, timeout);

// create matcher instance
Matcher matcher = new Matcher(
    AkkaLocationExt.RichAkkaLocation(hcdLocation).componentRef().narrow(), demandMatcher, ec, mat);

// start the matcher so that it is ready to receive state published by the source
CompletableFuture<MatcherResponse> matcherResponseFuture = matcher.jStart();

// submit command and if the command is successfully validated, check for matching of demand state against current state
CompletableFuture<CommandResponse.MatchingResponse> commandResponseToBeMatched =
    hcdCmdService
        .oneway(setup, timeout)
        .thenCompose(initialCommandResponse -> {
            if (initialCommandResponse instanceof CommandResponse.Accepted) {
                return matcherResponseFuture.thenApply(matcherResponse -> {
                    if (matcherResponse.getClass().isAssignableFrom(MatcherResponses.jMatchCompleted().getClass()))
                        return new CommandResponse.Completed(initialCommandResponse.runId());
                    else
                        return new CommandResponse.Error(initialCommandResponse.runId(), "Match not completed");
                });
            } else {
                matcher.stop();
                // Need a better error message
                return CompletableFuture.completedFuture(
                    new CommandResponse.Error(initialCommandResponse.runId(), "Matcher failed"));
            }
        });

CommandResponse.MatchingResponse actualResponse = commandResponseToBeMatched.get();
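The matching criteria themselves can be pictured as predicates over published state. The following is a simplified sketch in plain Java, not the CSW implementation: the `MatcherSketch` class, its `State` type, and the integer-valued parameters are hypothetical, units are ignored, and the timeout that the real matchers take is left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical model of state matching; not the CSW Matcher or StateMatcher classes.
final class MatcherSketch {
    // Hypothetical stand-in for a published state: a prefix plus named values.
    static final class State {
        final String prefix;
        final Map<String, Integer> values;

        State(String prefix, Map<String, Integer> values) {
            this.prefix = prefix;
            this.values = values;
        }
    }

    // Matches when the prefix agrees and every demanded value appears in the current state.
    static Predicate<State> demandMatcher(State demand) {
        return current -> current.prefix.equals(demand.prefix)
            && current.values.entrySet().containsAll(demand.values.entrySet());
    }

    // Matches any state published under the given prefix.
    static Predicate<State> presenceMatcher(String prefix) {
        return current -> current.prefix.equals(prefix);
    }

    // Returns the first published state that satisfies the matcher, if any.
    static Optional<State> firstMatch(List<State> published, Predicate<State> matcher) {
        return published.stream().filter(matcher).findFirst();
    }

    // Small helper to build a single-entry value map.
    static Map<String, Integer> values(String key, int value) {
        Map<String, Integer> map = new HashMap<>();
        map.put(key, value);
        return map;
    }

    public static void main(String[] args) {
        State demand = new State("wfos.blue.filter", values("encoder", 100));
        List<State> published = Arrays.asList(
            new State("wfos.blue.filter", values("encoder", 40)),
            new State("wfos.blue.filter", values("encoder", 100)));
        System.out.println(firstMatch(published, demandMatcher(demand)).isPresent());
        System.out.println(firstMatch(published, presenceMatcher("wfos.red")).isPresent());
    }
}
```

In this model, a successful `firstMatch` corresponds to the Completed response, and an empty result corresponds to the Error response a sender would produce when the match fails.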