Module csw.CommandResponseManager
Classes
class CommandResponseManager-
Expand source code
class CommandResponseManager: """ Keeps track of command responses for long-running commands by maintaining a map from runId to CommandResponse. """ tasks = {} def addTask(self, runId: str, task: Task): self.tasks[runId] = task async def waitForTask(self, runId: str, timeout: timedelta) -> CommandResponse: if runId in self.tasks: task: Task = self.tasks[runId] return await asyncio.wait_for(task, timeout=timeout.total_seconds()) else: return Error(runId, "No task was found for runId " + runId)Keeps track of command responses for long-running commands by maintaining a map from runId to CommandResponse.
Class variables
var tasks-
The type of the None singleton.
Methods
def addTask(self, runId: str, task: _asyncio.Task)-
Expand source code
def addTask(self, runId: str, task: Task): self.tasks[runId] = task async def waitForTask(self, runId: str, timeout: datetime.timedelta) ‑> CommandResponse-
Expand source code
async def waitForTask(self, runId: str, timeout: timedelta) -> CommandResponse: if runId in self.tasks: task: Task = self.tasks[runId] return await asyncio.wait_for(task, timeout=timeout.total_seconds()) else: return Error(runId, "No task was found for runId " + runId)