Module csw.ConfigService

Classes

class ConfigData (content: bytes)
Expand source code
@dataclass
class ConfigData:
    """
    Represents the contents of a file in the Config Service.
    You can get the content as a string with `s = configData.content.decode('utf-8')`.
    Create from a string with `ConfigData(bytes('hello', 'utf-8'))`.
    """
    content: bytes

    def __str__(self):
        return self.content.decode('utf-8')

Represents the contents of a file in the Config Service. You can get the content as a string with s = configData.content.decode('utf-8'). Create from a string with ConfigData(bytes('hello', 'utf-8')).

Instance variables

var content : bytes

The type of the None singleton.

class ConfigFileInfo (path: str, id: str, author: str, comment: str)
Expand source code
@dataclass_json
@dataclass
class ConfigFileInfo:
    """
    Contains information about a config file stored in the config service
    """
    path: str
    id: str
    author: str
    comment: str

Contains information about a config file stored in the config service

Static methods

def from_dict(kvs: dict | list | str | int | float | bool | None, *, infer_missing=False) ‑> ~A
def from_json(s: str | bytes | bytearray,
*,
parse_float=None,
parse_int=None,
parse_constant=None,
infer_missing=False,
**kw) ‑> ~A
def schema(*,
infer_missing: bool = False,
only=None,
exclude=(),
many: bool = False,
context=None,
load_only=(),
dump_only=(),
partial: bool = False,
unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]

Instance variables

var author : str

The type of the None singleton.

var comment : str

The type of the None singleton.

var id : str

The type of the None singleton.

var path : str

The type of the None singleton.

Methods

def to_dict(self, encode_json=False) ‑> Dict[str, dict | list | str | int | float | bool | None]
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]:
    return _asdict(self, encode_json=encode_json)
def to_json(self,
*,
skipkeys: bool = False,
ensure_ascii: bool = True,
check_circular: bool = True,
allow_nan: bool = True,
indent: int | str | None = None,
separators: Tuple[str, str] | None = None,
default: Callable | None = None,
sort_keys: bool = False,
**kw) ‑> str
Expand source code
def to_json(self,
            *,
            skipkeys: bool = False,
            ensure_ascii: bool = True,
            check_circular: bool = True,
            allow_nan: bool = True,
            indent: Optional[Union[int, str]] = None,
            separators: Optional[Tuple[str, str]] = None,
            default: Optional[Callable] = None,
            sort_keys: bool = False,
            **kw) -> str:
    return json.dumps(self.to_dict(encode_json=False),
                      cls=_ExtendedEncoder,
                      skipkeys=skipkeys,
                      ensure_ascii=ensure_ascii,
                      check_circular=check_circular,
                      allow_nan=allow_nan,
                      indent=indent,
                      separators=separators,
                      default=default,
                      sort_keys=sort_keys,
                      **kw)
class ConfigFileRevision (id: str, author: str, comment: str, time: str)
Expand source code
@dataclass_json
@dataclass
class ConfigFileRevision:
    """
    Holds information about a specific version of a config file
    """
    id: str
    author: str
    comment: str
    time: str

Holds information about a specific version of a config file

Static methods

def from_dict(kvs: dict | list | str | int | float | bool | None, *, infer_missing=False) ‑> ~A
def from_json(s: str | bytes | bytearray,
*,
parse_float=None,
parse_int=None,
parse_constant=None,
infer_missing=False,
**kw) ‑> ~A
def schema(*,
infer_missing: bool = False,
only=None,
exclude=(),
many: bool = False,
context=None,
load_only=(),
dump_only=(),
partial: bool = False,
unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]

Instance variables

var author : str

The type of the None singleton.

var comment : str

The type of the None singleton.

var id : str

The type of the None singleton.

var time : str

The type of the None singleton.

Methods

def to_dict(self, encode_json=False) ‑> Dict[str, dict | list | str | int | float | bool | None]
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]:
    return _asdict(self, encode_json=encode_json)
