"""Archiver Mgmt operations module."""
from __future__ import annotations
import enum
import logging
from dataclasses import dataclass
from enum import auto
from typing import cast
from epicsarchiver_mgmt.archiver.info import (
ArchiverMgmtInfo,
InfoResultList,
TypeInfo,
)
[docs]
# Module-level logger, named after this module via ``__name__``.
LOG: logging.Logger = logging.getLogger(__name__)
[docs]
class Storage(enum.StrEnum):
"""Represents the different storage levels of the archiver appliance."""
[docs]
class PutInfoType(enum.Enum):
    """Select how a type-info upload treats the PV's existing entry.

    Members:
        CreateNew: sent as ``createnew=true`` / ``override=false``.
        Override: sent as ``createnew=false`` / ``override=true``.
    """

    CreateNew = enum.auto()
    # ``ArchiverMgmt.put_pv_type_info`` compares against ``PutInfoType.Override``;
    # without this member that comparison raises ``AttributeError``. It is added
    # after ``CreateNew`` so the existing member keeps its value.
    Override = enum.auto()
[docs]
# JSON object returned by a single mgmt operation, e.g.
# {"status": "ok", "desc": "Aborted request for archiving PV PV1"}.
OperationResult = dict[str, str]
[docs]
# List of per-operation results, as returned by the batch archive request.
OperationResultList = list[OperationResult]
[docs]
class EpicsProto(enum.StrEnum):
"""Represents the different input protocols of the archiver appliance."""
[docs]
def create_archive_request_pv_name(self, pv: str) -> str:
"""Return the PV name with the protocol.
Args:
pv (str): The PV name.
Returns:
str: The PV name with the protocol.
"""
return f"pva://{pv}" if self is EpicsProto.PVA else pv
[docs]
class SamplingMethod(enum.StrEnum):
"""Represents the different sampling methods of the archiver appliance."""
[docs]
class ArchDbrType(enum.Enum):
    """DBR types that the archiver uses.

    ``ArchiverMgmt.change_type`` sends the member *name* (for example
    ``"DBR_SCALAR_DOUBLE"``) as the ``newtype`` query parameter; the numeric
    values are assigned by ``auto()`` in declaration order.
    """

    # Scalar types.
    DBR_SCALAR_STRING = auto()
    DBR_SCALAR_SHORT = auto()
    DBR_SCALAR_FLOAT = auto()
    DBR_SCALAR_ENUM = auto()
    DBR_SCALAR_BYTE = auto()
    DBR_SCALAR_INT = auto()
    DBR_SCALAR_DOUBLE = auto()

    # Generic V4 bytes type.
    DBR_V4_GENERIC_BYTES = auto()
@dataclass
class ArchivePVRequest:
    """Information to archive a PV.

    Attributes:
        pv: Name of the PV to archive (required).
        samplingmethod: Sampling method to request, if any.
        samplingperiod: Sampling period to request, if any.
        controllingPV: Name of a PV controlling the archiving, if any.
        policy: Name of the policy to apply, if any.
        appliance: Appliance to assign the PV to, if any.
    """

    # ``pv`` is read by ``as_dict`` and passed by ``ArchiverMgmt.archive_pv``;
    # as the only required field it must come before the defaulted ones.
    pv: str
    samplingmethod: SamplingMethod | None = None
    samplingperiod: float | None = None
    controllingPV: str | None = None  # noqa: N815
    policy: str | None = None
    appliance: str | None = None

    def as_dict(self) -> dict[str, str]:
        """Return the request as a dictionary of strings.

        ``pv`` is always present; every other key is included only when its
        attribute is truthy, so unset (``None``), empty and zero values are
        omitted.

        Returns:
            dict[str, str]: The request parameters keyed by attribute name.
        """
        output: dict[str, str] = {"pv": self.pv}
        if self.samplingmethod:
            output["samplingmethod"] = self.samplingmethod.value
        if self.samplingperiod:
            # The guard above already excludes None, so convert directly.
            output["samplingperiod"] = str(self.samplingperiod)
        # The remaining optional fields are plain strings copied verbatim.
        for key in ("controllingPV", "policy", "appliance"):
            value = getattr(self, key)
            if value:
                output[key] = value
        return output
[docs]
class ArchiverMgmt(ArchiverMgmtInfo):
    """Mgmt Operations EPICS Archiver Appliance client.

    Hold a session to the Archiver Appliance web application and use the mgmt
    interface. Builds on ``ArchiverMgmtInfo`` and adds operations such as
    archiving, pausing, resuming, renaming and deleting PVs.

    Args:
        hostname: EPICS Archiver Appliance hostname [default: localhost]
        port: EPICS Archiver Appliance management port [default: 17665]
    """
