Module csw.CommandService
Classes
class CommandService (prefix: Prefix,
componentType: ComponentType,
clientSession: aiohttp.client.ClientSession)-
Expand source code
class CommandService: def __init__(self, prefix: Prefix, componentType: ComponentType, clientSession: ClientSession): self.prefix = prefix self.componentType = componentType self._session = clientSession self.log = structlog.get_logger() def _getBaseUri(self) -> str: locationService = LocationServiceSync() connection = ConnectionInfo.make(self.prefix, self.componentType, ConnectionType.HttpType) location = locationService.resolve(connection) if location is not None: location.__class__ = HttpLocation return location.uri raise RuntimeError async def _postCommand(self, command: str, controlCommand: ControlCommand) -> SubmitResponse: baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} match command: case 'Submit': data = Submit(controlCommand)._asDict() case 'Validate': data = Validate(controlCommand)._asDict() case _: data = Oneway(controlCommand)._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: runId = str(uuid.uuid4()) return Error(runId, await response.text()) resp = CommandResponse._fromDict(await response.json()) return resp async def submit(self, controlCommand: ControlCommand) -> SubmitResponse: """ Submits a command to the command service Args: controlCommand (ControlCommand): command to submit Returns: SubmitResponse a subclass of SubmitResponse """ return await self._postCommand("Submit", controlCommand) async def validate(self, controlCommand: ControlCommand) -> ValidateResponse: """ Validates a command to be sent to the command service. Args: controlCommand (ControlCommand): command to submit Returns: SubmitResponse a subclass of SubmitResponse (only Accepted, Invalid or Locked) """ return await self._postCommand("Validate", controlCommand) async def oneway(self, controlCommand: ControlCommand) -> OnewayResponse: """ Sends a command to the command service without expecting a reply. Args: controlCommand (ControlCommand): command to submit Returns: SubmitResponse a subclass of SubmitResponse (only Accepted, Invalid or Locked) """ return await self._postCommand("Oneway", controlCommand) # noinspection DuplicatedCode async def queryFinal(self, runId: str, timeout: timedelta) -> SubmitResponse: """ If the command for runId returned Started (long-running command), this will return the final result. Args: runId (str): runId for the command timeout (timedelta): timr to wait before returning an error Returns: SubmitResponse a subclass of SubmitResponse """ baseUri = (self._getBaseUri()).replace('http:', 'ws:') wsUri = f"{baseUri}websocket-endpoint" msgDict = QueryFinal(runId, timeout)._asDict() jsonStr = json.dumps(msgDict) ws = await self._session.ws_connect(wsUri) await ws.send_str(jsonStr) jsonResp = await ws.receive_str() await ws.close() return CommandResponse._fromDict(json.loads(jsonResp)) async def submitAndWaitAsync(self, controlCommand: ControlCommand, timeout: timedelta) -> SubmitResponse: """ Submits a command to the command service and waits for the final response. This version returns a future response (async). See submitAndWait() for a blocking version. Args: controlCommand (ControlCommand): command to submit timeout (timedelta): time to wait before returning an error Returns: SubmitResponse a subclass of SubmitResponse """ resp = await self.submit(controlCommand) match resp: case Started(runId): return await self.queryFinal(runId, timeout) case _: return resp # noinspection DuplicatedCode async def query(self, runId: str) -> SubmitResponse: """ Query for the result of a long-running command which was sent as Submit to get a SubmitResponse. Query allows checking to see if a long-running command is completed without waiting as with queryFinal. Args: runId (str): runId for the command Returns: SubmitResponse a subclass of SubmitResponse """ baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} data = Query(runId)._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: raise Exception(f"CommandService: query failed: {await response.json()}") # return CommandResponse._fromDict(json.loads(await response.json())) return CommandResponse._fromDict(await response.json()) async def submitAndWait(self, controlCommand: ControlCommand, timeout: timedelta) -> SubmitResponse: """ Submits a command to the command service and waits for the final response. Args: controlCommand (ControlCommand): command to submit timeout (timedelta): time to wait before returning an error Returns: SubmitResponse a subclass of SubmitResponse """ resp = await self.submit(controlCommand) match resp: case Started(runId): return await self.queryFinal(runId, timeout) case _: return resp async def _subscribeCurrentState(self, names: List[str], callback: Callable[[CurrentState], Awaitable]): baseUri = self._getBaseUri().replace('http:', 'ws:') wsUri = f"{baseUri}websocket-endpoint" msgDict = SubscribeCurrentState(names)._asDict() jsonStr = json.dumps(msgDict) async with connect(wsUri) as websocket: await websocket.send(jsonStr) async for message in websocket: await callback(CurrentState._fromDict(json.loads(message))) # async def _subscribeCurrentState(self, names: List[str], callback: Callable[[CurrentState], Awaitable]): # baseUri = (self._getBaseUri()).replace('http:', 'ws:') # wsUri = f"{baseUri}websocket-endpoint" # msgDict = SubscribeCurrentState(names)._asDict() # jsonStr = json.dumps(msgDict) # print(f"XXX _subscribeCurrentState: json = {jsonStr}") # async with self._session.ws_connect(wsUri) as ws: # await ws.send_str(jsonStr) # async for msgF in ws: # msg = await msgF # print(f"XXX _subscribeCurrentState: message = {msg}") # match msg.type: # case aiohttp.WSMsgType.TEXT: # print(f"XXX _subscribeCurrentState: callback({CurrentState._fromDict(json.loads(msg.data))})") # await callback(CurrentState._fromDict(json.loads(msg.data))) # case aiohttp.WSMsgType.CLOSED: # break # case aiohttp.WSMsgType.ERROR: # break async def subscribeCurrentState(self, names: List[str], callback: Callable[[CurrentState], Awaitable]) -> Subscription: """ Subscribe to the current state of a component Args: names (List[str]): subscribe to states which have any of the provided value for name. If no states are provided, all the current states will be received. callback: a function to be called with the CurrentState values Returns: subscription task """ task = asyncio.create_task(self._subscribeCurrentState(names, callback)) await asyncio.sleep(0.1) # XXX TODO FIXME: Need to wait for task to startup return Subscription(task) async def executeDiagnosticMode(self, startTime: UTCTime, hint: str): """ On receiving a diagnostic data command, the component goes into a diagnostic data mode based on hint at the specified startTime. Validation of supported hints need to be handled by the component writer. Args: startTime: represents the time at which the diagnostic mode actions will take effect hint: represents supported diagnostic data mode for a component """ baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} data = ExecuteDiagnosticMode(startTime, hint)._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: raise Exception(f"CommandService: executeDiagnosticMode failed: {await response.text()}") async def executeOperationsMode(self): """ On receiving a operations mode command, the current diagnostic data mode is halted. """ baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} data = ExecuteOperationsMode()._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: raise Exception(f"CommandService: executeOperationsMode failed: {await response.text()}") async def goOnline(self): baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} data = GoOnline()._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: raise Exception(f"CommandService: goOnline failed: {await response.text()}") async def goOffline(self): baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} data = GoOffline()._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: raise Exception(f"CommandService: goOffline failed: {await response.text()}")
Methods
async def executeDiagnosticMode(self,
startTime: UTCTime,
hint: str)-
Expand source code
async def executeDiagnosticMode(self, startTime: UTCTime, hint: str): """ On receiving a diagnostic data command, the component goes into a diagnostic data mode based on hint at the specified startTime. Validation of supported hints need to be handled by the component writer. Args: startTime: represents the time at which the diagnostic mode actions will take effect hint: represents supported diagnostic data mode for a component """ baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} data = ExecuteDiagnosticMode(startTime, hint)._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: raise Exception(f"CommandService: executeDiagnosticMode failed: {await response.text()}")
On receiving a diagnostic data command, the component goes into a diagnostic data mode based on hint at the specified startTime. Validation of supported hints need to be handled by the component writer.
Args
startTime
- represents the time at which the diagnostic mode actions will take effect
hint
- represents supported diagnostic data mode for a component
async def executeOperationsMode(self)
-
Expand source code
async def executeOperationsMode(self): """ On receiving a operations mode command, the current diagnostic data mode is halted. """ baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} data = ExecuteOperationsMode()._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: raise Exception(f"CommandService: executeOperationsMode failed: {await response.text()}")
On receiving a operations mode command, the current diagnostic data mode is halted.
async def goOffline(self)
-
Expand source code
async def goOffline(self): baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} data = GoOffline()._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: raise Exception(f"CommandService: goOffline failed: {await response.text()}")
async def goOnline(self)
-
Expand source code
async def goOnline(self): baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} data = GoOnline()._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: raise Exception(f"CommandService: goOnline failed: {await response.text()}")
async def oneway(self,
controlCommand: ControlCommand) ‑> Accepted | Invalid | Locked-
Expand source code
async def oneway(self, controlCommand: ControlCommand) -> OnewayResponse: """ Sends a command to the command service without expecting a reply. Args: controlCommand (ControlCommand): command to submit Returns: SubmitResponse a subclass of SubmitResponse (only Accepted, Invalid or Locked) """ return await self._postCommand("Oneway", controlCommand)
Sends a command to the command service without expecting a reply.
Args
controlCommand
:ControlCommand
- command to submit
Returns: SubmitResponse a subclass of SubmitResponse (only Accepted, Invalid or Locked)
async def query(self, runId: str) ‑> Error | Invalid | Locked | Started | Completed | Cancelled
-
Expand source code
async def query(self, runId: str) -> SubmitResponse: """ Query for the result of a long-running command which was sent as Submit to get a SubmitResponse. Query allows checking to see if a long-running command is completed without waiting as with queryFinal. Args: runId (str): runId for the command Returns: SubmitResponse a subclass of SubmitResponse """ baseUri = self._getBaseUri() postUri = f"{baseUri}post-endpoint" headers = {'Content-type': 'application/json'} data = Query(runId)._asDict() jsonData = json.loads(json.dumps(data)) response = await self._session.post(postUri, headers=headers, json=jsonData) if not response.ok: raise Exception(f"CommandService: query failed: {await response.json()}") # return CommandResponse._fromDict(json.loads(await response.json())) return CommandResponse._fromDict(await response.json())
Query for the result of a long-running command which was sent as Submit to get a SubmitResponse. Query allows checking to see if a long-running command is completed without waiting as with queryFinal.
Args
runId
:str
- runId for the command
Returns: SubmitResponse a subclass of SubmitResponse
async def queryFinal(self, runId: str, timeout: datetime.timedelta) ‑> Error | Invalid | Locked | Started | Completed | Cancelled
-
Expand source code
async def queryFinal(self, runId: str, timeout: timedelta) -> SubmitResponse: """ If the command for runId returned Started (long-running command), this will return the final result. Args: runId (str): runId for the command timeout (timedelta): timr to wait before returning an error Returns: SubmitResponse a subclass of SubmitResponse """ baseUri = (self._getBaseUri()).replace('http:', 'ws:') wsUri = f"{baseUri}websocket-endpoint" msgDict = QueryFinal(runId, timeout)._asDict() jsonStr = json.dumps(msgDict) ws = await self._session.ws_connect(wsUri) await ws.send_str(jsonStr) jsonResp = await ws.receive_str() await ws.close() return CommandResponse._fromDict(json.loads(jsonResp))
If the command for runId returned Started (long-running command), this will return the final result.
Args
runId
:str
- runId for the command
timeout
:timedelta
- timr to wait before returning an error
Returns: SubmitResponse a subclass of SubmitResponse
async def submit(self,
controlCommand: ControlCommand) ‑> Error | Invalid | Locked | Started | Completed | Cancelled-
Expand source code
async def submit(self, controlCommand: ControlCommand) -> SubmitResponse: """ Submits a command to the command service Args: controlCommand (ControlCommand): command to submit Returns: SubmitResponse a subclass of SubmitResponse """ return await self._postCommand("Submit", controlCommand)
Submits a command to the command service
Args
controlCommand
:ControlCommand
- command to submit
Returns: SubmitResponse a subclass of SubmitResponse
async def submitAndWait(self,
controlCommand: ControlCommand,
timeout: datetime.timedelta) ‑> Error | Invalid | Locked | Started | Completed | Cancelled-
Expand source code
async def submitAndWait(self, controlCommand: ControlCommand, timeout: timedelta) -> SubmitResponse: """ Submits a command to the command service and waits for the final response. Args: controlCommand (ControlCommand): command to submit timeout (timedelta): time to wait before returning an error Returns: SubmitResponse a subclass of SubmitResponse """ resp = await self.submit(controlCommand) match resp: case Started(runId): return await self.queryFinal(runId, timeout) case _: return resp
Submits a command to the command service and waits for the final response.
Args
controlCommand
:ControlCommand
- command to submit
timeout
:timedelta
- time to wait before returning an error
Returns: SubmitResponse a subclass of SubmitResponse
async def submitAndWaitAsync(self,
controlCommand: ControlCommand,
timeout: datetime.timedelta) ‑> Error | Invalid | Locked | Started | Completed | Cancelled-
Expand source code
async def submitAndWaitAsync(self, controlCommand: ControlCommand, timeout: timedelta) -> SubmitResponse: """ Submits a command to the command service and waits for the final response. This version returns a future response (async). See submitAndWait() for a blocking version. Args: controlCommand (ControlCommand): command to submit timeout (timedelta): time to wait before returning an error Returns: SubmitResponse a subclass of SubmitResponse """ resp = await self.submit(controlCommand) match resp: case Started(runId): return await self.queryFinal(runId, timeout) case _: return resp
Submits a command to the command service and waits for the final response. This version returns a future response (async). See submitAndWait() for a blocking version.
Args
controlCommand
:ControlCommand
- command to submit
timeout
:timedelta
- time to wait before returning an error
Returns: SubmitResponse a subclass of SubmitResponse
async def subscribeCurrentState(self,
names: List[str],
callback: Callable[[CurrentState], Awaitable]) ‑> Subscription-
Expand source code
async def subscribeCurrentState(self, names: List[str], callback: Callable[[CurrentState], Awaitable]) -> Subscription: """ Subscribe to the current state of a component Args: names (List[str]): subscribe to states which have any of the provided value for name. If no states are provided, all the current states will be received. callback: a function to be called with the CurrentState values Returns: subscription task """ task = asyncio.create_task(self._subscribeCurrentState(names, callback)) await asyncio.sleep(0.1) # XXX TODO FIXME: Need to wait for task to startup return Subscription(task)
Subscribe to the current state of a component
Args
names
:List[str]
- subscribe to states which have any of the provided value for name. If no states are provided, all the current states will be received.
callback
- a function to be called with the CurrentState values
Returns: subscription task
async def validate(self,
controlCommand: ControlCommand) ‑> Accepted | Invalid | Locked-
Expand source code
async def validate(self, controlCommand: ControlCommand) -> ValidateResponse: """ Validates a command to be sent to the command service. Args: controlCommand (ControlCommand): command to submit Returns: SubmitResponse a subclass of SubmitResponse (only Accepted, Invalid or Locked) """ return await self._postCommand("Validate", controlCommand)
Validates a command to be sent to the command service.
Args
controlCommand
:ControlCommand
- command to submit
Returns: SubmitResponse a subclass of SubmitResponse (only Accepted, Invalid or Locked)
class Subscription (task: _asyncio.Task)
-
Expand source code
@dataclass class Subscription: task: Task def cancel(self): self.task.cancel()
Subscription(task: _asyncio.Task)
Instance variables
var task : _asyncio.Task
-
The type of the None singleton.
Methods
def cancel(self)
-
Expand source code
def cancel(self): self.task.cancel()