def to_json(self,
*,
skipkeys: bool = False,
ensure_ascii: bool = True,
check_circular: bool = True,
allow_nan: bool = True,
indent: int | str | None = None,
separators: Tuple[str, str] | None = None,
default: Callable | None = None,
sort_keys: bool = False,
**kw) ‑> str
Expand source code
def to_json(self,
            *,
            skipkeys: bool = False,
            ensure_ascii: bool = True,
            check_circular: bool = True,
            allow_nan: bool = True,
            indent: Optional[Union[int, str]] = None,
            separators: Optional[Tuple[str, str]] = None,
            default: Optional[Callable] = None,
            sort_keys: bool = False,
            **kw) -> str:
    return json.dumps(self.to_dict(encode_json=False),
                      cls=_ExtendedEncoder,
                      skipkeys=skipkeys,
                      ensure_ascii=ensure_ascii,
                      check_circular=check_circular,
                      allow_nan=allow_nan,
                      indent=indent,
                      separators=separators,
                      default=default,
                      sort_keys=sort_keys,
                      **kw)
class ConfigId (id: str)
Expand source code
@dataclass_json
@dataclass
class ConfigId:
    """
    Type of an id returned from ConfigManager create or update methods
    """
    id: str

Type of an id returned from ConfigManager create or update methods

Static methods

def from_dict(kvs: dict | list | str | int | float | bool | None, *, infer_missing=False) ‑> ~A
def from_json(s: str | bytes | bytearray,
*,
parse_float=None,
parse_int=None,
parse_constant=None,
infer_missing=False,
**kw) ‑> ~A
def schema(*,
infer_missing: bool = False,
only=None,
exclude=(),
many: bool = False,
context=None,
load_only=(),
dump_only=(),
partial: bool = False,
unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]

Instance variables

var id : str

The type of the None singleton.

Methods

def to_dict(self, encode_json=False) ‑> Dict[str, dict | list | str | int | float | bool | None]
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]:
    return _asdict(self, encode_json=encode_json)
def to_json(self,
*,
skipkeys: bool = False,
ensure_ascii: bool = True,
check_circular: bool = True,
allow_nan: bool = True,
indent: int | str | None = None,
separators: Tuple[str, str] | None = None,
default: Callable | None = None,
sort_keys: bool = False,
**kw) ‑> str
Expand source code
def to_json(self,
            *,
            skipkeys: bool = False,
            ensure_ascii: bool = True,
            check_circular: bool = True,
            allow_nan: bool = True,
            indent: Optional[Union[int, str]] = None,
            separators: Optional[Tuple[str, str]] = None,
            default: Optional[Callable] = None,
            sort_keys: bool = False,
            **kw) -> str:
    return json.dumps(self.to_dict(encode_json=False),
                      cls=_ExtendedEncoder,
                      skipkeys=skipkeys,
                      ensure_ascii=ensure_ascii,
                      check_circular=check_circular,
                      allow_nan=allow_nan,
                      indent=indent,
                      separators=separators,
                      default=default,
                      sort_keys=sort_keys,
                      **kw)
class ConfigMetadata (repoPath: str, annexPath: str, annexMinFileSize: str, maxConfigFileSize: str)
Expand source code
@dataclass_json
@dataclass
class ConfigMetadata:
    """
    Holds metadata information about config server
    """
    repoPath: str
    annexPath: str
    annexMinFileSize: str
    maxConfigFileSize: str

Holds metadata information about config server

Static methods

def from_dict(kvs: dict | list | str | int | float | bool | None, *, infer_missing=False) ‑> ~A
def from_json(s: str | bytes | bytearray,
*,
parse_float=None,
parse_int=None,
parse_constant=None,
infer_missing=False,
**kw) ‑> ~A
def schema(*,
infer_missing: bool = False,
only=None,
exclude=(),
many: bool = False,
context=None,
load_only=(),
dump_only=(),
partial: bool = False,
unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]

Instance variables

var annexMinFileSize : str

The type of the None singleton.

var annexPath : str

The type of the None singleton.

var maxConfigFileSize : str

The type of the None singleton.

var repoPath : str

The type of the None singleton.

Methods

def to_dict(self, encode_json=False) ‑> Dict[str, dict | list | str | int | float | bool | None]
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]:
    return _asdict(self, encode_json=encode_json)
