Wothmag07 commited on
Commit
f991fa0
·
1 Parent(s): 02af15b

feat: implement MCP server with vault management

Browse files
backend/src/mcp/__init__.py CHANGED
@@ -1 +1,5 @@
1
  """MCP (Model Context Protocol) server implementation."""
 
 
 
 
 
1
  """MCP (Model Context Protocol) server implementation."""
2
+
3
+ from .server import mcp
4
+
5
+ __all__ = ["mcp"]
backend/src/mcp/server.py ADDED
@@ -0,0 +1,138 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """FastMCP server exposing vault and indexing tools."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import os
6
+ from typing import Any, Dict, List, Optional
7
+
8
+ from fastmcp import FastMCP
9
+ from pydantic import Field
10
+
11
+ from ..services import IndexerService, VaultNote, VaultService
12
+
13
+ mcp = FastMCP(
14
+ "obsidian-docs-viewer",
15
+ instructions="Interact with a multi-tenant Obsidian-like documentation vault.",
16
+ )
17
+
18
+ vault_service = VaultService()
19
+ indexer_service = IndexerService()
20
+
21
+
22
+ def _current_user_id() -> str:
23
+ """Resolve the acting user ID (local mode defaults to local-dev)."""
24
+ return os.getenv("LOCAL_USER_ID", "local-dev")
25
+
26
+
27
+ def _note_to_response(note: VaultNote) -> Dict[str, Any]:
28
+ return {
29
+ "path": note["path"],
30
+ "title": note["title"],
31
+ "metadata": dict(note.get("metadata") or {}),
32
+ "body": note.get("body", ""),
33
+ }
34
+
35
+
36
+ @mcp.tool(name="list_notes", description="List notes in the vault (optionally scoped to a folder).")
37
+ def list_notes(
38
+ folder: Optional[str] = Field(
39
+ default=None,
40
+ description="Optional folder path (relative) to filter results. Example: 'api'.",
41
+ ),
42
+ ) -> List[Dict[str, Any]]:
43
+ user_id = _current_user_id()
44
+ notes = vault_service.list_notes(user_id, folder=folder)
45
+ return [
46
+ {
47
+ "path": entry["path"],
48
+ "title": entry["title"],
49
+ "last_modified": entry["last_modified"].isoformat(),
50
+ }
51
+ for entry in notes
52
+ ]
53
+
54
+
55
+ @mcp.tool(name="read_note", description="Read a Markdown note with metadata and body.")
56
+ def read_note(
57
+ path: str = Field(..., description="Relative path to the note (must include .md)"),
58
+ ) -> Dict[str, Any]:
59
+ user_id = _current_user_id()
60
+ note = vault_service.read_note(user_id, path)
61
+ return _note_to_response(note)
62
+
63
+
64
+ @mcp.tool(
65
+ name="write_note",
66
+ description="Create or update a note. Automatically updates frontmatter timestamps and search index.",
67
+ )
68
+ def write_note(
69
+ path: str = Field(..., description="Relative note path (includes .md)"),
70
+ body: str = Field(..., description="Markdown body content"),
71
+ title: Optional[str] = Field(
72
+ default=None,
73
+ description="Optional title override. Defaults to frontmatter title or first heading.",
74
+ ),
75
+ metadata: Optional[Dict[str, Any]] = Field(
76
+ default=None,
77
+ description="Optional frontmatter metadata dictionary (tags, project, etc.).",
78
+ ),
79
+ ) -> Dict[str, Any]:
80
+ user_id = _current_user_id()
81
+ note = vault_service.write_note(
82
+ user_id,
83
+ path,
84
+ title=title,
85
+ metadata=metadata,
86
+ body=body,
87
+ )
88
+ indexer_service.index_note(user_id, note)
89
+ return {"status": "ok", "path": path}
90
+
91
+
92
+ @mcp.tool(name="delete_note", description="Delete a note and remove it from the index.")
93
+ def delete_note(
94
+ path: str = Field(..., description="Relative note path (includes .md)"),
95
+ ) -> Dict[str, str]:
96
+ user_id = _current_user_id()
97
+ vault_service.delete_note(user_id, path)
98
+ indexer_service.delete_note_index(user_id, path)
99
+ return {"status": "ok"}
100
+
101
+
102
+ @mcp.tool(
103
+ name="search_notes",
104
+ description="Full-text search with snippets and recency-aware scoring.",
105
+ )
106
+ def search_notes(
107
+ query: str = Field(..., description="Search query (minimum 1 character)"),
108
+ limit: int = Field(50, ge=1, le=100, description="Maximum number of results to return."),
109
+ ) -> List[Dict[str, Any]]:
110
+ user_id = _current_user_id()
111
+ results = indexer_service.search_notes(user_id, query, limit=limit)
112
+ return [
113
+ {
114
+ "path": row["path"],
115
+ "title": row["title"],
116
+ "snippet": row["snippet"],
117
+ }
118
+ for row in results
119
+ ]
120
+
121
+
122
+ @mcp.tool(name="get_backlinks", description="List notes that reference the target note.")
123
+ def get_backlinks(
124
+ path: str = Field(..., description="Target note path (includes .md)"),
125
+ ) -> List[Dict[str, Any]]:
126
+ user_id = _current_user_id()
127
+ backlinks = indexer_service.get_backlinks(user_id, path)
128
+ return backlinks
129
+
130
+
131
+ @mcp.tool(name="get_tags", description="List tags and associated note counts.")
132
+ def get_tags() -> List[Dict[str, Any]]:
133
+ user_id = _current_user_id()
134
+ return indexer_service.get_tags(user_id)
135
+
136
+
137
+ if __name__ == "__main__":
138
+ mcp.run(transport="stdio")
backend/src/services/__init__.py CHANGED
@@ -3,7 +3,8 @@
3
  from .auth import AuthError, AuthService
4
  from .config import AppConfig, get_config, reload_config
5
  from .database import DatabaseService, init_database
6
- from .vault import VaultService, sanitize_path, validate_note_path
 
7
 
8
  __all__ = [
9
  "AppConfig",
@@ -14,6 +15,10 @@ __all__ = [
14
  "AuthService",
15
  "AuthError",
16
  "VaultService",
 
17
  "sanitize_path",
18
  "validate_note_path",
 
 
 
19
  ]
 
3
  from .auth import AuthError, AuthService
4
  from .config import AppConfig, get_config, reload_config
5
  from .database import DatabaseService, init_database
6
+ from .indexer import IndexerService, normalize_slug, normalize_tag
7
+ from .vault import VaultNote, VaultService, sanitize_path, validate_note_path
8
 
9
  __all__ = [
10
  "AppConfig",
 
15
  "AuthService",
16
  "AuthError",
17
  "VaultService",
18
+ "VaultNote",
19
  "sanitize_path",
20
  "validate_note_path",
21
+ "IndexerService",
22
+ "normalize_slug",
23
+ "normalize_tag",
24
  ]
backend/src/services/indexer.py ADDED
@@ -0,0 +1,376 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """SQLite-backed indexing utilities for notes."""
2
+
3
+ from __future__ import annotations
4
+
5
+ from datetime import datetime, timedelta, timezone
6
+ from pathlib import Path
7
+ import re
8
+ import sqlite3
9
+ from typing import Any, Dict, List, Sequence
10
+
11
+ from .database import DatabaseService
12
+ from .vault import VaultNote
13
+
14
+ WIKILINK_PATTERN = re.compile(r"\[\[([^\]]+)\]\]")
15
+
16
+
17
+ def _utcnow_iso() -> str:
18
+ return datetime.now(timezone.utc).isoformat(timespec="seconds")
19
+
20
+
21
+ def normalize_slug(text: str | None) -> str:
22
+ """Normalize text into a slug suitable for wikilink matching."""
23
+ if not text:
24
+ return ""
25
+ slug = text.lower()
26
+ slug = re.sub(r"[\s_]+", "-", slug)
27
+ slug = re.sub(r"[^a-z0-9-]", "", slug)
28
+ slug = re.sub(r"-+", "-", slug)
29
+ return slug.strip("-")
30
+
31
+
32
+ def normalize_tag(tag: str | None) -> str:
33
+ if not isinstance(tag, str):
34
+ return ""
35
+ return tag.strip().lower()
36
+
37
+
38
+ class IndexerService:
39
+ """Manage SQLite-backed metadata, tags, search index, and link graph."""
40
+
41
+ def __init__(self, db_service: DatabaseService | None = None) -> None:
42
+ self.db_service = db_service or DatabaseService()
43
+
44
+ def index_note(self, user_id: str, note: VaultNote) -> int:
45
+ """Insert or update index rows for a note."""
46
+ note_path = note["path"]
47
+ metadata = dict(note.get("metadata") or {})
48
+ title = note.get("title") or metadata.get("title") or Path(note_path).stem
49
+ body = note.get("body", "") or ""
50
+ size_bytes = int(note.get("size_bytes") or len(body.encode("utf-8")))
51
+ created = str(metadata.get("created") or _utcnow_iso())
52
+ updated = str(metadata.get("updated") or _utcnow_iso())
53
+
54
+ normalized_title_slug = normalize_slug(title)
55
+ normalized_path_slug = normalize_slug(Path(note_path).stem)
56
+ if not normalized_title_slug:
57
+ normalized_title_slug = normalized_path_slug
58
+
59
+ tags = self._prepare_tags(metadata.get("tags"))
60
+ wikilinks = self.extract_wikilinks(body)
61
+
62
+ conn = self.db_service.connect()
63
+ try:
64
+ with conn:
65
+ version = self.increment_version(conn, user_id, note_path)
66
+ self._delete_current_entries(conn, user_id, note_path)
67
+
68
+ conn.execute(
69
+ """
70
+ INSERT INTO note_metadata (
71
+ user_id, note_path, version, title,
72
+ created, updated, size_bytes,
73
+ normalized_title_slug, normalized_path_slug
74
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
75
+ """,
76
+ (
77
+ user_id,
78
+ note_path,
79
+ version,
80
+ title,
81
+ created,
82
+ updated,
83
+ size_bytes,
84
+ normalized_title_slug,
85
+ normalized_path_slug,
86
+ ),
87
+ )
88
+
89
+ conn.execute(
90
+ """
91
+ INSERT INTO note_fts (user_id, note_path, title, body)
92
+ VALUES (?, ?, ?, ?)
93
+ """,
94
+ (user_id, note_path, title, body),
95
+ )
96
+
97
+ if tags:
98
+ conn.executemany(
99
+ """
100
+ INSERT INTO note_tags (user_id, note_path, tag)
101
+ VALUES (?, ?, ?)
102
+ """,
103
+ [(user_id, note_path, tag) for tag in tags],
104
+ )
105
+
106
+ if wikilinks:
107
+ resolved = self.resolve_wikilinks(conn, user_id, note_path, wikilinks)
108
+ conn.executemany(
109
+ """
110
+ INSERT INTO note_links (user_id, source_path, target_path, link_text, is_resolved)
111
+ VALUES (?, ?, ?, ?, ?)
112
+ """,
113
+ [
114
+ (
115
+ user_id,
116
+ note_path,
117
+ entry["target_path"],
118
+ entry["link_text"],
119
+ 1 if entry["is_resolved"] else 0,
120
+ )
121
+ for entry in resolved
122
+ ],
123
+ )
124
+
125
+ self.update_index_health(conn, user_id)
126
+
127
+ return version
128
+ finally:
129
+ conn.close()
130
+
131
+ def delete_note_index(self, user_id: str, note_path: str) -> None:
132
+ """Remove all index data for a note and update backlinks."""
133
+ conn = self.db_service.connect()
134
+ try:
135
+ with conn:
136
+ self._delete_current_entries(conn, user_id, note_path)
137
+ conn.execute(
138
+ """
139
+ UPDATE note_links
140
+ SET target_path = NULL, is_resolved = 0
141
+ WHERE user_id = ? AND target_path = ?
142
+ """,
143
+ (user_id, note_path),
144
+ )
145
+ self.update_index_health(conn, user_id)
146
+ finally:
147
+ conn.close()
148
+
149
+ def extract_wikilinks(self, body: str) -> List[str]:
150
+ """Extract wikilink text from Markdown body."""
151
+ links = []
152
+ for match in WIKILINK_PATTERN.finditer(body or ""):
153
+ link_text = match.group(1).strip()
154
+ if link_text:
155
+ links.append(link_text)
156
+ # Preserve order but drop duplicates
157
+ seen: Dict[str, None] = {}
158
+ for link in links:
159
+ if link not in seen:
160
+ seen[link] = None
161
+ return list(seen.keys())
162
+
163
+ def resolve_wikilinks(
164
+ self,
165
+ conn: sqlite3.Connection,
166
+ user_id: str,
167
+ note_path: str,
168
+ link_texts: Sequence[str],
169
+ ) -> List[Dict[str, Any]]:
170
+ """Resolve wikilinks to target note paths using slug comparison."""
171
+ if not link_texts:
172
+ return []
173
+
174
+ results: List[Dict[str, Any]] = []
175
+ note_folder = Path(note_path).parent
176
+
177
+ for text in link_texts:
178
+ slug = normalize_slug(text)
179
+ if not slug:
180
+ results.append({"link_text": text, "target_path": None, "is_resolved": False})
181
+ continue
182
+
183
+ rows = conn.execute(
184
+ """
185
+ SELECT note_path
186
+ FROM note_metadata
187
+ WHERE user_id = ?
188
+ AND (normalized_title_slug = ? OR normalized_path_slug = ?)
189
+ """,
190
+ (user_id, slug, slug),
191
+ ).fetchall()
192
+
193
+ if not rows:
194
+ results.append({"link_text": text, "target_path": None, "is_resolved": False})
195
+ continue
196
+
197
+ candidates = [row["note_path"] if isinstance(row, sqlite3.Row) else row[0] for row in rows]
198
+ target = sorted(
199
+ candidates,
200
+ key=lambda candidate: (Path(candidate).parent != note_folder, candidate),
201
+ )[0]
202
+
203
+ results.append({"link_text": text, "target_path": target, "is_resolved": True})
204
+
205
+ return results
206
+
207
+ def increment_version(
208
+ self, conn: sqlite3.Connection, user_id: str, note_path: str
209
+ ) -> int:
210
+ """Return the next version number for a note."""
211
+ row = conn.execute(
212
+ "SELECT version FROM note_metadata WHERE user_id = ? AND note_path = ?",
213
+ (user_id, note_path),
214
+ ).fetchone()
215
+ if row is None:
216
+ return 1
217
+ current_version = row["version"] if isinstance(row, sqlite3.Row) else row[0]
218
+ return int(current_version) + 1
219
+
220
+ def update_index_health(self, conn: sqlite3.Connection, user_id: str) -> None:
221
+ """Update per-user index health stats."""
222
+ row = conn.execute(
223
+ "SELECT COUNT(*) AS count FROM note_metadata WHERE user_id = ?",
224
+ (user_id,),
225
+ ).fetchone()
226
+ note_count = int(row["count"] if isinstance(row, sqlite3.Row) else row[0])
227
+ now_iso = _utcnow_iso()
228
+ conn.execute(
229
+ """
230
+ INSERT INTO index_health (user_id, note_count, last_incremental_update)
231
+ VALUES (?, ?, ?)
232
+ ON CONFLICT(user_id) DO UPDATE SET
233
+ note_count = excluded.note_count,
234
+ last_incremental_update = excluded.last_incremental_update
235
+ """,
236
+ (user_id, note_count, now_iso),
237
+ )
238
+
239
+ def search_notes(self, user_id: str, query: str, *, limit: int = 50) -> List[Dict[str, Any]]:
240
+ """Execute a full-text search with recency bonus scoring."""
241
+ if not query or not query.strip():
242
+ raise ValueError("Search query cannot be empty")
243
+
244
+ conn = self.db_service.connect()
245
+ try:
246
+ rows = conn.execute(
247
+ """
248
+ SELECT
249
+ m.note_path,
250
+ m.title,
251
+ m.updated,
252
+ snippet(note_fts, 3, '<mark>', '</mark>', '...', 32) AS snippet,
253
+ bm25(note_fts, 3.0, 1.0) AS score
254
+ FROM note_fts
255
+ JOIN note_metadata m USING (user_id, note_path)
256
+ WHERE note_fts.user_id = ? AND note_fts MATCH ?
257
+ ORDER BY score DESC
258
+ LIMIT ?
259
+ """,
260
+ (user_id, query, limit),
261
+ ).fetchall()
262
+ finally:
263
+ conn.close()
264
+
265
+ now = datetime.now(timezone.utc)
266
+ seven_days = timedelta(days=7)
267
+ thirty_days = timedelta(days=30)
268
+
269
+ results: List[Dict[str, Any]] = []
270
+ for row in rows:
271
+ updated_raw = row["updated"] if isinstance(row, sqlite3.Row) else row[2]
272
+ snippet = row["snippet"] if isinstance(row, sqlite3.Row) else row[3]
273
+ base_score = float(row["score"] if isinstance(row, sqlite3.Row) else row[4])
274
+ try:
275
+ updated_dt = datetime.fromisoformat(str(updated_raw))
276
+ except ValueError:
277
+ updated_dt = now
278
+ delta = now - updated_dt
279
+ if delta <= seven_days:
280
+ bonus = 1.0
281
+ elif delta <= thirty_days:
282
+ bonus = 0.5
283
+ else:
284
+ bonus = 0.0
285
+
286
+ results.append(
287
+ {
288
+ "path": row["note_path"] if isinstance(row, sqlite3.Row) else row[0],
289
+ "title": row["title"] if isinstance(row, sqlite3.Row) else row[1],
290
+ "snippet": snippet or "",
291
+ "score": base_score + bonus,
292
+ "updated": updated_raw,
293
+ }
294
+ )
295
+
296
+ return sorted(results, key=lambda item: item["score"], reverse=True)
297
+
298
+ def get_backlinks(self, user_id: str, target_path: str) -> List[Dict[str, Any]]:
299
+ """Return backlinks for a note."""
300
+ conn = self.db_service.connect()
301
+ try:
302
+ rows = conn.execute(
303
+ """
304
+ SELECT DISTINCT l.source_path, m.title
305
+ FROM note_links l
306
+ JOIN note_metadata m
307
+ ON l.user_id = m.user_id AND l.source_path = m.note_path
308
+ WHERE l.user_id = ? AND l.target_path = ?
309
+ ORDER BY m.updated DESC
310
+ """,
311
+ (user_id, target_path),
312
+ ).fetchall()
313
+ finally:
314
+ conn.close()
315
+
316
+ return [
317
+ {
318
+ "path": row["source_path"] if isinstance(row, sqlite3.Row) else row[0],
319
+ "title": row["title"] if isinstance(row, sqlite3.Row) else row[1],
320
+ }
321
+ for row in rows
322
+ ]
323
+
324
+ def get_tags(self, user_id: str) -> List[Dict[str, Any]]:
325
+ """Return tag counts for a user."""
326
+ conn = self.db_service.connect()
327
+ try:
328
+ rows = conn.execute(
329
+ """
330
+ SELECT tag, COUNT(DISTINCT note_path) AS count
331
+ FROM note_tags
332
+ WHERE user_id = ?
333
+ GROUP BY tag
334
+ ORDER BY count DESC, tag ASC
335
+ """,
336
+ (user_id,),
337
+ ).fetchall()
338
+ finally:
339
+ conn.close()
340
+
341
+ return [
342
+ {"tag": row["tag"] if isinstance(row, sqlite3.Row) else row[0], "count": int(row["count"] if isinstance(row, sqlite3.Row) else row[1])}
343
+ for row in rows
344
+ ]
345
+
346
+ def _delete_current_entries(self, conn: sqlite3.Connection, user_id: str, note_path: str) -> None:
347
+ """Delete existing index rows for a note."""
348
+ conn.execute(
349
+ "DELETE FROM note_metadata WHERE user_id = ? AND note_path = ?",
350
+ (user_id, note_path),
351
+ )
352
+ conn.execute(
353
+ "DELETE FROM note_fts WHERE user_id = ? AND note_path = ?",
354
+ (user_id, note_path),
355
+ )
356
+ conn.execute(
357
+ "DELETE FROM note_tags WHERE user_id = ? AND note_path = ?",
358
+ (user_id, note_path),
359
+ )
360
+ conn.execute(
361
+ "DELETE FROM note_links WHERE user_id = ? AND source_path = ?",
362
+ (user_id, note_path),
363
+ )
364
+
365
+ def _prepare_tags(self, tags: Any) -> List[str]:
366
+ if not isinstance(tags, list):
367
+ return []
368
+ normalized: List[str] = []
369
+ for tag in tags:
370
+ cleaned = normalize_tag(tag)
371
+ if cleaned and cleaned not in normalized:
372
+ normalized.append(cleaned)
373
+ return normalized
374
+
375
+
376
+ __all__ = ["IndexerService", "normalize_slug", "normalize_tag"]
backend/src/services/vault.py CHANGED
@@ -2,12 +2,20 @@
2
 
3
  from __future__ import annotations
4
 
 
5
  from pathlib import Path
6
- from typing import Tuple
 
 
 
7
 
8
  from .config import AppConfig, get_config
9
 
10
  INVALID_PATH_CHARS = {'<', '>', ':', '"', '|', '?', '*'}
 
 
 
 
11
 
12
 
13
  def validate_note_path(note_path: str) -> Tuple[bool, str]:
@@ -44,6 +52,42 @@ def sanitize_path(user_id: str, vault_root: Path, note_path: str) -> Path:
44
  return full_path
45
 
46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  class VaultService:
48
  """Service for managing vault directories and basic path validation."""
49
 
@@ -69,5 +113,123 @@ class VaultService:
69
  raise ValueError(message)
70
  return sanitize_path(user_id, self.vault_root, note_path)
71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
 
73
  __all__ = ["VaultService", "validate_note_path", "sanitize_path"]
 
2
 
3
  from __future__ import annotations
4
 
5
+ from datetime import datetime, timezone
6
  from pathlib import Path
7
+ import re
8
+ from typing import Any, Dict, List, Tuple
9
+
10
+ import frontmatter
11
 
12
  from .config import AppConfig, get_config
13
 
14
  INVALID_PATH_CHARS = {'<', '>', ':', '"', '|', '?', '*'}
15
+ MAX_NOTE_BYTES = 1_048_576
16
+ H1_PATTERN = re.compile(r"^\s*#\s+(.+)$", re.MULTILINE)
17
+
18
+ VaultNote = Dict[str, Any]
19
 
20
 
21
  def validate_note_path(note_path: str) -> Tuple[bool, str]:
 
52
  return full_path
53
 
54
 
55
+ def _utcnow_iso() -> str:
56
+ return datetime.now(timezone.utc).isoformat(timespec="seconds")
57
+
58
+
59
+ def _validate_frontmatter(metadata: Dict[str, Any]) -> Dict[str, Any]:
60
+ reserved = {"version"}
61
+ for key in metadata.keys():
62
+ if key in reserved:
63
+ raise ValueError(f"Field '{key}' is reserved and cannot be set in frontmatter")
64
+ tags = metadata.get("tags")
65
+ if tags is not None:
66
+ if not isinstance(tags, list):
67
+ raise ValueError("Field 'tags' must be an array")
68
+ if not all(isinstance(tag, str) for tag in tags):
69
+ raise ValueError("All tags must be strings")
70
+ return metadata
71
+
72
+
73
+ def _validate_note_body(body: str) -> None:
74
+ body_bytes = body.encode("utf-8")
75
+ if len(body_bytes) > MAX_NOTE_BYTES:
76
+ raise ValueError("Note exceeds 1 MiB limit")
77
+
78
+
79
+ def _derive_title(note_path: str, metadata: Dict[str, Any], body: str) -> str:
80
+ title = metadata.get("title")
81
+ if isinstance(title, str) and title.strip():
82
+ return title.strip()
83
+ match = H1_PATTERN.search(body or "")
84
+ if match:
85
+ return match.group(1).strip()
86
+ stem = Path(note_path).stem
87
+ title_from_filename = stem.replace("-", " ").replace("_", " ").strip()
88
+ return title_from_filename or stem
89
+
90
+
91
  class VaultService:
92
  """Service for managing vault directories and basic path validation."""
93
 
 
113
  raise ValueError(message)
114
  return sanitize_path(user_id, self.vault_root, note_path)
115
 
116
+ def read_note(self, user_id: str, note_path: str) -> VaultNote:
117
+ """Read a Markdown note, returning metadata, body, and derived title."""
118
+ base = self.initialize_vault(user_id)
119
+ absolute_path = self.resolve_note_path(user_id, note_path)
120
+ if not absolute_path.exists():
121
+ raise FileNotFoundError(f"Note not found: {note_path}")
122
+ post = frontmatter.load(absolute_path)
123
+ metadata = dict(post.metadata or {})
124
+ body = post.content or ""
125
+ return self._build_note_payload(note_path, metadata, body, absolute_path)
126
+
127
+ def write_note(
128
+ self,
129
+ user_id: str,
130
+ note_path: str,
131
+ *,
132
+ title: str | None = None,
133
+ metadata: Dict[str, Any] | None = None,
134
+ body: str,
135
+ ) -> VaultNote:
136
+ """Create or update a note with validated metadata and content."""
137
+ absolute_path = self.resolve_note_path(user_id, note_path)
138
+ body = body or ""
139
+ _validate_note_body(body)
140
+
141
+ metadata_dict: Dict[str, Any] = dict(metadata or {})
142
+ _validate_frontmatter(metadata_dict)
143
+
144
+ existing_created: str | None = None
145
+ if absolute_path.exists():
146
+ try:
147
+ current = frontmatter.load(absolute_path)
148
+ current_created = current.metadata.get("created")
149
+ if isinstance(current_created, str):
150
+ existing_created = current_created
151
+ except Exception:
152
+ existing_created = None
153
+
154
+ effective_title = title or metadata_dict.get("title")
155
+ if not effective_title:
156
+ effective_title = _derive_title(note_path, metadata_dict, body)
157
+ metadata_dict["title"] = effective_title
158
+
159
+ now_iso = _utcnow_iso()
160
+ metadata_dict.setdefault("created", existing_created or metadata_dict.get("created") or now_iso)
161
+ metadata_dict["updated"] = now_iso
162
+
163
+ absolute_path.parent.mkdir(parents=True, exist_ok=True)
164
+ post = frontmatter.Post(body, **metadata_dict)
165
+ absolute_path.write_text(frontmatter.dumps(post), encoding="utf-8")
166
+ return self._build_note_payload(note_path, metadata_dict, body, absolute_path)
167
+
168
+ def delete_note(self, user_id: str, note_path: str) -> None:
169
+ """Delete a note from the vault."""
170
+ absolute_path = self.resolve_note_path(user_id, note_path)
171
+ try:
172
+ absolute_path.unlink()
173
+ except FileNotFoundError as exc:
174
+ raise FileNotFoundError(f"Note not found: {note_path}") from exc
175
+
176
+ def list_notes(self, user_id: str, folder: str | None = None) -> List[Dict[str, Any]]:
177
+ """List notes (optionally scoped to a folder) with titles and timestamps."""
178
+ base = self.initialize_vault(user_id).resolve()
179
+
180
+ if folder:
181
+ cleaned = folder.strip().strip("/")
182
+ if "\\" in cleaned or ".." in cleaned:
183
+ raise ValueError("Folder path contains invalid characters")
184
+ folder_path = (base / cleaned).resolve() if cleaned else base
185
+ if not str(folder_path).startswith(str(base)):
186
+ raise ValueError("Folder path escapes vault root")
187
+ if not folder_path.exists():
188
+ return []
189
+ if folder_path.is_file():
190
+ files = [folder_path] if folder_path.suffix == ".md" else []
191
+ else:
192
+ files = list(folder_path.rglob("*.md"))
193
+ else:
194
+ files = list(base.rglob("*.md"))
195
+
196
+ results: List[Dict[str, Any]] = []
197
+ for file_path in files:
198
+ if not file_path.is_file():
199
+ continue
200
+ relative_path = file_path.relative_to(base).as_posix()
201
+ try:
202
+ post = frontmatter.load(file_path)
203
+ metadata = dict(post.metadata or {})
204
+ body = post.content or ""
205
+ title = _derive_title(relative_path, metadata, body)
206
+ except Exception:
207
+ title = Path(relative_path).stem
208
+ stat = file_path.stat()
209
+ results.append(
210
+ {
211
+ "path": relative_path,
212
+ "title": title,
213
+ "last_modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
214
+ }
215
+ )
216
+
217
+ return sorted(results, key=lambda item: item["path"].lower())
218
+
219
+ def _build_note_payload(
220
+ self, note_path: str, metadata: Dict[str, Any], body: str, absolute_path: Path
221
+ ) -> VaultNote:
222
+ stat = absolute_path.stat()
223
+ title = _derive_title(note_path, metadata, body)
224
+ return {
225
+ "path": note_path,
226
+ "title": title,
227
+ "metadata": metadata,
228
+ "body": body,
229
+ "size_bytes": stat.st_size,
230
+ "modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
231
+ "absolute_path": absolute_path,
232
+ }
233
+
234
 
235
  __all__ = ["VaultService", "validate_note_path", "sanitize_path"]
frontend/.env.example ADDED
File without changes