Module csw.CommandResponseManager

Classes

class CommandResponseManager
Expand source code
class CommandResponseManager:
    """
    Registry of the asyncio tasks that back long-running commands, keyed by runId.

    A task is registered with addTask() and its result is later obtained by
    awaiting waitForTask() with the same runId.
    """
    # Declared on the class (not in __init__), so this one dict is shared by
    # every instance of CommandResponseManager.
    tasks = {}

    def addTask(self, runId: str, task: Task):
        """
        Stores the task under the given runId, replacing any task that was
        previously stored under that runId.

        Args:
            runId: identifier of the command run
            task: the task to associate with runId
        """
        registry = self.tasks
        registry[runId] = task

    async def waitForTask(self, runId: str, timeout: timedelta) -> CommandResponse:
        """
        Awaits the task registered for runId and returns its result.

        Args:
            runId: identifier previously passed to addTask()
            timeout: maximum time to wait, converted to seconds for asyncio.wait_for

        Returns:
            The awaited task's result, or an Error response (built from the runId
            and a "No task was found" message) when no task is registered for runId.
        """
        # Guard clause: an unknown runId is reported as an Error response, not raised.
        if runId not in self.tasks:
            return Error(runId, "No task was found for runId " + runId)
        pending: Task = self.tasks[runId]
        seconds = timeout.total_seconds()
        # A timeout is not handled here; whatever asyncio.wait_for raises propagates.
        return await asyncio.wait_for(pending, timeout=seconds)

Keeps track of command responses for long-running commands by maintaining a map from runId to the asyncio Task that produces the CommandResponse.

Class variables

var tasks

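Dictionary mapping each runId to the Task registered for it via addTask. It is defined on the class, so it is shared by all instances.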

Methods

def addTask(self, runId: str, task: _asyncio.Task)
Expand source code
def addTask(self, runId: str, task: Task):
    """
    Stores the task under the given runId, replacing any task that was
    previously stored under that runId.

    Args:
        runId: identifier of the command run
        task: the task to associate with runId
    """
    registry = self.tasks
    registry[runId] = task
async def waitForTask(self, runId: str, timeout: datetime.timedelta) ‑> CommandResponse
Expand source code
async def waitForTask(self, runId: str, timeout: timedelta) -> CommandResponse:
    """
    Awaits the task registered for runId and returns its result.

    Args:
        runId: identifier used when the task was stored in self.tasks
        timeout: maximum time to wait, converted to seconds for asyncio.wait_for

    Returns:
        The awaited task's result, or an Error response (built from the runId
        and a "No task was found" message) when self.tasks has no entry for runId.
    """
    # Guard clause: an unknown runId is reported as an Error response, not raised.
    if runId not in self.tasks:
        return Error(runId, "No task was found for runId " + runId)
    pending: Task = self.tasks[runId]
    # A timeout is not handled here; whatever asyncio.wait_for raises propagates.
    return await asyncio.wait_for(pending, timeout=timeout.total_seconds())