def to_json(self,
*,
skipkeys: bool = False,
ensure_ascii: bool = True,
check_circular: bool = True,
allow_nan: bool = True,
indent: int | str | None = None,
separators: Tuple[str, str] | None = None,
default: Callable | None = None,
sort_keys: bool = False,
**kw) ‑> str
Expand source code
def to_json(self,
            *,
            skipkeys: bool = False,
            ensure_ascii: bool = True,
            check_circular: bool = True,
            allow_nan: bool = True,
            indent: Optional[Union[int, str]] = None,
            separators: Optional[Tuple[str, str]] = None,
            default: Optional[Callable] = None,
            sort_keys: bool = False,
            **kw) -> str:
    return json.dumps(self.to_dict(encode_json=False),
                      cls=_ExtendedEncoder,
                      skipkeys=skipkeys,
                      ensure_ascii=ensure_ascii,
                      check_circular=check_circular,
                      allow_nan=allow_nan,
                      indent=indent,
                      separators=separators,
                      default=default,
                      sort_keys=sort_keys,
                      **kw)
class ConfigService (clientSession: aiohttp.client.ClientSession,
client_id: str = 'tmt-frontend-app',
user: str = 'config-admin1',
password: str = 'config-admin1')
Expand source code
class ConfigService:

    def __init__(self,
                 clientSession: ClientSession,
                 client_id: str = "tmt-frontend-app",
                 user: str = "config-admin1",
                 password: str = "config-admin1",):
        self._session = clientSession
        self.client_id = client_id
        self.user = user
        self.password = password
        self._locationService = LocationService(clientSession)

    @staticmethod
    def _formatTime(time: datetime):
        return time.strftime("%Y-%m-%dT%H:%M:%SZ")

    async def _getBaseUri(self) -> str:
        prefix = Prefix(Subsystem.CSW, "ConfigServer")
        connection = ConnectionInfo.make(prefix, ComponentType.Service, ConnectionType.HttpType)
        location = await self._locationService.resolve(connection)
        if location is not None:
            location.__class__ = HttpLocation
            return location.uri
        raise RuntimeError("Can't locate CSW Config Service")

    async def _endPoint(self, path: str) -> str:
        return f'{await self._getBaseUri()}{path}'

    async def _locateAuthService(self) -> str:
        connection = ConnectionInfo.make(Prefix(Subsystem.CSW, "AAS"), ComponentType.Service, ConnectionType.HttpType)
        location = await self._locationService.resolve(connection)
        if location is not None:
            location.__class__ = HttpLocation
            return location.uri
        raise RuntimeError("Can't locate CSW Auth Service")

    async def _getToken(self):
        uri = await self._locateAuthService()
        keycloak_openid = KeycloakOpenID(server_url=f'{uri}/',
                                         client_id=self.client_id,
                                         realm_name='TMT')
        d = keycloak_openid.token(self.user, self.password)
        return d['access_token']

    @staticmethod
    def _validatePath(path: str):
        invalidChars = "!#<>$%&'@^`~+,;=\\s"
        if re.match(invalidChars, path):
            charsMessage = invalidChars.replace('\\s', '')
            raise RuntimeError(
                f"Input file path '{path}' contains invalid characters. "
                + f"Note, these characters {charsMessage} or 'white space' are not allowed in file path`")

    async def _createOrUpdate(self, create: bool, path: str, configData: ConfigData,
                        annex: bool = False, comment: str = "Created") -> ConfigId:
        self._validatePath(path)
        token = await self._getToken()
        params = urlencode({'annex': annex, 'comment': comment})
        baseUri = await self._endPoint(f'config/{path}')
        uri = f'{baseUri}?{params}'
        headers = {'Content-type': 'application/octet-stream', 'Authorization': f'Bearer {token}'}
        if create:
            response = await self._session.post(uri, headers=headers, data=configData.content)
        else:
            response = await self._session.put(uri, headers=headers, data=configData.content)
        if not response.ok:
            raise RuntimeError(await response.text())
        return ConfigId(await response.json())

    async def create(self, path: str, configData: ConfigData, annex: bool = False, comment: str = "Created") -> ConfigId:
        """
        Creates a file at a specified path with given data and comment.

        Args:
            path (str): the file path relative to the repository root
            configData (ConfigData): contents of the file
            annex (bool): true if the file is annex and requires special handling (external storage)
            comment (str): comment to associate with this operation

        Returns: id of file revision
        """
        return await self._createOrUpdate(True, path, configData, annex, comment)

    async def update(self, path: str, configData: ConfigData, annex: bool = False, comment: str = "Created") -> ConfigId:
        """
        Updates a file at a specified path with given data and comment.

        Args:
            path (str): the file path relative to the repository root
            configData (ConfigData): contents of the file
            annex (bool): true if the file is annex and requires special handling (external storage)
            comment (str): comment to associate with this operation

        Returns: id of file revision
        """
        return await self._createOrUpdate(False, path, configData, annex, comment)

    async def delete(self, path: str, comment: str = "Deleted"):
        """
        Deletes the given config file (older versions will still be available).

        Args:
            path: the file path relative to the repository root
            comment: comment to associate with this operation
        """
        token = await self._getToken()
        params = urlencode({'comment': comment})
        baseUri = await self._endPoint(f'config/{path}')
        uri = f'{baseUri}?{params}'
        headers = {'Authorization': f'Bearer {token}'}
        response = await self._session.delete(uri, headers=headers)
        if not response.ok:
            raise RuntimeError(await response.text())

    # noinspection PyUnresolvedReferences
    async def list(self, fileType: FileType = None, pattern: str = None) -> List[ConfigFileInfo]:
        """
        Returns a list containing all the known config files of given type(Annex or Normal) and whose name matches the provided pattern.

        Args:
            fileType: optional file type(Annex or Normal)
            pattern: optional pattern to match against the file name

        Returns: list of ConfigFileInfo

        """
        params = {}
        if fileType:
            params.update({'type': fileType.name})
        if pattern:
            params.update({'pattern': pattern})
        uri = f"{await self._endPoint('list')}?{urlencode(params)}"
        response = await self._session.get(uri)
        return list(map(lambda p: ConfigFileInfo.from_dict(p), await response.json()))

    async def exists(self, path: str, configId: ConfigId = None):
        """
        Returns true if the given path exists and is being managed.

        Args:
            path: the file path relative to the repository root
            configId: revision of the file

        Returns: true if the file exists in the repo
        """
        params = {}
        if configId:
            params.update({'id': configId.id})
        uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}"
        response = await self._session.head(uri)
        return response.ok

    async def getLatest(self, path: str) -> ConfigData:
        """
        Gets and returns the content of latest version of the file stored under the given path.

        Args:
            path (str): the file path relative to the repository root

        Returns: file contents
        """
        uri = f"{await self._endPoint('config')}/{path}"
        response = await self._session.get(uri)
        if not response.ok:
            raise RuntimeError(await response.text())
        return ConfigData(await response.content.read())

    async def getById(self, path: str, configId: ConfigId) -> ConfigData:
        """
        Gets and returns the file at the given path with the specified revision id

        Args:
            path (str): the file path relative to the repository root
            configId (ConfigId):  id used to specify a specific version to fetch

        Returns: file contents
        """
        params = {'id': configId.id}
        uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}"
        response = await self._session.get(uri)
        if not response.ok:
            raise RuntimeError(await response.text())
        return ConfigData(await response.content.read())

    async def getByTime(self, path: str, time: datetime) -> ConfigData:
        """
        Gets the file at the given path as it existed on the given instant.
        If instant is before the file was created, the initial version is returned.
        If instant is after the last change, the most recent version is returned.

        Args:
            path (str): the file path relative to the repository root
            time (datetime): the target instant

        Returns: file contents
        """
        params = {'date': self._formatTime(time)}
        uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}"
        response = await self._session.get(uri)
        if not response.ok:
            raise RuntimeError(await response.text())
        return ConfigData(await response.content.read())

    async def getActive(self, path: str) -> ConfigData:
        """
        Gets and returns the content of active version of the file stored under the given path.

        Args:
            path (str): the file path relative to the repository root

        Returns: file contents

        """
        uri = f"{await self._endPoint('active-config')}/{path}"
        response = await self._session.get(uri)
        if not response.ok:
            raise RuntimeError(await response.text())
        return ConfigData(await response.content.read())

    async def getActiveByTime(self, path: str, time: datetime) -> ConfigData:
        """
        Returns the content of active version of the file at the given path as it existed on the given instant

        Args:
            path (str): the file path relative to the repository root
            time (datetime): the target instant

        Returns: file contents

        """
        params = {'date': self._formatTime(time)}
        uri = f"{await self._endPoint('active-config')}/{path}?{urlencode(params)}"
        response = await self._session.get(uri)
        if not response.ok:
            raise RuntimeError(await response.text())
        return ConfigData(await response.content.read())

    async def getActiveVersion(self, path: str) -> ConfigId:
        """
        Returns the version which represents the "active version" of the file at the given path.

        Args:
            path (str): the file path relative to the repository root

        Returns: ConfigId indicating the id of the active version

        """
        uri = f"{await self._endPoint('active-version')}/{path}"
        response = await self._session.get(uri)
        if not response.ok:
            raise RuntimeError(await response.text())
        return ConfigId(await response.json())

    # noinspection PyUnresolvedReferences
    async def getMetadata(self) -> ConfigMetadata:
        """
        Query the metadata of config server.
        Returns: a ConfigMetadata object

        """
        uri = f"{await self._endPoint('metadata')}"
        response = await self._session.get(uri)
        if not response.ok:
            raise RuntimeError(await response.text())
        return ConfigMetadata.from_dict(await response.json())

    # noinspection PyUnresolvedReferences
    async def _history(self, key: str, path: str,
                 fromTime: datetime, toTime: datetime,
                 maxResults: int) -> List[ConfigFileRevision]:
        params = {}
        if fromTime:
            params.update({'from': self._formatTime(fromTime)})
        if toTime:
            params.update({'to': self._formatTime(toTime)})
        if maxResults:
            params.update({'maxResults': maxResults})
        uri = f"{await self._endPoint(key)}/{path}?{urlencode(params)}"
        response = await self._session.get(uri)
        if not response.ok:
            raise RuntimeError(await response.text())
        return list(map(lambda p: ConfigFileRevision.from_dict(p), await response.json()))

    async def history(self, path: str,
                fromTime: datetime = None, toTime: datetime = None,
                maxResults: int = 10000) -> List[ConfigFileRevision]:
        """
        Returns the history of versions of the file at the given path for a range of period specified by from and to.
        The size of the list is limited up to maxResults.

        Args:
            path: the file path relative to the repository root
            fromTime: optional start of the history range
            toTime: optional end of the history range
            maxResults: optional maximum number of history results to return (default: unlimited)

        Returns: list of ConfigFileRevision

        """
        return await self._history('history', path, fromTime, toTime, maxResults)

    async def historyActive(self, path: str,
                      fromTime: datetime = None, toTime: datetime = None,
                      maxResults: int = None) -> List[ConfigFileRevision]:
        """
        Returns the history of active versions of the file at the given path for a range of period specified by
        fromTime and toTime. The size of the list is limited upto maxResults.

        Args:
            path: the file path relative to the repository root
            fromTime: optional start of the history range
            toTime: optional end of the history range
            maxResults: optional maximum number of history results to return (default: unlimited)

        Returns: list of ConfigFileRevision

        """
        return await self._history('history-active', path, fromTime, toTime, maxResults)

    async def setActiveVersion(self, path: str, configId: ConfigId, comment: str):
        """
        Sets the active version to be the version provided for the file at the given path.
        If this method is not called, the active version will always be the version with which the file was created.
        After calling this method, the version with the given Id will be the active version.

        Args:
            path: the file path relative to the repository root
            configId: an id used to specify a specific version (by default the id of the version with which
                      the file was created i.e. 1)
            comment: comment to associate with this operation

        """
        token = await self._getToken()
        params = {'id': configId.id, 'comment': comment}
        headers = {'Authorization': f'Bearer {token}'}
        uri = f"{await self._endPoint('active-version')}/{path}?{urlencode(params)}"
        response = await self._session.put(uri, headers=headers)
        if not response.ok:
            raise RuntimeError(await response.text())

    async def resetActiveVersion(self, path: str, comment: str):
        """
        Resets the "active version" of the file at the given path to the latest version.

        Args:
            path: the file path relative to the repository root
            comment: comment to associate with this operation

        """
        token = await self._getToken()
        params = {'comment': comment}
        headers = {'Authorization': f'Bearer {token}'}
        uri = f"{await self._endPoint('active-version')}/{path}?{urlencode(params)}"
        response = await self._session.put(uri, headers=headers)
        if not response.ok:
            raise RuntimeError(await response.text())

