Module csw.CommandResponseManager
Classes
class CommandResponseManager
class CommandResponseManager:
    """
    Keeps track of command responses for long-running commands by maintaining
    a map from runId to CommandResponse.
    """
    tasks = {}

    def addTask(self, runId: str, task: Task):
        self.tasks[runId] = task

    async def waitForTask(self, runId: str, timeout: timedelta) -> CommandResponse:
        if runId in self.tasks:
            task: Task = self.tasks[runId]
            return await asyncio.wait_for(task, timeout=timeout.total_seconds())
        else:
            return Error(runId, "No task was found for runId " + runId)
Keeps track of command responses for long-running commands by maintaining a map from runId to CommandResponse.
Class variables
var tasks
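Dictionary mapping each runId to the asyncio Task that will produce its CommandResponse. It is initialized to an empty dict in the class body, so the same dictionary is shared by every CommandResponseManager instance.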
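Because tasks is assigned in the class body rather than in __init__, every instance reads and writes the same dictionary. The sketch below illustrates that Python behavior with two hypothetical stand-in classes (Manager and PerInstanceManager are not part of csw); the per-instance variant is shown only for contrast, not as the library's design.

```python
class Manager:
    # Class-level dict, mirroring `tasks = {}` in the source above:
    # a single object shared by all instances.
    tasks = {}

    def addTask(self, runId: str, task: object) -> None:
        # Mutates the shared dict; it does not create an instance attribute.
        self.tasks[runId] = task


class PerInstanceManager:
    # Hypothetical alternative for contrast: each instance owns its dict.
    def __init__(self) -> None:
        self.tasks = {}

    def addTask(self, runId: str, task: object) -> None:
        self.tasks[runId] = task


a, b = Manager(), Manager()
a.addTask("run-1", "placeholder task")
shared = "run-1" in b.tasks  # visible through the other instance

c, d = PerInstanceManager(), PerInstanceManager()
c.addTask("run-1", "placeholder task")
isolated = "run-1" not in d.tasks  # not visible through the other instance

print(shared, isolated)  # prints: True True
```

With the class-level form, a task registered through one manager can be awaited through any other manager in the same process.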
Methods
def addTask(self, runId: str, task: asyncio.Task)
def addTask(self, runId: str, task: Task):
    self.tasks[runId] = task
Registers task under runId. If a task is already stored for that runId, it is replaced.
async def waitForTask(self, runId: str, timeout: datetime.timedelta) -> CommandResponse
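async def waitForTask(self, runId: str, timeout: timedelta) -> CommandResponse:
    if runId in self.tasks:
        task: Task = self.tasks[runId]
        return await asyncio.wait_for(task, timeout=timeout.total_seconds())
    else:
        return Error(runId, "No task was found for runId " + runId)
Waits up to timeout for the task registered under runId and returns its CommandResponse. If no task is registered for runId, an Error response is returned immediately. If the task does not finish within timeout, asyncio.wait_for cancels it and raises a timeout error instead of returning a response.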
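The three outcomes of waitForTask (a finished task, an unknown runId, and a timeout) can be sketched as follows. This is a minimal, self-contained illustration: CommandResponse, Completed, and Error are hypothetical stand-ins for the csw response types, which this page does not show, and longRunningCommand is an invented coroutine. The manager class is copied from the source above.

```python
import asyncio
from asyncio import Task
from dataclasses import dataclass
from datetime import timedelta


# Hypothetical stand-ins for the csw response types (assumed shapes).
@dataclass
class CommandResponse:
    runId: str


@dataclass
class Completed(CommandResponse):
    pass


@dataclass
class Error(CommandResponse):
    message: str


class CommandResponseManager:
    # Copied from the source listing on this page.
    tasks = {}

    def addTask(self, runId: str, task: Task):
        self.tasks[runId] = task

    async def waitForTask(self, runId: str, timeout: timedelta) -> CommandResponse:
        if runId in self.tasks:
            task: Task = self.tasks[runId]
            return await asyncio.wait_for(task, timeout=timeout.total_seconds())
        else:
            return Error(runId, "No task was found for runId " + runId)


async def longRunningCommand(runId: str, seconds: float) -> CommandResponse:
    # Invented workload: sleeps, then reports completion.
    await asyncio.sleep(seconds)
    return Completed(runId)


async def demo():
    crm = CommandResponseManager()
    crm.addTask("run-1", asyncio.create_task(longRunningCommand("run-1", 0.01)))
    slow = asyncio.create_task(longRunningCommand("run-2", 10))
    crm.addTask("run-2", slow)

    # Task finishes well inside the timeout: its response is returned.
    finished = await crm.waitForTask("run-1", timedelta(seconds=1))
    # Unknown runId: an Error response is returned, nothing is raised.
    missing = await crm.waitForTask("no-such-run", timedelta(seconds=1))
    # Task outlives the timeout: wait_for cancels it and raises.
    try:
        await crm.waitForTask("run-2", timedelta(seconds=0.05))
        timedOut = False
    except asyncio.TimeoutError:
        timedOut = True
    return finished, missing, timedOut, slow.cancelled()


finished, missing, timedOut, cancelled = asyncio.run(demo())
print(finished, missing, timedOut, cancelled)
```

Note that a missing runId is reported as a returned Error value while a timeout surfaces as an exception, so a caller that wants a CommandResponse in every case would need to catch the timeout itself. After a timeout the underlying task has been cancelled, so waiting on the same runId again will not resume the command.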