Module csw.TimeServiceScheduler

Classes

class TimeServiceScheduler
class TimeServiceScheduler:
    """
    Schedules periodic and non-periodic tasks at a specified time and/or interval.
    It supports scheduling on both UTCTime and TAITime.
    Each method returns a Cancellable that lets callers cancel the scheduled task.
    Note that the implementation is optimised for high-throughput, high-frequency events.
    It is not to be confused with long-term schedulers such as Quartz.
    """

    log = structlog.get_logger()

    def scheduleOnce(self, startTime: TMTTime, func: Callable[[], Awaitable]) -> Cancellable:
        """
        Schedules an async function to execute once at the given start time.

        Args:
            startTime: the time at which the task should start its execution
            func: the async function (coroutine) to be scheduled for execution

        Returns:
            a handle to cancel the execution of the task if it hasn't been executed already
        """
        # The event is set by the timer once startTime arrives.
        event = asyncio.Event()
        loop = asyncio.get_running_loop()
        secs = startTime.durationFromNow().total_seconds()
        timerHandle = loop.call_later(secs, event.set)

        async def wrapper():
            # Block until the timer fires, then run the user's coroutine.
            await event.wait()
            return await func()

        task = asyncio.create_task(wrapper())
        return TimerCancellable(timerHandle, task)

    def schedulePeriodically(self, interval: timedelta, func: Callable[[], Awaitable]) -> Cancellable:
        """
        Schedules a function to execute periodically at the given interval.
        The function is executed once immediately without any initial delay followed by periodic executions.
        To delay the first execution, use `schedulePeriodicallyStarting`, which takes a startTime.

        Args:
            interval: the time interval between the executions of the function
            func: the function to execute at each interval

        Returns:
            a handle to cancel execution of further tasks
        """
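        secs = interval.total_seconds()

        async def periodic():
            # Run immediately, then sleep for the interval after each run completes.
            while True:
                await func()
                await asyncio.sleep(secs)

        task = asyncio.create_task(periodic())
        # No timer handle is needed because there is no initial delay.
        return TimerCancellable(None, task)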

    def schedulePeriodicallyStarting(self, startTime: TMTTime, interval: timedelta,
                             func: Callable[[], Awaitable]) -> Cancellable:
        """
        Schedules a function to execute periodically at the given interval.
        The function first runs at the given start time and then repeats at each interval.

        Args:
            startTime: first time at which task is to be executed
            interval: the time interval between the executions of the function
            func: the function to execute at each interval

        Returns:
            a handle to cancel execution of further tasks
        """
        startSecs = startTime.durationFromNow().total_seconds()
        secs = interval.total_seconds()
        event = asyncio.Event()
        loop = asyncio.get_running_loop()

        async def periodic():
            # Wait for the start time, then run and sleep in a loop.
            await event.wait()
            while True:
                await func()
                await asyncio.sleep(secs)

        # The timer sets the event once startTime arrives.
        timerHandle = loop.call_later(startSecs, event.set)
        task = asyncio.create_task(periodic())
        return TimerCancellable(timerHandle, task)

Schedules periodic and non-periodic tasks at a specified time and/or interval. It supports scheduling on both UTCTime and TAITime. Each method returns a Cancellable that lets callers cancel the scheduled task. Note that the implementation is optimised for high-throughput, high-frequency events; it is not to be confused with long-term schedulers such as Quartz.

Class variables

var log

Class-level logger obtained from structlog.get_logger().

Methods

def scheduleOnce(self,
startTime: TMTTime,
func: Callable[[], Awaitable]) ‑> Cancellable
def scheduleOnce(self, startTime: TMTTime, func: Callable[[], Awaitable]) -> Cancellable:
    """
    Schedules an async function to execute once at the given start time.

    Args:
        startTime: the time at which the task should start its execution
        func: the async function (coroutine) to be scheduled for execution

    Returns:
        a handle to cancel the execution of the task if it hasn't been executed already
    """
    # The event is set by the timer once startTime arrives.
    event = asyncio.Event()
    loop = asyncio.get_running_loop()
    secs = startTime.durationFromNow().total_seconds()
    timerHandle = loop.call_later(secs, event.set)

    async def wrapper():
        # Block until the timer fires, then run the user's coroutine.
        await event.wait()
        return await func()

    task = asyncio.create_task(wrapper())
    return TimerCancellable(timerHandle, task)

Schedules an async function to execute once at the given start time.

Args

startTime
the time at which the task should start its execution
func
the async function (coroutine) to be scheduled for execution

Returns

a handle to cancel the execution of the task if it hasn't been executed already
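
The timer-plus-event pattern used by scheduleOnce can be tried in isolation. The following is a minimal sketch, not the library's code: `schedule_once` is a hypothetical stand-in that takes a plain delay in seconds instead of a TMTTime, and it returns the raw timer handle and task rather than a TimerCancellable.

```python
import asyncio


def schedule_once(delay_secs, func):
    """Hypothetical stand-in for scheduleOnce: run `func` once after a delay."""
    event = asyncio.Event()
    loop = asyncio.get_running_loop()
    # The timer sets the event when the delay has elapsed.
    timer_handle = loop.call_later(delay_secs, event.set)

    async def wrapper():
        await event.wait()
        return await func()

    task = asyncio.create_task(wrapper())
    return timer_handle, task


async def demo_once():
    calls = []

    async def record():
        calls.append("ran")

    # First task is allowed to fire.
    _, task = schedule_once(0.01, record)
    await task

    # Second task is cancelled before its start time arrives.
    handle, cancelled_task = schedule_once(0.05, record)
    handle.cancel()
    cancelled_task.cancel()
    await asyncio.sleep(0.1)  # wait past the start time; nothing should run
    return calls, cancelled_task.cancelled()


result = asyncio.run(demo_once())
print(result)
```

Cancelling both the timer and the task mirrors what the returned handle does in the source shown for TimerCancellable: the timer never sets the event, and the waiting task is torn down.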

def schedulePeriodically(self, interval: datetime.timedelta, func: Callable[[], Awaitable]) ‑> Cancellable
def schedulePeriodically(self, interval: timedelta, func: Callable[[], Awaitable]) -> Cancellable:
    """
    Schedules a function to execute periodically at the given interval.
    The function is executed once immediately without any initial delay followed by periodic executions.
    To delay the first execution, use `schedulePeriodicallyStarting`, which takes a startTime.

    Args:
        interval: the time interval between the executions of the function
        func: the function to execute at each interval

    Returns:
        a handle to cancel execution of further tasks
    """
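    secs = interval.total_seconds()

    async def periodic():
        # Run immediately, then sleep for the interval after each run completes.
        while True:
            await func()
            await asyncio.sleep(secs)

    task = asyncio.create_task(periodic())
    # No timer handle is needed because there is no initial delay.
    return TimerCancellable(None, task)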

Schedules a function to execute periodically at the given interval. The function is executed once immediately, without any initial delay, followed by periodic executions. To delay the first execution, use schedulePeriodicallyStarting, which takes a startTime.

Args

interval
the time interval between the executions of the function
func
the function to execute at each interval

Returns

a handle to cancel execution of further tasks
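
Because the source loop awaits the function and then sleeps for the interval, the spacing between successive starts is roughly the interval plus the function's own running time. The sketch below illustrates this with plain asyncio; `run_fixed_delay` and its parameters are hypothetical names chosen for the example, and the timings are kept short only so it finishes quickly.

```python
import asyncio
import time


async def run_fixed_delay(interval_secs, work_secs, run_for_secs):
    """Record start times of a task driven by a run-then-sleep loop."""
    starts = []

    async def func():
        starts.append(time.monotonic())
        await asyncio.sleep(work_secs)  # simulated work

    async def periodic():
        # Same loop shape as the source: run, then sleep for the interval.
        while True:
            await func()
            await asyncio.sleep(interval_secs)

    task = asyncio.create_task(periodic())
    await asyncio.sleep(run_for_secs)
    task.cancel()  # stops further executions
    count_at_cancel = len(starts)
    # Wait one more full cycle to confirm nothing else runs.
    await asyncio.sleep(interval_secs + work_secs)
    return starts, count_at_cancel


starts, count_at_cancel = asyncio.run(run_fixed_delay(0.02, 0.02, 0.15))
gaps = [b - a for a, b in zip(starts, starts[1:])]
print(len(starts), [round(g, 3) for g in gaps])
```

With a 20 ms interval and 20 ms of simulated work, the gaps come out near 40 ms rather than 20 ms. Callers that need a fixed rate should keep the scheduled function short.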

def schedulePeriodicallyStarting(self,
startTime: TMTTime,
interval: datetime.timedelta,
func: Callable[[], Awaitable]) ‑> Cancellable
def schedulePeriodicallyStarting(self, startTime: TMTTime, interval: timedelta,
                         func: Callable[[], Awaitable]) -> Cancellable:
    """
    Schedules a function to execute periodically at the given interval.
    The function first runs at the given start time and then repeats at each interval.

    Args:
        startTime: first time at which task is to be executed
        interval: the time interval between the executions of the function
        func: the function to execute at each interval

    Returns:
        a handle to cancel execution of further tasks
    """
    startSecs = startTime.durationFromNow().total_seconds()
    secs = interval.total_seconds()
    event = asyncio.Event()
    loop = asyncio.get_running_loop()

    async def periodic():
        # Wait for the start time, then run and sleep in a loop.
        await event.wait()
        while True:
            await func()
            await asyncio.sleep(secs)

    # The timer sets the event once startTime arrives.
    timerHandle = loop.call_later(startSecs, event.set)
    task = asyncio.create_task(periodic())
    return TimerCancellable(timerHandle, task)

Schedules a function to execute periodically at the given interval. The function first runs at the given start time and then repeats at each interval.

Args

startTime
first time at which task is to be executed
interval
the time interval between the executions of the function
func
the function to execute at each interval

Returns

a handle to cancel execution of further tasks
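
The delayed start can be sketched with plain asyncio as follows. This is an illustration under stated assumptions, not the library's implementation: `run_delayed_periodic` is a hypothetical helper, and a delay in seconds stands in for the value the source computes from startTime.durationFromNow().

```python
import asyncio
import time


async def run_delayed_periodic(start_delay_secs, interval_secs, run_for_secs):
    """Gate a periodic loop on a start timer; return each run's offset in seconds."""
    t0 = time.monotonic()
    offsets = []

    async def func():
        offsets.append(time.monotonic() - t0)

    event = asyncio.Event()
    loop = asyncio.get_running_loop()

    async def periodic():
        await event.wait()  # nothing runs until the start timer fires
        while True:
            await func()
            await asyncio.sleep(interval_secs)

    timer_handle = loop.call_later(start_delay_secs, event.set)
    task = asyncio.create_task(periodic())
    await asyncio.sleep(run_for_secs)
    # Cancel both handles, as the returned Cancellable would.
    timer_handle.cancel()
    task.cancel()
    return offsets


offsets = asyncio.run(run_delayed_periodic(0.05, 0.02, 0.15))
print([round(o, 3) for o in offsets])
```

The first recorded offset falls at about the start delay, and later ones follow at roughly the interval.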

class TimerCancellable (timerHandle: asyncio.events.TimerHandle | None, task: _asyncio.Task | None)
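class TimerCancellable(Cancellable):
    def __init__(self, timerHandle: TimerHandle | None, task: Task | None):
        self.timerHandle = timerHandle
        self.task = task

    def cancel(self) -> bool:
        # Cancel the pending start timer, if any, so the start event is never set.
        if self.timerHandle:
            self.timerHandle.cancel()
        # Cancel the task that waits on the timer or runs the periodic loop.
        if self.task:
            self.task.cancel()
        # Always reports True, even if the task had already finished.
        return True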

Handle for a scheduled task, one-off or periodic, that allows it to be cancelled.
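
One consequence of the source shown for cancel() is that its return value does not indicate whether anything was actually stopped. The sketch below uses a hypothetical stand-in class, `SketchCancellable`, that copies the same logic, and compares cancelling a pending task with cancelling one that has already completed.

```python
import asyncio


class SketchCancellable:
    """Hypothetical stand-in that mirrors the cancel() logic of TimerCancellable."""

    def __init__(self, timer_handle, task):
        self.timer_handle = timer_handle
        self.task = task

    def cancel(self) -> bool:
        if self.timer_handle:
            self.timer_handle.cancel()
        if self.task:
            self.task.cancel()
        return True


async def demo_cancel():
    async def noop():
        return "done"

    # Case 1: cancel while the task is still waiting on a long sleep.
    pending = asyncio.create_task(asyncio.sleep(10))
    await asyncio.sleep(0)  # let the task start
    first = SketchCancellable(None, pending).cancel()
    await asyncio.wait([pending])  # returns once the cancellation has landed

    # Case 2: cancel after the task has already completed.
    finished = asyncio.create_task(noop())
    await finished
    second = SketchCancellable(None, finished).cancel()

    return first, pending.cancelled(), second, finished.cancelled()


outcome = asyncio.run(demo_cancel())
print(outcome)
```

In both cases cancel() returns True, but only the pending task ends up cancelled. Code that needs to know whether a one-off task ran should track that inside the scheduled function itself.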

Ancestors

Cancellable