385 lines
11 KiB
Python
385 lines
11 KiB
Python
# SQLite Database Layer for local-context7
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import List, Dict, Any, Optional
|
|
from .config import settings
|
|
|
|
try:
|
|
from qdrant_client import QdrantClient
|
|
except ImportError:
|
|
QdrantClient = None
|
|
|
|
|
|
def get_db_path() -> Path:
    """Return the SQLite database location from ``settings.db_path`` as a ``Path``."""
    configured_location = settings.db_path
    return Path(configured_location)
|
|
|
|
|
|
def ensure_db_dir():
    """Create the parent directory of the database file if it is missing.

    Intermediate directories are created as needed, and an already
    existing directory is not an error, so repeated calls are harmless.
    """
    data_dir = get_db_path().parent
    data_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
# Import-time side effect: make sure the database's parent directory exists
# before any connection is opened (the call is safe to repeat).
ensure_db_dir()
|
|
|
|
|
|
def get_connection():
    """
    Open a new SQLite connection to the configured database file.

    Returns:
        sqlite3.Connection whose ``row_factory`` is ``sqlite3.Row``, so
        fetched rows can be read by column name as well as by position.
        The caller is responsible for closing the connection.
    """
    database_file = str(get_db_path())
    connection = sqlite3.connect(database_file)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def init_db():
    """
    Create the SQLite schema (tables and indexes) if it does not exist yet.

    Objects created:
        - libraries (id, name, description, source_path, created_at, updated_at)
        - documents (id, library_id, path, title, content, chunk_index,
          token_estimate, created_at), with a foreign key to libraries(id)
        - idx_documents_library_id on documents(library_id)
        - idx_libraries_updated_at on libraries(updated_at)

    Returns:
        {"success": True} once everything is committed, or
        {"success": False, "error": <message>} if any statement raised
        (the transaction is rolled back in that case).
    """
    # Executed in order on a single connection; the pragma is issued first,
    # before any DDL runs.
    schema_statements = (
        "PRAGMA legacy_alter_table = ON",
        """
            CREATE TABLE IF NOT EXISTS libraries (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                description TEXT,
                source_path TEXT NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """,
        """
            CREATE TABLE IF NOT EXISTS documents (
                id TEXT PRIMARY KEY,
                library_id TEXT NOT NULL,
                path TEXT NOT NULL,
                title TEXT,
                content TEXT,
                chunk_index INTEGER,
                token_estimate INTEGER,
                created_at TEXT NOT NULL,
                FOREIGN KEY (library_id) REFERENCES libraries(id) ON DELETE CASCADE
            )
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_documents_library_id ON documents(library_id)
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_libraries_updated_at ON libraries(updated_at)
        """,
    )

    conn = get_connection()
    try:
        for statement in schema_statements:
            conn.execute(statement)
        conn.commit()
    except Exception as exc:
        conn.rollback()
        return {"success": False, "error": str(exc)}
    else:
        return {"success": True}
    finally:
        conn.close()
