403 lines
13 KiB
Python
403 lines
13 KiB
Python
# Document Ingestion Logic
|
|
import asyncio
|
|
import os
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, BinaryIO
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
|
|
# Import local modules
|
|
from .config import settings
|
|
from .chunking import chunk_text, estimate_tokens
|
|
from .embeddings import embed_texts
|
|
from .vector_store import upsert_chunks
|
|
from .db import replace_library_documents, upsert_library
|
|
from .git_source import ingest_git_source
|
|
|
|
# File extensions the ingester will pick up; every other file is skipped.
# '.pdf' is handled separately by read_document_file (text extracted via pypdf);
# all other entries are read as UTF-8 text.
SUPPORTED_EXTENSIONS = {
    '.md', '.txt',
    '.py', '.js', '.ts',
    '.json', '.yaml', '.yml',
    '.html', '.css',
    '.pdf',
}

# Root folder whose top-level subdirectories are treated as libraries.
# Taken from the DOCS_PATH environment variable (load_dotenv() above lets a
# .env file supply it), falling back to ./docs.
_DEFAULT_DOCS_PATH = "./docs"
DOCS_PATH = Path(os.getenv("DOCS_PATH", _DEFAULT_DOCS_PATH))
|
|
|
|
|
|
def get_file_size(path: Path) -> int:
    """Return the size of *path* in bytes, or ``-1`` if it cannot be stat'ed.

    Any ``OSError`` from ``stat()`` (missing file, permission denied, ...) is
    reported as ``-1`` rather than raised.
    """
    try:
        stat_result = path.stat()
    except OSError:
        return -1
    return stat_result.st_size
|
|
|
|
|
|
def _read_document_file_sync(path: Path) -> str:
    """Blocking implementation of read_document_file (see that function's contract)."""
    if not path.exists():
        return ""

    suffix = path.suffix.lower()

    if suffix == '.pdf':
        # The import must live inside its own try: if it sits before the try
        # (as it used to), the ImportError escapes without the install hint.
        try:
            from pypdf import PdfReader
        except ImportError as err:
            raise ImportError(
                "pypdf is required for PDF files. Install with: pip install pypdf"
            ) from err

        try:
            reader = PdfReader(str(path))
            pages = []
            for page in reader.pages:
                text = page.extract_text()
                # Skip pages whose extraction yields nothing (None or "").
                if text:
                    pages.append(text)
        except Exception as e:
            # Best-effort: a corrupt/unparseable PDF is skipped, not fatal.
            print(f" Warning: Could not read PDF {path}: {e}")
            return ""

        content = "\n\n".join(pages)
        # Same rule as text files: whitespace-only content counts as empty.
        return content if content.strip() else ""

    if suffix not in SUPPORTED_EXTENSIONS:
        print(f" Unsupported file type: {suffix}")
        return ""

    # Text-based files
    try:
        content = path.read_text(encoding='utf-8')
    except Exception as e:
        # Best-effort: undecodable or unreadable files are skipped, not fatal.
        print(f" Warning: Could not read {path}: {e}")
        return ""
    return content if content.strip() else ""


async def read_document_file(path: Path) -> str:
    """
    Read document content from a file.

    PDF text extraction and file reads are blocking, so the work runs in a
    worker thread via ``asyncio.to_thread`` to keep the event loop responsive.

    Args:
        path: Path to the file

    Returns:
        The file's text. Returns an empty string when the file is missing,
        its extension is not in SUPPORTED_EXTENSIONS, it cannot be read or
        parsed, or its content is only whitespace. PDF pages are joined with
        blank lines.

    Raises:
        ImportError: If *path* is a PDF and ``pypdf`` is not installed.
    """
    return await asyncio.to_thread(_read_document_file_sync, path)
|
|
|
|
|
|
def _collect_document_files(library_dir: Path) -> List[Path]:
    """Return every file under *library_dir* (recursive) whose extension is supported, sorted."""
    # rglob order depends on the filesystem; sorting keeps ingestion order and
    # log output reproducible between runs. '.pdf' is already in
    # SUPPORTED_EXTENSIONS, so no separate PDF check is needed.
    return sorted(
        file_path
        for file_path in library_dir.rglob('*')
        if file_path.is_file() and file_path.suffix.lower() in SUPPORTED_EXTENSIONS
    )


