Spaces:
Running
on
Zero
Running
on
Zero
Bellok
commited on
Commit
·
08609d9
1
Parent(s):
c2695d4
`Refactor Warbler pack ingestion to process in batches for memory efficiency and add progress updates.`
Browse files
app.py
CHANGED
|
@@ -174,13 +174,49 @@ if WARBLER_AVAILABLE:
|
|
| 174 |
|
| 175 |
if pack_docs and pack_manager.should_ingest_packs(api, len(pack_docs)):
|
| 176 |
print(f"[INFO] Ingesting {len(pack_docs)} documents from Warbler packs...")
|
| 177 |
-
|
| 178 |
-
|
| 179 |
-
|
| 180 |
-
|
| 181 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 182 |
pack_manager.mark_packs_ingested(1, packs_loaded)
|
| 183 |
-
|
|
|
|
|
|
|
| 184 |
|
| 185 |
elif pack_docs:
|
| 186 |
packs_loaded = len(pack_docs)
|
|
|
|
| 174 |
|
| 175 |
if pack_docs and pack_manager.should_ingest_packs(api, len(pack_docs)):
|
| 176 |
print(f"[INFO] Ingesting {len(pack_docs)} documents from Warbler packs...")
|
| 177 |
+
total_docs = len(pack_docs)
|
| 178 |
+
processed = 0
|
| 179 |
+
failed = 0
|
| 180 |
+
start_time = time.time()
|
| 181 |
+
batch_size = 1000
|
| 182 |
+
|
| 183 |
+
# Process in batches to avoid memory issues and provide progress
|
| 184 |
+
for batch_start in range(0, total_docs, batch_size):
|
| 185 |
+
batch_end = min(batch_start + batch_size, total_docs)
|
| 186 |
+
batch = pack_docs[batch_start:batch_end]
|
| 187 |
+
|
| 188 |
+
batch_processed = 0
|
| 189 |
+
batch_failed = 0
|
| 190 |
+
|
| 191 |
+
for doc in batch:
|
| 192 |
+
success = api.add_document(doc["id"], doc["content"], doc["metadata"])
|
| 193 |
+
if not success:
|
| 194 |
+
batch_failed += 1
|
| 195 |
+
failed += 1
|
| 196 |
+
if failed <= 5: # Log first few failures
|
| 197 |
+
print(f"[WARN] Failed to add document {doc['id']}")
|
| 198 |
+
|
| 199 |
+
batch_processed += 1
|
| 200 |
+
processed += 1
|
| 201 |
+
|
| 202 |
+
# Progress update after each batch
|
| 203 |
+
elapsed = time.time() - start_time
|
| 204 |
+
rate = processed / elapsed if elapsed > 0 else 0
|
| 205 |
+
eta = (total_docs - processed) / rate if rate > 0 else 0
|
| 206 |
+
print(f"[PROGRESS] {processed}/{total_docs} documents ingested "
|
| 207 |
+
f"({processed/total_docs*100:.1f}%) - "
|
| 208 |
+
f"{rate:.1f} docs/sec - ETA: {eta/60:.1f} min")
|
| 209 |
+
|
| 210 |
+
# Force garbage collection after large batches to free memory
|
| 211 |
+
if processed % 10000 == 0:
|
| 212 |
+
import gc
|
| 213 |
+
gc.collect()
|
| 214 |
+
|
| 215 |
+
packs_loaded = processed
|
| 216 |
pack_manager.mark_packs_ingested(1, packs_loaded)
|
| 217 |
+
total_time = time.time() - start_time
|
| 218 |
+
print(f"[OK] Loaded {packs_loaded} documents from Warbler packs "
|
| 219 |
+
f"({failed} failed) in {total_time:.1f} seconds")
|
| 220 |
|
| 221 |
elif pack_docs:
|
| 222 |
packs_loaded = len(pack_docs)
|