Skip to content

Server Architecture

The Metricis server is a FastAPI-based backend providing REST APIs for all portal and client functionality.

Overview

  • Framework: FastAPI 0.109+
  • Language: Python 3.11+
  • ORM: SQLAlchemy 2.x (async)
  • Database: PostgreSQL 15+
  • Migrations: Alembic
  • Session Storage: Redis or in-memory
  • Background Jobs: Celery + Redis
  • Validation: Pydantic 2.x
  • Logging: structlog (structured logging)

Project Structure

server/
├── app/
│   ├── main.py              # FastAPI app entry point
│   ├── config.py            # Configuration (Pydantic settings)
│   ├── routers/             # API endpoints
│   │   ├── auth.py
│   │   ├── batteries.py
│   │   ├── consent.py
│   │   ├── dashboard.py
│   │   ├── exports.py
│   │   ├── health.py
│   │   ├── item_banks.py
│   │   ├── modules.py
│   │   ├── participants.py
│   │   ├── redcap.py
│   │   ├── reports.py
│   │   ├── schedules.py
│   │   ├── session.py
│   │   ├── studies.py
│   │   ├── submit.py
│   │   ├── sync.py
│   │   ├── templates.py
│   │   └── user_sessions.py
│   ├── models/              # SQLAlchemy ORM models
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── study.py
│   │   ├── participant.py
│   │   ├── battery.py
│   │   ├── session.py
│   │   └── ...
│   ├── schemas/             # Pydantic schemas (request/response models)
│   ├── services/            # Business logic
│   │   ├── auth.py
│   │   ├── export.py
│   │   ├── local_storage.py
│   │   ├── notifications.py
│   │   ├── participant_import.py
│   │   ├── pdf_reports.py
│   │   ├── redcap.py
│   │   ├── redcap_sync.py
│   │   ├── reporting.py
│   │   ├── scheduler.py
│   │   ├── session_store.py
│   │   ├── templates.py
│   │   └── user_sessions.py
│   ├── workers/             # Celery background tasks
│   │   └── reminder_worker.py
│   ├── db/
│   │   ├── session.py       # Database session factory
│   │   └── base.py          # Declarative base
│   ├── celery_app.py        # Celery configuration
│   └── utils/
│       ├── security.py      # Password hashing, JWT
│       └── dependencies.py  # FastAPI dependencies
├── alembic/                 # Database migrations
│   ├── versions/
│   └── env.py
├── tests/                   # pytest tests
├── requirements.txt
└── .env

Application Entry Point

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
import structlog

from app.config import get_settings
from app.routers import (
    auth, batteries, consent, dashboard, exports, health,
    participants, redcap, reports, schedules, session,
    studies, submit, sync, templates, user_sessions
)

settings = get_settings()
logger = structlog.get_logger(__name__)

# Rate limiter, keyed by the client's address so that every caller gets its
# own quota. A constant key would put all clients in one shared bucket, letting
# a single noisy client exhaust the limit for everyone.
# NOTE(review): behind a reverse proxy the remote address may be the proxy's;
# confirm forwarded headers are handled before relying on this in production.
limiter = Limiter(key_func=get_remote_address)

app = FastAPI(
    title="Metricis API",
    description="Mobile-first cognitive assessment platform",
    version="0.1.0",
)

# CORS: only the origins listed in settings may make credentialed requests.
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Rate limiting: expose the limiter on app state and register the handler
# that runs when a limit is exceeded.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Include routers
app.include_router(health.router, prefix="/api", tags=["health"])
app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(studies.router, prefix="/api/studies", tags=["studies"])
app.include_router(participants.router, prefix="/api/participants", tags=["participants"])
app.include_router(batteries.router, prefix="/api/batteries", tags=["batteries"])
app.include_router(session.router, prefix="/api/session", tags=["session"])
app.include_router(submit.router, prefix="/api/submit", tags=["submit"])
# ... more routers

@app.on_event("startup")
async def startup():
    """Emit a structured log entry when the application starts."""
    environment_name = settings.ENVIRONMENT
    logger.info("application_startup", environment=environment_name)

