import io
import json
import os
import sys
import time
from collections import OrderedDict
from datetime import date, timedelta
import numpy as np
import pandas as pd
import panel as pn
import plotly.express as px
import plotly.graph_objects as go
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
from panel_app.convert_to_excel_panel import write_dfs_to_excel
from panel_app.kpi_health_check_drilldown_plots import build_drilldown_plot
from process_kpi.kpi_health_check.benchmarks import calculate_sla_metrics
from process_kpi.kpi_health_check.engine import (
evaluate_health_check,
is_bad,
max_consecutive_days,
max_consecutive_periods,
window_bounds,
window_bounds_period,
)
from process_kpi.kpi_health_check.export import build_export_bytes
from process_kpi.kpi_health_check.io import read_bytes_to_df
from process_kpi.kpi_health_check.kpi_groups import filter_kpis, get_kpis_by_group
from process_kpi.kpi_health_check.multi_rat import compute_multirat_views
from process_kpi.kpi_health_check.normalization import (
build_daily_kpi,
build_period_kpi,
infer_date_col,
infer_id_col,
)
from process_kpi.kpi_health_check.presets import (
delete_preset,
list_presets,
load_preset,
save_preset,
)
from process_kpi.kpi_health_check.profiles import (
delete_profile,
list_profiles,
load_profile,
save_profile,
)
from process_kpi.kpi_health_check.rules import (
infer_kpi_direction,
infer_kpi_policy,
infer_kpi_sla,
)
pn.extension("plotly", "tabulator")
pn.config.raw_css.append(
"""
.kpi-site-kpi-table .tabulator-row.kpi-row--persistent_degraded{border-left:6px solid #b71c1c !important;background-color:#ffebee !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--persistent_degraded:hover{background-color:#ffd6db !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--degraded{border-left:6px solid #e53935 !important;background-color:#fff5f5 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--degraded:hover{background-color:#ffe4e6 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--notify{border-left:6px solid #f9a825 !important;background-color:#fff8e1 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--notify:hover{background-color:#ffefb8 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--resolved{border-left:6px solid #2e7d32 !important;background-color:#f1f8e9 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--resolved:hover{background-color:#dcedc8 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--notify_resolved{border-left:6px solid #2e7d32 !important;background-color:#f1f8e9 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--notify_resolved:hover{background-color:#dcedc8 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--no_data{border-left:6px solid #616161 !important;background-color:#f5f5f5 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--no_data:hover{background-color:#eeeeee !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--ok:hover{background-color:#1565c0 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--persistent_degraded .tabulator-cell,
.kpi-site-kpi-table .tabulator-row.kpi-row--degraded .tabulator-cell,
.kpi-site-kpi-table .tabulator-row.kpi-row--notify .tabulator-cell,
.kpi-site-kpi-table .tabulator-row.kpi-row--resolved .tabulator-cell,
.kpi-site-kpi-table .tabulator-row.kpi-row--notify_resolved .tabulator-cell,
.kpi-site-kpi-table .tabulator-row.kpi-row--no_data .tabulator-cell{color:#111 !important;}
.kpi-site-kpi-table .tabulator-row.kpi-row--ok:hover .tabulator-cell{color:#fff !important;}
"""
)
PLOTLY_CONFIG = {"displaylogo": False, "scrollZoom": True, "displayModeBar": True}
def read_fileinput_to_df(file_input: pn.widgets.FileInput) -> pd.DataFrame | None:
if file_input is None or not file_input.value:
return None
return read_bytes_to_df(file_input.value, file_input.filename or "")
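# Illustrative usage (a sketch; `read_bytes_to_df` is expected to pick a CSV
# or ZIP reader based on the filename it receives):
#   df = read_fileinput_to_df(file_lte)
#   if df is not None:
#       print(df.shape)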
current_daily_by_rat: dict[str, pd.DataFrame] = {}
current_rules_df: pd.DataFrame | None = None
current_status_df: pd.DataFrame | None = None
current_summary_df: pd.DataFrame | None = None
current_multirat_df: pd.DataFrame | None = None
current_multirat_raw: pd.DataFrame | None = None
current_top_anomalies_df: pd.DataFrame | None = None
current_top_anomalies_raw: pd.DataFrame | None = None
current_ops_queue_df: pd.DataFrame | None = None
current_export_bytes: bytes | None = None
current_alert_pack_bytes: bytes | None = None
current_snapshot: dict | None = None
current_delta_df: pd.DataFrame | None = None
complaint_sites: set[int] = set()
_daily_version: int = 0
_rules_version: int = 0
_healthcheck_version: int = 0
_DRILLDOWN_CACHE_MAX: int = 64
_drilldown_fig_cache: "OrderedDict[tuple, tuple]" = OrderedDict()
_DOUBLE_CLICK_S: float = 0.35
_last_click_state: dict[str, tuple[float, int | None]] = {
"top": (0.0, None),
"multirat": (0.0, None),
}
_applying_profile: bool = False
_loading_datasets: bool = False
_updating_drilldown: bool = False
_drilldown_update_pending: bool = False
def _set_widget_value(widget, value) -> None:
global _updating_drilldown
try:
if getattr(widget, "value", None) == value:
return
except Exception: # noqa: BLE001
pass
_updating_drilldown = True
try:
widget.value = value
finally:
_updating_drilldown = False
def _schedule_drilldown_update(fn) -> None:
global _drilldown_update_pending
if _drilldown_update_pending:
return
_drilldown_update_pending = True
def _wrapped() -> None:
global _drilldown_update_pending
_drilldown_update_pending = False
fn()
doc = pn.state.curdoc
if doc is not None:
doc.add_next_tick_callback(_wrapped)
else:
_wrapped()
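# Coalescing note: several widget callbacks can fire within one event batch;
# only the first caller schedules `_wrapped` on the Bokeh document's next
# tick, later callers bail out on `_drilldown_update_pending`. Outside a
# server context (pn.state.curdoc is None) the update runs synchronously.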
def _invalidate_drilldown_cache(
*,
data_changed: bool = False,
rules_changed: bool = False,
healthcheck_changed: bool = False,
) -> None:
global _daily_version, _rules_version, _healthcheck_version
if data_changed:
_daily_version += 1
if rules_changed:
_rules_version += 1
if healthcheck_changed:
_healthcheck_version += 1
try:
_drilldown_fig_cache.clear()
except Exception: # noqa: BLE001
pass
def _drilldown_cache_key(site_code: int, rat: str, kpi: str) -> tuple:
ar = analysis_range.value
ar_key = (
str(ar[0]) if ar and len(ar) == 2 and ar[0] else None,
str(ar[1]) if ar and len(ar) == 2 and ar[1] else None,
)
compare_kpis_key = tuple(sorted([str(x) for x in (kpi_compare_select.value or [])]))
norm_key = str(kpi_compare_norm.value or "None")
return (
int(_daily_version),
int(_rules_version),
int(_healthcheck_version),
int(site_code),
str(rat or ""),
str(kpi or ""),
str(granularity_select.value or "Daily"),
compare_kpis_key,
norm_key,
bool(show_sla_toggle.value),
ar_key,
int(_coerce_int(baseline_days.value) or 0),
int(_coerce_int(recent_days.value) or 0),
float(_coerce_float(rel_threshold_pct.value) or 0.0),
int(_coerce_int(min_consecutive_days.value) or 0),
        # Later-added inputs that also parameterize the drill-down figures
str(kpi_group_select.value),
str(kpi_group_mode.value),
str(corr_window_select.value),
)
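# The cache key folds in everything that can change the rendered figures:
# the three version counters invalidate all entries whenever data, rules or
# health-check results are reloaded, while the remaining fields scope an
# entry to one (site, RAT, KPI) plus the widget state driving the plots.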
def _drilldown_cache_get(key: tuple):
v = _drilldown_fig_cache.get(key)
if v is not None:
_drilldown_fig_cache.move_to_end(key)
return v
def _drilldown_cache_set(key: tuple, value) -> None:
_drilldown_fig_cache[key] = value
_drilldown_fig_cache.move_to_end(key)
while len(_drilldown_fig_cache) > int(_DRILLDOWN_CACHE_MAX):
_drilldown_fig_cache.popitem(last=False)
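# Together these two helpers implement a small LRU cache on an OrderedDict:
# `move_to_end` marks a key as most recently used on both read and write,
# and `popitem(last=False)` evicts from the least recently used end once the
# cache grows past _DRILLDOWN_CACHE_MAX entries.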
def _coerce_int(value) -> int | None:
try:
if value is None:
return None
if isinstance(value, (int, np.integer)):
return int(value)
if isinstance(value, float) and np.isnan(value):
return None
except Exception: # noqa: BLE001
pass
s = str(value).strip()
try:
return int(s)
except Exception: # noqa: BLE001
digits = "".join(ch for ch in s if ch.isdigit())
try:
return int(digits) if digits else None
except Exception: # noqa: BLE001
return None
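# _coerce_int is deliberately lenient. Illustrative behavior:
#   _coerce_int("1234")       -> 1234
#   _coerce_int(" 0042 ")     -> 42
#   _coerce_int("SITE_0042")  -> 42    (falls back to the embedded digit run)
#   _coerce_int(float("nan")) -> None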
def _coerce_float(value) -> float | None:
try:
if value is None:
return None
if isinstance(value, (float, np.floating)):
if np.isnan(value):
return None
return float(value)
except Exception: # noqa: BLE001
pass
try:
return float(value)
except Exception: # noqa: BLE001
return None
def _table_row_as_dict(table: pn.widgets.Tabulator, row_idx: int) -> dict | None:
try:
v = getattr(table, "current_view", None)
df = v if isinstance(v, pd.DataFrame) else table.value
if not isinstance(df, pd.DataFrame) or df.empty:
return None
if row_idx < 0 or row_idx >= len(df):
return None
return dict(df.iloc[int(row_idx)].to_dict())
except Exception: # noqa: BLE001
return None
def _apply_drilldown_selection(*, site_code, rat=None, kpi=None) -> None:
try:
if site_code is None:
return
site_code_int = _coerce_int(site_code)
if site_code_int is None:
return
if rat and rat in list(rat_select.options or []):
_set_widget_value(rat_select, str(rat))
_update_kpi_options()
        # The target site may be missing from the current options (e.g.
        # filtered out by the loaded datasets); the value is applied anyway
        # and the scheduled view update copes with a miss.
        _set_widget_value(site_select, site_code_int)
if kpi and str(kpi) in list(kpi_select.options or []):
_set_widget_value(kpi_select, str(kpi))
if kpi and str(kpi) in list(kpi_compare_select.options or []):
cur = list(kpi_compare_select.value or [])
if str(kpi) not in cur:
_set_widget_value(
kpi_compare_select, [str(kpi)] + cur if cur else [str(kpi)]
)
_schedule_drilldown_update(_update_site_view)
except Exception: # noqa: BLE001
return
def _handle_double_click(table_key: str, table: pn.widgets.Tabulator, event) -> None:
try:
row = getattr(event, "row", None)
if row is None:
return
row = int(row)
except Exception: # noqa: BLE001
return
now = float(time.monotonic())
last_t, last_row = _last_click_state.get(table_key, (0.0, None))
if last_row == row and (now - float(last_t)) <= float(_DOUBLE_CLICK_S):
_last_click_state[table_key] = (0.0, None)
data = _table_row_as_dict(table, row)
if not data:
return
if table_key == "top":
_apply_drilldown_selection(
site_code=data.get("site_code"),
rat=data.get("RAT"),
kpi=data.get("KPI"),
)
try:
status_pane.alert_type = "primary"
status_pane.object = f"Drill-down: site {int(data.get('site_code'))} | {data.get('RAT')} | {data.get('KPI')}"
except Exception: # noqa: BLE001
pass
return
if table_key in {"multirat", "ops"}:
site_code = data.get("site_code")
best_rat = rat_select.value
try:
best_score = -1
for r in list(rat_select.options or []):
p = pd.to_numeric(data.get(f"persistent_{r}", 0), errors="coerce")
d = pd.to_numeric(data.get(f"degraded_{r}", 0), errors="coerce")
p = int(p) if pd.notna(p) else 0
d = int(d) if pd.notna(d) else 0
score = p * 2 + d
if score > best_score:
best_score = score
best_rat = r
except Exception: # noqa: BLE001
best_rat = rat_select.value
_apply_drilldown_selection(site_code=site_code, rat=best_rat)
try:
status_pane.alert_type = "primary"
status_pane.object = f"Drill-down: site {int(site_code)} | {best_rat}"
except Exception: # noqa: BLE001
pass
return
else:
_last_click_state[table_key] = (now, row)
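# Tabulator click events carry no double-click flag here, so a double-click
# is emulated: two clicks on the same row within _DOUBLE_CLICK_S seconds.
# time.monotonic() is used so wall-clock adjustments cannot produce false
# positives, and the per-table state is cleared after a hit so a third
# rapid click does not trigger the drill-down twice.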
file_2g = pn.widgets.FileInput(name="2G KPI report", accept=".csv,.zip")
file_3g = pn.widgets.FileInput(name="3G KPI report", accept=".csv,.zip")
file_lte = pn.widgets.FileInput(name="LTE KPI report", accept=".csv,.zip")
file_twamp = pn.widgets.FileInput(name="TWAMP KPI report", accept=".csv,.zip")
complaint_sites_file = pn.widgets.FileInput(
name="Complaint sites list (optional)", accept=".csv,.txt,.xlsx"
)
only_complaint_sites = pn.widgets.Checkbox(name="Only complaint sites", value=False)
analysis_range = pn.widgets.DateRangePicker(name="Analysis date range (optional)")
granularity_select = pn.widgets.RadioButtonGroup(
name="Granularity", options=["Daily", "Hourly"], value="Daily"
)
baseline_days = pn.widgets.IntInput(name="Baseline window (days)", value=30)
recent_days = pn.widgets.IntInput(name="Recent window (days)", value=7)
rel_threshold_pct = pn.widgets.FloatInput(
name="Relative change threshold (%)", value=10.0, step=1.0
)
min_consecutive_days = pn.widgets.IntInput(
name="Min consecutive bad days (persistent)", value=3
)
min_criticality = pn.widgets.IntInput(name="Min criticality score", value=0)
min_anomaly_score = pn.widgets.IntInput(name="Min anomaly score", value=0)
city_filter = pn.widgets.TextInput(name="City contains (optional)", value="")
top_rat_filter = pn.widgets.CheckBoxGroup(
name="Top anomalies RAT",
options=["2G", "3G", "LTE", "TWAMP"],
value=["2G", "3G", "LTE", "TWAMP"],
)
top_status_filter = pn.widgets.CheckBoxGroup(
name="Top anomalies status",
options=["DEGRADED", "PERSISTENT_DEGRADED"],
value=["DEGRADED", "PERSISTENT_DEGRADED"],
)
preset_select = pn.widgets.Select(name="Rules preset", options=[], value=None)
preset_name_input = pn.widgets.TextInput(name="Save preset as", value="")
preset_refresh_button = pn.widgets.Button(name="Refresh presets", button_type="default")
preset_apply_button = pn.widgets.Button(name="Apply preset", button_type="primary")
preset_save_button = pn.widgets.Button(name="Save current rules", button_type="primary")
preset_delete_button = pn.widgets.Button(name="Delete preset", button_type="danger")
profile_select = pn.widgets.Select(name="Profile", options=[], value=None)
profile_name_input = pn.widgets.TextInput(name="Save profile as", value="")
profile_refresh_button = pn.widgets.Button(
name="Refresh profiles", button_type="default"
)
profile_apply_button = pn.widgets.Button(name="Apply profile", button_type="primary")
profile_save_button = pn.widgets.Button(name="Save profile", button_type="primary")
profile_delete_button = pn.widgets.Button(name="Delete profile", button_type="danger")
load_button = pn.widgets.Button(
name="Load datasets & build rules", button_type="primary"
)
run_button = pn.widgets.Button(name="Run health check", button_type="primary")
status_pane = pn.pane.Alert(
"Upload KPI reports (ZIP/CSV), then load datasets and run health check.",
alert_type="primary",
)
validation_pane = pn.pane.Alert("", alert_type="success", visible=False)
datasets_table = pn.widgets.Tabulator(
height=180, sizing_mode="stretch_width", layout="fit_data_table"
)
rules_table = pn.widgets.Tabulator(
height=260, sizing_mode="stretch_width", layout="fit_data_table"
)
def _set_tabulator_pagination(table: pn.widgets.Tabulator, page_size: int = 50) -> None:
try:
table.pagination = "local"
table.page_size = int(page_size)
return
except Exception: # noqa: BLE001
pass
try:
cfg = dict(table.configuration or {})
cfg["pagination"] = "local"
cfg["paginationSize"] = int(page_size)
table.configuration = cfg
except Exception: # noqa: BLE001
pass
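# The two code paths above cover Panel API drift: newer releases expose
# `pagination` / `page_size` directly on the Tabulator widget, while older
# ones only accept the raw Tabulator options through `.configuration`
# ("pagination" / "paginationSize"). The same pattern guards the
# `rules_table.editable` assignment below.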
try:
rules_table.editable = True
except Exception: # noqa: BLE001
try:
cfg = dict(rules_table.configuration or {})
cfg["editable"] = True
rules_table.configuration = cfg
except Exception: # noqa: BLE001
pass
site_summary_table = pn.widgets.Tabulator(
height=260, sizing_mode="stretch_width", layout="fit_data_table"
)
multirat_summary_table = pn.widgets.Tabulator(
height=260, sizing_mode="stretch_width", layout="fit_data_table"
)
complaint_multirat_summary_table = pn.widgets.Tabulator(
height=260, sizing_mode="stretch_width", layout="fit_data_table"
)
top_anomalies_table = pn.widgets.Tabulator(
height=260, sizing_mode="stretch_width", layout="fit_data_table"
)
complaint_top_anomalies_table = pn.widgets.Tabulator(
height=260, sizing_mode="stretch_width", layout="fit_data_table"
)
ops_queue_table = pn.widgets.Tabulator(
height=520, sizing_mode="stretch_width", layout="fit_data_table"
)
snapshot_file = pn.widgets.FileInput(name="Load snapshot (JSON)", accept=".json")
snapshot_download = pn.widgets.FileDownload(
label="Save snapshot",
filename="KPI_Health_Check_Snapshot.json",
button_type="primary",
)
snapshot_rules_table = pn.widgets.Tabulator(
height=220, sizing_mode="stretch_width", layout="fit_data_table"
)
snapshot_multirat_table = pn.widgets.Tabulator(
height=260, sizing_mode="stretch_width", layout="fit_data_table"
)
snapshot_top_table = pn.widgets.Tabulator(
height=260, sizing_mode="stretch_width", layout="fit_data_table"
)
delta_table = pn.widgets.Tabulator(
height=320, sizing_mode="stretch_width", layout="fit_data_table"
)
map_show_site_codes = pn.widgets.Checkbox(name="Show site codes", value=True)
map_max_labels = pn.widgets.IntInput(name="Max labels", value=200, step=50)
map_top_n = pn.widgets.IntInput(name="Top N sites (0=All)", value=0, step=100)
map_impacted_rat = pn.widgets.Select(
name="RAT impacted", options=["Any", "2G", "3G", "LTE", "TWAMP"], value="Any"
)
map_status_filter = pn.widgets.Select(
name="Status",
options=[
"Any",
"PERSISTENT_DEGRADED",
"DEGRADED",
"NOTIFY",
"RESOLVED",
"NOTIFY_RESOLVED",
"OK",
"NO_DATA",
],
value="Any",
)
map_auto_fit = pn.widgets.Checkbox(name="Auto fit", value=True)
map_fit_button = pn.widgets.Button(name="Fit to points", button_type="default")
map_search_input = pn.widgets.TextInput(name="Search (site_code or City)", value="")
map_search_go = pn.widgets.Button(name="Go", button_type="primary")
map_search_clear = pn.widgets.Button(name="Clear", button_type="default")
map_search_results = pn.widgets.Select(name="Matches", options={}, value=None)
map_search_results.visible = False
site_select = pn.widgets.AutocompleteInput(
name="Select a site (Type to search)",
options={},
case_sensitive=False,
search_strategy="includes",
restrict=True,
)
rat_select = pn.widgets.RadioButtonGroup(
name="RAT", options=["2G", "3G", "LTE", "TWAMP"], value="LTE"
)
kpi_select = pn.widgets.Select(name="KPI", options=[])
kpi_compare_select = pn.widgets.MultiChoice(name="Compare KPIs", options=[], value=[])
kpi_compare_norm = pn.widgets.Select(
name="Normalization", options=["None", "Min-Max", "Z-score"], value="None"
)
show_sla_toggle = pn.widgets.Checkbox(name="Show SLA", value=True)
corr_window_select = pn.widgets.Select(
name="Correlation window",
options=["Full (filtered range)", "Recent", "Baseline"],
value="Full (filtered range)",
)
kpi_group_select = pn.widgets.Select(
name="KPI Group", options=["All (selected KPIs)"], value="All (selected KPIs)"
)
kpi_group_mode = pn.widgets.Select(
name="Group Mode",
options=["Filter KPI list only (recommended)", "Add top 12 KPIs to compare"],
value="Filter KPI list only (recommended)",
)
drilldown_export_button = pn.widgets.FileDownload(
label="Download drill-down",
filename="KPI_Drilldown.xlsx",
button_type="primary",
)
site_kpi_table = pn.widgets.Tabulator(
height=520,
sizing_mode="stretch_width",
layout="fit_data_table",
css_classes=["kpi-site-kpi-table"],
)
try:
site_kpi_table.text_align = {
"status": "center",
"bad_days_recent": "center",
"max_streak_recent": "center",
"baseline_median": "right",
"recent_median": "right",
}
site_kpi_table.widths = {
"status": 170,
"KPI": 260,
"bad_days_recent": 140,
"max_streak_recent": 160,
}
site_kpi_table.formatters = {
"status": pn.io.JSCode(
"""
function format(cell, formatterParams, onRendered){
var v = cell.getValue();
var s = (v === null || v === undefined) ? '' : String(v);
var bg = '#455a64';
var rowClass = '';
if(s === 'PERSISTENT_DEGRADED') bg = '#b71c1c';
else if(s === 'DEGRADED') bg = '#e53935';
else if(s === 'NOTIFY') bg = '#f9a825';
else if(s === 'RESOLVED') bg = '#2e7d32';
else if(s === 'NOTIFY_RESOLVED') bg = '#2e7d32';
else if(s === 'NO_DATA') bg = '#616161';
else if(s === 'OK') bg = '#1565c0';
if(s === 'PERSISTENT_DEGRADED') rowClass = 'kpi-row--persistent_degraded';
else if(s === 'DEGRADED') rowClass = 'kpi-row--degraded';
else if(s === 'NOTIFY') rowClass = 'kpi-row--notify';
else if(s === 'RESOLVED') rowClass = 'kpi-row--resolved';
else if(s === 'NOTIFY_RESOLVED') rowClass = 'kpi-row--notify_resolved';
else if(s === 'NO_DATA') rowClass = 'kpi-row--no_data';
else if(s === 'OK') rowClass = 'kpi-row--ok';
onRendered(function(){
try {
var rowEl = cell.getRow().getElement();
if(rowEl){
var classesToRemove = [
'kpi-row--persistent_degraded',
'kpi-row--degraded',
'kpi-row--notify',
'kpi-row--resolved',
'kpi-row--notify_resolved',
'kpi-row--no_data',
'kpi-row--ok'
];
classesToRemove.forEach(function(c){ try { rowEl.classList.remove(c); } catch (e) {} });
if(rowClass){
try { rowEl.classList.add(rowClass); } catch (e) {}
}
}
                var cellEl = cell.getElement();
                if(cellEl){
                    cellEl.style.padding = '0px';
                    cellEl.style.textAlign = 'center';
                    // Render the status as a colored badge; bg was resolved
                    // from the status value above.
                    cellEl.innerHTML = "<span style='display:block;padding:4px 6px;border-radius:4px;color:#fff;background:" + bg + ";'>" + s + "</span>";
                }
} catch (e) {
}
});
return s;
}
"""
)
}
except Exception: # noqa: BLE001
pass
_set_tabulator_pagination(site_summary_table, page_size=50)
_set_tabulator_pagination(multirat_summary_table, page_size=50)
_set_tabulator_pagination(top_anomalies_table, page_size=50)
_set_tabulator_pagination(complaint_multirat_summary_table, page_size=50)
_set_tabulator_pagination(complaint_top_anomalies_table, page_size=50)
_set_tabulator_pagination(ops_queue_table, page_size=50)
_set_tabulator_pagination(snapshot_rules_table, page_size=50)
_set_tabulator_pagination(snapshot_multirat_table, page_size=50)
_set_tabulator_pagination(snapshot_top_table, page_size=50)
_set_tabulator_pagination(delta_table, page_size=50)
_set_tabulator_pagination(site_kpi_table, page_size=50)
trend_plot_pane = pn.pane.Plotly(sizing_mode="stretch_both", config=PLOTLY_CONFIG)
heatmap_plot_pane = pn.pane.Plotly(sizing_mode="stretch_both", config=PLOTLY_CONFIG)
hist_plot_pane = pn.pane.Plotly(sizing_mode="stretch_both", config=PLOTLY_CONFIG)
map_pane = pn.pane.Plotly(sizing_mode="stretch_both", config=PLOTLY_CONFIG)
map_message = pn.pane.Alert("", alert_type="info", visible=False)
_map_search_city: str = ""
_map_center_override: dict | None = None
_map_force_fit: bool = False
corr_plot_pane = pn.pane.Plotly(sizing_mode="stretch_both", config=PLOTLY_CONFIG)
corr_message = pn.pane.Alert("", alert_type="info", visible=False)
def _coords_by_site() -> pd.DataFrame:
rows = []
for _, df in (current_daily_by_rat or {}).items():
if not isinstance(df, pd.DataFrame) or df.empty:
continue
cols = [
c for c in ["site_code", "Latitude", "Longitude", "City"] if c in df.columns
]
if "site_code" not in cols:
continue
tmp = df[cols].copy()
tmp["site_code"] = pd.to_numeric(tmp["site_code"], errors="coerce")
tmp = tmp.dropna(subset=["site_code"]).copy()
tmp["site_code"] = tmp["site_code"].astype(int)
tmp = tmp.drop_duplicates(subset=["site_code"]).copy()
rows.append(tmp)
if not rows:
return pd.DataFrame(columns=["site_code", "Latitude", "Longitude", "City"])
out = pd.concat(rows, ignore_index=True)
out = out.drop_duplicates(subset=["site_code"]).copy()
return out
def _dominant_status_by_site() -> pd.DataFrame:
s = (
current_status_df
if isinstance(current_status_df, pd.DataFrame)
else pd.DataFrame()
)
if (
s is None
or s.empty
or "site_code" not in s.columns
or "status" not in s.columns
):
return pd.DataFrame(columns=["site_code", "dominant_status"])
tmp = s[["site_code", "status"]].copy()
tmp["site_code"] = pd.to_numeric(tmp["site_code"], errors="coerce")
tmp = tmp.dropna(subset=["site_code"]).copy()
if tmp.empty:
return pd.DataFrame(columns=["site_code", "dominant_status"])
tmp["site_code"] = tmp["site_code"].astype(int)
tmp["status"] = tmp["status"].astype(str).str.strip().str.upper()
prio = {
"PERSISTENT_DEGRADED": 6,
"DEGRADED": 5,
"NOTIFY": 4,
"RESOLVED": 3,
"NOTIFY_RESOLVED": 2,
"OK": 1,
"NO_DATA": 0,
}
tmp["_prio"] = tmp["status"].map(prio).fillna(2).astype(int)
idx = tmp.groupby("site_code")["_prio"].idxmax()
out = tmp.loc[idx, ["site_code", "status"]].rename(
columns={"status": "dominant_status"}
)
return out
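# Example: a site whose KPI rows carry statuses {OK, NOTIFY, DEGRADED}
# resolves to dominant_status == "DEGRADED", because
# groupby("site_code")["_prio"].idxmax() returns the row index of the
# highest-priority status per site (DEGRADED=5 beats NOTIFY=4 and OK=1).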
def _map_df() -> pd.DataFrame:
base = (
current_multirat_df
if isinstance(current_multirat_df, pd.DataFrame)
else pd.DataFrame()
)
if base is None or base.empty:
base = (
current_multirat_raw
if isinstance(current_multirat_raw, pd.DataFrame)
else pd.DataFrame()
)
if base is None or base.empty or "site_code" not in base.columns:
return pd.DataFrame()
base = base.copy()
base["site_code"] = pd.to_numeric(base["site_code"], errors="coerce")
base = base.dropna(subset=["site_code"]).copy()
base["site_code"] = base["site_code"].astype(int)
coords = _coords_by_site()
if coords is None or coords.empty:
return pd.DataFrame()
out = pd.merge(base, coords, on="site_code", how="left", suffixes=("", "_coord"))
if "City" not in out.columns and "City_coord" in out.columns:
out["City"] = out["City_coord"]
if "City" in out.columns and "City_coord" in out.columns:
out["City"] = out["City"].where(out["City"].notna(), out["City_coord"])
dom = _dominant_status_by_site()
if dom is not None and not dom.empty and "site_code" in dom.columns:
out = pd.merge(out, dom, on="site_code", how="left")
if "dominant_status" not in out.columns:
out["dominant_status"] = "NO_DATA"
out["dominant_status"] = (
out["dominant_status"]
.astype(str)
.str.strip()
.str.upper()
.replace({"": "NO_DATA"})
)
out["dominant_status"] = out["dominant_status"].where(
out["dominant_status"].notna(), "NO_DATA"
)
return out
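# Note on the merge above: when both frames carry a City column, pandas
# suffixes the coordinate-side one as "City_coord"; the `.where(notna, ...)`
# call coalesces the two, preferring the multi-RAT summary value and falling
# back to the coordinate frame only where the summary value is missing.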
def _apply_map_filters(df_map: pd.DataFrame) -> pd.DataFrame:
if df_map is None or df_map.empty:
return pd.DataFrame()
out = df_map.copy()
if _map_search_city:
if "City" in out.columns:
q = str(_map_search_city).strip().lower()
out = out[
out["City"].astype(str).str.lower().str.contains(q, na=False)
].copy()
else:
out = out.iloc[0:0].copy()
rat_imp = str(map_impacted_rat.value or "Any")
if rat_imp and rat_imp != "Any":
pcol = f"persistent_{rat_imp}"
dcol = f"degraded_{rat_imp}"
        if pcol in out.columns or dcol in out.columns:
            # Fall back to a zero Series (not a scalar 0) so the .astype(float)
            # comparisons below still work when one of the columns is absent.
            p = (
                pd.to_numeric(out[pcol], errors="coerce").fillna(0)
                if pcol in out.columns
                else pd.Series(0.0, index=out.index)
            )
            d = (
                pd.to_numeric(out[dcol], errors="coerce").fillna(0)
                if dcol in out.columns
                else pd.Series(0.0, index=out.index)
            )
            out = out[(p.astype(float) > 0) | (d.astype(float) > 0)].copy()
else:
out = out.iloc[0:0].copy()
st = str(map_status_filter.value or "Any")
if st and st != "Any" and "dominant_status" in out.columns:
out = out[out["dominant_status"].astype(str).str.upper() == st].copy()
score_col = (
"criticality_score_weighted"
if "criticality_score_weighted" in out.columns
else "criticality_score"
)
if score_col in out.columns:
out["_score"] = (
pd.to_numeric(out[score_col], errors="coerce").fillna(0).astype(float)
)
else:
out["_score"] = 0.0
topn = _coerce_int(map_top_n.value)
if topn is not None and int(topn) > 0:
out = out.sort_values(by=["_score"], ascending=False).head(int(topn)).copy()
return out
def _build_map_fig(df_map: pd.DataFrame) -> go.Figure | None:
if df_map is None or df_map.empty:
return None
if "Latitude" not in df_map.columns or "Longitude" not in df_map.columns:
return None
tmp = df_map.copy()
tmp["Latitude"] = pd.to_numeric(tmp["Latitude"], errors="coerce")
tmp["Longitude"] = pd.to_numeric(tmp["Longitude"], errors="coerce")
tmp = tmp.dropna(subset=["Latitude", "Longitude"]).copy()
if tmp.empty:
return None
score_col = (
"criticality_score_weighted"
if "criticality_score_weighted" in tmp.columns
else "criticality_score"
)
if score_col not in tmp.columns:
score_col = None
if score_col is not None:
tmp["_score"] = (
pd.to_numeric(tmp[score_col], errors="coerce").fillna(0).astype(float)
)
else:
tmp["_score"] = 0.0
size = (tmp["_score"].clip(lower=0) + 1.0).pow(0.5) * 6.0
tmp["_size"] = size.clip(lower=6, upper=28)
dom_order = [
"PERSISTENT_DEGRADED",
"DEGRADED",
"NOTIFY",
"RESOLVED",
"NOTIFY_RESOLVED",
"OK",
"NO_DATA",
]
color_map = {
"PERSISTENT_DEGRADED": "#8b0000",
"DEGRADED": "#ff6f00",
"NOTIFY": "#f9a825",
"RESOLVED": "#2e7d32",
"NOTIFY_RESOLVED": "#2e7d32",
"OK": "#1565c0",
"NO_DATA": "#616161",
}
hover_cols = [
c
for c in [
"site_code",
"City",
"dominant_status",
score_col,
"impacted_rats",
"persistent_kpis_total",
"degraded_kpis_total",
]
if c and c in tmp.columns
]
    # Build label text before creating the figure so Plotly Express splits the
    # labels per color trace; assigning one full-length text series afterwards
    # via update_traces would misalign labels across the per-status traces.
    tmp["_label"] = ""
    try:
        if bool(map_show_site_codes.value):
            max_labels = int(_coerce_int(map_max_labels.value) or 0)
            if max_labels > 0 and len(tmp) <= max_labels:
                tmp["_label"] = tmp["site_code"].astype(str)
            elif max_labels > 0:
                top_idx = (
                    tmp.sort_values(by=["_score"], ascending=False)
                    .head(max_labels)
                    .index
                )
                tmp.loc[top_idx, "_label"] = tmp.loc[top_idx, "site_code"].astype(str)
    except Exception:  # noqa: BLE001
        pass
    fig = px.scatter_mapbox(
        tmp,
        lat="Latitude",
        lon="Longitude",
        color="dominant_status" if "dominant_status" in tmp.columns else None,
        size="_size",
        size_max=28,
        zoom=4,
        text="_label",
        hover_data=hover_cols,
        custom_data=["site_code"],
        category_orders={"dominant_status": dom_order},
        color_discrete_map=color_map,
    )
    try:
        fig.update_traces(
            mode="markers+text",
            textposition="middle center",
            textfont=dict(color="white", size=10),
        )
    except Exception:  # noqa: BLE001
        pass
fig.update_layout(
mapbox_style="open-street-map",
margin=dict(l=10, r=10, t=10, b=10),
height=700,
)
global _map_center_override, _map_force_fit
do_fit = bool(map_auto_fit.value) or bool(_map_force_fit)
if (
_map_center_override
and "lat" in _map_center_override
and "lon" in _map_center_override
):
try:
fig.update_layout(
mapbox_center=dict(
lat=float(_map_center_override["lat"]),
lon=float(_map_center_override["lon"]),
),
mapbox_zoom=float(_map_center_override.get("zoom", 9.0)),
)
do_fit = False
except Exception: # noqa: BLE001
pass
if do_fit:
try:
lat_min = float(tmp["Latitude"].min())
lat_max = float(tmp["Latitude"].max())
lon_min = float(tmp["Longitude"].min())
lon_max = float(tmp["Longitude"].max())
lat_pad = max(0.01, abs(lat_max - lat_min) * 0.1)
lon_pad = max(0.01, abs(lon_max - lon_min) * 0.1)
fig.update_layout(
mapbox=dict(
bounds=dict(
west=lon_min - lon_pad,
east=lon_max + lon_pad,
south=lat_min - lat_pad,
north=lat_max + lat_pad,
)
)
)
except Exception: # noqa: BLE001
pass
_map_force_fit = False
return fig
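# Marker sizing in _build_map_fig is square-root scaled
# (size ~ 6 * sqrt(score + 1), clipped to 6..28) so a site with four times
# the criticality score gets roughly twice the marker size instead of
# dominating the map outright.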
def _refresh_map_view(event=None) -> None:
df_map = _map_df()
if df_map is None or df_map.empty:
map_pane.object = None
map_message.alert_type = "info"
map_message.object = "Run health check to display the map (needs Multi-RAT results + coordinates)."
map_message.visible = True
return
df_map = _apply_map_filters(df_map)
if df_map is None or df_map.empty:
map_pane.object = None
map_message.alert_type = "warning"
map_message.object = "No sites to display after Map filters/search."
map_message.visible = True
return
fig = _build_map_fig(df_map)
if fig is None:
map_pane.object = None
map_message.alert_type = "warning"
map_message.object = (
"No geo coordinates available (Latitude/Longitude) for current sites."
)
map_message.visible = True
return
map_message.visible = False
map_pane.object = fig
def _on_map_fit_click(event=None) -> None:
global _map_force_fit, _map_center_override
_map_center_override = None
_map_force_fit = True
_refresh_map_view()
def _on_map_search(event=None) -> None:
global _map_search_city, _map_center_override
q = str(map_search_input.value or "").strip()
_map_search_city = ""
_map_center_override = None
map_search_results.options = {}
map_search_results.value = None
map_search_results.visible = False
if not q:
_refresh_map_view()
return
code = _coerce_int(q)
if code is not None:
try:
df_map = _map_df()
if isinstance(df_map, pd.DataFrame) and not df_map.empty:
m = df_map[df_map["site_code"] == int(code)]
if not m.empty and "Latitude" in m.columns and "Longitude" in m.columns:
lat = pd.to_numeric(m.iloc[0].get("Latitude"), errors="coerce")
lon = pd.to_numeric(m.iloc[0].get("Longitude"), errors="coerce")
if pd.notna(lat) and pd.notna(lon):
_map_center_override = {
"lat": float(lat),
"lon": float(lon),
"zoom": 10.5,
}
except Exception: # noqa: BLE001
_map_center_override = None
_apply_drilldown_selection(site_code=int(code))
_refresh_map_view()
return
_map_search_city = q
try:
df_map = _map_df()
df_map = _apply_map_filters(df_map)
if (
isinstance(df_map, pd.DataFrame)
and not df_map.empty
and "City" in df_map.columns
):
m = df_map[
df_map["City"].astype(str).str.lower().str.contains(q.lower(), na=False)
].copy()
if not m.empty:
score_col = (
"criticality_score_weighted"
if "criticality_score_weighted" in m.columns
else "criticality_score"
)
if score_col in m.columns:
m["_score"] = pd.to_numeric(m[score_col], errors="coerce").fillna(0)
m = m.sort_values(by=["_score"], ascending=False)
m = m.head(20)
opts = {}
for _, r in m.iterrows():
sc = _coerce_int(r.get("site_code"))
if sc is None:
continue
city = str(r.get("City") or "").strip()
label = f"{int(sc)} - {city}" if city else f"{int(sc)}"
opts[label] = int(sc)
map_search_results.options = opts
map_search_results.visible = bool(opts)
except Exception: # noqa: BLE001
map_search_results.options = {}
map_search_results.visible = False
_refresh_map_view()
def _on_map_search_pick(event=None) -> None:
global _map_center_override
code = _coerce_int(getattr(event, "new", None))
if code is None:
return
try:
df_map = _map_df()
if isinstance(df_map, pd.DataFrame) and not df_map.empty:
m = df_map[df_map["site_code"] == int(code)]
if not m.empty and "Latitude" in m.columns and "Longitude" in m.columns:
lat = pd.to_numeric(m.iloc[0].get("Latitude"), errors="coerce")
lon = pd.to_numeric(m.iloc[0].get("Longitude"), errors="coerce")
if pd.notna(lat) and pd.notna(lon):
_map_center_override = {
"lat": float(lat),
"lon": float(lon),
"zoom": 10.5,
}
except Exception: # noqa: BLE001
_map_center_override = None
_apply_drilldown_selection(site_code=int(code))
_refresh_map_view()
def _on_map_clear_search(event=None) -> None:
global _map_search_city, _map_center_override
_map_search_city = ""
_map_center_override = None
map_search_input.value = ""
map_search_results.options = {}
map_search_results.value = None
map_search_results.visible = False
_refresh_map_view()
def _on_map_click(event) -> None:
try:
cd = event.new
if not isinstance(cd, dict):
return
pts = cd.get("points", [])
if not pts:
return
p0 = pts[0]
custom = p0.get("customdata", None)
if not custom:
return
site_code = custom[0] if isinstance(custom, (list, tuple)) else custom
site_code_int = _coerce_int(site_code)
if site_code_int is None:
return
best_rat = rat_select.value
try:
row = None
if (
isinstance(current_multirat_raw, pd.DataFrame)
and not current_multirat_raw.empty
):
sel = current_multirat_raw[
current_multirat_raw["site_code"] == int(site_code_int)
]
row = sel.iloc[0].to_dict() if not sel.empty else None
if row:
best_score = -1
for r in list(rat_select.options or []):
p = pd.to_numeric(row.get(f"persistent_{r}", 0), errors="coerce")
d = pd.to_numeric(row.get(f"degraded_{r}", 0), errors="coerce")
p = int(p) if pd.notna(p) else 0
d = int(d) if pd.notna(d) else 0
score = p * 2 + d
if score > best_score:
best_score = score
best_rat = r
except Exception: # noqa: BLE001
best_rat = rat_select.value
_apply_drilldown_selection(site_code=site_code_int, rat=best_rat)
try:
status_pane.alert_type = "primary"
status_pane.object = f"Drill-down: site {int(site_code_int)} | {best_rat}"
except Exception: # noqa: BLE001
pass
except Exception: # noqa: BLE001
return
def _compute_delta_df() -> pd.DataFrame:
if not isinstance(current_snapshot, dict) or not current_snapshot:
return pd.DataFrame()
snap_rows = current_snapshot.get("multirat_df", [])
if not isinstance(snap_rows, list) or not snap_rows:
return pd.DataFrame()
snap = pd.DataFrame(snap_rows)
if snap.empty or "site_code" not in snap.columns:
return pd.DataFrame()
cur = (
current_multirat_raw
if isinstance(current_multirat_raw, pd.DataFrame)
else pd.DataFrame()
)
if cur.empty or "site_code" not in cur.columns:
return pd.DataFrame()
snap["site_code"] = pd.to_numeric(snap["site_code"], errors="coerce")
cur["site_code"] = pd.to_numeric(cur["site_code"], errors="coerce")
snap = snap.dropna(subset=["site_code"]).copy()
cur = cur.dropna(subset=["site_code"]).copy()
snap["site_code"] = snap["site_code"].astype(int)
cur["site_code"] = cur["site_code"].astype(int)
left = snap.set_index("site_code")
right = cur.set_index("site_code")
score_col = (
"criticality_score_weighted"
if "criticality_score_weighted" in right.columns
else "criticality_score"
)
if score_col not in right.columns:
score_col = (
"criticality_score" if "criticality_score" in right.columns else None
)
key_cols = [
"City",
"is_complaint_site",
"impacted_rats",
"persistent_kpis_total",
"degraded_kpis_total",
"resolved_kpis_total",
"criticality_score",
"criticality_score_weighted",
"traffic_gb_total",
]
all_sites = sorted(set(left.index.tolist()) | set(right.index.tolist()))
rows = []
for sc in all_sites:
srow = left.loc[sc] if sc in left.index else None
crow = right.loc[sc] if sc in right.index else None
def _get(row, col):
try:
if row is None:
return None
if isinstance(row, pd.DataFrame):
row = row.iloc[0]
return row.get(col, None)
except Exception: # noqa: BLE001
return None
snap_score = _get(srow, score_col) if score_col else None
cur_score = _get(crow, score_col) if score_col else None
try:
snap_score_f = (
float(snap_score)
if snap_score is not None and pd.notna(snap_score)
else 0.0
)
except Exception: # noqa: BLE001
snap_score_f = 0.0
try:
cur_score_f = (
float(cur_score)
if cur_score is not None and pd.notna(cur_score)
else 0.0
)
except Exception: # noqa: BLE001
cur_score_f = 0.0
if sc not in left.index:
change_type = "NEW"
elif sc not in right.index:
change_type = "MISSING"
else:
if cur_score_f > snap_score_f:
change_type = "SEVERITY_UP"
elif cur_score_f < snap_score_f:
change_type = "SEVERITY_DOWN"
else:
change_type = "UNCHANGED"
row_out = {
"site_code": int(sc),
"change_type": change_type,
"score_snapshot": int(round(snap_score_f)),
"score_current": int(round(cur_score_f)),
"score_delta": int(round(cur_score_f - snap_score_f)),
}
for c in key_cols:
row_out[f"snapshot_{c}"] = _get(srow, c)
row_out[f"current_{c}"] = _get(crow, c)
rows.append(row_out)
out = pd.DataFrame(rows)
if out.empty:
return out
try:
q = (city_filter.value or "").strip()
if q:
city_series = out.get("current_City")
if city_series is None:
city_series = out.get("snapshot_City")
if city_series is not None:
out = out[
city_series.astype(str).str.contains(q, case=False, na=False)
].copy()
except Exception: # noqa: BLE001
pass
try:
mc = int(min_criticality.value)
if mc > 0 and "score_current" in out.columns:
out = out[
pd.to_numeric(out["score_current"], errors="coerce").fillna(0) >= mc
]
except Exception: # noqa: BLE001
pass
order = {
"SEVERITY_UP": 0,
"NEW": 1,
"SEVERITY_DOWN": 2,
"UNCHANGED": 3,
"MISSING": 4,
}
try:
out["_order"] = out["change_type"].map(order).fillna(99).astype(int)
out = out.sort_values(
by=["_order", "score_delta"], ascending=[True, False]
).drop(columns=["_order"], errors="ignore")
except Exception: # noqa: BLE001
out = out.sort_values(
by=["change_type", "score_delta"], ascending=[True, False]
)
return out
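# Per-site classification rule used above: a site present only in the
# current run is NEW, one present only in the snapshot is MISSING, and a
# site present in both is SEVERITY_UP / SEVERITY_DOWN / UNCHANGED according
# to how its criticality score moved between the two runs.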
def _refresh_delta_view(event=None) -> None:
global current_delta_df
try:
current_delta_df = _compute_delta_df()
except Exception: # noqa: BLE001
current_delta_df = pd.DataFrame()
delta_table.value = current_delta_df
export_button = pn.widgets.FileDownload(
label="Download KPI Health Check report",
filename="KPI_Health_Check_Report.xlsx",
button_type="primary",
)
export_include_raw_data = pn.widgets.Checkbox(
name="Include raw KPI data (slow)",
value=True,
)
perf_profiling = pn.widgets.Checkbox(
name="Perf profiling",
value=False,
)
alert_pack_button = pn.widgets.FileDownload(
label="Download Alert Pack",
filename="KPI_Alert_Pack.xlsx",
button_type="primary",
)
def _filtered_daily(df: pd.DataFrame) -> pd.DataFrame:
if df is None or df.empty:
return pd.DataFrame()
if (
analysis_range.value
and len(analysis_range.value) == 2
and analysis_range.value[0]
and analysis_range.value[1]
):
start, end = analysis_range.value
mask = (df["date_only"] >= start) & (df["date_only"] <= end)
return df[mask].copy()
return df
def _validate_inputs() -> tuple[list[str], list[str]]:
errors: list[str] = []
warnings: list[str] = []
bd = _coerce_int(baseline_days.value)
rd = _coerce_int(recent_days.value)
mcd = _coerce_int(min_consecutive_days.value)
thr = _coerce_float(rel_threshold_pct.value)
if bd is None or bd < 1:
errors.append("Baseline window (days) must be >= 1")
if rd is None or rd < 1:
errors.append("Recent window (days) must be >= 1")
if mcd is None or mcd < 1:
errors.append("Min consecutive bad days (persistent) must be >= 1")
if thr is None:
errors.append("Relative change threshold (%) is invalid")
elif thr < 0:
errors.append("Relative change threshold (%) must be >= 0")
g = str(granularity_select.value or "Daily").strip().lower()
is_hourly = g.startswith("hour") or g.startswith("h")
if is_hourly and (current_daily_by_rat or {}) and rd is not None and bd is not None:
try:
recent_periods = int(rd) * 24
baseline_periods = int(bd) * 24
except Exception: # noqa: BLE001
recent_periods = 0
baseline_periods = 0
if recent_periods > 0:
for rat_name, df0 in (current_daily_by_rat or {}).items():
if df0 is None or df0.empty or "period_start" not in df0.columns:
continue
df1 = _filtered_daily(df0)
if df1 is None or df1.empty or "period_start" not in df1.columns:
continue
t = pd.to_datetime(df1["period_start"], errors="coerce").dropna()
if t.empty:
continue
end_dt = pd.Timestamp(t.max()).floor("h")
min_dt = pd.Timestamp(t.min()).floor("h")
baseline_end = end_dt - timedelta(hours=int(recent_periods))
if min_dt > baseline_end:
warnings.append(
f"{rat_name}: Hourly baseline is empty (no data before the recent window). "
f"Export more hourly history or reduce Recent window (days)."
)
elif baseline_periods > 0:
try:
available_baseline_hours = int(
((baseline_end - min_dt) / timedelta(hours=1)) + 1
)
except Exception: # noqa: BLE001
available_baseline_hours = 0
if 0 < available_baseline_hours < int(baseline_periods):
warnings.append(
f"{rat_name}: Hourly baseline window is truncated "
f"({available_baseline_hours}h available < {int(baseline_periods)}h requested)."
)
ar = analysis_range.value
if ar and isinstance(ar, (list, tuple)) and len(ar) == 2 and ar[0] and ar[1]:
try:
if ar[0] > ar[1]:
errors.append("Analysis date range is invalid (start > end)")
except Exception: # noqa: BLE001
pass
has_overlap = False
for df in (current_daily_by_rat or {}).values():
if df is None or df.empty or "date_only" not in df.columns:
continue
try:
dmin = df["date_only"].min()
dmax = df["date_only"].max()
            if pd.isna(dmin) or pd.isna(dmax):
continue
if not (ar[1] < dmin or ar[0] > dmax):
has_overlap = True
break
except Exception: # noqa: BLE001
continue
if (current_daily_by_rat or {}) and not has_overlap:
warnings.append("No data falls within the selected analysis date range")
code_int = None
try:
code_int = _coerce_int(site_select.value)
if (
code_int is None
and isinstance(site_select.options, dict)
and site_select.value in site_select.options
):
code_int = _coerce_int(site_select.options.get(site_select.value))
except Exception: # noqa: BLE001
code_int = None
if (
(current_daily_by_rat or {})
and site_select.value is not None
and code_int is None
):
warnings.append("Selected site is not a valid site code")
try:
rat = rat_select.value
df = current_daily_by_rat.get(rat) if rat else None
if isinstance(df, pd.DataFrame) and not df.empty and "site_code" in df.columns:
n_sites = int(
pd.to_numeric(df["site_code"], errors="coerce").dropna().nunique()
)
if n_sites <= 1:
warnings.append(
f"{rat} dataset contains only {n_sites} site after normalization"
)
if code_int is not None:
available = set(
pd.to_numeric(df["site_code"], errors="coerce")
.dropna()
.astype(int)
.tolist()
)
if available and code_int not in available:
warnings.append(f"No {rat} data for selected site ({code_int})")
except Exception: # noqa: BLE001
pass
return errors, warnings
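# _validate_inputs returns (errors, warnings): errors disable running the
# health check and exporting (see _refresh_validation_state below), while
# warnings are advisory and only surface in the validation pane.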
def _refresh_validation_state() -> None:
errors, warnings = _validate_inputs()
msgs: list[str] = []
msgs.extend([f"ERROR: {m}" for m in errors])
msgs.extend([f"WARN: {m}" for m in warnings])
if msgs:
validation_pane.object = "\n".join(msgs)
validation_pane.alert_type = "danger" if errors else "warning"
validation_pane.visible = True
else:
validation_pane.object = ""
validation_pane.alert_type = "success"
validation_pane.visible = bool(errors or warnings)
try:
run_button.disabled = bool(errors) or not has_data
except Exception: # noqa: BLE001
pass
try:
export_button.disabled = bool(errors) or not has_results
except Exception: # noqa: BLE001
pass
try:
rat = rat_select.value
daily = current_daily_by_rat.get(rat) if rat else None
code_int = _coerce_int(site_select.value)
if (
code_int is None
and isinstance(site_select.options, dict)
and site_select.value in site_select.options
):
code_int = _coerce_int(site_select.options.get(site_select.value))
if (
isinstance(daily, pd.DataFrame)
and not daily.empty
and code_int is not None
and "site_code" in daily.columns
):
available = set(
pd.to_numeric(daily["site_code"], errors="coerce")
.dropna()
.astype(int)
.tolist()
)
has_site = bool(available) and int(code_int) in available
else:
has_site = False
drilldown_export_button.disabled = (
bool(errors)
or (daily is None)
or (daily is not None and daily.empty)
or (code_int is None)
or (not has_site)
)
except Exception: # noqa: BLE001
pass
def _update_site_options() -> None:
global _updating_drilldown
all_sites = []
for df in current_daily_by_rat.values():
if df is None or df.empty:
continue
if "site_code" not in df.columns:
continue
cols = [c for c in ["site_code", "City"] if c in df.columns]
all_sites.append(df[cols].drop_duplicates("site_code"))
if not all_sites:
site_select.options = {}
site_select.value = None
return
sites_df = pd.concat(all_sites, ignore_index=True).drop_duplicates("site_code")
if "City" not in sites_df.columns:
sites_df["City"] = pd.NA
sites_df = sites_df.sort_values(by=["City", "site_code"], na_position="last")
opts: dict[str, int] = {}
for _, row in sites_df.iterrows():
label = (
f"{row['City']}_{row['site_code']}"
if pd.notna(row.get("City"))
else str(row["site_code"])
)
opts[str(label)] = int(row["site_code"])
_updating_drilldown = True
try:
site_select.options = opts
if opts and site_select.value not in opts.values():
site_select.value = next(iter(opts.values()))
finally:
_updating_drilldown = False
def _update_kpi_options() -> None:
if _applying_profile or _loading_datasets:
return
global _updating_drilldown
rat = rat_select.value
df = current_daily_by_rat.get(rat)
if df is None or df.empty:
kpi_select.options = []
kpi_select.value = None
return
kpis = [
c
for c in df.columns
if c
not in {
"site_code",
"date_only",
"period_start",
"Longitude",
"Latitude",
"City",
"RAT",
}
]
kpis = sorted([str(c) for c in kpis])
groups = get_kpis_by_group(kpis)
group_options = ["All (selected KPIs)"] + sorted(
[g for g in groups.keys() if g != "Other"]
)
if "Other" in groups:
group_options.append("Other")
_updating_drilldown = True
try:
kpi_group_select.options = group_options
if kpi_group_select.value not in group_options:
kpi_group_select.value = group_options[0]
filtered_kpis = filter_kpis(
kpis, kpi_group_select.value, mode=kpi_group_mode.value
)
kpi_select.options = filtered_kpis
if filtered_kpis and kpi_select.value not in filtered_kpis:
kpi_select.value = filtered_kpis[0]
kpi_compare_select.options = kpis
cur = list(kpi_compare_select.value or [])
cur2 = [str(x) for x in cur if str(x) in kpis]
if cur2 != cur:
kpi_compare_select.value = cur2
finally:
_updating_drilldown = False
def _update_site_view(event=None) -> None:
if _applying_profile or _loading_datasets or _updating_drilldown:
return
code = site_select.value
rat = rat_select.value
kpi = kpi_select.value
if code is None or rat is None:
site_kpi_table.value = pd.DataFrame()
trend_plot_pane.object = None
heatmap_plot_pane.object = None
hist_plot_pane.object = None
corr_plot_pane.object = None
corr_message.visible = False
return
daily = current_daily_by_rat.get(rat)
if daily is None or daily.empty:
trend_plot_pane.object = None
heatmap_plot_pane.object = None
hist_plot_pane.object = None
corr_plot_pane.object = None
corr_message.visible = False
return
d = _filtered_daily(daily)
if (
d is None
or d.empty
or "site_code" not in d.columns
or "date_only" not in d.columns
):
trend_plot_pane.object = None
heatmap_plot_pane.object = None
hist_plot_pane.object = None
corr_plot_pane.object = None
corr_message.visible = False
return
try:
available_sites = set(
pd.to_numeric(d["site_code"], errors="coerce").dropna().astype(int).tolist()
)
except Exception: # noqa: BLE001
available_sites = set()
code_int = None
if code is not None:
if hasattr(site_select, "options") and isinstance(site_select.options, dict):
if code in site_select.options:
code_int = site_select.options[code]
elif code in site_select.options.values():
code_int = code
if code_int is None:
try:
code_int = int(code)
            except Exception:  # noqa: BLE001
code_int = _coerce_int(code)
if available_sites and code_int is not None and code_int not in available_sites:
site_kpi_table.value = pd.DataFrame()
trend_plot_pane.object = None
heatmap_plot_pane.object = None
hist_plot_pane.object = None
corr_plot_pane.object = None
corr_message.visible = False
_refresh_validation_state()
return
if code_int is None:
site_kpi_table.value = pd.DataFrame()
trend_plot_pane.object = None
heatmap_plot_pane.object = None
hist_plot_pane.object = None
corr_plot_pane.object = None
corr_message.visible = False
return
code = code_int
status_df = (
current_status_df
if isinstance(current_status_df, pd.DataFrame)
else pd.DataFrame()
)
if status_df is None or status_df.empty:
site_df = pd.DataFrame()
else:
        # code_int is guaranteed non-None here (early returns above).
        site_df = status_df[
            (status_df["site_code"] == int(code_int))
            & (status_df["RAT"] == rat)
        ].copy()
site_kpi_table.value = site_df
if not kpi or kpi not in d.columns:
candidate_kpis = [
c
for c in d.columns
if c
not in {
"site_code",
"date_only",
"period_start",
"Longitude",
"Latitude",
"City",
"RAT",
}
]
candidate_kpis = sorted([str(c) for c in candidate_kpis])
if not candidate_kpis:
trend_plot_pane.object = None
heatmap_plot_pane.object = None
hist_plot_pane.object = None
corr_plot_pane.object = None
corr_message.visible = False
return
new_kpi = candidate_kpis[0]
_set_widget_value(kpi_select, new_kpi)
kpi = new_kpi
g = str(granularity_select.value or "Daily").strip().lower()
is_hourly = g.startswith("hour") or g.startswith("h")
time_col = (
"period_start" if (is_hourly and "period_start" in d.columns) else "date_only"
)
s = d[d["site_code"] == int(code_int)].copy().sort_values(time_col)
if s.empty:
trend_plot_pane.object = None
heatmap_plot_pane.object = None
hist_plot_pane.object = None
corr_plot_pane.object = None
corr_message.visible = False
return
cache_key = _drilldown_cache_key(int(code_int), str(rat), str(kpi))
cached = _drilldown_cache_get(cache_key)
if cached is not None:
try:
(
trend_plot_pane.object,
heatmap_plot_pane.object,
hist_plot_pane.object,
corr_plot_pane.object,
corr_message.object,
corr_message.alert_type,
corr_message.visible,
) = cached
except Exception: # noqa: BLE001
trend_plot_pane.object, heatmap_plot_pane.object, hist_plot_pane.object = (
cached
)
return
kpis_to_plot = []
selected = [str(x) for x in (kpi_compare_select.value or []) if str(x)]
if kpi and str(kpi) not in selected:
selected = [str(kpi)] + selected
if "Add top" in str(kpi_group_mode.value):
from_group = filter_kpis(
d.columns.tolist(), kpi_group_select.value, mode="Top-N", top_n=12
)
for gk in from_group:
if gk not in selected:
selected.append(gk)
kpis_to_plot = selected[:15]
rules_df = (
rules_table.value
if isinstance(rules_table.value, pd.DataFrame)
else pd.DataFrame()
)
relevant_rules = rules_df
try:
if (
isinstance(rules_df, pd.DataFrame)
and not rules_df.empty
and "RAT" in rules_df.columns
):
relevant_rules = rules_df[rules_df["RAT"] == rat].copy()
except Exception: # noqa: BLE001
relevant_rules = rules_df
fig = build_drilldown_plot(
df=d[d["site_code"] == int(code_int)],
kpis=kpis_to_plot,
rules_df=relevant_rules,
highlight_bad_days=True,
show_sla=bool(show_sla_toggle.value),
site_code=code_int,
rat=rat,
main_kpi=str(kpi),
baseline_days_n=int(_coerce_int(baseline_days.value) or 30),
recent_days_n=int(_coerce_int(recent_days.value) or 7),
rel_threshold_pct=float(_coerce_float(rel_threshold_pct.value) or 10.0),
normalization=str(kpi_compare_norm.value or "None"),
granularity=str(granularity_select.value or "Daily"),
)
trend_plot_pane.object = fig
kpis_for_heatmap = []
if (
isinstance(site_df, pd.DataFrame)
and not site_df.empty
and "KPI" in site_df.columns
):
sev = {
"PERSISTENT_DEGRADED": 4,
"DEGRADED": 3,
"NOTIFY": 2,
"RESOLVED": 1,
"NOTIFY_RESOLVED": 0,
"OK": 0,
"NO_DATA": -1,
}
tmp = site_df.copy()
if "status" in tmp.columns:
tmp["_sev"] = tmp["status"].map(sev).fillna(0).astype(int)
tmp = tmp.sort_values(by=["_sev", "KPI"], ascending=[False, True])
kpis_for_heatmap = tmp["KPI"].astype(str).tolist()[:30]
if not kpis_for_heatmap:
try:
if isinstance(rules_df, pd.DataFrame) and not rules_df.empty:
kpis_for_heatmap = (
rules_df[rules_df["RAT"] == rat]["KPI"].astype(str).tolist()[:30]
)
except Exception: # noqa: BLE001
kpis_for_heatmap = []
heatmap_plot_pane.object = _build_site_heatmap(
d, rules_df, int(code_int), rat, kpis_for_heatmap
)
hist_plot_pane.object = _build_baseline_recent_hist(d, int(code_int), str(kpi))
try:
corr_kpis = [str(x) for x in (kpi_compare_select.value or []) if str(x)]
if len(corr_kpis) < 2:
corr_kpis = [str(x) for x in (kpis_to_plot or []) if str(x)]
corr_kpis = [c for c in corr_kpis if c in d.columns]
corr_kpis = corr_kpis[:20]
df_corr = d
try:
windows = _compute_site_windows(d)
if windows is not None:
baseline_start, baseline_end, recent_start, recent_end = windows
w = str(corr_window_select.value or "")
if w.startswith("Recent"):
df_corr = d[
(
pd.to_datetime(d[time_col], errors="coerce")
>= pd.to_datetime(recent_start)
)
& (
pd.to_datetime(d[time_col], errors="coerce")
<= pd.to_datetime(recent_end)
)
].copy()
elif w.startswith("Baseline"):
df_corr = d[
(
pd.to_datetime(d[time_col], errors="coerce")
>= pd.to_datetime(baseline_start)
)
& (
pd.to_datetime(d[time_col], errors="coerce")
<= pd.to_datetime(baseline_end)
)
].copy()
except Exception: # noqa: BLE001
df_corr = d
corr_fig = _build_corr_heatmap(df_corr, int(code_int), corr_kpis)
if corr_fig is None:
corr_plot_pane.object = None
corr_message.alert_type = "info"
corr_message.object = "Correlation needs at least 2 KPIs with enough samples for the selected site."
corr_message.visible = True
else:
corr_message.visible = False
corr_plot_pane.object = corr_fig
except Exception: # noqa: BLE001
corr_plot_pane.object = None
corr_message.alert_type = "warning"
corr_message.object = "Unable to compute correlation."
corr_message.visible = True
try:
drilldown_export_button.filename = (
f"KPI_Drilldown_{rat}_site_{int(code_int)}.xlsx"
)
except Exception: # noqa: BLE001
pass
_drilldown_cache_set(
cache_key,
(
trend_plot_pane.object,
heatmap_plot_pane.object,
hist_plot_pane.object,
corr_plot_pane.object,
corr_message.object,
corr_message.alert_type,
corr_message.visible,
),
)
def _apply_city_filter(df: pd.DataFrame) -> pd.DataFrame:
if df is None or df.empty:
return pd.DataFrame()
q = (city_filter.value or "").strip()
if not q or "City" not in df.columns:
return df
return df[df["City"].astype(str).str.contains(q, case=False, na=False)].copy()
def _extract_site_codes_from_any(df: pd.DataFrame) -> set[int]:
if df is None or df.empty:
return set()
candidates = [
"site_code",
"site",
"site id",
"site_id",
"code",
"code_site",
"id",
]
col = None
cols_lower = {str(c).strip().lower(): c for c in df.columns}
for c in candidates:
if c in cols_lower:
col = cols_lower[c]
break
if col is None:
col = df.columns[0]
raw = df[col].astype(str).str.strip()
nums = pd.to_numeric(raw, errors="coerce")
if nums.isna().all():
extracted = raw.str.extract(r"(\d{3,})")[0]
nums = pd.to_numeric(extracted, errors="coerce")
out = nums.dropna().astype(int).tolist()
return set(out)
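# Illustrative fallback: if every cell fails plain numeric parsing (e.g. a
# column of values like "Site 12345 / Casablanca"), the r"(\d{3,})" extract
# pulls the first run of three or more digits from each cell, so the example
# yields {12345}. Columns that parse numerically never reach the regex path.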
def _load_complaint_sites_from_bytes(content: bytes, filename: str) -> set[int]:
name = str(filename or "").lower()
if name.endswith(".xlsx") or name.endswith(".xls"):
try:
df = pd.read_excel(io.BytesIO(content))
except Exception: # noqa: BLE001
return set()
return _extract_site_codes_from_any(df)
if name.endswith(".txt"):
try:
lines = content.decode("utf-8", errors="ignore").splitlines()
except Exception: # noqa: BLE001
return set()
df = pd.DataFrame({"site_code": lines})
return _extract_site_codes_from_any(df)
try:
df = pd.read_csv(io.BytesIO(content), sep=None, engine="python")
except Exception: # noqa: BLE001
try:
df = pd.read_csv(io.BytesIO(content))
except Exception: # noqa: BLE001
return set()
return _extract_site_codes_from_any(df)
def _refresh_complaint_sites(event=None) -> None:
global complaint_sites
if complaint_sites_file.value:
complaint_sites = _load_complaint_sites_from_bytes(
complaint_sites_file.value, complaint_sites_file.filename or ""
)
else:
default_paths = [
os.path.join(ROOT_DIR, "data", "complaint_sites.csv"),
os.path.join(ROOT_DIR, "data", "complaint_sites.xlsx"),
os.path.join(ROOT_DIR, "data", "sites_plaintes.csv"),
os.path.join(ROOT_DIR, "data", "sites_plaintes.xlsx"),
]
loaded = False
for p in default_paths:
if not os.path.exists(p):
continue
try:
if p.lower().endswith((".xlsx", ".xls")):
df = pd.read_excel(p)
else:
df = pd.read_csv(p, sep=None, engine="python")
complaint_sites = _extract_site_codes_from_any(df)
loaded = True
break
except Exception: # noqa: BLE001
continue
if not loaded:
complaint_sites = set()
_apply_complaint_flags()
_invalidate_drilldown_cache(healthcheck_changed=True)
def _apply_complaint_flags() -> None:
global current_multirat_raw, current_top_anomalies_raw
if complaint_sites:
if current_multirat_raw is not None and not current_multirat_raw.empty:
current_multirat_raw["is_complaint_site"] = (
current_multirat_raw["site_code"].astype(int).isin(complaint_sites)
)
if (
current_top_anomalies_raw is not None
and not current_top_anomalies_raw.empty
):
current_top_anomalies_raw["is_complaint_site"] = (
current_top_anomalies_raw["site_code"].astype(int).isin(complaint_sites)
)
else:
if current_multirat_raw is not None and not current_multirat_raw.empty:
current_multirat_raw["is_complaint_site"] = False
if (
current_top_anomalies_raw is not None
and not current_top_anomalies_raw.empty
):
current_top_anomalies_raw["is_complaint_site"] = False
_refresh_filtered_results()
def _infer_rule_row(rules_df: pd.DataFrame, rat: str, kpi: str) -> dict:
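    """Return the first rules row matching (RAT, KPI) as a dict, or {} if none."""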
if rules_df is None or rules_df.empty:
return {}
s = rules_df[(rules_df["RAT"] == rat) & (rules_df["KPI"] == kpi)]
if s.empty:
return {}
    return s.iloc[0].to_dict()
def _compute_site_windows(
daily_filtered: pd.DataFrame,
) -> (
tuple[
date | pd.Timestamp,
date | pd.Timestamp,
date | pd.Timestamp,
date | pd.Timestamp,
]
| None
):
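    """Derive (baseline_start, baseline_end, recent_start, recent_end) from the
    filtered data and the current window widgets. Hourly granularity converts
    the day counts into 24x hourly periods; returns None when inputs are unusable.
    """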
if daily_filtered is None or daily_filtered.empty:
return None
rd = _coerce_int(recent_days.value)
bd = _coerce_int(baseline_days.value)
if rd is None or rd < 1 or bd is None or bd < 1:
return None
g = str(granularity_select.value or "Daily").strip().lower()
is_hourly = g.startswith("hour") or g.startswith("h")
time_col = (
"period_start"
if (is_hourly and "period_start" in daily_filtered.columns)
else "date_only"
)
if is_hourly and time_col == "period_start":
try:
end_dt = pd.to_datetime(daily_filtered[time_col], errors="coerce").max()
if pd.isna(end_dt):
return None
end_dt = pd.Timestamp(end_dt).floor("h")
except Exception: # noqa: BLE001
return None
recent_periods = int(rd) * 24
baseline_periods = int(bd) * 24
step = timedelta(hours=1)
recent_start, recent_end = window_bounds_period(end_dt, recent_periods, step)
baseline_end = recent_start - step
baseline_start, _ = window_bounds_period(baseline_end, baseline_periods, step)
return baseline_start, baseline_end, recent_start, recent_end
try:
end_ts = pd.to_datetime(daily_filtered["date_only"], errors="coerce").max()
if pd.isna(end_ts):
return None
end_date = end_ts.date()
except Exception: # noqa: BLE001
return None
recent_start, recent_end = window_bounds(end_date, int(rd))
baseline_end = recent_start - timedelta(days=1)
baseline_start = baseline_end - timedelta(days=int(bd) - 1)
return baseline_start, baseline_end, recent_start, recent_end
def _build_site_heatmap(
daily_filtered: pd.DataFrame,
rules_df: pd.DataFrame,
site_code: int,
rat: str,
kpis: list[str],
) -> go.Figure | None:
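    """Render a green/red heatmap of per-period OK/bad flags for one site.

    Each KPI is judged against its baseline median, the relative-change
    threshold, and its SLA (the SLA is skipped when the policy is "notify");
    missing values render as NO_DATA cells.
    """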
if daily_filtered is None or daily_filtered.empty:
return None
windows = _compute_site_windows(daily_filtered)
if windows is None:
return None
baseline_start, baseline_end, recent_start, recent_end = windows
g = str(granularity_select.value or "Daily").strip().lower()
is_hourly = g.startswith("hour") or g.startswith("h")
time_col = (
"period_start"
if (is_hourly and "period_start" in daily_filtered.columns)
else "date_only"
)
site_daily = daily_filtered[daily_filtered["site_code"] == int(site_code)].copy()
if site_daily.empty:
return None
site_daily = site_daily.sort_values(time_col)
dates = []
cur = recent_start
step = timedelta(hours=1) if is_hourly else timedelta(days=1)
while cur <= recent_end:
dates.append(cur)
cur = cur + step
z = []
hover = []
y_labels = []
for kpi in kpis:
if kpi not in site_daily.columns:
continue
rule = _infer_rule_row(rules_df, rat, kpi)
direction = str(rule.get("direction", "higher_is_better"))
policy = str(rule.get("policy", "enforce") or "enforce").strip().lower()
sla_raw = rule.get("sla", None)
try:
sla_val = float(sla_raw) if pd.notna(sla_raw) else None
except Exception: # noqa: BLE001
sla_val = None
sla_eval = None if policy == "notify" else sla_val
s = site_daily[[time_col, kpi]].dropna(subset=[kpi])
t = pd.to_datetime(s[time_col], errors="coerce")
baseline_mask = (t >= pd.to_datetime(baseline_start)) & (
t <= pd.to_datetime(baseline_end)
)
baseline = s.loc[baseline_mask, kpi].median() if baseline_mask.any() else np.nan
baseline_val = float(baseline) if pd.notna(baseline) else None
row_z = []
row_h = []
for d in dates:
v_series = site_daily.loc[
pd.to_datetime(site_daily[time_col], errors="coerce")
== pd.to_datetime(d),
kpi,
]
v = v_series.iloc[0] if not v_series.empty else np.nan
if v is None or (isinstance(v, float) and np.isnan(v)):
row_z.append(None)
row_h.append(f"{kpi}
{d}: NO_DATA")
continue
bad = is_bad(
float(v) if pd.notna(v) else None,
baseline_val,
direction,
float(rel_threshold_pct.value),
sla_eval,
)
row_z.append(1 if bad else 0)
            row_h.append(
                f"{kpi}<br>{d}: {float(v):.3f}"
                f"<br>baseline: {baseline_val if baseline_val is not None else 'NA'}"
                f"<br>sla: {sla_val if sla_val is not None else 'NA'}"
            )
z.append(row_z)
hover.append(row_h)
y_labels.append(kpi)
if not z:
return None
fig = go.Figure(
data=[
go.Heatmap(
z=z,
x=[str(d) for d in dates],
y=y_labels,
colorscale=[[0.0, "#2ca02c"], [1.0, "#d62728"]],
zmin=0,
zmax=1,
showscale=False,
hovertext=hover,
hovertemplate="%{hovertext}",
)
]
)
fig.update_layout(
template="plotly_white",
title=f"{rat} - Site {int(site_code)} - Recent window heatmap",
xaxis_title="period",
yaxis_title="KPI",
height=420,
margin=dict(l=40, r=20, t=60, b=40),
)
return fig
def _build_baseline_recent_hist(
daily_filtered: pd.DataFrame,
site_code: int,
kpi: str,
) -> go.Figure | None:
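    """Overlay histograms of one KPI's baseline-window vs recent-window values
    for the selected site; returns None when both windows are empty.
    """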
if (
daily_filtered is None
or daily_filtered.empty
or not kpi
or kpi not in daily_filtered.columns
):
return None
windows = _compute_site_windows(daily_filtered)
if windows is None:
return None
baseline_start, baseline_end, recent_start, recent_end = windows
g = str(granularity_select.value or "Daily").strip().lower()
is_hourly = g.startswith("hour") or g.startswith("h")
time_col = (
"period_start"
if (is_hourly and "period_start" in daily_filtered.columns)
else "date_only"
)
site_daily = daily_filtered[daily_filtered["site_code"] == int(site_code)].copy()
if site_daily.empty:
return None
s = site_daily[[time_col, kpi]].dropna(subset=[kpi])
t = pd.to_datetime(s[time_col], errors="coerce")
baseline_mask = (t >= pd.to_datetime(baseline_start)) & (
t <= pd.to_datetime(baseline_end)
)
recent_mask = (t >= pd.to_datetime(recent_start)) & (
t <= pd.to_datetime(recent_end)
)
baseline_vals = (
pd.to_numeric(s.loc[baseline_mask, kpi], errors="coerce").dropna().astype(float)
)
recent_vals = (
pd.to_numeric(s.loc[recent_mask, kpi], errors="coerce").dropna().astype(float)
)
if baseline_vals.empty and recent_vals.empty:
return None
dfh = pd.concat(
[
pd.DataFrame({"window": "baseline", "value": baseline_vals}),
pd.DataFrame({"window": "recent", "value": recent_vals}),
],
ignore_index=True,
)
fig = px.histogram(
dfh,
x="value",
color="window",
barmode="overlay",
opacity=0.6,
)
fig.update_layout(
template="plotly_white",
title=f"{kpi} distribution (baseline vs recent)",
xaxis_title=kpi,
yaxis_title="count",
height=420,
margin=dict(l=40, r=20, t=60, b=40),
)
return fig
def _build_corr_heatmap(
daily_filtered: pd.DataFrame,
site_code: int,
kpis: list[str],
) -> go.Figure | None:
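    """Pearson correlation heatmap across the selected KPIs for one site.

    Needs at least 2 usable KPI columns and 5 rows after numeric coercion;
    otherwise returns None so the caller can show the "not enough samples" hint.
    """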
if daily_filtered is None or daily_filtered.empty:
return None
if not kpis:
return None
if "site_code" not in daily_filtered.columns:
return None
df_site = daily_filtered[daily_filtered["site_code"] == int(site_code)].copy()
if df_site.empty:
return None
cols = [str(c) for c in kpis if str(c) in df_site.columns]
cols = [
c
for c in cols
if c
not in {
"site_code",
"date_only",
"period_start",
"Longitude",
"Latitude",
"City",
"RAT",
}
]
cols = list(dict.fromkeys(cols))
if len(cols) < 2:
return None
x = df_site[cols].copy()
for c in cols:
x[c] = pd.to_numeric(x[c], errors="coerce")
x = x.dropna(how="all")
if x.empty or x.shape[0] < 5:
return None
x = x.dropna(axis=1, how="all")
if x.shape[1] < 2:
return None
corr = x.corr(method="pearson")
if corr is None or corr.empty:
return None
labels = [str(c) for c in corr.columns.tolist()]
z = corr.values
fig = go.Figure(
data=[
go.Heatmap(
z=z,
x=labels,
y=labels,
zmin=-1,
zmax=1,
colorscale="RdBu",
reversescale=True,
hovertemplate="%{y} vs %{x}
corr=%{z:.3f}",
)
]
)
fig.update_layout(
template="plotly_white",
title=f"Correlation heatmap (Pearson) - Site {int(site_code)}",
height=520,
margin=dict(l=60, r=20, t=60, b=60),
)
return fig
def _compute_site_traffic_gb(daily_by_rat: dict[str, pd.DataFrame]) -> pd.DataFrame:
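    """Aggregate per-site data traffic in GB across RATs.

    2G traffic_ps columns are treated as MB and divided by 1024; 3G uses
    Total_Data_Traffic and LTE any "traffic volume ... gbytes" column. Returns
    a pivot with one traffic_gb_<RAT> column per contributing RAT plus a
    traffic_gb_total column.
    """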
MB_PER_GB = 1024.0
rows = []
for rat, daily in daily_by_rat.items():
if daily is None or daily.empty:
continue
d = _filtered_daily(daily)
if d.empty or "site_code" not in d.columns:
continue
cols: list[str] = []
if rat == "2G":
for c in d.columns:
key = str(c).lower().replace(" ", "_")
if "traffic_ps" in key:
cols.append(c)
elif rat == "3G":
if "Total_Data_Traffic" in d.columns:
cols.append("Total_Data_Traffic")
elif rat == "LTE":
for c in d.columns:
key = str(c).lower().replace(" ", "_")
if "traffic_volume" in key and "gbytes" in key:
cols.append(c)
cols = [c for c in cols if c in d.columns]
if not cols:
continue
traffic = pd.to_numeric(
d[cols].sum(axis=1, skipna=True), errors="coerce"
).fillna(0)
if rat == "2G":
traffic = traffic / MB_PER_GB
tmp = pd.DataFrame(
{
"site_code": d["site_code"].astype(int),
"RAT": rat,
"traffic_gb": traffic.astype(float),
}
)
tmp = tmp.groupby(["site_code", "RAT"], as_index=False)["traffic_gb"].sum()
rows.append(tmp)
if not rows:
return pd.DataFrame(columns=["site_code", "traffic_gb_total"])
all_rows = pd.concat(rows, ignore_index=True)
pivot = (
all_rows.pivot_table(
index="site_code",
columns="RAT",
values="traffic_gb",
aggfunc="sum",
fill_value=0,
)
.reset_index()
.copy()
)
traffic_cols = []
for rat in ["2G", "3G", "LTE", "TWAMP"]:
if rat in pivot.columns:
pivot = pivot.rename(columns={rat: f"traffic_gb_{rat}"})
traffic_cols.append(f"traffic_gb_{rat}")
if traffic_cols:
pivot["traffic_gb_total"] = (
pd.to_numeric(pivot[traffic_cols].sum(axis=1), errors="coerce")
.fillna(0)
.astype(float)
)
else:
pivot["traffic_gb_total"] = 0.0
return pivot
def _refresh_filtered_results(event=None) -> None:
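    """Re-apply the city/complaint/score filters to the raw result frames and
    push the outcome into the summary, ops-queue, top-anomalies, and complaint
    tables; cached export bytes are invalidated and the map view refreshed.
    """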
global current_multirat_df, current_top_anomalies_df, current_ops_queue_df
global current_export_bytes, current_alert_pack_bytes
if _applying_profile or _loading_datasets:
return
if current_multirat_raw is not None and not current_multirat_raw.empty:
m = _apply_city_filter(current_multirat_raw)
if (
bool(only_complaint_sites.value)
and "is_complaint_site" in m.columns
and not m.empty
):
m = m[m["is_complaint_site"] == True] # noqa: E712
score_col = (
"criticality_score_weighted"
if "criticality_score_weighted" in m.columns
else "criticality_score"
)
if score_col in m.columns:
m = m[
pd.to_numeric(m[score_col], errors="coerce").fillna(0)
>= int(min_criticality.value)
]
m = m.sort_values(by=[score_col], ascending=False)
current_multirat_df = m
multirat_summary_table.value = current_multirat_df
else:
current_multirat_df = pd.DataFrame()
multirat_summary_table.value = current_multirat_df
if current_multirat_raw is not None and not current_multirat_raw.empty:
oq = _apply_city_filter(current_multirat_raw)
oq_score_col = (
"criticality_score_weighted"
if "criticality_score_weighted" in oq.columns
else "criticality_score"
)
if oq_score_col in oq.columns:
oq = oq[
pd.to_numeric(oq[oq_score_col], errors="coerce").fillna(0)
>= int(min_criticality.value)
]
oq = oq.copy()
try:
oq["priority_score"] = (
pd.to_numeric(oq[oq_score_col], errors="coerce")
.fillna(0)
.round(0)
.astype(int)
if oq_score_col in oq.columns
else 0
)
except Exception: # noqa: BLE001
oq["priority_score"] = 0
cols = []
for c in [
"priority_score",
"site_code",
"City",
"is_complaint_site",
"impacted_rats",
"persistent_kpis_total",
"degraded_kpis_total",
"resolved_kpis_total",
"criticality_score",
"criticality_score_weighted",
"traffic_gb_total",
"traffic_gb_2G",
"traffic_gb_3G",
"traffic_gb_LTE",
"traffic_gb_TWAMP",
]:
if c in oq.columns and c not in cols:
cols.append(c)
for prefix in ["persistent", "degraded", "resolved"]:
for r in ["2G", "3G", "LTE", "TWAMP"]:
c = f"{prefix}_{r}"
if c in oq.columns and c not in cols:
cols.append(c)
if cols:
oq = oq[cols].copy()
if not oq.empty and "priority_score" in oq.columns:
oq = oq.sort_values(by=["priority_score"], ascending=False)
current_ops_queue_df = oq
ops_queue_table.value = current_ops_queue_df
else:
current_ops_queue_df = pd.DataFrame()
ops_queue_table.value = current_ops_queue_df
if current_top_anomalies_raw is not None and not current_top_anomalies_raw.empty:
t = _apply_city_filter(current_top_anomalies_raw)
if (
bool(only_complaint_sites.value)
and "is_complaint_site" in t.columns
and not t.empty
):
t = t[t["is_complaint_site"] == True] # noqa: E712
if top_rat_filter.value:
t = t[t["RAT"].isin(list(top_rat_filter.value))]
if top_status_filter.value and "status" in t.columns:
t = t[t["status"].isin(list(top_status_filter.value))]
score_col = (
"anomaly_score_weighted"
if "anomaly_score_weighted" in t.columns
else "anomaly_score"
)
if score_col in t.columns:
t = t[
pd.to_numeric(t[score_col], errors="coerce").fillna(0)
>= int(min_anomaly_score.value)
]
t = t.sort_values(by=[score_col], ascending=False)
current_top_anomalies_df = t
top_anomalies_table.value = current_top_anomalies_df
else:
current_top_anomalies_df = pd.DataFrame()
top_anomalies_table.value = current_top_anomalies_df
if current_multirat_raw is not None and not current_multirat_raw.empty:
cm = _apply_city_filter(current_multirat_raw)
if "is_complaint_site" in cm.columns:
cm = cm[cm["is_complaint_site"] == True] # noqa: E712
else:
cm = cm.iloc[0:0].copy()
score_col = (
"criticality_score_weighted"
if "criticality_score_weighted" in cm.columns
else "criticality_score"
)
if score_col in cm.columns:
cm = cm[
pd.to_numeric(cm[score_col], errors="coerce").fillna(0)
>= int(min_criticality.value)
]
cm = cm.sort_values(by=[score_col], ascending=False)
complaint_multirat_summary_table.value = cm
else:
complaint_multirat_summary_table.value = pd.DataFrame()
if current_top_anomalies_raw is not None and not current_top_anomalies_raw.empty:
ct = _apply_city_filter(current_top_anomalies_raw)
if "is_complaint_site" in ct.columns:
ct = ct[ct["is_complaint_site"] == True] # noqa: E712
else:
ct = ct.iloc[0:0].copy()
if top_rat_filter.value:
ct = ct[ct["RAT"].isin(list(top_rat_filter.value))]
if top_status_filter.value and "status" in ct.columns:
ct = ct[ct["status"].isin(list(top_status_filter.value))]
score_col = (
"anomaly_score_weighted"
if "anomaly_score_weighted" in ct.columns
else "anomaly_score"
)
if score_col in ct.columns:
ct = ct[
pd.to_numeric(ct[score_col], errors="coerce").fillna(0)
>= int(min_anomaly_score.value)
]
ct = ct.sort_values(by=[score_col], ascending=False)
complaint_top_anomalies_table.value = ct
else:
complaint_top_anomalies_table.value = pd.DataFrame()
current_export_bytes = None
current_alert_pack_bytes = None
try:
_refresh_map_view()
except Exception: # noqa: BLE001
pass
def _refresh_presets(event=None) -> None:
names = list_presets()
preset_select.options = [""] + names
if preset_select.value not in preset_select.options:
preset_select.value = ""
def _refresh_profiles(event=None) -> None:
names = list_profiles()
profile_select.options = [""] + names
if profile_select.value not in profile_select.options:
profile_select.value = ""
def _current_profile_config() -> dict:
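    """Serialize the current widget state (windows, thresholds, filters, preset
    and drill-down selections) into a JSON-friendly dict for profiles/snapshots.
    """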
cfg: dict = {}
cfg["granularity"] = str(granularity_select.value or "Daily")
cfg["analysis_range"] = (
[
(
str(analysis_range.value[0])
if analysis_range.value and analysis_range.value[0]
else None
),
(
str(analysis_range.value[1])
if analysis_range.value and analysis_range.value[1]
else None
),
]
if analysis_range.value
else [None, None]
)
cfg["baseline_days"] = int(baseline_days.value)
cfg["recent_days"] = int(recent_days.value)
cfg["rel_threshold_pct"] = float(rel_threshold_pct.value)
cfg["min_consecutive_days"] = int(min_consecutive_days.value)
cfg["min_criticality"] = int(min_criticality.value)
cfg["min_anomaly_score"] = int(min_anomaly_score.value)
cfg["city_filter"] = str(city_filter.value or "")
cfg["only_complaint_sites"] = bool(only_complaint_sites.value)
cfg["top_rat_filter"] = list(top_rat_filter.value) if top_rat_filter.value else []
cfg["top_status_filter"] = (
list(top_status_filter.value) if top_status_filter.value else []
)
cfg["preset_selected"] = str(preset_select.value or "")
cfg["drilldown"] = {
"site_code": int(site_select.value) if site_select.value is not None else None,
"rat": str(rat_select.value or ""),
"kpi": str(kpi_select.value or ""),
"compare_kpis": (
list(kpi_compare_select.value) if kpi_compare_select.value else []
),
"compare_norm": str(kpi_compare_norm.value or "None"),
"show_sla": bool(show_sla_toggle.value),
}
return cfg
def _apply_profile_config(cfg: dict) -> None:
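    """Best-effort restore of widget state from a profile dict.

    Each widget is set inside its own try/except so one bad value cannot abort
    the rest; _applying_profile suppresses watcher callbacks until the explicit
    refresh at the end.
    """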
global _applying_profile
if cfg is None or not isinstance(cfg, dict):
return
_applying_profile = True
try:
try:
g = str(cfg.get("granularity", "") or "").strip()
if g and g in list(granularity_select.options):
granularity_select.value = g
except Exception: # noqa: BLE001
pass
try:
ar = cfg.get("analysis_range", [None, None])
if isinstance(ar, (list, tuple)) and len(ar) == 2 and ar[0] and ar[1]:
analysis_range.value = (
pd.to_datetime(ar[0]).date(),
pd.to_datetime(ar[1]).date(),
)
else:
analysis_range.value = None
except Exception: # noqa: BLE001
pass
for w, key, cast in [
(baseline_days, "baseline_days", int),
(recent_days, "recent_days", int),
(rel_threshold_pct, "rel_threshold_pct", float),
(min_consecutive_days, "min_consecutive_days", int),
(min_criticality, "min_criticality", int),
(min_anomaly_score, "min_anomaly_score", int),
]:
try:
if key in cfg and cfg[key] is not None:
w.value = cast(cfg[key])
except Exception: # noqa: BLE001
pass
try:
if (
"only_complaint_sites" in cfg
and cfg["only_complaint_sites"] is not None
):
only_complaint_sites.value = bool(cfg["only_complaint_sites"])
except Exception: # noqa: BLE001
pass
try:
city_filter.value = str(cfg.get("city_filter", "") or "")
except Exception: # noqa: BLE001
pass
try:
tr = cfg.get("top_rat_filter", [])
if isinstance(tr, list):
top_rat_filter.value = [x for x in tr if x in top_rat_filter.options]
except Exception: # noqa: BLE001
pass
try:
ts = cfg.get("top_status_filter", [])
if isinstance(ts, list):
top_status_filter.value = [
x for x in ts if x in top_status_filter.options
]
except Exception: # noqa: BLE001
pass
try:
preset_name = str(cfg.get("preset_selected", "") or "").strip()
if preset_name:
_refresh_presets()
if preset_name in preset_select.options:
preset_select.value = preset_name
try:
_apply_preset()
except Exception: # noqa: BLE001
pass
except Exception: # noqa: BLE001
pass
drill = (
cfg.get("drilldown", {})
if isinstance(cfg.get("drilldown", {}), dict)
else {}
)
try:
rat = str(drill.get("rat", "") or "")
if rat and rat in list(rat_select.options):
rat_select.value = rat
except Exception: # noqa: BLE001
pass
try:
sc = drill.get("site_code", None)
if sc is not None:
site_select.value = int(sc)
except Exception: # noqa: BLE001
pass
try:
kpi = str(drill.get("kpi", "") or "")
if kpi:
kpi_select.value = kpi
except Exception: # noqa: BLE001
pass
try:
norm = str(drill.get("compare_norm", "None") or "None")
if norm in list(kpi_compare_norm.options):
kpi_compare_norm.value = norm
except Exception: # noqa: BLE001
pass
try:
if "show_sla" in drill and drill["show_sla"] is not None:
show_sla_toggle.value = bool(drill["show_sla"])
except Exception: # noqa: BLE001
pass
try:
ck = drill.get("compare_kpis", [])
if isinstance(ck, list):
opts = list(kpi_compare_select.options or [])
kpi_compare_select.value = [str(x) for x in ck if str(x) in opts]
except Exception: # noqa: BLE001
pass
finally:
_applying_profile = False
_refresh_filtered_results()
_update_kpi_options()
_update_site_view()
def _apply_profile(event=None) -> None:
try:
if not profile_select.value:
return
cfg = load_profile(str(profile_select.value))
_apply_profile_config(cfg)
status_pane.alert_type = "success"
status_pane.object = f"Profile applied: {profile_select.value}"
except Exception as exc: # noqa: BLE001
status_pane.alert_type = "danger"
status_pane.object = f"Error applying profile: {exc}"
def _save_profile(event=None) -> None:
try:
name = (profile_name_input.value or "").strip()
if not name:
name = str(profile_select.value or "").strip()
if not name:
raise ValueError("Please provide a profile name")
cfg = _current_profile_config()
save_profile(name, cfg)
profile_name_input.value = ""
_refresh_profiles()
profile_select.value = name
status_pane.alert_type = "success"
status_pane.object = f"Profile saved: {name}"
except Exception as exc: # noqa: BLE001
status_pane.alert_type = "danger"
status_pane.object = f"Error saving profile: {exc}"
def _delete_profile(event=None) -> None:
try:
name = str(profile_select.value or "").strip()
if not name:
return
delete_profile(name)
_refresh_profiles()
status_pane.alert_type = "success"
status_pane.object = f"Profile deleted: {name}"
except Exception as exc: # noqa: BLE001
status_pane.alert_type = "danger"
status_pane.object = f"Error deleting profile: {exc}"
def _apply_preset(event=None) -> None:
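    """Overlay the selected preset's direction/sla/policy values onto the current
    rules table, matched by (RAT, KPI); rows without a preset match keep their
    existing values.
    """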
global current_export_bytes, current_alert_pack_bytes
try:
if not preset_select.value:
return
preset_df = load_preset(str(preset_select.value))
if preset_df is None or preset_df.empty:
return
except Exception as exc: # noqa: BLE001
status_pane.alert_type = "danger"
status_pane.object = f"Error loading preset: {exc}"
return
cur = (
rules_table.value
if isinstance(rules_table.value, pd.DataFrame)
else pd.DataFrame()
)
if cur is None or cur.empty:
rules_table.value = preset_df
return
key = ["RAT", "KPI"]
upd_cols = [c for c in ["direction", "sla", "policy"] if c in preset_df.columns]
preset_df2 = preset_df[key + upd_cols].copy()
merged = pd.merge(cur, preset_df2, on=key, how="left", suffixes=("", "_preset"))
for c in upd_cols:
pc = f"{c}_preset"
if pc in merged.columns:
merged[c] = merged[pc].where(merged[pc].notna(), merged[c])
merged = merged.drop(columns=[pc])
rules_table.value = merged
status_pane.alert_type = "success"
status_pane.object = f"Preset applied: {preset_select.value}"
current_export_bytes = None
_invalidate_drilldown_cache(rules_changed=True)
def _save_current_rules_as_preset(event=None) -> None:
try:
name = (preset_name_input.value or "").strip()
if not name:
name = str(preset_select.value or "").strip()
if not name:
raise ValueError("Please provide a preset name")
cur = (
rules_table.value
if isinstance(rules_table.value, pd.DataFrame)
else pd.DataFrame()
)
save_preset(name, cur)
preset_name_input.value = ""
_refresh_presets()
preset_select.value = name
status_pane.alert_type = "success"
status_pane.object = f"Preset saved: {name}"
except Exception as exc: # noqa: BLE001
status_pane.alert_type = "danger"
status_pane.object = f"Error saving preset: {exc}"
def _delete_selected_preset(event=None) -> None:
global current_export_bytes, current_alert_pack_bytes
try:
name = str(preset_select.value or "").strip()
if not name:
return
delete_preset(name)
_refresh_presets()
status_pane.alert_type = "success"
status_pane.object = f"Preset deleted: {name}"
current_export_bytes = None
current_alert_pack_bytes = None
_invalidate_drilldown_cache(data_changed=True, rules_changed=True)
except Exception as exc: # noqa: BLE001
status_pane.alert_type = "danger"
status_pane.object = f"Error deleting preset: {exc}"
def load_datasets(event=None) -> None:
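    """Read the uploaded per-RAT files, normalize them into period KPI frames,
    infer per-KPI rules (direction, SLA, policy), and reset all downstream
    state and tables.
    """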
global _loading_datasets
_loading_datasets = True
try:
status_pane.alert_type = "primary"
status_pane.object = "Loading datasets..."
global current_daily_by_rat, current_rules_df
global current_status_df, current_summary_df, current_export_bytes
global current_multirat_df, current_multirat_raw, current_top_anomalies_df, current_top_anomalies_raw
global current_ops_queue_df
global current_alert_pack_bytes
global current_snapshot, current_delta_df
current_daily_by_rat = {}
current_rules_df = None
current_status_df = None
current_summary_df = None
current_multirat_df = None
current_multirat_raw = None
current_top_anomalies_df = None
current_top_anomalies_raw = None
current_ops_queue_df = None
current_export_bytes = None
current_alert_pack_bytes = None
current_snapshot = None
current_delta_df = None
_invalidate_drilldown_cache(
data_changed=True, rules_changed=True, healthcheck_changed=True
)
site_summary_table.value = pd.DataFrame()
multirat_summary_table.value = pd.DataFrame()
top_anomalies_table.value = pd.DataFrame()
complaint_multirat_summary_table.value = pd.DataFrame()
complaint_top_anomalies_table.value = pd.DataFrame()
ops_queue_table.value = pd.DataFrame()
delta_table.value = pd.DataFrame()
map_pane.object = None
map_message.visible = False
try:
global _map_search_city, _map_center_override, _map_force_fit
_map_search_city = ""
_map_center_override = None
_map_force_fit = False
map_search_input.value = ""
map_search_results.options = {}
map_search_results.value = None
map_search_results.visible = False
except Exception: # noqa: BLE001
pass
site_kpi_table.value = pd.DataFrame()
trend_plot_pane.object = None
heatmap_plot_pane.object = None
hist_plot_pane.object = None
corr_plot_pane.object = None
corr_message.visible = False
inputs = {"2G": file_2g, "3G": file_3g, "LTE": file_lte, "TWAMP": file_twamp}
rows = []
rules_rows = []
loaded_any = False
for rat, widget in inputs.items():
df_raw = read_fileinput_to_df(widget)
if df_raw is None:
continue
loaded_any = True
date_col = None
id_col = None
try:
date_col = infer_date_col(df_raw)
except Exception: # noqa: BLE001
date_col = None
try:
id_col = infer_id_col(df_raw, rat)
except Exception: # noqa: BLE001
id_col = None
daily, kpi_cols = build_period_kpi(df_raw, rat, granularity_select.value)
current_daily_by_rat[rat] = daily
d = _filtered_daily(daily)
periods_n = None
try:
if (
isinstance(d, pd.DataFrame)
and not d.empty
and "period_start" in d.columns
):
periods_n = int(
pd.to_datetime(d["period_start"], errors="coerce").nunique()
)
except Exception: # noqa: BLE001
periods_n = None
rows.append(
{
"RAT": rat,
"rows_raw": int(df_raw.shape[0]),
"cols_raw": int(df_raw.shape[1]),
"date_col": date_col,
"id_col": id_col,
"sites": int(d["site_code"].nunique()),
"days": int(d["date_only"].nunique()),
"periods": (
int(periods_n)
if periods_n is not None
else int(d["date_only"].nunique())
),
"kpis": int(len(kpi_cols)),
}
)
for kpi in kpi_cols:
direction = infer_kpi_direction(kpi, rat)
rules_rows.append(
{
"RAT": rat,
"KPI": kpi,
"direction": direction,
"sla": infer_kpi_sla(kpi, direction, rat),
"policy": infer_kpi_policy(kpi, rat),
}
)
if not loaded_any:
raise ValueError("Please upload at least one KPI report")
rats_loaded = [
r for r in ["2G", "3G", "LTE", "TWAMP"] if r in current_daily_by_rat
]
if rats_loaded:
rat_select.options = rats_loaded
if rat_select.value not in rats_loaded:
rat_select.value = rats_loaded[-1]
datasets_table.value = pd.DataFrame(rows)
rules_df = (
pd.DataFrame(rules_rows)
.drop_duplicates(subset=["RAT", "KPI"])
.sort_values(by=["RAT", "KPI"])
)
current_rules_df = rules_df
rules_table.value = rules_df
status_pane.alert_type = "success"
status_pane.object = (
"Datasets loaded. Edit KPI rules if needed, then run health check."
)
except Exception as exc: # noqa: BLE001
status_pane.alert_type = "danger"
status_pane.object = f"Error: {exc}"
try:
_refresh_validation_state()
except Exception: # noqa: BLE001
pass
finally:
_loading_datasets = False
try:
_update_site_options()
except Exception: # noqa: BLE001
pass
try:
_update_kpi_options()
except Exception: # noqa: BLE001
pass
try:
_update_site_view()
except Exception: # noqa: BLE001
pass
try:
_refresh_validation_state()
except Exception: # noqa: BLE001
pass
def run_health_check(event=None) -> None:
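    """Validate the window/threshold inputs, run the health check per RAT,
    build the multi-RAT views, weight scores by log1p of site traffic, and
    recompute the delta view; per-stage timings are reported when profiling
    is enabled.
    """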
try:
status_pane.alert_type = "primary"
status_pane.object = "Running health check..."
global current_status_df, current_summary_df, current_export_bytes
global current_multirat_df, current_multirat_raw
global current_top_anomalies_df, current_top_anomalies_raw
global current_alert_pack_bytes
global current_snapshot, current_delta_df
rules_df = (
rules_table.value
if isinstance(rules_table.value, pd.DataFrame)
else pd.DataFrame()
)
if rules_df.empty:
raise ValueError("KPI rules table is empty")
bd = _coerce_int(baseline_days.value)
rd = _coerce_int(recent_days.value)
mcd = _coerce_int(min_consecutive_days.value)
thr = _coerce_float(rel_threshold_pct.value)
if bd is None or bd < 1:
raise ValueError("Baseline window (days) must be >= 1")
if rd is None or rd < 1:
raise ValueError("Recent window (days) must be >= 1")
if mcd is None or mcd < 1:
raise ValueError("Min consecutive bad days must be >= 1")
if thr is None:
raise ValueError("Relative change threshold (%) is invalid")
all_status = []
all_summary = []
do_profile = bool(perf_profiling.value)
t_run0 = time.perf_counter() if do_profile else 0.0
rat_timings = []
for rat, daily in current_daily_by_rat.items():
d = _filtered_daily(daily)
t0 = time.perf_counter() if do_profile else 0.0
status_df, summary_df = evaluate_health_check(
d,
rat,
rules_df,
int(bd),
int(rd),
float(thr),
int(mcd),
granularity=str(granularity_select.value or "Daily"),
)
if do_profile:
rat_timings.append(
{
"rat": str(rat),
"seconds": float(time.perf_counter() - t0),
"rows": int(len(d)) if isinstance(d, pd.DataFrame) else 0,
}
)
if not status_df.empty:
all_status.append(status_df)
if not summary_df.empty:
all_summary.append(summary_df)
current_status_df = (
pd.concat(all_status, ignore_index=True) if all_status else pd.DataFrame()
)
current_summary_df = (
pd.concat(all_summary, ignore_index=True) if all_summary else pd.DataFrame()
)
site_summary_table.value = current_summary_df
t_mr0 = time.perf_counter() if do_profile else 0.0
current_multirat_raw, current_top_anomalies_raw = compute_multirat_views(
current_status_df
)
mr_seconds = float(time.perf_counter() - t_mr0) if do_profile else 0.0
t_tr0 = time.perf_counter() if do_profile else 0.0
traffic_df = _compute_site_traffic_gb(current_daily_by_rat)
tr_seconds = float(time.perf_counter() - t_tr0) if do_profile else 0.0
if traffic_df is not None and not traffic_df.empty:
if current_multirat_raw is not None and not current_multirat_raw.empty:
current_multirat_raw = pd.merge(
current_multirat_raw, traffic_df, on="site_code", how="left"
)
if "criticality_score" in current_multirat_raw.columns:
w = 1.0 + np.log1p(
pd.to_numeric(
current_multirat_raw["traffic_gb_total"], errors="coerce"
)
.fillna(0)
.astype(float)
)
current_multirat_raw["criticality_score_weighted"] = (
(
pd.to_numeric(
current_multirat_raw["criticality_score"],
errors="coerce",
)
.fillna(0)
.astype(float)
* w
)
.round(0)
.astype(int)
)
if (
current_top_anomalies_raw is not None
and not current_top_anomalies_raw.empty
):
current_top_anomalies_raw = pd.merge(
current_top_anomalies_raw, traffic_df, on="site_code", how="left"
)
if "anomaly_score" in current_top_anomalies_raw.columns:
w = 1.0 + np.log1p(
pd.to_numeric(
current_top_anomalies_raw["traffic_gb_total"],
errors="coerce",
)
.fillna(0)
.astype(float)
)
current_top_anomalies_raw["anomaly_score_weighted"] = (
(
pd.to_numeric(
current_top_anomalies_raw["anomaly_score"],
errors="coerce",
)
.fillna(0)
.astype(float)
* w
)
.round(0)
.astype(int)
)
_apply_complaint_flags()
current_export_bytes = None
current_alert_pack_bytes = None
t_dl0 = time.perf_counter() if do_profile else 0.0
try:
current_delta_df = _compute_delta_df()
except Exception: # noqa: BLE001
current_delta_df = pd.DataFrame()
dl_seconds = float(time.perf_counter() - t_dl0) if do_profile else 0.0
delta_table.value = current_delta_df
_invalidate_drilldown_cache(healthcheck_changed=True)
_update_site_view()
status_pane.alert_type = "success"
msg = "Health check completed."
if do_profile:
total_s = float(time.perf_counter() - t_run0)
rat_timings_sorted = sorted(
rat_timings, key=lambda x: float(x.get("seconds", 0.0)), reverse=True
)
top_lines = [
f"- {r['rat']}: {r['seconds']:.2f}s (rows={r['rows']})"
for r in rat_timings_sorted[:8]
]
msg = (
msg
+ "\n\nPerf (seconds)"
+ f"\n- total_run: {total_s:.2f}s"
+ (
"\n- evaluate_health_check:"
+ ("\n" + "\n".join(top_lines) if top_lines else "")
)
+ f"\n- compute_multirat_views: {mr_seconds:.2f}s"
+ f"\n- compute_site_traffic_gb: {tr_seconds:.2f}s"
+ f"\n- compute_delta_df: {dl_seconds:.2f}s"
)
status_pane.object = msg
_refresh_validation_state()
except Exception as exc: # noqa: BLE001
status_pane.alert_type = "danger"
status_pane.object = f"Error: {exc}"
def _build_export_bytes(profile: dict | None = None) -> bytes:
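    """Collect the current tables (optionally including raw per-RAT data) and
    delegate to the shared build_export_bytes writer.
    """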
include_raw = bool(export_include_raw_data.value)
daily_by_rat = (
current_daily_by_rat
if (include_raw and isinstance(current_daily_by_rat, dict))
else None
)
return build_export_bytes(
datasets_df=(
datasets_table.value
if isinstance(datasets_table.value, pd.DataFrame)
else None
),
rules_df=(
rules_table.value if isinstance(rules_table.value, pd.DataFrame) else None
),
summary_df=(
current_summary_df if isinstance(current_summary_df, pd.DataFrame) else None
),
status_df=(
current_status_df if isinstance(current_status_df, pd.DataFrame) else None
),
daily_by_rat=daily_by_rat,
granularity=str(granularity_select.value or "Daily"),
multirat_summary_df=(
current_multirat_df
if isinstance(current_multirat_df, pd.DataFrame)
else None
),
top_anomalies_df=(
current_top_anomalies_df
if isinstance(current_top_anomalies_df, pd.DataFrame)
else None
),
complaint_multirat_df=(
complaint_multirat_summary_table.value
if isinstance(complaint_multirat_summary_table.value, pd.DataFrame)
else None
),
complaint_top_anomalies_df=(
complaint_top_anomalies_table.value
if isinstance(complaint_top_anomalies_table.value, pd.DataFrame)
else None
),
ops_queue_df=(
current_ops_queue_df
if isinstance(current_ops_queue_df, pd.DataFrame)
else None
),
delta_df=(
current_delta_df if isinstance(current_delta_df, pd.DataFrame) else None
),
profile=profile,
)
def _export_callback() -> io.BytesIO:
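    """Return the export workbook as BytesIO, rebuilding it when the cache is
    empty or when performance profiling is enabled (which also reports the
    slowest sheets).
    """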
global current_export_bytes
do_profile = bool(perf_profiling.value)
if do_profile or current_export_bytes is None:
try:
t0 = time.perf_counter() if do_profile else 0.0
profile = {} if do_profile else None
current_export_bytes = _build_export_bytes(profile=profile)
if do_profile:
total_s = float(time.perf_counter() - t0)
excel_s = (
float(profile.get("excel_total_seconds", 0.0)) if profile else 0.0
)
prep_s = (
float(profile.get("export_prep_seconds", 0.0)) if profile else 0.0
)
sheets = profile.get("excel_sheets") if profile else None
slow = []
if isinstance(sheets, list) and sheets:
sheets2 = [s for s in sheets if isinstance(s, dict)]
sheets2 = sorted(
sheets2,
key=lambda x: float(x.get("seconds", 0.0)),
reverse=True,
)
slow = [
f"- {s.get('name')}: {float(s.get('seconds', 0.0)):.2f}s (rows={s.get('rows')}, cols={s.get('cols')})"
for s in sheets2[:8]
]
status_pane.alert_type = "primary"
status_pane.object = (
"Export profiling"
+ f"\n- total_export: {total_s:.2f}s"
+ f"\n- export_prep: {prep_s:.2f}s"
+ f"\n- excel_write: {excel_s:.2f}s"
+ ("\n- slowest_sheets:\n" + "\n".join(slow) if slow else "")
)
except Exception: # noqa: BLE001
current_export_bytes = b""
return io.BytesIO(current_export_bytes or b"")
def _build_alert_pack_bytes() -> bytes:
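    """Write a compact alert pack (run parameters, ops queue, top anomalies,
    site summary) into a single Excel workbook.
    """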
params = {
"granularity": str(granularity_select.value or "Daily"),
"baseline_days": baseline_days.value,
"recent_days": recent_days.value,
"rel_threshold_pct": rel_threshold_pct.value,
"min_consecutive_days": min_consecutive_days.value,
"min_criticality": min_criticality.value,
"min_anomaly_score": min_anomaly_score.value,
"city_filter": str(city_filter.value or ""),
"only_complaint_sites": bool(only_complaint_sites.value),
"top_rat_filter": ",".join(list(top_rat_filter.value or [])),
"top_status_filter": ",".join(list(top_status_filter.value or [])),
}
    params_df = pd.DataFrame(
        {"key": list(params.keys()), "value": list(params.values())}
    )
return write_dfs_to_excel(
[
params_df,
(
current_ops_queue_df
if isinstance(current_ops_queue_df, pd.DataFrame)
else pd.DataFrame()
),
(
current_top_anomalies_df
if isinstance(current_top_anomalies_df, pd.DataFrame)
else pd.DataFrame()
),
(
current_summary_df
if isinstance(current_summary_df, pd.DataFrame)
else pd.DataFrame()
),
],
["Run_Params", "Ops_Queue", "Top_Anomalies", "Site_Summary"],
index=False,
)
def _alert_pack_callback() -> io.BytesIO:
global current_alert_pack_bytes
if current_alert_pack_bytes is None:
try:
current_alert_pack_bytes = _build_alert_pack_bytes()
except Exception: # noqa: BLE001
current_alert_pack_bytes = b""
return io.BytesIO(current_alert_pack_bytes or b"")
def _build_snapshot_obj() -> dict:
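    """Bundle the profile config plus the rules, multi-RAT, and top-anomalies
    frames into a JSON-serializable snapshot dict.
    """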
cfg = _current_profile_config()
rules_df = (
rules_table.value
if isinstance(rules_table.value, pd.DataFrame)
else pd.DataFrame()
)
multirat_df = (
current_multirat_raw
if isinstance(current_multirat_raw, pd.DataFrame)
else pd.DataFrame()
)
top_df = (
current_top_anomalies_raw
if isinstance(current_top_anomalies_raw, pd.DataFrame)
else pd.DataFrame()
)
return {
"snapshot_version": 1,
"created_at": pd.Timestamp.utcnow().isoformat() + "Z",
"profile_config": cfg,
"rules_df": rules_df.to_dict(orient="records"),
"multirat_df": multirat_df.to_dict(orient="records"),
"top_anomalies_df": top_df.to_dict(orient="records"),
}
def _snapshot_download_callback() -> io.BytesIO:
b = b""
try:
obj = _build_snapshot_obj()
b = json.dumps(obj, ensure_ascii=False, indent=2).encode("utf-8")
except Exception: # noqa: BLE001
b = b""
return io.BytesIO(b)
def _snapshot_from_bytes(content: bytes) -> dict:
try:
txt = content.decode("utf-8", errors="ignore")
obj = json.loads(txt)
return obj if isinstance(obj, dict) else {}
except Exception: # noqa: BLE001
return {}
def _apply_snapshot_to_ui(obj: dict) -> None:
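    """Restore a snapshot: apply its profile config, populate the snapshot
    tables, and recompute the delta view against the current run.
    """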
global current_snapshot, current_delta_df, current_export_bytes, current_alert_pack_bytes
current_snapshot = obj if isinstance(obj, dict) else {}
cfg = (
current_snapshot.get("profile_config", {})
if isinstance(current_snapshot.get("profile_config", {}), dict)
else {}
)
_apply_profile_config(cfg)
try:
r = current_snapshot.get("rules_df", [])
snapshot_rules_table.value = pd.DataFrame(r)
except Exception: # noqa: BLE001
snapshot_rules_table.value = pd.DataFrame()
try:
m = current_snapshot.get("multirat_df", [])
snapshot_multirat_table.value = pd.DataFrame(m)
except Exception: # noqa: BLE001
snapshot_multirat_table.value = pd.DataFrame()
try:
t = current_snapshot.get("top_anomalies_df", [])
snapshot_top_table.value = pd.DataFrame(t)
except Exception: # noqa: BLE001
snapshot_top_table.value = pd.DataFrame()
try:
current_delta_df = _compute_delta_df()
delta_table.value = current_delta_df
except Exception: # noqa: BLE001
current_delta_df = pd.DataFrame()
delta_table.value = current_delta_df
current_export_bytes = None
current_alert_pack_bytes = None
def _on_snapshot_upload(event=None) -> None:
if not snapshot_file.value:
return
obj = _snapshot_from_bytes(snapshot_file.value)
_apply_snapshot_to_ui(obj)
try:
status_pane.alert_type = "success"
status_pane.object = "Snapshot loaded."
except Exception: # noqa: BLE001
pass
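# --- Wire buttons, uploads, and map controls to their callbacks ---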
load_button.on_click(load_datasets)
run_button.on_click(run_health_check)
preset_refresh_button.on_click(_refresh_presets)
preset_apply_button.on_click(_apply_preset)
preset_save_button.on_click(_save_current_rules_as_preset)
preset_delete_button.on_click(_delete_selected_preset)
profile_refresh_button.on_click(_refresh_profiles)
profile_apply_button.on_click(_apply_profile)
profile_save_button.on_click(_save_profile)
profile_delete_button.on_click(_delete_profile)
snapshot_file.param.watch(_on_snapshot_upload, "value")
snapshot_download.callback = _snapshot_download_callback
map_pane.param.watch(_on_map_click, "click_data")
map_fit_button.on_click(_on_map_fit_click)
map_search_go.on_click(_on_map_search)
map_search_clear.on_click(_on_map_clear_search)
map_search_results.param.watch(_on_map_search_pick, "value")
_refresh_presets()
_refresh_profiles()
_refresh_complaint_sites()
_refresh_validation_state()
try:
_refresh_map_view()
except Exception: # noqa: BLE001
pass
def _on_rat_change(event=None) -> None:
if _applying_profile or _loading_datasets or _updating_drilldown:
return
_schedule_drilldown_update(lambda: (_update_kpi_options(), _update_site_view()))
_refresh_validation_state()
def _on_drilldown_change(event=None) -> None:
if _applying_profile or _loading_datasets or _updating_drilldown:
return
_schedule_drilldown_update(_update_site_view)
_refresh_validation_state()
def _on_drilldown_params_change(event=None) -> None:
if _applying_profile or _loading_datasets or _updating_drilldown:
return
_invalidate_drilldown_cache()
_schedule_drilldown_update(_update_site_view)
_refresh_validation_state()
def _on_export_options_change(event=None) -> None:
global current_export_bytes
if _applying_profile or _loading_datasets:
return
current_export_bytes = None
def _on_granularity_change(event=None) -> None:
if _applying_profile or _loading_datasets:
return
_invalidate_drilldown_cache(data_changed=True, healthcheck_changed=True)
_refresh_validation_state()
try:
has_any = bool(
(file_2g and file_2g.value)
or (file_3g and file_3g.value)
or (file_lte and file_lte.value)
or (file_twamp and file_twamp.value)
)
except Exception: # noqa: BLE001
has_any = False
if has_any:
try:
load_datasets()
except Exception: # noqa: BLE001
pass
rat_select.param.watch(_on_rat_change, "value")
kpi_group_select.param.watch(
    _on_rat_change, "value"
)  # changing the KPI group must also re-filter the KPI options
kpi_group_mode.param.watch(_on_drilldown_change, "value")
site_select.param.watch(_on_drilldown_change, "value")
kpi_select.param.watch(_on_drilldown_change, "value")
kpi_compare_select.param.watch(_on_drilldown_change, "value")
kpi_compare_norm.param.watch(_on_drilldown_change, "value")
show_sla_toggle.param.watch(_on_drilldown_change, "value")
corr_window_select.param.watch(_on_drilldown_change, "value")
map_show_site_codes.param.watch(lambda e: _refresh_map_view(), "value")
map_max_labels.param.watch(lambda e: _refresh_map_view(), "value")
map_top_n.param.watch(lambda e: _refresh_map_view(), "value")
map_impacted_rat.param.watch(lambda e: _refresh_map_view(), "value")
map_status_filter.param.watch(lambda e: _refresh_map_view(), "value")
map_auto_fit.param.watch(lambda e: _refresh_map_view(), "value")
analysis_range.param.watch(_on_drilldown_params_change, "value")
granularity_select.param.watch(_on_granularity_change, "value")
baseline_days.param.watch(_on_drilldown_params_change, "value")
recent_days.param.watch(_on_drilldown_params_change, "value")
rel_threshold_pct.param.watch(_on_drilldown_params_change, "value")
min_consecutive_days.param.watch(_on_drilldown_params_change, "value")
export_include_raw_data.param.watch(_on_export_options_change, "value")
def _on_rules_table_change(event=None) -> None:
global current_export_bytes, current_alert_pack_bytes
if _applying_profile or _loading_datasets:
return
current_export_bytes = None
current_alert_pack_bytes = None
_invalidate_drilldown_cache(rules_changed=True)
rules_table.param.watch(_on_rules_table_change, "value")
try:
top_anomalies_table.on_click(
lambda e: _handle_double_click("top", top_anomalies_table, e)
)
except Exception: # noqa: BLE001
pass
try:
complaint_top_anomalies_table.on_click(
lambda e: _handle_double_click("top", complaint_top_anomalies_table, e)
)
except Exception: # noqa: BLE001
pass
try:
multirat_summary_table.on_click(
lambda e: _handle_double_click("multirat", multirat_summary_table, e)
)
except Exception: # noqa: BLE001
pass
try:
complaint_multirat_summary_table.on_click(
lambda e: _handle_double_click("multirat", complaint_multirat_summary_table, e)
)
except Exception: # noqa: BLE001
pass
try:
ops_queue_table.on_click(lambda e: _handle_double_click("ops", ops_queue_table, e))
except Exception: # noqa: BLE001
pass
min_criticality.param.watch(_refresh_filtered_results, "value")
min_anomaly_score.param.watch(_refresh_filtered_results, "value")
city_filter.param.watch(_refresh_filtered_results, "value")
only_complaint_sites.param.watch(_refresh_filtered_results, "value")
top_rat_filter.param.watch(_refresh_filtered_results, "value")
top_status_filter.param.watch(_refresh_filtered_results, "value")
min_criticality.param.watch(_refresh_delta_view, "value")
city_filter.param.watch(_refresh_delta_view, "value")
complaint_sites_file.param.watch(_refresh_complaint_sites, "value")
export_button.callback = _export_callback
alert_pack_button.callback = _alert_pack_callback
def _build_drilldown_export_bytes() -> bytes:
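    """Build the drill-down Excel bytes for the selected site/RAT: a per-KPI
    summary (direction, SLA, baseline/recent medians, bad periods, max streak),
    the raw daily/hourly series, and the matching KPI status rows.
    """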
rat = rat_select.value
code = site_select.value
if rat is None or code is None:
return b""
code_int = _coerce_int(code)
if (
code_int is None
and hasattr(site_select, "options")
and isinstance(site_select.options, dict)
):
if code in site_select.options:
code_int = _coerce_int(site_select.options.get(code))
if code_int is None:
return b""
daily = current_daily_by_rat.get(rat)
if daily is None or daily.empty:
return b""
d = _filtered_daily(daily)
if d is None or d.empty:
return b""
g = str(granularity_select.value or "Daily").strip().lower()
is_hourly = g.startswith("hour") or g.startswith("h")
time_col = (
"period_start" if (is_hourly and "period_start" in d.columns) else "date_only"
)
s = d[d["site_code"] == int(code_int)].copy().sort_values(time_col)
if s.empty:
return b""
selected_kpis = [str(x) for x in (kpi_compare_select.value or []) if str(x)]
if not selected_kpis:
selected_kpis = [str(kpi_select.value)] if kpi_select.value else []
else:
if kpi_select.value and str(kpi_select.value) not in selected_kpis:
selected_kpis = [str(kpi_select.value)] + selected_kpis
selected_kpis = [k for k in selected_kpis if k in d.columns]
base_cols = [time_col]
daily_cols = base_cols + selected_kpis
daily_out = s[daily_cols].copy() if selected_kpis else s[[time_col]].copy()
rules_df = (
rules_table.value
if isinstance(rules_table.value, pd.DataFrame)
else pd.DataFrame()
)
windows = _compute_site_windows(d)
if windows is None:
summary_out = pd.DataFrame()
else:
baseline_start, baseline_end, recent_start, recent_end = windows
rows = []
for k in selected_kpis:
rule = _infer_rule_row(rules_df, str(rat), str(k))
direction = str(rule.get("direction", "higher_is_better"))
sla_raw = rule.get("sla", None)
try:
sla_val = float(sla_raw) if pd.notna(sla_raw) else None
except Exception: # noqa: BLE001
sla_val = None
sk = s[[time_col, k]].copy()
sk[k] = pd.to_numeric(sk[k], errors="coerce")
sk = sk.dropna(subset=[k])
            # Parse the time column once instead of four times.
            tk = pd.to_datetime(sk[time_col], errors="coerce")
            baseline_mask = (tk >= pd.to_datetime(baseline_start)) & (
                tk <= pd.to_datetime(baseline_end)
            )
            recent_mask = (tk >= pd.to_datetime(recent_start)) & (
                tk <= pd.to_datetime(recent_end)
            )
baseline_med = (
float(sk.loc[baseline_mask, k].median())
if baseline_mask.any()
else None
)
recent_med = (
float(sk.loc[recent_mask, k].median()) if recent_mask.any() else None
)
bad_flags = []
recent_vals = sk.loc[recent_mask, [time_col, k]].sort_values(time_col)
bad_dates = []
for _, r in recent_vals.iterrows():
v = r.get(k)
is_bad_day = bool(
is_bad(
float(v) if pd.notna(v) else None,
baseline_med,
direction,
float(rel_threshold_pct.value),
sla_val,
)
)
bad_flags.append(is_bad_day)
if is_bad_day:
try:
d0 = r.get(time_col)
if d0 is not None:
bad_dates.append(pd.to_datetime(d0, errors="coerce"))
except Exception: # noqa: BLE001
pass
rows.append(
{
"RAT": str(rat),
"site_code": int(code_int),
"KPI": str(k),
"direction": direction,
"sla": sla_val,
"baseline_median": baseline_med,
"recent_median": recent_med,
"bad_days_recent": int(sum(bad_flags)),
"max_streak_recent": int(
max_consecutive_periods(
bad_dates,
step=(
timedelta(hours=1) if is_hourly else timedelta(days=1)
),
)
if bad_dates
else 0
),
}
)
summary_out = pd.DataFrame(rows)
status_df = (
current_status_df
if isinstance(current_status_df, pd.DataFrame)
else pd.DataFrame()
)
if not status_df.empty:
status_out = status_df[
(status_df["site_code"] == int(code_int)) & (status_df["RAT"] == str(rat))
].copy()
else:
status_out = pd.DataFrame()
g2 = str(granularity_select.value or "Daily").strip().lower()
data_sheet = "Hourly" if (g2.startswith("hour") or g2.startswith("h")) else "Daily"
return write_dfs_to_excel(
[summary_out, daily_out, status_out],
["Summary", data_sheet, "KPI_Status"],
index=False,
)
def _drilldown_export_callback() -> io.BytesIO:
try:
b = _build_drilldown_export_bytes()
except Exception: # noqa: BLE001
b = b""
return io.BytesIO(b or b"")
drilldown_export_button.callback = _drilldown_export_callback
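# --- Layout: sidebar cards, tabs, and page assembly ---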
sidebar = pn.Column(
pn.Card(
file_2g,
file_3g,
file_lte,
file_twamp,
complaint_sites_file,
only_complaint_sites,
title="Datasets",
collapsed=False,
),
sizing_mode="stretch_width",
)
sidebar = pn.Column(
sidebar,
pn.Card(
analysis_range,
granularity_select,
baseline_days,
recent_days,
rel_threshold_pct,
min_consecutive_days,
title="Health check settings",
collapsible=False,
sizing_mode="stretch_width",
),
pn.Card(
load_button,
run_button,
title="Run",
collapsible=False,
sizing_mode="stretch_width",
),
pn.Card(
export_include_raw_data,
perf_profiling,
export_button,
alert_pack_button,
title="Export",
collapsible=False,
sizing_mode="stretch_width",
),
pn.Card(
min_criticality,
min_anomaly_score,
city_filter,
top_rat_filter,
top_status_filter,
title="Filters",
collapsible=False,
sizing_mode="stretch_width",
),
pn.Card(
preset_select,
pn.Row(preset_refresh_button, preset_apply_button, sizing_mode="stretch_width"),
preset_name_input,
pn.Row(preset_save_button, preset_delete_button, sizing_mode="stretch_width"),
title="Rule presets",
collapsible=False,
sizing_mode="stretch_width",
),
pn.Card(
profile_select,
pn.Row(
profile_refresh_button, profile_apply_button, sizing_mode="stretch_width"
),
profile_name_input,
pn.Row(profile_save_button, profile_delete_button, sizing_mode="stretch_width"),
title="Profiles",
collapsible=False,
sizing_mode="stretch_width",
),
sizing_mode="stretch_width",
)
_tab_overview = pn.Column(
pn.pane.Markdown("## Datasets"),
datasets_table,
pn.pane.Markdown("## KPI Rules (editable)"),
rules_table,
pn.pane.Markdown("## Site Summary"),
site_summary_table,
pn.pane.Markdown("## Multi-RAT Summary"),
multirat_summary_table,
pn.pane.Markdown("## Top anomalies (cross-RAT)"),
top_anomalies_table,
sizing_mode="stretch_width",
)
_tab_complaint = pn.Column(
pn.pane.Markdown("## Complaint sites only"),
pn.pane.Markdown("### Multi-RAT Summary (Complaint)"),
complaint_multirat_summary_table,
pn.pane.Markdown("### Top anomalies (Complaint)"),
complaint_top_anomalies_table,
sizing_mode="stretch_width",
)
_tab_ops_queue = pn.Column(
pn.pane.Markdown("## Ops Queue"),
ops_queue_table,
sizing_mode="stretch_width",
)
_tab_snapshot = pn.Column(
pn.pane.Markdown("## Snapshot"),
pn.Row(snapshot_download, snapshot_file),
pn.pane.Markdown("### Snapshot KPI rules"),
snapshot_rules_table,
pn.pane.Markdown("### Snapshot Multi-RAT"),
snapshot_multirat_table,
pn.pane.Markdown("### Snapshot Top anomalies"),
snapshot_top_table,
sizing_mode="stretch_width",
)
_tab_delta = pn.Column(
pn.pane.Markdown("## Delta"),
delta_table,
sizing_mode="stretch_width",
)
_tab_map = pn.Column(
pn.pane.Markdown("## Map"),
map_message,
pn.Row(
map_search_input, map_search_go, map_search_clear, map_fit_button, map_auto_fit
),
pn.Row(
map_top_n,
map_impacted_rat,
map_status_filter,
map_show_site_codes,
map_max_labels,
),
pn.Row(map_search_results),
pn.Column(map_pane, sizing_mode="stretch_both", min_height=700),
sizing_mode="stretch_both",
)
_tab_drilldown = pn.Column(
pn.pane.Markdown("## Drill-down"),
pn.Row(site_select, rat_select),
pn.Row(kpi_group_select, kpi_group_mode),
pn.Row(
kpi_select,
kpi_compare_select,
kpi_compare_norm,
show_sla_toggle,
drilldown_export_button,
),
pn.Column(trend_plot_pane, sizing_mode="stretch_both", min_height=500),
pn.Column(site_kpi_table, sizing_mode="stretch_width"),
pn.Column(heatmap_plot_pane, sizing_mode="stretch_both", min_height=400),
pn.Column(hist_plot_pane, sizing_mode="stretch_both", min_height=400),
pn.pane.Markdown("## Correlation"),
pn.Row(corr_window_select),
corr_message,
pn.Column(corr_plot_pane, sizing_mode="stretch_both", min_height=520),
sizing_mode="stretch_both",
)
_tabs_main = pn.Tabs(
("Overview", _tab_overview),
("Ops Queue", _tab_ops_queue),
("Complaint sites only", _tab_complaint),
("Snapshot", _tab_snapshot),
("Delta", _tab_delta),
("Map", _tab_map),
("Drill-down", _tab_drilldown),
dynamic=True,
sizing_mode="stretch_both",
)
main = pn.Column(
status_pane,
validation_pane,
_tabs_main,
)
def get_page_components():
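    """Return (sidebar, main) so an enclosing multi-page app can embed this page."""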
return sidebar, main
if __name__ == "__main__":
template = pn.template.MaterialTemplate(title="KPI Health Check - Panel")
template.sidebar.append(sidebar)
template.main.append(main)
template.servable()