"""Basic commands to modify PVs in the archiver."""
from __future__ import annotations
import logging
from abc import abstractmethod
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import TYPE_CHECKING
from requests import HTTPError
from epicsarchiver_mgmt.archiver.info import ArchiverMgmtInfo, ArchivingStatus
from epicsarchiver_mgmt.archiver.mgmt import (
ArchiverMgmt,
OperationResult,
)
from epicsarchiver_mgmt.commands.validation import (
OPERATION_RESULT_STATUS,
RequestHTTPError,
validate_archiver_fqdns,
validate_operation_results,
validate_pvs_status,
)
if TYPE_CHECKING:
from collections.abc import Sequence
# Module-level logger, named after this module.
LOG: logging.Logger = logging.getLogger(__name__)

# Operation-result expectation intended for pausing: the field named by
# OPERATION_RESULT_STATUS may be either "ok" or "no".
PAUSE_EXPECTED_STATUS = {OPERATION_RESULT_STATUS: ["ok", "no"]}
@dataclass
class BasicCommand:
    """Base class for commands that modify PVs in the archiver.

    Subclasses implement ``__call__`` to perform the operation on one PV.
    ``run_command`` validates the input, applies the operation to every PV
    and validates the collected results.

    Attributes:
        command_name: Label used in the log messages and in the summary
            message handed to ``validate_operation_results``
            (e.g. ``"Pausing"``).
        expected_statuses: Statuses handed to ``validate_pvs_status`` before
            the command runs.
        expected_operation_results: Optional expectation handed to
            ``validate_operation_results`` after the command ran.
    """

    command_name: str
    expected_statuses: Sequence[ArchivingStatus | None]
    expected_operation_results: dict[str, list[str]] | None = None

    # NOTE: the class does not derive from ``abc.ABC``, so ``abstractmethod``
    # is not enforced at instantiation; the NotImplementedError below is the
    # effective guard.
    @abstractmethod
    def __call__(
        self,
        archiver: ArchiverMgmt,
        pv: str,
    ) -> OperationResult:
        """Command to execute.

        Args:
            archiver (ArchiverMgmt): The archiver to use.
            pv (str): The PV to modify.

        Returns:
            OperationResult: The result of the command.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        msg = "Subclasses must implement this method."
        raise NotImplementedError(msg)

    def run_command(
        self,
        archiver_fqdns: list[str],
        pvs: Sequence[str],
        *,
        skip_validation: bool = False,
    ) -> None:
        """Run the command on every PV, spread over the given archivers.

        Args:
            archiver_fqdns (list[str]): The urls of the archiver.
            pvs (Sequence[str]): The PVs to change.
            skip_validation (bool): Whether to skip validation of the PVs status.

        Raises:
            RequestHTTPError: If there is an error changing the PVs.
        """
        # Validate input
        validate_archiver_fqdns(archiver_fqdns)
        # The status check is done against the first archiver only.
        archiver_info = ArchiverMgmtInfo(archiver_fqdns[0])
        if not skip_validation:
            validate_pvs_status(archiver_info, pvs, self.expected_statuses)

        # Action
        LOG.info("%s PVs %s", self.command_name, pvs)
        archivers = [ArchiverMgmt(fqdn) for fqdn in archiver_fqdns]
        LOG.info("Using archivers %s", [archiver.info for archiver in archivers])

        def run_task(task_input: tuple[ArchiverMgmt, str]) -> OperationResult:
            archiver, pv = task_input
            return self(archiver, pv)

        # Assign the PVs to the archivers round-robin.
        executer_input = [
            (archivers[i % len(archivers)], pv) for i, pv in enumerate(pvs)
        ]
        try:
            # One worker per archiver. Consuming the map with list() re-raises
            # the first exception raised inside a worker.
            with ThreadPoolExecutor(max_workers=len(archivers)) as executer:
                command_results = list(executer.map(run_task, executer_input))
        except HTTPError as e:
            LOG.error("Error %s PVs: %s", self.command_name, e)  # noqa: TRY400
            LOG.debug("Error %s PVs.", self.command_name, exc_info=True)
            raise RequestHTTPError(e) from e

        # Validate output
        validate_operation_results(
            pvs,
            command_results,
            f"{self.command_name} done",
            expected_operation_results=self.expected_operation_results,
        )
class PauseCommand(BasicCommand):
    """Pause command to modify PVs in the archiver."""

    def __init__(self) -> None:
        """Initialize the PauseCommand class.

        PVs that are being archived, not being archived, or already paused
        are listed as expected statuses. The operation results are checked
        against ``PAUSE_EXPECTED_STATUS``, which allows both "ok" and "no".
        """
        super().__init__(
            command_name="Pausing",
            expected_statuses=[
                ArchivingStatus.BeingArchived,
                ArchivingStatus.NotBeingArchived,
                ArchivingStatus.Paused,
            ],
            # NOTE(review): presumably pausing a PV that is not currently
            # archived reports "no" rather than "ok" - confirm against the
            # archiver's response.
            expected_operation_results=PAUSE_EXPECTED_STATUS,
        )

    def __call__(
        self,
        archiver: ArchiverMgmt,
        pv: str,
    ) -> OperationResult:
        """Pause PV in the archiver.

        Args:
            archiver (ArchiverMgmt): The archiver to use.
            pv (str): The PV to pause.

        Returns:
            OperationResult: The result of the command.
        """
        return archiver.pause_pv(pv)
class ResumeCommand(BasicCommand):
    """Command that resumes PVs in the archiver."""

    def __init__(self) -> None:
        """Set the command name and the single expected status (Paused)."""
        allowed_statuses = [ArchivingStatus.Paused]
        super().__init__(
            command_name="Resuming",
            expected_statuses=allowed_statuses,
        )

    def __call__(self, archiver: ArchiverMgmt, pv: str) -> OperationResult:
        """Resume one PV by delegating to ``archiver.resume_pv``.

        Args:
            archiver (ArchiverMgmt): The archiver to use.
            pv (str): The PV to resume.

        Returns:
            OperationResult: Whatever ``archiver.resume_pv`` returns.
        """
        outcome = archiver.resume_pv(pv)
        return outcome
class DeleteCommand(BasicCommand):
    """Command that deletes PVs from the archiver."""

    def __init__(self) -> None:
        """Set the command name and the single expected status (Paused)."""
        allowed_statuses = [ArchivingStatus.Paused]
        super().__init__(
            command_name="Deleting",
            expected_statuses=allowed_statuses,
        )

    def __call__(self, archiver: ArchiverMgmt, pv: str) -> OperationResult:
        """Delete one PV by delegating to ``archiver.delete_pv``.

        Args:
            archiver (ArchiverMgmt): The archiver to use.
            pv (str): The PV to delete.

        Returns:
            OperationResult: Whatever ``archiver.delete_pv`` returns.
        """
        outcome = archiver.delete_pv(pv)
        return outcome
class AbortCommand(BasicCommand):
    """Command that aborts archiving of PVs in the archiver."""

    def __init__(self) -> None:
        """Set the command name and the expected statuses.

        The expected statuses are ``BeingArchived`` and ``None``.
        """
        # NOTE(review): None presumably stands for a PV without a known
        # archiving status - confirm against validate_pvs_status.
        allowed_statuses = [ArchivingStatus.BeingArchived, None]
        super().__init__(
            command_name="Aborting",
            expected_statuses=allowed_statuses,
        )

    def __call__(self, archiver: ArchiverMgmt, pv: str) -> OperationResult:
        """Abort one PV by delegating to ``archiver.abort_pv``.

        Args:
            archiver (ArchiverMgmt): The archiver to use.
            pv (str): The PV to abort.

        Returns:
            OperationResult: Whatever ``archiver.abort_pv`` returns.
        """
        outcome = archiver.abort_pv(pv)
        return outcome