@app.on_event("shutdown")
async def shutdown():
    """Emit a structured log entry when the application shuts down."""
    logger.info("application_shutdown")

Configuration

# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field, SecretStr
import secrets

class Settings(BaseSettings):
    """Application configuration.

    Values come from the defaults below and can be overridden through the
    environment or the ``.env`` file named in ``model_config``; variable
    names are matched case-insensitively.
    """

    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

    # Environment
    ENVIRONMENT: str = "development"
    DEBUG: bool = False
    LOG_LEVEL: str = "info"

    # Database
    DATABASE_URL: str = "postgresql+asyncpg://localhost:5432/metricis"

    # Security
    # When no value is supplied, a fresh random secret is generated each time
    # a Settings object is created.
    # NOTE(review): a generated secret differs between processes and restarts,
    # so tokens signed by one process will not verify in another — set both
    # keys explicitly outside local development.
    JWT_SECRET_KEY: SecretStr = Field(default_factory=lambda: SecretStr(secrets.token_urlsafe(32)))
    SESSION_SECRET_KEY: SecretStr = Field(default_factory=lambda: SecretStr(secrets.token_urlsafe(32)))
    JWT_ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24  # 24 hours

    # Session storage / Redis
    SESSION_STORAGE_BACKEND: str = "memory"  # "memory" or "redis"
    REDIS_URL: str = "redis://localhost:6379/0"

    # CORS: origins allowed to call the API from a browser
    ALLOWED_ORIGINS: list[str] = [
        "http://localhost:5173",
        "http://localhost:3000",
        "http://localhost:3001",
    ]

    # Rate Limiting (requests per minute)
    RATE_LIMIT_PER_MINUTE: int = 60
    AUTH_RATE_LIMIT_PER_MINUTE: int = 10

# Single Settings instance shared by the whole process; created lazily.
_settings: Settings | None = None


def get_settings() -> Settings:
    """Return the process-wide ``Settings`` instance, creating it on first use.

    Reusing one instance matters here: the secret-key fields fall back to a
    randomly generated value, so building a new ``Settings`` per call would
    hand out a different signing key every time.

    Returns:
        The cached ``Settings`` object.
    """
    global _settings
    if _settings is None:
        _settings = Settings()
    return _settings


settings = get_settings()

Database Session

# app/db/session.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from app.config import get_settings

settings = get_settings()

# Async engine for the configured database; SQL echo follows the DEBUG flag.
engine = create_async_engine(settings.DATABASE_URL, echo=settings.DEBUG, future=True)

# Factory producing AsyncSession objects bound to the engine above.
AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

async def get_db() -> AsyncSession:
    """Yield a database session and release it when the caller is done.

    Written as an async generator so it can be used with ``Depends``.
    """
    async with AsyncSessionLocal() as db_session:
        yield db_session

API Routers

Authentication

# app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordRequestForm
from slowapi import Limiter
from slowapi.util import get_remote_address
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import get_db
from app.services.auth import authenticate_user, create_access_token
from app.schemas.auth import Token

router = APIRouter()
# Key the limiter by client address: with a constant key all callers would
# share one login quota, so a single client could lock everyone out.
limiter = Limiter(key_func=get_remote_address)

@router.post("/login", response_model=Token)
@limiter.limit("10/minute")
async def login(
    request: Request,
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """Exchange a username/password form for a bearer access token.

    Args:
        request: The incoming request. It is not read here, but slowapi needs
            an argument named ``request`` on every rate-limited endpoint.
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        db: Database session used to look the user up.

    Returns:
        A dict with ``access_token`` and ``token_type`` ("bearer").

    Raises:
        HTTPException: 401 when the credentials do not match a user.
    """
    user = await authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
        )

    # The token subject is the user's email address.
    access_token = create_access_token(data={"sub": user.email})
    return {"access_token": access_token, "token_type": "bearer"}

Session Management

# app/routers/session.py
from fastapi import APIRouter, Depends
from app.services.session_store import SessionStore
from app.schemas.session import SessionStart, SessionResponse

