# SQLite Database Layer for local-context7
import sqlite3
from pathlib import Path
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional, Union

from .config import settings

try:
    from qdrant_client import QdrantClient
except ImportError:
    QdrantClient = None


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    The tzinfo is stripped so the text has the same shape as the deprecated
    ``datetime.utcnow().isoformat()`` (no ``+00:00`` suffix); timestamps
    written by earlier versions therefore keep sorting consistently with
    new ones.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _rows_to_dicts(cursor: sqlite3.Cursor) -> List[Dict[str, Any]]:
    """Materialize every remaining row of *cursor* as a plain ``dict``."""
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor]


def _escape_like(term: str) -> str:
    """Escape LIKE metacharacters in *term* for use with ``ESCAPE '\\'``.

    The backslash itself is escaped first so that it cannot combine with a
    following character into an unintended escape sequence.
    """
    return term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def get_db_path() -> Path:
    """Get the database path (taken from ``settings.db_path``)."""
    return Path(settings.db_path)


def ensure_db_dir() -> None:
    """Ensure the data directory for SQLite exists (idempotent)."""
    db_path = get_db_path()
    db_path.parent.mkdir(parents=True, exist_ok=True)


# Initialize DB directory at module load time (safe to run multiple times)
ensure_db_dir()


def get_connection() -> sqlite3.Connection:
    """
    Open a new database connection.

    Rows are returned as ``sqlite3.Row`` objects, which support access both
    by index and by column name (they are not plain dicts; the query helpers
    in this module convert them explicitly).

    NOTE(review): ``PRAGMA foreign_keys`` is not enabled here, and SQLite
    leaves foreign-key enforcement off by default, so the
    ``ON DELETE CASCADE`` declared on ``documents`` is presumably inactive.
    ``delete_library`` removes documents explicitly for that reason.

    Returns:
        sqlite3.Connection with row_factory set to sqlite3.Row.
        The caller is responsible for closing it.
    """
    conn = sqlite3.connect(str(get_db_path()))
    conn.row_factory = sqlite3.Row
    return conn


def init_db() -> Dict[str, Any]:
    """
    Initialize the SQLite database by creating tables and indexes.

    Creates (if missing):
    - libraries table (id, name, description, source_path, created_at, updated_at)
    - documents table (id, library_id, path, title, content, chunk_index,
      token_estimate, created_at)
    - idx_documents_library_id and idx_libraries_updated_at indexes

    Returns:
        ``{"success": True}`` on success, otherwise
        ``{"success": False, "error": <message>}``. Errors are reported in
        the result rather than raised.
    """
    conn = get_connection()
    try:
        # NOTE(review): this pragma only influences ALTER TABLE ... RENAME
        # and applies to this connection only; it is kept for compatibility
        # but looks like a no-op for the CREATE statements below.
        conn.execute("PRAGMA legacy_alter_table = ON")

        # Create libraries table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS libraries (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                description TEXT,
                source_path TEXT NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)

        # Create documents table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                id TEXT PRIMARY KEY,
                library_id TEXT NOT NULL,
                path TEXT NOT NULL,
                title TEXT,
                content TEXT,
                chunk_index INTEGER,
                token_estimate INTEGER,
                created_at TEXT NOT NULL,
                FOREIGN KEY (library_id) REFERENCES libraries(id) ON DELETE CASCADE
            )
        """)

        # Create indexes for better query performance
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_documents_library_id
            ON documents(library_id)
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_libraries_updated_at
            ON libraries(updated_at)
        """)

        conn.commit()
        return {"success": True}
    except Exception as e:
        conn.rollback()
        return {"success": False, "error": str(e)}
    finally:
        conn.close()


