
Data Pipeline Runner (Docker)

Run the full CTMS data pipeline (Ingester + DBT) using pre-built Docker images. No local Python, DBT, or dependency setup required — just Docker.

When to Use This

Use the Docker runner (run-ctms-data-pipeline.sh) for staging, production, and EC2 deployments. For local development with source code, see Data Pipeline - Ingester and Data Pipeline - DBT.


Prerequisites

  • Docker installed and running
  • Access to the ctms.devops repository
  • A configured environment file (.env.ctms-data-pipeline.<client>)
  • Running API gateway (Caddy + KrakenD) for local deployments

Quick Start

cd ctms.devops

# Run the full pipeline (ingester + DBT)
bash scripts/run-ctms-data-pipeline.sh --env example full-pipeline

# Or run stages individually
bash scripts/run-ctms-data-pipeline.sh --env example ingester
bash scripts/run-ctms-data-pipeline.sh --env example dbt-build

Environment Setup

1. Create Environment File

Each client/environment has its own config file in the ctms.devops root:

.env.ctms-data-pipeline.example          # Example staging
.env.ctms-data-pipeline.example.prod     # Example production
.env.ctms-data-pipeline.zynomi           # Zynomi

2. Required Variables

# Docker Images
INGESTER_IMAGE=zynomi/ctms-ingester:latest
DBT_IMAGE=zynomi/ctms-dbt:latest

# Target Database (data warehouse)
TARGET_DB_HOST=your-db-host.supabase.com
TARGET_DB_PORT=5432
TARGET_DB_NAME=postgres
TARGET_DB_USER=postgres.your_project_ref
TARGET_DB_PASSWORD=YourPassword
TARGET_DB_SSLMODE=require

# Frappe API Source
FRAPPE_BASE_URL=https://api.localhost/api/v1

# Pipeline Settings
DLT_DESTINATION=postgres
DLT_DATASET_NAME=bronze
DLT_PIPELINE_NAME=hbct_clinical_trial_pipeline
TABLE_PREFIX=tbl_mst_
DBT_TARGET=dev

Passwords with Special Characters

If your TARGET_DB_PASSWORD contains $ or other shell special characters, do not escape them in the env file. Docker's --env-file reads values literally without shell expansion. The runner script handles this correctly.
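For example, a password containing `$` goes into the env file exactly as-is (the value below is a hypothetical placeholder, not a real credential):

```shell
# .env.ctms-data-pipeline.example — write the raw value, no quoting or backslash escapes
TARGET_DB_PASSWORD=Pa$$w0rd$123
```

Docker passes the value through byte-for-byte, so the database receives the literal `Pa$$w0rd$123`.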

3. API Endpoint Groups

Configure which Frappe DocTypes to ingest using JSON arrays:

| Variable | Description | Example DocTypes |
| --- | --- | --- |
| `DATALAKE_APIS` | Clinical/transaction data | Study, Patient, Subject, Vitals, Consent |
| `DATALAKE_APIS_MASTER` | Reference/lookup data | Sites, Dosages, Countries, Study Phase |
| `DATALAKE_APIS_CRF` | Case Report Forms | CRF form definitions |
| `DATALAKE_APIS_RBAC` | Access control | CTMS Roles, Permissions, Navigation |

Set an empty array [] to skip a group:

DATALAKE_APIS_CRF=[]
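A group can also be narrowed to specific DocTypes. The names below are illustrative only; use the DocTypes configured for your deployment:

```shell
# Illustrative values — ingest only selected master-data DocTypes, skip CRF
DATALAKE_APIS_MASTER=["Sites","Dosages","Countries"]
DATALAKE_APIS_CRF=[]
```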

Pipeline Commands

Command Reference

| Command | Description |
| --- | --- |
| `full-pipeline` | Run ingester + DBT daily (default) |
| `ingester` | Run bronze layer ingestion only |
| `ingester-dry-run` | Fetch data without writing to the database |
| `dbt-build` | Run DBT deps + build (models + tests) |
| `dbt-daily` | Full DBT pipeline (deps + build + Elementary) |
| `dbt-deps` | Install DBT packages only |
| `elementary` | Run Elementary observability + report |
| `pull` | Pull latest Docker images |
| `cleanup` | Remove unused Docker resources |

Options

| Option | Description |
| --- | --- |
| `--env <name>` | Required. Environment name (e.g., `example`, `example.prod`) |
| `--select <models>` | DBT model selection (e.g., `staging`, `marts`, `+model_name`) |
| `--full-refresh` | Run DBT with `--full-refresh` to rebuild incremental models |

Usage Examples

Run Full Pipeline

bash scripts/run-ctms-data-pipeline.sh --env example full-pipeline

This will:

  1. Pull latest Docker images
  2. Run the ingester (Frappe API → Bronze layer)
  3. Run DBT deps, build, Elementary, and generate the observability report

Ingester Only

# Full ingestion
bash scripts/run-ctms-data-pipeline.sh --env example ingester

# Dry run (fetch data, don't write to DB)
bash scripts/run-ctms-data-pipeline.sh --env example ingester-dry-run

DBT Only

# Build models + run tests
bash scripts/run-ctms-data-pipeline.sh --env example dbt-build

# Build specific layer
bash scripts/run-ctms-data-pipeline.sh --env example dbt-build --select staging
bash scripts/run-ctms-data-pipeline.sh --env example dbt-build --select marts

# Full refresh (rebuild incremental models)
bash scripts/run-ctms-data-pipeline.sh --env example dbt-build --full-refresh

# Combined: specific models + full refresh
bash scripts/run-ctms-data-pipeline.sh --env example dbt-build --select staging --full-refresh

Elementary Reports

bash scripts/run-ctms-data-pipeline.sh --env example elementary

Report is generated at: pipeline-data/dbt-reports/elementary_report.html


Running with Docker Directly

You can also run the containers directly without the runner script.

Ingester

docker run --rm \
--platform linux/amd64 \
--add-host api.localhost:host-gateway \
--env-file .env.ctms-data-pipeline.example \
-v ./pipeline-data/dlt:/app/.dlt \
-v ./pipeline-data/ingester-logs:/app/logs \
--entrypoint /bin/sh \
zynomi/ctms-ingester:latest -c "
export DB_HOST=\"\${TARGET_DB_HOST:-\$DB_HOST}\"
export DB_PORT=\"\${TARGET_DB_PORT:-\${DB_PORT:-5432}}\"
export DB_NAME=\"\${TARGET_DB_NAME:-\$DB_NAME}\"
export DB_USER=\"\${TARGET_DB_USER:-\$DB_USER}\"
export DB_PASSWORD=\"\${TARGET_DB_PASSWORD:-\$DB_PASSWORD}\"
export DB_SSLMODE=\"\${TARGET_DB_SSLMODE:-\${DB_SSLMODE:-require}}\"
exec python bot_frappe_api_to_db.py --batch
"

Why the entrypoint wrapper?

The env file uses TARGET_DB_* variable names (for DBT compatibility), but the ingester reads DB_*. The entrypoint wrapper maps between them inside the container, where the raw values from --env-file have no shell expansion issues.
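The mapping relies on plain POSIX default expansion: `${TARGET_DB_HOST:-$DB_HOST}` yields `TARGET_DB_HOST` when it is set and non-empty, and otherwise falls back to the existing `DB_HOST`. A minimal standalone sketch of the same pattern:

```shell
# TARGET_DB_* wins when set; otherwise the existing DB_* value is kept
TARGET_DB_HOST="db.internal.example"
DB_HOST="localhost"
DB_HOST="${TARGET_DB_HOST:-$DB_HOST}"
echo "$DB_HOST"    # prints db.internal.example

# When neither TARGET_DB_PORT nor DB_PORT is set, the nested default applies
unset TARGET_DB_PORT DB_PORT
DB_PORT="${TARGET_DB_PORT:-${DB_PORT:-5432}}"
echo "$DB_PORT"    # prints 5432
```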

DBT

DBT's built-in docker-entrypoint.sh handles the TARGET_DB_* → DB_* mapping automatically:

# DBT Build
docker run --rm \
--platform linux/amd64 \
--env-file .env.ctms-data-pipeline.example \
-v ./pipeline-data/dbt-target:/app/target \
-v ./pipeline-data/dbt-logs:/app/logs \
-v ./pipeline-data/dbt-reports:/app/reports \
zynomi/ctms-dbt:latest dbt build

# DBT Deps
docker run --rm \
--platform linux/amd64 \
--env-file .env.ctms-data-pipeline.example \
-v ./pipeline-data/dbt-target:/app/target \
-v ./pipeline-data/dbt-logs:/app/logs \
-v ./pipeline-data/dbt-reports:/app/reports \
zynomi/ctms-dbt:latest dbt deps

# DBT with model selection
docker run --rm \
--platform linux/amd64 \
--env-file .env.ctms-data-pipeline.example \
-v ./pipeline-data/dbt-target:/app/target \
-v ./pipeline-data/dbt-logs:/app/logs \
-v ./pipeline-data/dbt-reports:/app/reports \
zynomi/ctms-dbt:latest dbt build --select staging

Output Directories

All pipeline output is stored under pipeline-data/:

pipeline-data/
├── dlt/              # DLT pipeline state (incremental tracking)
├── ingester-logs/    # Ingester execution logs
├── dbt-target/       # DBT compiled SQL and run results
├── dbt-logs/         # DBT execution logs
└── dbt-reports/      # Elementary HTML reports
    └── elementary_report.html

DLT State

The pipeline-data/dlt/ directory tracks incremental pipeline state. Deleting it forces a full re-ingestion on the next run.
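To trigger that full re-ingestion deliberately, remove the state directory (relative to the ctms.devops root, as shown above) before the next run:

```shell
# WARNING: deletes incremental-load state; the next ingester run re-fetches all data
rm -rf pipeline-data/dlt
```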


Data Flow

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Frappe APIs   │────▶│    Ingester     │────▶│   PostgreSQL    │
│  (via KrakenD)  │     │    (DLT Hub)    │     │  Bronze Layer   │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                                        │
                                                        ▼
                                                ┌─────────────────┐
                                                │       DBT       │
                                                │ Bronze → Silver │
                                                │ Silver → Gold   │
                                                └─────────────────┘

| Layer | Schema | Description |
| --- | --- | --- |
| Bronze | `bronze` | Raw data from Frappe APIs (via DLT) |
| Silver | `silver` | Cleaned, typed, deduplicated data |
| Gold | `gold` | Business-ready dimensional models (facts + dimensions) |

Cron Scheduling

For automated daily runs on EC2 or any Linux server:

# Daily at 2 AM — full pipeline (crontab entries must be on a single line)
0 2 * * * /path/to/ctms.devops/scripts/run-ctms-data-pipeline.sh --env example.prod full-pipeline >> /var/log/ctms-pipeline.log 2>&1

# Ingester every 6 hours, DBT daily
0 */6 * * * /path/to/ctms.devops/scripts/run-ctms-data-pipeline.sh --env example.prod ingester >> /var/log/ctms-ingester.log 2>&1
0 3 * * * /path/to/ctms.devops/scripts/run-ctms-data-pipeline.sh --env example.prod dbt-daily >> /var/log/ctms-dbt.log 2>&1
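If a run can occasionally exceed its interval, wrapping the entry in `flock` prevents overlapping executions. This is a common cron pattern, not a feature of the runner script; the lock file path is arbitrary:

```shell
# Skip this run if the previous one is still holding the lock
0 2 * * * flock -n /tmp/ctms-pipeline.lock /path/to/ctms.devops/scripts/run-ctms-data-pipeline.sh --env example.prod full-pipeline >> /var/log/ctms-pipeline.log 2>&1
```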

Troubleshooting

| Issue | Cause | Solution |
| --- | --- | --- |
| `CERTIFICATE_VERIFY_FAILED` | Caddy's self-signed cert is not trusted | The runner script auto-merges Caddy's CA; ensure the ctms-caddy container is running |
| Circuit breaker open | Too many failed auth attempts | Wait 5 minutes, then verify `TARGET_DB_PASSWORD` is correct in the env file |
| `column "X" does not exist` | DLT skipped columns with all-null values | Populate data in Frappe for the missing fields, then re-run the ingester |
| `relation does not exist` | DLT skipped empty API responses | Verify the DocType has data in Frappe |
| Password with `$` truncated | Shell expanded `$` in the password | Use `TARGET_DB_*` vars (not `DB_*`); Docker `--env-file` reads them literally |
| Ingester connects to localhost | `DB_HOST` not mapped | Use the runner script or the entrypoint wrapper shown above, which handle the mapping |

See Also