router = APIRouter()


@router.post("/start", response_model=SessionResponse)
async def start_session(
    data: SessionStart,
    session_store: SessionStore = Depends(get_session_store),
):
    """Create a session for the given participant and battery.

    Returns a ``SessionResponse`` carrying the identifier the store produced.
    """
    new_session_id = await session_store.create_session(
        participant_id=data.participant_id,
        battery_id=data.battery_id,
    )
    response = SessionResponse(session_id=new_session_id)
    return response

@router.post("/end")
async def end_session(
    session_id: str,
    session_store: SessionStore = Depends(get_session_store),
):
    """End the session identified by ``session_id`` and report success."""
    await session_store.end_session(session_id)
    acknowledgement = {"status": "success"}
    return acknowledgement

Data Submission

# app/routers/submit.py
from fastapi import APIRouter, Depends
from app.services.local_storage import LocalStorageService
from app.services.redcap import REDCapService
from app.schemas.submit import SubmitData

router = APIRouter()


@router.post("")
async def submit_data(
    data: SubmitData,
    local_storage: LocalStorageService = Depends(get_local_storage),
    redcap: REDCapService = Depends(get_redcap_service),
):
    """Persist submitted session data and forward it to REDCap when enabled.

    The data is always written to local storage first; the REDCap sync only
    runs if the REDCap service reports that it is configured.
    """
    # Store locally as JSON
    await local_storage.save_session_data(
        session_id=data.session_id,
        task_summaries=data.task_summaries,
        trials=data.trials,
    )

    response = {"status": "success"}

    # Nothing more to do when REDCap is not configured
    if not redcap.is_configured():
        return response

    await redcap.sync_session_data(data)
    return response

Services

REDCap Integration

# app/services/redcap.py
from redcap import Project
from app.config import get_settings

class REDCapService:
    """Wrapper around a PyCap ``Project`` for one site's REDCap instance.

    The PyCap client makes blocking calls, so the async methods below run
    them in a worker thread instead of stalling the event loop.
    """

    def __init__(self, site_id: str):
        """Build the REDCap client for ``site_id``.

        The site's URL and API token are read from
        ``settings.SITE_CONFIGS[site_id]``.
        """
        self.settings = get_settings()
        self.site_config = self.settings.SITE_CONFIGS[site_id]
        self.project = Project(
            self.site_config.redcap_url,
            self.site_config.token,
        )

    async def import_participants(self, study_id: str):
        """Fetch participant records from REDCap for later import.

        Args:
            study_id: Study the participants belong to.
        """
        import asyncio

        # Pull participant records from REDCap (blocking call, so run it in a
        # worker thread).
        records = await asyncio.to_thread(self.project.export_records)
        # Process and import...

    # The annotation is quoted so that this module does not need SubmitData
    # in scope when the class is defined.
    async def sync_session_data(self, data: "SubmitData"):
        """Send one session's task summaries to REDCap as a single record.

        Args:
            data: Submitted session payload; its participant id, session date
                and task summaries are mapped onto REDCap fields.

        Raises:
            KeyError: if an expected task or metric is missing from
                ``data.task_summaries``.
        """
        import asyncio

        # Map task summaries to REDCap fields
        record = {
            "record_id": data.participant_id,
            "session_date": data.session_date,
            "simple_rt_mean": data.task_summaries["simple_rt"]["mean_rt"],
            "cpt_accuracy": data.task_summaries["cpt"]["accuracy"],
            # ...
        }
        # Blocking upload, so run it in a worker thread.
        await asyncio.to_thread(self.project.import_records, [record])

PDF Reports

# app/services/pdf_reports.py
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table
from reportlab.lib.styles import getSampleStyleSheet
from io import BytesIO

