Module csw.CommandServer
Classes
class CommandServer (prefix: Prefix,
handler: ComponentHandlers,
port: int = 0)-
Expand source code
class CommandServer: async def _handlePost(self, request: Request) -> Response: obj = await request.json() method = obj['_type'] runId = str(uuid.uuid4()) try: command: ControlCommand = ControlCommand._fromDict(obj['controlCommand']) except TypeError: commandResponse = Error(runId, "Invalid command") return web.json_response(commandResponse._asDict()) self.log.info(f"Received command {command}") match method: case 'Submit': commandResponse, task = self.handler.onSubmit(runId, command) if task is not None: # noinspection PyTypeChecker self._crm.addTask(runId, task) self.log.debug("long-running task in progress...") case 'Oneway': commandResponse = self.handler.onOneway(runId, command) case 'Validate': commandResponse = self.handler.validateCommand(runId, command) case _: # should not happe commandResponse = Error(runId, "Invalid command") return web.json_response(commandResponse._asDict()) async def _handleQueryFinal(self, queryFinal: QueryFinal) -> Response: commandResponse = await self._crm.waitForTask(queryFinal.runId, queryFinal.timeout) responseDict = commandResponse._asDict() return web.json_response(responseDict) async def _handleWsTextMessage(self, ws: WebSocketResponse, msg: WSMessage): if msg.data == 'close': self.log.debug("Received ws close message") await ws.close() else: obj = json.loads(msg.data) match obj['_type']: case "QueryFinal": queryFinal = QueryFinal._fromDict(obj) resp = await self._handleQueryFinal(queryFinal) await ws.send_str(resp.text) await ws.close() case "SubscribeCurrentState": stateNames = SubscribeCurrentState._fromDict(obj).stateNames self.log.debug(f"Received SubscribeCurrentState: stateNames = {stateNames}") self.handler._subscribeCurrentState(stateNames, ws) case _: self.log.debug(f"Warning: Received unknown ws message: {str(msg.data)}") async def _handleWs(self, request: Request) -> WebSocketResponse: ws = web.WebSocketResponse() await ws.prepare(request) msg: WSMessage async for msg in ws: match msg.type: case aiohttp.WSMsgType.TEXT: await self._handleWsTextMessage(ws, msg) case aiohttp.WSMsgType.ERROR: self.log.debug('Error: ws connection closed with exception %s' % ws.exception()) self.log.debug('websocket connection closed') self.handler._unsubscribeCurrentState(ws) return ws def registerWithLocationService(self): locationService = LocationServiceSync() connection = ConnectionInfo.make(self.prefix, ComponentType.Service, ConnectionType.HttpType) atexit.register(locationService.unregister, connection) locationService.register(HttpRegistration(connection, self.port)) def __init__(self, prefix: Prefix, handler: ComponentHandlers, port: int = 0): """ Creates an HTTP server that can receive CSW commands and registers it with the Location Service using the given prefix, so that CSW components can locate it and send commands to it. Args: prefix (str): a CSW Prefix in the format $subsystem.name, where subsystem is one of the upper case TMT subsystem names and name is the name of the command server handler (ComponentHandlers): command handler notified when commands are received port (int): optional port for HTTP server """ self.log = structlog.get_logger() self.prefix = prefix self.handler = handler self.port = LocationServiceUtil.getFreePort(port) self._app = web.Application() self._crm = CommandResponseManager() self._log = structlog.get_logger() self._app.add_routes([ web.post('/post-endpoint', self._handlePost), web.get("/websocket-endpoint", self._handleWs) ]) self.registerWithLocationService() def start(self): """ Starts the command http server in a thread """ web.run_app(self._app, port=self.port)
Creates an HTTP server that can receive CSW commands and registers it with the Location Service using the given prefix, so that CSW components can locate it and send commands to it.
Args
prefix
:str
- a CSW Prefix in the format $subsystem.name, where subsystem is one of the upper case TMT subsystem names and name is the name of the command server
handler
:ComponentHandlers
- command handler notified when commands are received
port
:int
- optional port for HTTP server
Methods
def registerWithLocationService(self)
-
Expand source code
def registerWithLocationService(self): locationService = LocationServiceSync() connection = ConnectionInfo.make(self.prefix, ComponentType.Service, ConnectionType.HttpType) atexit.register(locationService.unregister, connection) locationService.register(HttpRegistration(connection, self.port))
def start(self)
-
Expand source code
def start(self): """ Starts the command http server in a thread """ web.run_app(self._app, port=self.port)
Starts the command http server in a thread