"""WebUI Views for Context7 Docs using Jinja2 templates."""
import html
import json
import os
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlencode

import requests
from fastapi import Request
from fastapi.responses import HTML, JSONResponse
# Base URL of the internal docs-api service; the DOCS_API_URL environment
# variable overrides the in-cluster default.
DOCS_API_URL = os.getenv("DOCS_API_URL", "http://docs-api:8787")
def api_request(
    method: str,
    endpoint: str,
    data: Optional[dict] = None,
    params: Optional[dict] = None,
) -> dict:
    """Send a request to the internal docs-api service and return its JSON body.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        endpoint: Path appended verbatim to ``DOCS_API_URL``.
        data: Optional payload sent as the JSON request body.
        params: Optional query-string parameters. ``docs()`` in this module
            calls ``api_request(..., params=...)``; without this parameter
            that call fails with ``TypeError``.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.RequestException: If the HTTP request itself fails.
        ValueError: If the response body is not valid JSON.

    The HTTP status code is not inspected; error responses that carry a JSON
    body are returned to the caller like any other payload.
    """
    url = f"{DOCS_API_URL}{endpoint}"
    headers = {}
    # Read the key once so the check and the header value cannot disagree.
    api_key = os.environ.get("WEBUI_API_KEY")
    if api_key:
        headers["X-API-Key"] = api_key
    resp = requests.request(method, url, headers=headers, json=data, params=params)
    return resp.json()
def navbar_html(current: str) -> str:
    """Generate the navigation bar text.

    Args:
        current: Path of the page being rendered. Kept for interface
            compatibility; the rendered text does not vary with it.

    Returns:
        Every link label followed by a space, joined with a single space,
        with leading and trailing whitespace stripped.
    """
    links = [
        ("/health", "Health"),
        ("/libraries", "Libraries"),
        ("/upload", "Upload"),
        ("/ingest/all", "Ingest All"),
        ("/sources/git", "Git Sources"),
        ("/search", "Search"),
    ]
    # NOTE(review): the original computed an "active" marker per link but
    # never emitted it; that dead computation is removed here.
    return " ".join(f"{label} " for _path, label in links).strip()
def footer_html() -> str:
    """Return the page footer text, which is currently an empty string."""
    footer = ""
    return footer