class PDFReportService:
    """Generates PDF reports with ReportLab."""

    def generate_participant_report(self, participant_id: str) -> BytesIO:
        """Render a participant report into an in-memory PDF.

        Returns:
            A ``BytesIO`` holding the PDF, rewound to the start for reading.
        """
        styles = getSampleStyleSheet()

        # Build content: a title followed by a small vertical gap
        story = [
            Paragraph(f"Participant Report: {participant_id}", styles['Title']),
            Spacer(1, 12),
        ]

        # Add session data, charts, etc.

        buffer = BytesIO()
        SimpleDocTemplate(buffer, pagesize=letter).build(story)
        buffer.seek(0)
        return buffer

Background Workers (Celery)

# app/celery_app.py
from celery import Celery
from app.config import get_settings

settings = get_settings()

# Redis serves as both the message broker and the result backend.
celery_app = Celery(
    "metricis",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
)

# JSON-only serialization and UTC timestamps.
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
)

# By default autodiscover_tasks() looks for a module named "tasks" inside each
# listed package. The tasks here live in app/workers/reminder_worker.py, so the
# module name is given explicitly; otherwise they would not be registered.
celery_app.autodiscover_tasks(["app.workers"], related_name="reminder_worker")

# app/workers/reminder_worker.py
from celery import shared_task
from app.services.notifications import NotificationService

@shared_task
def send_visit_reminder(participant_id: str, visit_id: str):
    """Celery task: send a visit reminder through the notification service."""
    NotificationService().send_reminder(participant_id, visit_id)

Structured Logging

import structlog

# Module-level logger, named after the current module.
logger = structlog.get_logger(__name__)

# Usage
# The first argument is the event name; each keyword argument becomes a
# field of the log entry (see the sample output below).
logger.info(
    "user_login",
    user_id=user.id,
    email=user.email,
    ip_address=request.client.host,
)

# Failures are logged the same way; the exception is recorded as a string.
logger.error(
    "redcap_sync_failed",
    participant_id=participant_id,
    error=str(e),
)

Output:

{
  "event": "user_login",
  "timestamp": "2026-01-24T12:34:56.789Z",
  "level": "info",
  "user_id": 123,
  "email": "user@example.com",
  "ip_address": "192.168.1.1"
}

Security

Password Hashing

# app/utils/security.py
from passlib.context import CryptContext

# Shared passlib context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the shared context."""
    hashed = pwd_context.hash(password)
    return hashed

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against a hash stored earlier."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match

JWT Tokens

from datetime import datetime, timedelta
from jose import JWTError, jwt
from app.config import get_settings

settings = get_settings()

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Create a signed JWT containing ``data`` plus an expiry claim.

    Args:
        data: Claims to embed in the token; the dict is copied, not modified.
        expires_delta: Token lifetime. When omitted (or zero), the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token, signed with the configured key and algorithm.
    """
    # Local import: the surrounding module only imports datetime and timedelta.
    from datetime import timezone

    lifetime = expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated and returns
    # a naive value that is easy to misread as local time.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})

    return jwt.encode(
        to_encode,
        settings.JWT_SECRET_KEY.get_secret_value(),
        algorithm=settings.JWT_ALGORITHM
    )

def verify_token(token: str) -> dict:
    """Decode ``token`` with the configured key and return its claims.

    Any error raised by the JWT library while decoding propagates to the
    caller.
    """
    secret = settings.JWT_SECRET_KEY.get_secret_value()
    return jwt.decode(token, secret, algorithms=[settings.JWT_ALGORITHM])

Database Migrations

# Create migration
alembic revision --autogenerate -m "add consent tables"

# Apply migrations
alembic upgrade head

# Rollback
alembic downgrade -1

Migration file:

# alembic/versions/xxx_add_consent_tables.py
def upgrade():
    """Create the ``consent_forms`` table."""
    columns = (
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('study_id', sa.Integer(), nullable=False),
        sa.Column('version', sa.String(), nullable=False),
        sa.Column('content', sa.Text(), nullable=False),
    )
    primary_key = sa.PrimaryKeyConstraint('id')
    op.create_table('consent_forms', *columns, primary_key)

def downgrade():
    """Drop the ``consent_forms`` table, reversing ``upgrade``."""
    op.drop_table('consent_forms')

Next Steps