def upsert_library(
    library_id: str,
    name: str,
    description: Optional[str] = None,
    source_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Insert or update a library record.

    On update, ``created_at`` is preserved and ``updated_at`` is refreshed.

    Args:
        library_id: Unique identifier for the library
        name: Library name
        description: Optional description
        source_path: Path to library source files; falls back to
            ``library_id`` when empty or None

    Returns:
        ``{"success": True, "id": library_id, "exists": bool}`` where
        ``exists`` tells whether the row was already present, or
        ``{"success": False, "error": <message>}`` on failure.
    """
    conn = get_connection()
    try:
        now = _utc_now_iso()
        source_path = source_path or library_id

        # Check if library exists
        cursor = conn.execute(
            "SELECT id FROM libraries WHERE id = ?", (library_id,)
        )
        exists = cursor.fetchone() is not None

        if exists:
            # Update existing library
            conn.execute(
                """
                UPDATE libraries
                SET name = ?, description = ?, source_path = ?, updated_at = ?
                WHERE id = ?
                """,
                (name, description, source_path, now, library_id),
            )
        else:
            # Insert new library
            conn.execute(
                """
                INSERT INTO libraries
                    (id, name, description, source_path, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (library_id, name, description, source_path, now, now),
            )

        conn.commit()
        return {"success": True, "id": library_id, "exists": exists}
    except Exception as e:
        conn.rollback()
        return {"success": False, "error": str(e)}
    finally:
        conn.close()


def insert_document_chunk(
    doc_id: str,
    library_id: str,
    path: str,
    title: Optional[str] = None,
    content: Optional[str] = None,
    chunk_index: Optional[int] = None,
    token_estimate: int = 0,
) -> Dict[str, Any]:
    """
    Insert or update a document chunk record.

    Args:
        doc_id: Unique identifier for this chunk
        library_id: Foreign key to libraries table
        path: Relative file path within the library
        title: Optional document title
        content: Full text content of the chunk
        chunk_index: Index within the full document (NULL if not chunked)
        token_estimate: Estimated token count (None/0 is stored as 0)

    Returns:
        ``{"success": True, "id": doc_id, "exists": bool}`` where ``exists``
        tells whether the row was already present, or
        ``{"success": False, "error": <message>}`` on failure.
    """
    conn = get_connection()
    try:
        now = _utc_now_iso()

        # Check if document chunk exists
        cursor = conn.execute(
            "SELECT id FROM documents WHERE id = ?", (doc_id,)
        )
        exists = cursor.fetchone() is not None

        if exists:
            # NOTE(review): the documents table has no updated_at column, so
            # created_at is overwritten on update and effectively records the
            # last write time. Kept as-is; confirm this is intended.
            conn.execute(
                """
                UPDATE documents
                SET library_id = ?, path = ?, title = ?, content = ?,
                    chunk_index = ?, token_estimate = ?, created_at = ?
                WHERE id = ?
                """,
                (library_id, path, title, content, chunk_index,
                 token_estimate or 0, now, doc_id),
            )
        else:
            conn.execute(
                """
                INSERT INTO documents
                    (id, library_id, path, title, content, chunk_index,
                     token_estimate, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (doc_id, library_id, path, title, content, chunk_index,
                 token_estimate or 0, now),
            )

        conn.commit()
        return {"success": True, "id": doc_id, "exists": exists}
    except Exception as e:
        conn.rollback()
        return {"success": False, "error": str(e)}
    finally:
        conn.close()


def clear_library_documents(library_id: str) -> Dict[str, Any]:
    """
    Delete all document chunks for a library (the library row is kept).

    Args:
        library_id: The library to clear

    Returns:
        ``{"success": True, "deleted": <row count>, "library_id": ...}`` or
        ``{"success": False, "error": <message>}`` on failure.
    """
    conn = get_connection()
    try:
        cursor = conn.execute(
            "DELETE FROM documents WHERE library_id = ?", (library_id,)
        )
        deleted = cursor.rowcount
        conn.commit()
        return {"success": True, "deleted": deleted, "library_id": library_id}
    except Exception as e:
        conn.rollback()
        return {"success": False, "error": str(e)}
    finally:
        conn.close()


def delete_library(library_id: str) -> Dict[str, Any]:
    """
    Delete a library row and its document chunks.

    Documents are removed explicitly before the library row, so the result
    does not depend on foreign-key cascade being active.

    Args:
        library_id: The library to delete

    Returns:
        ``{"success": True, "deleted": <library rows removed>,
        "library_id": ...}`` or ``{"success": False, "error": <message>}``
        on failure. ``deleted`` counts library rows only, not documents.
    """
    conn = get_connection()
    try:
        conn.execute(
            "DELETE FROM documents WHERE library_id = ?", (library_id,)
        )
        cursor = conn.execute(
            "DELETE FROM libraries WHERE id = ?", (library_id,)
        )
        conn.commit()
        return {
            "success": True,
            "deleted": cursor.rowcount,
            "library_id": library_id,
        }
    except Exception as e:
        conn.rollback()
        return {"success": False, "error": str(e)}
    finally:
        conn.close()


def list_libraries() -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Get all libraries, most recently updated first.

    Returns:
        List of dictionaries containing library records. On a database
        error a single dict ``{"success": False, "error": <message>}`` is
        returned instead of a list, so callers should check the result type.
    """
    conn = get_connection()
    try:
        cursor = conn.execute(
            "SELECT * FROM libraries ORDER BY updated_at DESC"
        )
        return _rows_to_dicts(cursor)
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        conn.close()


def search_libraries(query: str) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Search libraries by id, name or description.

    This is a case-insensitive *substring* match (SQL ``LIKE``), not a
    full-text search. ``%`` and ``_`` in *query* are matched literally
    rather than acting as wildcards. An empty query matches every library.

    Args:
        query: Search query string

    Returns:
        List of matching library dictionaries, most recently updated first
        (empty if none found). On a database error a single dict
        ``{"success": False, "error": <message>}`` is returned instead.
    """
    conn = get_connection()
    try:
        # Escape LIKE metacharacters so user text cannot widen the match.
        like_query = f"%{_escape_like(str(query))}%"
        cursor = conn.execute(
            """
            SELECT * FROM libraries
            WHERE lower(id) LIKE lower(?) ESCAPE '\\'
               OR lower(name) LIKE lower(?) ESCAPE '\\'
               OR lower(coalesce(description, '')) LIKE lower(?) ESCAPE '\\'
            ORDER BY updated_at DESC
            """,
            (like_query, like_query, like_query),
        )
        return _rows_to_dicts(cursor)
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        conn.close()


def get_document_by_id(doc_id: str) -> Optional[Dict[str, Any]]:
    """
    Get a single document by its ID.

    Args:
        doc_id: The document ID to fetch

    Returns:
        Dictionary with document data, or None if not found. On a database
        error a dict ``{"success": False, "error": <message>}`` is returned;
        callers can tell it apart from a row by the ``"success"`` key, which
        is not a column of the documents table.
    """
    conn = get_connection()
    try:
        cursor = conn.execute(
            "SELECT * FROM documents WHERE id = ?", (doc_id,)
        )
        row = cursor.fetchone()
        if row is None:
            return None

        # Convert to dict manually for consistency with the list helpers
        columns = [col[0] for col in cursor.description]
        return dict(zip(columns, row))
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        conn.close()


def get_chunks_for_library(
    library_id: str,
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Get all document chunks for a library.

    Rows are ordered by ``chunk_index`` descending.
    NOTE(review): descending order across all paths looks unusual for
    reassembling documents; kept unchanged - confirm against callers.

    Args:
        library_id: The library ID to fetch chunks for

    Returns:
        List of dictionaries containing chunk records. On a database error
        a single dict ``{"success": False, "error": <message>}`` is
        returned instead of a list.
    """
    conn = get_connection()
    try:
        cursor = conn.execute(
            "SELECT * FROM documents WHERE library_id = ? "
            "ORDER BY chunk_index DESC",
            (library_id,),
        )
        return _rows_to_dicts(cursor)
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        conn.close()