def health(request: Request) -> HTML:
    """Render the system health dashboard.

    Calls ``GET /health`` on the docs API and shows the reported service name
    and status. If the call fails, the status is ``"error"`` and the exception
    text is shown in place of the service name.

    Args:
        request: Incoming request (unused).

    Returns:
        The rendered health page.
    """
    try:
        data = api_request("GET", "/health")
        status = data.get("status", "unknown")
        service = data.get("service", "Service")
    except Exception as e:
        status = "error"
        service = str(e)
    # Both values come from outside this process (API payload or exception
    # text), so escape them before placing them in an HTML response.
    safe_service = html.escape(str(service))
    safe_status = html.escape(str(status))
    return HTML(f"""
Context7 Docs - Health
Context7 Docs UI {navbar_html("/health")}
System Health
{safe_service}
Status: {safe_status}
{footer_html()}
""", media_type="text/html")
def libraries(request: Request) -> HTML:
    """List all libraries known to the docs API.

    Args:
        request: Incoming request (unused).

    Returns:
        A page with one row per library. Entries whose id is ``"error"`` are
        skipped, including the placeholder created when the API call fails.
    """
    try:
        data = api_request("GET", "/libraries")
        libs = data.get("libraries", [])
    except Exception as e:
        # NOTE(review): this placeholder is filtered out below, so the error
        # text is never displayed — confirm whether it should be surfaced.
        libs = [{"id": "error", "name": str(e)}]
    table_rows = []
    for lib in libs:
        if lib.get("id") == "error":
            continue
        # Library metadata is external data; escape it for the HTML response.
        lib_id = html.escape(str(lib.get("id")))
        name = html.escape(str(lib.get("name", "")))
        description = html.escape(str(lib.get("description", "") or "(no description)"))
        table_rows.append(f"{lib_id}\n{name}\n{description}\nView Docs ")
    # Count the rows actually rendered so a failed lookup does not report
    # "Libraries (1)" because of the error placeholder.
    return HTML(f"""
Context7 Docs - Libraries
Context7 Docs UI {navbar_html("/libraries")}
Libraries ({len(table_rows)})
ID Name Description Actions
{"".join(table_rows)}
{footer_html()}
""", media_type="text/html")
def upload(request: Request) -> HTML:
    """Show the upload form, or a preview of an uploaded text file.

    When a ``file`` entry is present, up to 5000 decoded characters are read
    and the first 1000 are shown, HTML-escaped, as a preview.

    NOTE(review): ``request.files`` is not part of Starlette's documented
    ``Request`` API (uploads are normally read with ``await request.form()``)
    — confirm how this view receives its request object.

    Args:
        request: Incoming request.

    Returns:
        The upload form, the preview page, or an error page if the file could
        not be read or decoded as UTF-8.
    """
    nav = navbar_html("/upload")
    footer = footer_html()
    if "file" not in request.files:
        return HTML(f"""
Context7 Docs - Upload
Context7 Docs UI {nav}
Upload Documentation Files
Supported formats: .txt, .md, .json, .py, .js, .html, .css, .yaml
{footer}
""", media_type="text/html")
    uploaded_file = request.files["file"]
    try:
        content = uploaded_file.read().decode("utf-8")[:5000]
    except Exception:
        return HTML(f"""
Context7 Docs - Upload
Context7 Docs UI {nav}
File too large!
Please upload smaller text files (limit: ~5MB).
{footer}
""", media_type="text/html")
    # Truncate the raw text first, then escape: cutting an already-escaped
    # string could split an entity such as "&amp;" in half.
    truncated = html.escape(content[:1000])
    if len(content) > 1000:
        truncated += "..."
    return HTML(f"""
Context7 Docs - Upload
Context7 Docs UI {nav}
Upload Complete!
{truncated}
{footer}
""", media_type="text/html")
def ingest_all(request: Request) -> JSONResponse:
    """Kick off ingestion of every library through ``POST /ingest``.

    Returns a JSON body reporting the processed chunk count, or a 500 response
    carrying the exception text if anything fails.
    """
    try:
        outcome = api_request("POST", "/ingest")
        chunk_count = outcome.get("chunks", 0)
        success_body = {"status": "ok", "message": f"Processed {chunk_count} chunks"}
        return JSONResponse(content=success_body)
    except Exception as exc:
        failure_body = {"error": str(exc)}
        return JSONResponse(status_code=500, content=failure_body)
