From c4a81aabf04a2a8236a07ba4479d4944b9b5e60f Mon Sep 17 00:00:00 2001 From: Sky Brewer <sky.brewer@ess.eu> Date: Thu, 23 Nov 2023 17:12:57 +0100 Subject: [PATCH] Clean up on comments and default channelfinder Adds a call to channelfinder to get ioc info --- epicsarchiver/channelfinder.py | 177 ++++++++++++++++++++ epicsarchiver/command.py | 12 ++ epicsarchiver/statistics/_external_stats.py | 32 ++++ epicsarchiver/statistics/report.py | 74 ++++++-- epicsarchiver/statistics/stat_responses.py | 18 ++ pyproject.toml | 2 +- tests/statistics/test_report.py | 16 +- tests/test_channelfinder.py | 168 +++++++++++++++++++ tests/test_epicsarchiver.py | 6 +- 9 files changed, 482 insertions(+), 23 deletions(-) create mode 100644 epicsarchiver/channelfinder.py create mode 100644 tests/test_channelfinder.py diff --git a/epicsarchiver/channelfinder.py b/epicsarchiver/channelfinder.py new file mode 100644 index 0000000..355b170 --- /dev/null +++ b/epicsarchiver/channelfinder.py @@ -0,0 +1,177 @@ +"""Minimal Channel Finder interface for calculating archiver statistics.""" +import logging +import urllib.parse +from dataclasses import dataclass +from itertools import chain +from typing import Any + +import requests +import urllib3 + +LOG: logging.Logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class Channel: + """Outline class of a channel finder channel data. + + Returns: + Channel: includes name, properties and tags of a channel. + """ + + name: str + properties: dict[str, str] + tags: list[str] + + @classmethod + def from_json(cls, json: dict[str, Any]) -> "Channel": + """Convert from json direct from channel finder to a "Channel". + + Args: + json (dict): input json dictionary + + Returns: + Channel: corresponding channel + """ + return Channel( + json["name"], + {p["name"]: p["value"] for p in json["properties"]}, + [t["name"] for t in json["tags"]], + ) + + def __hash__(self) -> int: + """Calculates a has of a "Channel". + + Returns: + int: hash of channel + """ + return hash( + self.name + str(tuple(sorted(self.properties.items()))) + str(self.tags) + ) + + +class ChannelFinder: + """Minimal Channel Finder client. + + Hold a session to the Channel Finder web application. + + Args: + hostname: Channel Finder url [default: localhost] + + Basic Usage:: + + >>> from epicsarchiver.channelfinder import ChannelFinder + >>> channelfinder = ChannelFinder('channelfinder.tn.esss.lu.se') + >>> channel = channelfinder.get_channels(['AccPSS::FBIS-BP_A']) + """ + + def __init__(self, hostname: str = "localhost"): + """Create Channel Finder object. + + Args: + hostname (str, optional): hostname of channelfinder. + """ + self.hostname = hostname + self.session = requests.Session() + + def __repr__(self) -> str: + """String representation of Channel Finder. + + Returns: + str: details including hostname of Channel Finder. + """ + return f"ChannelFinder({self.hostname})" + + def get_channels(self, pvs: list[str]) -> list[Channel]: + """Get the list of channels matching the pv name from channelfinder. + + Args: + pvs (list[str]): pv names + + Returns: + list[Channel]: list of matching channels + """ + url = urllib.parse.urljoin( + f"https://{self.hostname}", "/ChannelFinder/resources/channels" + ) + urllib3.disable_warnings() # ignoring warnings that certificate is self signed + LOG.debug("GET url: " + url) + value = self.session.request( + "GET", + url=url, + params={"~name": ",".join(pvs)}, + verify=False, + ) + value.raise_for_status() + value_json = value.json() + LOG.debug("Result from channelfinder search: " + str(value_json)) + return [Channel.from_json(rs) for rs in value_json] + + def get_all_channels( + self, pvs: list[str], min_prefix: int = 0, group_size: int = 1 + ) -> dict[str, Channel]: + """Get the list of channels matching the pv names from channelfinder. + + Args: + pvs (list[str]): list of pv names + min_prefix (int): Minimum length of prefix. + group_size (int): Number of pvsearches to submit to channelfinder + + Returns: + list[Channel]: list of matching channels + """ + LOG.info("Get all channels from channel finder with prefix " + str(min_prefix)) + minimized_pv_list = minimize_searches(pvs, min_prefix) + + pv_groups = [ + minimized_pv_list[i : i + group_size] + for i in range(0, len(minimized_pv_list), group_size) + ] + channels = set(chain(*[self.get_channels(pv_group) for pv_group in pv_groups])) + + pvs_set = set(pvs) + channels_dict = { + channel.name: channel for channel in channels if channel.name in pvs_set + } + return channels_dict + + +def minimize_searches(strings: list[str], minimum_prefix_len: int) -> list[str]: + """Minimizes searches by grouping similar strings. + + Uses a wildcard symbol '*' to represent common prefixes. + + Args: + strings (list[str]): A list of strings to be minimized. + minimum_prefix_len (int): Minimum length of prefix. + + Returns: + list[str]: A list of strings after minimizing searches. + """ + if not strings: + return [] + + # Helper function to find common prefix + def find_common_prefix(str1: str, str2: str, minimum_prefix_len: int) -> str: + prefix = [] + for c1, c2 in zip(str1, str2, strict=False): + if c1 == c2: + prefix.append(c1) + else: + break + if len(prefix) < minimum_prefix_len: + return "" + return "".join(prefix) + + # Sort the input list for consistent grouping + strings.sort() + + result = [strings[0]] + for i in range(1, len(strings)): + common_prefix = find_common_prefix(result[-1], strings[i], minimum_prefix_len) + if common_prefix: + result[-1] = common_prefix + "*" + else: + result.append(strings[i]) + + return result diff --git a/epicsarchiver/command.py b/epicsarchiver/command.py index 161fc18..054302c 100644 --- a/epicsarchiver/command.py +++ b/epicsarchiver/command.py @@ -7,6 +7,7 @@ import click from rich.console import Console from rich.logging import RichHandler +from epicsarchiver.channelfinder import ChannelFinder from epicsarchiver.epicsarchiver import ArchiverAppliance from epicsarchiver.statistics.report import ReportConfig, print_report @@ -162,6 +163,13 @@ def rename( type=str, help="Other Achiver Appliance hostname or IP [default: localhost]", ) +@click.option( + "--channelfinder_hostname", + "-cf", + default="channelfinder.tn.esss.lu.se", + type=str, + help="Channel Finder hostname or IP [default: localhost]", +) @click.option( "--time_minimum", "-t", @@ -218,6 +226,7 @@ def stats( config_files: Path | None, mb_per_day_minimum: float, events_dropped_minimum: int, + channelfinder_hostname: str | None, verbose: bool, # noqa: FBT001 output: Path, debug: bool, # noqa: FBT001, ARG001 @@ -259,6 +268,9 @@ def stats( other_archiver=other_archiver, mb_per_day_minimum=mb_per_day_minimum, events_dropped_minimum=events_dropped_minimum, + channelfinder=ChannelFinder(channelfinder_hostname) + if channelfinder_hostname + else None, ) LOG.info(f"Collecting statistics with configuration {config}") diff --git a/epicsarchiver/statistics/_external_stats.py b/epicsarchiver/statistics/_external_stats.py index 7f1ced2..e631ff7 100644 --- a/epicsarchiver/statistics/_external_stats.py +++ b/epicsarchiver/statistics/_external_stats.py @@ -3,10 +3,15 @@ from os import listdir from os.path import isfile, join from pathlib import Path +from numpy import mean + from epicsarchiver.archive_files import get_pvs_from_files +from epicsarchiver.channelfinder import ChannelFinder from epicsarchiver.epicsarchiver import ArchiverAppliance from epicsarchiver.statistics.stat_responses import ( + UNKNOWN_IOC, BothArchiversResponse, + Ioc, NoConfigResponse, ) @@ -52,3 +57,30 @@ def get_not_configured( - {ar["pv"] for ar in get_pvs_from_files(onlyfiles)} ) ] + + +def get_iocs(channelfinder: ChannelFinder, pvs: list[str]) -> dict[Ioc, list[str]]: + """Get the IOC hosts for a list of pvs. + + Args: + channelfinder (str): url of channelfinder + pvs (list[str]): list of pv names + + Returns: + dict[str, list[str]]: dictionary mapping ioc name to pv + """ + channels = channelfinder.get_all_channels( + pvs, int(mean([len(pv) for pv in pvs]) / 2) + ) + iocs = {pv: Ioc.from_channel(channels[pv]) for pv in channels.keys()} + + for pv in pvs: + if pv not in iocs.keys(): + iocs[pv] = UNKNOWN_IOC + + output: dict[Ioc, list[str]] = {} + for pv in pvs: + if iocs[pv] not in output: + output[iocs[pv]] = [] + output[iocs[pv]].append(pv) + return output diff --git a/epicsarchiver/statistics/report.py b/epicsarchiver/statistics/report.py index f244783..d1072b0 100644 --- a/epicsarchiver/statistics/report.py +++ b/epicsarchiver/statistics/report.py @@ -28,13 +28,17 @@ import pytz from rich.console import Console from epicsarchiver import ArchiverAppliance +from epicsarchiver.channelfinder import ChannelFinder from epicsarchiver.statistics._external_stats import ( get_double_archived, + get_iocs, get_not_configured, ) from epicsarchiver.statistics.stat_responses import ( + UNKNOWN_IOC, BaseStatResponse, DroppedReason, + Ioc, ) LOG: logging.Logger = logging.getLogger(__name__) @@ -51,6 +55,7 @@ class ReportConfig: other_archiver: ArchiverAppliance | None mb_per_day_minimum: float events_dropped_minimum: int + channelfinder: ChannelFinder | None class Stat(str, enum.Enum): @@ -207,10 +212,19 @@ def print_report( console.print_json(data=sum_report) +@dataclass +class _PVStats: + name: str + stats: dict[Stat, BaseStatResponse] + + def json_str(self) -> dict[str, str]: + return {s.name: str(self.stats[s]) for s in self.stats.keys()} + + def generate_all_stats( archiver: ArchiverAppliance, config: ReportConfig, -) -> dict[str, dict[Stat, BaseStatResponse]]: +) -> dict[Ioc, dict[str, _PVStats]]: """Generate all the statistics available from the Stat list and collate into a dict. Args: @@ -218,19 +232,42 @@ def generate_all_stats( config (ReportConfig): Configuration of the report Returns: - dict[str, dict[Stat, _BaseResponse]]: Return a dictionary with pv names as keys, + dict[Ioc, dict[str, _PVStats]]: Return a dictionary with pv names as keys, and detailed statistics after. """ - return _invert_data({stat: stat.generate_stats(archiver, config) for stat in Stat}) + inverted_data = _invert_data( + {stat: stat.generate_stats(archiver, config) for stat in Stat} + ) + if config.channelfinder: + return _organise_by_ioc( + inverted_data, + config.channelfinder, + ) + return {UNKNOWN_IOC: inverted_data} + + +def _organise_by_ioc( + inverted_report: dict[str, _PVStats], + channelfinder: ChannelFinder, +) -> dict[Ioc, dict[str, _PVStats]]: + iocs = get_iocs(channelfinder, list(inverted_report.keys())) + LOG.info("IOCS: " + str(_iocs_summary(iocs))) + return {ioc: {pv: inverted_report[pv] for pv in iocs[ioc]} for ioc in iocs.keys()} + + +def _iocs_summary(iocs: dict[Ioc, list[str]]) -> list[str]: + sorted_iocs = [(ioc, len(iocs[ioc])) for ioc in iocs.keys()] + sorted_iocs = sorted(sorted_iocs, key=lambda pair: pair[1]) + return [f"{ioc_pair[0]} has {ioc_pair[1]} BAD PVs" for ioc_pair in sorted_iocs] def _summary_report( - report: dict[str, dict[Stat, BaseStatResponse]] -) -> dict[str, dict[str, str]]: + report: dict[Ioc, dict[str, _PVStats]] +) -> dict[str, dict[str, dict[str, str]]]: """Creates a pure string and dictionary data output summary of the generated data. Easily converted to json and creates a sample output of: - { + "IOCName iocHostName": { "PV:1": { "TypeChange": "Dropped 31 events by TypeChange" }, @@ -244,22 +281,24 @@ def _summary_report( } Args: - report (dict[str, dict[Stat, BaseStatResponse]]): Base input data in form of + report (dict[str, _PVStats]): Base input data in form of pv mapped to Stat and responses from the archiver. Returns: dict[str, dict[str, str]]: pv to dictionary of Stat name and problem summary """ - summary_report = {} - for pv in report.keys(): - stat_strs = {s.name: str(report[pv][s]) for s in report[pv]} - summary_report[pv] = stat_strs - return summary_report + summary_report: dict[str, dict[str, dict[str, str]]] = {} + + for ioc in report.keys(): + pv_summary_report: dict[str, dict[str, str]] = {} + + for pv in report[ioc].keys(): + pv_summary_report[pv] = report[ioc][pv].json_str() + summary_report[f"IOC:{ioc.name}, host:{ioc.hostname}"] = pv_summary_report + return dict(sorted(summary_report.items())) -def _invert_data( - data: dict[Stat, dict[str, BaseStatResponse]] -) -> dict[str, dict[Stat, BaseStatResponse]]: +def _invert_data(data: dict[Stat, dict[str, BaseStatResponse]]) -> dict[str, _PVStats]: """Inverts data from being by statistic, to be by PV. Args: @@ -267,7 +306,7 @@ def _invert_data( dictionary with pv name keys Returns: - dict[str, dict[Stat, BaseStatResponse]]: Output with Pv name keys. + dict[str, _PVStats]: Output with Pv name keys. """ dict_data: dict[str, dict[Stat, BaseStatResponse]] = {} for stat in data: @@ -275,4 +314,5 @@ def _invert_data( if pv not in dict_data.keys(): dict_data[pv] = {} dict_data[pv][stat] = data[stat][pv] - return dict(sorted(dict_data.items())) + output = {pv: _PVStats(pv, dict_data[pv]) for pv in dict_data.keys()} + return dict(sorted(output.items())) diff --git a/epicsarchiver/statistics/stat_responses.py b/epicsarchiver/statistics/stat_responses.py index eb1e0e4..8bfdde3 100644 --- a/epicsarchiver/statistics/stat_responses.py +++ b/epicsarchiver/statistics/stat_responses.py @@ -5,6 +5,8 @@ from dataclasses import dataclass import pytz +from epicsarchiver.channelfinder import Channel + class DroppedReason(str, enum.Enum): """List of reasons why a PV could be dropping events. @@ -254,3 +256,19 @@ class NoConfigResponse(BaseStatResponse): str: f"Archived but not in config." """ return "Archived but not in config." + + +@dataclass(frozen=True) +class Ioc: + """Minimal info on an Ioc.""" + + hostname: str + name: str + + @classmethod + def from_channel(cls, channel: Channel) -> "Ioc": + """Gets IOC info from a channel.""" + return Ioc(channel.properties["hostName"], channel.properties["iocName"]) + + +UNKNOWN_IOC: Ioc = Ioc("unknown.ioc", "UNKNOWN:IOC") diff --git a/pyproject.toml b/pyproject.toml index 4e38f96..356492f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,7 @@ lint = [ "types-requests", "types-python-dateutil", "types-protobuf", - "mypy-protobuf", + "mypy-protobuf" ] test = [ "coverage[toml]>=6.5", diff --git a/tests/statistics/test_report.py b/tests/statistics/test_report.py index cf5f3b4..1119b21 100644 --- a/tests/statistics/test_report.py +++ b/tests/statistics/test_report.py @@ -5,10 +5,12 @@ from pathlib import Path import pytz from pytest_mock import MockFixture +from epicsarchiver.channelfinder import Channel, ChannelFinder from epicsarchiver.epicsarchiver import ArchiverAppliance from epicsarchiver.statistics.report import ( ReportConfig, Stat, + _PVStats, generate_all_stats, ) from epicsarchiver.statistics.stat_responses import ( @@ -17,6 +19,7 @@ from epicsarchiver.statistics.stat_responses import ( DisconnectedPVsResponse, DroppedPVResponse, DroppedReason, + Ioc, LostConnectionsResponse, NoConfigResponse, SilentPVsResponse, @@ -83,6 +86,7 @@ def test_generate_buffer_overflow_stat(mocker: MockFixture) -> None: other_archiver=None, mb_per_day_minimum=10, events_dropped_minimum=1, + channelfinder=ChannelFinder("channelfinder.example.org"), ) actual = Stat.BufferOverflow.generate_stats(archiver, config) assert actual == { @@ -108,6 +112,7 @@ def mock_get_pvs_dropped( def test_generate_all_stats(mocker: MockFixture) -> None: + channel = Channel("MY:PV", {"iocName": "IOCNAME", "hostName": "IOCHOSTNAME"}, []) mocker.patch( "epicsarchiver.ArchiverAppliance.get_pvs_dropped", wraps=mock_get_pvs_dropped, @@ -132,6 +137,10 @@ def test_generate_all_stats(mocker: MockFixture) -> None: "epicsarchiver.ArchiverAppliance.get_all_pvs", return_value=["MY:PV"], ) + mocker.patch( + "epicsarchiver.channelfinder.ChannelFinder.get_all_channels", + return_value={"MY:PV": channel}, + ) archiver = ArchiverAppliance("archiver.example.org") other_archiver = ArchiverAppliance("other_archiver.example.org") config = ReportConfig( @@ -142,7 +151,10 @@ def test_generate_all_stats(mocker: MockFixture) -> None: other_archiver=other_archiver, mb_per_day_minimum=0, events_dropped_minimum=1, + channelfinder=ChannelFinder("channelfinder.example.org"), ) + ioc = Ioc(channel.properties["hostName"], channel.properties["iocName"]) actual = generate_all_stats(archiver, config) - assert "MY:PV" in actual.keys() - assert expected_all_stats == actual["MY:PV"] + assert ioc in actual.keys() + assert "MY:PV" in actual[ioc].keys() + assert _PVStats("MY:PV", expected_all_stats) == actual[ioc]["MY:PV"] diff --git a/tests/test_channelfinder.py b/tests/test_channelfinder.py new file mode 100644 index 0000000..aa781fd --- /dev/null +++ b/tests/test_channelfinder.py @@ -0,0 +1,168 @@ +import logging + +import responses +from rich.logging import RichHandler + +from epicsarchiver.channelfinder import ( + Channel, + ChannelFinder, + minimize_searches, +) + +logging.basicConfig( + level=logging.DEBUG, + handlers=[RichHandler(rich_tracebacks=True)], +) +LOG: logging.Logger = logging.getLogger(__name__) + + +@responses.activate +def test_get_channels() -> None: + channelfinder = ChannelFinder() + url = "https://localhost/ChannelFinder/resources/channels?~name=fred" + data = [ + { + "name": "fred", + "owner": "recceiver", + "properties": [ + { + "name": "hostName", + "owner": "recceiver", + "value": "host.blah", + "channels": [], + }, + { + "name": "iocName", + "owner": "recceiver", + "value": "FredsIOC", + "channels": [], + }, + { + "name": "pvStatus", + "owner": "recceiver", + "value": "Inactive", + "channels": [], + }, + ], + "tags": [], + } + ] + responses.add(responses.GET, url, json=data, status=200) + r = channelfinder.get_channels(["fred"]) + assert len(responses.calls) == 1 + assert len(r) == 1 + expected_channel = Channel( + "fred", + {"hostName": "host.blah", "iocName": "FredsIOC", "pvStatus": "Inactive"}, + [], + ) + assert r[0] == expected_channel + + +@responses.activate +def test_get_all_channels() -> None: + channelfinder = ChannelFinder() + urla = "https://localhost/ChannelFinder/resources/channels?~name=a*,ba" + dataa = [ + { + "name": "ab", + "properties": [ + { + "name": "hostName", + "value": "host.a", + }, + { + "name": "iocName", + "value": "AIOC", + }, + ], + "tags": [], + }, + { + "name": "ac", + "properties": [ + { + "name": "hostName", + "value": "host.a", + }, + { + "name": "iocName", + "value": "AIOC", + }, + ], + "tags": [], + }, + { + "name": "ad", + "properties": [ + { + "name": "hostName", + "value": "host.a", + }, + { + "name": "iocName", + "value": "AIOC", + }, + ], + "tags": [], + }, + { + "name": "ba", + "properties": [ + { + "name": "hostName", + "value": "host.b", + }, + { + "name": "iocName", + "value": "BIOC", + }, + ], + "tags": [], + }, + ] + responses.add(responses.GET, urla, json=dataa, status=200) + r = channelfinder.get_all_channels(["ac", "ab", "ba"], min_prefix=1, group_size=2) + assert len(responses.calls) == 1 + assert len(r) == 3 + expected_channels = { + "ab": Channel( + "ab", + {"hostName": "host.a", "iocName": "AIOC"}, + [], + ), + "ac": Channel( + "ac", + {"hostName": "host.a", "iocName": "AIOC"}, + [], + ), + "ba": Channel( + "ba", + {"hostName": "host.b", "iocName": "BIOC"}, + [], + ), + } + assert r == expected_channels + + +def test_minimize_zero_prefix() -> None: + assert [] == minimize_searches([], 0) + assert ["a"] == minimize_searches(["a"], 0) + assert ["a", "b"] == minimize_searches(["a", "b"], 0) + assert ["a*"] == minimize_searches(["ab", "ac"], 0) + assert ["a*", "ba"] == minimize_searches(["ab", "ac", "ba"], 0) + assert ["a*", "b*"] == minimize_searches(["ab", "ac", "ba", "bc"], 0) + assert ["a*", "b*", "s"] == minimize_searches(["ab", "ac", "s", "ba", "bc"], 0) + + +def test_minimize_two_prefix() -> None: + assert [] == minimize_searches([], 2) + assert ["a"] == minimize_searches(["a"], 2) + assert ["a", "b"] == minimize_searches(["a", "b"], 2) + assert ["ab", "ac"] == minimize_searches(["ab", "ac"], 2) + assert ["ab", "ac", "ba"] == minimize_searches(["ab", "ac", "ba"], 2) + assert ["ba*"] == minimize_searches(["bab", "bac"], 2) + assert ["ab", "ac", "ba*"] == minimize_searches(["ab", "ac", "bab", "bac"], 2) + assert ["ab", "ac", "ba*", "s"] == minimize_searches( + ["ab", "ac", "s", "bab", "bac"], 2 + ) diff --git a/tests/test_epicsarchiver.py b/tests/test_epicsarchiver.py index 072a2f4..7d8c162 100644 --- a/tests/test_epicsarchiver.py +++ b/tests/test_epicsarchiver.py @@ -420,7 +420,7 @@ def test_get_or_post_comma_separated_list() -> None: len(responses.calls) == 1 ) # ignore for https://github.com/getsentry/responses/pull/690 - assert responses.calls[0].request.body == pvs # type: ignore + assert responses.calls[0].request.body == pvs assert r == data @@ -470,7 +470,7 @@ def test_pause_pv_comma_separated_list() -> None: len(responses.calls) == 1 ) # ignore for https://github.com/getsentry/responses/pull/690 - assert responses.calls[0].request.body == pvs # type: ignore + assert responses.calls[0].request.body == pvs assert r == data @@ -511,7 +511,7 @@ def test_resume_pv_comma_separated_list() -> None: len(responses.calls) == 1 ) # ignore for https://github.com/getsentry/responses/pull/690 - assert responses.calls[0].request.body == pvs # type: ignore + assert responses.calls[0].request.body == pvs assert r == data -- GitLab