Source code for epicsarchiver_mgmt.commands.archive

"""Provides the archive command to archive PVs."""

from __future__ import annotations

import logging

from requests import HTTPError

from epicsarchiver_mgmt.archiver.info import ArchiverMgmtInfo, ArchivingStatus
from epicsarchiver_mgmt.archiver.mgmt import (
    ArchivePVRequest,
    ArchiverMgmt,
)
from epicsarchiver_mgmt.commands.validation import (
    OPERATION_RESULT_STATUS,
    RequestHTTPError,
    validate_operation_results,
    validate_pvs_status,
)
from epicsarchiver_mgmt.exceptions import BaseMgmtError

[docs] LOG: logging.Logger = logging.getLogger(__name__)
[docs] class ArchivePolicyNotFoundError(BaseMgmtError): """Exception for when the policy is not found.""" def __init__(self, pv_request: ArchivePVRequest, policy_names: set[str]) -> None: """Error for when the policy is not found. Args: pv_request (ArchivePVRequest): The PV request policy_names (set[str]): The available policy names. """ super().__init__(f"Policy {pv_request.policy} not found in {policy_names} for archive request {pv_request.pv}.")
[docs] self.pv_request = pv_request
[docs] self.policy_names = policy_names
[docs] def validate_policy_names(archiver: ArchiverMgmt, pv_requests: list[ArchivePVRequest]) -> None: """Validate the policy names. Args: archiver (ArchiverMgmt): The archiver information. pv_requests (list[ArchivePVRequest]): The PVs to archive. Raises: ArchivePolicyNotFoundError: If the policy is not found. """ policy_names = archiver.get_policy_list().keys() for request in pv_requests: if request.policy and request.policy not in policy_names: raise ArchivePolicyNotFoundError(request, set(policy_names))
[docs] ARCHIVE_OPERATION_RESULT_STATUS_OK = "Archive request submitted"
[docs] ARCHIVE_OPERATION_EXPECTED_STATUS = {OPERATION_RESULT_STATUS: [ARCHIVE_OPERATION_RESULT_STATUS_OK]}
[docs] def archive(archiver_fqdn: str, pv_requests: list[ArchivePVRequest], *, dry_run: bool = False) -> None: """Provide the archive command to archive PVs. Args: archiver_fqdn (str): The fully qualified domain name of the archiver. pv_requests (list[ArchivePVRequest]): The PVs to archive. dry_run (bool): Whether to do a dry run or not. Raises: RequestHTTPError: If there is an error archiving the PVs. """ pvs = [request.pv for request in pv_requests] # Validate input archiver_info = ArchiverMgmtInfo(archiver_fqdn) validate_pvs_status( archiver_info=archiver_info, pvs=pvs, expected_statuses=[ ArchivingStatus.NotBeingArchived, ], ) archiver = ArchiverMgmt(archiver_fqdn) validate_policy_names(archiver, pv_requests) # Action LOG.info("Archiving PVs %s", pv_requests) LOG.info("Using archiver %s", archiver.info) if dry_run: LOG.info("Dry run, not executing.") return try: archive_results = archiver.archive_pv_requests(pv_requests) except HTTPError as e: LOG.error("Error archiving PVs: %s", e) # noqa: TRY400 LOG.debug("Error archiving PVs.", exc_info=True) raise RequestHTTPError(e) from e # Validate output validate_operation_results( pvs, archive_results, "archived", expected_operation_results=ARCHIVE_OPERATION_EXPECTED_STATUS, )