Bellok committed on
Commit d90df2b · 1 Parent(s): 08609d9

Trying to allow ingesting in the background. Current ingest throughput is 17+ documents per second, which means only about 25k documents out of over a million will load within HuggingFace's required time limit. If we want access to all 1.5M documents, we need to run the ingest in the background and let Warbler-CDA report healthy before the time limit is up.
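The idea behind this commit can be sketched in a few lines: run the ingest loop in a daemon thread that updates a shared status dict, so the app can start (and report healthy) immediately. This is a minimal illustration of the pattern, not the committed code; the `ingest_all` name and the fake document list are hypothetical.

```python
import threading
import time

# Shared status the main app can poll while ingestion runs off-thread.
status = {"running": False, "processed": 0, "total": 0}

def ingest_all(docs):
    """Hypothetical ingest loop: runs off the main thread so startup isn't blocked."""
    status["running"] = True
    status["total"] = len(docs)
    for doc in docs:
        time.sleep(0.001)  # stand-in for api.add_document(...)
        status["processed"] += 1
    status["running"] = False

docs = [f"doc-{i}" for i in range(50)]
t = threading.Thread(target=ingest_all, args=(docs,), daemon=True)
t.start()
# The real app would launch here and answer health checks while ingestion continues.
t.join()  # joined only so this demo finishes deterministically
print(status["processed"])  # → 50
```

Because the thread is a daemon, it never blocks process shutdown; the trade-off is that an in-flight ingest is abandoned if the process exits.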

Files changed (2)
  1. TODO.md +21 -31
  2. app.py +95 -44
TODO.md CHANGED
@@ -1,39 +1,29 @@
- # NPC Dialog Language Pack Creation Plan

  ## Overview
- Create our own NPC dialog language pack by enhancing the synthetic NPCDialogueTransformer and generating a large dataset, replacing any references to copyrighted material.

  ## Tasks

- ### 1. Remove References to Copyrighted Dataset
- - [ ] Remove `amaydle/npc-dialogue` references from README.md
- - [ ] Remove `amaydle/npc-dialogue` references from PKG-INFO
- - [ ] Remove `amaydle/npc-dialogue` references from VALIDATION_REPORT_MIT_DATASETS.md
-
- ### 2. Update Pack Metadata
- - [ ] Update `packs/warbler-pack-npc-dialog/package.json` - change name from "warbler-pack-core" to "warbler-pack-npc-dialog"
- - [ ] Update `packs/warbler-pack-npc-dialog/README.md` - change content to focus on NPC dialog instead of core templates
- - [ ] Update `packs/warbler-pack-npc-dialog/README_HF_DATASET.md` - update metadata for NPC dialog focus
-
- ### 3. Enhance NPCDialogueTransformer
- - [ ] Expand archetypes from 3 to 10+ diverse NPC types (merchant, guard, healer, mage, blacksmith, innkeeper, etc.)
- - [ ] Add more varied queries and responses per archetype
- - [ ] Increase default sample size from 50 to 1000+
- - [ ] Add more emotion types and dialogue contexts
-
- ### 4. Update Templates (Optional)
- - [ ] Review `packs/warbler-pack-npc-dialog/pack/templates.json` for NPC-specific additions
- - [ ] Add templates for common NPC interactions if needed
-
- ### 5. Generate Large Dataset
- - [ ] Run ingestion script with NPC dialogue transformer
- - [ ] Generate 1000+ synthetic dialogue entries
- - [ ] Validate generated jsonl file
-
- ### 6. Test and Validate
- - [ ] Test pack loading and ingestion
- - [ ] Validate metadata and content structure
- - [ ] Ensure no copyrighted material references remain

  ## Status
  - [x] Plan created and approved
 
+ # Background Pack Ingestion Implementation

  ## Overview
+ Modify app.py to perform pack ingestion in a background thread, allowing the app to start immediately while documents load asynchronously.

  ## Tasks

+ ### 1. Add Background Ingestion Support
+ - [ ] Import threading module in app.py
+ - [ ] Add global variables to track ingestion status (running, progress, total_docs, processed, etc.)
+ - [ ] Create a background_ingest_packs() function that performs the ingestion logic
+ - [ ] Start the background thread after API initialization but before app launch
+
+ ### 2. Update System Stats
+ - [ ] Modify get_system_stats() to include ingestion progress information
+ - [ ] Display current ingestion status in the System Stats tab
+
+ ### 3. Handle Thread Safety
+ - [ ] Ensure API.add_document() calls are thread-safe (assuming they are)
+ - [ ] Add proper error handling in the background thread
+
+ ### 4. Test Implementation
+ - [ ] Test that app launches immediately
+ - [ ] Verify ingestion happens in background
+ - [ ] Check that queries work during ingestion
+ - [ ] Confirm progress is shown in System Stats

  ## Status
  - [x] Plan created and approved
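The thread-safety task in the plan above can be sketched with a lock around the shared progress counters. This is one illustrative pattern, assuming only the counters (not the API itself) need guarding; the `record` helper is hypothetical.

```python
import threading

# Lock-guarded status dict: keeps progress counters consistent when the
# ingest loop and the stats endpoint run on different threads.
status = {"processed": 0, "failed": 0}
status_lock = threading.Lock()

def record(success: bool) -> None:
    """Update shared counters under the lock; += on a dict item is not atomic."""
    with status_lock:
        status["processed"] += 1
        if not success:
            status["failed"] += 1

# Four threads each record 1000 successes; the lock keeps the total exact.
threads = [
    threading.Thread(target=lambda: [record(True) for _ in range(1000)])
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(status["processed"])  # → 4000
```

Reads from the stats endpoint should take the same lock (or copy the dict under it) to avoid observing a half-updated pair of counters.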
app.py CHANGED
@@ -6,6 +6,7 @@ Interactive demo of the Cognitive Development Architecture RAG system
  import json
  import time
  import os
  import gradio as gr
  import hashlib
  import spaces
@@ -122,6 +123,17 @@ class PackManager:

  pack_manager = PackManager()

  try:
      from warbler_cda import (
          RetrievalAPI,
@@ -173,50 +185,15 @@ if WARBLER_AVAILABLE:
      pack_docs = pack_loader.discover_documents()

      if pack_docs and pack_manager.should_ingest_packs(api, len(pack_docs)):
-         print(f"[INFO] Ingesting {len(pack_docs)} documents from Warbler packs...")
-         total_docs = len(pack_docs)
-         processed = 0
-         failed = 0
-         start_time = time.time()
-         batch_size = 1000
-
-         # Process in batches to avoid memory issues and provide progress
-         for batch_start in range(0, total_docs, batch_size):
-             batch_end = min(batch_start + batch_size, total_docs)
-             batch = pack_docs[batch_start:batch_end]
-
-             batch_processed = 0
-             batch_failed = 0
-
-             for doc in batch:
-                 success = api.add_document(doc["id"], doc["content"], doc["metadata"])
-                 if not success:
-                     batch_failed += 1
-                     failed += 1
-                     if failed <= 5:  # Log first few failures
-                         print(f"[WARN] Failed to add document {doc['id']}")
-
-                 batch_processed += 1
-                 processed += 1
-
-             # Progress update after each batch
-             elapsed = time.time() - start_time
-             rate = processed / elapsed if elapsed > 0 else 0
-             eta = (total_docs - processed) / rate if rate > 0 else 0
-             print(f"[PROGRESS] {processed}/{total_docs} documents ingested "
-                   f"({processed/total_docs*100:.1f}%) - "
-                   f"{rate:.1f} docs/sec - ETA: {eta/60:.1f} min")
-
-             # Force garbage collection after large batches to free memory
-             if processed % 10000 == 0:
-                 import gc
-                 gc.collect()
-
-         packs_loaded = processed
-         pack_manager.mark_packs_ingested(1, packs_loaded)
-         total_time = time.time() - start_time
-         print(f"[OK] Loaded {packs_loaded} documents from Warbler packs "
-               f"({failed} failed) in {total_time:.1f} seconds")

      elif pack_docs:
          packs_loaded = len(pack_docs)
@@ -240,6 +217,72 @@ if WARBLER_AVAILABLE:
          traceback.print_exc()


  @spaces.GPU
  def query_warbler(
      query_text: str,
@@ -497,6 +540,14 @@ with gr.Blocks(title="Warbler CDA - RAG System Demo", theme=gr.themes.Soft()) as
      # Auto-load stats on tab open
      demo.load(fn=get_system_stats, outputs=stats_output)

      with gr.Tab("About"):
          gr.Markdown(
              """
 
  import json
  import time
  import os
+ import threading
  import gradio as gr
  import hashlib
  import spaces
 
  pack_manager = PackManager()

+ # Global variables for background ingestion tracking
+ ingestion_status = {
+     "running": False,
+     "total_docs": 0,
+     "processed": 0,
+     "failed": 0,
+     "start_time": None,
+     "eta": 0,
+     "rate": 0,
+ }
+
  try:
      from warbler_cda import (
          RetrievalAPI,
 
      pack_docs = pack_loader.discover_documents()

      if pack_docs and pack_manager.should_ingest_packs(api, len(pack_docs)):
+         # Start background ingestion
+         ingestion_thread = threading.Thread(
+             target=background_ingest_packs,
+             args=(api, pack_docs, pack_manager),
+             daemon=True
+         )
+         ingestion_thread.start()
+         packs_loaded = 0  # Will be updated asynchronously
+         print(f"[INFO] Started background ingestion of {len(pack_docs)} documents")

      elif pack_docs:
          packs_loaded = len(pack_docs)
 
      traceback.print_exc()


+ def background_ingest_packs(api, pack_docs, pack_manager):
+     """Background function to ingest packs without blocking app startup"""
+     global ingestion_status
+
+     ingestion_status["running"] = True
+     ingestion_status["total_docs"] = len(pack_docs)
+     ingestion_status["processed"] = 0
+     ingestion_status["failed"] = 0
+     ingestion_status["start_time"] = time.time()
+
+     print(f"[INFO] Ingesting {len(pack_docs)} documents from Warbler packs...")
+     total_docs = len(pack_docs)
+     processed = 0
+     failed = 0
+     start_time = time.time()
+     batch_size = 1000
+
+     # Process in batches to avoid memory issues and provide progress
+     for batch_start in range(0, total_docs, batch_size):
+         batch_end = min(batch_start + batch_size, total_docs)
+         batch = pack_docs[batch_start:batch_end]
+
+         batch_processed = 0
+         batch_failed = 0
+
+         for doc in batch:
+             success = api.add_document(doc["id"], doc["content"], doc["metadata"])
+             if not success:
+                 batch_failed += 1
+                 failed += 1
+                 if failed <= 5:  # Log first few failures
+                     print(f"[WARN] Failed to add document {doc['id']}")
+
+             batch_processed += 1
+             processed += 1
+
+         # Update global status
+         ingestion_status["processed"] = processed
+         ingestion_status["failed"] = failed
+
+         # Progress update after each batch
+         elapsed = time.time() - start_time
+         rate = processed / elapsed if elapsed > 0 else 0
+         eta = (total_docs - processed) / rate if rate > 0 else 0
+         ingestion_status["rate"] = rate
+         ingestion_status["eta"] = eta
+
+         print(f"[PROGRESS] {processed}/{total_docs} documents ingested "
+               f"({processed/total_docs*100:.1f}%) - "
+               f"{rate:.1f} docs/sec - ETA: {eta/60:.1f} min")
+
+         # Force garbage collection after large batches to free memory
+         if processed % 10000 == 0:
+             import gc
+             gc.collect()
+
+     packs_loaded = processed
+     pack_manager.mark_packs_ingested(1, packs_loaded)
+     total_time = time.time() - start_time
+     print(f"[OK] Loaded {packs_loaded} documents from Warbler packs "
+           f"({failed} failed) in {total_time:.1f} seconds")
+
+     # Mark ingestion complete
+     ingestion_status["running"] = False
+
+
  @spaces.GPU
  def query_warbler(
      query_text: str,
 
      # Auto-load stats on tab open
      demo.load(fn=get_system_stats, outputs=stats_output)

+     # Refresh stats every 10 seconds if ingestion is running
+     def auto_refresh_stats():
+         while ingestion_status["running"]:
+             time.sleep(10)
+             # Note: In Gradio, we can't directly update from background thread
+             # This would need a more complex setup with queues or websockets
+             # For now, users can manually refresh
+
      with gr.Tab("About"):
          gr.Markdown(
              """