File size: 6,638 Bytes
628c421 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
"""Compat layer for DatabaseManager to provide methods expected by legacy app code.
This module monkey-patches the DatabaseManager class from database.db_manager
to add:
- log_provider_status
- get_uptime_percentage
- get_avg_response_time
The implementations are lightweight and defensive: if the underlying engine
is not available, they fail gracefully instead of raising errors.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Optional
try:
from sqlalchemy import text as _sa_text
except Exception: # pragma: no cover - extremely defensive
_sa_text = None # type: ignore
try:
from .db_manager import DatabaseManager # type: ignore
except Exception: # pragma: no cover
DatabaseManager = None # type: ignore
def _get_engine(instance) -> Optional[object]:
"""Best-effort helper to get an SQLAlchemy engine from the manager."""
return getattr(instance, "engine", None)
def _ensure_table(conn) -> None:
"""Create provider_status table if it does not exist yet."""
if _sa_text is None:
return
conn.execute(
_sa_text(
"""
CREATE TABLE IF NOT EXISTS provider_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
provider_name TEXT NOT NULL,
category TEXT NOT NULL,
status TEXT NOT NULL,
response_time REAL,
status_code INTEGER,
error_message TEXT,
endpoint_tested TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
)
def _log_provider_status(
self,
provider_name: str,
category: str,
status: str,
response_time: Optional[float] = None,
status_code: Optional[int] = None,
endpoint_tested: Optional[str] = None,
error_message: Optional[str] = None,
) -> None:
"""Insert a status row into provider_status.
This is a best-effort logger; if no engine is available it silently returns.
"""
engine = _get_engine(self)
if engine is None or _sa_text is None:
return
now = datetime.utcnow()
try:
with engine.begin() as conn: # type: ignore[call-arg]
_ensure_table(conn)
conn.execute(
_sa_text(
"""
INSERT INTO provider_status (
provider_name,
category,
status,
response_time,
status_code,
error_message,
endpoint_tested,
created_at
)
VALUES (
:provider_name,
:category,
:status,
:response_time,
:status_code,
:error_message,
:endpoint_tested,
:created_at
)
"""
),
{
"provider_name": provider_name,
"category": category,
"status": status,
"response_time": response_time,
"status_code": status_code,
"error_message": error_message,
"endpoint_tested": endpoint_tested,
"created_at": now,
},
)
except Exception: # pragma: no cover - we never want this to crash the app
# Swallow DB errors; health endpoints must not bring the whole app down.
return
def _get_uptime_percentage(self, provider_name: str, hours: int = 24) -> float:
"""Compute uptime percentage for a provider in the last N hours.
Uptime is calculated as the ratio of rows with status='online' to total
rows in the provider_status table within the given time window.
"""
engine = _get_engine(self)
if engine is None or _sa_text is None:
return 0.0
cutoff = datetime.utcnow() - timedelta(hours=hours)
try:
with engine.begin() as conn: # type: ignore[call-arg]
_ensure_table(conn)
result = conn.execute(
_sa_text(
"""
SELECT
COUNT(*) AS total,
SUM(CASE WHEN status = 'online' THEN 1 ELSE 0 END) AS online
FROM provider_status
WHERE provider_name = :provider_name
AND created_at >= :cutoff
"""
),
{"provider_name": provider_name, "cutoff": cutoff},
).first()
except Exception:
return 0.0
if not result or result[0] in (None, 0):
return 0.0
total = float(result[0] or 0)
online = float(result[1] or 0)
return round(100.0 * online / total, 2)
def _get_avg_response_time(self, provider_name: str, hours: int = 24) -> float:
"""Average response time (ms) for a provider over the last N hours."""
engine = _get_engine(self)
if engine is None or _sa_text is None:
return 0.0
cutoff = datetime.utcnow() - timedelta(hours=hours)
try:
with engine.begin() as conn: # type: ignore[call-arg]
_ensure_table(conn)
result = conn.execute(
_sa_text(
"""
SELECT AVG(response_time) AS avg_response
FROM provider_status
WHERE provider_name = :provider_name
AND response_time IS NOT NULL
AND created_at >= :cutoff
"""
),
{"provider_name": provider_name, "cutoff": cutoff},
).first()
except Exception:
return 0.0
if not result or result[0] is None:
return 0.0
return round(float(result[0]), 2)
# Apply monkey-patches when this module is imported.
if DatabaseManager is not None: # pragma: no cover
if not hasattr(DatabaseManager, "log_provider_status"):
DatabaseManager.log_provider_status = _log_provider_status # type: ignore[attr-defined]
if not hasattr(DatabaseManager, "get_uptime_percentage"):
DatabaseManager.get_uptime_percentage = _get_uptime_percentage # type: ignore[attr-defined]
if not hasattr(DatabaseManager, "get_avg_response_time"):
DatabaseManager.get_avg_response_time = _get_avg_response_time # type: ignore[attr-defined]
|