def _library_failure(
    library_id: str,
    error: str,
    files_processed: int = 0,
    chunks_created: int = 0,
    vectors_added: int = 0,
) -> Dict[str, Any]:
    """Build an ingest_library failure summary with the same keys as a success summary plus 'error'."""
    return {
        "success": False,
        "library_id": library_id,
        "files_processed": files_processed,
        "chunks_created": chunks_created,
        "vectors_added": vectors_added,
        "error": error,
    }


async def ingest_library(library_id: str, name: str, description: Optional[str] = None, source_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Ingest all documents for a library.

    Reads every supported file under DOCS_PATH / source_path, chunks and
    embeds it, upserts the vectors, and only then replaces the library's
    SQLite rows in one call, so a failure part-way leaves the previous
    SQLite index in place.

    Args:
        library_id: Unique identifier for the library
        name: Library name
        description: Optional description
        source_path: Path to library folder (relative to DOCS_PATH). Required
            to locate documents; when None, the library record is still
            upserted but a failure summary is returned.

    Returns:
        Summary dict with keys success, library_id, files_processed,
        chunks_created, vectors_added, plus error when success is False.
    """
    print(f"\n[Library] Processing: {library_id}")
    if source_path:
        print(f" Source: {source_path}")

    # Ensure library record exists
    result = upsert_library(library_id, name, description, source_path)
    print(f" [{result.get('success', False)}] Library record: {'created' if not result.get('exists') else 'updated'}")

    # DOCS_PATH / None would raise TypeError; report it as a normal failure instead.
    if source_path is None:
        error = "No source_path given; cannot locate library documents"
        print(f" Error: {error}")
        return _library_failure(library_id, error)

    library_dir = DOCS_PATH / source_path
    # is_dir(), not exists(): a plain file at this path cannot hold documents.
    if not library_dir.is_dir():
        print(f" Error: Directory does not exist: {library_dir}")
        return _library_failure(library_id, f"Directory not found: {library_dir}")

    print(f" [Library] Scanning for files in: {library_dir}")
    doc_files = _collect_document_files(library_dir)
    print(f" [Library] Found {len(doc_files)} document(s)")

    # Prepare the complete replacement before touching the existing index.
    all_chunks: List[Dict[str, Any]] = []
    processed_files = 0

    for file_path in doc_files:
        print(f" [File] Reading: {file_path.relative_to(library_dir)}")
        content = await read_document_file(file_path)
        if not content:
            continue

        num_tokens = estimate_tokens(content)
        chunks = chunk_text(content, max_tokens=500, overlap_tokens=80)
        if not chunks:
            print(f" [File] No valid chunks from {file_path.name}")
            continue

        print(f" Chunked into {len(chunks)} pieces (approx. {num_tokens} tokens)")

        # FastEmbed is synchronous and CPU-heavy; keep it off the API event loop.
        embeddings = await asyncio.to_thread(embed_texts, chunks)

        base_path = file_path.relative_to(library_dir).as_posix()
        # NOTE(review): chunk ids are "<relative path>:<index>" without a
        # library_id prefix, so two libraries with the same relative path get
        # identical ids. Confirm upsert_chunks namespaces point ids per library.
        all_chunks.extend(
            {
                "id": f"{base_path}:{i}",
                "library_id": library_id,
                "path": base_path,
                "title": file_path.stem,
                "content": chunk,
                "chunk_index": i,
                "embedding": embeddings[i],
                "token_estimate": estimate_tokens(chunk),
            }
            for i, chunk in enumerate(chunks)
        )
        processed_files += 1

    print(f" [Library] Processed {processed_files} file(s), {len(all_chunks)} total chunks")

    # Refuse to wipe a working index when every file failed to produce chunks.
    if doc_files and not all_chunks:
        error = "No document chunks were produced; keeping the existing index"
        print(f" [Library] {error}")
        return _library_failure(library_id, error, files_processed=processed_files)

    # Update vectors first. If this fails, the previous SQLite index remains usable.
    # NOTE(review): this only upserts; vectors for files deleted since the last
    # run are not removed here — verify whether vector_store handles that.
    if all_chunks:
        upsert_result = await upsert_chunks(all_chunks)
        print(f" [Library] Vector store: {upsert_result.get('success', False)} ({upsert_result.get('points_added', 0)} added)")
        if not upsert_result.get("success"):
            return _library_failure(
                library_id,
                upsert_result.get("error", "Vector store update failed"),
                files_processed=processed_files,
                chunks_created=len(all_chunks),
            )
    else:
        print(f" [Library] No vectors to add to Qdrant")
        upsert_result = {"success": True, "points_added": 0}

    vectors_added = upsert_result.get("points_added", 0)

    # Replace SQLite rows in one call only after preparation succeeds.
    replace_result = replace_library_documents(library_id, all_chunks)
    if not replace_result.get("success"):
        print(f" [Library] SQLite replacement failed: {replace_result.get('error')}")
        return _library_failure(
            library_id,
            replace_result.get("error", "SQLite replacement failed"),
            files_processed=processed_files,
            chunks_created=len(all_chunks),
            vectors_added=vectors_added,
        )

    print(
        f" [Library] Replaced {replace_result.get('deleted', 0)} old chunks "
        f"with {replace_result.get('inserted', 0)} new chunks"
    )

    return {
        "success": True,
        "library_id": library_id,
        "files_processed": processed_files,
        "chunks_created": len(all_chunks),
        "vectors_added": vectors_added,
    }
|
|
|
|
|
|
def _repo_identity_from_url(repo_url: str) -> tuple[str, str, str]:
    """Derive ``(library_id, name, description)`` from a git repository URL.

    The id and display name are the last path component of the URL, after
    removing trailing ``/`` and a literal ``.git`` suffix. A URL with no usable
    path yields the id ``"unknown"``. scp-style URLs such as
    ``git@host:org/repo.git`` parse with the whole string as the path, so the
    last component is still the repository name.
    """
    import urllib.parse

    path_part = urllib.parse.urlparse(repo_url).path.rstrip('/')
    # Remove the literal ".git" suffix. str.rstrip('.git') would instead strip
    # any trailing run of '.', 'g', 'i', 't' characters ("widget.git" -> "widge").
    if path_part.endswith('.git'):
        path_part = path_part[:-len('.git')]

    library_id = Path(path_part).name or "unknown"
    # Use the repository name as display name; the URL host (e.g. "github")
    # would give every repository from the same host the same name.
    name = library_id
    description = f"Documentation from {path_part}"
    return library_id, name, description


async def ingest_git_source_from_config(
    repo_url: str,
    branch: str = "main",
    include_paths: Optional[List[str]] = None,
    exclude_paths: Optional[List[str]] = None,
    repos_base: Optional[Path] = None
) -> Dict[str, Any]:
    """
    Ingest a git repository defined in sources configuration.

    The library id, display name and description are derived from
    *repo_url* (see _repo_identity_from_url); the actual clone and ingestion
    are delegated to ingest_git_source.

    Args:
        repo_url: Git repository URL to clone from
        branch: Branch to checkout (default: main)
        include_paths: Paths relative to repo root to include (if None, all dirs considered)
        exclude_paths: Paths relative to repo root to exclude
        repos_base: Base directory for cloned repos; None leaves the choice of
            default to ingest_git_source (presumably ./data/repos — confirm there)

    Returns:
        The result dict returned by ingest_git_source.

    Raises:
        Whatever ingest_git_source raises — documented as GitCloneError when
        git operations fail (not verifiable from this module).
    """
    library_id, name, description = _repo_identity_from_url(repo_url)

    return await ingest_git_source(
        library_id=library_id,
        name=name,
        description=description,
        repo_url=repo_url,
        branch=branch,
        include_paths=include_paths,
        exclude_paths=exclude_paths,
        repos_base=repos_base
    )
|
|
|
|
|
|
async def detect_libraries() -> List[Dict[str, Any]]:
    """
    Detect all top-level folders under DOCS_PATH as libraries.

    Each folder is upserted as a library record whose id is the lower-cased
    folder name and whose source_path is the folder name. Folders are
    processed in sorted order so detection output is reproducible.

    Returns:
        List of dicts with keys "id", "name" and "source_path"; empty when
        DOCS_PATH is not an existing directory.
    """
    print(f"\n[Detection] Scanning for libraries in: {DOCS_PATH}")

    # is_dir(), not exists(): iterdir() on a regular file raises NotADirectoryError.
    if not DOCS_PATH.is_dir():
        print(f" [Detection] Directory does not exist: {DOCS_PATH}")
        return []

    # iterdir() order is filesystem-dependent; sort for deterministic results.
    lib_dirs = sorted(entry for entry in DOCS_PATH.iterdir() if entry.is_dir())
    # NOTE(review): ids are lower-cased, so folders differing only in case
    # (e.g. "Docs" and "docs") map to the same library id.

    libraries: List[Dict[str, Any]] = []
    for i, lib_dir in enumerate(lib_dirs, 1):
        name = lib_dir.name
        lib_id = name.lower()

        # Create library record with defaults
        upsert_library(
            library_id=lib_id,
            name=name,
            description=None,
            source_path=name
        )

        libraries.append({
            "id": lib_id,
            "name": name,
            "source_path": name
        })

        print(f" [{i}/{len(lib_dirs)}] Library detected: {name} (id: {lib_id})")

    print(f"\n[Detection] Found {len(libraries)} library(ies)")
    return libraries
|
|
|
|
|
|
async def ingest_all(verbose: bool = True) -> Dict[str, Any]:
    """
    Ingest all discovered libraries.

    Libraries are found with detect_libraries() and ingested one after the
    other with ingest_library().

    Args:
        verbose: Whether to print progress messages

    Returns:
        Summary dict with integer counts under "total_libraries",
        "successful", "failed" and "total_chunks". The shape is the same
        whether or not any libraries were found.
    """
    if verbose:
        print("\n" + "=" * 60)
        print("DOCUMENT INGESTION STARTED")
        print("=" * 60)

    libraries = await detect_libraries()

    if not libraries:
        # Same keys and types as the normal summary below ("successful" was
        # previously a list here and "failed" was missing).
        result = {"total_libraries": 0, "successful": 0, "failed": 0, "total_chunks": 0}
        if verbose:
            print("\n[Summary] No libraries to ingest")
        return result

    results = []
    for lib in libraries:
        lib_id = lib["id"]

        result = await ingest_library(
            library_id=lib_id,
            name=lib["name"],
            description=None,
            source_path=lib.get("source_path")
        )

        if verbose:
            if result.get('success'):
                print(f" [Library] Done: {result.get('library_id')} - {result.get('chunks_created', 0)} chunks")
            else:
                # Surface failures too; otherwise only the final count reveals them.
                print(f" [Library] Failed: {lib_id} - {result.get('error', 'unknown error')}")

        results.append(result)

    # Note: chunks_created is also reported by failed libraries, so this total
    # counts chunks produced, not necessarily chunks persisted.
    total_chunks = sum(r.get('chunks_created', 0) for r in results)
    successful = sum(1 for r in results if r.get('success'))

    result = {
        "total_libraries": len(libraries),
        "successful": successful,
        "failed": len(results) - successful,
        "total_chunks": total_chunks
    }

    if verbose:
        print("\n" + "=" * 60)
        print("INGESTION COMPLETE")
        print("=" * 60)
        print(f" Libraries processed: {result['total_libraries']}")
        print(f" Successful: {result['successful']}")
        print(f" Failed: {result['failed']}")
        print(f" Total chunks created: {result['total_chunks']}")

    return result
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: detect libraries, then ingest the first one found.
    # asyncio is already imported at module level.

    async def _smoke_test() -> None:
        print("Testing ingestion module...\n")

        detected = await detect_libraries()
        print(f"\nDetected libraries: {len(detected)}")

        if detected:
            first = detected[0]
            # Ingestion may fail if the library has no docs; acceptable for a smoke test.
            print("\nAttempting sample ingestion...")
            outcome = await ingest_library(
                library_id=first["id"],
                name=first["name"],
                source_path=first.get("source_path"),
            )
            print(f"Result: {outcome}")

        print("\n✅ Tests completed!")

    asyncio.run(_smoke_test())
|