Source code for epicsarchiver_mgmt.commands.rename

"""Rename PVs in the archiver."""

from __future__ import annotations

import logging
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING

import click
from requests import HTTPError

from epicsarchiver_mgmt.archiver.info import ArchiverMgmtInfo, ArchivingStatus, InfoResultList
from epicsarchiver_mgmt.archiver.mgmt import (
    ArchiverMgmt,
    OperationResult,
    Storage,
)
from epicsarchiver_mgmt.commands.basic_commands import PauseCommand, ResumeCommand
from epicsarchiver_mgmt.commands.statuses import get_statuses_from_archiver
from epicsarchiver_mgmt.commands.validation import (
    CONFIRMATION_PROMPT,
    OPERATION_RESULT_OK,
    OPERATION_RESULT_STATUS,
    RequestHTTPError,
    validate_archiver_fqdns,
    validate_not_same,
    validate_operation_results,
    validate_pvs_status,
)
from epicsarchiver_mgmt.exceptions import BaseMgmtError

if TYPE_CHECKING:
    from collections.abc import Sequence

[docs] LOG: logging.Logger = logging.getLogger(__name__)
[docs] SIZE_KEY = "Estimated storage rate (MB/day)"
[docs] NO_SIZE_VAL = "Not enough info"
[docs] MAX_STORAGE_MB = 1000
[docs] def rename(archiver_fqdns: list[str], renames: Sequence[tuple[str, str]], *, dry_run: bool = False) -> None: """Rename PVs in the archiver, runs in parallel on multiple archivers. Args: archiver_fqdns (list[str]): The urls of the archivers. renames (Sequence[tuple[str, str]]): The PVs to rename. dry_run (bool): Whether to do a dry run or not. Raises: RequestHTTPError: If there is an error renaming the PVs. """ # Validate input validate_archiver_fqdns(archiver_fqdns) archiver_info = ArchiverMgmtInfo(archiver_fqdns[0]) validate_not_same(renames) # unpack list of pairs into two new lists old_pvs, new_pvs = zip(*renames, strict=False) old_pv_statuses: InfoResultList = get_statuses_from_archiver(archiver_info, old_pvs) new_pv_statuses: InfoResultList = get_statuses_from_archiver(archiver_info, new_pvs) validate_pvs_status( archiver_info, old_pvs, [ ArchivingStatus.BeingArchived, ArchivingStatus.Paused, ], existing_status_infos=old_pv_statuses, ) validate_pvs_status( archiver_info, new_pvs, [ ArchivingStatus.NotBeingArchived, ], existing_status_infos=new_pv_statuses, ) validate_size( archiver_info, old_pvs, ) # Action LOG.info("Renaming PVs %s", renames) archivers = [ArchiverMgmt(archiver_fqdn) for archiver_fqdn in archiver_fqdns] LOG.info("Using archivers %s", [archiver.info for archiver in archivers]) if dry_run: LOG.info("Dry run, not executing.") return click.confirm(CONFIRMATION_PROMPT, abort=True) PauseCommand().run_command( archiver_fqdns, [pv["pvName"] for pv in old_pv_statuses if pv["status"] == ArchivingStatus.BeingArchived], skip_validation=True, ) try: # rename all the pvs, this can take a long time so we do it in parallel rename_results = _parallel_execute_rename(archivers, renames) except HTTPError as e: LOG.error("Error Renaming PVs: %s", e) # noqa: TRY400 LOG.debug("Error Renaming PVs.", exc_info=True) raise RequestHTTPError(e) from e # Validate output validate_operation_results( new_pvs, rename_results, "renamed", ) ResumeCommand().run_command(archiver_fqdns, new_pvs)
[docs] def _parallel_execute_rename( archivers: list[ArchiverMgmt], renames: Sequence[tuple[str, str]] ) -> list[OperationResult]: def rename_pv_task(task_input: tuple[ArchiverMgmt, str, str]) -> OperationResult: archiver, old_pv, new_pv = task_input return archiver.rename_pv(old_pv, new_pv) with ThreadPoolExecutor(max_workers=len(archivers)) as executer: executer_input = [(archivers[i % len(archivers)], old_pv, new_pv) for i, (old_pv, new_pv) in enumerate(renames)] return list(executer.map(rename_pv_task, executer_input))
[docs] class TooMuchStoredDataError(BaseMgmtError): """Exception for when the old PV has a lot of data stored.""" def __init__(self, pv: str, storage: float) -> None: """Error for when the old PV has a lot of stored data. Args: pv (str): The PV. storage (float): The storage. """ super().__init__(f"Old PV {pv} has {storage} MB data stored. Manual intervention required.")
[docs] self.pv = pv
[docs] self.storage = storage
[docs] def validate_size(archiver: ArchiverMgmtInfo, pvs: Sequence[str], max_storage: float = MAX_STORAGE_MB) -> None: """Validate the old PVs are not too large. Args: archiver (ArchiverMgmt): The archiver. pvs (Sequence[str]): The PVs to check. max_storage (float): The maximum storage allowed in MB per day. Raises: TooMuchStoredDataError: If the old and new PVs are the same. """ for pv in pvs: pv_details = archiver.get_pv_details(pv) for detail in pv_details: if detail["name"] != SIZE_KEY: continue size_value = detail["value"] if size_value == NO_SIZE_VAL: continue pv_storage = float(size_value) if pv_storage > max_storage: raise TooMuchStoredDataError(pv, pv_storage)
[docs] def rename_and_append( archiver_fqdns: list[str], renames: Sequence[tuple[str, str]], storage: Storage = Storage.MTS, *, dry_run: bool = False, ) -> None: """Rename and append PVs in the archiver, runs in parallel on multiple archivers. Args: archiver_fqdns (list[str]): The urls of the archivers. renames (Sequence[tuple[str, str]]): The PVs to rename. storage (Storage): The storage to consolidate the data first. dry_run (bool): Whether to do a dry run or not. Raises: RequestHTTPError: If there is an error renaming the PVs. """ # Validate input validate_archiver_fqdns(archiver_fqdns) archiver_info = ArchiverMgmtInfo(archiver_fqdns[0]) # unpack list of pairs into two new lists old_pvs, new_pvs = zip(*renames, strict=False) old_pv_statuses: InfoResultList = get_statuses_from_archiver(archiver_info, old_pvs) new_pv_statuses: InfoResultList = get_statuses_from_archiver(archiver_info, new_pvs) validate_not_same(renames) validate_pvs_status( archiver_info, old_pvs, [ ArchivingStatus.BeingArchived, ArchivingStatus.Paused, ], existing_status_infos=old_pv_statuses, ) validate_pvs_status( archiver_info, new_pvs, [ ArchivingStatus.BeingArchived, ArchivingStatus.Paused, ], existing_status_infos=new_pv_statuses, ) validate_size( archiver_info, old_pvs, ) validate_size( archiver_info, new_pvs, ) # Action LOG.info("Renaming and Appending PVs %s", renames) archivers = [ArchiverMgmt(archiver_fqdn) for archiver_fqdn in archiver_fqdns] LOG.info("Using archivers %s", [archiver.info for archiver in archivers]) if dry_run: LOG.info("Dry run, not executing.") return click.confirm(CONFIRMATION_PROMPT, abort=True) to_pause_pvs = [pv["pvName"] for pv in old_pv_statuses if pv["status"] == ArchivingStatus.BeingArchived] + [ pv["pvName"] for pv in new_pv_statuses if pv["status"] == ArchivingStatus.BeingArchived ] if not to_pause_pvs: LOG.info("No PVs to pause, skipping.") else: LOG.info("Pausing PVs %s", to_pause_pvs) PauseCommand().run_command(archiver_fqdns, to_pause_pvs, skip_validation=True) try: # rename all the pvs, this can take a long time so we do it in parallel rename_results = _parallel_execute_rename_and_append(archivers, renames, storage) except HTTPError as e: LOG.error("Error Renaming and Appending PVs: %s", e) # noqa: TRY400 LOG.debug("Error Renaming and Appending PVs.", exc_info=True) raise RequestHTTPError(e) from e # Validate output validate_operation_results( new_pvs, rename_results, "Renamed and Appended", expected_operation_results={OPERATION_RESULT_STATUS: [OPERATION_RESULT_OK], "addAlias": [OPERATION_RESULT_OK]}, ) ResumeCommand().run_command(archiver_fqdns, new_pvs)
[docs] def _parallel_execute_rename_and_append( archivers: list[ArchiverMgmt], renames: Sequence[tuple[str, str]], storage: Storage ) -> list[OperationResult]: def rename_pv_task(task_input: tuple[ArchiverMgmt, str, str]) -> OperationResult: archiver, old_pv, new_pv = task_input return archiver.rename_and_append(old_pv, new_pv, storage) with ThreadPoolExecutor(max_workers=len(archivers)) as executer: executer_input = [(archivers[i % len(archivers)], old_pv, new_pv) for i, (old_pv, new_pv) in enumerate(renames)] return list(executer.map(rename_pv_task, executer_input))