# EPICS Archiver Appliance documentation of mgmt endpoints:
# https://epicsarchiver.readthedocs.io/en/latest/developer/mgmt_scriptables.html
[docs]
def archive_pv(
    self,
    pv: str,
    *,
    sampling_period: float | None = None,
    sampling_method: SamplingMethod | None = None,
    controlling_pv: str | None = None,
    policy: str | None = None,
    appliance: str | None = None,
    protocol: EpicsProto = EpicsProto.CA,
) -> OperationResultList:
    """Archive a single PV.

    Wraps the arguments in one ``ArchivePVRequest`` and submits it through
    ``archive_pv_requests``.

    Args:
        pv (str): PV name.
        sampling_period (float | None, optional): The sampling period,
            i.e. 1.0 is 1Hz. Defaults to None.
        sampling_method (SamplingMethod | None, optional): The sampling
            method, SCAN or MONITOR. Defaults to None.
        controlling_pv (str | None, optional): A pv to control when to
            archive this pv. Defaults to None.
        policy (str | None, optional): The policy, can be found at
            /mgmt/bpl/getPolicyList. Defaults to None.
        appliance (str | None, optional): Can specify a specific appliance.
            Defaults to None.
        protocol (EpicsProto, optional): Protocol to use; it decides how the
            PV name is written in the request. Defaults to EpicsProto.CA.

    Returns:
        OperationResultList: Result of ``archive_pv_requests`` for the
        single request.
    """
    request = ArchivePVRequest(
        pv=protocol.create_archive_request_pv_name(pv),
        samplingmethod=sampling_method,
        samplingperiod=sampling_period,
        controllingPV=controlling_pv,
        policy=policy,
        appliance=appliance,
    )
    return self.archive_pv_requests([request])
[docs]
def archive_pv_requests(self, pv_requests: list[ArchivePVRequest]) -> OperationResultList:
    """Submit a batch of archive requests.

    Each request is converted with ``as_dict`` and the resulting list is
    posted as JSON to ``/archivePV``.

    Args:
        pv_requests (list[ArchivePVRequest]): The pv requests.

    Returns:
        OperationResultList: Decoded JSON body of the response.
    """
    payload = [pv_request.as_dict() for pv_request in pv_requests]
    LOG.debug("Archiving PVs %s", payload)
    response = self._post("/archivePV", json_data=payload)
    return cast("OperationResultList", response.json())
[docs]
def pause_pv(self, pv: str) -> OperationResult:
    """Pause the archiving of one or more PVs.

    Args:
        pv: name of the pv. Can be a GLOB wildcards or a list of
            comma separated names.

    Returns:
        The value returned by ``_get_or_post`` for ``/pauseArchivingPV``,
        typed as ``OperationResult``.
    """
    return cast("OperationResult", self._get_or_post("/pauseArchivingPV", pv))
[docs]
def resume_pv(self, pv: str) -> OperationResult:
    """Resume the archiving of one or more PVs.

    Args:
        pv: name of the pv. Can be a GLOB wildcards or a list of
            comma separated names.

    Returns:
        The value returned by ``_get_or_post`` for ``/resumeArchivingPV``,
        typed as ``OperationResult``.
    """
    return cast("OperationResult", self._get_or_post("/resumeArchivingPV", pv))
[docs]
def abort_pv(self, pv: str) -> OperationResult:
    """Abort any pending requests for archiving this PV.

    Args:
        pv: name of the pv.

    Returns:
        OperationResult: Status of action and description.
        Example: {"status":"ok","desc":"Aborted request for archiving PV PV1"}
    """
    query = {"pv": pv}
    response = self._get("/abortArchivingPV", params=query)
    return cast("OperationResult", response.json())
[docs]
def add_alias(self, pv: str, alias_name: str) -> OperationResult:
    """Add an alias to a pv.

    Args:
        pv: PV to add alias.
        alias_name: name of alias to add to pv.

    Returns:
        OperationResult: Status of action and description.
    """
    query = {"pv": pv, "aliasname": alias_name}
    response = self._get("/addAlias", params=query)
    return cast("OperationResult", response.json())
[docs]
def remove_alias(self, pv: str, alias_name: str) -> OperationResult:
    """Remove an alias from a pv.

    Args:
        pv: PV to remove alias.
        alias_name: name of alias to remove from pv.

    Returns:
        OperationResult: Status of action and description.
    """
    query = {"pv": pv, "aliasname": alias_name}
    response = self._get("/removeAlias", params=query)
    return cast("OperationResult", response.json())
[docs]
def delete_pv(
    self,
    pv: str,
    delete_data: bool = False,  # noqa: FBT002, FBT001
) -> OperationResult:
    """Stop archiving the specified PV.

    The PV needs to be paused first.

    Args:
        pv: name of the pv.
        delete_data: delete the data that has already been recorded.
            Default to False.

    Returns:
        OperationResult: Decoded JSON body of the ``/deletePV`` response.
    """
    # The appliance documents this parameter as ``deleteData`` with the values
    # true/false; a ``delete_data`` key is not a documented parameter, so the
    # flag would have no effect.
    query = {"pv": pv, "deleteData": "true" if delete_data else "false"}
    response = self._get("/deletePV", params=query)
    return cast("OperationResult", response.json())
