AI Brief: helvia-rag-pipelines

Python FastAPI service implementing configurable RAG (Retrieval-Augmented Generation) pipelines. Each pipeline has a corpus of documents, performs semantic search against a vector DB, optionally translates queries and responses, and generates answers via an LLM.

What This Repo Does

Manages named pipelines, each backed by a MySQL-stored corpus and a vector DB collection. Clients push corpus updates, trigger training (indexing the corpus into the vector DB), and then process queries. Processing a query runs these steps in order: translate the query if needed, summarize it against the conversation history, run semantic search, generate text via an LLM, and translate the response back. The service also exposes admin endpoints for token management, NLP provider CRUD, and VDB collection inspection.
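
The query-processing order described above can be sketched as a small orchestration function. This is a minimal illustration, not the code in pipeline_service.py: ProcessSteps, process_query, and the language parameters are hypothetical names, and skipping summarization when there is no history is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ProcessSteps:
    """Hypothetical bundle of stage callables; the real service wires its own services."""
    translate: Callable[[str, str], str]        # (text, target_language) -> text
    summarize: Callable[[str, List[str]], str]  # (query, history) -> standalone query
    search: Callable[[str], List[str]]          # query -> retrieved documents
    generate: Callable[[str, List[str]], str]   # (query, documents) -> answer


def process_query(query: str, history: List[str], steps: ProcessSteps,
                  user_lang: str = "en", pipeline_lang: str = "en") -> str:
    """Run the stages in the order the brief lists them."""
    needs_translation = user_lang != pipeline_lang
    if needs_translation:
        # Translate the incoming query into the pipeline's working language.
        query = steps.translate(query, pipeline_lang)
    # Assumption: with no history there is nothing to summarize against.
    standalone = steps.summarize(query, history) if history else query
    documents = steps.search(standalone)
    answer = steps.generate(standalone, documents)
    if needs_translation:
        # Translate the generated answer back into the user's language.
        answer = steps.translate(answer, user_lang)
    return answer
```

Passing the stages in as callables keeps the sketch testable with stubs; the actual service presumably calls its translation, search, and LLM services directly.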

Tech Stack

  • Language: Python 3.9-3.11
  • Framework: FastAPI 0.95, Uvicorn 0.29
  • Key dependencies: SQLAlchemy 2.0 + PyMySQL (MySQL), pymilvus 2.4.4, qdrant-client 1.11, openai 0.27, google-cloud-translate 3.11, google-generativeai 0.8, redis 4.6, alembic 1.13, PyJWT 2.7, elastic-apm 6.22, ecs-logging, tiktoken

Entry Points

  • Main: app/main.py (FastAPI app object: fast_api_app)
  • Run command: uvicorn app.main:fast_api_app
  • Config: app/config/config.py (reads from environment variables)

Key Directories

| Directory | Purpose |
| --- | --- |
| app/routers/ | FastAPI routers: pipeline_routers.py (pipeline CRUD + process/train), admin_routers.py (tokens, NLP providers, VDB) |
| app/services/ | Business logic: pipeline_service.py (train/index/process/search), llm_service.py, semantic_search_service.py, translation_service.py, embedding_cache*.py |
| app/repositories/ | SQLAlchemy data access: pipeline_repository.py, models.py (ORM entities), vector_repository_milvus.py, vector_repository_qdrant.py |
| app/clients/ | External API clients: nlp_api_client_openai.py, nlp_api_client_azure_openai.py, nlp_api_client_google_genai.py, nlp_api_client_anyscale.py, translation_api_client_*.py, helvia_semcache_client.py |
| app/parsers/ | LLM response parsers: JSON and plaintext variants for text generation and query summarization |
| app/schemas/ | Pydantic models: pipeline_schemas.py, pipeline_configuration_schemas.py, llm_schemas.py, semantic_search_schemas.py, translation_schemas.py |
| app/databases/ | DB manager singletons: relational_db_manager.py (SQLAlchemy engine/session), vector_db_manager.py (selects Milvus or Qdrant at startup) |
| app/auth/ | JWT bearer authentication (jwt_bearer.py) |
| app/utils/ | Logger, token counting (tiktoken), logprob utilities, sparse list |
| alembic/ | Database migration scripts |
| tests/ | pytest tests covering pipeline CRUD, processing, training, corpus updates, caching, parsers |

API Surface

Pipeline CRUD (/pipelines):

  • POST /pipelines — create pipeline (admin)
  • GET /pipelines — list pipelines with pagination (admin)
  • GET /pipelines/{pipeline_id} — get pipeline; ?show_doc_details=true includes VDB counts (admin/client)
  • PATCH /pipelines/{pipeline_id} — update pipeline (admin/client)
  • DELETE /pipelines/{pipeline_id} — delete pipeline (admin/client)

Corpus management:

  • PUT /pipelines/{pipeline_id}/corpus — full corpus update: inserts/updates/deletes items diffed against existing (admin/client)
  • GET /pipelines/{pipeline_id}/corpus — paginated corpus listing (admin/client)
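
The full corpus update diffs the submitted items against the stored ones. A minimal sketch of such a diff follows, assuming each item is identified by a string ID and compared by its text; the repository's actual item schema and comparison rules may differ, and CorpusDiff and diff_corpus are hypothetical names.

```python
from typing import Dict, List, NamedTuple


class CorpusDiff(NamedTuple):
    to_insert: List[str]  # IDs present only in the incoming corpus
    to_update: List[str]  # IDs present in both, with changed content
    to_delete: List[str]  # IDs present only in the stored corpus


def diff_corpus(existing: Dict[str, str], incoming: Dict[str, str]) -> CorpusDiff:
    """Compare two {item_id: text} mappings and classify every ID."""
    to_insert = sorted(k for k in incoming if k not in existing)
    to_update = sorted(
        k for k in incoming if k in existing and incoming[k] != existing[k]
    )
    to_delete = sorted(k for k in existing if k not in incoming)
    return CorpusDiff(to_insert, to_update, to_delete)


if __name__ == "__main__":
    stored = {"a": "old text", "b": "same", "c": "obsolete"}
    pushed = {"a": "new text", "b": "same", "d": "brand new"}
    print(diff_corpus(stored, pushed))
```

Unchanged items (such as "b" in the usage example) fall into none of the three lists, so they would not need to be rewritten or re-indexed.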

Pipeline operations:

  • POST /pipelines/{pipeline_id}:process — process a query (translate, semantic search, LLM generate) (admin/client)
  • POST /pipelines/{pipeline_id}:search — semantic search only, no LLM generation (admin/client)
  • POST /pipelines/{pipeline_id}:train — trigger full train (index corpus into vector DB) as background task (admin/client)
  • POST /pipelines/{pipeline_id}:index — trigger indexing only, no status change to READY (admin/client)
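
The operation endpoints use a custom-verb suffix (`:process`, `:search`, `:train`, `:index`) on the pipeline path. The sketch below only builds the request with the standard library and does not send it. The base URL, the token value, and the `query` payload field are placeholders and assumptions; the real request bodies are defined in app/schemas/.

```python
import json
import urllib.request
from typing import Optional

OPERATIONS = {"process", "search", "train", "index"}


def build_operation_request(base_url: str, pipeline_id: str, operation: str,
                            token: str, payload: Optional[dict] = None
                            ) -> urllib.request.Request:
    """Build (but do not send) a POST request for a pipeline operation."""
    if operation not in OPERATIONS:
        raise ValueError(f"unknown operation: {operation!r}")
    url = f"{base_url.rstrip('/')}/pipelines/{pipeline_id}:{operation}"
    body = json.dumps(payload or {}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            # The service uses JWT bearer authentication.
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


if __name__ == "__main__":
    # Hypothetical payload shape, for illustration only.
    req = build_operation_request(
        "http://localhost:8000", "42", "process", "my-jwt", {"query": "Hello"}
    )
    print(req.get_method(), req.full_url)
```

Sending the request would be a matter of passing it to `urllib.request.urlopen`, or of using whichever HTTP client the caller prefers.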

Analytics:

  • GET /pipelines/analytics — paginated analytics log (admin/client)
  • GET /pipelines/analytics/aggregated — aggregated analytics by pipeline/org/tenant/session/date range (admin/client)

Admin (/admin):

  • POST /admin/token — create JWT token (admin)
  • POST /admin/nlp_providers — register NLP provider (admin)
  • GET /admin/nlp_providers — list NLP providers (admin)
  • PATCH /admin/nlp_providers/{id} — update NLP provider (admin)
  • DELETE /admin/nlp_providers/{id} — delete NLP provider (admin)
  • GET /admin/vdb_collections — list VDB collection allocations (admin)
  • GET /admin/config — dump service configuration (admin)

External Dependencies

  • Vector DB: Milvus (pymilvus) or Qdrant (qdrant-client), selected at startup via VECTOR_DB env var
  • LLM Providers: OpenAI, Azure OpenAI, Google Gemini, Anyscale
  • Translation: Google Cloud Translate, Azure OpenAI (for translation), OpenAI (for translation)
  • Database: MySQL (SQLAlchemy + PyMySQL)
  • Cache: Redis (embedding cache) or in-memory
  • Semantic cache: Helvia SemCache (external service, optional)
  • APM: Elastic APM
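
Selecting the vector DB backend from the VECTOR_DB environment variable could look like the sketch below. The repository classes here are empty placeholders rather than the real ones in app/repositories/. The value `qdrant` appears in the run command in this brief, while `milvus` as the other accepted value is an assumption.

```python
import os
from typing import Mapping, Optional


class MilvusRepository:
    """Placeholder standing in for the pymilvus-backed repository."""
    name = "milvus"


class QdrantRepository:
    """Placeholder standing in for the qdrant-client-backed repository."""
    name = "qdrant"


# Assumed mapping from VECTOR_DB values to backends.
_REPOSITORIES = {"milvus": MilvusRepository, "qdrant": QdrantRepository}


def select_vector_repository(env: Optional[Mapping[str, str]] = None):
    """Pick the backend once, at startup, based on VECTOR_DB."""
    env = os.environ if env is None else env
    name = env.get("VECTOR_DB", "").strip().lower()
    if name not in _REPOSITORIES:
        raise ValueError(
            f"VECTOR_DB must be one of {sorted(_REPOSITORIES)}, got {name!r}"
        )
    return _REPOSITORIES[name]()
```

Failing fast on an unknown or missing value surfaces a misconfiguration at startup rather than on the first query; whether the actual service behaves this way is not stated in this brief.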

Running Locally

# Install dependencies
poetry install

# Run with Qdrant
VECTOR_DB=qdrant OPENAI_API_KEY=... QDRANT_API_URL=... MYSQL_URL=... \
uvicorn app.main:fast_api_app --reload

See README.md for the full table of environment variables.

Tests

poetry run pytest --envfile .env.test
# Single test file:
poetry run pytest --envfile .env.test tests/test_pipeline_processing.py