# Git Source Operations for Repository Cloning and File Discovery

import os
import shutil
from subprocess import run
from pathlib import Path
from typing import List, Optional, Dict, Any, Union


# Extensions treated as text without inspecting file contents.
_TEXT_EXTENSIONS = frozenset({
    '.md', '.txt', '.py', '.js', '.ts', '.json',
    '.yaml', '.yml', '.html', '.css', '.sh', '.sql',
})


def get_repos_dir() -> Path:
    """Get the base directory for storing cloned repositories."""
    # Default to ./data/repos in project root
    return Path(__file__).parent.parent.parent / "data" / "repos"


def ensure_repos_dir() -> Path:
    """Ensure the repos directory exists (idempotent) and return it."""
    repos_dir = get_repos_dir()
    repos_dir.mkdir(parents=True, exist_ok=True)
    return repos_dir


# Initialize repos directory at module load time (safe to run multiple times).
# A read-only or restricted filesystem must not make the module unimportable;
# ingest_git_source() creates the directory again right before it is needed.
try:
    ensure_repos_dir()
except OSError as exc:
    print(f" [Git] Warning: could not create repos directory: {exc}")


class GitCloneError(Exception):
    """Exception for git clone/checkout failures."""


def run_git(command: List[str]) -> None:
    """
    Run a git command and raise if it exits with a non-zero status.

    Args:
        command: Full argument vector, including the leading "git"

    Raises:
        GitCloneError: With git's stderr (or stdout) as the message
    """
    result = run(command, capture_output=True, text=True)
    if result.returncode != 0:
        error = (result.stderr or result.stdout or "unknown git error").strip()
        raise GitCloneError(error)


def clone_or_update_repo(
    repo_id: str,
    repo_url: str,
    branch: str,
    repos_base: Optional[Path] = None
) -> Dict[str, Any]:
    """
    Clone a git repository or update an existing clone.

    Args:
        repo_id: Unique identifier for this repository (used in paths)
        repo_url: Git URL to clone from (only needed when no clone exists yet)
        branch: Branch name to checkout
        repos_base: Base directory for repos (defaults to get_repos_dir())

    Returns:
        Dict with keys "success", "repo_path", "url" and "branch"

    Raises:
        GitCloneError: If clone or checkout fails
    """
    repos_base = repos_base or get_repos_dir()
    repo_path = repos_base / repo_id

    try:
        if repo_path.exists():
            # Update existing clone
            print(f" [Git] Updating existing clone at {repo_path}")
            # Clones are made with --single-branch, so a plain "fetch origin"
            # only refreshes the branch that was cloned originally. Fetch the
            # requested branch with an explicit refspec so origin/<branch>
            # exists even when the configured branch has changed.
            run_git([
                "git", "-C", str(repo_path), "fetch", "origin",
                f"+refs/heads/{branch}:refs/remotes/origin/{branch}",
            ])
            # Reset to branch
            run_git(["git", "-C", str(repo_path), "reset", "--hard", "origin/" + branch])
        else:
            if not repo_url:
                raise GitCloneError(
                    f"No repo_url given and no existing clone at {repo_path}"
                )
            # Clone new repository
            print(f" [Git] Cloning {repo_url} to {repo_path}")
            run_git([
                "git", "clone",
                "--branch", branch,
                "--single-branch",
                "--",  # end of options: the URL can never be parsed as a flag
                repo_url,
                str(repo_path),
            ])

        print(f" [Git] Checked out branch: {branch}")
        return {
            "success": True,
            "repo_path": str(repo_path),
            "url": repo_url,
            "branch": branch
        }
    except GitCloneError:
        raise
    except Exception as e:
        raise GitCloneError(f"Failed to clone/update repo: {e}") from e


def _normalize_rel(path: Any) -> str:
    """Return a relative path as a normalised, forward-slash string."""
    return str(Path(path)).replace("\\", "/")


def _is_same_or_under(rel: str, base: str) -> bool:
    """True if rel equals base or lies somewhere beneath it."""
    return rel == base or rel.startswith(base + "/")


def _is_probably_binary(filepath: str) -> bool:
    """Simple binary detection based on file extension and first bytes."""
    if Path(filepath).suffix.lower() in _TEXT_EXTENSIONS:
        return False
    # Unknown extension: check for null bytes in first 8KB
    try:
        with open(filepath, 'rb') as f:
            return b'\x00' in f.read(8192)
    except OSError:
        # Unreadable files are reported as non-binary (best effort)
        return False


