Module csw.ConfigService
Classes
class ConfigData (content: bytes)
-
Expand source code
@dataclass class ConfigData: """ Represents the contents of a file in the Config Service. You can get the content as a string with `s = configData.content.decode('utf-8')`. Create from a string with `ConfigData(bytes('hello', 'utf-8'))`. """ content: bytes def __str__(self): return self.content.decode('utf-8')
Represents the contents of a file in the Config Service. You can get the content as a string with
s = configData.content.decode('utf-8')
. Create from a string withConfigData(bytes('hello', 'utf-8'))
.Instance variables
var content : bytes
-
The type of the None singleton.
class ConfigFileInfo (path: str, id: str, author: str, comment: str)
-
Expand source code
@dataclass_json @dataclass class ConfigFileInfo: """ Contains information about a config file stored in the config service """ path: str id: str author: str comment: str
Contains information about a config file stored in the config service
Static methods
def from_dict(kvs: dict | list | str | int | float | bool | None, *, infer_missing=False) ‑> ~A
def from_json(s: str | bytes | bytearray,
*,
parse_float=None,
parse_int=None,
parse_constant=None,
infer_missing=False,
**kw) ‑> ~Adef schema(*,
infer_missing: bool = False,
only=None,
exclude=(),
many: bool = False,
context=None,
load_only=(),
dump_only=(),
partial: bool = False,
unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
Instance variables
-
The type of the None singleton.
var comment : str
-
The type of the None singleton.
var id : str
-
The type of the None singleton.
var path : str
-
The type of the None singleton.
Methods
def to_dict(self, encode_json=False) ‑> Dict[str, dict | list | str | int | float | bool | None]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self,
*,
skipkeys: bool = False,
ensure_ascii: bool = True,
check_circular: bool = True,
allow_nan: bool = True,
indent: int | str | None = None,
separators: Tuple[str, str] | None = None,
default: Callable | None = None,
sort_keys: bool = False,
**kw) ‑> str-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Optional[Tuple[str, str]] = None, default: Optional[Callable] = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
class ConfigFileRevision (id: str, author: str, comment: str, time: str)
-
Expand source code
@dataclass_json @dataclass class ConfigFileRevision: """ Holds information about a specific version of a config file """ id: str author: str comment: str time: str
Holds information about a specific version of a config file
Static methods
def from_dict(kvs: dict | list | str | int | float | bool | None, *, infer_missing=False) ‑> ~A
def from_json(s: str | bytes | bytearray,
*,
parse_float=None,
parse_int=None,
parse_constant=None,
infer_missing=False,
**kw) ‑> ~Adef schema(*,
infer_missing: bool = False,
only=None,
exclude=(),
many: bool = False,
context=None,
load_only=(),
dump_only=(),
partial: bool = False,
unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
Instance variables
-
The type of the None singleton.
var comment : str
-
The type of the None singleton.
var id : str
-
The type of the None singleton.
var time : str
-
The type of the None singleton.
Methods
def to_dict(self, encode_json=False) ‑> Dict[str, dict | list | str | int | float | bool | None]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self,
*,
skipkeys: bool = False,
ensure_ascii: bool = True,
check_circular: bool = True,
allow_nan: bool = True,
indent: int | str | None = None,
separators: Tuple[str, str] | None = None,
default: Callable | None = None,
sort_keys: bool = False,
**kw) ‑> str-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Optional[Tuple[str, str]] = None, default: Optional[Callable] = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
class ConfigId (id: str)
-
Expand source code
@dataclass_json @dataclass class ConfigId: """ Type of an id returned from ConfigManager create or update methods """ id: str
Type of an id returned from ConfigManager create or update methods
Static methods
def from_dict(kvs: dict | list | str | int | float | bool | None, *, infer_missing=False) ‑> ~A
def from_json(s: str | bytes | bytearray,
*,
parse_float=None,
parse_int=None,
parse_constant=None,
infer_missing=False,
**kw) ‑> ~Adef schema(*,
infer_missing: bool = False,
only=None,
exclude=(),
many: bool = False,
context=None,
load_only=(),
dump_only=(),
partial: bool = False,
unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
Instance variables
var id : str
-
The type of the None singleton.
Methods
def to_dict(self, encode_json=False) ‑> Dict[str, dict | list | str | int | float | bool | None]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self,
*,
skipkeys: bool = False,
ensure_ascii: bool = True,
check_circular: bool = True,
allow_nan: bool = True,
indent: int | str | None = None,
separators: Tuple[str, str] | None = None,
default: Callable | None = None,
sort_keys: bool = False,
**kw) ‑> str-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Optional[Tuple[str, str]] = None, default: Optional[Callable] = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
class ConfigMetadata (repoPath: str, annexPath: str, annexMinFileSize: str, maxConfigFileSize: str)
-
Expand source code
@dataclass_json @dataclass class ConfigMetadata: """ Holds metadata information about config server """ repoPath: str annexPath: str annexMinFileSize: str maxConfigFileSize: str
Holds metadata information about config server
Static methods
def from_dict(kvs: dict | list | str | int | float | bool | None, *, infer_missing=False) ‑> ~A
def from_json(s: str | bytes | bytearray,
*,
parse_float=None,
parse_int=None,
parse_constant=None,
infer_missing=False,
**kw) ‑> ~Adef schema(*,
infer_missing: bool = False,
only=None,
exclude=(),
many: bool = False,
context=None,
load_only=(),
dump_only=(),
partial: bool = False,
unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
Instance variables
var annexMinFileSize : str
-
The type of the None singleton.
var annexPath : str
-
The type of the None singleton.
var maxConfigFileSize : str
-
The type of the None singleton.
var repoPath : str
-
The type of the None singleton.
Methods
def to_dict(self, encode_json=False) ‑> Dict[str, dict | list | str | int | float | bool | None]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self,
*,
skipkeys: bool = False,
ensure_ascii: bool = True,
check_circular: bool = True,
allow_nan: bool = True,
indent: int | str | None = None,
separators: Tuple[str, str] | None = None,
default: Callable | None = None,
sort_keys: bool = False,
**kw) ‑> str-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Optional[Tuple[str, str]] = None, default: Optional[Callable] = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
class ConfigService (clientSession: aiohttp.client.ClientSession,
client_id: str = 'tmt-frontend-app',
user: str = 'config-admin1',
password: str = 'config-admin1')-
Expand source code
class ConfigService: def __init__(self, clientSession: ClientSession, client_id: str = "tmt-frontend-app", user: str = "config-admin1", password: str = "config-admin1",): self._session = clientSession self.client_id = client_id self.user = user self.password = password self._locationService = LocationService(clientSession) @staticmethod def _formatTime(time: datetime): return time.strftime("%Y-%m-%dT%H:%M:%SZ") async def _getBaseUri(self) -> str: prefix = Prefix(Subsystem.CSW, "ConfigServer") connection = ConnectionInfo.make(prefix, ComponentType.Service, ConnectionType.HttpType) location = await self._locationService.resolve(connection) if location is not None: location.__class__ = HttpLocation return location.uri raise RuntimeError("Can't locate CSW Config Service") async def _endPoint(self, path: str) -> str: return f'{await self._getBaseUri()}{path}' async def _locateAuthService(self) -> str: connection = ConnectionInfo.make(Prefix(Subsystem.CSW, "AAS"), ComponentType.Service, ConnectionType.HttpType) location = await self._locationService.resolve(connection) if location is not None: location.__class__ = HttpLocation return location.uri raise RuntimeError("Can't locate CSW Auth Service") async def _getToken(self): uri = await self._locateAuthService() keycloak_openid = KeycloakOpenID(server_url=f'{uri}/', client_id=self.client_id, realm_name='TMT') d = keycloak_openid.token(self.user, self.password) return d['access_token'] @staticmethod def _validatePath(path: str): invalidChars = "!#<>$%&'@^`~+,;=\\s" if re.match(invalidChars, path): charsMessage = invalidChars.replace('\\s', '') raise RuntimeError( f"Input file path '{path}' contains invalid characters. " + f"Note, these characters {charsMessage} or 'white space' are not allowed in file path`") async def _createOrUpdate(self, create: bool, path: str, configData: ConfigData, annex: bool = False, comment: str = "Created") -> ConfigId: self._validatePath(path) token = await self._getToken() params = urlencode({'annex': annex, 'comment': comment}) baseUri = await self._endPoint(f'config/{path}') uri = f'{baseUri}?{params}' headers = {'Content-type': 'application/octet-stream', 'Authorization': f'Bearer {token}'} if create: response = await self._session.post(uri, headers=headers, data=configData.content) else: response = await self._session.put(uri, headers=headers, data=configData.content) if not response.ok: raise RuntimeError(await response.text()) return ConfigId(await response.json()) async def create(self, path: str, configData: ConfigData, annex: bool = False, comment: str = "Created") -> ConfigId: """ Creates a file at a specified path with given data and comment. Args: path (str): the file path relative to the repository root configData (ConfigData): contents of the file annex (bool): true if the file is annex and requires special handling (external storage) comment (str): comment to associate with this operation Returns: id of file revision """ return await self._createOrUpdate(True, path, configData, annex, comment) async def update(self, path: str, configData: ConfigData, annex: bool = False, comment: str = "Created") -> ConfigId: """ Updates a file at a specified path with given data and comment. Args: path (str): the file path relative to the repository root configData (ConfigData): contents of the file annex (bool): true if the file is annex and requires special handling (external storage) comment (str): comment to associate with this operation Returns: id of file revision """ return await self._createOrUpdate(False, path, configData, annex, comment) async def delete(self, path: str, comment: str = "Deleted"): """ Deletes the given config file (older versions will still be available). Args: path: the file path relative to the repository root comment: comment to associate with this operation """ token = await self._getToken() params = urlencode({'comment': comment}) baseUri = await self._endPoint(f'config/{path}') uri = f'{baseUri}?{params}' headers = {'Authorization': f'Bearer {token}'} response = await self._session.delete(uri, headers=headers) if not response.ok: raise RuntimeError(await response.text()) # noinspection PyUnresolvedReferences async def list(self, fileType: FileType = None, pattern: str = None) -> List[ConfigFileInfo]: """ Returns a list containing all the known config files of given type(Annex or Normal) and whose name matches the provided pattern. Args: fileType: optional file type(Annex or Normal) pattern: optional pattern to match against the file name Returns: list of ConfigFileInfo """ params = {} if fileType: params.update({'type': fileType.name}) if pattern: params.update({'pattern': pattern}) uri = f"{await self._endPoint('list')}?{urlencode(params)}" response = await self._session.get(uri) return list(map(lambda p: ConfigFileInfo.from_dict(p), await response.json())) async def exists(self, path: str, configId: ConfigId = None): """ Returns true if the given path exists and is being managed. Args: path: the file path relative to the repository root configId: revision of the file Returns: true if the file exists in the repo """ params = {} if configId: params.update({'id': configId.id}) uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}" response = await self._session.head(uri) return response.ok async def getLatest(self, path: str) -> ConfigData: """ Gets and returns the content of latest version of the file stored under the given path. Args: path (str): the file path relative to the repository root Returns: file contents """ uri = f"{await self._endPoint('config')}/{path}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigData(await response.content.read()) async def getById(self, path: str, configId: ConfigId) -> ConfigData: """ Gets and returns the file at the given path with the specified revision id Args: path (str): the file path relative to the repository root configId (ConfigId): id used to specify a specific version to fetch Returns: file contents """ params = {'id': configId.id} uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigData(await response.content.read()) async def getByTime(self, path: str, time: datetime) -> ConfigData: """ Gets the file at the given path as it existed on the given instant. If instant is before the file was created, the initial version is returned. If instant is after the last change, the most recent version is returned. Args: path (str): the file path relative to the repository root time (datetime): the target instant Returns: file contents """ params = {'date': self._formatTime(time)} uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigData(await response.content.read()) async def getActive(self, path: str) -> ConfigData: """ Gets and returns the content of active version of the file stored under the given path. Args: path (str): the file path relative to the repository root Returns: file contents """ uri = f"{await self._endPoint('active-config')}/{path}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigData(await response.content.read()) async def getActiveByTime(self, path: str, time: datetime) -> ConfigData: """ Returns the content of active version of the file at the given path as it existed on the given instant Args: path (str): the file path relative to the repository root time (datetime): the target instant Returns: file contents """ params = {'date': self._formatTime(time)} uri = f"{await self._endPoint('active-config')}/{path}?{urlencode(params)}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigData(await response.content.read()) async def getActiveVersion(self, path: str) -> ConfigId: """ Returns the version which represents the "active version" of the file at the given path. Args: path (str): the file path relative to the repository root Returns: ConfigId indicating the id of the active version """ uri = f"{await self._endPoint('active-version')}/{path}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigId(await response.json()) # noinspection PyUnresolvedReferences async def getMetadata(self) -> ConfigMetadata: """ Query the metadata of config server. Returns: a ConfigMetadata object """ uri = f"{await self._endPoint('metadata')}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigMetadata.from_dict(await response.json()) # noinspection PyUnresolvedReferences async def _history(self, key: str, path: str, fromTime: datetime, toTime: datetime, maxResults: int) -> List[ConfigFileRevision]: params = {} if fromTime: params.update({'from': self._formatTime(fromTime)}) if toTime: params.update({'to': self._formatTime(toTime)}) if maxResults: params.update({'maxResults': maxResults}) uri = f"{await self._endPoint(key)}/{path}?{urlencode(params)}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return list(map(lambda p: ConfigFileRevision.from_dict(p), await response.json())) async def history(self, path: str, fromTime: datetime = None, toTime: datetime = None, maxResults: int = 10000) -> List[ConfigFileRevision]: """ Returns the history of versions of the file at the given path for a range of period specified by from and to. The size of the list is limited up to maxResults. Args: path: the file path relative to the repository root fromTime: optional start of the history range toTime: optional end of the history range maxResults: optional maximum number of history results to return (default: unlimited) Returns: list of ConfigFileRevision """ return await self._history('history', path, fromTime, toTime, maxResults) async def historyActive(self, path: str, fromTime: datetime = None, toTime: datetime = None, maxResults: int = None) -> List[ConfigFileRevision]: """ Returns the history of active versions of the file at the given path for a range of period specified by fromTime and toTime. The size of the list is limited upto maxResults. Args: path: the file path relative to the repository root fromTime: optional start of the history range toTime: optional end of the history range maxResults: optional maximum number of history results to return (default: unlimited) Returns: list of ConfigFileRevision """ return await self._history('history-active', path, fromTime, toTime, maxResults) async def setActiveVersion(self, path: str, configId: ConfigId, comment: str): """ Sets the active version to be the version provided for the file at the given path. If this method is not called, the active version will always be the version with which the file was created. After calling this method, the version with the given Id will be the active version. Args: path: the file path relative to the repository root configId: an id used to specify a specific version (by default the id of the version with which the file was created i.e. 1) comment: comment to associate with this operation """ token = await self._getToken() params = {'id': configId.id, 'comment': comment} headers = {'Authorization': f'Bearer {token}'} uri = f"{await self._endPoint('active-version')}/{path}?{urlencode(params)}" response = await self._session.put(uri, headers=headers) if not response.ok: raise RuntimeError(await response.text()) async def resetActiveVersion(self, path: str, comment: str): """ Resets the "active version" of the file at the given path to the latest version. Args: path: the file path relative to the repository root comment: comment to associate with this operation """ token = await self._getToken() params = {'comment': comment} headers = {'Authorization': f'Bearer {token}'} uri = f"{await self._endPoint('active-version')}/{path}?{urlencode(params)}" response = await self._session.put(uri, headers=headers) if not response.ok: raise RuntimeError(await response.text())
Methods
async def create(self,
path: str,
configData: ConfigData,
annex: bool = False,
comment: str = 'Created') ‑> ConfigId-
Expand source code
async def create(self, path: str, configData: ConfigData, annex: bool = False, comment: str = "Created") -> ConfigId: """ Creates a file at a specified path with given data and comment. Args: path (str): the file path relative to the repository root configData (ConfigData): contents of the file annex (bool): true if the file is annex and requires special handling (external storage) comment (str): comment to associate with this operation Returns: id of file revision """ return await self._createOrUpdate(True, path, configData, annex, comment)
Creates a file at a specified path with given data and comment.
Args
path
:str
- the file path relative to the repository root
configData
:ConfigData
- contents of the file
annex
:bool
- true if the file is annex and requires special handling (external storage)
comment
:str
- comment to associate with this operation
Returns: id of file revision
async def delete(self, path: str, comment: str = 'Deleted')
-
Expand source code
async def delete(self, path: str, comment: str = "Deleted"): """ Deletes the given config file (older versions will still be available). Args: path: the file path relative to the repository root comment: comment to associate with this operation """ token = await self._getToken() params = urlencode({'comment': comment}) baseUri = await self._endPoint(f'config/{path}') uri = f'{baseUri}?{params}' headers = {'Authorization': f'Bearer {token}'} response = await self._session.delete(uri, headers=headers) if not response.ok: raise RuntimeError(await response.text())
Deletes the given config file (older versions will still be available).
Args
path
- the file path relative to the repository root
comment
- comment to associate with this operation
async def exists(self,
path: str,
configId: ConfigId = None)-
Expand source code
async def exists(self, path: str, configId: ConfigId = None): """ Returns true if the given path exists and is being managed. Args: path: the file path relative to the repository root configId: revision of the file Returns: true if the file exists in the repo """ params = {} if configId: params.update({'id': configId.id}) uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}" response = await self._session.head(uri) return response.ok
Returns true if the given path exists and is being managed.
Args
path
- the file path relative to the repository root
configId
- revision of the file
Returns: true if the file exists in the repo
async def getActive(self, path: str) ‑> ConfigData
-
Expand source code
async def getActive(self, path: str) -> ConfigData: """ Gets and returns the content of active version of the file stored under the given path. Args: path (str): the file path relative to the repository root Returns: file contents """ uri = f"{await self._endPoint('active-config')}/{path}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigData(await response.content.read())
Gets and returns the content of active version of the file stored under the given path.
Args
path
:str
- the file path relative to the repository root
Returns: file contents
async def getActiveByTime(self, path: str, time: datetime.datetime) ‑> ConfigData
-
Expand source code
async def getActiveByTime(self, path: str, time: datetime) -> ConfigData: """ Returns the content of active version of the file at the given path as it existed on the given instant Args: path (str): the file path relative to the repository root time (datetime): the target instant Returns: file contents """ params = {'date': self._formatTime(time)} uri = f"{await self._endPoint('active-config')}/{path}?{urlencode(params)}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigData(await response.content.read())
Returns the content of active version of the file at the given path as it existed on the given instant
Args
path
:str
- the file path relative to the repository root
time
:datetime
- the target instant
Returns: file contents
async def getActiveVersion(self, path: str) ‑> ConfigId
-
Expand source code
async def getActiveVersion(self, path: str) -> ConfigId: """ Returns the version which represents the "active version" of the file at the given path. Args: path (str): the file path relative to the repository root Returns: ConfigId indicating the id of the active version """ uri = f"{await self._endPoint('active-version')}/{path}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigId(await response.json())
Returns the version which represents the "active version" of the file at the given path.
Args
path
:str
- the file path relative to the repository root
Returns: ConfigId indicating the id of the active version
async def getById(self,
path: str,
configId: ConfigId) ‑> ConfigData-
Expand source code
async def getById(self, path: str, configId: ConfigId) -> ConfigData: """ Gets and returns the file at the given path with the specified revision id Args: path (str): the file path relative to the repository root configId (ConfigId): id used to specify a specific version to fetch Returns: file contents """ params = {'id': configId.id} uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigData(await response.content.read())
Gets and returns the file at the given path with the specified revision id
Args
path
:str
- the file path relative to the repository root
configId
:ConfigId
- id used to specify a specific version to fetch
Returns: file contents
async def getByTime(self, path: str, time: datetime.datetime) ‑> ConfigData
-
Expand source code
async def getByTime(self, path: str, time: datetime) -> ConfigData: """ Gets the file at the given path as it existed on the given instant. If instant is before the file was created, the initial version is returned. If instant is after the last change, the most recent version is returned. Args: path (str): the file path relative to the repository root time (datetime): the target instant Returns: file contents """ params = {'date': self._formatTime(time)} uri = f"{await self._endPoint('config')}/{path}?{urlencode(params)}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigData(await response.content.read())
Gets the file at the given path as it existed on the given instant. If instant is before the file was created, the initial version is returned. If instant is after the last change, the most recent version is returned.
Args
path
:str
- the file path relative to the repository root
time
:datetime
- the target instant
Returns: file contents
async def getLatest(self, path: str) ‑> ConfigData
-
Expand source code
async def getLatest(self, path: str) -> ConfigData: """ Gets and returns the content of latest version of the file stored under the given path. Args: path (str): the file path relative to the repository root Returns: file contents """ uri = f"{await self._endPoint('config')}/{path}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigData(await response.content.read())
Gets and returns the content of latest version of the file stored under the given path.
Args
path
:str
- the file path relative to the repository root
Returns: file contents
async def getMetadata(self) ‑> ConfigMetadata
-
Expand source code
async def getMetadata(self) -> ConfigMetadata: """ Query the metadata of config server. Returns: a ConfigMetadata object """ uri = f"{await self._endPoint('metadata')}" response = await self._session.get(uri) if not response.ok: raise RuntimeError(await response.text()) return ConfigMetadata.from_dict(await response.json())
Query the metadata of config server. Returns: a ConfigMetadata object
async def history(self,
path: str,
fromTime: datetime.datetime = None,
toTime: datetime.datetime = None,
maxResults: int = 10000) ‑> List[ConfigFileRevision]-
Expand source code
async def history(self, path: str, fromTime: datetime = None, toTime: datetime = None, maxResults: int = 10000) -> List[ConfigFileRevision]: """ Returns the history of versions of the file at the given path for a range of period specified by from and to. The size of the list is limited up to maxResults. Args: path: the file path relative to the repository root fromTime: optional start of the history range toTime: optional end of the history range maxResults: optional maximum number of history results to return (default: unlimited) Returns: list of ConfigFileRevision """ return await self._history('history', path, fromTime, toTime, maxResults)
Returns the history of versions of the file at the given path for a range of period specified by from and to. The size of the list is limited up to maxResults.
Args
path
- the file path relative to the repository root
fromTime
- optional start of the history range
toTime
- optional end of the history range
maxResults
- optional maximum number of history results to return (default: unlimited)
Returns: list of ConfigFileRevision
async def historyActive(self,
path: str,
fromTime: datetime.datetime = None,
toTime: datetime.datetime = None,
maxResults: int = None) ‑> List[ConfigFileRevision]-
Expand source code
async def historyActive(self, path: str, fromTime: datetime = None, toTime: datetime = None, maxResults: int = None) -> List[ConfigFileRevision]: """ Returns the history of active versions of the file at the given path for a range of period specified by fromTime and toTime. The size of the list is limited upto maxResults. Args: path: the file path relative to the repository root fromTime: optional start of the history range toTime: optional end of the history range maxResults: optional maximum number of history results to return (default: unlimited) Returns: list of ConfigFileRevision """ return await self._history('history-active', path, fromTime, toTime, maxResults)
Returns the history of active versions of the file at the given path for a range of period specified by fromTime and toTime. The size of the list is limited upto maxResults.
Args
path
- the file path relative to the repository root
fromTime
- optional start of the history range
toTime
- optional end of the history range
maxResults
- optional maximum number of history results to return (default: unlimited)
Returns: list of ConfigFileRevision
async def list(self,
fileType: FileType = None,
pattern: str = None) ‑> List[ConfigFileInfo]-
Expand source code
async def list(self, fileType: FileType = None, pattern: str = None) -> List[ConfigFileInfo]: """ Returns a list containing all the known config files of given type(Annex or Normal) and whose name matches the provided pattern. Args: fileType: optional file type(Annex or Normal) pattern: optional pattern to match against the file name Returns: list of ConfigFileInfo """ params = {} if fileType: params.update({'type': fileType.name}) if pattern: params.update({'pattern': pattern}) uri = f"{await self._endPoint('list')}?{urlencode(params)}" response = await self._session.get(uri) return list(map(lambda p: ConfigFileInfo.from_dict(p), await response.json()))
Returns a list containing all the known config files of given type(Annex or Normal) and whose name matches the provided pattern.
Args
fileType
- optional file type(Annex or Normal)
pattern
- optional pattern to match against the file name
Returns: list of ConfigFileInfo
async def resetActiveVersion(self, path: str, comment: str)
-
Expand source code
async def resetActiveVersion(self, path: str, comment: str): """ Resets the "active version" of the file at the given path to the latest version. Args: path: the file path relative to the repository root comment: comment to associate with this operation """ token = await self._getToken() params = {'comment': comment} headers = {'Authorization': f'Bearer {token}'} uri = f"{await self._endPoint('active-version')}/{path}?{urlencode(params)}" response = await self._session.put(uri, headers=headers) if not response.ok: raise RuntimeError(await response.text())
Resets the "active version" of the file at the given path to the latest version.
Args
path
- the file path relative to the repository root
comment
- comment to associate with this operation
async def setActiveVersion(self,
path: str,
configId: ConfigId,
comment: str)-
Expand source code
async def setActiveVersion(self, path: str, configId: ConfigId, comment: str): """ Sets the active version to be the version provided for the file at the given path. If this method is not called, the active version will always be the version with which the file was created. After calling this method, the version with the given Id will be the active version. Args: path: the file path relative to the repository root configId: an id used to specify a specific version (by default the id of the version with which the file was created i.e. 1) comment: comment to associate with this operation """ token = await self._getToken() params = {'id': configId.id, 'comment': comment} headers = {'Authorization': f'Bearer {token}'} uri = f"{await self._endPoint('active-version')}/{path}?{urlencode(params)}" response = await self._session.put(uri, headers=headers) if not response.ok: raise RuntimeError(await response.text())
Sets the active version to be the version provided for the file at the given path. If this method is not called, the active version will always be the version with which the file was created. After calling this method, the version with the given Id will be the active version.
Args
path
- the file path relative to the repository root
configId
- an id used to specify a specific version (by default the id of the version with which the file was created i.e. 1)
comment
- comment to associate with this operation
async def update(self,
path: str,
configData: ConfigData,
annex: bool = False,
comment: str = 'Created') ‑> ConfigId-
Expand source code
async def update(self, path: str, configData: ConfigData, annex: bool = False, comment: str = "Created") -> ConfigId: """ Updates a file at a specified path with given data and comment. Args: path (str): the file path relative to the repository root configData (ConfigData): contents of the file annex (bool): true if the file is annex and requires special handling (external storage) comment (str): comment to associate with this operation Returns: id of file revision """ return await self._createOrUpdate(False, path, configData, annex, comment)
Updates a file at a specified path with given data and comment.
Args
path
:str
- the file path relative to the repository root
configData
:ConfigData
- contents of the file
annex
:bool
- true if the file is annex and requires special handling (external storage)
comment
:str
- comment to associate with this operation
Returns: id of file revision
class FileType (*args, **kwds)
-
Expand source code
class FileType(Enum): """ Represents the type of storage for a configuration file """ Normal = 0 Annex = 1
Represents the type of storage for a configuration file
Ancestors
- enum.Enum
Class variables
var Annex
-
The type of the None singleton.
var Normal
-
The type of the None singleton.