Module csw.LocationServiceSync

Classes

class LocationServiceSync
Expand source code
class LocationServiceSync:
    """
    A version of Location Service that does not use async
    """

    log = structlog.get_logger()
    baseUri = "http://127.0.0.1:7654/"
    postUri = f"{baseUri}post-endpoint"

    def register(self, registration: Registration) -> RegistrationResult:
        """
        Registers a connection.

        Args:
            registration (Registration): the Registration holding the connection and it's corresponding location to
            register with LocationService

        Returns: ConnectionInfo
            an object describing the connection
        """
        # noinspection PyUnresolvedReferences
        regJson = json.loads(registration.to_json())
        regJson['_type'] = registration.__class__.__name__
        jsonBody = f'{{"_type": "Register", "registration": {json.dumps(regJson)}}}'
        r = requests.post(self.postUri, json=json.loads(jsonBody))
        if not r.ok:
            raise Exception(r.text)
        maybeResult = json.loads(r.text)
        if len(maybeResult) != 0:
            location = Location._makeLocation(maybeResult)
            return RegistrationResult.make(location, self.unregister)

    def unregister(self, connection: ConnectionInfo):
        """
        Unregisters a connection.

        Args:
            connection (ConnectionInfo): an already registered connection
        """
        self.log.debug(f"Unregistering connection {connection} from the Location Service.")
        # noinspection PyUnresolvedReferences
        jsonBody = f'{{"_type": "Unregister", "connection": {connection.to_json()}}}'
        r = requests.post(self.postUri, json=json.loads(jsonBody))
        if not r.ok:
            raise Exception(r.text)

    def _postJson(self, jsonBody: str) -> Location:
        r = requests.post(self.postUri, json=json.loads(jsonBody))
        if not r.ok:
            raise Exception(r.text)
        maybeResult = json.loads(r.text)
        if len(maybeResult) != 0:
            return Location._makeLocation(maybeResult[0])

    def find(self, connection: ConnectionInfo) -> Location:
        """
        Resolves the location for a connection from the local cache

        Args:
            connection (ConnectionInfo): an already registered connection

        Returns: Location
            the Location
        """
        # noinspection PyUnresolvedReferences
        return self._postJson(f'{{"_type": "Find", "connection": {connection.to_json()}}}')

    # "within":"2 seconds"}}
    def resolve(self, connection: ConnectionInfo, withinSecs: int = "5") -> Location:
        """
        Resolves the location for a connection from the local cache, if not found waits for the event to arrive
        within specified time limit

        Args:
            connection (ConnectionInfo): an already registered connection
            withinSecs (int): max number of seconds to wait for the connection to be found

        Returns: Location
            the Location
        """
        resolveInfo = ResolveInfo("Resolve", connection, f"{withinSecs} seconds")
        # noinspection PyUnresolvedReferences
        return self._postJson(resolveInfo.to_json())

    def _list(self, jsonBody: str) -> List[Location]:
        r = requests.post(LocationServiceSync.postUri, json=json.loads(jsonBody))
        if not r.ok:
            raise Exception(r.text)
        return list(map(lambda x: Location._makeLocation(x), json.loads(r.text)))

    @dispatch()
    def list(self) -> List[Location]:
        """
        Lists all locations registered.

        Returns: List[Location]
            list of locations
        """
        jsonBody = '{"_type": "ListEntries"}'
        return self._list(jsonBody)

    @dispatch(ComponentType)
    def list(self, componentType: ComponentType) -> List[Location]:
        """
        Lists components of the given component type

        Args:
            componentType (ComponentType): the type of component to list

        Returns: List[Location]
            list of locations
        """
        jsonBody = f'{{"_type": "ListByComponentType", "componentType": "{componentType.value}"}}'
        return self._list(jsonBody)

    @dispatch(str)
    def list(self, hostname: str) -> List[Location]:
        """
        Lists all locations registered on the given hostname

        Args:
            hostname (str): restrict result to this host

        Returns: List[Location]
            list of locations
        """
        jsonBody = f'{{"_type": "ListByHostname", "hostname": "{hostname}"}}'
        return self._list(jsonBody)

    @dispatch(ConnectionType)
    def list(self, connectionType: ConnectionType) -> List[Location]:
        """
        Lists all locations registered with the given connection type

        Args:
            connectionType (ConnectionType): restrict result to this connection type

        Returns: List[Location]
            list of locations
        """
        jsonBody = f'{{"_type": "ListByConnectionType", "connectionType": "{connectionType.value}"}}'
        return self._list(jsonBody)

    def listByPrefix(self, prefix: str) -> List[Location]:
        """
        Lists all locations with the given prefix

        Args:
            prefix (str): restrict result to those starting with prefix

        Returns: List[Location]
            list of locations
        """
        jsonBody = f'{{"_type": "ListByPrefix", "prefix": "{prefix}"}}'
        return self._list(jsonBody)

