Module csw.ComponentHandlers

Classes

class ComponentHandlers
Expand source code
class ComponentHandlers:
    """
    Abstract base class for handling CSW commands.
    Subclasses can override onSubmit, onOneway, validateCommand, and currentStates to implement the behavior of the
    component.
    """

    log = structlog.get_logger()

    # Map of current state name to the set of websockets subscribed to that state.
    # The empty-string key holds the websockets that subscribed to all states.
    # NOTE: this is a class attribute that is mutated in place, so it is shared by every
    # instance (and subclass) that does not rebind it.
    _currentStateSubscribers = {}

    def onSubmit(self, runId: str, command: ControlCommand) -> (CommandResponse, Task):
        """
        Handles the given setup command and returns a CommandResponse subclass

        The default implementation returns an Error response stating that the submit handler is not implemented.

        Args:
            runId (str): unique id for this command.
            command (ControlCommand): contains the command

        Returns: (CommandResponse, Task)
            a pair: (subclass of CommandResponse, Task), where the task can be None if the command response is final.
            For long-running commands, you can respond with Started(runId, "...") and a task that completes the work
            in the background.
        """
        response = Error(runId, "Not implemented: submit command handler")
        # noinspection PyTypeChecker
        return response, None

    def onOneway(self, runId: str, command: ControlCommand) -> CommandResponse:
        """
        Handles the given setup command and returns an immediate CommandResponse

        The default implementation returns an Error response stating that the oneway handler is not implemented.

        Args:
            runId (str): unique id for this command.
            command (ControlCommand): contains the command

        Returns: CommandResponse
            a subclass of CommandResponse (only Accepted, Invalid or Locked are allowed)
        """
        response = Error(runId, "Not implemented: oneway command handler")
        return response

    def validateCommand(self, runId: str, command: ControlCommand) -> CommandResponse:
        """
        Validates the given command

        The default implementation accepts every command.

        Args:
            runId (str): unique id for this command
            command (ControlCommand): contains the command

        Returns: CommandResponse
            a subclass of CommandResponse (only Accepted, Invalid or Locked are allowed)
        """
        response = Accepted(runId)
        return response

    def currentStates(self) -> List[CurrentState]:
        """
        Returns the current states for the component

        The default implementation reports no states (an empty list).
        """
        noStates: List[CurrentState] = []
        return noStates

    # ---Do not override the following methods ---

    async def publishCurrentStates(self):
        """
        Publish the current state of the python based CSW component

        Each state returned by currentStates() is serialized to JSON and sent to the websockets subscribed to
        that state name as well as to the websockets subscribed to all states. A websocket whose send fails
        with a ConnectionError is unsubscribed and skipped, so the remaining subscribers still get the update.
        """
        for currentState in self.currentStates():
            forAllStates = self._currentStateSubscribers.get("") or set()
            forThisState = self._currentStateSubscribers.get(currentState.stateName) or set()
            # The union is a new set, so the subscriber sets may be modified while we iterate over it
            subscribers = forAllStates | forThisState
            if not subscribers:
                continue
            cs = json.dumps(currentState._asDict())
            for ws in subscribers:
                self.log.debug(f"Publishing current state: {cs}")
                try:
                    await ws.send_str(cs)
                except ConnectionError as ex:
                    # One dead connection must not prevent delivery to the other subscribers
                    self.log.warning(f"Removing current state subscriber after failed send: {ex}")
                    self._unsubscribeCurrentState(ws)

    def _addCurrentStateSubscriber(self, stateName: str, ws: WebSocketResponse):
        """
        Internal method that registers the websocket as a subscriber of the given state name,
        creating the subscriber set for that name if it does not exist yet.
        """
        self._currentStateSubscribers.setdefault(stateName, set()).add(ws)

    def _subscribeCurrentState(self, stateNames: List[str], ws: WebSocketResponse):
        """
        Internal method used to subscribe to current state of this component.
        If stateNames is empty, subscribe to all current states
        """
        if not stateNames:
            # Subscriptions to all states are stored under the empty-string key
            self._addCurrentStateSubscriber("", ws)
        else:
            for stateName in stateNames:
                self._addCurrentStateSubscriber(stateName, ws)

    def _unsubscribeCurrentState(self, ws: WebSocketResponse):
        """
        Internal method used to unsubscribe a websocket from current state events
        """
        for subscribers in self._currentStateSubscribers.values():
            # discard() is a no-op when the websocket is not subscribed to this state
            subscribers.discard(ws)