|
|
|
|
|
|
def upsert_library(
    library_id: str,
    name: str,
    description: Optional[str] = None,
    source_path: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Insert a library record, or update it if the id already exists.

    Args:
        library_id: Unique identifier for the library (primary key)
        name: Library name
        description: Optional description
        source_path: Path to library source files; when omitted or empty,
            ``library_id`` is stored instead

    Returns:
        On success: {"success": True, "id": library_id, "exists": bool},
        where "exists" tells whether the row was already present (update)
        or not (insert).
        On failure: {"success": False, "error": <message>}; the transaction
        is rolled back.
    """
    conn = get_connection()

    try:
        # datetime.utcnow() is deprecated since Python 3.12. Taking an aware
        # UTC "now" and dropping tzinfo yields the same naive-UTC ISO string,
        # so the stored timestamp format stays unchanged.
        now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        # source_path is NOT NULL in the schema; fall back to the library id.
        source_path = source_path or library_id

        # Look the row up first so the caller can be told insert vs. update.
        cursor = conn.execute(
            "SELECT id FROM libraries WHERE id = ?", (library_id,)
        )
        exists = cursor.fetchone() is not None

        if exists:
            # Update existing library; created_at is left untouched.
            conn.execute(
                """
                UPDATE libraries SET
                    name = ?, description = ?, source_path = ?, updated_at = ?
                WHERE id = ?
                """,
                (name, description, source_path, now, library_id),
            )
        else:
            # Insert new library; created_at and updated_at start out equal.
            conn.execute(
                """
                INSERT INTO libraries (id, name, description, source_path, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (library_id, name, description, source_path, now, now),
            )

        conn.commit()
        return {"success": True, "id": library_id, "exists": exists}

    except Exception as e:
        conn.rollback()
        return {"success": False, "error": str(e)}
    finally:
        conn.close()
|
|
|
|
|
|
def insert_document_chunk(
    doc_id: str,
    library_id: str,
    path: str,
    title: Optional[str] = None,
    content: Optional[str] = None,
    chunk_index: Optional[int] = None,
    token_estimate: int = 0,
) -> Dict[str, Any]:
    """
    Insert a document chunk record, or update it if the id already exists.

    Args:
        doc_id: Unique identifier for this chunk (primary key)
        library_id: Foreign key to libraries table
        path: Relative file path within the library
        title: Optional document title
        content: Full text content of the chunk
        chunk_index: Index within the full document (NULL if not chunked)
        token_estimate: Estimated token count; a falsy value (None, 0) is
            stored as 0

    Returns:
        On success: {"success": True, "id": doc_id, "exists": bool}, where
        "exists" tells whether the row was already present (update) or
        not (insert).
        On failure: {"success": False, "error": <message>}; the transaction
        is rolled back.
    """
    conn = get_connection()

    try:
        # datetime.utcnow() is deprecated since Python 3.12. Taking an aware
        # UTC "now" and dropping tzinfo yields the same naive-UTC ISO string,
        # so the stored timestamp format stays unchanged.
        now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        # Look the row up first so the caller can be told insert vs. update.
        cursor = conn.execute(
            "SELECT id FROM documents WHERE id = ?", (doc_id,)
        )
        exists = cursor.fetchone() is not None

        if exists:
            # The table has no updated_at column: an update overwrites
            # created_at with the current time.
            conn.execute(
                """
                UPDATE documents
                SET library_id = ?, path = ?, title = ?, content = ?,
                    chunk_index = ?, token_estimate = ?, created_at = ?
                WHERE id = ?
                """,
                (library_id, path, title, content, chunk_index, token_estimate or 0, now, doc_id),
            )
        else:
            conn.execute(
                """
                INSERT INTO documents
                    (id, library_id, path, title, content, chunk_index, token_estimate, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (doc_id, library_id, path, title, content, chunk_index, token_estimate or 0, now),
            )

        conn.commit()

        return {"success": True, "id": doc_id, "exists": exists}

    except Exception as e:
        conn.rollback()
        return {"success": False, "error": str(e)}
    finally:
        conn.close()
|
|
|
|
|
|
def clear_library_documents(library_id: str) -> Dict[str, Any]:
    """
    Remove every document chunk that belongs to one library.

    The library row itself is not touched.

    Args:
        library_id: The library whose chunks are deleted

    Returns:
        {"success": True, "deleted": <row count>, "library_id": library_id}
        on success, or {"success": False, "error": <message>} after a
        rollback if the delete or commit raised.
    """
    conn = get_connection()
    try:
        removed = conn.execute(
            "DELETE FROM documents WHERE library_id = ?", (library_id,)
        ).rowcount
        conn.commit()
    except Exception as exc:
        conn.rollback()
        return {"success": False, "error": str(exc)}
    else:
        return {"success": True, "deleted": removed, "library_id": library_id}
    finally:
        conn.close()
|
|
|
|
|
|
def delete_library(library_id: str) -> Dict[str, Any]:
    """
    Delete a library row together with its document chunks.

    The chunks are deleted explicitly before the library row, in the same
    transaction.

    Args:
        library_id: The library to remove

    Returns:
        {"success": True, "deleted": <library rows removed>, "library_id": library_id}
        on success, or {"success": False, "error": <message>} after a
        rollback if anything raised.
    """
    conn = get_connection()
    try:
        owner = (library_id,)
        conn.execute("DELETE FROM documents WHERE library_id = ?", owner)
        library_cursor = conn.execute("DELETE FROM libraries WHERE id = ?", owner)
        conn.commit()
    except Exception as exc:
        conn.rollback()
        return {"success": False, "error": str(exc)}
    else:
        # "deleted" counts library rows only, not the chunks removed above.
        return {
            "success": True,
            "deleted": library_cursor.rowcount,
            "library_id": library_id,
        }
    finally:
        conn.close()
|
|
|
|
|
|
def list_libraries() -> List[Dict[str, Any]]:
    """
    Fetch every library record, most recently updated first.

    Returns:
        List of dictionaries, one per library row, keyed by column name.
        If the query raises, a single dict
        {"success": False, "error": <message>} is returned instead of a list.
    """
    conn = get_connection()
    try:
        cursor = conn.execute("SELECT * FROM libraries ORDER BY updated_at DESC")
        # Column names come from the cursor metadata and are paired with
        # each row's values positionally.
        field_names = [meta[0] for meta in cursor.description]
        return [dict(zip(field_names, record)) for record in cursor]
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    finally:
        conn.close()
|
|
|
|
|
|
def search_libraries(query: str) -> List[Dict[str, Any]]:
    """
    Search libraries by id, name or description.

    This is a case-insensitive substring match (SQL ``LIKE``), not a
    full-text search. The query is matched literally: ``%``, ``_`` and
    ``\\`` in the query are escaped so they are not treated as wildcards.
    An empty query matches every library.

    Args:
        query: Search query string

    Returns:
        List of matching library dictionaries, most recently updated first
        (empty if none found). If the query raises, a single dict
        {"success": False, "error": <message>} is returned instead of a list.
    """
    conn = get_connection()

    try:
        # Escape the escape character first, then the two LIKE wildcards, so
        # names containing "_" or "%" are matched literally.
        escaped = (
            query.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        like_query = f"%{escaped}%"

        # ESCAPE '\' (a single backslash once Python has processed the string
        # literal) tells SQLite which character introduces a literal % or _.
        cursor = conn.execute(
            """
            SELECT * FROM libraries
            WHERE lower(id) LIKE lower(?) ESCAPE '\\'
               OR lower(name) LIKE lower(?) ESCAPE '\\'
               OR lower(coalesce(description, '')) LIKE lower(?) ESCAPE '\\'
            ORDER BY updated_at DESC
            """,
            (like_query, like_query, like_query),
        )

        # Convert to list of dicts keyed by column name.
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor]

    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        conn.close()
|
|
|
|
|
|
def get_document_by_id(doc_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up one document chunk by primary key.

    Args:
        doc_id: The document ID to fetch

    Returns:
        Dictionary of column name -> value for the matching row, or None
        when no row has that id. If the query raises, a dict
        {"success": False, "error": <message>} is returned instead.
    """
    conn = get_connection()
    try:
        cursor = conn.execute("SELECT * FROM documents WHERE id = ?", (doc_id,))
        record = cursor.fetchone()
        if record is not None:
            # Pair column names from the cursor metadata with the row values.
            field_names = [meta[0] for meta in cursor.description]
            return dict(zip(field_names, record))
        return None
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    finally:
        conn.close()
|
|
|
|
|
|
def get_chunks_for_library(library_id: str) -> List[Dict[str, Any]]:
    """
    Fetch every document chunk stored for one library.

    Args:
        library_id: The library ID to fetch chunks for

    Returns:
        List of dictionaries, one per chunk row, keyed by column name and
        ordered by chunk_index descending. If the query raises, a single
        dict {"success": False, "error": <message>} is returned instead of
        a list.
    """
    conn = get_connection()
    try:
        # NOTE(review): rows come back highest chunk_index first and are not
        # grouped by path - confirm callers expect this order.
        sql = "SELECT * FROM documents WHERE library_id = ? ORDER BY chunk_index DESC"
        cursor = conn.execute(sql, (library_id,))
        field_names = [meta[0] for meta in cursor.description]
        return [dict(zip(field_names, record)) for record in cursor]
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    finally:
        conn.close()
|