A version of Location Service that does not use async

Class variables

var baseUri

The type of the None singleton.

var log

The type of the None singleton.

var postUri

The type of the None singleton.

Methods

def find(self,
connection: ConnectionInfo) ‑> Location
Expand source code
def find(self, connection: ConnectionInfo) -> Location:
    """
    Resolves the location for a connection from the local cache

    Args:
        connection (ConnectionInfo): an already registered connection

    Returns: Location
        the Location
    """
    # noinspection PyUnresolvedReferences
    return self._postJson(f'{{"_type": "Find", "connection": {connection.to_json()}}}')

Resolves the location for a connection from the local cache

Args

connection : ConnectionInfo
an already registered connection

Returns: Location the Location

def list(...)
Expand source code
def __get__(self, instance, owner):
    self.obj = instance
    self.cls = owner
    return self

Dispatch methods based on type signature

See Also: Dispatcher

def listByPrefix(self, prefix: str) ‑> List[Location]
Expand source code
def listByPrefix(self, prefix: str) -> List[Location]:
    """
    Lists all locations with the given prefix

    Args:
        prefix (str): restrict result to those starting with prefix

    Returns: List[Location]
        list of locations
    """
    jsonBody = f'{{"_type": "ListByPrefix", "prefix": "{prefix}"}}'
    return self._list(jsonBody)

Lists all locations with the given prefix

Args

prefix : str
restrict result to those starting with prefix

Returns: List[Location] list of locations

def register(self,
registration: Registration) ‑> RegistrationResult
Expand source code
def register(self, registration: Registration) -> RegistrationResult:
    """
    Registers a connection.

    Args:
        registration (Registration): the Registration holding the connection and it's corresponding location to
        register with LocationService

    Returns: ConnectionInfo
        an object describing the connection
    """
    # noinspection PyUnresolvedReferences
    regJson = json.loads(registration.to_json())
    regJson['_type'] = registration.__class__.__name__
    jsonBody = f'{{"_type": "Register", "registration": {json.dumps(regJson)}}}'
    r = requests.post(self.postUri, json=json.loads(jsonBody))
    if not r.ok:
        raise Exception(r.text)
    maybeResult = json.loads(r.text)
    if len(maybeResult) != 0:
        location = Location._makeLocation(maybeResult)
        return RegistrationResult.make(location, self.unregister)

Registers a connection.

Args

registration : Registration
the Registration holding the connection and it's corresponding location to

register with LocationService Returns: ConnectionInfo an object describing the connection

def resolve(self,
connection: ConnectionInfo,
withinSecs: int = '5') ‑> Location
Expand source code
def resolve(self, connection: ConnectionInfo, withinSecs: int = "5") -> Location:
    """
    Resolves the location for a connection from the local cache, if not found waits for the event to arrive
    within specified time limit

    Args:
        connection (ConnectionInfo): an already registered connection
        withinSecs (int): max number of seconds to wait for the connection to be found

    Returns: Location
        the Location
    """
    resolveInfo = ResolveInfo("Resolve", connection, f"{withinSecs} seconds")
    # noinspection PyUnresolvedReferences
    return self._postJson(resolveInfo.to_json())

Resolves the location for a connection from the local cache, if not found waits for the event to arrive within specified time limit

Args

connection : ConnectionInfo
an already registered connection
withinSecs : int
max number of seconds to wait for the connection to be found

Returns: Location the Location

def unregister(self,
connection: ConnectionInfo)
Expand source code
def unregister(self, connection: ConnectionInfo):
    """
    Unregisters a connection.

    Args:
        connection (ConnectionInfo): an already registered connection
    """
    self.log.debug(f"Unregistering connection {connection} from the Location Service.")
    # noinspection PyUnresolvedReferences
    jsonBody = f'{{"_type": "Unregister", "connection": {connection.to_json()}}}'
    r = requests.post(self.postUri, json=json.loads(jsonBody))
    if not r.ok:
        raise Exception(r.text)

Unregisters a connection.

Args

connection : ConnectionInfo
an already registered connection