[docs]
def rename_pv(self, pv: str, newname: str) -> OperationResult:
    """Rename this pv to a new name.

    The PV needs to be paused first.

    Args:
        pv (str): name of the pv.
        newname (str): new name of the pv

    Returns:
        OperationResult: Status of action and description. Example:
        {"status":"ok","desc":"Successfully renamed PV PV1 to PV2"}
    """
    query = {"pv": pv, "newname": newname}
    response = self._get("/renamePV", params=query)
    return cast("OperationResult", response.json())
[docs]
def update_pv(
    self,
    pv: str,
    samplingperiod: float | None = None,
    samplingmethod: str | None = None,
) -> OperationResult:
    """Change the archival parameters for a PV.

    Only arguments with a truthy value are sent, so ``None``, ``0`` and
    ``""`` leave the corresponding parameter out of the request.

    Args:
        pv: name of the pv.
        samplingperiod: the new sampling period in seconds.
        samplingmethod: the new sampling method [SCAN|MONITOR]

    Returns:
        OperationResult: Decoded JSON body of the
        ``/changeArchivalParameters`` response.
    """
    # An empty string stands for "not provided" and is filtered out below.
    optional_params = {
        "samplingperiod": str(samplingperiod) if samplingperiod else "",
        "samplingmethod": samplingmethod or "",
    }
    query: dict[str, str] = {"pv": pv}
    query.update({key: value for key, value in optional_params.items() if value})
    response = self._get("/changeArchivalParameters", params=query)
    return cast("OperationResult", response.json())
[docs]
def rename_and_append(self, old: str, new: str, storage: Storage) -> OperationResult:
    """Append the data for an older PV into a newer PV.

    The older PV is deleted and an alias mapping the older PV name to
    the new PV is added.

    Args:
        old (str): The name of the older pv.
            The data for this PV will be appended to the newer PV and then deleted.
        new (str): The name of the newer pv.
        storage (Storage): The name of the store to consolidate data
            before appending.

    Returns:
        OperationResult: Decoded JSON body of the ``/appendAndAliasPV``
        response.
    """
    response = self._get(
        "/appendAndAliasPV",
        params={"olderpv": old, "newerpv": new, "storage": storage},
    )
    # Decode the body once and reuse it for both the log line and the result.
    result = response.json()
    LOG.debug("/appendAndAliasPV response %s", result)
    return cast("OperationResult", result)
[docs]
def change_type(self, pv: str, new_type: ArchDbrType) -> OperationResult:
    """Change the type of a pv to a new type.

    The member name of ``new_type`` is sent as the ``newtype`` parameter.

    Args:
        pv (str): Name of the PV
        new_type (ArchDbrType): New DBR_TYPE

    Returns:
        OperationResult: Decoded JSON body of the ``/changeTypeForPV``
        response.
    """
    LOG.info("Change type of pv %s to %s", pv, new_type)
    query = {"pv": pv, "newtype": new_type.name}
    reply = self._get("/changeTypeForPV", params=query)
    return cast("OperationResult", reply.json())
[docs]
def put_pv_type_info(self, pv: str, type_info: TypeInfo, put_info_type: PutInfoType) -> TypeInfo:
    """Put the type info for a PV.

    Args:
        pv (str): Name of the PV
        type_info (TypeInfo): Type info, posted as the JSON body.
        put_info_type (PutInfoType): Whether override or create new

    Returns:
        TypeInfo: Decoded JSON body of the ``/putPVTypeInfo`` response.
    """
    LOG.info("Put type info for pv %s", pv)
    params = {"pv": pv}
    # Any other value leaves both flags out of the request.
    if put_info_type is PutInfoType.CreateNew:
        params.update(createnew="true", override="false")
    elif put_info_type is PutInfoType.Override:
        params.update(createnew="false", override="true")
    response = self._post(
        "/putPVTypeInfo",
        params=params,
        json_data=type_info,
    )
    # Decode the body once; the same object is logged and returned.
    result = cast("TypeInfo", response.json())
    LOG.debug("Put type info %s for pv %s", result, pv)
    return result
[docs]
def get_policy_list(self) -> dict[str, str]:
    """Get the list of policies.

    Returns:
        dictionary of policies names and descriptions
    """
    response = self._get("/getPolicyList")
    policies = response.json()
    return cast("dict[str, str]", policies)
@property
def never_connected_pvs(self) -> InfoResultList:
    """Get the never connected PVs of the archiver.

    Each access issues a request to ``/getNeverConnectedPVs``.

    Returns:
        InfoResultList: List of never connected items.
    """
    response = self._get("/getNeverConnectedPVs")
    return cast("InfoResultList", response.json())
@property
def appliances_in_cluster(self) -> list[dict[str, str]]:
    """Get the appliances in the cluster.

    Each access issues a request to ``/getAppliancesInCluster``.

    Returns:
        list[dict[str, str]]: List of appliances in the cluster.
    """
    response = self._get("/getAppliancesInCluster")
    return cast("list[dict[str, str]]", response.json())