"""SQLite-backed indexing utilities for notes."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import logging
from pathlib import Path
import re
import sqlite3
import time
from typing import Any, Dict, List, Sequence
from .database import DatabaseService
from .vault import VaultNote
logger = logging.getLogger(__name__)
WIKILINK_PATTERN = re.compile(r"\[\[([^\]]+)\]\]")
TOKEN_PATTERN = re.compile(r"[0-9A-Za-z]+(?:\*)?")
def _utcnow_iso() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
def normalize_slug(text: str | None) -> str:
"""Normalize text into a slug suitable for wikilink matching."""
if not text:
return ""
slug = text.lower()
slug = re.sub(r"[\s_]+", "-", slug)
slug = re.sub(r"[^a-z0-9-]", "", slug)
slug = re.sub(r"-+", "-", slug)
return slug.strip("-")
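# Illustrative behavior, derived from the regex pipeline above (inputs are
# invented):
#   normalize_slug("My Note_Title!")   -> "my-note-title"
#   normalize_slug("__draft  notes__") -> "draft-notes"
#   normalize_slug(None)               -> ""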
def normalize_tag(tag: str | None) -> str:
if not isinstance(tag, str):
return ""
return tag.strip().lower()
def _prepare_match_query(query: str) -> str:
"""
Sanitize user-supplied query text for FTS5 MATCH usage.
    - Extracts tokens composed of alphanumeric characters (per spec: split on non-alphanumerics).
- Preserves a single trailing '*' to allow prefix searches.
- Wraps each token in double quotes to neutralize MATCH operators.
"""
sanitized_terms: List[str] = []
for match in TOKEN_PATTERN.finditer(query or ""):
token = match.group()
has_prefix_star = token.endswith("*")
core = token[:-1] if has_prefix_star else token
if not core:
continue
sanitized_terms.append(f'"{core}"{"*" if has_prefix_star else ""}')
if not sanitized_terms:
raise ValueError("Search query must contain alphanumeric characters")
return " ".join(sanitized_terms)
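# Illustrative sanitization (inputs invented; output follows TOKEN_PATTERN):
#   _prepare_match_query("graph search*") -> '"graph" "search"*'
#   _prepare_match_query("NEAR(a OR b)")  -> '"NEAR" "a" "OR" "b"'  (operators neutralized)
#   _prepare_match_query("!!!")           raises ValueError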
class IndexerService:
"""Manage SQLite-backed metadata, tags, search index, and link graph."""
def __init__(self, db_service: DatabaseService | None = None) -> None:
self.db_service = db_service or DatabaseService()
def index_note(self, user_id: str, note: VaultNote) -> int:
"""Insert or update index rows for a note."""
start_time = time.time()
note_path = note["path"]
metadata = dict(note.get("metadata") or {})
title = note.get("title") or metadata.get("title") or Path(note_path).stem
body = note.get("body", "") or ""
size_bytes = int(note.get("size_bytes") or len(body.encode("utf-8")))
created = str(metadata.get("created") or _utcnow_iso())
updated = str(metadata.get("updated") or _utcnow_iso())
normalized_title_slug = normalize_slug(title)
normalized_path_slug = normalize_slug(Path(note_path).stem)
if not normalized_title_slug:
normalized_title_slug = normalized_path_slug
tags = self._prepare_tags(metadata.get("tags"))
wikilinks = self.extract_wikilinks(body)
conn = self.db_service.connect()
try:
with conn:
version = self.increment_version(conn, user_id, note_path)
self._delete_current_entries(conn, user_id, note_path)
conn.execute(
"""
INSERT INTO note_metadata (
user_id, note_path, version, title,
created, updated, size_bytes,
normalized_title_slug, normalized_path_slug
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
user_id,
note_path,
version,
title,
created,
updated,
size_bytes,
normalized_title_slug,
normalized_path_slug,
),
)
conn.execute(
"""
INSERT INTO note_fts (user_id, note_path, title, body)
VALUES (?, ?, ?, ?)
""",
(user_id, note_path, title, body),
)
if tags:
conn.executemany(
"""
INSERT INTO note_tags (user_id, note_path, tag)
VALUES (?, ?, ?)
""",
[(user_id, note_path, tag) for tag in tags],
)
if wikilinks:
resolved = self.resolve_wikilinks(conn, user_id, note_path, wikilinks)
conn.executemany(
"""
INSERT INTO note_links (user_id, source_path, target_path, link_text, is_resolved)
VALUES (?, ?, ?, ?, ?)
""",
[
(
user_id,
note_path,
entry["target_path"],
entry["link_text"],
1 if entry["is_resolved"] else 0,
)
for entry in resolved
],
)
self.update_index_health(conn, user_id)
duration_ms = (time.time() - start_time) * 1000
logger.info(
"Note indexed successfully",
extra={
"user_id": user_id,
"note_path": note_path,
"version": version,
"tags_count": len(tags),
"wikilinks_count": len(wikilinks),
"duration_ms": f"{duration_ms:.2f}"
}
)
return version
finally:
conn.close()
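    # Usage sketch (the note dict and ids are invented; assumes the schema
    # provided by DatabaseService already exists):
    #   note = {"path": "projects/alpha.md", "title": "Alpha",
    #           "body": "See [[Beta]].", "metadata": {"tags": ["Work"]}}
    #   version = IndexerService().index_note("user-1", note)  # -> 1 on first index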
def delete_note_index(self, user_id: str, note_path: str) -> None:
"""Remove all index data for a note and update backlinks."""
conn = self.db_service.connect()
try:
with conn:
self._delete_current_entries(conn, user_id, note_path)
conn.execute(
"""
UPDATE note_links
SET target_path = NULL, is_resolved = 0
WHERE user_id = ? AND target_path = ?
""",
(user_id, note_path),
)
self.update_index_health(conn, user_id)
finally:
conn.close()
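    # Inbound links that pointed at the deleted note are kept but demoted to
    # unresolved (target_path NULL); re-indexing a source note later rebuilds
    # its links, so they can resolve again if the target returns.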
def extract_wikilinks(self, body: str) -> List[str]:
"""Extract wikilink text from Markdown body."""
links = []
for match in WIKILINK_PATTERN.finditer(body or ""):
link_text = match.group(1).strip()
if link_text:
links.append(link_text)
        # Preserve first-seen order while dropping duplicates.
        return list(dict.fromkeys(links))
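    # Example (invented body): extract_wikilinks("See [[Alpha]], [[Beta|b]]
    # and [[Alpha]] again") -> ["Alpha", "Beta|b"]; duplicates are dropped,
    # first-seen order is kept, and pipe aliases are not split.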
def resolve_wikilinks(
self,
conn: sqlite3.Connection,
user_id: str,
note_path: str,
link_texts: Sequence[str],
) -> List[Dict[str, Any]]:
"""Resolve wikilinks to target note paths using slug comparison."""
if not link_texts:
return []
results: List[Dict[str, Any]] = []
note_folder = Path(note_path).parent
for text in link_texts:
slug = normalize_slug(text)
if not slug:
results.append({"link_text": text, "target_path": None, "is_resolved": False})
continue
rows = conn.execute(
"""
SELECT note_path
FROM note_metadata
WHERE user_id = ?
AND (normalized_title_slug = ? OR normalized_path_slug = ?)
""",
(user_id, slug, slug),
).fetchall()
if not rows:
results.append({"link_text": text, "target_path": None, "is_resolved": False})
continue
candidates = [row["note_path"] if isinstance(row, sqlite3.Row) else row[0] for row in rows]
target = sorted(
candidates,
key=lambda candidate: (Path(candidate).parent != note_folder, candidate),
)[0]
results.append({"link_text": text, "target_path": target, "is_resolved": True})
return results
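    # Resolution preference: a candidate in the same folder as the linking
    # note wins; ties fall back to lexicographic path order. E.g. (paths
    # invented) "[[Beta]]" in "projects/alpha.md" resolves to
    # "projects/beta.md" rather than "archive/beta.md" when both slugs match.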
def increment_version(
self, conn: sqlite3.Connection, user_id: str, note_path: str
) -> int:
"""Return the next version number for a note."""
row = conn.execute(
"SELECT version FROM note_metadata WHERE user_id = ? AND note_path = ?",
(user_id, note_path),
).fetchone()
if row is None:
return 1
current_version = row["version"] if isinstance(row, sqlite3.Row) else row[0]
return int(current_version) + 1
def update_index_health(self, conn: sqlite3.Connection, user_id: str) -> None:
"""Update per-user index health stats."""
row = conn.execute(
"SELECT COUNT(*) AS count FROM note_metadata WHERE user_id = ?",
(user_id,),
).fetchone()
note_count = int(row["count"] if isinstance(row, sqlite3.Row) else row[0])
now_iso = _utcnow_iso()
conn.execute(
"""
INSERT INTO index_health (user_id, note_count, last_incremental_update)
VALUES (?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
note_count = excluded.note_count,
last_incremental_update = excluded.last_incremental_update
""",
(user_id, note_count, now_iso),
)
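    # The INSERT ... ON CONFLICT above is an upsert keyed on user_id: the
    # first index write creates the row, subsequent writes refresh note_count
    # and the last_incremental_update timestamp.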
def search_notes(self, user_id: str, query: str, *, limit: int = 50) -> List[Dict[str, Any]]:
"""Execute a full-text search with recency bonus scoring."""
if not query or not query.strip():
raise ValueError("Search query cannot be empty")
sanitized_query = _prepare_match_query(query)
conn = self.db_service.connect()
try:
rows = conn.execute(
"""
SELECT
m.note_path,
m.title,
m.updated,
snippet(note_fts, 3, '<mark>', '</mark>', '...', 32) AS snippet,
                    -- bm25() returns lower-is-better (negative) scores; negate so
                    -- larger means better, and weight only title (3.0) and body (1.0).
                    -bm25(note_fts, 0.0, 0.0, 3.0, 1.0) AS score
FROM note_fts
JOIN note_metadata m USING (user_id, note_path)
WHERE note_fts.user_id = ? AND note_fts MATCH ?
ORDER BY score DESC
LIMIT ?
""",
(user_id, sanitized_query, limit),
).fetchall()
finally:
conn.close()
now = datetime.now(timezone.utc)
seven_days = timedelta(days=7)
thirty_days = timedelta(days=30)
results: List[Dict[str, Any]] = []
for row in rows:
updated_raw = row["updated"] if isinstance(row, sqlite3.Row) else row[2]
snippet = row["snippet"] if isinstance(row, sqlite3.Row) else row[3]
base_score = float(row["score"] if isinstance(row, sqlite3.Row) else row[4])
            try:
                updated_dt = datetime.fromisoformat(str(updated_raw))
            except ValueError:
                updated_dt = now
            # Timestamps parsed without an offset are assumed to be UTC so the
            # aware/naive subtraction below cannot raise TypeError.
            if updated_dt.tzinfo is None:
                updated_dt = updated_dt.replace(tzinfo=timezone.utc)
            delta = now - updated_dt
if delta <= seven_days:
bonus = 1.0
elif delta <= thirty_days:
bonus = 0.5
else:
bonus = 0.0
results.append(
{
"path": row["note_path"] if isinstance(row, sqlite3.Row) else row[0],
"title": row["title"] if isinstance(row, sqlite3.Row) else row[1],
"snippet": snippet or "",
"score": base_score + bonus,
"updated": updated_raw,
}
)
return sorted(results, key=lambda item: item["score"], reverse=True)
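    # Usage sketch (user id and query invented):
    #   hits = IndexerService().search_notes("user-1", "meeting notes*", limit=10)
    #   for hit in hits:
    #       print(hit["path"], f'{hit["score"]:.2f}', hit["snippet"])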
def get_backlinks(self, user_id: str, target_path: str) -> List[Dict[str, Any]]:
"""Return backlinks for a note."""
conn = self.db_service.connect()
try:
rows = conn.execute(
"""
SELECT DISTINCT l.source_path, m.title
FROM note_links l
JOIN note_metadata m
ON l.user_id = m.user_id AND l.source_path = m.note_path
WHERE l.user_id = ? AND l.target_path = ?
ORDER BY m.updated DESC
""",
(user_id, target_path),
).fetchall()
finally:
conn.close()
return [
{
"path": row["source_path"] if isinstance(row, sqlite3.Row) else row[0],
"title": row["title"] if isinstance(row, sqlite3.Row) else row[1],
}
for row in rows
]
def get_tags(self, user_id: str) -> List[Dict[str, Any]]:
"""Return tag counts for a user."""
conn = self.db_service.connect()
try:
rows = conn.execute(
"""
SELECT tag, COUNT(DISTINCT note_path) AS count
FROM note_tags
WHERE user_id = ?
GROUP BY tag
ORDER BY count DESC, tag ASC
""",
(user_id,),
).fetchall()
finally:
conn.close()
        return [
            {
                "tag": row["tag"] if isinstance(row, sqlite3.Row) else row[0],
                "count": int(row["count"] if isinstance(row, sqlite3.Row) else row[1]),
            }
            for row in rows
        ]
def get_graph_data(self, user_id: str) -> Dict[str, List[Dict[str, Any]]]:
"""Return graph visualization data (nodes and links)."""
conn = self.db_service.connect()
try:
# Fetch all notes
notes_rows = conn.execute(
"""
SELECT note_path, title
FROM note_metadata
WHERE user_id = ?
""",
(user_id,),
).fetchall()
# Fetch all resolved links
links_rows = conn.execute(
"""
SELECT source_path, target_path
FROM note_links
WHERE user_id = ? AND is_resolved = 1
""",
(user_id,),
).fetchall()
finally:
conn.close()
# Calculate link counts for node sizing
link_counts: Dict[str, int] = {}
links = []
for row in links_rows:
source = row["source_path"] if isinstance(row, sqlite3.Row) else row[0]
target = row["target_path"] if isinstance(row, sqlite3.Row) else row[1]
links.append({"source": source, "target": target})
link_counts[source] = link_counts.get(source, 0) + 1
link_counts[target] = link_counts.get(target, 0) + 1
# Build nodes list
nodes = []
for row in notes_rows:
path = row["note_path"] if isinstance(row, sqlite3.Row) else row[0]
title = row["title"] if isinstance(row, sqlite3.Row) else row[1]
# Derive group from top-level folder
parts = Path(path).parts
group = parts[0] if len(parts) > 1 else "root"
# Default size is 1, add link count
val = 1 + link_counts.get(path, 0)
nodes.append({
"id": path,
"label": title,
"val": val,
"group": group
})
return {"nodes": nodes, "links": links}
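    # Shape of the returned payload (paths invented for illustration):
    #   {"nodes": [{"id": "projects/alpha.md", "label": "Alpha",
    #               "val": 3, "group": "projects"}, ...],
    #    "links": [{"source": "projects/alpha.md",
    #               "target": "projects/beta.md"}, ...]}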
def _delete_current_entries(self, conn: sqlite3.Connection, user_id: str, note_path: str) -> None:
"""Delete existing index rows for a note."""
conn.execute(
"DELETE FROM note_metadata WHERE user_id = ? AND note_path = ?",
(user_id, note_path),
)
conn.execute(
"DELETE FROM note_fts WHERE user_id = ? AND note_path = ?",
(user_id, note_path),
)
conn.execute(
"DELETE FROM note_tags WHERE user_id = ? AND note_path = ?",
(user_id, note_path),
)
conn.execute(
"DELETE FROM note_links WHERE user_id = ? AND source_path = ?",
(user_id, note_path),
)
def _prepare_tags(self, tags: Any) -> List[str]:
if not isinstance(tags, list):
return []
normalized: List[str] = []
for tag in tags:
cleaned = normalize_tag(tag)
if cleaned and cleaned not in normalized:
normalized.append(cleaned)
return normalized
__all__ = ["IndexerService", "normalize_slug", "normalize_tag"]