Methods

async def create(self,
path: str,
configData: ConfigData,
annex: bool = False,
comment: str = 'Created') ‑> ConfigId
Expand source code
async def create(self, path: str, configData: ConfigData, annex: bool = False, comment: str = "Created") -> ConfigId:
    """
    Creates a file at a specified path with given data and comment.

    Args:
        path (str): the file path relative to the repository root
        configData (ConfigData): contents of the file
        annex (bool): true if the file is annex and requires special handling (external storage)
        comment (str): comment to associate with this operation

    Returns: id of file revision
    """
    return await self._createOrUpdate(True, path, configData, annex, comment)

Creates a file at a specified path with given data and comment.

Args

path : str
the file path relative to the repository root
configData : ConfigData
contents of the file
annex : bool
true if the file is annex and requires special handling (external storage)
comment : str
comment to associate with this operation

Returns: id of file revision

async def delete(self, path: str, comment: str = 'Deleted')
Expand source code
async def delete(self, path: str, comment: str = "Deleted"):
    """
    Deletes the given config file (older versions will still be available).

    Args:
        path: the file path relative to the repository root
        comment: comment to associate with this operation
    """
    token = await self._getToken()
    params = urlencode({'comment': comment})
    baseUri = await self._endPoint(f'config/{path}')
    uri = f'{baseUri}?{params}'
    headers = {'Authorization': f'Bearer {token}'}
    response = await self._session.delete(uri, headers=headers)
    if not response.ok:
        raise RuntimeError(await response.text())