def ingest_library(request: Request, library_id: str) -> HTML:
    """Ingest documentation for a single library.

    If the request carries a ``content`` form field, the ingest page for the
    library is shown. Otherwise ``POST /ingest/{library_id}`` is called on the
    docs API and its result (or the failure) is rendered.

    NOTE(review): Starlette exposes form data through the awaitable
    ``request.form()``; the membership test against ``request.form`` below
    looks like it cannot work from a synchronous view — confirm.

    Args:
        request: Incoming request.
        library_id: Identifier of the library to ingest.

    Returns:
        The ingest page, the ingestion result page, or an error page.
    """
    # library_id comes from the URL, so escape it before echoing it back.
    safe_library_id = html.escape(library_id)
    nav = navbar_html(f"/ingest/{library_id}")
    footer = footer_html()
    if "content" in request.form:
        return HTML(f"""
Context7 Docs - Ingest
Context7 Docs UI {nav}
Ingest for Library: {safe_library_id}
{footer}
""", media_type="text/html")
    try:
        result = api_request("POST", f"/ingest/{library_id}")
        safe_msg = html.escape(str(result.get("message", "") or ""))
        safe_json = html.escape(json.dumps(result, indent=2))
        return HTML(f"""
Context7 Docs - Ingest Result
Context7 Docs UI {nav}
Ingestion Complete!
{safe_msg}
{safe_json}
← Back to Libraries
{footer}
""", media_type="text/html")
    except Exception as e:
        safe_error = html.escape(str(e))
        return HTML(f"""
Context7 Docs - Error
Context7 Docs UI {nav}
Error
{safe_error}
{footer}
""", media_type="text/html")
async def folders_create(request: Request) -> JSONResponse:
    """Create a new library folder from the ``name`` form field.

    The name is used as both the library id and display name, with
    ``/docs/<name>`` as the source path.

    Args:
        request: Incoming request carrying the form data.

    Returns:
        A success payload, a 400 response when the name is empty, or a 500
        response carrying the exception text when the database call fails.
    """
    # Starlette's Request.form is a coroutine method; it must be awaited
    # before fields can be read.
    form = await request.form()
    name = str(form.get("name", "")).strip()
    if not name:
        # An empty name would create a library with an empty id and the
        # source path "/docs/".
        return JSONResponse(status_code=400, content={"error": "Folder name is required"})
    try:
        from backend.app.db import upsert_library
        await upsert_library(library_id=name, name=name, description=None, source_path=f"/docs/{name}")
        return JSONResponse(content={"status": "ok", "message": f"Created folder '{name}'"})
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
async def folders_delete(request: Request) -> JSONResponse:
    """Delete the library named by the ``id`` query parameter.

    Args:
        request: Incoming request; ``id`` is read from its query string.

    Returns:
        A success payload, a 400 response when no id is given, or a 500
        response carrying the exception text when the database call fails.
    """
    library_id = request.query_params.get("id", "").strip()
    if not library_id:
        # Do not forward an empty id to the delete call.
        return JSONResponse(status_code=400, content={"error": "Library id is required"})
    try:
        from backend.app.db import delete_library
        await delete_library(library_id)
        return JSONResponse(content={"status": "ok", "message": f"Deleted library '{library_id}'"})
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
async def ingest_uploaded(request: Request) -> HTML:
    """Ingest text submitted through the upload form.

    Reads ``content`` (capped at 10000 characters) and ``library_id``
    (default ``"uploaded"``) from the form and posts the content to
    ``/ingest/{library_id}`` on the docs API.

    NOTE(review): ``api_request`` is synchronous, so this coroutine blocks
    while the HTTP call is in flight.

    Args:
        request: Incoming request carrying the form data.

    Returns:
        The ingestion result page, or an error page if the API call fails.
    """
    # Starlette's Request.form is a coroutine method; it must be awaited
    # before fields can be read.
    form = await request.form()
    content = str(form.get("content", ""))[:10000]
    library_id = str(form.get("library_id", "uploaded"))
    try:
        result = api_request("POST", f"/ingest/{library_id}", data={"content": content})
        # Escape everything taken from the API response before rendering.
        safe_msg = html.escape(str(result.get("message", "") or ""))
        safe_json = html.escape(json.dumps(result, indent=2))
        return HTML(f"""
Context7 Docs - Upload Result
Context7 Docs UI {navbar_html("/upload")}
Ingestion Complete!
{safe_msg}
{safe_json}
← Upload Another
{footer_html()}
""", media_type="text/html")
    except Exception as e:
        safe_error = html.escape(str(e))
        return HTML(f"""
Error
Upload Ingest Error {safe_error} ← Try Again
""", media_type="text/html")
def docs(request: Request, library_id: str, topic: Optional[str] = None, tokens: int = 8000) -> HTML:
    """Show documentation content for one library.

    Args:
        request: Incoming request (unused).
        library_id: Identifier of the library whose docs are fetched.
        topic: Optional topic filter; left out of the query when ``None``.
        tokens: Token budget forwarded to the docs API.

    Returns:
        A page with the first 10000 characters of the returned content,
        HTML-escaped. If the API call fails, the exception text is shown
        instead.
    """
    # api_request() as defined in this module takes no query-parameter
    # argument, so the query string is encoded into the endpoint itself.
    query = {}
    if topic is not None:
        query["topic"] = topic
    query["tokens"] = tokens
    try:
        data = api_request("GET", f"/libraries/{library_id}/docs?{urlencode(query)}")
        # Guard against an explicit null in the payload.
        content = data.get("content", "") or ""
    except Exception as e:
        content = str(e)
    # Truncate before escaping so an entity is never cut in half.
    safe_content = html.escape(str(content)[:10000])
    safe_library_id = html.escape(library_id)
    safe_topic = html.escape(topic) if topic else "(all)"
    return HTML(f"""
Context7 Docs - Library: {safe_library_id}
Context7 Docs UI {navbar_html("/docs/{}".format(library_id))}
Library: {safe_library_id}
Topic: {safe_topic} | Tokens: {tokens}
{safe_content}
{footer_html()}
""", media_type="text/html")
def search_redirect(request: Request) -> JSONResponse:
    """Point the client at the search form through a JSON payload.

    The response is an ordinary JSON body with a ``redirect`` key.
    """
    target = {"redirect": "/search/form"}
    return JSONResponse(content=target)