def discover_files(
    repo_path: Path,
    include_paths: Optional[List[str]] = None,
    exclude_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """
    Discover files in a git repository respecting include/exclude paths.

    Git's own metadata (any entry named ".git") is never reported.
    Directories that cannot be listed due to permissions are skipped.

    Args:
        repo_path: Path to the cloned repository
        include_paths: List of paths relative to repo root to include
            (if None or empty, all dirs considered)
        exclude_paths: List of paths relative to repo root to exclude

    Returns:
        List of dicts, in depth-first name-sorted order, with format:
        {
            "path": "docs/hooks.md",  # Relative to repo root
            "full_path": "/full/path/to/repo/docs/hooks.md",
            "is_binary": False
        }
    """
    # Normalise the patterns once instead of on every visited entry.
    include = None if include_paths is None else [
        _normalize_rel(p) for p in include_paths
    ]
    exclude = [] if exclude_paths is None else sorted(
        {_normalize_rel(p) for p in exclude_paths}
    )

    discovered: List[Dict[str, Any]] = []

    def should_exclude(rel_str: str) -> bool:
        return any(_is_same_or_under(rel_str, exc) for exc in exclude)

    def file_under_include(rel_str: str) -> bool:
        """True if the file is inside an include path."""
        if not include:
            return True
        return any(_is_same_or_under(rel_str, inc) for inc in include)

    def dir_may_contain_includes(rel_str: str) -> bool:
        """True if this directory is inside, equal to, or a parent of any include path."""
        if not include:
            return True
        return any(
            _is_same_or_under(rel_str, inc) or inc.startswith(rel_str + "/")
            for inc in include
        )

    def walk_and_collect(current: Path, rel_prefix: Path) -> None:
        try:
            with os.scandir(current) as it:
                entries = sorted(it, key=lambda e: e.name)
        except PermissionError:
            return

        for entry in entries:
            # Never ingest git internals (.git/config may hold credentials).
            if entry.name == ".git":
                continue

            rel_path = rel_prefix / entry.name
            rel_str = str(rel_path).replace("\\", "/")
            if should_exclude(rel_str):
                continue

            # NOTE(review): is_file()/is_dir() follow symlinks, so a link that
            # points outside the repository is still traversed -- confirm
            # whether that is acceptable for untrusted repositories.
            try:
                is_file = entry.is_file()
                is_dir = not is_file and entry.is_dir()
            except OSError:
                # Skip just this entry rather than the rest of the directory
                continue

            entry_path = current / entry.name
            if is_file:
                if file_under_include(rel_str):
                    discovered.append({
                        "path": rel_str,
                        "full_path": str(entry_path),
                        "is_binary": _is_probably_binary(str(entry_path))
                    })
            elif is_dir:
                if dir_may_contain_includes(rel_str):
                    walk_and_collect(entry_path, rel_path)

    walk_and_collect(repo_path, Path())
    return discovered


def browse_repo_tree(repo_path: Path, max_depth: int = 4) -> List[Dict[str, Any]]:
    """
    Return a nested directory tree for browsing a cloned repository.

    Returns a list of nodes:
        {"path": "docs", "type": "dir", "children": [...]}
        {"path": "README.md", "type": "file"}

    Paths are relative to repo_path. Hidden entries (starting with '.') are
    skipped, as are entries named in SKIP_DIRS. Directories come first, then
    files, each sorted case-insensitively. Directories sitting at the depth
    limit are listed with an empty "children" list.
    """
    SKIP_DIRS = {".git", "node_modules", "__pycache__", ".venv", "venv", ".tox"}

    def build(current: Path, rel_prefix: Path, depth: int) -> List[Dict[str, Any]]:
        if depth <= 0:
            return []

        try:
            with os.scandir(current) as it:
                entries = sorted(it, key=lambda e: (not e.is_dir(), e.name.lower()))
        except PermissionError:
            return []

        nodes: List[Dict[str, Any]] = []
        for entry in entries:
            if entry.name.startswith(".") or entry.name in SKIP_DIRS:
                continue
            rel_path = rel_prefix / entry.name
            rel_str = str(rel_path).replace("\\", "/")
            if entry.is_dir():
                children = build(Path(entry.path), rel_path, depth - 1)
                nodes.append({"path": rel_str, "type": "dir", "children": children})
            else:
                nodes.append({"path": rel_str, "type": "file"})
        return nodes

    return build(repo_path, Path(), max_depth)


def _validate_library_id(library_id: Any) -> None:
    """Reject library ids that would resolve outside their own sub-directory."""
    if not isinstance(library_id, str) or not library_id.strip():
        raise ValueError("library_id must be a non-empty string")
    candidate = Path(library_id)
    if candidate.is_absolute() or str(candidate) == "." or ".." in candidate.parts:
        raise ValueError(f"library_id is not a safe relative path: {library_id!r}")


async def ingest_git_source(
    library_id: str,
    name: str,
    description: Optional[str] = None,
    repo_url: Optional[str] = None,
    branch: str = "main",
    include_paths: Optional[List[str]] = None,
    exclude_paths: Optional[List[str]] = None,
    repos_base: Optional[Path] = None
) -> Dict[str, Any]:
    """
    Ingest a git repository as a new library.

    Clones the repo (or updates if exists), discovers files in include paths,
    stages the non-binary ones under the docs directory and hands them to
    ingest_library.

    Args:
        library_id: Unique identifier for this library
        name: Library display name
        description: Optional description
        repo_url: Git repository URL to clone from
        branch: Branch to checkout (default: main)
        include_paths: Paths relative to repo root to include (if None, all dirs considered)
        exclude_paths: Paths relative to repo root to exclude
        repos_base: Base directory for clones (defaults to get_repos_dir())

    Returns:
        Dict with operation result

    Raises:
        ValueError: If library_id is empty or not a safe relative path
        GitCloneError: If git operations fail
    """
    # library_id becomes part of a directory that is deleted with rmtree
    # below; an empty, ".", absolute or ".."-containing id would wipe the
    # docs root or an unrelated directory.
    _validate_library_id(library_id)

    from .ingest import ingest_library

    print(f"\n[Git Ingestion] Processing library: {library_id}")
    print(f" Source: {repo_url or '(local)'}")

    # Ensure repos directory exists
    repos_base = repos_base or get_repos_dir()
    repos_base.mkdir(parents=True, exist_ok=True)

    repo_id = f"{library_id}-git"

    # Clone or update the repo
    clone_result = clone_or_update_repo(
        repo_id=repo_id,
        repo_url=repo_url,
        branch=branch,
        repos_base=repos_base
    )
    repo_path = Path(clone_result["repo_path"])
    print(f" [Git] Found files in {repo_path}")

    # Discover files respecting include/exclude paths
    files = discover_files(
        repo_path=repo_path,
        include_paths=include_paths,
        exclude_paths=exclude_paths
    )
    print(f" [Git] Discovered {len(files)} file(s)")

    if not files:
        return {
            "success": True,
            "library_id": library_id,
            "message": "No files found matching include/exclude criteria",
            "files_discovered": 0
        }

    # Stage only the filtered files into DOCS_PATH/library_id so that
    # ingest_library reads exactly what discover_files selected.
    from .config import settings

    docs_dir = Path(settings.docs_path) / library_id
    if docs_dir.exists():
        shutil.rmtree(docs_dir)
    docs_dir.mkdir(parents=True, exist_ok=True)

    staged = 0
    for file_info in files:
        if file_info.get("is_binary"):
            continue
        src = Path(file_info["full_path"])
        dst = docs_dir / file_info["path"]
        dst.parent.mkdir(parents=True, exist_ok=True)
        try:
            shutil.copy2(src, dst)
        except OSError as exc:
            # Best effort: one unreadable file must not abort the ingestion
            print(f" [Git] Warning: could not copy {src}: {exc}")
        else:
            staged += 1
    print(f" [Git] Staged {staged} file(s) to {docs_dir}")

    result = await ingest_library(
        library_id=library_id,
        name=name,
        description=description,
        source_path=library_id,
    )

    return {
        "success": result.get("success", False),
        "library_id": library_id,
        "name": name,
        "files_discovered": len(files),
        "chunks_created": result.get("chunks_created", 0),
        "vectors_added": result.get("vectors_added", 0)
    }


async def sync_sources(
    sources_config: Optional[Union[List[Dict[str, Any]], Dict[str, Any]]] = None,
    repos_base: Optional[Path] = None
) -> List[Dict[str, Any]]:
    """
    Sync all git sources defined in config.

    Args:
        sources_config: List of source configs, or a mapping with a "sources"
            list (same format as docs_sources.yaml). If None, the default
            docs_sources.yaml is loaded.
        repos_base: Base directory for repos

    Returns:
        List of results for each source; a failing source yields a dict with
        "success": False and an "error" message instead of raising
    """
    if sources_config is None:
        # Load from default config file
        import yaml

        config_path = Path(__file__).parent.parent.parent / "docs_sources.yaml"
        if not config_path.exists():
            return [{"success": False, "error": f"Config not found: {config_path}"}]
        with open(config_path, encoding="utf-8") as f:
            # safe_load returns None for an empty file
            sources_config = yaml.safe_load(f) or {}
        if not isinstance(sources_config, (dict, list)):
            return [{"success": False, "error": f"Invalid config format: {config_path}"}]

    if isinstance(sources_config, dict):
        # "sources:" with no entries parses to None
        sources_config = sources_config.get("sources") or []

    results = []
    for source in sources_config:
        if not isinstance(source, dict):
            results.append({
                "success": False,
                "library_id": "unknown",
                "error": f"Invalid source entry: {source!r}"
            })
            continue

        try:
            result = await ingest_git_source(
                library_id=source.get("library_id"),
                name=source.get("name"),
                description=source.get("description"),
                repo_url=source.get("repo_url"),
                branch=source.get("branch", "main"),
                include_paths=source.get("include_paths"),
                exclude_paths=source.get("exclude_paths"),
                repos_base=repos_base
            )
        except GitCloneError as e:
            result = {
                "success": False,
                "library_id": source.get("library_id", "unknown"),
                "error": str(e)
            }
        except Exception as e:
            # Boundary handler: one bad source must not stop the others
            result = {
                "success": False,
                "library_id": source.get("library_id", "unknown"),
                "error": f"Unexpected error: {e}"
            }
        results.append(result)

    return results