Deletes the given config file (older versions will still be available).

Args

path
the file path relative to the repository root
comment
comment to associate with this operation
async def exists(self,
path: str,
configId: ConfigId = None)
Expand source code
async def exists(self, path: str, configId: ConfigId = None):
    """
    Returns true if the given path exists and is being managed.

    Args:
        path: the file path relative to the repository root
        configId: revision of the file

    Returns: true if the file exists in the repo
    """
    params = {}
    if configId:
        params.update({'id': configId.id})
    uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}"
    response = await self._session.head(uri)
    return response.ok

Returns true if the given path exists and is being managed.

Args

path
the file path relative to the repository root
configId
revision of the file

Returns: true if the file exists in the repo

async def getActive(self, path: str) ‑> ConfigData
Expand source code
async def getActive(self, path: str) -> ConfigData:
    """
    Gets and returns the content of active version of the file stored under the given path.

    Args:
        path (str): the file path relative to the repository root

    Returns: file contents

    """
    uri = f"{await self._endPoint('active-config')}/{path}"
    response = await self._session.get(uri)
    if not response.ok:
        raise RuntimeError(await response.text())
    return ConfigData(await response.content.read())

Gets and returns the content of active version of the file stored under the given path.

Args

path : str
the file path relative to the repository root

Returns: file contents

