# AI Analytics Pipeline

The CTMS AI Analytics Pipeline transforms raw clinical trial data from the Frappe API into conversational insights. Users can ask natural-language questions and receive answers with KPI dashboards, charts, and actionable metrics.
## Architecture Overview

```
┌─────────────────┐      ┌──────────────┐      ┌─────────────┐
│   Frappe API    │─────▶│   Ingester   │─────▶│   Bronze    │
│    (Source)     │      │  (DLT Hub)   │      │   Schema    │
└─────────────────┘      └──────────────┘      └──────┬──────┘
                                                      │
                                                      ▼
┌─────────────────┐      ┌──────────────┐      ┌─────────────┐
│   Gold Layer    │◀─────│     DBT      │◀─────│   Silver    │
│    (Marts)      │      │  Transforms  │      │   Layer     │
└────────┬────────┘      └──────────────┘      └─────────────┘
         │
         ▼
┌─────────────────┐      ┌──────────────┐      ┌─────────────┐
│    Cube.dev     │─────▶│  MCP Agent   │─────▶│   Chat UI   │
│ Semantic Layer  │      │  (FastAPI)   │      │   (React)   │
│     :4000       │      │    :8006     │      │   :3000     │
└─────────────────┘      └──────────────┘      └─────────────┘
```
## Technology Stack
| Layer | Technology | Purpose |
|---|---|---|
| Ingestion | DLT Hub (Python) | Extract data from Frappe API to Bronze |
| Transformation | DBT + Elementary | Bronze → Silver → Gold with observability |
| Semantic | Cube.dev | Pre-aggregations, metrics, dimensions |
| AI Agent | FastAPI + MCP | OpenAI integration with tool calling |
| Chat UI | React + Vite | Embeddable chat widget |
| Database | PostgreSQL/Neon | Serverless PostgreSQL |
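For orientation, the ingestion step boils down to a small DLT pipeline. The sketch below is illustrative, not the production code (which lives in `ctms-ingester/bot_frappe_api_to_db.py`): the doctype, endpoint path, and environment variable names are assumptions.

```python
# Illustrative sketch of Frappe -> bronze ingestion with DLT.
# FRAPPE_URL / FRAPPE_API_TOKEN and the Subject doctype are assumptions;
# Postgres/Neon credentials come from DLT's own config or environment.
import os

import dlt
from dlt.sources.helpers import requests


@dlt.resource(table_name="subject", write_disposition="replace")
def frappe_subjects():
    # Frappe exposes doctypes at /api/resource/<DocType>;
    # limit_page_length=0 asks for all records at once.
    resp = requests.get(
        f"{os.environ['FRAPPE_URL']}/api/resource/Subject",
        headers={"Authorization": f"token {os.environ['FRAPPE_API_TOKEN']}"},
        params={"limit_page_length": 0},
    )
    resp.raise_for_status()
    yield resp.json()["data"]


pipeline = dlt.pipeline(
    pipeline_name="ctms_bronze",
    destination="postgres",   # Neon speaks the Postgres wire protocol
    dataset_name="bronze",    # tables land in the bronze schema
)

if __name__ == "__main__":
    print(pipeline.run(frappe_subjects()))
```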
## Data Layers

| Layer | Schema | Models | Purpose |
|---|---|---|---|
| Bronze | `bronze` | Raw tables | Direct copy from Frappe API |
| Silver | `silver` | `stg_*`, `int_*` | Cleaned, typed, standardized |
| Gold | `gold` | `dim_*`, `fct_*`, `cdisc_*` | Analytics-ready, CDISC compliant |
## DBT Model Categories

- **Staging** (`stg_*`) — Source cleaning and typing
- **Intermediate** (`int_*`) — Business logic transformations
- **Dimensions** (`dim_*`) — Master data entities (studies, sites, subjects)
- **Facts** (`fct_*`) — Transactional/event data (visits, AEs, labs)
- **CDISC** (`cdisc_*`) — CDISC SDTM domains (DM, AE, VS, LB, etc.)
- **Semantic** (`sem_*`) — Pre-aggregated views for analytics
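Because each category maps to a filename prefix and a directory, subsets can be built on their own. A minimal sketch using dbt's programmatic entry point (dbt-core 1.5+), run from the `ctms-dbt/` project root, to build just the staging layer:

```python
# Sketch: build only the staging (stg_*) models plus their tests.
# Requires dbt-core >= 1.5 and must run from the ctms-dbt/ directory.
from dbt.cli.main import dbtRunner

result = dbtRunner().invoke(["build", "--select", "staging"])
if not result.success:
    raise SystemExit("dbt build failed")
```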
## Repository Structure
The pipeline is consolidated in a single monorepo:
```
ctms-data-pipeline-ai-analytics/
├── .env.production              # Shared environment configuration
├── Makefile                     # Master build automation
├── README.md                    # Project documentation
│
├── ctms-ingester/               # Bronze Layer (DLT Hub)
│   ├── bot_frappe_api_to_db.py
│   └── requirements.txt
│
├── ctms-dbt/                    # Silver/Gold Layer (DBT)
│   ├── models/
│   │   ├── staging/             # Silver layer
│   │   ├── intermediate/        # Business logic
│   │   └── marts/               # Gold layer
│   └── dbt_project.yml
│
├── ctms-cube/                   # Semantic Layer (Cube.dev)
│   ├── model/cubes/             # Cube definitions
│   └── cube.js
│
├── ctms-mcp-server/             # AI Agent (MCP + FastAPI)
│   ├── index.js                 # MCP server
│   └── agent_service_mcp_client.py
│
└── ctms-ai-chat/                # Chat UI (React)
    └── src/components/
```
## Quick Start

### Prerequisites
- Node.js 20+
- Python 3.10+
- PostgreSQL/Neon database
### Setup
```bash
# Clone repository
git clone https://github.com/zynomilabs/ctms-data-pipeline-ai-analytics.git
cd ctms-data-pipeline-ai-analytics

# Copy environment file
make env-copy

# Install all dependencies
make setup
```
### Run Data Pipeline

```bash
# Ingest from Frappe API to bronze layer
make ingester-run

# Transform to silver/gold layers
make dbt-run

# Or run full pipeline
make pipeline
```
### Start Services

```bash
# Start Cube.dev + MCP Agent together
make stack

# Or start individually in separate terminals:
make cube-dev    # Terminal 1: Semantic layer :4000
make mcp-agent   # Terminal 2: AI agent :8006
make chat-dev    # Terminal 3: Chat UI :3000
```
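If a service does not respond, a quick way to confirm each port is actually listening (a Python counterpart to `make ports-check`):

```python
# Sketch: check whether the three dev ports accept TCP connections.
import socket

SERVICES = {"Cube.dev": 4000, "MCP Agent": 8006, "Chat UI": 3000}

for name, port in SERVICES.items():
    with socket.socket() as sock:
        sock.settimeout(1)
        up = sock.connect_ex(("localhost", port)) == 0
    print(f"{name:10s} :{port}  {'up' if up else 'DOWN'}")
```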
## Available Commands

### Setup & Environment

| Command | Description |
|---|---|
| `make setup` | Install all project dependencies |
| `make env-copy` | Copy `.env.production` to `.env` |
| `make help` | Show all available commands |
### Data Pipeline

| Command | Description |
|---|---|
| `make ingester-run` | Run bronze layer ingestion |
| `make dbt-run` | Run DBT transformations |
| `make dbt-build` | Run models + tests |
| `make pipeline` | Full pipeline (ingester + dbt) |
### Services

| Command | Description |
|---|---|
| `make stack` | Start Cube + MCP Agent (background) |
| `make stack-stop` | Stop all stack services |
| `make cube-dev` | Start Cube.dev (:4000) |
| `make mcp-agent` | Start MCP Agent (:8006) |
| `make chat-dev` | Start Chat UI (:3000) |
### Utilities

| Command | Description |
|---|---|
| `make ports-check` | Show status of dev ports |
| `make kill-all` | Kill all dev servers |
| `make dbt-debug` | Verify database connection |
## MCP Tools
The AI agent exposes these tools via the Model Context Protocol:
| Tool | Description |
|---|---|
| `cube_get_meta` | Get metadata about cubes, measures, dimensions |
| `cube_query` | Execute queries against the semantic layer |
| `cube_get_sql` | Get generated SQL for queries |
| `cube_list_cubes` | List all available cubes |
| `cube_describe_cube` | Describe a specific cube |
| `cube_clinical_kpis` | Get pre-defined clinical KPIs |
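`cube_query` presumably forwards a standard Cube query object (measures, dimensions, filters). A representative payload, using member names from the Cube Definitions table below; the study ID value is made up:

```python
# Sketch: a Cube-style query object of the kind cube_query executes.
# "STUDY-001" is a made-up filter value.
query = {
    "measures": ["AdverseEvents.count", "AdverseEvents.saeCount"],
    "dimensions": ["AdverseEvents.severity"],
    "filters": [
        {
            "member": "AdverseEvents.studyId",
            "operator": "equals",
            "values": ["STUDY-001"],
        }
    ],
    "order": {"AdverseEvents.count": "desc"},
    "limit": 10,
}
```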
## Example Chat Queries

Ask natural-language questions about clinical trial data:

- "How many subjects are enrolled across all studies?"
- "Show me adverse events by severity"
- "What is the enrollment rate by site?"
- "List all active studies with their patient counts"
- "Generate a CDISC DM domain extract"
### KPI Queries

- "Show total subjects, active studies, and adverse event count"
- "Create a dashboard with enrollment metrics by study"
- "Display adverse events by system organ class"
## Cube Definitions
The semantic layer includes these cubes:
| Cube | Key Measures | Key Dimensions |
|---|---|---|
| `Studies` | `count`, `avgDuration` | `phase`, `status`, `type` |
| `Subjects` | `count`, `avgAge` | `sex`, `race`, `status` |
| `Enrollments` | `count`, `screenFailRate` | `studyId`, `siteId`, `status` |
| `AdverseEvents` | `count`, `saeCount` | `severity`, `aeterm`, `studyId` |
| `VitalSigns` | `avgValue`, `count` | `vstestcd`, `studyId` |
| `LabTests` | `avgValue`, `count` | `lbtestcd`, `studyId` |
| `Visits` | `count`, `completionRate` | `visitType`, `status` |
| `Sites` | `count`, `activeSites` | `country`, `status` |
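These cubes can also be hit directly over Cube's REST API, which is useful for debugging outside the agent. A sketch, assuming a `CUBE_API_TOKEN` environment variable holds whatever token your local Cube instance expects:

```python
# Sketch: query the Cube.dev REST API directly on :4000.
# CUBE_API_TOKEN is a placeholder for your local Cube auth token.
import os

import requests

resp = requests.post(
    "http://localhost:4000/cubejs-api/v1/load",
    headers={"Authorization": os.environ["CUBE_API_TOKEN"]},
    json={"query": {
        "measures": ["Enrollments.count"],
        "dimensions": ["Enrollments.siteId"],
    }},
    timeout=30,
)
resp.raise_for_status()
for row in resp.json()["data"]:
    print(row)
```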
## Daily Operations

### Running Daily Pipeline
```bash
# Full daily run with Elementary observability
make dbt-daily
```
This executes:
1. `dbt build` — Run all models and tests
2. `dbt run --select elementary` — Build observability tables
3. `edr report` — Generate HTML quality report
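For scheduling, the same sequence can be scripted; a sketch assuming `dbt` and `edr` are on `PATH` and the working directory is `ctms-dbt/`:

```python
# Sketch: the dbt-daily sequence as a schedulable script.
# Assumes dbt and edr (Elementary CLI) are installed and on PATH.
import subprocess

STEPS = [
    ["dbt", "build"],                          # all models and tests
    ["dbt", "run", "--select", "elementary"],  # observability tables
    ["edr", "report"],                         # HTML quality report
]

for cmd in STEPS:
    subprocess.run(cmd, check=True)
```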
### Cache Management

After a DBT refresh, clear the Cube.dev cache:
```bash
# Local development
make cube-cache-bust-local

# Production
make cube-cache-bust
```
## See Also
- Environment Variables — Configuration reference
- Analytics AI Integration — Technical details
- Cube Semantic Layer — Cube.dev setup
- DBT Pipeline — Transformation details