def search_form(request: Request) -> HTML:
    """Render the static search form page."""
    # The leading and trailing empty entries reproduce the newline that
    # opens and closes the page body.
    page_lines = [
        "",
        "Context7 Docs - Search",
        f"Context7 Docs UI {navbar_html('/search')}",
        "Search Docs",
        "Query:",
        "Library (optional):",
        "Limit results:",
        "5",
        "10",
        "20",
        "50",
        "Search",
        footer_html(),
        "",
    ]
    return HTML("\n".join(page_lines), media_type="text/html")
def search_results(request: Request) -> HTML:
    """Run a search against the docs API and display the hits.

    Reads ``q`` and ``limit`` (default 10) from the query string and posts
    them to ``/search``. Each hit is rendered as a title line followed by up
    to 500 characters of its content.

    Args:
        request: Incoming request carrying the query parameters.

    Returns:
        The results page, or an error page if ``limit`` is not an integer or
        the API call fails.
    """
    try:
        query = request.query_params.get("q", "")
        limit = int(request.query_params.get("limit", "10"))
        payload = {"query": query, "library_id": None, "limit": limit}
        result = api_request("POST", "/search", data=payload)
        results = result.get("results", [])
    except Exception as e:
        safe_error = html.escape(str(e))
        return HTML(f"""
Error
Error {safe_error} ← Try Again
""", media_type="text/html")
    cards = []
    for r in results:
        # Fall back from "content" to "chunk"; tolerate explicit nulls.
        body = r.get("content", "") or r.get("chunk", "") or ""
        title = r.get("title", "Untitled") or (body[:100] + "...")
        # The original appended an empty string per hit, so the computed
        # title and content were never shown; render them, escaped.
        cards.append(f"{html.escape(str(title))}\n{html.escape(str(body)[:500])}\n")
    # The query is user-supplied and echoed back, so it must be escaped.
    safe_query = html.escape(query)
    return HTML(f"""
Context7 Docs - Search Results
Context7 Docs UI {navbar_html("/search")}
Search Results for "{safe_query}"
{len(results)} results found
{''.join(cards)}
← New Search
{footer_html()}
""", media_type="text/html")
def sync_sources(request: Request) -> HTML:
    """Trigger a git source sync (POST) or show the sync page (any other method).

    Args:
        request: Incoming request; its HTTP method selects the behavior.

    Returns:
        For POST, the sync result page or an error page. Otherwise the Git
        Sync page.
    """
    if request.method == "POST":
        try:
            data = api_request("POST", "/sources/sync")
            safe_json = html.escape(json.dumps(data, indent=2))
            return HTML(f"""
Sync Result
Context7 Docs UI {navbar_html("/sync/sources")}
Git Sync Complete! {safe_json}
Sync Again
{footer_html()}
""", media_type="text/html")
        except Exception as e:
            safe_error = html.escape(str(e))
            return HTML(f"""
Error
Sync Error {safe_error} ← Try Again
""", media_type="text/html")
    try:
        data = api_request("GET", "/libraries")
        # str() keeps the join below from raising when an entry has no id.
        libs = [str(lib.get("id")) for lib in data.get("libraries", []) if lib.get("id") != "error"]
    except Exception:
        libs = []
    # NOTE(review): lib_list is computed but the page below has no slot for
    # it — confirm whether the library list should be displayed.
    lib_list = ", ".join(libs) if libs else "(none)"
    return HTML(f"""
Context7 Docs - Git Sync
""", media_type="text/html")
def git_sources(request: Request) -> HTML:
    """List the git sources configured in ``docs_sources.yaml``.

    The file is looked up three directory levels above this module. For each
    source the library id, repository URL (shortened to 50 characters plus
    ``...`` when longer), branch, and include/exclude paths are shown.

    Args:
        request: Incoming request (unused).

    Returns:
        The sources page, or an error page if the file cannot be read or has
        an unexpected structure.
    """
    import yaml
    config_path = Path(__file__).parent.parent.parent / "docs_sources.yaml"
    try:
        with open(config_path, encoding="utf-8") as f:
            # safe_load returns None for an empty document.
            data = yaml.safe_load(f) or {}
        sources = data.get("sources", []) or []
        source_blocks = []
        for src in sources:
            repo_url = src.get("repo_url", "") or ""
            url = repo_url[:50] + "..." if len(repo_url) > 50 else repo_url
            branch = src.get("branch", "main")
            include = src.get("include_paths", ["*"])
            exclude = src.get("exclude_paths", [])
            include_text = ", ".join(map(str, include))
            exclude_text = " | Exclude: " + ", ".join(map(str, exclude)) if exclude else ""
            # Config values are escaped before being placed in the page.
            source_blocks.append(
                f"\n{html.escape(str(src.get('library_id', 'unknown')))}\n"
                f"URL: {html.escape(url)}\n"
                f"Branch: {html.escape(str(branch))}\n"
                f"Include: {html.escape(include_text)}{html.escape(exclude_text)}\n"
            )
        return HTML(f"""
Context7 Docs - Git Sources
Context7 Docs UI {navbar_html("/sources/git")}
Configured Git Sources ({len(sources)})
{''.join(source_blocks)}
{footer_html()}
""", media_type="text/html")
    except Exception as e:
        safe_error = html.escape(str(e))
        return HTML(f"""
Error
Git Sources Error {safe_error}
""", media_type="text/html")
def logs(request: Request) -> HTML:
    """Render the static logs/status page.

    The page shows the configured docs API URL and fixed status text; no
    health checks are performed here.
    """
    # The leading and trailing empty entries reproduce the newline that
    # opens and closes the page body.
    page_lines = [
        "",
        "Context7 Docs - Logs",
        f"Context7 Docs UI {navbar_html('/logs')}",
        "Status Messages",
        f"Docs API: {DOCS_API_URL}",
        "Qdrant Health: healthy | MCP OK: yes",
        "Logs are printed to container stdout/stderr. For full logs, inspect Docker containers directly.",
        footer_html(),
        "",
    ]
    return HTML("\n".join(page_lines), media_type="text/html")
# Public view functions exported by this module. This list only declares the
# exports; no routes are registered here.
__all__ = [
    "health", "libraries", "upload", "ingest_all", "ingest_library",
    "folders_create", "folders_delete", "ingest_uploaded", "docs",
    "search_redirect", "search_form", "search_results", "sync_sources",
    "git_sources", "logs",
]