async def getActiveByTime(self, path: str, time: datetime.datetime) ‑> ConfigData
Expand source code
async def getActiveByTime(self, path: str, time: datetime) -> ConfigData:
    """
    Returns the content of active version of the file at the given path as it existed on the given instant

    Args:
        path (str): the file path relative to the repository root
        time (datetime): the target instant

    Returns: file contents

    """
    params = {'date': self._formatTime(time)}
    uri = f"{await self._endPoint('active-config')}/{path}?{urlencode(params)}"
    response = await self._session.get(uri)
    if not response.ok:
        raise RuntimeError(await response.text())
    return ConfigData(await response.content.read())

Returns the content of active version of the file at the given path as it existed on the given instant

Args

path : str
the file path relative to the repository root
time : datetime
the target instant

Returns: file contents

async def getActiveVersion(self, path: str) ‑> ConfigId
Expand source code
async def getActiveVersion(self, path: str) -> ConfigId:
    """
    Returns the version which represents the "active version" of the file at the given path.

    Args:
        path (str): the file path relative to the repository root

    Returns: ConfigId indicating the id of the active version

    """
    uri = f"{await self._endPoint('active-version')}/{path}"
    response = await self._session.get(uri)
    if not response.ok:
        raise RuntimeError(await response.text())
    return ConfigId(await response.json())

Returns the version which represents the "active version" of the file at the given path.

Args

path : str
the file path relative to the repository root

Returns: ConfigId indicating the id of the active version

async def getById(self,
path: str,
configId: ConfigId) ‑> ConfigData
Expand source code
async def getById(self, path: str, configId: ConfigId) -> ConfigData:
    """
    Gets and returns the file at the given path with the specified revision id

    Args:
        path (str): the file path relative to the repository root
        configId (ConfigId):  id used to specify a specific version to fetch

    Returns: file contents
    """
    params = {'id': configId.id}
    uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}"
    response = await self._session.get(uri)
    if not response.ok:
        raise RuntimeError(await response.text())
    return ConfigData(await response.content.read())

