Communication using Commands
The csw-command library provides support for command-based communication between components.
This section describes how to communicate with any other component using commands. For how to handle received commands, see Component Handlers and Managing Command State.
Dependencies
- sbt
-
libraryDependencies += "com.github.tmtsoftware.csw" %% "csw-command" % "1.0.0"
Command-based Communication Between Components
A component can send Commands to other components. The commands can be sent as one of the following three types of messages:
- submit - A command is sent as Submit when the result of completion is desired.
- oneway - A command is sent as Oneway when the result of completion is not desired.
- validate - A command is sent, but it is only validated with no actions started.
A submit is the typical way of sending a command from one component to another. When received, a submit command is validated and, if accepted, the actions associated with the command are executed or started. submit has different responses that allow for different scenarios. When the actions started are long-running, the caller can wait for the actions to complete if needed.
A oneway is primarily meant to be used between an Assembly and an HCD when no completion information is desired. It is also useful when tracking completion using a Matcher and current state values (see below) or events.
A validate message is used to ask a destination component to validate a command and determine if the command can be executed. It does not execute the command and only returns the result of validation. In some scenarios, it may be useful to test whether a command can be executed prior to trying to execute it.
Component Locking
If a component is locked, any Command Service command will have a response of Locked. The corresponding handler will not be called until the component is unlocked. See Creating a Component for more information on locking.
Note that the code in the receiving component’s handler does not need to return Locked. If the component has been locked, the component’s Supervisor returns the Locked response to the caller and the handler is not called.
Command Validation
Command validations occur in two scenarios. One is using the validate message as described above. For example, validate could be used by an Assembly when it needs to send multiple commands to different HCDs and it wants to first check that all the HCDs can execute their commands before sending a command to any of the HCDs.
The second scenario is that validation is always the first step in processing a command, whether submit or oneway.
If the receiving component is not locked, the component’s Supervisor calls the validateCommand handler of the Top Level Actor. The developer code evaluates the command and returns a ValidateCommandResponse as shown in the following table.
ValidateCommandResponse | Description |
---|---|
Accepted | The command is valid and can be executed by the component. |
Invalid | The command is not valid and cannot be executed. The response includes a reason in the form of a CommandIssue |
Locked | The component is locked by some other command sender. The validation could not occur. |
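The ordering described above (lock check first, then the validation handler) can be modelled in a few lines. This is a plain-Java sketch, not CSW code: the class ValidationFlowSketch, the Response enum, and the process method are hypothetical names invented for illustration, and the Predicate merely stands in for the developer's validateCommand handler.

```java
import java.util.function.Predicate;

// Illustrative model only; these types are NOT part of the CSW API.
final class ValidationFlowSketch {

    // Mirrors the three ValidateCommandResponse values in the table above.
    enum Response { ACCEPTED, INVALID, LOCKED }

    // lockedByOther: whether another sender currently holds the lock.
    // validator: stands in for the developer's validateCommand handler.
    static Response process(String command, boolean lockedByOther, Predicate<String> validator) {
        if (lockedByOther) {
            // The handler is never consulted while the component is locked.
            return Response.LOCKED;
        }
        return validator.test(command) ? Response.ACCEPTED : Response.INVALID;
    }

    public static void main(String[] args) {
        Predicate<String> knownCommands = name -> name.equals("move");
        System.out.println(process("move", false, knownCommands));
        System.out.println(process("spin", false, knownCommands));
        System.out.println(process("move", true, knownCommands));
    }
}
```

In this model the validator is skipped entirely when the component is locked, which matches the statement that the handler is not called in that case.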
The Submit Message
A submit message is sent with its command to a component destination. A SubmitResponse is returned to the caller when the submit message is used. If the validateCommand handler returns Accepted, the framework calls the onSubmit handler of the Top Level Actor. The onSubmit handler always returns a SubmitResponse.
Immediate Completion Scenario
If the actions of the submit command take a very short time to complete, they may be completed by the onSubmit handler. This is called immediate completion. The time for the actions to complete should be less than 1 second. (Note: The framework will time out if the onSubmit handler does not return a response within 1 second.) In this scenario, the values of SubmitResponse returned by onSubmit can be Completed, CompletedWithResult, or Error. Error is returned when the actions could not be accomplished. This is different from Invalid, which indicates that the command could not be validated.
The immediate completion behavior is similar to a remote procedure call, although the execution is entirely asynchronous. If the actions do not produce a value for the client, the Completed SubmitResponse is returned. If there is a result, the CompletedWithResult SubmitResponse is returned with a parameter set of Result type.
Long Running Actions Scenario
When actions take longer than 1 second, onSubmit should start the actions and return the Started SubmitResponse. The Started response indicates to the framework that long-running actions have been started.
Once the long-running actions have started, the receiving component code must notify the framework when the actions are completed. This is done by updating the Command Response Manager.
In addition to the values returned for immediate completion, long-running actions can return Cancelled. If the component supports a separate command to stop a long-running command, the stopped command should return Cancelled when successfully cancelled. The command that cancels the long-running command should return Completed.
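The long-running protocol can be pictured with a small stand-in for the bookkeeping the framework performs. The following is a plain-Java sketch under stated assumptions, not the CSW Command Response Manager: LongRunningSketch and its methods are hypothetical, responses are modelled as plain strings, and returning CommandNotAvailable from queryFinal for an unknown runId is a choice of this sketch only.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model only; NOT the CSW Command Response Manager API.
final class LongRunningSketch {

    // runId -> future that completes with the final response.
    private final Map<String, CompletableFuture<String>> finals = new ConcurrentHashMap<>();

    // Models onSubmit for a long-running command: register it and return Started at once.
    String onSubmit(String runId) {
        finals.put(runId, new CompletableFuture<>());
        return "Started";
    }

    // Models the component reporting that the long-running actions have finished.
    void updateFinal(String runId, String finalResponse) {
        CompletableFuture<String> pending = finals.get(runId);
        if (pending != null) {
            pending.complete(finalResponse);
        }
    }

    // Models query: the current status, or CommandNotAvailable for an unknown runId.
    String query(String runId) {
        CompletableFuture<String> pending = finals.get(runId);
        return pending == null ? "CommandNotAvailable" : pending.getNow("Started");
    }

    // Models queryFinal: a future that completes only with the final response.
    CompletableFuture<String> queryFinal(String runId) {
        CompletableFuture<String> pending = finals.get(runId);
        // Sketch-only choice for unknown runIds.
        return pending == null ? CompletableFuture.completedFuture("CommandNotAvailable") : pending;
    }

    public static void main(String[] args) {
        LongRunningSketch framework = new LongRunningSketch();
        System.out.println(framework.onSubmit("run-1"));
        framework.updateFinal("run-1", "Completed");
        System.out.println(framework.queryFinal("run-1").join());
    }
}
```

The point of the model is the separation of the two steps: the handler returns Started immediately, and the final response arrives later through a separate update.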
The following table summarizes all the possible values for SubmitResponse.
SubmitResponse | Description |
---|---|
Invalid | The command is not valid and cannot be executed. The response includes a reason in the form of a CommandIssue . onSubmit is not executed. |
Completed | This response is returned when the actions associated with a command are complete. |
CompletedWithResult | This response is returned when the actions associated with a command are complete and a result is returned. |
Started | Returned when long running actions have been started. |
Error | Error is returned when the actions started by a command do not complete properly. A message is returned explaining the error. |
Cancelled | The actions associated with a long running command have been cancelled. |
Locked | The component is locked by some other command sender. The validation could not occur. |
The Oneway Message
The other option for sending a command to a component destination is the oneway message. The central difference between submit and oneway is that oneway does not track or allow reporting of completion information. It supports a fire-and-forget type of communication.
A OnewayResponse is returned to the caller when the oneway message is used. Oneway does validate the command. If the component is not locked, the validateCommand handler is called.
If the validateCommand handler returns Accepted, the framework calls the onOneway handler of the Top Level Actor. However, the onOneway handler does not return a value. The sender of the oneway message receives the result of the validation or the Locked indication.
The following table summarizes all the possible values for OnewayResponse.
OnewayResponse | Description |
---|---|
Invalid | The command is not valid and cannot be executed. The response includes a reason in the form of a CommandIssue . onOneway is not executed. |
Accepted | Returned when validation succeeds and the command was passed to the onOneway handler. |
Locked | The component is locked by some other command sender. The validation could not occur. |
Oneway is available as a higher performance option when an Assembly needs to send commands to an HCD but does not need completion information, as is the case when demands are being sent to a motor. Validation is still present to ensure the HCD supports the standalone operation requirement and can check that it is not getting out-of-range values.
Oneway can be used with a matcher. The matcher can use CurrentState or even events from the Event Service to determine completion. This can be more complicated than submit, but may be useful in some scenarios.
CommandService
A helper/wrapper called CommandService is provided as a convenient way to use the Command Service with a component from the Location Service.
A CommandService instance is created using an AkkaLocation of the receiving component, discovered from the Location Service. This CommandService instance has methods for communicating with the component. A new CommandService is created for each component to which commands are to be sent.
The API can be exercised as follows for different scenarios of command-based communication:
submit
Sending a submit message with a command returns a SubmitResponse as a Future. For a short-running command, the Future returned by submit completes with the final response, which may be a positive completion (Completed or CompletedWithResult) or a negative completion (Invalid, Error, Cancelled, Locked). For a long-running command, the Future completes with the initial response, which is Started, or Invalid if validation fails. The sender can then use query to get the current response or queryFinal to wait for the final response.
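The grouping of responses into positive completions, negative completions, and the intermediate Started response can be written down as a small classification. This is a plain-Java sketch with hypothetical names (SubmitResponseSketch, Kind); it models only the categories named in this section and is not the CSW SubmitResponse type.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative model only; NOT the CSW SubmitResponse hierarchy.
final class SubmitResponseSketch {

    enum Kind { INVALID, COMPLETED, COMPLETED_WITH_RESULT, STARTED, ERROR, CANCELLED, LOCKED }

    // Positive completions as listed in the text.
    static final Set<Kind> POSITIVE = EnumSet.of(Kind.COMPLETED, Kind.COMPLETED_WITH_RESULT);

    // Started is the only response that is not final.
    static boolean isFinal(Kind kind) {
        return kind != Kind.STARTED;
    }

    static boolean isPositive(Kind kind) {
        return POSITIVE.contains(kind);
    }

    // Negative completions: final, but not positive.
    static boolean isNegative(Kind kind) {
        return isFinal(kind) && !isPositive(kind);
    }

    public static void main(String[] args) {
        for (Kind kind : Kind.values()) {
            System.out.println(kind + " final=" + isFinal(kind) + " positive=" + isPositive(kind));
        }
    }
}
```

A sender that receives a non-final response is the one that needs query or queryFinal; every other response already ends the exchange.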
This example shows a long-running command sent with submit, which returns Started, followed by queryFinal to get the final response.
- Scala/submit w/immediate-response
-
val setupForSubscribe = Setup(prefix, longRunning, Some(obsId))
val initialResponse   = assemblyCommandService.submit(setupForSubscribe)
val finalResponse     = assemblyCommandService.queryFinal(setupForSubscribe.runId)
- Java/submit w/immediate-response
-
Setup submitSetup = new Setup(prefix(), longRunningCmd(), Optional.empty()).add(intParameter1);
CompletableFuture<SubmitResponse> submitResponse = hcdCmdService.submit(submitSetup, timeout);
CompletableFuture<SubmitResponse> finalSubmitResponse = hcdCmdService.queryFinal(submitSetup.runId(), timeout);
Note that the Scala examples are using async/await, which simplifies handling the Futures, but is not necessary. The async/await library is not available in Java. If using submit and the validation fails in the destination component, the Invalid response is returned.
- Scala/submit w/invalid response
-
val invalidSubmitSetup    = Setup(prefix, invalidCmd, obsId)
val invalidSubmitCommandF = assemblyCmdService.submit(invalidSubmitSetup)
async {
  await(invalidSubmitCommandF) match {
    case Started(runId) =>
    // Do Completed thing
    case Invalid(runId, issue) =>
      issue shouldBe a[Invalid]
    case other =>
      // Unexpected result
      log.error(s"Some other response: $other")
  }
}
- Java/submit w/invalid response
-
Setup invalidSubmitSetup = new Setup(prefix(), invalidCmd(), Optional.empty()).add(intParameter2);
CompletableFuture<SubmitResponse> invalidSubmitCommandF =
    hcdCmdService.submit(invalidSubmitSetup, timeout).thenApply(
        response -> {
          if (response instanceof Started) {
            // do something with completed result
          } else if (response instanceof Invalid) {
            // Cast the response to get the issue
            Invalid invalid = (Invalid) response;
            assert (invalid.issue().reason().contains("failure"));
          }
          return response;
        }
    );
The handling of long-running and immediate completion commands looks the same from the command sender’s perspective. The following example shows a long-running command that returns a value when the command action completes with a result.
- Scala/submit long running
-
val queryFinalF = async {
  // longRunningSetup3 has already been submitted
  // Use queryFinal and runId to wait for completion and result
  await(assemblyCmdService.queryFinal(longRunningSetup3.runId)) match {
    case CompletedWithResult(_, result) =>
      Some(result(encoder).head)
    case otherResponse =>
      // log a message?
      None
  }
}
Await.result(queryFinalF, timeout.duration) shouldBe Some(20)
- Java/submit long running
-
// longRunningSetup3 has already been submitted
CompletableFuture<Optional<Integer>> int3F =
    hcdCmdService.queryFinal(longRunningSetup3.runId(), timeout).thenCompose(response -> {
      if (response instanceof CompletedWithResult) {
        // This extracts and returns the first value of parameter encoder
        Result result = ((CompletedWithResult) response).result();
        Optional<Integer> rvalue = Optional.of(result.jGet(encoder).orElseThrow().head());
        return CompletableFuture.completedFuture(rvalue);
      } else {
        // For some other response, return empty
        return CompletableFuture.completedFuture(Optional.empty());
      }
    });
Assert.assertEquals(Optional.of(20), int3F.get());
If a command is long-running and the sender needs to determine that the actions have started properly, the query method of CommandService can be used, as shown in the following example, without using the Future returned by submit.
- Scala/submit long running/query
-
val longRunningSetup4 = Setup(prefix, longRunningCmd, obsId)
async {
  assemblyCmdService.submit(longRunningSetup4)
  await(assemblyCmdService.query(longRunningSetup4.runId)) match {
    case Started(runId) =>
    // happy case - no action needed
    // Do some other work
    case a =>
    // log.error. This indicates that the command probably failed to start.
  }
}
- Java/submit long running/query
-
Setup longRunningQuerySetup3 = new Setup(prefix(), longRunningCmd(), Optional.empty()).add(intParameter1);
CompletableFuture<SubmitResponse> longRunningSubmitResultF3 = hcdCmdService.submit(longRunningQuerySetup3, timeout);

// do some work before querying for the result of above command as needed
CompletableFuture<QueryResponse> queryResponseF3 = hcdCmdService.query(longRunningQuerySetup3.runId(), timeout);
queryResponseF3.thenAccept(r -> {
  if (r instanceof Started) {
    // happy case - no action needed
    // Do some other work
  } else {
    // log.error. This indicates that the command probably failed to start.
  }
});
submitAndWait
submitAndWait is a wrapper method that sends a submit message and waits for the final response. Like submit, it returns a SubmitResponse as a Future. The Future returned by submitAndWait will always be the final response, which may be a positive completion (Completed or CompletedWithResult) or a negative completion (Invalid, Error, Cancelled, Locked). The Started response is never seen by the programmer when using the submitAndWait method of CommandService. The handling of long-running and immediate completion commands looks the same from the command sender’s perspective.
This example shows an immediate completion command using submitAndWait.
- Scala/submitAndWait w/immediate-response
-
val immediateSetup = Setup(prefix, immediateCmd, obsId)
val immediateCommandF = async {
  await(assemblyCmdService.submitAndWait(immediateSetup)) match {
    case response: Completed =>
      // do something with completed result
      response
    case otherResponse =>
      // do something with other response which is not expected
      otherResponse
  }
}
- Java/submitAndWait w/immediate-response
-
CompletableFuture<SubmitResponse> immediateCommandF =
    hcdCmdService
        .submitAndWait(immediateCmd, timeout)
        .thenApply(
            response -> {
              if (response instanceof Completed) {
                // do something with completed result
              } else {
                // do something with unexpected response
              }
              return response;
            }
        );
Note that the Scala examples are using async/await, which simplifies handling the Futures, but is not necessary. The async/await library is not available in Java. If using submitAndWait and the validation fails in the destination component, the Invalid response is returned.
- Scala/submitAndWait w/invalid response
-
val invalidSetup    = Setup(prefix, invalidCmd, obsId)
val invalidCommandF = assemblyCmdService.submitAndWait(invalidSetup)
async {
  await(invalidCommandF) match {
    case Completed(runId) =>
    // Do Completed thing
    case Invalid(runId, issue) =>
      issue shouldBe a[Invalid]
    case other =>
      // Unexpected result
      log.error(s"Some other response: $other")
  }
}
- Java/submitAndWait w/invalid response
-
Setup invalidSetup = new Setup(prefix(), invalidCmd(), Optional.empty()).add(intParameter2);
CompletableFuture<SubmitResponse> invalidCommandF =
    hcdCmdService.submitAndWait(invalidSetup, timeout).thenApply(
        response -> {
          if (response instanceof Completed) {
            // do something with completed result
          } else if (response instanceof Invalid) {
            // Cast the response to get the issue
            Invalid invalid = (Invalid) response;
            assert (invalid.issue().reason().contains("failure"));
          }
          return response;
        }
    );
If a command is long-running and the sender needs to determine that the actions have started properly, the query method of CommandService can be used as shown in the following example, without using the Future returned by submitAndWait, which provides the final completion notification.
- Scala/submitAndWait long running/query
-
val longRunningSetup2 = longRunningSetup1.cloneCommand
val longRunningQueryResultF = async {
  // The following val is set so we can do query and work and complete later
  val longRunningF = assemblyCmdService.submitAndWait(longRunningSetup2)

  await(assemblyCmdService.query(longRunningSetup2.runId)) match {
    case Started(runId) =>
    // happy case - no action needed
    // Do some other work
    case a =>
    // log.error. This indicates that the command probably failed to start.
  }

  // Now wait for completion and result
  await(longRunningF) match {
    case CompletedWithResult(_, result) =>
      Some(result(encoder).head)
    case otherResponse =>
      // log a message?
      None
  }
}
- Java/submitAndWait long running/query
-
Setup longRunningSetup2 = longRunningSetup1.cloneCommand();
CompletableFuture<SubmitResponse> longRunningQueryResultF = hcdCmdService.submitAndWait(longRunningSetup2, timeout);

// do some work before querying for the result of above command as needed
CompletableFuture<QueryResponse> queryResponseF = hcdCmdService.query(longRunningSetup2.runId(), timeout);
queryResponseF.thenAccept(r -> {
  if (r instanceof Started) {
    // happy case - no action needed
    // Do some other work
  } else {
    // log.error. This indicates that the command probably failed to start.
  }
});

CompletableFuture<Optional<Integer>> intF =
    longRunningQueryResultF.thenCompose(response -> {
      if (response instanceof CompletedWithResult) {
        // This extracts and returns the first value of parameter encoder
        Result result = ((CompletedWithResult) response).result();
        Optional<Integer> rvalue = Optional.of(result.jGet(encoder).orElseThrow().head());
        return CompletableFuture.completedFuture(rvalue);
      } else {
        // For some other response, return empty
        return CompletableFuture.completedFuture(Optional.empty());
      }
    });
It’s also possible to use the runId of the Setup with queryFinal to determine when the actions are completed. This is equivalent to using the Future returned by submitAndWait, but in this case, the original Future from submitAndWait is not available.
- Scala/submitAndWait long running/queryFinal
-
val queryFinalF = async {
  // longRunningSetup3 has already been submitted
  // Use queryFinal and runId to wait for completion and result
  await(assemblyCmdService.queryFinal(longRunningSetup3.runId)) match {
    case CompletedWithResult(_, result) =>
      Some(result(encoder).head)
    case otherResponse =>
      // log a message?
      None
  }
}
Await.result(queryFinalF, timeout.duration) shouldBe Some(20)
- Java/submitAndWait long running/queryFinal
-
// longRunningSetup3 has already been submitted
CompletableFuture<Optional<Integer>> int3F =
    hcdCmdService.queryFinal(longRunningSetup3.runId(), timeout).thenCompose(response -> {
      if (response instanceof CompletedWithResult) {
        // This extracts and returns the first value of parameter encoder
        Result result = ((CompletedWithResult) response).result();
        Optional<Integer> rvalue = Optional.of(result.jGet(encoder).orElseThrow().head());
        return CompletableFuture.completedFuture(rvalue);
      } else {
        // For some other response, return empty
        return CompletableFuture.completedFuture(Optional.empty());
      }
    });
Assert.assertEquals(Optional.of(20), int3F.get());
oneway
Oneway does not provide completion information but does return the result of the validateCommand handler in the Top Level Actor (Accepted, Invalid, or Locked). When sending a command as a oneway message, a OnewayResponse is returned as a Future that can be used to check that it was validated.
- Scala
-
// `onewayCmd` is a sample to demonstrate oneway without any actions
val onewaySetup = Setup(prefix, onewayCmd, obsId)
// Don't care about the futures from async
val oneWayF = async {
  await(assemblyCmdService.oneway(onewaySetup)) match {
    case invalid: Invalid =>
    // Log an error here
    case _ =>
    // Ignore anything other than invalid
  }
}
Await.ready(oneWayF, timeout.duration)
- Java
-
Setup onewaySetup = new Setup(prefix(), onewayCmd(), Optional.empty()).add(intParameter1);
CompletableFuture<Void> onewayF =
    hcdCmdService
        .oneway(onewaySetup, timeout)
        .thenAccept(onewayResponse -> {
          if (onewayResponse instanceof Invalid) {
            // log an error here
          } else {
            // Ignore anything other than invalid
          }
        });
validate
Sometimes it may be useful to test whether or not a component can execute a command without committing to executing its actions. The validate message can be used for this purpose. Validate returns a ValidateResponse of Accepted, Invalid, or Locked.
- Scala
-
val validateCommandF = async {
  await(assemblyCmdService.validate(immediateSetup)) match {
    case response: Accepted => true
    case Invalid(_, issue) =>
      // do something with other response which is not expected
      log.error(s"Command failed to validate with issue: $issue")
      false
    case l: Locked => false
  }
}
Await.result(validateCommandF, timeout.duration) shouldBe true
- Java
-
CompletableFuture<Boolean> validateCommandF =
    hcdCmdService
        .validate(immediateCmd)
        .thenApply(
            response -> {
              if (response instanceof Accepted) {
                // do something with completed result
                return true;
              } else if (response instanceof Invalid) {
                // do something with unexpected response
                return false;
              } else {
                // Locked
                return false;
              }
            }
        );
Assert.assertTrue(validateCommandF.get());
query
At any time, the query call of CommandService can be used to check the current status of a command that has been sent via the submit message using the command’s runId. This is most useful with a long-running command, but it works for all commands sent using submit.
The query message returns a QueryResponse, which could be any of the values of SubmitResponse plus the CommandNotAvailable response. This response occurs when the framework has no knowledge of the command associated with the runId passed with the query. The previous long-running example showed the use of query to check that the actions associated with a command had started. Another usage is to check the final value of a command that is already completed using its runId.
- Scala/query usage
-
// Check on a command that was completed in the past
val queryValue = Await.result(assemblyCmdService.query(longRunningSetup2.runId), timeout.duration)
queryValue shouldBe a[CompletedWithResult]
- Java/query usage
-
CompletableFuture<QueryResponse> queryResponseF2 = hcdCmdService.query(longRunningSetup2.runId(), timeout);
Assert.assertTrue(queryResponseF2.get() instanceof CompletedWithResult);
submitAllAndWait
submitAllAndWait can be used to send multiple commands sequentially to the same component. This could be used to send initialization commands to an HCD, for instance. The argument for submitAllAndWait is a list of commands. submitAllAndWait returns a list of SubmitResponses, one for each command in the list. While submitAndWait returns a SubmitResponse as a Future, submitAllAndWait returns a list of SubmitResponses as a Future, which completes when all the commands in the list have completed.
- Scala/submitAllAndWait
-
val submitAllF = async {
  await(assemblyCmdService.submitAllAndWait(List(submitAllSetup1, submitAllSetup2, submitAllinvalidSetup)))
}
val submitAllResponse = Await.result(submitAllF, timeout.duration)
submitAllResponse.length shouldBe 3
submitAllResponse(0) shouldBe a[Completed]
submitAllResponse(1) shouldBe a[CompletedWithResult]
submitAllResponse(2) shouldBe a[Invalid]
- Java/submitAllAndWait
-
Setup submitAllSetup1 = new Setup(prefix(), immediateCmd(), Optional.empty()).add(encoderParam);
Setup submitAllSetup2 = new Setup(prefix(), longRunningCmd(), Optional.empty()).add(encoderParam);
Setup submitAllSetup3 = new Setup(prefix(), invalidCmd(), Optional.empty()).add(encoderParam);

CompletableFuture<List<SubmitResponse>> submitAllF =
    hcdCmdService.submitAllAndWait(
        List.of(submitAllSetup1, submitAllSetup2, submitAllSetup3),
        timeout
    );

List<SubmitResponse> submitAllResponse = submitAllF.get();
Assert.assertEquals(submitAllResponse.size(), 3);
Assert.assertTrue(submitAllResponse.get(0) instanceof Completed);
Assert.assertTrue(submitAllResponse.get(1) instanceof CompletedWithResult);
Assert.assertTrue(submitAllResponse.get(2) instanceof Invalid);
In the first example, three commands are sent and the result is a list with three SubmitResponses. The last one returned Invalid and was not executed.
The commands in submitAllAndWait will execute sequentially, but each one must complete successfully for the subsequent commands to be executed. If any one of the commands fails, submitAllAndWait stops and the list is returned with the commands that are completed up to and including the command that failed. This is shown in the following example by making the invalid command second in the list.
- Scala/submitAllAndWait with failure
-
val submitAllF2 = async {
  await(assemblyCmdService.submitAllAndWait(List(submitAllSetup1, submitAllinvalidSetup, submitAllSetup2)))
}
val submitAllResponse2 = Await.result(submitAllF2, timeout.duration)
submitAllResponse2.length shouldBe 2
submitAllResponse2(0) shouldBe a[Completed]
submitAllResponse2(1) shouldBe a[Invalid]
- Java/submitAllAndWait with failure
-
CompletableFuture<List<SubmitResponse>> submitAllF2 =
    hcdCmdService.submitAllAndWait(
        List.of(submitAllSetup1, submitAllSetup3, submitAllSetup2),
        timeout
    );

List<SubmitResponse> submitAllResponse2 = submitAllF2.get();
Assert.assertEquals(submitAllResponse2.size(), 2);
Assert.assertTrue(submitAllResponse2.get(0) instanceof Completed);
Assert.assertTrue(submitAllResponse2.get(1) instanceof Invalid);
In this case, the returned list is of length 2 rather than 3.
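The stop-on-first-failure rule can be expressed as a short loop. The following is a synchronous plain-Java sketch of that rule only, not the CSW implementation (which is asynchronous and Future-based): SubmitAllSketch, Outcome, and submitAllSequentially are hypothetical names, and the Function argument stands in for sending one command and waiting for its final response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative model only; NOT the CSW submitAllAndWait implementation.
final class SubmitAllSketch {

    enum Outcome { COMPLETED, COMPLETED_WITH_RESULT, INVALID, ERROR, CANCELLED, LOCKED }

    static boolean isSuccess(Outcome outcome) {
        return outcome == Outcome.COMPLETED || outcome == Outcome.COMPLETED_WITH_RESULT;
    }

    // Sends the commands in order and stops after the first one that does not succeed.
    // The returned list holds the responses up to and including the failing one.
    static <C> List<Outcome> submitAllSequentially(List<C> commands, Function<C, Outcome> submitAndWait) {
        List<Outcome> responses = new ArrayList<>();
        for (C command : commands) {
            Outcome outcome = submitAndWait.apply(command);
            responses.add(outcome);
            if (!isSuccess(outcome)) {
                break; // later commands are never sent
            }
        }
        return responses;
    }

    public static void main(String[] args) {
        Function<String, Outcome> send = name -> name.equals("invalid") ? Outcome.INVALID : Outcome.COMPLETED;
        System.out.println(submitAllSequentially(List.of("first", "invalid", "second"), send));
    }
}
```

With the failing command in second position the model returns two responses, mirroring the length-2 list in the example above.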
subscribeCurrentState
This method provided by CommandService can be used to subscribe to the CurrentState of a component by providing a callback that is called with the arrival of every CurrentState item.
Callbacks are not thread-safe on the JVM. If you are doing side effects/mutations inside the callback, you should ensure that it is done in a thread-safe way inside an actor. Here is an example of how it can be done.
subscribeCurrentState returns a CurrentStateSubscription handle, which should be used to unsubscribe.
The following example code shows an Assembly that subscribes to all CurrentState items of an HCD. The example sends a Setup with an encoder parameter value to the HCD as a oneway message.
- Scala
-
// Subscriber code
val expectedEncoderValue = 234
val currStateSetup       = Setup(prefix, hcdCurrentStateCmd, obsId).add(encoder.set(expectedEncoderValue))
// Setup a callback response to CurrentState
var cstate: CurrentState = CurrentState(prefix, StateName("no cstate"), Set.empty)
val subscription         = hcdCmdService.subscribeCurrentState(cs => cstate = cs)
// Send a oneway to the HCD that will cause it to publish a CurrentState with the encoder value
// in the command parameter "encoder". Callback will store value into cstate.
hcdCmdService.oneway(currStateSetup)
// Wait for a bit for callback
Thread.sleep(500)
// Test to see if value was received
cstate(encoder).head shouldBe expectedEncoderValue
// Unsubscribe to CurrentState
subscription.unsubscribe()
- Java
-
// Subscriber code
int expectedEncoderValue = 234;
Setup currStateSetup = new Setup(prefix(), hcdCurrentStateCmd(), Optional.empty()).add(encoder.set(expectedEncoderValue));
// Setup a callback response to CurrentState - use AtomicInteger to capture final value
final AtomicInteger cstate = new AtomicInteger(1);
CurrentStateSubscription subscription = hcdCmdService.subscribeCurrentState(cs -> {
  // Example sets variable outside scope of closure
  cstate.set(cs.jGet(encoder).orElseThrow().head());
});
// Send a oneway to the HCD that will cause a publish of a CurrentState with the encoder value
// in the command parameter "encoder"
hcdCmdService.oneway(currStateSetup, timeout);
// Wait for a bit for the callback
Thread.sleep(200);
// Check to see if CurrentState has the value we sent
Assert.assertEquals(expectedEncoderValue, cstate.get());
// Unsubscribe from CurrentState
subscription.unsubscribe();

// subscribe to the current state of an assembly component and use a callback which forwards each received
// element to a test probe actor
CurrentStateSubscription probeSubscription = hcdCmdService.subscribeCurrentState(currentState -> probe.ref().tell(currentState));
The second part of the example shows the code in the HCD. When the HCD receives the oneway message, it extracts the encoder value and publishes a CurrentState item with the encoder parameter.
- Scala
-
val currentState = CurrentState(prefix, StateName("HCDState"), controlCommand.paramSet)
cswCtx.currentStatePublisher.publish(currentState)
- Java
-
Key<Integer> encoder = JKeyType.IntKey().make("encoder");
int expectedEncoderValue = setup.jGet(encoder).orElseThrow().head();
CurrentState currentState = new CurrentState(prefix(), new StateName("HCDState")).add(encoder().set(expectedEncoderValue));
currentStatePublisher.publish(currentState);
There are two subscribeCurrentState methods in CommandService. The method shown in the above examples subscribes the caller to all CurrentState published. Each CurrentState item has a StateName. A second signature for subscribeCurrentState can include a Set of StateName when the caller only needs some of the CurrentState published by a component.
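The difference between the two signatures can be illustrated with a toy publisher. This is a plain-Java sketch, not the CSW API: CurrentStateFilterSketch, its State class, and the use of a Runnable as the subscription handle are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Illustrative model only; these types are NOT the CSW API.
final class CurrentStateFilterSketch {

    // Minimal stand-in for a CurrentState item: a state name plus one value.
    static final class State {
        final String name;
        final int value;

        State(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    private final List<Consumer<State>> subscribers = new CopyOnWriteArrayList<>();

    // First signature: receive every published state.
    // The returned Runnable plays the role of the subscription handle.
    Runnable subscribe(Consumer<State> callback) {
        subscribers.add(callback);
        return () -> subscribers.remove(callback);
    }

    // Second signature: receive only states whose name is in the given set.
    Runnable subscribe(Set<String> names, Consumer<State> callback) {
        return subscribe(state -> {
            if (names.contains(state.name)) {
                callback.accept(state);
            }
        });
    }

    void publish(State state) {
        for (Consumer<State> subscriber : subscribers) {
            subscriber.accept(state);
        }
    }

    public static void main(String[] args) {
        CurrentStateFilterSketch hcd = new CurrentStateFilterSketch();
        Runnable handle = hcd.subscribe(Set.of("HCDState"), s -> System.out.println(s.name + " = " + s.value));
        hcd.publish(new State("HCDState", 234)); // delivered to the callback
        hcd.publish(new State("Other", 1));      // filtered out by name
        handle.run();                            // unsubscribe
    }
}
```

In this model the name filter is just a wrapper around the unfiltered subscription, so unsubscribing works the same way for both signatures.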
Matching state for command completion
The matcher is provided to allow a component sending a command to use CurrentState published by a component to determine when actions are complete. The expected case is an Assembly using the CurrentState published by an HCD. When using a submit, completion is determined in the destination. In some scenarios, the Assembly may want to determine when actions are complete. This is what the matcher allows.
To use this feature, the oneway message is used rather than submit. A oneway command is validated but the framework does not provide completion. Doing a query with the runId of a oneway will always return CommandNotAvailable, but oneway is perfect for use with a matcher.
The matcher is created with the ActorRef of the component that is the source of CurrentState and an instance of StateMatcher, which defines the state and criteria for matching.
Several types of StateMatcher are provided as part of CSW for common use. These are DemandMatcherAll for matching the entire DemandState against the current state, DemandMatcher for matching state with or without units against the current state, and PresenceMatcher, which checks if a matching state is found with a provided prefix.
The developer is not limited to these StateMatchers. Any class that implements the StateMatcher interface can be provided to a Matcher.
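The idea of a custom matching criterion can be shown without the CSW types. The following plain-Java sketch is an assumption-laden illustration, not the StateMatcher interface: StateCheck, withinTolerance, and firstMatch are hypothetical names, published states are modelled as simple maps, and a return value of -1 stands in for the case where no published state ever matches.

```java
import java.util.List;
import java.util.Map;

// Illustrative model only; NOT the CSW StateMatcher or Matcher API.
final class MatcherSketch {

    // Hypothetical stand-in for a matching criterion applied to each published state.
    interface StateCheck {
        boolean check(Map<String, Integer> currentState);
    }

    // A custom criterion: the value under `key` is within `tolerance` of the demand.
    static StateCheck withinTolerance(String key, int demand, int tolerance) {
        return current -> {
            Integer value = current.get(key);
            return value != null && Math.abs(value - demand) <= tolerance;
        };
    }

    // Returns the index of the first published state that satisfies the check,
    // or -1 if none does.
    static int firstMatch(List<Map<String, Integer>> published, StateCheck check) {
        for (int i = 0; i < published.size(); i++) {
            if (check.check(published.get(i))) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> published =
            List.of(Map.of("encoder", 10), Map.of("encoder", 98), Map.of("encoder", 100));
        System.out.println(firstMatch(published, withinTolerance("encoder", 100, 2)));
    }
}
```

A tolerance-based criterion like this is one example of matching logic that an exact demand match would not cover.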
- Scala
-
val param: Parameter[Int] = encoder.set(100)
val setupWithMatcher      = Setup(prefix, matcherCmd, obsId)

// create a StateMatcher which specifies the desired algorithm and state to be matched.
val demandMatcher: StateMatcher =
  DemandMatcher(DemandState(prefix, StateName("testStateName")).add(param), withUnits = false, timeout)

// create matcher instance
val matcher = new Matcher(assemblyLocation.componentRef, demandMatcher)

// start the matcher so that it is ready to receive state published by the source
val matcherResponseF: Future[MatcherResponse] = matcher.start

// Submit command as a oneway and if the command is successfully validated,
// check for matching of demand state against current state
val matchResponseF: Future[MatchingResponse] = async {
  val onewayResponse: OnewayResponse = await(assemblyCmdService.oneway(setupWithMatcher))
  onewayResponse match {
    case Accepted(runId) =>
      val matcherResponse = await(matcherResponseF)
      // create appropriate response if demand state was matched from among the published state or otherwise
      // this would allow the response to be used to complete a command received by the Assembly
      matcherResponse match {
        case MatchCompleted =>
          Completed(setupWithMatcher.runId)
        case mf: MatchFailed =>
          Error(setupWithMatcher.runId, mf.throwable.getMessage)
      }
    case invalid: Invalid =>
      matcher.stop()
      invalid
    case locked: Locked =>
      matcher.stop()
      locked
  }
}

val commandResponse = Await.result(matchResponseF, timeout.duration)
commandResponse shouldBe Completed(setupWithMatcher.runId)
- Java
-
Parameter<Integer> param = JKeyType.IntKey().make("encoder").set(100);
Setup setupWithMatcher = new Setup(prefix(), matcherCmd(), Optional.empty()).add(param);

// create a StateMatcher which specifies the desired algorithm and state to be matched.
DemandMatcher demandMatcher = new DemandMatcher(new DemandState(prefix(), new StateName("testStateName")).add(param), false, timeout);

// create the matcher instance
Matcher matcher = new Matcher(AkkaLocationExt.RichAkkaLocation(hcdLocation).componentRef(hcdActorSystem).narrow(), demandMatcher, ec, mat);

// start the matcher so that it is ready to receive state published by the source
CompletableFuture<MatcherResponse> matcherResponseFuture = matcher.jStart();

// Submit command as a oneway and if the command is successfully validated,
// check for matching of demand state against current state
CompletableFuture<MatchingResponse> matchResponseF = hcdCmdService
    .oneway(setupWithMatcher, timeout)
    .thenCompose(initialCommandResponse -> {
      if (initialCommandResponse instanceof Accepted) {
        return matcherResponseFuture.thenApply(matcherResponse -> {
          if (matcherResponse.getClass().isAssignableFrom(MatcherResponses.jMatchCompleted().getClass()))
            return new Completed(initialCommandResponse.runId());
          else
            return new CommandResponse.Error(initialCommandResponse.runId(), "Match not completed");
        });
      } else {
        matcher.stop();
        return CompletableFuture.completedFuture(new CommandResponse.Error(initialCommandResponse.runId(), "Matcher failed"));
      }
    });

MatchingResponse actualResponse = matchResponseF.get();
Completed expectedResponse = new Completed(setupWithMatcher.runId());
Assert.assertEquals(expectedResponse, actualResponse);
One important point is that once the matcher is created, it must be shut down when you are finished with it, using the stop method of the matcher as shown in the example.
onewayAndMatch
CommandService provides a shortcut called onewayAndMatch that combines a oneway and a matcher and implements much of the boilerplate of the previous example.
- Scala
-
val onewayMatchF = async {
  await(assemblyCmdService.onewayAndMatch(setupWithMatcher, demandMatcher)) match {
    case i: Invalid =>
      // Command was not accepted
      log.error(s"Oneway match was not accepted: ${i.issue}")
      i
    case c: Completed =>
      // Do some completed work
      c
    case e: Error =>
      // Match failed and timed out generating an error - log a message
      println("Error")
      log.error(s"Oneway match produced an error: ${e.message}")
      e
    case l: Locked =>
      // Destination component was locked, log a message
      log.error(s"Destination component was locked")
      l
  }
}
Await.result(onewayMatchF, timeout.duration) shouldBe Completed(setupWithMatcher.runId)
- Java
-
// create a DemandMatcher which specifies the desired state to be matched.
StateMatcher stateMatcher = new DemandMatcher(new DemandState(prefix(), new StateName("testStateName")).add(param), false, timeout);

CompletableFuture<MatchingResponse> matchedCommandResponse =
    hcdCmdService.onewayAndMatch(setupWithMatcher, stateMatcher, timeout);