import io import json import os import sys import time from collections import OrderedDict from datetime import date, timedelta import numpy as np import pandas as pd import panel as pn import plotly.express as px import plotly.graph_objects as go ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) from panel_app.convert_to_excel_panel import write_dfs_to_excel from panel_app.kpi_health_check_drilldown_plots import build_drilldown_plot from process_kpi.kpi_health_check.benchmarks import calculate_sla_metrics from process_kpi.kpi_health_check.engine import ( evaluate_health_check, is_bad, max_consecutive_days, max_consecutive_periods, window_bounds, window_bounds_period, ) from process_kpi.kpi_health_check.export import build_export_bytes from process_kpi.kpi_health_check.io import read_bytes_to_df from process_kpi.kpi_health_check.kpi_groups import filter_kpis, get_kpis_by_group from process_kpi.kpi_health_check.multi_rat import compute_multirat_views from process_kpi.kpi_health_check.normalization import ( build_daily_kpi, build_period_kpi, infer_date_col, infer_id_col, ) from process_kpi.kpi_health_check.presets import ( delete_preset, list_presets, load_preset, save_preset, ) from process_kpi.kpi_health_check.profiles import ( delete_profile, list_profiles, load_profile, save_profile, ) from process_kpi.kpi_health_check.rules import ( infer_kpi_direction, infer_kpi_policy, infer_kpi_sla, ) pn.extension("plotly", "tabulator") pn.config.raw_css.append( """ .kpi-site-kpi-table .tabulator-row.kpi-row--persistent_degraded{border-left:6px solid #b71c1c !important;background-color:#ffebee !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--persistent_degraded:hover{background-color:#ffd6db !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--degraded{border-left:6px solid #e53935 !important;background-color:#fff5f5 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--degraded:hover{background-color:#ffe4e6 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--notify{border-left:6px solid #f9a825 !important;background-color:#fff8e1 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--notify:hover{background-color:#ffefb8 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--resolved{border-left:6px solid #2e7d32 !important;background-color:#f1f8e9 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--resolved:hover{background-color:#dcedc8 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--notify_resolved{border-left:6px solid #2e7d32 !important;background-color:#f1f8e9 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--notify_resolved:hover{background-color:#dcedc8 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--no_data{border-left:6px solid #616161 !important;background-color:#f5f5f5 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--no_data:hover{background-color:#eeeeee !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--ok:hover{background-color:#1565c0 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--persistent_degraded .tabulator-cell, .kpi-site-kpi-table .tabulator-row.kpi-row--degraded .tabulator-cell, .kpi-site-kpi-table .tabulator-row.kpi-row--notify .tabulator-cell, .kpi-site-kpi-table .tabulator-row.kpi-row--resolved .tabulator-cell, .kpi-site-kpi-table .tabulator-row.kpi-row--notify_resolved .tabulator-cell, .kpi-site-kpi-table .tabulator-row.kpi-row--no_data .tabulator-cell{color:#111 !important;} .kpi-site-kpi-table .tabulator-row.kpi-row--ok:hover .tabulator-cell{color:#fff !important;} """ ) PLOTLY_CONFIG = {"displaylogo": False, "scrollZoom": True, "displayModeBar": True} def read_fileinput_to_df(file_input: pn.widgets.FileInput) -> pd.DataFrame | None: if file_input is None or not file_input.value: return None return read_bytes_to_df(file_input.value, file_input.filename or "") current_daily_by_rat: dict[str, pd.DataFrame] = {} current_rules_df: pd.DataFrame | None = None current_status_df: pd.DataFrame | None = None current_summary_df: pd.DataFrame | None = None current_multirat_df: pd.DataFrame | None = None current_multirat_raw: pd.DataFrame | None = None current_top_anomalies_df: pd.DataFrame | None = None current_top_anomalies_raw: pd.DataFrame | None = None current_ops_queue_df: pd.DataFrame | None = None current_export_bytes: bytes | None = None current_alert_pack_bytes: bytes | None = None current_snapshot: dict | None = None current_delta_df: pd.DataFrame | None = None complaint_sites: set[int] = set() _daily_version: int = 0 _rules_version: int = 0 _healthcheck_version: int = 0 _DRILLDOWN_CACHE_MAX: int = 64 _drilldown_fig_cache: "OrderedDict[tuple, tuple]" = OrderedDict() _DOUBLE_CLICK_S: float = 0.35 _last_click_state: dict[str, tuple[float, int | None]] = { "top": (0.0, None), "multirat": (0.0, None), } _applying_profile: bool = False _loading_datasets: bool = False _updating_drilldown: bool = False _drilldown_update_pending: bool = False def _set_widget_value(widget, value) -> None: global _updating_drilldown try: if getattr(widget, "value", None) == value: return except Exception: # noqa: BLE001 pass _updating_drilldown = True try: widget.value = value finally: _updating_drilldown = False def _schedule_drilldown_update(fn) -> None: global _drilldown_update_pending if _drilldown_update_pending: return _drilldown_update_pending = True def _wrapped() -> None: global _drilldown_update_pending _drilldown_update_pending = False fn() doc = pn.state.curdoc if doc is not None: doc.add_next_tick_callback(_wrapped) else: _wrapped() def _invalidate_drilldown_cache( *, data_changed: bool = False, rules_changed: bool = False, healthcheck_changed: bool = False, ) -> None: global _daily_version, _rules_version, _healthcheck_version if data_changed: _daily_version += 1 if rules_changed: _rules_version += 1 if healthcheck_changed: _healthcheck_version += 1 try: _drilldown_fig_cache.clear() except Exception: # noqa: BLE001 pass def _drilldown_cache_key(site_code: int, rat: str, kpi: str) -> tuple: ar = analysis_range.value ar_key = ( str(ar[0]) if ar and len(ar) == 2 and ar[0] else None, str(ar[1]) if ar and len(ar) == 2 and ar[1] else None, ) compare_kpis_key = tuple(sorted([str(x) for x in (kpi_compare_select.value or [])])) norm_key = str(kpi_compare_norm.value or "None") return ( int(_daily_version), int(_rules_version), int(_healthcheck_version), int(site_code), str(rat or ""), str(kpi or ""), str(granularity_select.value or "Daily"), compare_kpis_key, norm_key, bool(show_sla_toggle.value), ar_key, int(_coerce_int(baseline_days.value) or 0), int(_coerce_int(recent_days.value) or 0), float(_coerce_float(rel_threshold_pct.value) or 0.0), int(_coerce_int(min_consecutive_days.value) or 0), # New cache keys str(kpi_group_select.value), str(kpi_group_mode.value), str(corr_window_select.value), ) def _drilldown_cache_get(key: tuple): v = _drilldown_fig_cache.get(key) if v is not None: _drilldown_fig_cache.move_to_end(key) return v def _drilldown_cache_set(key: tuple, value) -> None: _drilldown_fig_cache[key] = value _drilldown_fig_cache.move_to_end(key) while len(_drilldown_fig_cache) > int(_DRILLDOWN_CACHE_MAX): _drilldown_fig_cache.popitem(last=False) def _coerce_int(value) -> int | None: try: if value is None: return None if isinstance(value, (int, np.integer)): return int(value) if isinstance(value, float) and np.isnan(value): return None except Exception: # noqa: BLE001 pass s = str(value).strip() try: return int(s) except Exception: # noqa: BLE001 digits = "".join(ch for ch in s if ch.isdigit()) try: return int(digits) if digits else None except Exception: # noqa: BLE001 return None def _coerce_float(value) -> float | None: try: if value is None: return None if isinstance(value, (float, np.floating)): if np.isnan(value): return None return float(value) except Exception: # noqa: BLE001 pass try: return float(value) except Exception: # noqa: BLE001 return None def _table_row_as_dict(table: pn.widgets.Tabulator, row_idx: int) -> dict | None: try: v = getattr(table, "current_view", None) df = v if isinstance(v, pd.DataFrame) else table.value if not isinstance(df, pd.DataFrame) or df.empty: return None if row_idx < 0 or row_idx >= len(df): return None return dict(df.iloc[int(row_idx)].to_dict()) except Exception: # noqa: BLE001 return None def _apply_drilldown_selection(*, site_code, rat=None, kpi=None) -> None: try: if site_code is None: return site_code_int = _coerce_int(site_code) if site_code_int is None: return if rat and rat in list(rat_select.options or []): _set_widget_value(rat_select, str(rat)) _update_kpi_options() if site_select.options and site_code_int not in site_select.options.values(): pass _set_widget_value(site_select, site_code_int) if kpi and str(kpi) in list(kpi_select.options or []): _set_widget_value(kpi_select, str(kpi)) if kpi and str(kpi) in list(kpi_compare_select.options or []): cur = list(kpi_compare_select.value or []) if str(kpi) not in cur: _set_widget_value( kpi_compare_select, [str(kpi)] + cur if cur else [str(kpi)] ) _schedule_drilldown_update(_update_site_view) except Exception: # noqa: BLE001 return def _handle_double_click(table_key: str, table: pn.widgets.Tabulator, event) -> None: try: row = getattr(event, "row", None) if row is None: return row = int(row) except Exception: # noqa: BLE001 return now = float(time.monotonic()) last_t, last_row = _last_click_state.get(table_key, (0.0, None)) if last_row == row and (now - float(last_t)) <= float(_DOUBLE_CLICK_S): _last_click_state[table_key] = (0.0, None) data = _table_row_as_dict(table, row) if not data: return if table_key == "top": _apply_drilldown_selection( site_code=data.get("site_code"), rat=data.get("RAT"), kpi=data.get("KPI"), ) try: status_pane.alert_type = "primary" status_pane.object = f"Drill-down: site {int(data.get('site_code'))} | {data.get('RAT')} | {data.get('KPI')}" except Exception: # noqa: BLE001 pass return if table_key in {"multirat", "ops"}: site_code = data.get("site_code") best_rat = rat_select.value try: best_score = -1 for r in list(rat_select.options or []): p = pd.to_numeric(data.get(f"persistent_{r}", 0), errors="coerce") d = pd.to_numeric(data.get(f"degraded_{r}", 0), errors="coerce") p = int(p) if pd.notna(p) else 0 d = int(d) if pd.notna(d) else 0 score = p * 2 + d if score > best_score: best_score = score best_rat = r except Exception: # noqa: BLE001 best_rat = rat_select.value _apply_drilldown_selection(site_code=site_code, rat=best_rat) try: status_pane.alert_type = "primary" status_pane.object = f"Drill-down: site {int(site_code)} | {best_rat}" except Exception: # noqa: BLE001 pass return else: _last_click_state[table_key] = (now, row) file_2g = pn.widgets.FileInput(name="2G KPI report", accept=".csv,.zip") file_3g = pn.widgets.FileInput(name="3G KPI report", accept=".csv,.zip") file_lte = pn.widgets.FileInput(name="LTE KPI report", accept=".csv,.zip") file_twamp = pn.widgets.FileInput(name="TWAMP KPI report", accept=".csv,.zip") complaint_sites_file = pn.widgets.FileInput( name="Complaint sites list (optional)", accept=".csv,.txt,.xlsx" ) only_complaint_sites = pn.widgets.Checkbox(name="Only complaint sites", value=False) analysis_range = pn.widgets.DateRangePicker(name="Analysis date range (optional)") granularity_select = pn.widgets.RadioButtonGroup( name="Granularity", options=["Daily", "Hourly"], value="Daily" ) baseline_days = pn.widgets.IntInput(name="Baseline window (days)", value=30) recent_days = pn.widgets.IntInput(name="Recent window (days)", value=7) rel_threshold_pct = pn.widgets.FloatInput( name="Relative change threshold (%)", value=10.0, step=1.0 ) min_consecutive_days = pn.widgets.IntInput( name="Min consecutive bad days (persistent)", value=3 ) min_criticality = pn.widgets.IntInput(name="Min criticality score", value=0) min_anomaly_score = pn.widgets.IntInput(name="Min anomaly score", value=0) city_filter = pn.widgets.TextInput(name="City contains (optional)", value="") top_rat_filter = pn.widgets.CheckBoxGroup( name="Top anomalies RAT", options=["2G", "3G", "LTE", "TWAMP"], value=["2G", "3G", "LTE", "TWAMP"], ) top_status_filter = pn.widgets.CheckBoxGroup( name="Top anomalies status", options=["DEGRADED", "PERSISTENT_DEGRADED"], value=["DEGRADED", "PERSISTENT_DEGRADED"], ) preset_select = pn.widgets.Select(name="Rules preset", options=[], value=None) preset_name_input = pn.widgets.TextInput(name="Save preset as", value="") preset_refresh_button = pn.widgets.Button(name="Refresh presets", button_type="default") preset_apply_button = pn.widgets.Button(name="Apply preset", button_type="primary") preset_save_button = pn.widgets.Button(name="Save current rules", button_type="primary") preset_delete_button = pn.widgets.Button(name="Delete preset", button_type="danger") profile_select = pn.widgets.Select(name="Profile", options=[], value=None) profile_name_input = pn.widgets.TextInput(name="Save profile as", value="") profile_refresh_button = pn.widgets.Button( name="Refresh profiles", button_type="default" ) profile_apply_button = pn.widgets.Button(name="Apply profile", button_type="primary") profile_save_button = pn.widgets.Button(name="Save profile", button_type="primary") profile_delete_button = pn.widgets.Button(name="Delete profile", button_type="danger") load_button = pn.widgets.Button( name="Load datasets & build rules", button_type="primary" ) run_button = pn.widgets.Button(name="Run health check", button_type="primary") status_pane = pn.pane.Alert( "Upload KPI reports (ZIP/CSV), then load datasets and run health check.", alert_type="primary", ) validation_pane = pn.pane.Alert("", alert_type="success", visible=False) datasets_table = pn.widgets.Tabulator( height=180, sizing_mode="stretch_width", layout="fit_data_table" ) rules_table = pn.widgets.Tabulator( height=260, sizing_mode="stretch_width", layout="fit_data_table" ) def _set_tabulator_pagination(table: pn.widgets.Tabulator, page_size: int = 50) -> None: try: table.pagination = "local" table.page_size = int(page_size) return except Exception: # noqa: BLE001 pass try: cfg = dict(table.configuration or {}) cfg["pagination"] = "local" cfg["paginationSize"] = int(page_size) table.configuration = cfg except Exception: # noqa: BLE001 pass try: rules_table.editable = True except Exception: # noqa: BLE001 try: cfg = dict(rules_table.configuration or {}) cfg["editable"] = True rules_table.configuration = cfg except Exception: # noqa: BLE001 pass site_summary_table = pn.widgets.Tabulator( height=260, sizing_mode="stretch_width", layout="fit_data_table" ) multirat_summary_table = pn.widgets.Tabulator( height=260, sizing_mode="stretch_width", layout="fit_data_table" ) complaint_multirat_summary_table = pn.widgets.Tabulator( height=260, sizing_mode="stretch_width", layout="fit_data_table" ) top_anomalies_table = pn.widgets.Tabulator( height=260, sizing_mode="stretch_width", layout="fit_data_table" ) complaint_top_anomalies_table = pn.widgets.Tabulator( height=260, sizing_mode="stretch_width", layout="fit_data_table" ) ops_queue_table = pn.widgets.Tabulator( height=520, sizing_mode="stretch_width", layout="fit_data_table" ) snapshot_file = pn.widgets.FileInput(name="Load snapshot (JSON)", accept=".json") snapshot_download = pn.widgets.FileDownload( label="Save snapshot", filename="KPI_Health_Check_Snapshot.json", button_type="primary", ) snapshot_rules_table = pn.widgets.Tabulator( height=220, sizing_mode="stretch_width", layout="fit_data_table" ) snapshot_multirat_table = pn.widgets.Tabulator( height=260, sizing_mode="stretch_width", layout="fit_data_table" ) snapshot_top_table = pn.widgets.Tabulator( height=260, sizing_mode="stretch_width", layout="fit_data_table" ) delta_table = pn.widgets.Tabulator( height=320, sizing_mode="stretch_width", layout="fit_data_table" ) map_show_site_codes = pn.widgets.Checkbox(name="Show site codes", value=True) map_max_labels = pn.widgets.IntInput(name="Max labels", value=200, step=50) map_top_n = pn.widgets.IntInput(name="Top N sites (0=All)", value=0, step=100) map_impacted_rat = pn.widgets.Select( name="RAT impacted", options=["Any", "2G", "3G", "LTE", "TWAMP"], value="Any" ) map_status_filter = pn.widgets.Select( name="Status", options=[ "Any", "PERSISTENT_DEGRADED", "DEGRADED", "NOTIFY", "RESOLVED", "NOTIFY_RESOLVED", "OK", "NO_DATA", ], value="Any", ) map_auto_fit = pn.widgets.Checkbox(name="Auto fit", value=True) map_fit_button = pn.widgets.Button(name="Fit to points", button_type="default") map_search_input = pn.widgets.TextInput(name="Search (site_code or City)", value="") map_search_go = pn.widgets.Button(name="Go", button_type="primary") map_search_clear = pn.widgets.Button(name="Clear", button_type="default") map_search_results = pn.widgets.Select(name="Matches", options={}, value=None) map_search_results.visible = False site_select = pn.widgets.AutocompleteInput( name="Select a site (Type to search)", options={}, case_sensitive=False, search_strategy="includes", restrict=True, ) rat_select = pn.widgets.RadioButtonGroup( name="RAT", options=["2G", "3G", "LTE", "TWAMP"], value="LTE" ) kpi_select = pn.widgets.Select(name="KPI", options=[]) kpi_compare_select = pn.widgets.MultiChoice(name="Compare KPIs", options=[], value=[]) kpi_compare_norm = pn.widgets.Select( name="Normalization", options=["None", "Min-Max", "Z-score"], value="None" ) show_sla_toggle = pn.widgets.Checkbox(name="Show SLA", value=True) corr_window_select = pn.widgets.Select( name="Correlation window", options=["Full (filtered range)", "Recent", "Baseline"], value="Full (filtered range)", ) kpi_group_select = pn.widgets.Select( name="KPI Group", options=["All (selected KPIs)"], value="All (selected KPIs)" ) kpi_group_mode = pn.widgets.Select( name="Group Mode", options=["Filter KPI list only (recommended)", "Add top 12 KPIs to compare"], value="Filter KPI list only (recommended)", ) drilldown_export_button = pn.widgets.FileDownload( label="Download drill-down", filename="KPI_Drilldown.xlsx", button_type="primary", ) site_kpi_table = pn.widgets.Tabulator( height=520, sizing_mode="stretch_width", layout="fit_data_table", css_classes=["kpi-site-kpi-table"], ) try: site_kpi_table.text_align = { "status": "center", "bad_days_recent": "center", "max_streak_recent": "center", "baseline_median": "right", "recent_median": "right", } site_kpi_table.widths = { "status": 170, "KPI": 260, "bad_days_recent": 140, "max_streak_recent": 160, } site_kpi_table.formatters = { "status": pn.io.JSCode( """ function format(cell, formatterParams, onRendered){ var v = cell.getValue(); var s = (v === null || v === undefined) ? '' : String(v); var bg = '#455a64'; var rowClass = ''; if(s === 'PERSISTENT_DEGRADED') bg = '#b71c1c'; else if(s === 'DEGRADED') bg = '#e53935'; else if(s === 'NOTIFY') bg = '#f9a825'; else if(s === 'RESOLVED') bg = '#2e7d32'; else if(s === 'NOTIFY_RESOLVED') bg = '#2e7d32'; else if(s === 'NO_DATA') bg = '#616161'; else if(s === 'OK') bg = '#1565c0'; if(s === 'PERSISTENT_DEGRADED') rowClass = 'kpi-row--persistent_degraded'; else if(s === 'DEGRADED') rowClass = 'kpi-row--degraded'; else if(s === 'NOTIFY') rowClass = 'kpi-row--notify'; else if(s === 'RESOLVED') rowClass = 'kpi-row--resolved'; else if(s === 'NOTIFY_RESOLVED') rowClass = 'kpi-row--notify_resolved'; else if(s === 'NO_DATA') rowClass = 'kpi-row--no_data'; else if(s === 'OK') rowClass = 'kpi-row--ok'; onRendered(function(){ try { var rowEl = cell.getRow().getElement(); if(rowEl){ var classesToRemove = [ 'kpi-row--persistent_degraded', 'kpi-row--degraded', 'kpi-row--notify', 'kpi-row--resolved', 'kpi-row--notify_resolved', 'kpi-row--no_data', 'kpi-row--ok' ]; classesToRemove.forEach(function(c){ try { rowEl.classList.remove(c); } catch (e) {} }); if(rowClass){ try { rowEl.classList.add(rowClass); } catch (e) {} } } var cellEl = cell.getElement(); if(cellEl){ cellEl.style.padding = '0px'; cellEl.style.textAlign = 'center'; cellEl.innerHTML = "" + s + ""; } } catch (e) { } }); return s; } """ ) } except Exception: # noqa: BLE001 pass _set_tabulator_pagination(site_summary_table, page_size=50) _set_tabulator_pagination(multirat_summary_table, page_size=50) _set_tabulator_pagination(top_anomalies_table, page_size=50) _set_tabulator_pagination(complaint_multirat_summary_table, page_size=50) _set_tabulator_pagination(complaint_top_anomalies_table, page_size=50) _set_tabulator_pagination(ops_queue_table, page_size=50) _set_tabulator_pagination(snapshot_rules_table, page_size=50) _set_tabulator_pagination(snapshot_multirat_table, page_size=50) _set_tabulator_pagination(snapshot_top_table, page_size=50) _set_tabulator_pagination(delta_table, page_size=50) _set_tabulator_pagination(site_kpi_table, page_size=50) trend_plot_pane = pn.pane.Plotly(sizing_mode="stretch_both", config=PLOTLY_CONFIG) heatmap_plot_pane = pn.pane.Plotly(sizing_mode="stretch_both", config=PLOTLY_CONFIG) hist_plot_pane = pn.pane.Plotly(sizing_mode="stretch_both", config=PLOTLY_CONFIG) map_pane = pn.pane.Plotly(sizing_mode="stretch_both", config=PLOTLY_CONFIG) map_message = pn.pane.Alert("", alert_type="info", visible=False) _map_search_city: str = "" _map_center_override: dict | None = None _map_force_fit: bool = False corr_plot_pane = pn.pane.Plotly(sizing_mode="stretch_both", config=PLOTLY_CONFIG) corr_message = pn.pane.Alert("", alert_type="info", visible=False) def _coords_by_site() -> pd.DataFrame: rows = [] for _, df in (current_daily_by_rat or {}).items(): if not isinstance(df, pd.DataFrame) or df.empty: continue cols = [ c for c in ["site_code", "Latitude", "Longitude", "City"] if c in df.columns ] if "site_code" not in cols: continue tmp = df[cols].copy() tmp["site_code"] = pd.to_numeric(tmp["site_code"], errors="coerce") tmp = tmp.dropna(subset=["site_code"]).copy() tmp["site_code"] = tmp["site_code"].astype(int) tmp = tmp.drop_duplicates(subset=["site_code"]).copy() rows.append(tmp) if not rows: return pd.DataFrame(columns=["site_code", "Latitude", "Longitude", "City"]) out = pd.concat(rows, ignore_index=True) out = out.drop_duplicates(subset=["site_code"]).copy() return out def _dominant_status_by_site() -> pd.DataFrame: s = ( current_status_df if isinstance(current_status_df, pd.DataFrame) else pd.DataFrame() ) if ( s is None or s.empty or "site_code" not in s.columns or "status" not in s.columns ): return pd.DataFrame(columns=["site_code", "dominant_status"]) tmp = s[["site_code", "status"]].copy() tmp["site_code"] = pd.to_numeric(tmp["site_code"], errors="coerce") tmp = tmp.dropna(subset=["site_code"]).copy() if tmp.empty: return pd.DataFrame(columns=["site_code", "dominant_status"]) tmp["site_code"] = tmp["site_code"].astype(int) tmp["status"] = tmp["status"].astype(str).str.strip().str.upper() prio = { "PERSISTENT_DEGRADED": 6, "DEGRADED": 5, "NOTIFY": 4, "RESOLVED": 3, "NOTIFY_RESOLVED": 2, "OK": 1, "NO_DATA": 0, } tmp["_prio"] = tmp["status"].map(prio).fillna(2).astype(int) idx = tmp.groupby("site_code")["_prio"].idxmax() out = tmp.loc[idx, ["site_code", "status"]].rename( columns={"status": "dominant_status"} ) return out def _map_df() -> pd.DataFrame: base = ( current_multirat_df if isinstance(current_multirat_df, pd.DataFrame) else pd.DataFrame() ) if base is None or base.empty: base = ( current_multirat_raw if isinstance(current_multirat_raw, pd.DataFrame) else pd.DataFrame() ) if base is None or base.empty or "site_code" not in base.columns: return pd.DataFrame() base = base.copy() base["site_code"] = pd.to_numeric(base["site_code"], errors="coerce") base = base.dropna(subset=["site_code"]).copy() base["site_code"] = base["site_code"].astype(int) coords = _coords_by_site() if coords is None or coords.empty: return pd.DataFrame() out = pd.merge(base, coords, on="site_code", how="left", suffixes=("", "_coord")) if "City" not in out.columns and "City_coord" in out.columns: out["City"] = out["City_coord"] if "City" in out.columns and "City_coord" in out.columns: out["City"] = out["City"].where(out["City"].notna(), out["City_coord"]) dom = _dominant_status_by_site() if dom is not None and not dom.empty and "site_code" in dom.columns: out = pd.merge(out, dom, on="site_code", how="left") if "dominant_status" not in out.columns: out["dominant_status"] = "NO_DATA" out["dominant_status"] = ( out["dominant_status"] .astype(str) .str.strip() .str.upper() .replace({"": "NO_DATA"}) ) out["dominant_status"] = out["dominant_status"].where( out["dominant_status"].notna(), "NO_DATA" ) return out def _apply_map_filters(df_map: pd.DataFrame) -> pd.DataFrame: if df_map is None or df_map.empty: return pd.DataFrame() out = df_map.copy() if _map_search_city: if "City" in out.columns: q = str(_map_search_city).strip().lower() out = out[ out["City"].astype(str).str.lower().str.contains(q, na=False) ].copy() else: out = out.iloc[0:0].copy() rat_imp = str(map_impacted_rat.value or "Any") if rat_imp and rat_imp != "Any": pcol = f"persistent_{rat_imp}" dcol = f"degraded_{rat_imp}" if pcol in out.columns or dcol in out.columns: p = ( pd.to_numeric(out[pcol], errors="coerce").fillna(0) if pcol in out.columns else 0 ) d = ( pd.to_numeric(out[dcol], errors="coerce").fillna(0) if dcol in out.columns else 0 ) out = out[(p.astype(float) > 0) | (d.astype(float) > 0)].copy() else: out = out.iloc[0:0].copy() st = str(map_status_filter.value or "Any") if st and st != "Any" and "dominant_status" in out.columns: out = out[out["dominant_status"].astype(str).str.upper() == st].copy() score_col = ( "criticality_score_weighted" if "criticality_score_weighted" in out.columns else "criticality_score" ) if score_col in out.columns: out["_score"] = ( pd.to_numeric(out[score_col], errors="coerce").fillna(0).astype(float) ) else: out["_score"] = 0.0 topn = _coerce_int(map_top_n.value) if topn is not None and int(topn) > 0: out = out.sort_values(by=["_score"], ascending=False).head(int(topn)).copy() return out def _build_map_fig(df_map: pd.DataFrame) -> go.Figure | None: if df_map is None or df_map.empty: return None if "Latitude" not in df_map.columns or "Longitude" not in df_map.columns: return None tmp = df_map.copy() tmp["Latitude"] = pd.to_numeric(tmp["Latitude"], errors="coerce") tmp["Longitude"] = pd.to_numeric(tmp["Longitude"], errors="coerce") tmp = tmp.dropna(subset=["Latitude", "Longitude"]).copy() if tmp.empty: return None score_col = ( "criticality_score_weighted" if "criticality_score_weighted" in tmp.columns else "criticality_score" ) if score_col not in tmp.columns: score_col = None if score_col is not None: tmp["_score"] = ( pd.to_numeric(tmp[score_col], errors="coerce").fillna(0).astype(float) ) else: tmp["_score"] = 0.0 size = (tmp["_score"].clip(lower=0) + 1.0).pow(0.5) * 6.0 tmp["_size"] = size.clip(lower=6, upper=28) dom_order = [ "PERSISTENT_DEGRADED", "DEGRADED", "NOTIFY", "RESOLVED", "NOTIFY_RESOLVED", "OK", "NO_DATA", ] color_map = { "PERSISTENT_DEGRADED": "#8b0000", "DEGRADED": "#ff6f00", "NOTIFY": "#f9a825", "RESOLVED": "#2e7d32", "NOTIFY_RESOLVED": "#2e7d32", "OK": "#1565c0", "NO_DATA": "#616161", } hover_cols = [ c for c in [ "site_code", "City", "dominant_status", score_col, "impacted_rats", "persistent_kpis_total", "degraded_kpis_total", ] if c and c in tmp.columns ] fig = px.scatter_mapbox( tmp, lat="Latitude", lon="Longitude", color="dominant_status" if "dominant_status" in tmp.columns else None, size="_size", size_max=28, zoom=4, hover_data=hover_cols, custom_data=["site_code"], category_orders={"dominant_status": dom_order}, color_discrete_map=color_map, ) try: if bool(map_show_site_codes.value): max_labels = int(_coerce_int(map_max_labels.value) or 0) if max_labels <= 0: tmp["_label"] = "" elif len(tmp) <= max_labels: tmp["_label"] = tmp["site_code"].astype(str) else: top_idx = ( tmp.sort_values(by=["_score"], ascending=False) .head(max_labels) .index ) tmp["_label"] = "" tmp.loc[top_idx, "_label"] = tmp.loc[top_idx, "site_code"].astype(str) fig.update_traces( mode="markers+text", text=tmp["_label"], textposition="middle center", textfont=dict(color="white", size=10), ) except Exception: # noqa: BLE001 pass fig.update_layout( mapbox_style="open-street-map", margin=dict(l=10, r=10, t=10, b=10), height=700, ) global _map_center_override, _map_force_fit do_fit = bool(map_auto_fit.value) or bool(_map_force_fit) if ( _map_center_override and "lat" in _map_center_override and "lon" in _map_center_override ): try: fig.update_layout( mapbox_center=dict( lat=float(_map_center_override["lat"]), lon=float(_map_center_override["lon"]), ), mapbox_zoom=float(_map_center_override.get("zoom", 9.0)), ) do_fit = False except Exception: # noqa: BLE001 pass if do_fit: try: lat_min = float(tmp["Latitude"].min()) lat_max = float(tmp["Latitude"].max()) lon_min = float(tmp["Longitude"].min()) lon_max = float(tmp["Longitude"].max()) lat_pad = max(0.01, abs(lat_max - lat_min) * 0.1) lon_pad = max(0.01, abs(lon_max - lon_min) * 0.1) fig.update_layout( mapbox=dict( bounds=dict( west=lon_min - lon_pad, east=lon_max + lon_pad, south=lat_min - lat_pad, north=lat_max + lat_pad, ) ) ) except Exception: # noqa: BLE001 pass _map_force_fit = False return fig def _refresh_map_view(event=None) -> None: df_map = _map_df() if df_map is None or df_map.empty: map_pane.object = None map_message.alert_type = "info" map_message.object = "Run health check to display the map (needs Multi-RAT results + coordinates)." map_message.visible = True return df_map = _apply_map_filters(df_map) if df_map is None or df_map.empty: map_pane.object = None map_message.alert_type = "warning" map_message.object = "No sites to display after Map filters/search." map_message.visible = True return fig = _build_map_fig(df_map) if fig is None: map_pane.object = None map_message.alert_type = "warning" map_message.object = ( "No geo coordinates available (Latitude/Longitude) for current sites." ) map_message.visible = True return map_message.visible = False map_pane.object = fig def _on_map_fit_click(event=None) -> None: global _map_force_fit, _map_center_override _map_center_override = None _map_force_fit = True _refresh_map_view() def _on_map_search(event=None) -> None: global _map_search_city, _map_center_override q = str(map_search_input.value or "").strip() _map_search_city = "" _map_center_override = None map_search_results.options = {} map_search_results.value = None map_search_results.visible = False if not q: _refresh_map_view() return code = _coerce_int(q) if code is not None: try: df_map = _map_df() if isinstance(df_map, pd.DataFrame) and not df_map.empty: m = df_map[df_map["site_code"] == int(code)] if not m.empty and "Latitude" in m.columns and "Longitude" in m.columns: lat = pd.to_numeric(m.iloc[0].get("Latitude"), errors="coerce") lon = pd.to_numeric(m.iloc[0].get("Longitude"), errors="coerce") if pd.notna(lat) and pd.notna(lon): _map_center_override = { "lat": float(lat), "lon": float(lon), "zoom": 10.5, } except Exception: # noqa: BLE001 _map_center_override = None _apply_drilldown_selection(site_code=int(code)) _refresh_map_view() return _map_search_city = q try: df_map = _map_df() df_map = _apply_map_filters(df_map) if ( isinstance(df_map, pd.DataFrame) and not df_map.empty and "City" in df_map.columns ): m = df_map[ df_map["City"].astype(str).str.lower().str.contains(q.lower(), na=False) ].copy() if not m.empty: score_col = ( "criticality_score_weighted" if "criticality_score_weighted" in m.columns else "criticality_score" ) if score_col in m.columns: m["_score"] = pd.to_numeric(m[score_col], errors="coerce").fillna(0) m = m.sort_values(by=["_score"], ascending=False) m = m.head(20) opts = {} for _, r in m.iterrows(): sc = _coerce_int(r.get("site_code")) if sc is None: continue city = str(r.get("City") or "").strip() label = f"{int(sc)} - {city}" if city else f"{int(sc)}" opts[label] = int(sc) map_search_results.options = opts map_search_results.visible = bool(opts) except Exception: # noqa: BLE001 map_search_results.options = {} map_search_results.visible = False _refresh_map_view() def _on_map_search_pick(event=None) -> None: global _map_center_override code = _coerce_int(getattr(event, "new", None)) if code is None: return try: df_map = _map_df() if isinstance(df_map, pd.DataFrame) and not df_map.empty: m = df_map[df_map["site_code"] == int(code)] if not m.empty and "Latitude" in m.columns and "Longitude" in m.columns: lat = pd.to_numeric(m.iloc[0].get("Latitude"), errors="coerce") lon = pd.to_numeric(m.iloc[0].get("Longitude"), errors="coerce") if pd.notna(lat) and pd.notna(lon): _map_center_override = { "lat": float(lat), "lon": float(lon), "zoom": 10.5, } except Exception: # noqa: BLE001 _map_center_override = None _apply_drilldown_selection(site_code=int(code)) _refresh_map_view() def _on_map_clear_search(event=None) -> None: global _map_search_city, _map_center_override _map_search_city = "" _map_center_override = None map_search_input.value = "" map_search_results.options = {} map_search_results.value = None map_search_results.visible = False _refresh_map_view() def _on_map_click(event) -> None: try: cd = event.new if not isinstance(cd, dict): return pts = cd.get("points", []) if not pts: return p0 = pts[0] custom = p0.get("customdata", None) if not custom: return site_code = custom[0] if isinstance(custom, (list, tuple)) else custom site_code_int = _coerce_int(site_code) if site_code_int is None: return best_rat = rat_select.value try: row = None if ( isinstance(current_multirat_raw, pd.DataFrame) and not current_multirat_raw.empty ): sel = current_multirat_raw[ current_multirat_raw["site_code"] == int(site_code_int) ] row = sel.iloc[0].to_dict() if not sel.empty else None if row: best_score = -1 for r in list(rat_select.options or []): p = pd.to_numeric(row.get(f"persistent_{r}", 0), errors="coerce") d = pd.to_numeric(row.get(f"degraded_{r}", 0), errors="coerce") p = int(p) if pd.notna(p) else 0 d = int(d) if pd.notna(d) else 0 score = p * 2 + d if score > best_score: best_score = score best_rat = r except Exception: # noqa: BLE001 best_rat = rat_select.value _apply_drilldown_selection(site_code=site_code_int, rat=best_rat) try: status_pane.alert_type = "primary" status_pane.object = f"Drill-down: site {int(site_code_int)} | {best_rat}" except Exception: # noqa: BLE001 pass except Exception: # noqa: BLE001 return def _compute_delta_df() -> pd.DataFrame: if not isinstance(current_snapshot, dict) or not current_snapshot: return pd.DataFrame() snap_rows = current_snapshot.get("multirat_df", []) if not isinstance(snap_rows, list) or not snap_rows: return pd.DataFrame() snap = pd.DataFrame(snap_rows) if snap.empty or "site_code" not in snap.columns: return pd.DataFrame() cur = ( current_multirat_raw if isinstance(current_multirat_raw, pd.DataFrame) else pd.DataFrame() ) if cur.empty or "site_code" not in cur.columns: return pd.DataFrame() snap["site_code"] = pd.to_numeric(snap["site_code"], errors="coerce") cur["site_code"] = pd.to_numeric(cur["site_code"], errors="coerce") snap = snap.dropna(subset=["site_code"]).copy() cur = cur.dropna(subset=["site_code"]).copy() snap["site_code"] = snap["site_code"].astype(int) cur["site_code"] = cur["site_code"].astype(int) left = snap.set_index("site_code") right = cur.set_index("site_code") score_col = ( "criticality_score_weighted" if "criticality_score_weighted" in right.columns else "criticality_score" ) if score_col not in right.columns: score_col = ( "criticality_score" if "criticality_score" in right.columns else None ) key_cols = [ "City", "is_complaint_site", "impacted_rats", "persistent_kpis_total", "degraded_kpis_total", "resolved_kpis_total", "criticality_score", "criticality_score_weighted", "traffic_gb_total", ] all_sites = sorted(set(left.index.tolist()) | set(right.index.tolist())) rows = [] for sc in all_sites: srow = left.loc[sc] if sc in left.index else None crow = right.loc[sc] if sc in right.index else None def _get(row, col): try: if row is None: return None if isinstance(row, pd.DataFrame): row = row.iloc[0] return row.get(col, None) except Exception: # noqa: BLE001 return None snap_score = _get(srow, score_col) if score_col else None cur_score = _get(crow, score_col) if score_col else None try: snap_score_f = ( float(snap_score) if snap_score is not None and pd.notna(snap_score) else 0.0 ) except Exception: # noqa: BLE001 snap_score_f = 0.0 try: cur_score_f = ( float(cur_score) if cur_score is not None and pd.notna(cur_score) else 0.0 ) except Exception: # noqa: BLE001 cur_score_f = 0.0 if sc not in left.index: change_type = "NEW" elif sc not in right.index: change_type = "MISSING" else: if cur_score_f > snap_score_f: change_type = "SEVERITY_UP" elif cur_score_f < snap_score_f: change_type = "SEVERITY_DOWN" else: change_type = "UNCHANGED" row_out = { "site_code": int(sc), "change_type": change_type, "score_snapshot": int(round(snap_score_f)), "score_current": int(round(cur_score_f)), "score_delta": int(round(cur_score_f - snap_score_f)), } for c in key_cols: row_out[f"snapshot_{c}"] = _get(srow, c) row_out[f"current_{c}"] = _get(crow, c) rows.append(row_out) out = pd.DataFrame(rows) if out.empty: return out try: q = (city_filter.value or "").strip() if q: city_series = out.get("current_City") if city_series is None: city_series = out.get("snapshot_City") if city_series is not None: out = out[ city_series.astype(str).str.contains(q, case=False, na=False) ].copy() except Exception: # noqa: BLE001 pass try: mc = int(min_criticality.value) if mc > 0 and "score_current" in out.columns: out = out[ pd.to_numeric(out["score_current"], errors="coerce").fillna(0) >= mc ] except Exception: # noqa: BLE001 pass order = { "SEVERITY_UP": 0, "NEW": 1, "SEVERITY_DOWN": 2, "UNCHANGED": 3, "MISSING": 4, } try: out["_order"] = out["change_type"].map(order).fillna(99).astype(int) out = out.sort_values( by=["_order", "score_delta"], ascending=[True, False] ).drop(columns=["_order"], errors="ignore") except Exception: # noqa: BLE001 out = out.sort_values( by=["change_type", "score_delta"], ascending=[True, False] ) return out def _refresh_delta_view(event=None) -> None: global current_delta_df try: current_delta_df = _compute_delta_df() except Exception: # noqa: BLE001 current_delta_df = pd.DataFrame() delta_table.value = current_delta_df export_button = pn.widgets.FileDownload( label="Download KPI Health Check report", filename="KPI_Health_Check_Report.xlsx", button_type="primary", ) export_include_raw_data = pn.widgets.Checkbox( name="Include raw KPI data (slow)", value=True, ) perf_profiling = pn.widgets.Checkbox( name="Perf profiling", value=False, ) alert_pack_button = pn.widgets.FileDownload( label="Download Alert Pack", filename="KPI_Alert_Pack.xlsx", button_type="primary", ) def _filtered_daily(df: pd.DataFrame) -> pd.DataFrame: if df is None or df.empty: return pd.DataFrame() if ( analysis_range.value and len(analysis_range.value) == 2 and analysis_range.value[0] and analysis_range.value[1] ): start, end = analysis_range.value mask = (df["date_only"] >= start) & (df["date_only"] <= end) return df[mask].copy() return df def _validate_inputs() -> tuple[list[str], list[str]]: errors: list[str] = [] warnings: list[str] = [] bd = _coerce_int(baseline_days.value) rd = _coerce_int(recent_days.value) mcd = _coerce_int(min_consecutive_days.value) thr = _coerce_float(rel_threshold_pct.value) if bd is None or bd < 1: errors.append("Baseline window (days) must be >= 1") if rd is None or rd < 1: errors.append("Recent window (days) must be >= 1") if mcd is None or mcd < 1: errors.append("Min consecutive bad days (persistent) must be >= 1") if thr is None: errors.append("Relative change threshold (%) is invalid") elif thr < 0: errors.append("Relative change threshold (%) must be >= 0") g = str(granularity_select.value or "Daily").strip().lower() is_hourly = g.startswith("hour") or g.startswith("h") if is_hourly and (current_daily_by_rat or {}) and rd is not None and bd is not None: try: recent_periods = int(rd) * 24 baseline_periods = int(bd) * 24 except Exception: # noqa: BLE001 recent_periods = 0 baseline_periods = 0 if recent_periods > 0: for rat_name, df0 in (current_daily_by_rat or {}).items(): if df0 is None or df0.empty or "period_start" not in df0.columns: continue df1 = _filtered_daily(df0) if df1 is None or df1.empty or "period_start" not in df1.columns: continue t = pd.to_datetime(df1["period_start"], errors="coerce").dropna() if t.empty: continue end_dt = pd.Timestamp(t.max()).floor("h") min_dt = pd.Timestamp(t.min()).floor("h") baseline_end = end_dt - timedelta(hours=int(recent_periods)) if min_dt > baseline_end: warnings.append( f"{rat_name}: Hourly baseline is empty (no data before the recent window). " f"Export more hourly history or reduce Recent window (days)." ) elif baseline_periods > 0: try: available_baseline_hours = int( ((baseline_end - min_dt) / timedelta(hours=1)) + 1 ) except Exception: # noqa: BLE001 available_baseline_hours = 0 if 0 < available_baseline_hours < int(baseline_periods): warnings.append( f"{rat_name}: Hourly baseline window is truncated " f"({available_baseline_hours}h available < {int(baseline_periods)}h requested)." ) ar = analysis_range.value if ar and isinstance(ar, (list, tuple)) and len(ar) == 2 and ar[0] and ar[1]: try: if ar[0] > ar[1]: errors.append("Analysis date range is invalid (start > end)") except Exception: # noqa: BLE001 pass has_overlap = False for df in (current_daily_by_rat or {}).values(): if df is None or df.empty or "date_only" not in df.columns: continue try: dmin = df["date_only"].min() dmax = df["date_only"].max() if dmin is None or dmax is None: continue if not (ar[1] < dmin or ar[0] > dmax): has_overlap = True break except Exception: # noqa: BLE001 continue if (current_daily_by_rat or {}) and not has_overlap: warnings.append("No data falls within the selected analysis date range") code_int = None try: code_int = _coerce_int(site_select.value) if ( code_int is None and isinstance(site_select.options, dict) and site_select.value in site_select.options ): code_int = _coerce_int(site_select.options.get(site_select.value)) except Exception: # noqa: BLE001 code_int = None if ( (current_daily_by_rat or {}) and site_select.value is not None and code_int is None ): warnings.append("Selected site is not a valid site code") try: rat = rat_select.value df = current_daily_by_rat.get(rat) if rat else None if isinstance(df, pd.DataFrame) and not df.empty and "site_code" in df.columns: n_sites = int( pd.to_numeric(df["site_code"], errors="coerce").dropna().nunique() ) if n_sites <= 1: warnings.append( f"{rat} dataset contains only {n_sites} site after normalization" ) if code_int is not None: available = set( pd.to_numeric(df["site_code"], errors="coerce") .dropna() .astype(int) .tolist() ) if available and code_int not in available: warnings.append(f"No {rat} data for selected site ({code_int})") except Exception: # noqa: BLE001 pass return errors, warnings def _refresh_validation_state() -> None: errors, warnings = _validate_inputs() msgs: list[str] = [] msgs.extend([f"ERROR: {m}" for m in errors]) msgs.extend([f"WARN: {m}" for m in warnings]) if msgs: validation_pane.object = "\n".join(msgs) validation_pane.alert_type = "danger" if errors else "warning" validation_pane.visible = True else: validation_pane.object = "" validation_pane.alert_type = "success" validation_pane.visible = bool(errors or warnings) try: run_button.disabled = bool(errors) or not has_data except Exception: # noqa: BLE001 pass try: export_button.disabled = bool(errors) or not has_results except Exception: # noqa: BLE001 pass try: rat = rat_select.value daily = current_daily_by_rat.get(rat) if rat else None code_int = _coerce_int(site_select.value) if ( code_int is None and isinstance(site_select.options, dict) and site_select.value in site_select.options ): code_int = _coerce_int(site_select.options.get(site_select.value)) if ( isinstance(daily, pd.DataFrame) and not daily.empty and code_int is not None and "site_code" in daily.columns ): available = set( pd.to_numeric(daily["site_code"], errors="coerce") .dropna() .astype(int) .tolist() ) has_site = bool(available) and int(code_int) in available else: has_site = False drilldown_export_button.disabled = ( bool(errors) or (daily is None) or (daily is not None and daily.empty) or (code_int is None) or (not has_site) ) except Exception: # noqa: BLE001 pass def _update_site_options() -> None: global _updating_drilldown all_sites = [] for df in current_daily_by_rat.values(): if df is None or df.empty: continue if "site_code" not in df.columns: continue cols = [c for c in ["site_code", "City"] if c in df.columns] all_sites.append(df[cols].drop_duplicates("site_code")) if not all_sites: site_select.options = {} site_select.value = None return sites_df = pd.concat(all_sites, ignore_index=True).drop_duplicates("site_code") if "City" not in sites_df.columns: sites_df["City"] = pd.NA sites_df = sites_df.sort_values(by=["City", "site_code"], na_position="last") opts: dict[str, int] = {} for _, row in sites_df.iterrows(): label = ( f"{row['City']}_{row['site_code']}" if pd.notna(row.get("City")) else str(row["site_code"]) ) opts[str(label)] = int(row["site_code"]) _updating_drilldown = True try: site_select.options = opts if opts and site_select.value not in opts.values(): site_select.value = next(iter(opts.values())) finally: _updating_drilldown = False def _update_kpi_options() -> None: if _applying_profile or _loading_datasets: return global _updating_drilldown rat = rat_select.value df = current_daily_by_rat.get(rat) if df is None or df.empty: kpi_select.options = [] kpi_select.value = None return kpis = [ c for c in df.columns if c not in { "site_code", "date_only", "period_start", "Longitude", "Latitude", "City", "RAT", } ] kpis = sorted([str(c) for c in kpis]) groups = get_kpis_by_group(kpis) group_options = ["All (selected KPIs)"] + sorted( [g for g in groups.keys() if g != "Other"] ) if "Other" in groups: group_options.append("Other") _updating_drilldown = True try: kpi_group_select.options = group_options if kpi_group_select.value not in group_options: kpi_group_select.value = group_options[0] filtered_kpis = filter_kpis( kpis, kpi_group_select.value, mode=kpi_group_mode.value ) kpi_select.options = filtered_kpis if filtered_kpis and kpi_select.value not in filtered_kpis: kpi_select.value = filtered_kpis[0] kpi_compare_select.options = kpis cur = list(kpi_compare_select.value or []) cur2 = [str(x) for x in cur if str(x) in kpis] if cur2 != cur: kpi_compare_select.value = cur2 finally: _updating_drilldown = False def _update_site_view(event=None) -> None: if _applying_profile or _loading_datasets or _updating_drilldown: return code = site_select.value rat = rat_select.value kpi = kpi_select.value if code is None or rat is None: site_kpi_table.value = pd.DataFrame() trend_plot_pane.object = None heatmap_plot_pane.object = None hist_plot_pane.object = None corr_plot_pane.object = None corr_message.visible = False return daily = current_daily_by_rat.get(rat) if daily is None or daily.empty: trend_plot_pane.object = None heatmap_plot_pane.object = None hist_plot_pane.object = None corr_plot_pane.object = None corr_message.visible = False return d = _filtered_daily(daily) if ( d is None or d.empty or "site_code" not in d.columns or "date_only" not in d.columns ): trend_plot_pane.object = None heatmap_plot_pane.object = None hist_plot_pane.object = None corr_plot_pane.object = None corr_message.visible = False return try: available_sites = set( pd.to_numeric(d["site_code"], errors="coerce").dropna().astype(int).tolist() ) except Exception: # noqa: BLE001 available_sites = set() code_int = None if code is not None: if hasattr(site_select, "options") and isinstance(site_select.options, dict): if code in site_select.options: code_int = site_select.options[code] elif code in site_select.options.values(): code_int = code if code_int is None: try: code_int = int(code) except Exception: code_int = _coerce_int(code) if available_sites and code_int is not None and code_int not in available_sites: site_kpi_table.value = pd.DataFrame() trend_plot_pane.object = None heatmap_plot_pane.object = None hist_plot_pane.object = None corr_plot_pane.object = None corr_message.visible = False _refresh_validation_state() return if code_int is None: site_kpi_table.value = pd.DataFrame() trend_plot_pane.object = None heatmap_plot_pane.object = None hist_plot_pane.object = None corr_plot_pane.object = None corr_message.visible = False return code = code_int status_df = ( current_status_df if isinstance(current_status_df, pd.DataFrame) else pd.DataFrame() ) if status_df is None or status_df.empty: site_df = pd.DataFrame() else: site_df = status_df[ (status_df["site_code"] == int(code_int if code_int is not None else 0)) & (status_df["RAT"] == rat) ].copy() site_kpi_table.value = site_df if not kpi or kpi not in d.columns: candidate_kpis = [ c for c in d.columns if c not in { "site_code", "date_only", "period_start", "Longitude", "Latitude", "City", "RAT", } ] candidate_kpis = sorted([str(c) for c in candidate_kpis]) if not candidate_kpis: trend_plot_pane.object = None heatmap_plot_pane.object = None hist_plot_pane.object = None corr_plot_pane.object = None corr_message.visible = False return new_kpi = candidate_kpis[0] _set_widget_value(kpi_select, new_kpi) kpi = new_kpi g = str(granularity_select.value or "Daily").strip().lower() is_hourly = g.startswith("hour") or g.startswith("h") time_col = ( "period_start" if (is_hourly and "period_start" in d.columns) else "date_only" ) s = d[d["site_code"] == int(code_int)].copy().sort_values(time_col) if s.empty: trend_plot_pane.object = None heatmap_plot_pane.object = None hist_plot_pane.object = None corr_plot_pane.object = None corr_message.visible = False return cache_key = _drilldown_cache_key(int(code_int), str(rat), str(kpi)) cached = _drilldown_cache_get(cache_key) if cached is not None: try: ( trend_plot_pane.object, heatmap_plot_pane.object, hist_plot_pane.object, corr_plot_pane.object, corr_message.object, corr_message.alert_type, corr_message.visible, ) = cached except Exception: # noqa: BLE001 trend_plot_pane.object, heatmap_plot_pane.object, hist_plot_pane.object = ( cached ) return kpis_to_plot = [] selected = [str(x) for x in (kpi_compare_select.value or []) if str(x)] if kpi and str(kpi) not in selected: selected = [str(kpi)] + selected if "Add top" in str(kpi_group_mode.value): from_group = filter_kpis( d.columns.tolist(), kpi_group_select.value, mode="Top-N", top_n=12 ) for gk in from_group: if gk not in selected: selected.append(gk) kpis_to_plot = selected[:15] rules_df = ( rules_table.value if isinstance(rules_table.value, pd.DataFrame) else pd.DataFrame() ) relevant_rules = rules_df try: if ( isinstance(rules_df, pd.DataFrame) and not rules_df.empty and "RAT" in rules_df.columns ): relevant_rules = rules_df[rules_df["RAT"] == rat].copy() except Exception: # noqa: BLE001 relevant_rules = rules_df fig = build_drilldown_plot( df=d[d["site_code"] == int(code_int)], kpis=kpis_to_plot, rules_df=relevant_rules, highlight_bad_days=True, show_sla=bool(show_sla_toggle.value), site_code=code_int, rat=rat, main_kpi=str(kpi), baseline_days_n=int(_coerce_int(baseline_days.value) or 30), recent_days_n=int(_coerce_int(recent_days.value) or 7), rel_threshold_pct=float(_coerce_float(rel_threshold_pct.value) or 10.0), normalization=str(kpi_compare_norm.value or "None"), granularity=str(granularity_select.value or "Daily"), ) trend_plot_pane.object = fig kpis_for_heatmap = [] if ( isinstance(site_df, pd.DataFrame) and not site_df.empty and "KPI" in site_df.columns ): sev = { "PERSISTENT_DEGRADED": 4, "DEGRADED": 3, "NOTIFY": 2, "RESOLVED": 1, "NOTIFY_RESOLVED": 0, "OK": 0, "NO_DATA": -1, } tmp = site_df.copy() if "status" in tmp.columns: tmp["_sev"] = tmp["status"].map(sev).fillna(0).astype(int) tmp = tmp.sort_values(by=["_sev", "KPI"], ascending=[False, True]) kpis_for_heatmap = tmp["KPI"].astype(str).tolist()[:30] if not kpis_for_heatmap: try: if isinstance(rules_df, pd.DataFrame) and not rules_df.empty: kpis_for_heatmap = ( rules_df[rules_df["RAT"] == rat]["KPI"].astype(str).tolist()[:30] ) except Exception: # noqa: BLE001 kpis_for_heatmap = [] heatmap_plot_pane.object = _build_site_heatmap( d, rules_df, int(code_int), rat, kpis_for_heatmap ) hist_plot_pane.object = _build_baseline_recent_hist(d, int(code_int), str(kpi)) try: corr_kpis = [str(x) for x in (kpi_compare_select.value or []) if str(x)] if len(corr_kpis) < 2: corr_kpis = [str(x) for x in (kpis_to_plot or []) if str(x)] corr_kpis = [c for c in corr_kpis if c in d.columns] corr_kpis = corr_kpis[:20] df_corr = d try: windows = _compute_site_windows(d) if windows is not None: baseline_start, baseline_end, recent_start, recent_end = windows w = str(corr_window_select.value or "") if w.startswith("Recent"): df_corr = d[ ( pd.to_datetime(d[time_col], errors="coerce") >= pd.to_datetime(recent_start) ) & ( pd.to_datetime(d[time_col], errors="coerce") <= pd.to_datetime(recent_end) ) ].copy() elif w.startswith("Baseline"): df_corr = d[ ( pd.to_datetime(d[time_col], errors="coerce") >= pd.to_datetime(baseline_start) ) & ( pd.to_datetime(d[time_col], errors="coerce") <= pd.to_datetime(baseline_end) ) ].copy() except Exception: # noqa: BLE001 df_corr = d corr_fig = _build_corr_heatmap(df_corr, int(code_int), corr_kpis) if corr_fig is None: corr_plot_pane.object = None corr_message.alert_type = "info" corr_message.object = "Correlation needs at least 2 KPIs with enough samples for the selected site." corr_message.visible = True else: corr_message.visible = False corr_plot_pane.object = corr_fig except Exception: # noqa: BLE001 corr_plot_pane.object = None corr_message.alert_type = "warning" corr_message.object = "Unable to compute correlation." corr_message.visible = True try: drilldown_export_button.filename = ( f"KPI_Drilldown_{rat}_site_{int(code_int)}.xlsx" ) except Exception: # noqa: BLE001 pass _drilldown_cache_set( cache_key, ( trend_plot_pane.object, heatmap_plot_pane.object, hist_plot_pane.object, corr_plot_pane.object, corr_message.object, corr_message.alert_type, corr_message.visible, ), ) def _apply_city_filter(df: pd.DataFrame) -> pd.DataFrame: if df is None or df.empty: return pd.DataFrame() q = (city_filter.value or "").strip() if not q or "City" not in df.columns: return df return df[df["City"].astype(str).str.contains(q, case=False, na=False)].copy() def _extract_site_codes_from_any(df: pd.DataFrame) -> set[int]: if df is None or df.empty: return set() candidates = [ "site_code", "site", "site id", "site_id", "code", "code_site", "id", ] col = None cols_lower = {str(c).strip().lower(): c for c in df.columns} for c in candidates: if c in cols_lower: col = cols_lower[c] break if col is None: col = df.columns[0] raw = df[col].astype(str).str.strip() nums = pd.to_numeric(raw, errors="coerce") if nums.isna().all(): extracted = raw.str.extract(r"(\d{3,})")[0] nums = pd.to_numeric(extracted, errors="coerce") out = nums.dropna().astype(int).tolist() return set(out) def _load_complaint_sites_from_bytes(content: bytes, filename: str) -> set[int]: name = str(filename or "").lower() if name.endswith(".xlsx") or name.endswith(".xls"): try: df = pd.read_excel(io.BytesIO(content)) except Exception: # noqa: BLE001 return set() return _extract_site_codes_from_any(df) if name.endswith(".txt"): try: lines = content.decode("utf-8", errors="ignore").splitlines() except Exception: # noqa: BLE001 return set() df = pd.DataFrame({"site_code": lines}) return _extract_site_codes_from_any(df) try: df = pd.read_csv(io.BytesIO(content), sep=None, engine="python") except Exception: # noqa: BLE001 try: df = pd.read_csv(io.BytesIO(content)) except Exception: # noqa: BLE001 return set() return _extract_site_codes_from_any(df) def _refresh_complaint_sites(event=None) -> None: global complaint_sites if complaint_sites_file.value: complaint_sites = _load_complaint_sites_from_bytes( complaint_sites_file.value, complaint_sites_file.filename or "" ) else: default_paths = [ os.path.join(ROOT_DIR, "data", "complaint_sites.csv"), os.path.join(ROOT_DIR, "data", "complaint_sites.xlsx"), os.path.join(ROOT_DIR, "data", "sites_plaintes.csv"), os.path.join(ROOT_DIR, "data", "sites_plaintes.xlsx"), ] loaded = False for p in default_paths: if not os.path.exists(p): continue try: if p.lower().endswith((".xlsx", ".xls")): df = pd.read_excel(p) else: df = pd.read_csv(p, sep=None, engine="python") complaint_sites = _extract_site_codes_from_any(df) loaded = True break except Exception: # noqa: BLE001 continue if not loaded: complaint_sites = set() _apply_complaint_flags() _invalidate_drilldown_cache(healthcheck_changed=True) def _apply_complaint_flags() -> None: global current_multirat_raw, current_top_anomalies_raw if complaint_sites: if current_multirat_raw is not None and not current_multirat_raw.empty: current_multirat_raw["is_complaint_site"] = ( current_multirat_raw["site_code"].astype(int).isin(complaint_sites) ) if ( current_top_anomalies_raw is not None and not current_top_anomalies_raw.empty ): current_top_anomalies_raw["is_complaint_site"] = ( current_top_anomalies_raw["site_code"].astype(int).isin(complaint_sites) ) else: if current_multirat_raw is not None and not current_multirat_raw.empty: current_multirat_raw["is_complaint_site"] = False if ( current_top_anomalies_raw is not None and not current_top_anomalies_raw.empty ): current_top_anomalies_raw["is_complaint_site"] = False _refresh_filtered_results() def _infer_rule_row(rules_df: pd.DataFrame, rat: str, kpi: str) -> dict: if rules_df is None or rules_df.empty: return {} s = rules_df[(rules_df["RAT"] == rat) & (rules_df["KPI"] == kpi)] if s.empty: return {} return dict(s.iloc[0].to_dict()) def _compute_site_windows( daily_filtered: pd.DataFrame, ) -> ( tuple[ date | pd.Timestamp, date | pd.Timestamp, date | pd.Timestamp, date | pd.Timestamp, ] | None ): if daily_filtered is None or daily_filtered.empty: return None rd = _coerce_int(recent_days.value) bd = _coerce_int(baseline_days.value) if rd is None or rd < 1 or bd is None or bd < 1: return None g = str(granularity_select.value or "Daily").strip().lower() is_hourly = g.startswith("hour") or g.startswith("h") time_col = ( "period_start" if (is_hourly and "period_start" in daily_filtered.columns) else "date_only" ) if is_hourly and time_col == "period_start": try: end_dt = pd.to_datetime(daily_filtered[time_col], errors="coerce").max() if pd.isna(end_dt): return None end_dt = pd.Timestamp(end_dt).floor("h") except Exception: # noqa: BLE001 return None recent_periods = int(rd) * 24 baseline_periods = int(bd) * 24 step = timedelta(hours=1) recent_start, recent_end = window_bounds_period(end_dt, recent_periods, step) baseline_end = recent_start - step baseline_start, _ = window_bounds_period(baseline_end, baseline_periods, step) return baseline_start, baseline_end, recent_start, recent_end try: end_ts = pd.to_datetime(daily_filtered["date_only"], errors="coerce").max() if pd.isna(end_ts): return None end_date = end_ts.date() except Exception: # noqa: BLE001 return None recent_start, recent_end = window_bounds(end_date, int(rd)) baseline_end = recent_start - timedelta(days=1) baseline_start = baseline_end - timedelta(days=int(bd) - 1) return baseline_start, baseline_end, recent_start, recent_end def _build_site_heatmap( daily_filtered: pd.DataFrame, rules_df: pd.DataFrame, site_code: int, rat: str, kpis: list[str], ) -> go.Figure | None: if daily_filtered is None or daily_filtered.empty: return None windows = _compute_site_windows(daily_filtered) if windows is None: return None baseline_start, baseline_end, recent_start, recent_end = windows g = str(granularity_select.value or "Daily").strip().lower() is_hourly = g.startswith("hour") or g.startswith("h") time_col = ( "period_start" if (is_hourly and "period_start" in daily_filtered.columns) else "date_only" ) site_daily = daily_filtered[daily_filtered["site_code"] == int(site_code)].copy() if site_daily.empty: return None site_daily = site_daily.sort_values(time_col) dates = [] cur = recent_start step = timedelta(hours=1) if is_hourly else timedelta(days=1) while cur <= recent_end: dates.append(cur) cur = cur + step z = [] hover = [] y_labels = [] for kpi in kpis: if kpi not in site_daily.columns: continue rule = _infer_rule_row(rules_df, rat, kpi) direction = str(rule.get("direction", "higher_is_better")) policy = str(rule.get("policy", "enforce") or "enforce").strip().lower() sla_raw = rule.get("sla", None) try: sla_val = float(sla_raw) if pd.notna(sla_raw) else None except Exception: # noqa: BLE001 sla_val = None sla_eval = None if policy == "notify" else sla_val s = site_daily[[time_col, kpi]].dropna(subset=[kpi]) t = pd.to_datetime(s[time_col], errors="coerce") baseline_mask = (t >= pd.to_datetime(baseline_start)) & ( t <= pd.to_datetime(baseline_end) ) baseline = s.loc[baseline_mask, kpi].median() if baseline_mask.any() else np.nan baseline_val = float(baseline) if pd.notna(baseline) else None row_z = [] row_h = [] for d in dates: v_series = site_daily.loc[ pd.to_datetime(site_daily[time_col], errors="coerce") == pd.to_datetime(d), kpi, ] v = v_series.iloc[0] if not v_series.empty else np.nan if v is None or (isinstance(v, float) and np.isnan(v)): row_z.append(None) row_h.append(f"{kpi}
{d}: NO_DATA") continue bad = is_bad( float(v) if pd.notna(v) else None, baseline_val, direction, float(rel_threshold_pct.value), sla_eval, ) row_z.append(1 if bad else 0) row_h.append( f"{kpi}
{d}: {float(v):.3f}
baseline: {baseline_val if baseline_val is not None else 'NA'}
sla: {sla_val if sla_val is not None else 'NA'}" ) z.append(row_z) hover.append(row_h) y_labels.append(kpi) if not z: return None fig = go.Figure( data=[ go.Heatmap( z=z, x=[str(d) for d in dates], y=y_labels, colorscale=[[0.0, "#2ca02c"], [1.0, "#d62728"]], zmin=0, zmax=1, showscale=False, hovertext=hover, hovertemplate="%{hovertext}", ) ] ) fig.update_layout( template="plotly_white", title=f"{rat} - Site {int(site_code)} - Recent window heatmap", xaxis_title="period", yaxis_title="KPI", height=420, margin=dict(l=40, r=20, t=60, b=40), ) return fig def _build_baseline_recent_hist( daily_filtered: pd.DataFrame, site_code: int, kpi: str, ) -> go.Figure | None: if ( daily_filtered is None or daily_filtered.empty or not kpi or kpi not in daily_filtered.columns ): return None windows = _compute_site_windows(daily_filtered) if windows is None: return None baseline_start, baseline_end, recent_start, recent_end = windows g = str(granularity_select.value or "Daily").strip().lower() is_hourly = g.startswith("hour") or g.startswith("h") time_col = ( "period_start" if (is_hourly and "period_start" in daily_filtered.columns) else "date_only" ) site_daily = daily_filtered[daily_filtered["site_code"] == int(site_code)].copy() if site_daily.empty: return None s = site_daily[[time_col, kpi]].dropna(subset=[kpi]) t = pd.to_datetime(s[time_col], errors="coerce") baseline_mask = (t >= pd.to_datetime(baseline_start)) & ( t <= pd.to_datetime(baseline_end) ) recent_mask = (t >= pd.to_datetime(recent_start)) & ( t <= pd.to_datetime(recent_end) ) baseline_vals = ( pd.to_numeric(s.loc[baseline_mask, kpi], errors="coerce").dropna().astype(float) ) recent_vals = ( pd.to_numeric(s.loc[recent_mask, kpi], errors="coerce").dropna().astype(float) ) if baseline_vals.empty and recent_vals.empty: return None dfh = pd.concat( [ pd.DataFrame({"window": "baseline", "value": baseline_vals}), pd.DataFrame({"window": "recent", "value": recent_vals}), ], ignore_index=True, ) fig = px.histogram( dfh, x="value", color="window", barmode="overlay", opacity=0.6, ) fig.update_layout( template="plotly_white", title=f"{kpi} distribution (baseline vs recent)", xaxis_title=kpi, yaxis_title="count", height=420, margin=dict(l=40, r=20, t=60, b=40), ) return fig def _build_corr_heatmap( daily_filtered: pd.DataFrame, site_code: int, kpis: list[str], ) -> go.Figure | None: if daily_filtered is None or daily_filtered.empty: return None if not kpis: return None if "site_code" not in daily_filtered.columns: return None df_site = daily_filtered[daily_filtered["site_code"] == int(site_code)].copy() if df_site.empty: return None cols = [str(c) for c in kpis if str(c) in df_site.columns] cols = [ c for c in cols if c not in { "site_code", "date_only", "period_start", "Longitude", "Latitude", "City", "RAT", } ] cols = list(dict.fromkeys(cols)) if len(cols) < 2: return None x = df_site[cols].copy() for c in cols: x[c] = pd.to_numeric(x[c], errors="coerce") x = x.dropna(how="all") if x.empty or x.shape[0] < 5: return None x = x.dropna(axis=1, how="all") if x.shape[1] < 2: return None corr = x.corr(method="pearson") if corr is None or corr.empty: return None labels = [str(c) for c in corr.columns.tolist()] z = corr.values fig = go.Figure( data=[ go.Heatmap( z=z, x=labels, y=labels, zmin=-1, zmax=1, colorscale="RdBu", reversescale=True, hovertemplate="%{y} vs %{x}
corr=%{z:.3f}", ) ] ) fig.update_layout( template="plotly_white", title=f"Correlation heatmap (Pearson) - Site {int(site_code)}", height=520, margin=dict(l=60, r=20, t=60, b=60), ) return fig def _compute_site_traffic_gb(daily_by_rat: dict[str, pd.DataFrame]) -> pd.DataFrame: MB_PER_GB = 1024.0 rows = [] for rat, daily in daily_by_rat.items(): if daily is None or daily.empty: continue d = _filtered_daily(daily) if d.empty or "site_code" not in d.columns: continue cols: list[str] = [] if rat == "2G": for c in d.columns: key = str(c).lower().replace(" ", "_") if "traffic_ps" in key: cols.append(c) elif rat == "3G": if "Total_Data_Traffic" in d.columns: cols.append("Total_Data_Traffic") elif rat == "LTE": for c in d.columns: key = str(c).lower().replace(" ", "_") if "traffic_volume" in key and "gbytes" in key: cols.append(c) cols = [c for c in cols if c in d.columns] if not cols: continue traffic = pd.to_numeric( d[cols].sum(axis=1, skipna=True), errors="coerce" ).fillna(0) if rat == "2G": traffic = traffic / MB_PER_GB tmp = pd.DataFrame( { "site_code": d["site_code"].astype(int), "RAT": rat, "traffic_gb": traffic.astype(float), } ) tmp = tmp.groupby(["site_code", "RAT"], as_index=False)["traffic_gb"].sum() rows.append(tmp) if not rows: return pd.DataFrame(columns=["site_code", "traffic_gb_total"]) all_rows = pd.concat(rows, ignore_index=True) pivot = ( all_rows.pivot_table( index="site_code", columns="RAT", values="traffic_gb", aggfunc="sum", fill_value=0, ) .reset_index() .copy() ) traffic_cols = [] for rat in ["2G", "3G", "LTE", "TWAMP"]: if rat in pivot.columns: pivot = pivot.rename(columns={rat: f"traffic_gb_{rat}"}) traffic_cols.append(f"traffic_gb_{rat}") if traffic_cols: pivot["traffic_gb_total"] = ( pd.to_numeric(pivot[traffic_cols].sum(axis=1), errors="coerce") .fillna(0) .astype(float) ) else: pivot["traffic_gb_total"] = 0.0 return pivot def _refresh_filtered_results(event=None) -> None: global current_multirat_df, current_top_anomalies_df, current_ops_queue_df global current_export_bytes, current_alert_pack_bytes if _applying_profile or _loading_datasets: return if current_multirat_raw is not None and not current_multirat_raw.empty: m = _apply_city_filter(current_multirat_raw) if ( bool(only_complaint_sites.value) and "is_complaint_site" in m.columns and not m.empty ): m = m[m["is_complaint_site"] == True] # noqa: E712 score_col = ( "criticality_score_weighted" if "criticality_score_weighted" in m.columns else "criticality_score" ) if score_col in m.columns: m = m[ pd.to_numeric(m[score_col], errors="coerce").fillna(0) >= int(min_criticality.value) ] m = m.sort_values(by=[score_col], ascending=False) current_multirat_df = m multirat_summary_table.value = current_multirat_df else: current_multirat_df = pd.DataFrame() multirat_summary_table.value = current_multirat_df if current_multirat_raw is not None and not current_multirat_raw.empty: oq = _apply_city_filter(current_multirat_raw) oq_score_col = ( "criticality_score_weighted" if "criticality_score_weighted" in oq.columns else "criticality_score" ) if oq_score_col in oq.columns: oq = oq[ pd.to_numeric(oq[oq_score_col], errors="coerce").fillna(0) >= int(min_criticality.value) ] oq = oq.copy() try: oq["priority_score"] = ( pd.to_numeric(oq[oq_score_col], errors="coerce") .fillna(0) .round(0) .astype(int) if oq_score_col in oq.columns else 0 ) except Exception: # noqa: BLE001 oq["priority_score"] = 0 cols = [] for c in [ "priority_score", "site_code", "City", "is_complaint_site", "impacted_rats", "persistent_kpis_total", "degraded_kpis_total", "resolved_kpis_total", "criticality_score", "criticality_score_weighted", "traffic_gb_total", "traffic_gb_2G", "traffic_gb_3G", "traffic_gb_LTE", "traffic_gb_TWAMP", ]: if c in oq.columns and c not in cols: cols.append(c) for prefix in ["persistent", "degraded", "resolved"]: for r in ["2G", "3G", "LTE", "TWAMP"]: c = f"{prefix}_{r}" if c in oq.columns and c not in cols: cols.append(c) if cols: oq = oq[cols].copy() if not oq.empty and "priority_score" in oq.columns: oq = oq.sort_values(by=["priority_score"], ascending=False) current_ops_queue_df = oq ops_queue_table.value = current_ops_queue_df else: current_ops_queue_df = pd.DataFrame() ops_queue_table.value = current_ops_queue_df if current_top_anomalies_raw is not None and not current_top_anomalies_raw.empty: t = _apply_city_filter(current_top_anomalies_raw) if ( bool(only_complaint_sites.value) and "is_complaint_site" in t.columns and not t.empty ): t = t[t["is_complaint_site"] == True] # noqa: E712 if top_rat_filter.value: t = t[t["RAT"].isin(list(top_rat_filter.value))] if top_status_filter.value and "status" in t.columns: t = t[t["status"].isin(list(top_status_filter.value))] score_col = ( "anomaly_score_weighted" if "anomaly_score_weighted" in t.columns else "anomaly_score" ) if score_col in t.columns: t = t[ pd.to_numeric(t[score_col], errors="coerce").fillna(0) >= int(min_anomaly_score.value) ] t = t.sort_values(by=[score_col], ascending=False) current_top_anomalies_df = t top_anomalies_table.value = current_top_anomalies_df else: current_top_anomalies_df = pd.DataFrame() top_anomalies_table.value = current_top_anomalies_df if current_multirat_raw is not None and not current_multirat_raw.empty: cm = _apply_city_filter(current_multirat_raw) if "is_complaint_site" in cm.columns: cm = cm[cm["is_complaint_site"] == True] # noqa: E712 else: cm = cm.iloc[0:0].copy() score_col = ( "criticality_score_weighted" if "criticality_score_weighted" in cm.columns else "criticality_score" ) if score_col in cm.columns: cm = cm[ pd.to_numeric(cm[score_col], errors="coerce").fillna(0) >= int(min_criticality.value) ] cm = cm.sort_values(by=[score_col], ascending=False) complaint_multirat_summary_table.value = cm else: complaint_multirat_summary_table.value = pd.DataFrame() if current_top_anomalies_raw is not None and not current_top_anomalies_raw.empty: ct = _apply_city_filter(current_top_anomalies_raw) if "is_complaint_site" in ct.columns: ct = ct[ct["is_complaint_site"] == True] # noqa: E712 else: ct = ct.iloc[0:0].copy() if top_rat_filter.value: ct = ct[ct["RAT"].isin(list(top_rat_filter.value))] if top_status_filter.value and "status" in ct.columns: ct = ct[ct["status"].isin(list(top_status_filter.value))] score_col = ( "anomaly_score_weighted" if "anomaly_score_weighted" in ct.columns else "anomaly_score" ) if score_col in ct.columns: ct = ct[ pd.to_numeric(ct[score_col], errors="coerce").fillna(0) >= int(min_anomaly_score.value) ] ct = ct.sort_values(by=[score_col], ascending=False) complaint_top_anomalies_table.value = ct else: complaint_top_anomalies_table.value = pd.DataFrame() current_export_bytes = None current_alert_pack_bytes = None try: _refresh_map_view() except Exception: # noqa: BLE001 pass def _refresh_presets(event=None) -> None: names = list_presets() preset_select.options = [""] + names if preset_select.value not in preset_select.options: preset_select.value = "" def _refresh_profiles(event=None) -> None: names = list_profiles() profile_select.options = [""] + names if profile_select.value not in profile_select.options: profile_select.value = "" def _current_profile_config() -> dict: cfg: dict = {} cfg["granularity"] = str(granularity_select.value or "Daily") cfg["analysis_range"] = ( [ ( str(analysis_range.value[0]) if analysis_range.value and analysis_range.value[0] else None ), ( str(analysis_range.value[1]) if analysis_range.value and analysis_range.value[1] else None ), ] if analysis_range.value else [None, None] ) cfg["baseline_days"] = int(baseline_days.value) cfg["recent_days"] = int(recent_days.value) cfg["rel_threshold_pct"] = float(rel_threshold_pct.value) cfg["min_consecutive_days"] = int(min_consecutive_days.value) cfg["min_criticality"] = int(min_criticality.value) cfg["min_anomaly_score"] = int(min_anomaly_score.value) cfg["city_filter"] = str(city_filter.value or "") cfg["only_complaint_sites"] = bool(only_complaint_sites.value) cfg["top_rat_filter"] = list(top_rat_filter.value) if top_rat_filter.value else [] cfg["top_status_filter"] = ( list(top_status_filter.value) if top_status_filter.value else [] ) cfg["preset_selected"] = str(preset_select.value or "") cfg["drilldown"] = { "site_code": int(site_select.value) if site_select.value is not None else None, "rat": str(rat_select.value or ""), "kpi": str(kpi_select.value or ""), "compare_kpis": ( list(kpi_compare_select.value) if kpi_compare_select.value else [] ), "compare_norm": str(kpi_compare_norm.value or "None"), "show_sla": bool(show_sla_toggle.value), } return cfg def _apply_profile_config(cfg: dict) -> None: global _applying_profile if cfg is None or not isinstance(cfg, dict): return _applying_profile = True try: try: g = str(cfg.get("granularity", "") or "").strip() if g and g in list(granularity_select.options): granularity_select.value = g except Exception: # noqa: BLE001 pass try: ar = cfg.get("analysis_range", [None, None]) if isinstance(ar, (list, tuple)) and len(ar) == 2 and ar[0] and ar[1]: analysis_range.value = ( pd.to_datetime(ar[0]).date(), pd.to_datetime(ar[1]).date(), ) else: analysis_range.value = None except Exception: # noqa: BLE001 pass for w, key, cast in [ (baseline_days, "baseline_days", int), (recent_days, "recent_days", int), (rel_threshold_pct, "rel_threshold_pct", float), (min_consecutive_days, "min_consecutive_days", int), (min_criticality, "min_criticality", int), (min_anomaly_score, "min_anomaly_score", int), ]: try: if key in cfg and cfg[key] is not None: w.value = cast(cfg[key]) except Exception: # noqa: BLE001 pass try: if ( "only_complaint_sites" in cfg and cfg["only_complaint_sites"] is not None ): only_complaint_sites.value = bool(cfg["only_complaint_sites"]) except Exception: # noqa: BLE001 pass try: city_filter.value = str(cfg.get("city_filter", "") or "") except Exception: # noqa: BLE001 pass try: tr = cfg.get("top_rat_filter", []) if isinstance(tr, list): top_rat_filter.value = [x for x in tr if x in top_rat_filter.options] except Exception: # noqa: BLE001 pass try: ts = cfg.get("top_status_filter", []) if isinstance(ts, list): top_status_filter.value = [ x for x in ts if x in top_status_filter.options ] except Exception: # noqa: BLE001 pass try: preset_name = str(cfg.get("preset_selected", "") or "").strip() if preset_name: _refresh_presets() if preset_name in preset_select.options: preset_select.value = preset_name try: _apply_preset() except Exception: # noqa: BLE001 pass except Exception: # noqa: BLE001 pass drill = ( cfg.get("drilldown", {}) if isinstance(cfg.get("drilldown", {}), dict) else {} ) try: rat = str(drill.get("rat", "") or "") if rat and rat in list(rat_select.options): rat_select.value = rat except Exception: # noqa: BLE001 pass try: sc = drill.get("site_code", None) if sc is not None: site_select.value = int(sc) except Exception: # noqa: BLE001 pass try: kpi = str(drill.get("kpi", "") or "") if kpi: kpi_select.value = kpi except Exception: # noqa: BLE001 pass try: norm = str(drill.get("compare_norm", "None") or "None") if norm in list(kpi_compare_norm.options): kpi_compare_norm.value = norm except Exception: # noqa: BLE001 pass try: if "show_sla" in drill and drill["show_sla"] is not None: show_sla_toggle.value = bool(drill["show_sla"]) except Exception: # noqa: BLE001 pass try: ck = drill.get("compare_kpis", []) if isinstance(ck, list): opts = list(kpi_compare_select.options or []) kpi_compare_select.value = [str(x) for x in ck if str(x) in opts] except Exception: # noqa: BLE001 pass finally: _applying_profile = False _refresh_filtered_results() _update_kpi_options() _update_site_view() def _apply_profile(event=None) -> None: try: if not profile_select.value: return cfg = load_profile(str(profile_select.value)) _apply_profile_config(cfg) status_pane.alert_type = "success" status_pane.object = f"Profile applied: {profile_select.value}" except Exception as exc: # noqa: BLE001 status_pane.alert_type = "danger" status_pane.object = f"Error applying profile: {exc}" def _save_profile(event=None) -> None: try: name = (profile_name_input.value or "").strip() if not name: name = str(profile_select.value or "").strip() if not name: raise ValueError("Please provide a profile name") cfg = _current_profile_config() save_profile(name, cfg) profile_name_input.value = "" _refresh_profiles() profile_select.value = name status_pane.alert_type = "success" status_pane.object = f"Profile saved: {name}" except Exception as exc: # noqa: BLE001 status_pane.alert_type = "danger" status_pane.object = f"Error saving profile: {exc}" def _delete_profile(event=None) -> None: try: name = str(profile_select.value or "").strip() if not name: return delete_profile(name) _refresh_profiles() status_pane.alert_type = "success" status_pane.object = f"Profile deleted: {name}" except Exception as exc: # noqa: BLE001 status_pane.alert_type = "danger" status_pane.object = f"Error deleting profile: {exc}" def _apply_preset(event=None) -> None: global current_export_bytes, current_alert_pack_bytes try: if not preset_select.value: return preset_df = load_preset(str(preset_select.value)) if preset_df is None or preset_df.empty: return except Exception as exc: # noqa: BLE001 status_pane.alert_type = "danger" status_pane.object = f"Error loading preset: {exc}" return cur = ( rules_table.value if isinstance(rules_table.value, pd.DataFrame) else pd.DataFrame() ) if cur is None or cur.empty: rules_table.value = preset_df return key = ["RAT", "KPI"] upd_cols = [c for c in ["direction", "sla", "policy"] if c in preset_df.columns] preset_df2 = preset_df[key + upd_cols].copy() merged = pd.merge(cur, preset_df2, on=key, how="left", suffixes=("", "_preset")) for c in upd_cols: pc = f"{c}_preset" if pc in merged.columns: merged[c] = merged[pc].where(merged[pc].notna(), merged[c]) merged = merged.drop(columns=[pc]) rules_table.value = merged status_pane.alert_type = "success" status_pane.object = f"Preset applied: {preset_select.value}" current_export_bytes = None _invalidate_drilldown_cache(rules_changed=True) def _save_current_rules_as_preset(event=None) -> None: try: name = (preset_name_input.value or "").strip() if not name: name = str(preset_select.value or "").strip() if not name: raise ValueError("Please provide a preset name") cur = ( rules_table.value if isinstance(rules_table.value, pd.DataFrame) else pd.DataFrame() ) save_preset(name, cur) preset_name_input.value = "" _refresh_presets() preset_select.value = name status_pane.alert_type = "success" status_pane.object = f"Preset saved: {name}" except Exception as exc: # noqa: BLE001 status_pane.alert_type = "danger" status_pane.object = f"Error saving preset: {exc}" def _delete_selected_preset(event=None) -> None: global current_export_bytes, current_alert_pack_bytes try: name = str(preset_select.value or "").strip() if not name: return delete_preset(name) _refresh_presets() status_pane.alert_type = "success" status_pane.object = f"Preset deleted: {name}" current_export_bytes = None current_alert_pack_bytes = None _invalidate_drilldown_cache(data_changed=True, rules_changed=True) except Exception as exc: # noqa: BLE001 status_pane.alert_type = "danger" status_pane.object = f"Error deleting preset: {exc}" def load_datasets(event=None) -> None: global _loading_datasets _loading_datasets = True try: status_pane.alert_type = "primary" status_pane.object = "Loading datasets..." global current_daily_by_rat, current_rules_df global current_status_df, current_summary_df, current_export_bytes global current_multirat_df, current_multirat_raw, current_top_anomalies_df, current_top_anomalies_raw global current_ops_queue_df global current_alert_pack_bytes global current_snapshot, current_delta_df current_daily_by_rat = {} current_rules_df = None current_status_df = None current_summary_df = None current_multirat_df = None current_multirat_raw = None current_top_anomalies_df = None current_top_anomalies_raw = None current_ops_queue_df = None current_export_bytes = None current_alert_pack_bytes = None current_snapshot = None current_delta_df = None _invalidate_drilldown_cache( data_changed=True, rules_changed=True, healthcheck_changed=True ) site_summary_table.value = pd.DataFrame() multirat_summary_table.value = pd.DataFrame() top_anomalies_table.value = pd.DataFrame() complaint_multirat_summary_table.value = pd.DataFrame() complaint_top_anomalies_table.value = pd.DataFrame() ops_queue_table.value = pd.DataFrame() delta_table.value = pd.DataFrame() map_pane.object = None map_message.visible = False try: global _map_search_city, _map_center_override, _map_force_fit _map_search_city = "" _map_center_override = None _map_force_fit = False map_search_input.value = "" map_search_results.options = {} map_search_results.value = None map_search_results.visible = False except Exception: # noqa: BLE001 pass site_kpi_table.value = pd.DataFrame() trend_plot_pane.object = None heatmap_plot_pane.object = None hist_plot_pane.object = None corr_plot_pane.object = None corr_message.visible = False inputs = {"2G": file_2g, "3G": file_3g, "LTE": file_lte, "TWAMP": file_twamp} rows = [] rules_rows = [] loaded_any = False for rat, widget in inputs.items(): df_raw = read_fileinput_to_df(widget) if df_raw is None: continue loaded_any = True date_col = None id_col = None try: date_col = infer_date_col(df_raw) except Exception: # noqa: BLE001 date_col = None try: id_col = infer_id_col(df_raw, rat) except Exception: # noqa: BLE001 id_col = None daily, kpi_cols = build_period_kpi(df_raw, rat, granularity_select.value) current_daily_by_rat[rat] = daily d = _filtered_daily(daily) periods_n = None try: if ( isinstance(d, pd.DataFrame) and not d.empty and "period_start" in d.columns ): periods_n = int( pd.to_datetime(d["period_start"], errors="coerce").nunique() ) except Exception: # noqa: BLE001 periods_n = None rows.append( { "RAT": rat, "rows_raw": int(df_raw.shape[0]), "cols_raw": int(df_raw.shape[1]), "date_col": date_col, "id_col": id_col, "sites": int(d["site_code"].nunique()), "days": int(d["date_only"].nunique()), "periods": ( int(periods_n) if periods_n is not None else int(d["date_only"].nunique()) ), "kpis": int(len(kpi_cols)), } ) for kpi in kpi_cols: direction = infer_kpi_direction(kpi, rat) rules_rows.append( { "RAT": rat, "KPI": kpi, "direction": direction, "sla": infer_kpi_sla(kpi, direction, rat), "policy": infer_kpi_policy(kpi, rat), } ) if not loaded_any: raise ValueError("Please upload at least one KPI report") rats_loaded = [ r for r in ["2G", "3G", "LTE", "TWAMP"] if r in current_daily_by_rat ] if rats_loaded: rat_select.options = rats_loaded if rat_select.value not in rats_loaded: rat_select.value = rats_loaded[-1] datasets_table.value = pd.DataFrame(rows) rules_df = ( pd.DataFrame(rules_rows) .drop_duplicates(subset=["RAT", "KPI"]) .sort_values(by=["RAT", "KPI"]) ) current_rules_df = rules_df rules_table.value = rules_df status_pane.alert_type = "success" status_pane.object = ( "Datasets loaded. Edit KPI rules if needed, then run health check." ) except Exception as exc: # noqa: BLE001 status_pane.alert_type = "danger" status_pane.object = f"Error: {exc}" try: _refresh_validation_state() except Exception: # noqa: BLE001 pass finally: _loading_datasets = False try: _update_site_options() except Exception: # noqa: BLE001 pass try: _update_kpi_options() except Exception: # noqa: BLE001 pass try: _update_site_view() except Exception: # noqa: BLE001 pass try: _refresh_validation_state() except Exception: # noqa: BLE001 pass def run_health_check(event=None) -> None: try: status_pane.alert_type = "primary" status_pane.object = "Running health check..." global current_status_df, current_summary_df, current_export_bytes global current_multirat_df, current_multirat_raw global current_top_anomalies_df, current_top_anomalies_raw global current_alert_pack_bytes global current_snapshot, current_delta_df rules_df = ( rules_table.value if isinstance(rules_table.value, pd.DataFrame) else pd.DataFrame() ) if rules_df.empty: raise ValueError("KPI rules table is empty") bd = _coerce_int(baseline_days.value) rd = _coerce_int(recent_days.value) mcd = _coerce_int(min_consecutive_days.value) thr = _coerce_float(rel_threshold_pct.value) if bd is None or bd < 1: raise ValueError("Baseline window (days) must be >= 1") if rd is None or rd < 1: raise ValueError("Recent window (days) must be >= 1") if mcd is None or mcd < 1: raise ValueError("Min consecutive bad days must be >= 1") if thr is None: raise ValueError("Relative change threshold (%) is invalid") all_status = [] all_summary = [] do_profile = bool(perf_profiling.value) t_run0 = time.perf_counter() if do_profile else 0.0 rat_timings = [] for rat, daily in current_daily_by_rat.items(): d = _filtered_daily(daily) t0 = time.perf_counter() if do_profile else 0.0 status_df, summary_df = evaluate_health_check( d, rat, rules_df, int(bd), int(rd), float(thr), int(mcd), granularity=str(granularity_select.value or "Daily"), ) if do_profile: rat_timings.append( { "rat": str(rat), "seconds": float(time.perf_counter() - t0), "rows": int(len(d)) if isinstance(d, pd.DataFrame) else 0, } ) if not status_df.empty: all_status.append(status_df) if not summary_df.empty: all_summary.append(summary_df) current_status_df = ( pd.concat(all_status, ignore_index=True) if all_status else pd.DataFrame() ) current_summary_df = ( pd.concat(all_summary, ignore_index=True) if all_summary else pd.DataFrame() ) site_summary_table.value = current_summary_df t_mr0 = time.perf_counter() if do_profile else 0.0 current_multirat_raw, current_top_anomalies_raw = compute_multirat_views( current_status_df ) mr_seconds = float(time.perf_counter() - t_mr0) if do_profile else 0.0 t_tr0 = time.perf_counter() if do_profile else 0.0 traffic_df = _compute_site_traffic_gb(current_daily_by_rat) tr_seconds = float(time.perf_counter() - t_tr0) if do_profile else 0.0 if traffic_df is not None and not traffic_df.empty: if current_multirat_raw is not None and not current_multirat_raw.empty: current_multirat_raw = pd.merge( current_multirat_raw, traffic_df, on="site_code", how="left" ) if "criticality_score" in current_multirat_raw.columns: w = 1.0 + np.log1p( pd.to_numeric( current_multirat_raw["traffic_gb_total"], errors="coerce" ) .fillna(0) .astype(float) ) current_multirat_raw["criticality_score_weighted"] = ( ( pd.to_numeric( current_multirat_raw["criticality_score"], errors="coerce", ) .fillna(0) .astype(float) * w ) .round(0) .astype(int) ) if ( current_top_anomalies_raw is not None and not current_top_anomalies_raw.empty ): current_top_anomalies_raw = pd.merge( current_top_anomalies_raw, traffic_df, on="site_code", how="left" ) if "anomaly_score" in current_top_anomalies_raw.columns: w = 1.0 + np.log1p( pd.to_numeric( current_top_anomalies_raw["traffic_gb_total"], errors="coerce", ) .fillna(0) .astype(float) ) current_top_anomalies_raw["anomaly_score_weighted"] = ( ( pd.to_numeric( current_top_anomalies_raw["anomaly_score"], errors="coerce", ) .fillna(0) .astype(float) * w ) .round(0) .astype(int) ) _apply_complaint_flags() current_export_bytes = None current_alert_pack_bytes = None t_dl0 = time.perf_counter() if do_profile else 0.0 try: current_delta_df = _compute_delta_df() except Exception: # noqa: BLE001 current_delta_df = pd.DataFrame() dl_seconds = float(time.perf_counter() - t_dl0) if do_profile else 0.0 delta_table.value = current_delta_df _invalidate_drilldown_cache(healthcheck_changed=True) _update_site_view() status_pane.alert_type = "success" msg = "Health check completed." if do_profile: total_s = float(time.perf_counter() - t_run0) rat_timings_sorted = sorted( rat_timings, key=lambda x: float(x.get("seconds", 0.0)), reverse=True ) top_lines = [ f"- {r['rat']}: {r['seconds']:.2f}s (rows={r['rows']})" for r in rat_timings_sorted[:8] ] msg = ( msg + "\n\nPerf (seconds)" + f"\n- total_run: {total_s:.2f}s" + ( "\n- evaluate_health_check:" + ("\n" + "\n".join(top_lines) if top_lines else "") ) + f"\n- compute_multirat_views: {mr_seconds:.2f}s" + f"\n- compute_site_traffic_gb: {tr_seconds:.2f}s" + f"\n- compute_delta_df: {dl_seconds:.2f}s" ) status_pane.object = msg _refresh_validation_state() except Exception as exc: # noqa: BLE001 status_pane.alert_type = "danger" status_pane.object = f"Error: {exc}" def _build_export_bytes(profile: dict | None = None) -> bytes: include_raw = bool(export_include_raw_data.value) daily_by_rat = ( current_daily_by_rat if (include_raw and isinstance(current_daily_by_rat, dict)) else None ) return build_export_bytes( datasets_df=( datasets_table.value if isinstance(datasets_table.value, pd.DataFrame) else None ), rules_df=( rules_table.value if isinstance(rules_table.value, pd.DataFrame) else None ), summary_df=( current_summary_df if isinstance(current_summary_df, pd.DataFrame) else None ), status_df=( current_status_df if isinstance(current_status_df, pd.DataFrame) else None ), daily_by_rat=daily_by_rat, granularity=str(granularity_select.value or "Daily"), multirat_summary_df=( current_multirat_df if isinstance(current_multirat_df, pd.DataFrame) else None ), top_anomalies_df=( current_top_anomalies_df if isinstance(current_top_anomalies_df, pd.DataFrame) else None ), complaint_multirat_df=( complaint_multirat_summary_table.value if isinstance(complaint_multirat_summary_table.value, pd.DataFrame) else None ), complaint_top_anomalies_df=( complaint_top_anomalies_table.value if isinstance(complaint_top_anomalies_table.value, pd.DataFrame) else None ), ops_queue_df=( current_ops_queue_df if isinstance(current_ops_queue_df, pd.DataFrame) else None ), delta_df=( current_delta_df if isinstance(current_delta_df, pd.DataFrame) else None ), profile=profile, ) def _export_callback() -> io.BytesIO: global current_export_bytes do_profile = bool(perf_profiling.value) if do_profile or current_export_bytes is None: try: t0 = time.perf_counter() if do_profile else 0.0 profile = {} if do_profile else None current_export_bytes = _build_export_bytes(profile=profile) if do_profile: total_s = float(time.perf_counter() - t0) excel_s = ( float(profile.get("excel_total_seconds", 0.0)) if profile else 0.0 ) prep_s = ( float(profile.get("export_prep_seconds", 0.0)) if profile else 0.0 ) sheets = profile.get("excel_sheets") if profile else None slow = [] if isinstance(sheets, list) and sheets: sheets2 = [s for s in sheets if isinstance(s, dict)] sheets2 = sorted( sheets2, key=lambda x: float(x.get("seconds", 0.0)), reverse=True, ) slow = [ f"- {s.get('name')}: {float(s.get('seconds', 0.0)):.2f}s (rows={s.get('rows')}, cols={s.get('cols')})" for s in sheets2[:8] ] status_pane.alert_type = "primary" status_pane.object = ( "Export profiling" + f"\n- total_export: {total_s:.2f}s" + f"\n- export_prep: {prep_s:.2f}s" + f"\n- excel_write: {excel_s:.2f}s" + ("\n- slowest_sheets:\n" + "\n".join(slow) if slow else "") ) except Exception: # noqa: BLE001 current_export_bytes = b"" return io.BytesIO(current_export_bytes or b"") def _build_alert_pack_bytes() -> bytes: params = { "granularity": str(granularity_select.value or "Daily"), "baseline_days": baseline_days.value, "recent_days": recent_days.value, "rel_threshold_pct": rel_threshold_pct.value, "min_consecutive_days": min_consecutive_days.value, "min_criticality": min_criticality.value, "min_anomaly_score": min_anomaly_score.value, "city_filter": str(city_filter.value or ""), "only_complaint_sites": bool(only_complaint_sites.value), "top_rat_filter": ",".join(list(top_rat_filter.value or [])), "top_status_filter": ",".join(list(top_status_filter.value or [])), } params_df = pd.DataFrame( {"key": list(params.keys()), "value": [params[k] for k in params.keys()]} ) return write_dfs_to_excel( [ params_df, ( current_ops_queue_df if isinstance(current_ops_queue_df, pd.DataFrame) else pd.DataFrame() ), ( current_top_anomalies_df if isinstance(current_top_anomalies_df, pd.DataFrame) else pd.DataFrame() ), ( current_summary_df if isinstance(current_summary_df, pd.DataFrame) else pd.DataFrame() ), ], ["Run_Params", "Ops_Queue", "Top_Anomalies", "Site_Summary"], index=False, ) def _alert_pack_callback() -> io.BytesIO: global current_alert_pack_bytes if current_alert_pack_bytes is None: try: current_alert_pack_bytes = _build_alert_pack_bytes() except Exception: # noqa: BLE001 current_alert_pack_bytes = b"" return io.BytesIO(current_alert_pack_bytes or b"") def _build_snapshot_obj() -> dict: cfg = _current_profile_config() rules_df = ( rules_table.value if isinstance(rules_table.value, pd.DataFrame) else pd.DataFrame() ) multirat_df = ( current_multirat_raw if isinstance(current_multirat_raw, pd.DataFrame) else pd.DataFrame() ) top_df = ( current_top_anomalies_raw if isinstance(current_top_anomalies_raw, pd.DataFrame) else pd.DataFrame() ) return { "snapshot_version": 1, "created_at": pd.Timestamp.utcnow().isoformat() + "Z", "profile_config": cfg, "rules_df": rules_df.to_dict(orient="records"), "multirat_df": multirat_df.to_dict(orient="records"), "top_anomalies_df": top_df.to_dict(orient="records"), } def _snapshot_download_callback() -> io.BytesIO: b = b"" try: obj = _build_snapshot_obj() b = json.dumps(obj, ensure_ascii=False, indent=2).encode("utf-8") except Exception: # noqa: BLE001 b = b"" return io.BytesIO(b) def _snapshot_from_bytes(content: bytes) -> dict: try: txt = content.decode("utf-8", errors="ignore") obj = json.loads(txt) return obj if isinstance(obj, dict) else {} except Exception: # noqa: BLE001 return {} def _apply_snapshot_to_ui(obj: dict) -> None: global current_snapshot, current_delta_df, current_export_bytes, current_alert_pack_bytes current_snapshot = obj if isinstance(obj, dict) else {} cfg = ( current_snapshot.get("profile_config", {}) if isinstance(current_snapshot.get("profile_config", {}), dict) else {} ) _apply_profile_config(cfg) try: r = current_snapshot.get("rules_df", []) snapshot_rules_table.value = pd.DataFrame(r) except Exception: # noqa: BLE001 snapshot_rules_table.value = pd.DataFrame() try: m = current_snapshot.get("multirat_df", []) snapshot_multirat_table.value = pd.DataFrame(m) except Exception: # noqa: BLE001 snapshot_multirat_table.value = pd.DataFrame() try: t = current_snapshot.get("top_anomalies_df", []) snapshot_top_table.value = pd.DataFrame(t) except Exception: # noqa: BLE001 snapshot_top_table.value = pd.DataFrame() try: current_delta_df = _compute_delta_df() delta_table.value = current_delta_df except Exception: # noqa: BLE001 current_delta_df = pd.DataFrame() delta_table.value = current_delta_df current_export_bytes = None current_alert_pack_bytes = None def _on_snapshot_upload(event=None) -> None: if not snapshot_file.value: return obj = _snapshot_from_bytes(snapshot_file.value) _apply_snapshot_to_ui(obj) try: status_pane.alert_type = "success" status_pane.object = "Snapshot loaded." except Exception: # noqa: BLE001 pass load_button.on_click(load_datasets) run_button.on_click(run_health_check) preset_refresh_button.on_click(_refresh_presets) preset_apply_button.on_click(_apply_preset) preset_save_button.on_click(_save_current_rules_as_preset) preset_delete_button.on_click(_delete_selected_preset) profile_refresh_button.on_click(_refresh_profiles) profile_apply_button.on_click(_apply_profile) profile_save_button.on_click(_save_profile) profile_delete_button.on_click(_delete_profile) snapshot_file.param.watch(_on_snapshot_upload, "value") snapshot_download.callback = _snapshot_download_callback map_pane.param.watch(_on_map_click, "click_data") map_fit_button.on_click(_on_map_fit_click) map_search_go.on_click(_on_map_search) map_search_clear.on_click(_on_map_clear_search) map_search_results.param.watch(_on_map_search_pick, "value") _refresh_presets() _refresh_profiles() _refresh_complaint_sites() _refresh_validation_state() try: _refresh_map_view() except Exception: # noqa: BLE001 pass def _on_rat_change(event=None) -> None: if _applying_profile or _loading_datasets or _updating_drilldown: return _schedule_drilldown_update(lambda: (_update_kpi_options(), _update_site_view())) _refresh_validation_state() def _on_drilldown_change(event=None) -> None: if _applying_profile or _loading_datasets or _updating_drilldown: return _schedule_drilldown_update(_update_site_view) _refresh_validation_state() def _on_drilldown_params_change(event=None) -> None: if _applying_profile or _loading_datasets or _updating_drilldown: return _invalidate_drilldown_cache() _schedule_drilldown_update(_update_site_view) _refresh_validation_state() def _on_export_options_change(event=None) -> None: global current_export_bytes if _applying_profile or _loading_datasets: return current_export_bytes = None def _on_granularity_change(event=None) -> None: if _applying_profile or _loading_datasets: return _invalidate_drilldown_cache(data_changed=True, healthcheck_changed=True) _refresh_validation_state() try: has_any = bool( (file_2g and file_2g.value) or (file_3g and file_3g.value) or (file_lte and file_lte.value) or (file_twamp and file_twamp.value) ) except Exception: # noqa: BLE001 has_any = False if has_any: try: load_datasets() except Exception: # noqa: BLE001 pass rat_select.param.watch(_on_rat_change, "value") kpi_group_select.param.watch( _on_rat_change, "value" ) # Updating group also needs to re-filter KPI options kpi_group_mode.param.watch(_on_drilldown_change, "value") site_select.param.watch(_on_drilldown_change, "value") kpi_select.param.watch(_on_drilldown_change, "value") kpi_compare_select.param.watch(_on_drilldown_change, "value") kpi_compare_norm.param.watch(_on_drilldown_change, "value") show_sla_toggle.param.watch(_on_drilldown_change, "value") corr_window_select.param.watch(_on_drilldown_change, "value") map_show_site_codes.param.watch(lambda e: _refresh_map_view(), "value") map_max_labels.param.watch(lambda e: _refresh_map_view(), "value") map_top_n.param.watch(lambda e: _refresh_map_view(), "value") map_impacted_rat.param.watch(lambda e: _refresh_map_view(), "value") map_status_filter.param.watch(lambda e: _refresh_map_view(), "value") map_auto_fit.param.watch(lambda e: _refresh_map_view(), "value") analysis_range.param.watch(_on_drilldown_params_change, "value") granularity_select.param.watch(_on_granularity_change, "value") baseline_days.param.watch(_on_drilldown_params_change, "value") recent_days.param.watch(_on_drilldown_params_change, "value") rel_threshold_pct.param.watch(_on_drilldown_params_change, "value") min_consecutive_days.param.watch(_on_drilldown_params_change, "value") export_include_raw_data.param.watch(_on_export_options_change, "value") def _on_rules_table_change(event=None) -> None: global current_export_bytes, current_alert_pack_bytes if _applying_profile or _loading_datasets: return current_export_bytes = None current_alert_pack_bytes = None _invalidate_drilldown_cache(rules_changed=True) rules_table.param.watch(_on_rules_table_change, "value") try: top_anomalies_table.on_click( lambda e: _handle_double_click("top", top_anomalies_table, e) ) except Exception: # noqa: BLE001 pass try: complaint_top_anomalies_table.on_click( lambda e: _handle_double_click("top", complaint_top_anomalies_table, e) ) except Exception: # noqa: BLE001 pass try: multirat_summary_table.on_click( lambda e: _handle_double_click("multirat", multirat_summary_table, e) ) except Exception: # noqa: BLE001 pass try: complaint_multirat_summary_table.on_click( lambda e: _handle_double_click("multirat", complaint_multirat_summary_table, e) ) except Exception: # noqa: BLE001 pass try: ops_queue_table.on_click(lambda e: _handle_double_click("ops", ops_queue_table, e)) except Exception: # noqa: BLE001 pass min_criticality.param.watch(_refresh_filtered_results, "value") min_anomaly_score.param.watch(_refresh_filtered_results, "value") city_filter.param.watch(_refresh_filtered_results, "value") only_complaint_sites.param.watch(_refresh_filtered_results, "value") top_rat_filter.param.watch(_refresh_filtered_results, "value") top_status_filter.param.watch(_refresh_filtered_results, "value") min_criticality.param.watch(_refresh_delta_view, "value") city_filter.param.watch(_refresh_delta_view, "value") complaint_sites_file.param.watch(_refresh_complaint_sites, "value") export_button.callback = _export_callback alert_pack_button.callback = _alert_pack_callback def _build_drilldown_export_bytes() -> bytes: rat = rat_select.value code = site_select.value if rat is None or code is None: return b"" code_int = _coerce_int(code) if ( code_int is None and hasattr(site_select, "options") and isinstance(site_select.options, dict) ): if code in site_select.options: code_int = _coerce_int(site_select.options.get(code)) if code_int is None: return b"" daily = current_daily_by_rat.get(rat) if daily is None or daily.empty: return b"" d = _filtered_daily(daily) if d is None or d.empty: return b"" g = str(granularity_select.value or "Daily").strip().lower() is_hourly = g.startswith("hour") or g.startswith("h") time_col = ( "period_start" if (is_hourly and "period_start" in d.columns) else "date_only" ) s = d[d["site_code"] == int(code_int)].copy().sort_values(time_col) if s.empty: return b"" selected_kpis = [str(x) for x in (kpi_compare_select.value or []) if str(x)] if not selected_kpis: selected_kpis = [str(kpi_select.value)] if kpi_select.value else [] else: if kpi_select.value and str(kpi_select.value) not in selected_kpis: selected_kpis = [str(kpi_select.value)] + selected_kpis selected_kpis = [k for k in selected_kpis if k in d.columns] base_cols = [time_col] daily_cols = base_cols + selected_kpis daily_out = s[daily_cols].copy() if selected_kpis else s[[time_col]].copy() rules_df = ( rules_table.value if isinstance(rules_table.value, pd.DataFrame) else pd.DataFrame() ) windows = _compute_site_windows(d) if windows is None: summary_out = pd.DataFrame() else: baseline_start, baseline_end, recent_start, recent_end = windows rows = [] for k in selected_kpis: rule = _infer_rule_row(rules_df, str(rat), str(k)) direction = str(rule.get("direction", "higher_is_better")) sla_raw = rule.get("sla", None) try: sla_val = float(sla_raw) if pd.notna(sla_raw) else None except Exception: # noqa: BLE001 sla_val = None sk = s[[time_col, k]].copy() sk[k] = pd.to_numeric(sk[k], errors="coerce") sk = sk.dropna(subset=[k]) baseline_mask = ( pd.to_datetime(sk[time_col], errors="coerce") >= pd.to_datetime(baseline_start) ) & ( pd.to_datetime(sk[time_col], errors="coerce") <= pd.to_datetime(baseline_end) ) recent_mask = ( pd.to_datetime(sk[time_col], errors="coerce") >= pd.to_datetime(recent_start) ) & ( pd.to_datetime(sk[time_col], errors="coerce") <= pd.to_datetime(recent_end) ) baseline_med = ( float(sk.loc[baseline_mask, k].median()) if baseline_mask.any() else None ) recent_med = ( float(sk.loc[recent_mask, k].median()) if recent_mask.any() else None ) bad_flags = [] recent_vals = sk.loc[recent_mask, [time_col, k]].sort_values(time_col) bad_dates = [] for _, r in recent_vals.iterrows(): v = r.get(k) is_bad_day = bool( is_bad( float(v) if pd.notna(v) else None, baseline_med, direction, float(rel_threshold_pct.value), sla_val, ) ) bad_flags.append(is_bad_day) if is_bad_day: try: d0 = r.get(time_col) if d0 is not None: bad_dates.append(pd.to_datetime(d0, errors="coerce")) except Exception: # noqa: BLE001 pass rows.append( { "RAT": str(rat), "site_code": int(code_int), "KPI": str(k), "direction": direction, "sla": sla_val, "baseline_median": baseline_med, "recent_median": recent_med, "bad_days_recent": int(sum(bad_flags)), "max_streak_recent": int( max_consecutive_periods( bad_dates, step=( timedelta(hours=1) if is_hourly else timedelta(days=1) ), ) if bad_dates else 0 ), } ) summary_out = pd.DataFrame(rows) status_df = ( current_status_df if isinstance(current_status_df, pd.DataFrame) else pd.DataFrame() ) if not status_df.empty: status_out = status_df[ (status_df["site_code"] == int(code_int)) & (status_df["RAT"] == str(rat)) ].copy() else: status_out = pd.DataFrame() g2 = str(granularity_select.value or "Daily").strip().lower() data_sheet = "Hourly" if (g2.startswith("hour") or g2.startswith("h")) else "Daily" return write_dfs_to_excel( [summary_out, daily_out, status_out], ["Summary", data_sheet, "KPI_Status"], index=False, ) def _drilldown_export_callback() -> io.BytesIO: try: b = _build_drilldown_export_bytes() except Exception: # noqa: BLE001 b = b"" return io.BytesIO(b or b"") drilldown_export_button.callback = _drilldown_export_callback sidebar = pn.Column( pn.Card( file_2g, file_3g, file_lte, file_twamp, complaint_sites_file, only_complaint_sites, title="Datasets", collapsed=False, ), sizing_mode="stretch_width", ) sidebar = pn.Column( sidebar, pn.Card( analysis_range, granularity_select, baseline_days, recent_days, rel_threshold_pct, min_consecutive_days, title="Health check settings", collapsible=False, sizing_mode="stretch_width", ), pn.Card( load_button, run_button, title="Run", collapsible=False, sizing_mode="stretch_width", ), pn.Card( export_include_raw_data, perf_profiling, export_button, alert_pack_button, title="Export", collapsible=False, sizing_mode="stretch_width", ), pn.Card( min_criticality, min_anomaly_score, city_filter, top_rat_filter, top_status_filter, title="Filters", collapsible=False, sizing_mode="stretch_width", ), pn.Card( preset_select, pn.Row(preset_refresh_button, preset_apply_button, sizing_mode="stretch_width"), preset_name_input, pn.Row(preset_save_button, preset_delete_button, sizing_mode="stretch_width"), title="Rule presets", collapsible=False, sizing_mode="stretch_width", ), pn.Card( profile_select, pn.Row( profile_refresh_button, profile_apply_button, sizing_mode="stretch_width" ), profile_name_input, pn.Row(profile_save_button, profile_delete_button, sizing_mode="stretch_width"), title="Profiles", collapsible=False, sizing_mode="stretch_width", ), sizing_mode="stretch_width", ) _tab_overview = pn.Column( pn.pane.Markdown("## Datasets"), datasets_table, pn.pane.Markdown("## KPI Rules (editable)"), rules_table, pn.pane.Markdown("## Site Summary"), site_summary_table, pn.pane.Markdown("## Multi-RAT Summary"), multirat_summary_table, pn.pane.Markdown("## Top anomalies (cross-RAT)"), top_anomalies_table, sizing_mode="stretch_width", ) _tab_complaint = pn.Column( pn.pane.Markdown("## Complaint sites only"), pn.pane.Markdown("### Multi-RAT Summary (Complaint)"), complaint_multirat_summary_table, pn.pane.Markdown("### Top anomalies (Complaint)"), complaint_top_anomalies_table, sizing_mode="stretch_width", ) _tab_ops_queue = pn.Column( pn.pane.Markdown("## Ops Queue"), ops_queue_table, sizing_mode="stretch_width", ) _tab_snapshot = pn.Column( pn.pane.Markdown("## Snapshot"), pn.Row(snapshot_download, snapshot_file), pn.pane.Markdown("### Snapshot KPI rules"), snapshot_rules_table, pn.pane.Markdown("### Snapshot Multi-RAT"), snapshot_multirat_table, pn.pane.Markdown("### Snapshot Top anomalies"), snapshot_top_table, sizing_mode="stretch_width", ) _tab_delta = pn.Column( pn.pane.Markdown("## Delta"), delta_table, sizing_mode="stretch_width", ) _tab_map = pn.Column( pn.pane.Markdown("## Map"), map_message, pn.Row( map_search_input, map_search_go, map_search_clear, map_fit_button, map_auto_fit ), pn.Row( map_top_n, map_impacted_rat, map_status_filter, map_show_site_codes, map_max_labels, ), pn.Row(map_search_results), pn.Column(map_pane, sizing_mode="stretch_both", min_height=700), sizing_mode="stretch_both", ) _tab_drilldown = pn.Column( pn.pane.Markdown("## Drill-down"), pn.Row(site_select, rat_select), pn.Row(kpi_group_select, kpi_group_mode), pn.Row( kpi_select, kpi_compare_select, kpi_compare_norm, show_sla_toggle, drilldown_export_button, ), pn.Column(trend_plot_pane, sizing_mode="stretch_both", min_height=500), pn.Column(site_kpi_table, sizing_mode="stretch_width"), pn.Column(heatmap_plot_pane, sizing_mode="stretch_both", min_height=400), pn.Column(hist_plot_pane, sizing_mode="stretch_both", min_height=400), pn.pane.Markdown("## Correlation"), pn.Row(corr_window_select), corr_message, pn.Column(corr_plot_pane, sizing_mode="stretch_both", min_height=520), sizing_mode="stretch_both", ) _tabs_main = pn.Tabs( ("Overview", _tab_overview), ("Ops Queue", _tab_ops_queue), ("Complaint sites only", _tab_complaint), ("Snapshot", _tab_snapshot), ("Delta", _tab_delta), ("Map", _tab_map), ("Drill-down", _tab_drilldown), dynamic=True, sizing_mode="stretch_both", ) main = pn.Column( status_pane, validation_pane, _tabs_main, ) def get_page_components(): return sidebar, main if __name__ == "__main__": template = pn.template.MaterialTemplate(title="KPI Health Check - Panel") template.sidebar.append(sidebar) template.main.append(main) template.servable()