Abstract base class for handling CSW commands. Subclasses can override onSubmit, onOneway, validateCommand, and currentStates to implement the behavior of the component.

Class variables

var log

Logger for this class, obtained from structlog.get_logger().

Methods

def currentStates(self) ‑> List[CurrentState]
Expand source code
def currentStates(self) -> List[CurrentState]:
    """
    Returns the current states for the component

    The default implementation reports no states (an empty list).
    """
    noStates: List[CurrentState] = []
    return noStates

Returns the current states for the component

def onOneway(self,
runId: str,
command: ControlCommand) ‑> CommandResponse
Expand source code
def onOneway(self, runId: str, command: ControlCommand) -> CommandResponse:
    """
    Handles the given setup command and returns an immediate CommandResponse

    The default implementation returns an Error response stating that the oneway handler is not implemented.

    Args:
        runId (str): unique id for this command.
        command (ControlCommand): contains the command

    Returns: CommandResponse
        a subclass of CommandResponse (only Accepted, Invalid or Locked are allowed)
    """
    response = Error(runId, "Not implemented: oneway command handler")
    return response

Handles the given setup command and returns an immediate CommandResponse

Args

runId : str
unique id for this command.
command : ControlCommand
contains the command

Returns: CommandResponse a subclass of CommandResponse (only Accepted, Invalid or Locked are allowed)

def onSubmit(self,
runId: str,
command: ControlCommand) ‑> (CommandResponse, Task)
Expand source code
def onSubmit(self, runId: str, command: ControlCommand) -> (CommandResponse, Task):
    """
    Handles the given setup command and returns a CommandResponse subclass

    The default implementation returns an Error response stating that the submit handler is not implemented.

    Args:
        runId (str): unique id for this command.
        command (ControlCommand): contains the command

    Returns: (CommandResponse, Task)
        a pair: (subclass of CommandResponse, Task), where the task can be None if the command response is final.
        For long-running commands, you can respond with Started(runId, "...") and a task that completes the work
        in the background.
    """
    response = Error(runId, "Not implemented: submit command handler")
    # noinspection PyTypeChecker
    return response, None

Handles the given setup command and returns a CommandResponse subclass

Args

runId : str
unique id for this command.
command : ControlCommand
contains the command

Returns: (CommandResponse, Task) a pair: (subclass of CommandResponse, Task), where the task can be None if the command response is final. For long-running commands, you can respond with Started(runId, "…") and a task that completes the work in the background.

async def publishCurrentStates(self)
Expand source code
async def publishCurrentStates(self):
    """
    Publish the current state of the python based CSW component

    Each state returned by currentStates() is serialized to JSON and sent to the websockets subscribed to
    that state name as well as to the websockets subscribed to all states. A websocket whose send fails
    with a ConnectionError is unsubscribed and skipped, so the remaining subscribers still get the update.
    """
    for currentState in self.currentStates():
        forAllStates = self._currentStateSubscribers.get("") or set()
        forThisState = self._currentStateSubscribers.get(currentState.stateName) or set()
        # The union is a new set, so the subscriber sets may be modified while we iterate over it
        subscribers = forAllStates | forThisState
        if not subscribers:
            continue
        cs = json.dumps(currentState._asDict())
        for ws in subscribers:
            self.log.debug(f"Publishing current state: {cs}")
            try:
                await ws.send_str(cs)
            except ConnectionError as ex:
                # One dead connection must not prevent delivery to the other subscribers
                self.log.warning(f"Removing current state subscriber after failed send: {ex}")
                self._unsubscribeCurrentState(ws)

Publish the current state of the python based CSW component

def validateCommand(self,
runId: str,
command: ControlCommand) ‑> CommandResponse
Expand source code
def validateCommand(self, runId: str, command: ControlCommand) -> CommandResponse:
    """
    Validates the given command

    The default implementation accepts every command.

    Args:
        runId (str): unique id for this command
        command (ControlCommand): contains the command

    Returns: CommandResponse
        a subclass of CommandResponse (only Accepted, Invalid or Locked are allowed)
    """
    response = Accepted(runId)
    return response

Validates the given command

Args

runId : str
unique id for this command
command : ControlCommand
contains the command

Returns: CommandResponse a subclass of CommandResponse (only Accepted, Invalid or Locked are allowed)