Gets and returns the file at the given path with the specified revision id

Args

path : str
the file path relative to the repository root
configId : ConfigId
id used to specify a specific version to fetch

Returns: file contents

async def getByTime(self, path: str, time: datetime.datetime) ‑> ConfigData
Expand source code
async def getByTime(self, path: str, time: datetime) -> ConfigData:
    """
    Gets the file at the given path as it existed on the given instant.
    If instant is before the file was created, the initial version is returned.
    If instant is after the last change, the most recent version is returned.

    Args:
        path (str): the file path relative to the repository root
        time (datetime): the target instant

    Returns: file contents
    """
    params = {'date': self._formatTime(time)}
    uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}"
    response = await self._session.get(uri)
    if not response.ok:
        raise RuntimeError(await response.text())
    return ConfigData(await response.content.read())

Gets the file at the given path as it existed on the given instant. If instant is before the file was created, the initial version is returned. If instant is after the last change, the most recent version is returned.

Args

path : str
the file path relative to the repository root
time : datetime
the target instant

Returns: file contents

async def getLatest(self, path: str) ‑> ConfigData
Expand source code
async def getLatest(self, path: str) -> ConfigData:
    """
    Gets and returns the content of latest version of the file stored under the given path.

    Args:
        path (str): the file path relative to the repository root

    Returns: file contents
    """
    uri = f"{await self._endPoint('config')}/{path}"
    response = await self._session.get(uri)
    if not response.ok:
        raise RuntimeError(await response.text())
    return ConfigData(await response.content.read())

Gets and returns the content of latest version of the file stored under the given path.

Args

path : str
the file path relative to the repository root

Returns: file contents

async def getMetadata(self) ‑> ConfigMetadata
Expand source code
async def getMetadata(self) -> ConfigMetadata:
    """
    Query the metadata of config server.
    Returns: a ConfigMetadata object

    """
    uri = f"{await self._endPoint('metadata')}"
    response = await self._session.get(uri)
    if not response.ok:
        raise RuntimeError(await response.text())
    return ConfigMetadata.from_dict(await response.json())

Query the metadata of config server. Returns: a ConfigMetadata object

async def history(self,
path: str,
fromTime: datetime.datetime = None,
toTime: datetime.datetime = None,
maxResults: int = 10000) ‑> List[ConfigFileRevision]
Expand source code
async def history(self, path: str,
            fromTime: datetime = None, toTime: datetime = None,
            maxResults: int = 10000) -> List[ConfigFileRevision]:
    """
    Returns the history of versions of the file at the given path for a range of period specified by from and to.
    The size of the list is limited up to maxResults.

    Args:
        path: the file path relative to the repository root
        fromTime: optional start of the history range
        toTime: optional end of the history range
        maxResults: optional maximum number of history results to return (default: unlimited)

    Returns: list of ConfigFileRevision

    """
    return await self._history('history', path, fromTime, toTime, maxResults)

Returns the history of versions of the file at the given path for a range of period specified by from and to. The size of the list is limited up to maxResults.

Args

path
the file path relative to the repository root
fromTime
optional start of the history range
toTime
optional end of the history range
maxResults
optional maximum number of history results to return (default: unlimited)

Returns: list of ConfigFileRevision

async def historyActive(self,
path: str,
fromTime: datetime.datetime = None,
toTime: datetime.datetime = None,
maxResults: int = None) ‑> List[ConfigFileRevision]
Expand source code
async def historyActive(self, path: str,
                  fromTime: datetime = None, toTime: datetime = None,
                  maxResults: int = None) -> List[ConfigFileRevision]:
    """
    Returns the history of active versions of the file at the given path for a range of period specified by
    fromTime and toTime. The size of the list is limited upto maxResults.

    Args:
        path: the file path relative to the repository root
        fromTime: optional start of the history range
        toTime: optional end of the history range
        maxResults: optional maximum number of history results to return (default: unlimited)

    Returns: list of ConfigFileRevision

    """
    return await self._history('history-active', path, fromTime, toTime, maxResults)

Returns the history of active versions of the file at the given path for a range of period specified by fromTime and toTime. The size of the list is limited upto maxResults.

Args

