"""Base Archiver HTTP client module."""
from __future__ import annotations
import logging
import urllib.parse
from typing import Any
import requests
from requests import Response
from epicsarchiver_mgmt.exceptions import BaseMgmtError
[docs]
LOG: logging.Logger = logging.getLogger(__name__)
[docs]
def mgmt_url(hostname: str, port: int) -> str:
"""Generate the mgmt url from a hostname and a port number.
Args:
hostname (str): fqdn of service
port (int): Port number
Returns:
str: Completed url, for example "http://localhost:17665/mgmt/bpl/"
"""
return f"http://{hostname}:{port}/mgmt/bpl/"
[docs]
class ArchiverError(BaseMgmtError):
"""Base class for all exceptions raised by the archiver HTTP client."""
[docs]
class ArchiverConnectionError(ArchiverError):
"""Exception raised when there is a connection error with the archiver."""
def __init__(self, base_url: str, message: str | None = None) -> None:
"""Initialize the ArchiverConnectionError.
Args:
base_url (str): The base URL of the archiver.
message (str | None, optional): A custom error message. Defaults to None.
"""
[docs]
self.base_url = base_url
if message is None:
message = f"Failed to connect to archiver at {base_url}"
super().__init__(message)
[docs]
class ArchiverResponseError(ArchiverError):
"""Exception raised when the archiver returns an unexpected response."""
def __init__(
self,
base_url: str,
url: str | None = None,
response: str | None = None,
message: str | None = None,
) -> None:
"""Initialize the ArchiverResponseError.
Args:
base_url (str): The base URL of the archiver.
url (str | None, optional): The specific URL that caused the error.
Defaults to None.
response (str | None, optional): The response received from the archiver.
Defaults to None.
message (str | None, optional): A custom error message. Defaults to None.
"""
[docs]
self.base_url = base_url
if url is None:
url = base_url
[docs]
self.response = response
if message is None:
message = (
f"Received an unexpected response '{response}' from the archiver {self.base_url} for URL: {self.url}"
)
super().__init__(message)
[docs]
class BaseArchiverAppliance:
"""Base EPICS Archiver Appliance client.
Hold a session to the Archiver Appliance web application.
Args:
hostname: EPICS Archiver Appliance hostname [default: localhost]
port: EPICS Archiver Appliance management port [default: 17665]
"""
def __init__(self, hostname: str = "localhost", port: int = 17665) -> None:
"""Create Archiver Appliance object.
Args:
hostname (str, optional): hostname of archiver. Defaults to "localhost".
port (int, optional): port number of mgmt interface. Defaults to 17665.
"""
[docs]
self.hostname = hostname
[docs]
self.mgmt_url = mgmt_url(hostname, port)
[docs]
self._info: dict[str, str] = {}
[docs]
self._data_retrieval_url: str | None = None
[docs]
self.session = requests.Session()
[docs]
def __repr__(self) -> str:
"""String representation of Archiver Appliance.
Returns:
str: details including hostname of Archiver appliance.
"""
return f"ArchiverAppliance({self.hostname}, {self.port})"
[docs]
def _request(
self,
method: str,
url: str,
params: dict[str, str | list[str]] | dict[str, str] | None = None,
data: str | dict[str, str] | None = None,
json_data: Any | None = None, # noqa: ANN401
) -> Response:
"""Send a request using the session.
Args:
method: HTTP method
url: The URL to send the request to
params: Optional arguments
data: Optional data to be sent
json_data: Optional JSON data to be sent
Returns:
:class:`requests.Response <Response>` object
Raises:
ArchiverConnectionError: If there is a connection error.
ArchiverResponseError: If the response is not successful.
"""
try:
r = self.session.request(method, url, params=params, data=data, json=json_data)
r.raise_for_status()
except requests.ConnectionError as e:
raise ArchiverConnectionError(
base_url=self.mgmt_url,
) from e
except requests.HTTPError as e:
raise ArchiverResponseError(
base_url=self.mgmt_url,
url=url,
response=e.response.text if e.response else None,
) from e
else:
return r
[docs]
def _get(
self,
endpoint: str,
params: dict[str, str | list[str]] | dict[str, str] | None = None,
) -> Response:
"""Send a GET request to the given endpoint.
Args:
endpoint: API endpoint (relative or absolute)
params: Optional arguments to be sent
Returns:
:class:`requests.Response <Response>` object
"""
url = urllib.parse.urljoin(self.mgmt_url, endpoint.lstrip("/"))
LOG.debug("GET url: %s", url)
return self._request("GET", url, params=params)
[docs]
def _post(
self,
endpoint: str,
params: dict[str, str | list[str]] | dict[str, str] | None = None,
data: str | dict[str, str] | None = None,
json_data: Any | None = None, # noqa: ANN401
) -> Response:
"""Send a POST request to the given endpoint.
Args:
endpoint: API endpoint (relative or absolute)
params: Optional arguments to be sent
data: Optional data to be sent
json_data: Optional JSON data to be sent
Returns:
:class:`requests.Response <Response>` object
"""
url = urllib.parse.urljoin(self.mgmt_url, endpoint.lstrip("/"))
return self._request("POST", url, params=params, data=data, json_data=json_data)
@property
[docs]
def info(self) -> dict[str, str]:
"""EPICS Archiver Appliance information."""
if not self._info:
r = self._get("/getApplianceInfo")
self._info = r.json()
return self._info
@property
[docs]
def identity(self) -> str | None:
"""EPICS Archiver Appliance identity."""
return self.info.get("identity")
@property
[docs]
def version(self) -> str | None:
"""EPICS Archiver Appliance version."""
return self.info.get("version")
[docs]
def _get_or_post(self, endpoint: str, pv: str) -> Any: # noqa: ANN401
"""Send a GET or POST if pv is a comma separated list.
Args:
endpoint (str): API endpoint
pv (str): name of the pv. Can be a GLOB wildcards or a list of
comma separated names.
Returns:
Any: list of submitted PVs
"""
r = self._post(endpoint, data=pv) if "," in pv else self._get(endpoint, params={"pv": pv})
return r.json()