path
the file path relative to the repository root
fromTime
optional start of the history range
toTime
optional end of the history range
maxResults
optional maximum number of history results to return (default: unlimited)

Returns: list of ConfigFileRevision

async def list(self,
fileType: FileType = None,
pattern: str = None) ‑> List[ConfigFileInfo]
Expand source code
async def list(self, fileType: FileType = None, pattern: str = None) -> List[ConfigFileInfo]:
    """
    Returns a list containing all the known config files of given type(Annex or Normal) and whose name matches the provided pattern.

    Args:
        fileType: optional file type(Annex or Normal)
        pattern: optional pattern to match against the file name

    Returns: list of ConfigFileInfo

    """
    params = {}
    if fileType:
        params.update({'type': fileType.name})
    if pattern:
        params.update({'pattern': pattern})
    uri = f"{await self._endPoint('list')}?{urlencode(params)}"
    response = await self._session.get(uri)
    return list(map(lambda p: ConfigFileInfo.from_dict(p), await response.json()))

Returns a list containing all the known config files of given type(Annex or Normal) and whose name matches the provided pattern.

Args

fileType
optional file type(Annex or Normal)
pattern
optional pattern to match against the file name

Returns: list of ConfigFileInfo

async def resetActiveVersion(self, path: str, comment: str)
Expand source code
async def resetActiveVersion(self, path: str, comment: str):
    """
    Resets the "active version" of the file at the given path to the latest version.

    Args:
        path: the file path relative to the repository root
        comment: comment to associate with this operation

    """
    token = await self._getToken()
    params = {'comment': comment}
    headers = {'Authorization': f'Bearer {token}'}
    uri = f"{await self._endPoint('active-version')}/{path}?{urlencode(params)}"
    response = await self._session.put(uri, headers=headers)
    if not response.ok:
        raise RuntimeError(await response.text())

Resets the "active version" of the file at the given path to the latest version.

Args

path
the file path relative to the repository root
comment
comment to associate with this operation
async def setActiveVersion(self,
path: str,
configId: ConfigId,
comment: str)
Expand source code
async def setActiveVersion(self, path: str, configId: ConfigId, comment: str):
    """
    Sets the active version to be the version provided for the file at the given path.
    If this method is not called, the active version will always be the version with which the file was created.
    After calling this method, the version with the given Id will be the active version.

    Args:
        path: the file path relative to the repository root
        configId: an id used to specify a specific version (by default the id of the version with which
                  the file was created i.e. 1)
        comment: comment to associate with this operation

    """
    token = await self._getToken()
    params = {'id': configId.id, 'comment': comment}
    headers = {'Authorization': f'Bearer {token}'}
    uri = f"{await self._endPoint('active-version')}/{path}?{urlencode(params)}"
    response = await self._session.put(uri, headers=headers)
    if not response.ok:
        raise RuntimeError(await response.text())

Sets the active version to be the version provided for the file at the given path. If this method is not called, the active version will always be the version with which the file was created. After calling this method, the version with the given Id will be the active version.

Args

path
the file path relative to the repository root
configId
an id used to specify a specific version (by default the id of the version with which the file was created i.e. 1)
comment
comment to associate with this operation
async def update(self,
path: str,
configData: ConfigData,
annex: bool = False,
comment: str = 'Created') ‑> ConfigId
Expand source code
async def update(self, path: str, configData: ConfigData, annex: bool = False, comment: str = "Created") -> ConfigId:
    """
    Updates a file at a specified path with given data and comment.

    Args:
        path (str): the file path relative to the repository root
        configData (ConfigData): contents of the file
        annex (bool): true if the file is annex and requires special handling (external storage)
        comment (str): comment to associate with this operation

    Returns: id of file revision
    """
    return await self._createOrUpdate(False, path, configData, annex, comment)

Updates a file at a specified path with given data and comment.

Args

path : str
the file path relative to the repository root
configData : ConfigData
contents of the file
annex : bool
true if the file is annex and requires special handling (external storage)
comment : str
comment to associate with this operation

Returns: id of file revision

class FileType (*args, **kwds)
Expand source code
class FileType(Enum):
    """
    Represents the type of storage for a configuration file
    """
    Normal = 0
    Annex = 1

Represents the type of storage for a configuration file

Ancestors

  • enum.Enum

Class variables

var Annex

The type of the None singleton.

var Normal

The type of the None singleton.