Metricis - Architecture Proposal¶
Project: Metricis (https://metricis.app)
Version: 1.0.0-draft
Date: 2026-01-04
Status: For Review
Executive Summary¶
This document proposes a production-grade architecture for a modular, offline-capable, mobile-first assessment platform built on the existing jsPsych foundation. The platform will support:
- Battery authoring & controlled delivery via an admin UI
- Longitudinal administration with comprehensive data capture
- Selective REDCap integration with server-side security
- Role-based researcher portal with site scoping
- Study operations including scheduling, reminders, and windows
- Multi-language support at participant level
- Computer Adaptive Testing (CAT) with offline capability
- Lab.js-inspired drag-and-drop builder for battery assembly
The architecture preserves existing jsPsych investments (25+ cognitive tasks already implemented) while adding the infrastructure needed for regulated clinical trials.
Table of Contents¶
- System Architecture (A)
- Data Model / Schema (B)
- Execution + Offline Sync (C)
- REDCap Integration (D)
- Researcher Portal & Reporting (E)
- Scheduling / Reminders (F)
- CAT Engine (G)
- Admin Builder UX (H)
- Security, Privacy & Compliance (I)
- Implementation Plan (J)
A. System Architecture¶
Technology Stack Selection¶
| Layer | Technology | Rationale |
|---|---|---|
| Client Runtime | jsPsych 7.x + TypeScript | Existing investment; 25+ tasks implemented; proven timing accuracy (~3-8ms jitter per docs/platform-design.md) |
| Client Framework | Vite + vanilla TS | Already in use; fast builds; tree-shaking |
| Admin/Portal UI | React 18 + TypeScript | Rich ecosystem for complex UIs; drag-and-drop libraries available |
| UI Component Library | Radix UI + Tailwind CSS | Accessible primitives; mobile-responsive; open-source |
| Backend API | FastAPI (Python 3.11+) | Already in use; async support; automatic OpenAPI docs |
| Database | PostgreSQL 15+ | ACID compliance; JSONB for flexible data; mature ecosystem |
| Cache/Queue | Redis 7+ | Session cache; job queue backend; pub/sub for real-time |
| Job Queue | Celery or ARQ | Scheduling, reminders, async processing |
| Auth | Auth.js (NextAuth) or Authentik | OIDC/SAML support; role-based access; self-hosted option |
| Object Storage | MinIO (S3-compatible) | Battery assets; exports; self-hosted for compliance |
| Mobile Wrapper | Capacitor 5+ | Offline-first PWA or native app; per docs/platform-design.md recommendation |
| Offline Storage | IndexedDB (Dexie.js) | Structured storage; 50MB+ capacity; async API |
Architecture Diagram¶
┌─────────────────────────────────────────────────────────────────────────────────┐
│ CLIENT LAYER │
├─────────────────────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────────┐ │
│ │ Assessment App │ │ Admin Portal │ │ Researcher Portal │ │
│ │ (jsPsych Runtime) │ │ (React + DnD) │ │ (React + Dashboards) │ │
│ │ │ │ │ │ │ │
│ │ • Battery Runner │ │ • Battery Builder │ │ • Subject Management │ │
│ │ • Offline Engine │ │ • Module Registry │ │ • Results Viewer │ │
│ │ • CAT Client │ │ • Version Control │ │ • Report Generator │ │
│ │ • Sync Queue │ │ • Preview Mode │ │ • Audit Log Viewer │ │
│ └─────────┬───────────┘ └─────────┬───────────┘ └───────────┬─────────────┘ │
│ │ │ │ │
│ │ Capacitor/PWA │ │ │
│ │ Service Worker │ │ │
└────────────┼────────────────────────┼──────────────────────────┼────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────────┐
│ API GATEWAY │
│ (FastAPI + Rate Limiting) │
├─────────────────────────────────────────────────────────────────────────────────┤
│ • JWT/Session Authentication • Request Validation │
│ • RBAC Middleware • Audit Logging │
│ • Rate Limiting (slowapi) • CORS/Security Headers │
└─────────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────────┐
│ SERVICE LAYER │
├──────────────────┬──────────────────┬──────────────────┬────────────────────────┤
│ Session Service │ Battery Service │ Data Service │ Integration Service │
│ ───────────────│ ───────────────│ ───────────────│ ──────────────────────│
│ • Token mgmt │ • CRUD ops │ • Submission │ • REDCap sync │
│ • Validation │ • Versioning │ • Scoring │ • FHIR export │
│ • Expiry │ • Compilation │ • CAT engine │ • Webhook dispatch │
├──────────────────┼──────────────────┼──────────────────┼────────────────────────┤
│ Subject Service │ Schedule Svc │ Notify Service │ Audit Service │
│ ───────────────│ ───────────────│ ───────────────│ ──────────────────────│
│ • Enrollment │ • Windows │ • Email/SMS │ • Event logging │
│ • State mgmt │ • Reminders │ • Push │ • Query interface │
│ • Language │ • Escalation │ • Templates │ • Export │
└──────────────────┴──────────────────┴──────────────────┴────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────────┐
│ DATA LAYER │
├─────────────────────┬─────────────────────┬─────────────────────────────────────┤
│ PostgreSQL │ Redis │ MinIO (S3) │
│ ─────────────────│ ─────────────────│ ─────────────────────────────────│
│ • Canonical data │ • Session cache │ • Battery bundles │
│ • Audit logs │ • Job queue │ • Media assets │
│ • Config/metadata │ • Rate limits │ • Export files │
│ • CAT item banks │ • Pub/sub │ • Signed URL delivery │
└─────────────────────┴─────────────────────┴─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────────┐
│ EXTERNAL INTEGRATIONS │
├─────────────────────┬─────────────────────┬─────────────────────────────────────┤
│ REDCap │ Email/SMS │ Identity Provider │
│ (per-project) │ (SendGrid/Twilio) │ (OIDC/SAML) │
└─────────────────────┴─────────────────────┴─────────────────────────────────────┘
Key Architectural Decisions¶
1. Offline-First Design (per docs/platform-design.md: "Offline Functionality" requirement)¶
The assessment client operates in offline-first mode:
- Complete battery bundle downloaded at session start
- All data persisted to IndexedDB before any network attempt
- Background sync with exponential backoff
- Conflict resolution via server-authoritative timestamps
2. jsPsych Preservation Strategy (per docs/platform-design.md: "Compatibility with jsPsych Assets")¶
Rather than replacing jsPsych, we wrap and extend:
- Existing 33 tasks remain unchanged (25 Tier 1 + 8 Tier 2)
- New BatteryOrchestrator coordinates task sequencing
- TelemetryPlugin injects timing/provenance metadata
- OfflineAdapter handles persistence layer
3. Timing Accuracy Mitigation (per docs/platform-design.md: "Timing Precision" ~3-8ms jitter)¶
interface TimingMetadata {
// Collected per trial
performance_now_start: number; // High-resolution timer
performance_now_end: number;
date_now_start: number; // Wall clock (for drift detection)
frame_count: number; // requestAnimationFrame count
estimated_refresh_rate: number; // Detected display refresh
// Collected per session
device_timestamp_offset: number; // Server time - client time
battery_level?: number; // Power state affects timing
visibility_changes: number; // Tab focus changes
// QA flags
timing_warnings: string[]; // e.g., "gc_detected", "tab_hidden"
}
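A minimal sketch of a TelemetryCollector that could populate these fields at runtime (the component the BatteryOrchestrator in Section C calls into). The rAF-based refresh-rate probe and the startTrial/getTrialTiming API are illustrative assumptions, not a committed interface:
class TelemetryCollector {
  private visibilityChanges = 0;
  private trialStartPerf = 0;
  private trialStartWall = 0;
  private frameCount = 0;
  private refreshRate = 60; // fallback until measured
  private rafId = 0;

  constructor() {
    // Count tab-focus losses for the per-session QA counters
    document.addEventListener('visibilitychange', () => {
      if (document.hidden) this.visibilityChanges++;
    });
  }

  // Estimate display refresh rate by counting rAF callbacks over ~500 ms
  async collectSessionStart(): Promise<void> {
    this.refreshRate = await new Promise<number>((resolve) => {
      let frames = 0;
      const t0 = performance.now();
      const tick = (t: number) => {
        frames += 1;
        if (t - t0 >= 500) resolve((frames * 1000) / (t - t0));
        else requestAnimationFrame(tick);
      };
      requestAnimationFrame(tick);
    });
  }

  // Called at stimulus onset: capture both timers, start frame counting
  startTrial(): void {
    this.trialStartPerf = performance.now();
    this.trialStartWall = Date.now();
    this.frameCount = 0;
    const count = () => {
      this.frameCount += 1;
      this.rafId = requestAnimationFrame(count);
    };
    this.rafId = requestAnimationFrame(count);
  }

  // Snapshot of the per-trial portion of TimingMetadata
  getTrialTiming() {
    cancelAnimationFrame(this.rafId);
    return {
      performance_now_start: this.trialStartPerf,
      performance_now_end: performance.now(),
      date_now_start: this.trialStartWall,
      frame_count: this.frameCount,
      estimated_refresh_rate: this.refreshRate,
      visibility_changes: this.visibilityChanges,
      timing_warnings: document.hidden ? ['tab_hidden'] : [],
    };
  }
}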
4. Security Boundaries¶
- REDCap tokens NEVER exposed to clients - all integration server-side
- Signed URLs for battery delivery with expiration
- JWT with refresh rotation for authenticated sessions
- Audit log immutability via append-only table with checksums
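In practice, battery bundles would be delivered via MinIO/S3 presigned URLs generated by the SDK; the sketch below just illustrates the underlying HMAC-plus-expiry pattern (BUNDLE_SIGNING_KEY and the query-string format are hypothetical):
import { createHmac, timingSafeEqual } from 'node:crypto';

const SIGNING_KEY = process.env.BUNDLE_SIGNING_KEY ?? 'dev-only-key';

// Append an expiry and an HMAC over (path, expiry) to the URL
function signBundleUrl(path: string, ttlSeconds: number): string {
  const expires = Math.floor(Date.now() / 1000) + ttlSeconds;
  const sig = createHmac('sha256', SIGNING_KEY)
    .update(`${path}:${expires}`)
    .digest('hex');
  return `${path}?expires=${expires}&sig=${sig}`;
}

function verifyBundleUrl(path: string, expires: number, sig: string): boolean {
  if (Math.floor(Date.now() / 1000) > expires) return false; // expired link
  const expected = createHmac('sha256', SIGNING_KEY)
    .update(`${path}:${expires}`)
    .digest('hex');
  // Constant-time comparison to avoid signature-timing leaks
  return expected.length === sig.length &&
    timingSafeEqual(Buffer.from(expected), Buffer.from(sig));
}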
B. Data Model / Schema¶
Entity Relationship Overview¶
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Study │───────│ Site │───────│ User │
└─────────────┘ 1:N └─────────────┘ N:M └─────────────┘
│ │ │
│ 1:N │ 1:N │
▼ ▼ │
┌─────────────┐ ┌─────────────┐ │
│ StudyArm │ │ Subject │◄───────────┘ (site-scoped access)
└─────────────┘ └─────────────┘
│ │
│ 1:N │ 1:N
▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Battery │───────│ Session │───────│ Event │
│ Version │ 1:N └─────────────┘ 1:N └─────────────┘
└─────────────┘ │ │
│ │ 1:N │ 1:N
│ 1:N ▼ ▼
▼ ┌─────────────┐ ┌─────────────┐
┌─────────────┐ │ Response │ │ Telemetry │
│ Module │ └─────────────┘ └─────────────┘
│ Version │ │
└─────────────┘ │ 1:1
│ ▼
│ ┌─────────────┐
│ │ Score │
│ └─────────────┘
│
│ (for CAT modules)
▼
┌─────────────┐ ┌─────────────┐
│ ItemBank │───────│ ItemBank │
│ │ 1:N │ Item │
└─────────────┘ └─────────────┘
Core Tables¶
Studies & Organization¶
-- Studies/Projects
CREATE TABLE studies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
code VARCHAR(50) UNIQUE NOT NULL, -- e.g., "PEDS-MS-2024"
name VARCHAR(255) NOT NULL,
description TEXT,
status VARCHAR(20) DEFAULT 'draft', -- draft, active, paused, completed, archived
-- Configuration
config JSONB NOT NULL DEFAULT '{}', -- study-wide settings
supported_languages VARCHAR(10)[] DEFAULT ARRAY['en'],
-- REDCap integration (optional)
redcap_enabled BOOLEAN DEFAULT FALSE,
redcap_project_id VARCHAR(50),
-- Audit
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
created_by UUID REFERENCES users(id),
-- Versioning for reproducibility
version INTEGER DEFAULT 1,
version_hash VARCHAR(64) -- SHA-256 of config
);
-- Sites within studies
CREATE TABLE sites (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
study_id UUID NOT NULL REFERENCES studies(id),
code VARCHAR(50) NOT NULL, -- e.g., "TORONTO-SICK-KIDS"
name VARCHAR(255) NOT NULL,
timezone VARCHAR(50) DEFAULT 'UTC',
-- Site-specific config
config JSONB DEFAULT '{}',
-- REDCap site-level config
redcap_url VARCHAR(500),
redcap_api_token_encrypted BYTEA, -- Encrypted at rest
redcap_event_name VARCHAR(100),
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(study_id, code)
);
CREATE INDEX idx_sites_study ON sites(study_id);
Users & RBAC¶
-- Users (can be linked to external IdP)
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
external_id VARCHAR(255), -- IdP subject ID
email VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255),
-- Status
status VARCHAR(20) DEFAULT 'active', -- active, suspended, deactivated
last_login_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Roles (system-defined)
CREATE TABLE roles (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
code VARCHAR(50) UNIQUE NOT NULL, -- platform_admin, study_admin, site_coordinator, etc.
name VARCHAR(100) NOT NULL,
description TEXT,
permissions JSONB NOT NULL DEFAULT '[]', -- Array of permission codes
is_system BOOLEAN DEFAULT FALSE -- Cannot be deleted if true
);
-- Pre-populate roles
INSERT INTO roles (code, name, permissions, is_system) VALUES
('platform_admin', 'Platform Administrator', '["*"]', TRUE),
('study_admin', 'Study Administrator', '["study:*", "site:read", "subject:*", "battery:*", "report:*"]', TRUE),
('site_coordinator', 'Site Coordinator', '["site:read", "subject:*", "session:*", "report:read"]', TRUE),
('investigator', 'Investigator', '["site:read", "subject:read", "report:*", "export:*"]', TRUE),
('clinician_reviewer', 'Clinician Reviewer', '["subject:read", "report:read", "score:read"]', TRUE),
('data_manager', 'Data Manager', '["subject:read", "export:*", "audit:read"]', TRUE),
('monitor', 'Read-Only Monitor', '["site:read", "subject:read:limited", "report:read:summary"]', TRUE);
-- User-Role-Site assignments (site-scoped RBAC)
CREATE TABLE user_roles (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id),
role_id UUID NOT NULL REFERENCES roles(id),
study_id UUID REFERENCES studies(id), -- NULL = all studies (for platform_admin)
site_id UUID REFERENCES sites(id), -- NULL = all sites in study
granted_at TIMESTAMPTZ DEFAULT NOW(),
granted_by UUID REFERENCES users(id),
expires_at TIMESTAMPTZ, -- Optional expiry
UNIQUE(user_id, role_id, study_id, site_id)
);
CREATE INDEX idx_user_roles_user ON user_roles(user_id);
CREATE INDEX idx_user_roles_study ON user_roles(study_id);
CREATE INDEX idx_user_roles_site ON user_roles(site_id);
Subjects & Enrollment¶
-- Subjects/Participants
CREATE TABLE subjects (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
study_id UUID NOT NULL REFERENCES studies(id),
site_id UUID NOT NULL REFERENCES sites(id),
-- Identifiers
subject_code VARCHAR(100) NOT NULL, -- Study-assigned ID
external_id VARCHAR(255), -- REDCap record_id or other system
-- Demographics (minimal, de-identified)
date_of_birth DATE, -- For age calculation
sex VARCHAR(20),
-- Preferences
preferred_language VARCHAR(10) DEFAULT 'en',
timezone VARCHAR(50), -- Defaults to site timezone
-- Study state
study_arm_id UUID REFERENCES study_arms(id),
enrollment_date DATE,
status VARCHAR(20) DEFAULT 'enrolled', -- enrolled, active, withdrawn, completed
-- Contact for reminders (encrypted)
contact_email_encrypted BYTEA,
contact_phone_encrypted BYTEA,
notification_preferences JSONB DEFAULT '{"email": true, "sms": false, "push": false}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(study_id, subject_code)
);
CREATE INDEX idx_subjects_study ON subjects(study_id);
CREATE INDEX idx_subjects_site ON subjects(site_id);
CREATE INDEX idx_subjects_status ON subjects(status);
-- Study Arms (for randomized trials)
CREATE TABLE study_arms (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
study_id UUID NOT NULL REFERENCES studies(id),
code VARCHAR(50) NOT NULL,
name VARCHAR(255) NOT NULL,
description TEXT,
-- Battery assignment
battery_version_id UUID REFERENCES battery_versions(id),
-- Schedule template
schedule_template_id UUID REFERENCES schedule_templates(id),
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(study_id, code)
);
Batteries & Modules¶
-- Module Registry (reusable task/survey definitions)
CREATE TABLE modules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
code VARCHAR(100) UNIQUE NOT NULL, -- e.g., "simple-rt", "promis-anxiety"
name VARCHAR(255) NOT NULL,
description TEXT,
-- Classification
type VARCHAR(50) NOT NULL, -- cognitive_task, survey, cat_survey, instructions
domain VARCHAR(100), -- processing_speed, working_memory, anxiety, etc.
-- Technical
jspsych_plugin VARCHAR(100), -- e.g., "html-keyboard-response"
is_custom_plugin BOOLEAN DEFAULT FALSE,
-- CAT configuration (if type = cat_survey)
item_bank_id UUID REFERENCES item_banks(id),
-- Metadata
estimated_duration_seconds INTEGER,
age_range_min INTEGER,
age_range_max INTEGER,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Module Versions (immutable snapshots)
CREATE TABLE module_versions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
module_id UUID NOT NULL REFERENCES modules(id),
version VARCHAR(20) NOT NULL, -- Semantic versioning
-- Content (immutable once published)
config JSONB NOT NULL, -- Task parameters
timeline_definition JSONB NOT NULL, -- jsPsych timeline structure
-- Localization
translations JSONB DEFAULT '{}', -- { "en": {...}, "fr": {...} }
-- Assets
asset_manifest JSONB DEFAULT '[]', -- List of required assets
asset_bundle_url VARCHAR(500), -- S3/MinIO URL
-- Scoring
scoring_algorithm_version VARCHAR(20),
scoring_config JSONB,
-- State
status VARCHAR(20) DEFAULT 'draft', -- draft, published, deprecated
published_at TIMESTAMPTZ,
-- Provenance
content_hash VARCHAR(64) NOT NULL, -- SHA-256 for integrity
created_at TIMESTAMPTZ DEFAULT NOW(),
created_by UUID REFERENCES users(id),
UNIQUE(module_id, version)
);
CREATE INDEX idx_module_versions_module ON module_versions(module_id);
CREATE INDEX idx_module_versions_status ON module_versions(status);
-- Batteries (ordered collections of modules)
CREATE TABLE batteries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
study_id UUID NOT NULL REFERENCES studies(id),
code VARCHAR(100) NOT NULL,
name VARCHAR(255) NOT NULL,
description TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(study_id, code)
);
-- Battery Versions (immutable, deployable snapshots)
CREATE TABLE battery_versions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
battery_id UUID NOT NULL REFERENCES batteries(id),
version VARCHAR(20) NOT NULL,
-- Module sequence
module_sequence JSONB NOT NULL, -- Ordered list with branching rules
/*
Example:
[
{"module_version_id": "uuid", "order": 1, "required": true},
{"module_version_id": "uuid", "order": 2, "required": true,
"branch_condition": {"type": "score_threshold", "module": "anxiety", "threshold": 10}}
]
*/
-- Delivery constraints
delivery_config JSONB NOT NULL DEFAULT '{}',
/*
{
"allowed_platforms": ["web", "ios", "android"],
"allowed_browsers": ["chrome", "safari", "edge"],
"min_screen_width": 320,
"require_fullscreen": true,
"allow_resume": true,
"max_resume_count": 3,
"session_timeout_minutes": 120,
"require_consent": true,
"consent_module_version_id": "uuid"
}
*/
-- Timing
estimated_duration_minutes INTEGER,
-- State
status VARCHAR(20) DEFAULT 'draft', -- draft, published, deprecated
published_at TIMESTAMPTZ,
-- Provenance
content_hash VARCHAR(64) NOT NULL,
compiled_bundle_url VARCHAR(500), -- Pre-compiled deployable bundle
created_at TIMESTAMPTZ DEFAULT NOW(),
created_by UUID REFERENCES users(id),
UNIQUE(battery_id, version)
);
CREATE INDEX idx_battery_versions_battery ON battery_versions(battery_id);
CREATE INDEX idx_battery_versions_status ON battery_versions(status);
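The content_hash columns above presuppose a deterministic serialization of the config before hashing; without one, semantically identical configs can hash differently. A sketch, assuming canonical JSON with recursively sorted keys (Web Crypto, available in modern browsers and Node 18+):
// Canonical JSON: sort object keys recursively so equal configs
// always serialize to the same string before hashing
function canonicalize(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map(canonicalize).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const entries = Object.entries(value as Record<string, unknown>)
      .sort(([a], [b]) => a.localeCompare(b))
      .map(([k, v]) => `${JSON.stringify(k)}:${canonicalize(v)}`);
    return `{${entries.join(',')}}`;
  }
  return JSON.stringify(value);
}

// SHA-256 hex digest of the canonical form
async function contentHash(config: unknown): Promise<string> {
  const bytes = new TextEncoder().encode(canonicalize(config));
  const digest = await crypto.subtle.digest('SHA-256', bytes);
  return [...new Uint8Array(digest)]
    .map((b) => b.toString(16).padStart(2, '0'))
    .join('');
}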
Sessions & Data Capture¶
-- Assessment Sessions
CREATE TABLE sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- Context
subject_id UUID NOT NULL REFERENCES subjects(id),
battery_version_id UUID NOT NULL REFERENCES battery_versions(id),
schedule_event_id UUID REFERENCES schedule_events(id), -- If part of scheduled study
-- Access control
access_token_hash VARCHAR(64) NOT NULL, -- SHA-256 of token
token_expires_at TIMESTAMPTZ NOT NULL,
-- State
status VARCHAR(20) DEFAULT 'pending', -- pending, in_progress, completed, expired, abandoned
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
last_activity_at TIMESTAMPTZ,
-- Progress tracking
current_module_index INTEGER DEFAULT 0,
modules_completed INTEGER DEFAULT 0,
total_modules INTEGER NOT NULL,
-- Resume support
resume_count INTEGER DEFAULT 0,
resume_data JSONB, -- Checkpoint for resumption
-- Client context
client_info JSONB, -- Device, browser, screen size
language VARCHAR(10) NOT NULL,
-- Provenance
battery_content_hash VARCHAR(64) NOT NULL, -- Verify immutability
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_sessions_subject ON sessions(subject_id);
CREATE INDEX idx_sessions_status ON sessions(status);
CREATE INDEX idx_sessions_schedule_event ON sessions(schedule_event_id);
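Only access_token_hash is stored; the raw token exists solely in the participant's link. A minimal sketch of the issue/verify pattern (written in TypeScript to match the client examples; the FastAPI server would implement the equivalent):
import { randomBytes, createHash } from 'node:crypto';

// Issue a session access token: the raw token goes to the participant,
// only its SHA-256 is persisted in sessions.access_token_hash
function issueAccessToken(): { token: string; tokenHash: string } {
  const token = randomBytes(32).toString('base64url'); // ~256 bits of entropy
  const tokenHash = createHash('sha256').update(token).digest('hex');
  return { token, tokenHash };
}

// Lookup recomputes the hash, so a database leak never yields usable tokens
function hashForLookup(presentedToken: string): string {
  return createHash('sha256').update(presentedToken).digest('hex');
}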
-- Events (trial-level data)
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(id),
module_version_id UUID NOT NULL REFERENCES module_versions(id),
-- Ordering
event_index INTEGER NOT NULL, -- Global index within session
trial_index INTEGER NOT NULL, -- Index within module
-- jsPsych data (flexible schema)
trial_type VARCHAR(100) NOT NULL,
trial_data JSONB NOT NULL, -- Full jsPsych trial data
-- Standardized timing (extracted for querying)
rt_ms INTEGER, -- Response time
stimulus_onset_ms BIGINT, -- performance.now() at stimulus
response_time_ms BIGINT, -- performance.now() at response
-- Timing QA
timing_metadata JSONB, -- Frame counts, drift, warnings
-- Provenance
client_timestamp TIMESTAMPTZ NOT NULL,
server_received_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(session_id, event_index)
);
CREATE INDEX idx_events_session ON events(session_id);
CREATE INDEX idx_events_module ON events(module_version_id);
CREATE INDEX idx_events_trial_type ON events(trial_type);
-- Telemetry (session-level metadata)
CREATE TABLE telemetry (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(id),
-- Device info
user_agent TEXT,
platform VARCHAR(50),
screen_width INTEGER,
screen_height INTEGER,
device_pixel_ratio REAL,
-- Performance
estimated_refresh_rate REAL,
memory_info JSONB,
-- Timing calibration
server_time_offset_ms INTEGER, -- Client clock vs server
-- Session quality indicators
visibility_hidden_count INTEGER DEFAULT 0,
focus_lost_count INTEGER DEFAULT 0,
network_offline_count INTEGER DEFAULT 0,
-- Battery (for mobile)
initial_battery_level REAL,
final_battery_level REAL,
collected_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_telemetry_session ON telemetry(session_id);
-- Responses (item-level for surveys)
CREATE TABLE responses (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_id UUID NOT NULL REFERENCES events(id),
-- Item identification
item_id VARCHAR(100) NOT NULL, -- e.g., "PROMIS_ANX_1"
item_bank_version VARCHAR(20), -- For CAT provenance
-- Response
response_value JSONB NOT NULL, -- Flexible: number, string, array
response_option_index INTEGER, -- For multiple choice
-- Timing
item_onset_ms BIGINT,
response_ms BIGINT,
rt_ms INTEGER,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_responses_event ON responses(event_id);
CREATE INDEX idx_responses_item ON responses(item_id);
Scoring & CAT¶
-- Scores (computed outputs)
CREATE TABLE scores (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(id),
module_version_id UUID NOT NULL REFERENCES module_versions(id),
-- Score identification
score_type VARCHAR(100) NOT NULL, -- e.g., "total", "subscale_cognitive", "theta"
score_name VARCHAR(255),
-- Values
raw_score REAL,
scaled_score REAL, -- T-score, z-score, etc.
percentile REAL,
-- CAT-specific
theta REAL, -- IRT ability estimate
standard_error REAL, -- SEM
-- Normative comparison
norm_group VARCHAR(100), -- e.g., "pediatric_8-12"
norm_mean REAL,
norm_sd REAL,
-- Provenance
scoring_algorithm_version VARCHAR(20) NOT NULL,
scoring_config_hash VARCHAR(64),
-- Interpretation aids
interpretation_band VARCHAR(50), -- e.g., "normal", "borderline", "clinical"
flags JSONB DEFAULT '[]', -- ["below_cutoff", "high_variability"]
computed_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_scores_session ON scores(session_id);
CREATE INDEX idx_scores_module ON scores(module_version_id);
CREATE INDEX idx_scores_type ON scores(score_type);
-- CAT Traces (decision log for CAT administrations)
CREATE TABLE cat_traces (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(id),
module_version_id UUID NOT NULL REFERENCES module_versions(id),
-- Sequence
step_index INTEGER NOT NULL,
-- Item selection
item_id VARCHAR(100) NOT NULL,
item_parameters JSONB NOT NULL, -- a, b, c parameters
-- State before item
theta_estimate REAL NOT NULL,
standard_error REAL NOT NULL,
-- Response
response_value INTEGER,
response_correct BOOLEAN,
-- State after item
theta_updated REAL,
se_updated REAL,
-- Decision
stopping_rule_checked VARCHAR(50), -- "max_items", "min_se", "content_balance"
stopping_rule_met BOOLEAN DEFAULT FALSE,
-- Item bank state
items_administered INTEGER,
items_remaining INTEGER,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(session_id, module_version_id, step_index)
);
CREATE INDEX idx_cat_traces_session ON cat_traces(session_id);
-- Item Banks (for CAT)
CREATE TABLE item_banks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
code VARCHAR(100) UNIQUE NOT NULL, -- e.g., "PROMIS_PEDIATRIC_ANXIETY_V2"
name VARCHAR(255) NOT NULL,
description TEXT,
-- IRT model
irt_model VARCHAR(50) NOT NULL, -- "2PL", "GRM", "GPCM"
-- Bank metadata
domain VARCHAR(100),
population VARCHAR(100), -- "pediatric", "adult"
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Item Bank Versions (immutable)
CREATE TABLE item_bank_versions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
item_bank_id UUID NOT NULL REFERENCES item_banks(id),
version VARCHAR(20) NOT NULL,
-- Items
items JSONB NOT NULL, -- Array of item definitions
/*
[
{
"item_id": "PROMIS_ANX_1",
"text": {"en": "I felt afraid", "fr": "J'ai eu peur"},
"options": [{"value": 1, "text": {"en": "Never", "fr": "Jamais"}}, ...],
"parameters": {"a": 2.1, "b": [-1.5, -0.5, 0.5, 1.5]},
"content_area": "worry"
}
]
*/
-- Scoring
theta_to_tscore_lookup JSONB, -- Conversion table
-- State
status VARCHAR(20) DEFAULT 'draft',
published_at TIMESTAMPTZ,
-- Provenance
content_hash VARCHAR(64) NOT NULL,
source_citation TEXT, -- e.g., "HealthMeasures PROMIS v2.0"
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(item_bank_id, version)
);
CREATE INDEX idx_item_bank_versions_bank ON item_bank_versions(item_bank_id);
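For the dichotomous 2PL case, a single CAT step reduces to maximum-information item selection plus an ability update; a self-contained sketch using a grid-based EAP estimator with a standard-normal prior (GRM/GPCM banks need category-level information functions, omitted here):
interface CatItem { itemId: string; a: number; b: number; } // 2PL discrimination/difficulty

// P(correct | theta) under the 2PL model
function prob2pl(theta: number, item: CatItem): number {
  return 1 / (1 + Math.exp(-item.a * (theta - item.b)));
}

// Fisher information for a 2PL item: I(theta) = a^2 * P * (1 - P)
function itemInfo(theta: number, item: CatItem): number {
  const p = prob2pl(theta, item);
  return item.a * item.a * p * (1 - p);
}

// Maximum-information selection from the remaining bank
function selectNextItem(theta: number, remaining: CatItem[]): CatItem {
  return remaining.reduce((best, it) =>
    itemInfo(theta, it) > itemInfo(theta, best) ? it : best);
}

// EAP estimate over a theta grid; the posterior SD doubles as the
// standard error used by min-SE stopping rules
function eapUpdate(
  responses: { item: CatItem; correct: boolean }[],
): { theta: number; se: number } {
  const grid = Array.from({ length: 81 }, (_, i) => -4 + i * 0.1);
  const weights = grid.map((t) => {
    let w = Math.exp(-0.5 * t * t); // unnormalized N(0, 1) prior
    for (const r of responses) {
      const p = prob2pl(t, r.item);
      w *= r.correct ? p : 1 - p;
    }
    return w;
  });
  const total = weights.reduce((s, w) => s + w, 0);
  const theta = grid.reduce((s, t, i) => s + t * weights[i], 0) / total;
  const variance = grid.reduce((s, t, i) => s + (t - theta) ** 2 * weights[i], 0) / total;
  return { theta, se: Math.sqrt(variance) };
}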
Scheduling & Notifications¶
-- Schedule Templates
CREATE TABLE schedule_templates (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
study_id UUID NOT NULL REFERENCES studies(id),
name VARCHAR(255) NOT NULL,
description TEXT,
-- Schedule definition
events JSONB NOT NULL,
/*
[
{
"event_code": "baseline",
"name": "Baseline Assessment",
"battery_version_id": "uuid",
"day_offset": 0,
"window_before_days": 0,
"window_after_days": 7,
"required": true
},
{
"event_code": "week_4",
"name": "Week 4 Follow-up",
"battery_version_id": "uuid",
"day_offset": 28,
"window_before_days": 3,
"window_after_days": 7,
"required": true,
"depends_on": "baseline"
}
]
*/
-- Reminder configuration
reminder_config JSONB DEFAULT '{}',
/*
{
"reminders": [
{"offset_hours": -24, "channel": "email", "template": "reminder_24h"},
{"offset_hours": -2, "channel": "sms", "template": "reminder_2h"},
{"offset_hours": 24, "channel": "email", "template": "overdue_24h"}
],
"escalation": {
"after_hours": 72,
"notify_coordinator": true
}
}
*/
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Schedule Events (instantiated per subject)
CREATE TABLE schedule_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
subject_id UUID NOT NULL REFERENCES subjects(id),
schedule_template_id UUID REFERENCES schedule_templates(id),
-- Event details
event_code VARCHAR(100) NOT NULL,
event_name VARCHAR(255),
battery_version_id UUID NOT NULL REFERENCES battery_versions(id),
-- Timing
anchor_date DATE NOT NULL, -- e.g., enrollment date
scheduled_date DATE NOT NULL,
window_start DATE NOT NULL,
window_end DATE NOT NULL,
-- State
status VARCHAR(20) DEFAULT 'scheduled', -- scheduled, open, completed, missed, cancelled
-- Completion tracking
session_id UUID REFERENCES sessions(id),
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_schedule_events_subject ON schedule_events(subject_id);
CREATE INDEX idx_schedule_events_status ON schedule_events(status);
CREATE INDEX idx_schedule_events_window ON schedule_events(window_start, window_end);
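Instantiating schedule_events from a template is a pure date calculation from the anchor date; a sketch (field names mirror the template JSON above; conversion into the site/subject timezone is elided):
interface TemplateEvent {
  event_code: string;
  battery_version_id: string;
  day_offset: number;
  window_before_days: number;
  window_after_days: number;
}

function addDays(date: Date, days: number): Date {
  const d = new Date(date);
  d.setDate(d.getDate() + days);
  return d;
}

// One schedule_events row per template event, anchored to e.g. enrollment
function instantiateSchedule(anchorDate: Date, events: TemplateEvent[]) {
  return events.map((e) => {
    const scheduled = addDays(anchorDate, e.day_offset);
    return {
      event_code: e.event_code,
      battery_version_id: e.battery_version_id,
      anchor_date: anchorDate,
      scheduled_date: scheduled,
      window_start: addDays(scheduled, -e.window_before_days),
      window_end: addDays(scheduled, e.window_after_days),
      status: 'scheduled' as const,
    };
  });
}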
-- Notifications
CREATE TABLE notifications (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
subject_id UUID NOT NULL REFERENCES subjects(id),
schedule_event_id UUID REFERENCES schedule_events(id),
-- Notification details
channel VARCHAR(20) NOT NULL, -- email, sms, push
template_code VARCHAR(100) NOT NULL,
-- Content (rendered)
recipient VARCHAR(255) NOT NULL, -- Email or phone
subject_line VARCHAR(500),
body TEXT,
-- State
status VARCHAR(20) DEFAULT 'pending', -- pending, sent, delivered, failed, bounced
scheduled_for TIMESTAMPTZ NOT NULL,
sent_at TIMESTAMPTZ,
delivered_at TIMESTAMPTZ,
-- Error tracking
attempts INTEGER DEFAULT 0,
last_error TEXT,
-- External IDs
external_message_id VARCHAR(255), -- SendGrid/Twilio ID
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_notifications_subject ON notifications(subject_id);
CREATE INDEX idx_notifications_status ON notifications(status);
CREATE INDEX idx_notifications_scheduled ON notifications(scheduled_for);
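Notification rows are derived from the template's reminder_config: negative offsets fire before the scheduled time, positive ones after (overdue). A sketch, assuming offsets apply to the event's scheduled datetime:
interface ReminderRule {
  offset_hours: number; // negative = before event, positive = after (overdue)
  channel: 'email' | 'sms' | 'push';
  template: string;
}

// Expand one schedule event into pending notification rows
function planReminders(scheduledFor: Date, rules: ReminderRule[]) {
  return rules.map((r) => ({
    channel: r.channel,
    template_code: r.template,
    scheduled_for: new Date(scheduledFor.getTime() + r.offset_hours * 3_600_000),
    status: 'pending' as const,
  }));
}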
REDCap Integration¶
-- REDCap Field Mappings (per study)
CREATE TABLE redcap_mappings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
study_id UUID NOT NULL REFERENCES studies(id),
-- Mapping definition
mapping_name VARCHAR(255) NOT NULL,
description TEXT,
-- Field mappings
field_mappings JSONB NOT NULL,
/*
[
{
"source_type": "score",
"source_path": "simple_rt.mean_rt",
"redcap_field": "srt_mean",
"transform": null
},
{
"source_type": "score",
"source_path": "promis.anxiety_theta",
"redcap_field": "promis_anx_tscore",
"transform": "theta_to_tscore"
},
{
"source_type": "metadata",
"source_path": "session.completed_at",
"redcap_field": "cog_assessment_date",
"transform": "date_only"
}
]
*/
-- REDCap target
redcap_form_name VARCHAR(100),
redcap_event_name VARCHAR(100),
-- State
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(study_id, mapping_name)
);
-- REDCap Sync Log
CREATE TABLE redcap_sync_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES sessions(id),
redcap_mapping_id UUID NOT NULL REFERENCES redcap_mappings(id),
-- Sync details
sync_type VARCHAR(20) NOT NULL, -- push, pull
-- Payload
payload_sent JSONB,
response_received JSONB,
-- Result
status VARCHAR(20) NOT NULL, -- success, failed, partial
records_affected INTEGER,
error_message TEXT,
-- Retry tracking
attempt_number INTEGER DEFAULT 1,
synced_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_redcap_sync_session ON redcap_sync_log(session_id);
CREATE INDEX idx_redcap_sync_status ON redcap_sync_log(status);
Audit Log¶
-- Audit Log (append-only, immutable)
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY, -- Sequential for ordering
-- Actor
user_id UUID REFERENCES users(id),
session_token_hash VARCHAR(64), -- For participant actions
ip_address INET,
user_agent TEXT,
-- Action
action VARCHAR(100) NOT NULL, -- e.g., "session.start", "score.compute"
resource_type VARCHAR(100), -- e.g., "session", "subject"
resource_id UUID,
-- Context
study_id UUID REFERENCES studies(id),
site_id UUID REFERENCES sites(id),
subject_id UUID REFERENCES subjects(id),
-- Details
details JSONB DEFAULT '{}',
-- Integrity
previous_hash VARCHAR(64), -- Chain for tamper detection
entry_hash VARCHAR(64) NOT NULL, -- SHA-256 of this entry
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- No updates or deletes allowed
CREATE RULE audit_log_no_update AS ON UPDATE TO audit_log DO INSTEAD NOTHING;
CREATE RULE audit_log_no_delete AS ON DELETE TO audit_log DO INSTEAD NOTHING;
CREATE INDEX idx_audit_log_user ON audit_log(user_id);
CREATE INDEX idx_audit_log_action ON audit_log(action);
CREATE INDEX idx_audit_log_resource ON audit_log(resource_type, resource_id);
CREATE INDEX idx_audit_log_study ON audit_log(study_id);
CREATE INDEX idx_audit_log_created ON audit_log(created_at);
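The previous_hash/entry_hash pair forms a tamper-evident chain: each entry hashes its payload together with its predecessor's hash, so any retroactive edit breaks every subsequent hash. A sketch of computing and verifying the chain (JSON.stringify stands in for a canonical serialization):
import { createHash } from 'node:crypto';

function computeEntryHash(
  previousHash: string | null,
  payload: Record<string, unknown>,
): string {
  return createHash('sha256')
    .update(previousHash ?? '')
    .update(JSON.stringify(payload)) // production code should canonicalize key order
    .digest('hex');
}

// Verification replays the chain from the first entry forward
function verifyChain(
  entries: { previousHash: string | null; entryHash: string; payload: Record<string, unknown> }[],
): boolean {
  let prev: string | null = null;
  for (const e of entries) {
    if (e.previousHash !== prev) return false;
    if (computeEntryHash(prev, e.payload) !== e.entryHash) return false;
    prev = e.entryHash;
  }
  return true;
}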
C. Execution + Offline Sync Design¶
Module Registry & Packaging¶
// Module Registry Interface
interface ModuleRegistry {
// Registration
registerModule(definition: ModuleDefinition): Promise<void>;
registerVersion(moduleId: string, version: ModuleVersionDefinition): Promise<void>;
// Retrieval
getModule(moduleId: string): Promise<Module>;
getModuleVersion(moduleId: string, version: string): Promise<ModuleVersion>;
getLatestPublishedVersion(moduleId: string): Promise<ModuleVersion>;
// Search
listModules(filters: ModuleFilters): Promise<Module[]>;
listVersions(moduleId: string): Promise<ModuleVersion[]>;
}
interface ModuleDefinition {
code: string;
name: string;
type: 'cognitive_task' | 'survey' | 'cat_survey' | 'instructions';
domain?: string;
jsPsychPlugin?: string;
isCustomPlugin: boolean;
estimatedDurationSeconds?: number;
ageRange?: { min: number; max: number };
}
interface ModuleVersionDefinition {
version: string;
config: Record<string, unknown>;
timelineDefinition: JsPsychTimelineNode[];
translations: Record<string, TranslationBundle>;
assetManifest: AssetReference[];
scoringConfig?: ScoringConfig;
}
// Existing task adapter (wraps current implementations)
function wrapExistingTask(
createTimeline: (jsPsych: JsPsych, ...args: unknown[]) => object[],
calculateSummary: (data: DataCollection) => TaskSummary
): ModuleVersionDefinition {
return {
version: '1.0.0',
config: {},
timelineDefinition: [], // Dynamically generated
translations: {},
assetManifest: [],
scoringConfig: {
algorithmVersion: '1.0.0',
summaryFunction: calculateSummary,
},
// Special flag to use legacy function-based approach
_legacyCreateTimeline: createTimeline,
};
}
Battery Runner / Orchestrator¶
/**
* BatteryOrchestrator
*
* Coordinates execution of a battery, handling:
* - Module sequencing and branching
* - Progress tracking
* - Data capture and persistence
* - Resume/restart logic
* - Offline resilience
*/
class BatteryOrchestrator {
private jsPsych: JsPsych;
private battery: CompiledBattery;
private session: SessionState;
private offlineStore: OfflineStore;
private telemetryCollector: TelemetryCollector;
private syncQueue: SyncQueue; // background sync queue (see SyncManager below)
constructor(config: OrchestratorConfig) {
this.jsPsych = initJsPsych({
display_element: config.targetElement,
show_progress_bar: true,
auto_update_progress_bar: false,
on_trial_finish: this.handleTrialFinish.bind(this),
on_finish: this.handleBatteryComplete.bind(this),
});
this.offlineStore = new OfflineStore();
this.telemetryCollector = new TelemetryCollector();
}
/**
* Initialize and start battery execution
*/
async start(accessToken: string): Promise<void> {
// 1. Validate token and fetch battery
this.session = await this.initializeSession(accessToken);
this.battery = await this.fetchAndCacheBattery(this.session.batteryVersionId);
// 2. Verify content integrity
const computedHash = await this.computeBatteryHash(this.battery);
if (computedHash !== this.session.batteryContentHash) {
throw new IntegrityError('Battery content hash mismatch');
}
// 3. Check for resume state
const resumeState = await this.offlineStore.getResumeState(this.session.id);
if (resumeState && this.canResume(resumeState)) {
await this.resumeFromCheckpoint(resumeState);
return;
}
// 4. Build timeline
const timeline = await this.buildTimeline();
// 5. Collect initial telemetry
await this.telemetryCollector.collectSessionStart();
// 6. Run
await this.jsPsych.run(timeline);
}
/**
* Build jsPsych timeline from battery definition
*/
private async buildTimeline(): Promise<TimelineNode[]> {
const timeline: TimelineNode[] = [];
// Consent (if required)
if (this.battery.deliveryConfig.requireConsent) {
timeline.push(await this.buildConsentModule());
}
// Main modules
for (const moduleEntry of this.battery.moduleSequence) {
const moduleVersion = await this.loadModuleVersion(moduleEntry.moduleVersionId);
// Check branch condition
if (moduleEntry.branchCondition) {
timeline.push({
conditional_function: () => this.evaluateBranchCondition(moduleEntry.branchCondition),
timeline: await this.buildModuleTimeline(moduleVersion, moduleEntry),
});
} else {
timeline.push(...await this.buildModuleTimeline(moduleVersion, moduleEntry));
}
// Add break between modules (configurable)
if (moduleEntry.order < this.battery.moduleSequence.length) {
timeline.push(this.buildBreakTrial());
}
}
// Completion
timeline.push(await this.buildCompletionModule());
return timeline;
}
/**
* Handle trial completion - persist immediately
*/
private async handleTrialFinish(data: TrialData): Promise<void> {
// 1. Enrich with timing metadata
const enrichedData: EnrichedTrialData = {
...data,
_meta: {
sessionId: this.session.id,
moduleVersionId: this.getCurrentModuleVersionId(),
eventIndex: this.session.eventCount++,
clientTimestamp: new Date().toISOString(),
timingMetadata: this.telemetryCollector.getTrialTiming(),
},
};
// 2. Persist to IndexedDB FIRST (offline-first)
await this.offlineStore.persistEvent(enrichedData);
// 3. Update progress
this.updateProgress();
// 4. Attempt background sync (non-blocking)
this.syncQueue.enqueue(enrichedData);
// 5. Create checkpoint for resume
await this.createCheckpoint();
}
/**
* Evaluate branching condition
*/
private evaluateBranchCondition(condition: BranchCondition): boolean {
switch (condition.type) {
case 'score_threshold':
const score = this.getModuleScore(condition.moduleCode, condition.scoreType);
return score !== null && score >= condition.threshold;
case 'response_value':
const response = this.getResponse(condition.moduleCode, condition.itemId);
return response === condition.expectedValue;
case 'age_range':
const age = this.session.subjectAge;
return age >= condition.minAge && age <= condition.maxAge;
case 'random':
return Math.random() < condition.probability;
default:
console.warn(`Unknown branch condition type: ${condition.type}`);
return true; // Default to include
}
}
}
Offline Storage Strategy¶
/**
* OfflineStore
*
* IndexedDB-based persistence layer using Dexie.js
* Ensures data is never lost, even with no connectivity
*/
import Dexie, { Table } from 'dexie';
interface StoredEvent {
id: string; // UUID
sessionId: string;
eventIndex: number;
data: EnrichedTrialData;
syncStatus: 'pending' | 'syncing' | 'synced' | 'failed';
syncAttempts: number;
lastSyncAttempt?: Date;
createdAt: Date;
}
interface StoredSession {
id: string;
accessToken: string;
batteryVersionId: string;
batteryBundle: CompiledBattery; // Cached for offline
status: 'active' | 'completed' | 'abandoned';
resumeState?: ResumeState;
resumeCount: number; // checked by ResumeManager.canResume
deviceId?: string; // device fingerprint for cross-device resume checks
createdAt: Date;
updatedAt: Date;
}
interface SyncQueueItem {
id: string;
type: 'event' | 'session_complete' | 'telemetry';
payload: unknown;
priority: number;
attempts: number;
createdAt: Date;
}
class OfflineStore extends Dexie {
events!: Table<StoredEvent>;
sessions!: Table<StoredSession>;
syncQueue!: Table<SyncQueueItem>;
batteryCache!: Table<{ id: string; data: CompiledBattery; cachedAt: Date }>;
constructor() {
super('CognitiveAssessmentDB');
this.version(1).stores({
events: 'id, sessionId, eventIndex, syncStatus, createdAt',
sessions: 'id, status, createdAt',
syncQueue: 'id, type, priority, createdAt',
batteryCache: 'id, cachedAt',
});
}
/**
* Persist event immediately (synchronous-feeling async)
*/
async persistEvent(data: EnrichedTrialData): Promise<void> {
const event: StoredEvent = {
id: crypto.randomUUID(),
sessionId: data._meta.sessionId,
eventIndex: data._meta.eventIndex,
data,
syncStatus: 'pending',
syncAttempts: 0,
createdAt: new Date(),
};
await this.events.add(event);
}
/**
* Get all pending events for sync
*/
async getPendingEvents(limit: number = 100): Promise<StoredEvent[]> {
return this.events
.where('syncStatus')
.anyOf(['pending', 'failed'])
.limit(limit)
.toArray();
}
/**
* Mark events as synced
*/
async markSynced(eventIds: string[]): Promise<void> {
await this.events
.where('id')
.anyOf(eventIds)
.modify({ syncStatus: 'synced' });
}
/**
* Store resume checkpoint
*/
async saveResumeState(sessionId: string, state: ResumeState): Promise<void> {
await this.sessions
.where('id')
.equals(sessionId)
.modify({ resumeState: state, updatedAt: new Date() });
}
/**
* Cleanup old synced data (retention policy)
*/
async cleanup(retentionDays: number = 30): Promise<void> {
const cutoff = new Date();
cutoff.setDate(cutoff.getDate() - retentionDays);
await this.events
.where('syncStatus')
.equals('synced')
.and(e => e.createdAt < cutoff)
.delete();
}
}
Sync Queue with Retry Logic¶
/**
* SyncManager
*
* Handles background synchronization with exponential backoff
* Ensures eventual consistency even with intermittent connectivity
*/
class SyncManager {
private offlineStore: OfflineStore;
private isOnline: boolean = navigator.onLine;
private syncInProgress: boolean = false;
private retryTimeouts: Map<string, ReturnType<typeof setTimeout>> = new Map(); // browser-safe timer handle type
// Backoff configuration
private readonly BASE_DELAY_MS = 1000;
private readonly MAX_DELAY_MS = 300000; // 5 minutes
private readonly MAX_ATTEMPTS = 10;
constructor(offlineStore: OfflineStore) {
this.offlineStore = offlineStore;
this.setupNetworkListeners();
this.startPeriodicSync();
}
private setupNetworkListeners(): void {
window.addEventListener('online', () => {
this.isOnline = true;
this.triggerSync();
});
window.addEventListener('offline', () => {
this.isOnline = false;
});
}
private startPeriodicSync(): void {
// Sync every 30 seconds when online
setInterval(() => {
if (this.isOnline) {
this.triggerSync();
}
}, 30000);
}
/**
* Trigger sync process
*/
async triggerSync(): Promise<void> {
if (this.syncInProgress || !this.isOnline) return;
this.syncInProgress = true;
try {
const pendingEvents = await this.offlineStore.getPendingEvents(50);
if (pendingEvents.length === 0) {
this.syncInProgress = false;
return;
}
// Group by session for batch submission
const bySession = this.groupBySession(pendingEvents);
for (const [sessionId, events] of bySession) {
await this.syncSessionEvents(sessionId, events);
}
} catch (error) {
console.error('Sync failed:', error);
} finally {
this.syncInProgress = false;
}
}
/**
* Sync events for a single session
*/
private async syncSessionEvents(
sessionId: string,
events: StoredEvent[]
): Promise<void> {
try {
// Mark as syncing
await this.offlineStore.events
.where('id')
.anyOf(events.map(e => e.id))
.modify({ syncStatus: 'syncing' });
// Submit to server (idempotent endpoint)
const response = await fetch('/api/sessions/events/batch', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Idempotency-Key': `${sessionId}-${events[0].eventIndex}-${events[events.length - 1].eventIndex}`,
},
body: JSON.stringify({
sessionId,
events: events.map(e => e.data),
}),
});
if (response.ok) {
await this.offlineStore.markSynced(events.map(e => e.id));
} else if (response.status === 409) {
// Conflict - events already synced
await this.offlineStore.markSynced(events.map(e => e.id));
} else {
throw new Error(`Sync failed: ${response.status}`);
}
} catch (error) {
// Mark as failed, schedule retry
for (const event of events) {
const newAttempts = event.syncAttempts + 1;
if (newAttempts >= this.MAX_ATTEMPTS) {
console.error(`Event ${event.id} exceeded max retry attempts`);
continue;
}
await this.offlineStore.events
.where('id')
.equals(event.id)
.modify({
syncStatus: 'failed',
syncAttempts: newAttempts,
lastSyncAttempt: new Date(),
});
// Schedule retry with exponential backoff
const delay = Math.min(
this.BASE_DELAY_MS * Math.pow(2, newAttempts),
this.MAX_DELAY_MS
);
setTimeout(() => this.triggerSync(), delay);
}
}
}
private groupBySession(events: StoredEvent[]): Map<string, StoredEvent[]> {
const map = new Map<string, StoredEvent[]>();
for (const event of events) {
const list = map.get(event.sessionId) || [];
list.push(event);
map.set(event.sessionId, list);
}
return map;
}
}
Resume/Restart Policies¶
interface ResumeState {
checkpointAt: Date;
moduleIndex: number;
trialIndex: number;
jsPsychState: unknown; // Internal jsPsych state
partialResponses: Record<string, unknown>;
elapsedTimeMs: number;
}
interface ResumePolicy {
maxResumeCount: number; // e.g., 3
maxTimeSinceCheckpointMs: number; // e.g., 24 hours
allowCrossDevice: boolean; // e.g., false for clinical trials
requireReauthentication: boolean;
}
class ResumeManager {
private policy: ResumePolicy;
private offlineStore: OfflineStore; // persists checkpoints (see createCheckpoint)
/**
* Determine if session can be resumed
*/
canResume(session: StoredSession, resumeState: ResumeState): ResumeDecision {
// Check resume count
if (session.resumeCount >= this.policy.maxResumeCount) {
return {
allowed: false,
reason: 'max_resume_count_exceeded',
action: 'restart_required',
};
}
// Check time since checkpoint
const timeSinceCheckpoint = Date.now() - resumeState.checkpointAt.getTime();
if (timeSinceCheckpoint > this.policy.maxTimeSinceCheckpointMs) {
return {
allowed: false,
reason: 'checkpoint_expired',
action: 'restart_required',
};
}
// Check device (if restricted)
if (!this.policy.allowCrossDevice) {
const currentDeviceId = this.getDeviceFingerprint();
if (currentDeviceId !== session.deviceId) {
return {
allowed: false,
reason: 'device_mismatch',
action: 'contact_coordinator',
};
}
}
return {
allowed: true,
resumeFromModule: resumeState.moduleIndex,
resumeFromTrial: resumeState.trialIndex,
};
}
/**
* Create checkpoint for resume
*/
async createCheckpoint(
session: SessionState,
jsPsych: JsPsych,
currentModule: number,
currentTrial: number
): Promise<void> {
const resumeState: ResumeState = {
checkpointAt: new Date(),
moduleIndex: currentModule,
trialIndex: currentTrial,
jsPsychState: this.serializeJsPsychState(jsPsych),
partialResponses: this.collectPartialResponses(jsPsych),
elapsedTimeMs: jsPsych.getTotalTime(),
};
await this.offlineStore.saveResumeState(session.id, resumeState);
}
}
Server-Side Idempotent Endpoints¶
# server/app/routers/events.py
from fastapi import APIRouter, Header, HTTPException
from pydantic import BaseModel
from typing import List
import hashlib
from datetime import datetime
router = APIRouter(prefix="/sessions", tags=["events"])
class EventData(BaseModel):
event_index: int
trial_index: int
trial_type: str
trial_data: dict
client_timestamp: str
timing_metadata: dict
class BatchEventSubmission(BaseModel):
session_id: str
events: List[EventData]
# Idempotency key storage (Redis in production)
processed_keys: dict = {}
@router.post("/events/batch")
async def submit_events_batch(
submission: BatchEventSubmission,
x_idempotency_key: str = Header(...),
):
"""
Idempotent batch event submission.
Clients should retry with the same idempotency key on failure.
Server guarantees at-most-once processing per key.
"""
# Check if already processed
if x_idempotency_key in processed_keys:
return {"status": "already_processed", "result": processed_keys[x_idempotency_key]}
# Validate session
session = await get_session(submission.session_id)
if not session or session.status not in ['pending', 'in_progress']:
raise HTTPException(status_code=404, detail="Session not found or not active")
# Process events
try:
result = await process_events(session, submission.events)
# Store idempotency result (with TTL in production)
processed_keys[x_idempotency_key] = result
return {"status": "processed", "result": result}
except DuplicateEventError:
# Events already exist - treat as success
return {"status": "duplicate", "message": "Events already recorded"}
except Exception as e:
# Don't store failed attempts - allow retry
raise HTTPException(status_code=500, detail=str(e))
async def process_events(session: Session, events: List[EventData]) -> dict:
"""
Process and store events with conflict detection.
"""
processed = 0
skipped = 0
for event in events:
# Check for duplicate (by session + event_index)
existing = await db.events.find_one({
"session_id": session.id,
"event_index": event.event_index
})
if existing:
# Verify content matches (detect conflicts)
if hash_event(existing) != hash_event(event):
raise ConflictError(f"Event {event.event_index} exists with different content")
skipped += 1
continue
# Insert new event
await db.events.insert_one({
"id": generate_uuid(),
"session_id": session.id,
"event_index": event.event_index,
**event.dict(),
"server_received_at": datetime.utcnow(),
})
processed += 1
# Update session state
await update_session_progress(session.id, events[-1].event_index)
return {"processed": processed, "skipped": skipped}
D. REDCap Integration Design¶
Integration Architecture¶
┌─────────────────────────────────────────────────────────────────┐
│ Assessment Platform │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Client │───▶│ Server │───▶│ REDCap Service │ │
│ │ (no token) │ │ (secure) │ │ │ │
│ └─────────────┘ └─────────────┘ │ • Token vault │ │
│ │ • Mapping engine │ │
│ │ • Retry queue │ │
│ │ • Reconciliation │ │
│ └──────────┬──────────┘ │
└────────────────────────────────────────────────────┼────────────┘
│
▼
┌─────────────────────────────────────┐
│ REDCap Instance │
│ (External - per study/site) │
│ │
│ • Project database │
│ • API endpoint │
│ • Forms/instruments │
└─────────────────────────────────────┘
Secure Token Management¶
# server/app/services/redcap_vault.py
from cryptography.fernet import Fernet
from functools import lru_cache
import os
class REDCapVault:
"""
Secure storage and retrieval of REDCap API tokens.
Tokens are:
- Encrypted at rest using Fernet (AES-128-CBC)
- Never exposed to clients
- Cached in memory (LRU; a production deployment would add a TTL)
- Rotatable without downtime
"""
def __init__(self):
# Key from environment (injected via secrets manager in production)
key = os.environ.get('REDCAP_ENCRYPTION_KEY')
if not key:
raise ValueError("REDCAP_ENCRYPTION_KEY not configured")
self.cipher = Fernet(key.encode())
def encrypt_token(self, plaintext_token: str) -> bytes:
"""Encrypt token for database storage."""
return self.cipher.encrypt(plaintext_token.encode())
def decrypt_token(self, encrypted_token: bytes) -> str:
"""Decrypt token for API use."""
return self.cipher.decrypt(encrypted_token).decode()
@lru_cache(maxsize=100)
def get_site_token(self, site_id: str) -> str:
"""
Get decrypted token for a site.
Cached to avoid repeated decryption.
"""
site = db.sites.find_one({"id": site_id})
if not site or not site.redcap_api_token_encrypted:
raise ValueError(f"No REDCap token configured for site {site_id}")
return self.decrypt_token(site.redcap_api_token_encrypted)
def rotate_token(self, site_id: str, new_token: str) -> None:
"""
Rotate token for a site.
Clears cache entry.
"""
encrypted = self.encrypt_token(new_token)
db.sites.update_one(
{"id": site_id},
{"$set": {"redcap_api_token_encrypted": encrypted}}
)
# Clear cache
self.get_site_token.cache_clear()
Field Mapping Engine¶
# server/app/services/redcap_mapper.py
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class SourceType(Enum):
SCORE = "score"
RESPONSE = "response"
METADATA = "metadata"
COMPUTED = "computed"
class TransformType(Enum):
NONE = "none"
DATE_ONLY = "date_only"
THETA_TO_TSCORE = "theta_to_tscore"
ROUND = "round"
BOOLEAN_TO_INT = "boolean_to_int"
LOOKUP = "lookup"
@dataclass
class FieldMapping:
source_type: SourceType
source_path: str # Dot-notation path, e.g., "simple_rt.mean_rt"
redcap_field: str
transform: TransformType = TransformType.NONE
transform_params: Optional[Dict] = None
required: bool = False
default_value: Any = None
class REDCapMapper:
"""
Maps platform data to REDCap fields based on study configuration.
"""
def __init__(self, mapping_config: List[FieldMapping]):
self.mappings = mapping_config
self.transforms = {
TransformType.NONE: lambda x, _: x,
TransformType.DATE_ONLY: self._transform_date_only,
TransformType.THETA_TO_TSCORE: self._transform_theta_to_tscore,
TransformType.ROUND: self._transform_round,
TransformType.BOOLEAN_TO_INT: self._transform_boolean_to_int,
TransformType.LOOKUP: self._transform_lookup,
}
def map_session_to_redcap(
self,
session: Session,
scores: Dict[str, Any],
responses: Dict[str, Any],
metadata: Dict[str, Any]
) -> Dict[str, Any]:
"""
Transform platform data to REDCap record format.
"""
record = {}
errors = []
for mapping in self.mappings:
try:
# Get source value
source_data = self._get_source_data(
mapping.source_type,
scores, responses, metadata
)
value = self._get_nested_value(source_data, mapping.source_path)
# Handle missing values
if value is None:
if mapping.required:
errors.append(f"Required field missing: {mapping.source_path}")
continue
value = mapping.default_value
# Apply transform
if value is not None:
value = self.transforms[mapping.transform](value, mapping.transform_params)
# Set in record
if value is not None:
record[mapping.redcap_field] = value
except Exception as e:
errors.append(f"Error mapping {mapping.source_path}: {str(e)}")
if errors:
raise MappingError(errors)
return record
def _get_nested_value(self, data: Dict, path: str) -> Any:
"""Get value from nested dict using dot notation."""
keys = path.split('.')
value = data
for key in keys:
if isinstance(value, dict):
value = value.get(key)
else:
return None
return value
def _transform_theta_to_tscore(self, theta: float, params: Dict) -> float:
"""Convert IRT theta to T-score (mean=50, SD=10)."""
return round(50 + (theta * 10), 1)
def _transform_date_only(self, datetime_str: str, _) -> str:
"""Extract date portion from ISO datetime."""
return datetime_str[:10]
def _transform_round(self, value: float, params: Dict) -> float:
"""Round to specified decimal places."""
decimals = params.get('decimals', 2)
return round(value, decimals)
def _transform_boolean_to_int(self, value: bool, _) -> int:
"""Convert boolean to 0/1."""
return 1 if value else 0
def _transform_lookup(self, value: Any, params: Dict) -> Any:
"""Lookup value in mapping table."""
lookup_table = params.get('table', {})
return lookup_table.get(str(value), value)
Example Mapping Configuration¶
{
"mapping_name": "cognitive_assessment_v1",
"redcap_form_name": "cognitive_assessment",
"redcap_event_name": "baseline_arm_1",
"field_mappings": [
{
"source_type": "metadata",
"source_path": "session.completed_at",
"redcap_field": "cog_date",
"transform": "date_only",
"required": true
},
{
"source_type": "metadata",
"source_path": "session.language",
"redcap_field": "cog_language",
"transform": "lookup",
"transform_params": {
"table": {"en": "1", "fr": "2", "es": "3"}
}
},
{
"source_type": "score",
"source_path": "simple_rt.mean_rt",
"redcap_field": "srt_mean",
"transform": "round",
"transform_params": {"decimals": 1}
},
{
"source_type": "score",
"source_path": "simple_rt.rt_variability",
"redcap_field": "srt_cv",
"transform": "round",
"transform_params": {"decimals": 3}
},
{
"source_type": "score",
"source_path": "cpt.d_prime",
"redcap_field": "cpt_dprime",
"transform": "round",
"transform_params": {"decimals": 2}
},
{
"source_type": "score",
"source_path": "promis.anxiety_theta",
"redcap_field": "promis_anx_tscore",
"transform": "theta_to_tscore"
},
{
"source_type": "score",
"source_path": "tmt.part_b_completed",
"redcap_field": "tmt_b_complete",
"transform": "boolean_to_int"
}
]
}
REDCap Sync Service¶
# server/app/services/redcap_sync.py
import redcap
from typing import Optional
from datetime import datetime
class REDCapSyncService:
"""
Handles synchronization of data to/from REDCap.
Features:
- Batched submissions with retry
- Reconciliation for conflicts
- Pull participant metadata (optional)
- Comprehensive logging
"""
def __init__(self, vault: REDCapVault, mapper: REDCapMapper):
self.vault = vault
self.mapper = mapper
async def push_session_scores(
self,
session_id: str,
site_id: str,
mapping_id: str
) -> SyncResult:
"""
Push session scores to REDCap.
"""
# Load data
session = await db.sessions.find_one({"id": session_id})
subject = await db.subjects.find_one({"id": session.subject_id})
scores = await self._load_scores(session_id)
# Get mapping config
mapping_config = await db.redcap_mappings.find_one({"id": mapping_id})
# Transform to REDCap format
try:
record = self.mapper.map_session_to_redcap(
session=session,
scores=scores,
responses={}, # Add if needed
metadata=self._build_metadata(session, subject)
)
except MappingError as e:
return SyncResult(
status="failed",
error=f"Mapping error: {e.errors}"
)
# Add identifiers
record["record_id"] = subject.external_id or subject.subject_code
if mapping_config.get("redcap_event_name"):
record["redcap_event_name"] = mapping_config["redcap_event_name"]
# Submit to REDCap
try:
site = await db.sites.find_one({"id": site_id})
token = self.vault.get_site_token(site_id)
project = redcap.Project(site.redcap_url, token)
response = project.import_records([record])
# Log success
await self._log_sync(
session_id=session_id,
mapping_id=mapping_id,
sync_type="push",
status="success",
payload_sent=record,
response_received=response
)
return SyncResult(
status="success",
records_affected=response.get("count", 1)
)
except Exception as e:
# Log failure
await self._log_sync(
session_id=session_id,
mapping_id=mapping_id,
sync_type="push",
status="failed",
payload_sent=record,
error_message=str(e)
)
# Queue for retry
await self._queue_retry(session_id, mapping_id)
return SyncResult(
status="failed",
error=str(e)
)
async def pull_subject_metadata(
self,
site_id: str,
external_id: str
) -> Optional[Dict]:
"""
Pull participant metadata from REDCap (if configured).
Useful for:
- Getting enrollment date
- Getting randomization arm
- Syncing demographics
"""
site = await db.sites.find_one({"id": site_id})
if not site.get("redcap_pull_enabled"):
return None
try:
token = self.vault.get_site_token(site_id)
project = redcap.Project(site.redcap_url, token)
records = project.export_records(
records=[external_id],
fields=site.get("redcap_pull_fields", [])
)
if records:
return records[0]
return None
except Exception as e:
logger.error(f"REDCap pull failed for {external_id}: {e}")
return None
async def reconcile_identifiers(
self,
study_id: str
) -> ReconciliationReport:
"""
Reconcile subject identifiers between platform and REDCap.
Returns report of:
- Matched records
- Platform-only records
- REDCap-only records
- Mismatches
"""
# Implementation for data quality checks
pass
async def _log_sync(self, **kwargs):
"""Log sync attempt to audit trail."""
await db.redcap_sync_log.insert_one({
"id": generate_uuid(),
**kwargs,
"synced_at": datetime.utcnow()
})
async def _queue_retry(self, session_id: str, mapping_id: str):
"""Queue failed sync for retry."""
await db.sync_queue.insert_one({
"id": generate_uuid(),
"type": "redcap_push",
"payload": {
"session_id": session_id,
"mapping_id": mapping_id
},
"attempts": 0,
"scheduled_for": datetime.utcnow(),
"created_at": datetime.utcnow()
})
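The queue above is drained by the `retry-failed-syncs` job scheduled in Section F. A minimal sketch of that drain, assuming a max-attempt cap, a `dead_letter` status, and a hypothetical `requeue=False` flag on `push_session_scores` that suppresses the service's own re-queue so the drainer controls backoff:

```python
# Sketch of the drain run by the `retry-failed-syncs` job in Section F.
# MAX_ATTEMPTS, the dead_letter status, and the requeue=False flag are
# assumptions, not fixed by this document.
from datetime import datetime, timedelta

MAX_ATTEMPTS = 5

async def drain_sync_queue(sync: "REDCapSyncService") -> None:
    now = datetime.utcnow()
    due = await db.sync_queue.find({
        "type": "redcap_push",
        "scheduled_for": {"$lte": now},
    }).limit(50).to_list()
    for entry in due:
        p = entry["payload"]
        result = await sync.push_session_scores(
            p["session_id"], p["site_id"], p["mapping_id"], requeue=False
        )
        if result.status == "success":
            await db.sync_queue.delete_one({"id": entry["id"]})
            continue
        attempts = entry["attempts"] + 1
        if attempts >= MAX_ATTEMPTS:
            # Park for manual review rather than retrying forever
            await db.sync_queue.update_one(
                {"id": entry["id"]}, {"$set": {"status": "dead_letter"}}
            )
        else:
            # Capped exponential backoff: 15 min, 30 min, 1 h, 2 h
            delay = timedelta(minutes=15 * (2 ** (attempts - 1)))
            await db.sync_queue.update_one(
                {"id": entry["id"]},
                {"$set": {"attempts": attempts, "scheduled_for": now + delay}},
            )
```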
REDCap API Payload Examples¶
# Example: Pushing cognitive assessment scores
# Input: Platform scores
platform_scores = {
"simple_rt": {
"total_trials": 20,
"valid_trials": 18,
"mean_rt": 342.5,
"median_rt": 335.0,
"rt_variability": 0.156
},
"cpt": {
"total_trials": 100,
"correct_go": 45,
"correct_nogo": 48,
"omission_errors": 5,
"commission_errors": 2,
"d_prime": 2.34
},
"promis": {
"anxiety_theta": 0.85,
"anxiety_se": 0.32,
"fatigue_theta": 0.12,
"fatigue_se": 0.28
}
}
# Output: REDCap record format
redcap_record = {
"record_id": "SUBJ-001",
"redcap_event_name": "baseline_arm_1",
# Assessment metadata
"cog_date": "2026-01-04",
"cog_language": "1", # 1=English
"cog_device": "tablet",
"cog_duration_min": 45,
# Simple RT
"srt_total": 20,
"srt_valid": 18,
"srt_mean": 342.5,
"srt_median": 335.0,
"srt_cv": 0.156,
# CPT
"cpt_total": 100,
"cpt_hits": 45,
"cpt_cr": 48,
"cpt_omissions": 5,
"cpt_commissions": 2,
"cpt_dprime": 2.34,
# PROMIS (T-scores)
"promis_anx_tscore": 58.5, # 50 + (0.85 * 10)
"promis_anx_se": 3.2,
"promis_fat_tscore": 51.2,
"promis_fat_se": 2.8,
# Form completion
"cognitive_assessment_complete": "2" # 2 = Complete
}
# REDCap API call (via PyCap)
import redcap
project = redcap.Project(url, token)
response = project.import_records([redcap_record])
# Response
# {"count": 1}
E. Researcher Portal & Reporting¶
RBAC Implementation¶
# server/app/auth/rbac.py
from enum import Enum
from typing import List, Optional, Set
from functools import wraps
from fastapi import Depends, HTTPException
class Permission(Enum):
# Study management
STUDY_CREATE = "study:create"
STUDY_READ = "study:read"
STUDY_UPDATE = "study:update"
STUDY_DELETE = "study:delete"
# Site management
SITE_READ = "site:read"
SITE_UPDATE = "site:update"
# Subject management
SUBJECT_CREATE = "subject:create"
SUBJECT_READ = "subject:read"
SUBJECT_READ_LIMITED = "subject:read:limited" # Summary only
SUBJECT_UPDATE = "subject:update"
# Session management
SESSION_CREATE = "session:create"
SESSION_READ = "session:read"
# Battery management
BATTERY_CREATE = "battery:create"
BATTERY_READ = "battery:read"
BATTERY_UPDATE = "battery:update"
BATTERY_PUBLISH = "battery:publish"
# Reporting
REPORT_READ = "report:read"
REPORT_READ_SUMMARY = "report:read:summary"
REPORT_GENERATE = "report:generate"
# Export
EXPORT_RAW = "export:raw"
EXPORT_SCORES = "export:scores"
EXPORT_AUDIT = "export:audit"
# Audit
AUDIT_READ = "audit:read"
# Admin
ADMIN_USERS = "admin:users"
ADMIN_SYSTEM = "admin:system"
class RBACService:
"""
Role-Based Access Control with site scoping.
"""
def __init__(self):
self.permission_cache = {}
async def get_user_permissions(
self,
user_id: str,
study_id: Optional[str] = None,
site_id: Optional[str] = None
) -> Set[Permission]:
"""
Get effective permissions for a user in a context.
Permissions are aggregated from all applicable role assignments.
"""
cache_key = f"{user_id}:{study_id}:{site_id}"
if cache_key in self.permission_cache:
return self.permission_cache[cache_key]
        # Get all role assignments for user; combine scope filters with
        # $and (assigning "$or" twice would silently overwrite the first)
        query = {"user_id": user_id}
        scope_clauses = []
        if study_id:
            scope_clauses.append({"$or": [
                {"study_id": study_id},
                {"study_id": None}  # Platform-wide roles
            ]})
        if site_id:
            scope_clauses.append({"$or": [
                {"site_id": site_id},
                {"site_id": None}  # Study-wide roles
            ]})
        if scope_clauses:
            query["$and"] = scope_clauses
assignments = await db.user_roles.find(query).to_list()
# Aggregate permissions
permissions = set()
for assignment in assignments:
role = await db.roles.find_one({"id": assignment["role_id"]})
if role:
for perm in role["permissions"]:
if perm == "*":
# Superuser - all permissions
return set(Permission)
permissions.add(Permission(perm))
self.permission_cache[cache_key] = permissions
return permissions
async def check_permission(
self,
user_id: str,
permission: Permission,
study_id: Optional[str] = None,
site_id: Optional[str] = None
) -> bool:
"""Check if user has specific permission."""
permissions = await self.get_user_permissions(user_id, study_id, site_id)
return permission in permissions
async def get_accessible_sites(
self,
user_id: str,
study_id: str
) -> List[str]:
"""Get list of site IDs user can access in a study."""
assignments = await db.user_roles.find({
"user_id": user_id,
"$or": [
{"study_id": study_id},
{"study_id": None}
]
}).to_list()
site_ids = set()
for assignment in assignments:
if assignment.get("site_id"):
site_ids.add(assignment["site_id"])
else:
# Study-wide or platform-wide - access all sites
sites = await db.sites.find({"study_id": study_id}).to_list()
site_ids.update(s["id"] for s in sites)
return list(site_ids)
# Dependency for route protection
def require_permission(permission: Permission):
"""FastAPI dependency for permission checking."""
async def checker(
current_user: User = Depends(get_current_user),
study_id: Optional[str] = None,
site_id: Optional[str] = None
):
rbac = RBACService()
has_perm = await rbac.check_permission(
current_user.id, permission, study_id, site_id
)
if not has_perm:
raise HTTPException(
status_code=403,
detail=f"Permission denied: {permission.value}"
)
return current_user
return checker
# Usage in routes
@router.get("/subjects")
async def list_subjects(
study_id: str,
site_id: Optional[str] = None,
user: User = Depends(require_permission(Permission.SUBJECT_READ))
):
"""List subjects with site-scoping applied."""
rbac = RBACService()
accessible_sites = await rbac.get_accessible_sites(user.id, study_id)
query = {"study_id": study_id}
if site_id:
if site_id not in accessible_sites:
raise HTTPException(403, "Access denied to this site")
query["site_id"] = site_id
else:
query["site_id"] = {"$in": accessible_sites}
subjects = await db.subjects.find(query).to_list()
return subjects
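For orientation, the role documents this aggregation consumes might look as follows; the role names and permission sets are illustrative, not a fixed catalogue:

```python
# Illustrative role and assignment documents for RBACService; names and
# permission sets are examples only.
EXAMPLE_ROLES = [
    {
        "id": "role-platform-admin",
        "name": "Platform Admin",
        "permissions": ["*"],  # expands to every Permission
    },
    {
        "id": "role-site-coordinator",
        "name": "Site Coordinator",
        "permissions": [
            "subject:create", "subject:read", "subject:update",
            "session:create", "session:read",
            "report:read:summary",
        ],
    },
    {
        "id": "role-monitor",
        "name": "Monitor",
        "permissions": ["subject:read:limited", "audit:read", "export:audit"],
    },
]

# A site-scoped assignment: this user is a coordinator at one site only.
EXAMPLE_ASSIGNMENT = {
    "user_id": "user-123",
    "role_id": "role-site-coordinator",
    "study_id": "study-abc",
    "site_id": "site-01",
}
```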
Portal Pages & Workflows¶
// Portal page structure
interface PortalRoutes {
// Dashboard
'/dashboard': DashboardPage; // Overview metrics, alerts
// Studies (Platform Admin, Study Admin)
'/studies': StudyListPage;
'/studies/:studyId': StudyDetailPage;
'/studies/:studyId/settings': StudySettingsPage;
'/studies/:studyId/batteries': BatteryListPage;
'/studies/:studyId/schedule': ScheduleTemplatePage;
// Sites (Study Admin, Site Coordinator)
'/studies/:studyId/sites': SiteListPage;
'/studies/:studyId/sites/:siteId': SiteDetailPage;
// Subjects (Site Coordinator, Investigator)
'/studies/:studyId/subjects': SubjectListPage;
'/studies/:studyId/subjects/:subjectId': SubjectDetailPage;
'/studies/:studyId/subjects/:subjectId/sessions': SessionListPage;
'/studies/:studyId/subjects/:subjectId/sessions/:sessionId': SessionDetailPage;
// Reports (Investigator, Clinician Reviewer)
'/studies/:studyId/reports': ReportListPage;
'/studies/:studyId/reports/generate': ReportGeneratorPage;
'/studies/:studyId/subjects/:subjectId/report': SubjectReportPage;
// Exports (Data Manager)
'/studies/:studyId/exports': ExportPage;
// Audit (Data Manager, Monitor)
'/studies/:studyId/audit': AuditLogPage;
// Admin (Platform Admin)
'/admin/users': UserManagementPage;
'/admin/roles': RoleManagementPage;
'/admin/system': SystemSettingsPage;
}
// Dashboard components
interface DashboardMetrics {
// Completion status
sessionsToday: number;
sessionsThisWeek: number;
completionRate: number;
// Data quality
pendingSyncCount: number;
failedSyncCount: number;
dataQualityFlags: QualityFlag[];
// Overdue tasks
overdueAssessments: OverdueAssessment[];
upcomingWindows: UpcomingWindow[];
// Site comparison (for multi-site)
siteMetrics: SiteMetric[];
}
// Subject detail page
interface SubjectDetailView {
// Demographics
subjectCode: string;
enrollmentDate: Date;
studyArm: string;
preferredLanguage: string;
// Schedule
scheduleTimeline: ScheduleEvent[];
nextAssessment: ScheduleEvent | null;
// Session history
sessions: SessionSummary[];
// Latest scores (with sparklines)
latestScores: ScoreSummary[];
// Notes & flags
coordinatorNotes: Note[];
qualityFlags: QualityFlag[];
}
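To ground the dashboard contract, a sketch of an endpoint serving a few of these fields; the route shape and status filters are assumptions layered on the RBAC service above:

```python
# Sketch of an endpoint backing part of DashboardMetrics; route shape and
# status filters are assumptions.
from datetime import datetime

@router.get("/studies/{study_id}/dashboard")
async def dashboard_metrics(
    study_id: str,
    user: User = Depends(require_permission(Permission.REPORT_READ_SUMMARY)),
):
    accessible = await RBACService().get_accessible_sites(user.id, study_id)
    start_of_day = datetime.combine(datetime.utcnow().date(), datetime.min.time())
    site_scope = {"study_id": study_id, "site_id": {"$in": accessible}}
    return {
        "sessionsToday": await db.sessions.count_documents(
            {**site_scope, "completed_at": {"$gte": start_of_day}}
        ),
        # Queue entries carry no study scoping in this schema, so these are
        # platform-wide counts; per-study scoping would need study_id on rows
        "pendingSyncCount": await db.sync_queue.count_documents({"type": "redcap_push"}),
        "failedSyncCount": await db.redcap_sync_log.count_documents({"status": "failed"}),
    }
```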
Report Generation¶
# server/app/services/report_generator.py
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image
from reportlab.lib.styles import getSampleStyleSheet
from io import BytesIO
from datetime import datetime
from typing import Dict, List, Optional
import json
class ReportGenerator:
"""
Generate clinical reports in multiple formats.
Outputs:
- PDF: Clinical summary for EMR import
- JSON: Structured data for system integration
- FHIR: DiagnosticReport resource (optional)
"""
def __init__(self):
self.styles = getSampleStyleSheet()
async def generate_subject_report(
self,
subject_id: str,
session_id: Optional[str] = None,
format: str = "pdf"
) -> bytes:
"""Generate report for a subject."""
# Load data
subject = await db.subjects.find_one({"id": subject_id})
sessions = await self._load_sessions(subject_id, session_id)
scores = await self._load_scores(sessions)
norms = await self._load_normative_data(subject)
if format == "pdf":
return self._generate_pdf(subject, sessions, scores, norms)
elif format == "json":
return self._generate_json(subject, sessions, scores, norms)
elif format == "fhir":
return self._generate_fhir(subject, sessions, scores, norms)
else:
raise ValueError(f"Unknown format: {format}")
def _generate_pdf(
self,
subject: Dict,
sessions: List[Dict],
scores: Dict,
norms: Dict
) -> bytes:
"""Generate PDF clinical report."""
buffer = BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=letter)
elements = []
# Header
elements.append(Paragraph(
"Cognitive Assessment Report",
self.styles['Title']
))
elements.append(Spacer(1, 12))
# Subject info
elements.append(Paragraph(
f"Subject ID: {subject['subject_code']}",
self.styles['Normal']
))
elements.append(Paragraph(
f"Assessment Date: {sessions[-1]['completed_at'].strftime('%Y-%m-%d')}",
self.styles['Normal']
))
elements.append(Paragraph(
f"Report Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
self.styles['Normal']
))
elements.append(Spacer(1, 24))
# Score summary table
elements.append(Paragraph("Score Summary", self.styles['Heading2']))
score_data = [["Domain", "Score", "Percentile", "Interpretation"]]
for domain, domain_scores in scores.items():
score_data.append([
domain,
f"{domain_scores.get('scaled_score', 'N/A')}",
f"{domain_scores.get('percentile', 'N/A')}%",
domain_scores.get('interpretation_band', 'N/A')
])
score_table = Table(score_data, colWidths=[150, 80, 80, 120])
score_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
elements.append(score_table)
elements.append(Spacer(1, 24))
# Detailed results by domain
elements.append(Paragraph("Detailed Results", self.styles['Heading2']))
for domain, domain_scores in scores.items():
elements.append(Paragraph(domain, self.styles['Heading3']))
if domain_scores.get('subscales'):
for subscale, subscale_score in domain_scores['subscales'].items():
elements.append(Paragraph(
f" {subscale}: {subscale_score}",
self.styles['Normal']
))
elements.append(Spacer(1, 12))
# Footer with provenance
elements.append(Spacer(1, 24))
elements.append(Paragraph(
f"Battery Version: {sessions[-1]['battery_version_id'][:8]}",
self.styles['Italic']
))
elements.append(Paragraph(
f"Scoring Algorithm: v{scores.get('_meta', {}).get('algorithm_version', 'unknown')}",
self.styles['Italic']
))
doc.build(elements)
return buffer.getvalue()
def _generate_json(
self,
subject: Dict,
sessions: List[Dict],
scores: Dict,
norms: Dict
) -> bytes:
"""Generate structured JSON report."""
report = {
"reportType": "CognitiveAssessmentReport",
"version": "1.0.0",
"generatedAt": datetime.utcnow().isoformat(),
"subject": {
"id": subject["subject_code"],
"studyArm": subject.get("study_arm_code"),
},
"assessment": {
"sessionId": sessions[-1]["id"],
"completedAt": sessions[-1]["completed_at"].isoformat(),
"batteryVersionId": sessions[-1]["battery_version_id"],
"language": sessions[-1]["language"],
},
"scores": {
domain: {
"rawScore": s.get("raw_score"),
"scaledScore": s.get("scaled_score"),
"percentile": s.get("percentile"),
"standardError": s.get("standard_error"),
"interpretationBand": s.get("interpretation_band"),
"normGroup": s.get("norm_group"),
}
for domain, s in scores.items()
if not domain.startswith("_")
},
"qualityIndicators": {
"completionRate": self._calculate_completion_rate(sessions[-1]),
"timingFlags": sessions[-1].get("timing_flags", []),
"validityFlags": scores.get("_meta", {}).get("validity_flags", []),
},
"provenance": {
"scoringAlgorithmVersion": scores.get("_meta", {}).get("algorithm_version"),
"normativeDataVersion": norms.get("version"),
"reportGeneratorVersion": "1.0.0",
}
}
return json.dumps(report, indent=2).encode()
def _generate_fhir(
self,
subject: Dict,
sessions: List[Dict],
scores: Dict,
norms: Dict
) -> bytes:
"""Generate FHIR DiagnosticReport resource."""
# FHIR R4 DiagnosticReport
fhir_report = {
"resourceType": "DiagnosticReport",
"id": sessions[-1]["id"],
"status": "final",
"category": [{
"coding": [{
"system": "http://terminology.hl7.org/CodeSystem/v2-0074",
"code": "NRS",
"display": "Neuropsychological Report"
}]
}],
"code": {
"coding": [{
"system": "http://loinc.org",
"code": "96115-2",
"display": "Cognitive function assessment"
}]
},
"subject": {
"identifier": {
"value": subject["subject_code"]
}
},
"effectiveDateTime": sessions[-1]["completed_at"].isoformat(),
"issued": datetime.utcnow().isoformat(),
"result": [
self._score_to_fhir_observation(domain, score)
for domain, score in scores.items()
if not domain.startswith("_")
]
}
return json.dumps(fhir_report, indent=2).encode()
def _score_to_fhir_observation(self, domain: str, score: Dict) -> Dict:
"""Convert a score to FHIR Observation reference."""
return {
"reference": f"Observation/{domain}",
"display": f"{domain}: {score.get('scaled_score', 'N/A')}"
}
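A thin route can expose the generator to the portal; a sketch, assuming per-format media types and the permission dependency defined above:

```python
# Sketch of a route exposing ReportGenerator; media types per format are
# assumptions (application/fhir+json for the FHIR variant).
from fastapi.responses import Response

MEDIA_TYPES = {
    "pdf": "application/pdf",
    "json": "application/json",
    "fhir": "application/fhir+json",
}

@router.get("/subjects/{subject_id}/report")
async def subject_report(
    subject_id: str,
    format: str = "pdf",
    user: User = Depends(require_permission(Permission.REPORT_GENERATE)),
):
    payload = await ReportGenerator().generate_subject_report(subject_id, format=format)
    return Response(
        content=payload,
        media_type=MEDIA_TYPES.get(format, "application/octet-stream"),
    )
```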
F. Scheduling / Reminders¶
Scheduling Model¶
# server/app/models/schedule.py
from datetime import date, datetime, timedelta
from typing import Dict, List, Optional
from pydantic import BaseModel
from enum import Enum
class EventStatus(Enum):
SCHEDULED = "scheduled" # Future, not yet in window
OPEN = "open" # In window, awaiting completion
COMPLETED = "completed" # Session completed
MISSED = "missed" # Window closed without completion
CANCELLED = "cancelled" # Manually cancelled
class ScheduleEventDefinition(BaseModel):
"""Definition of an event in a schedule template."""
event_code: str
name: str
battery_version_id: str
day_offset: int # Days from anchor
window_before_days: int = 0 # Days before scheduled date
window_after_days: int = 7 # Days after scheduled date
required: bool = True
depends_on: Optional[str] = None # Previous event that must complete
class ReminderDefinition(BaseModel):
"""Definition of a reminder in the schedule."""
offset_hours: int # Hours from window start (negative = before)
channel: str # email, sms, push
template: str # Template code
class ScheduleTemplate(BaseModel):
"""Complete schedule template for a study arm."""
events: List[ScheduleEventDefinition]
reminders: List[ReminderDefinition]
escalation_after_hours: int = 72
escalation_notify_coordinator: bool = True
class ScheduleService:
"""
Manages assessment schedules for subjects.
"""
async def create_subject_schedule(
self,
subject_id: str,
template_id: str,
anchor_date: date
) -> List[str]:
"""
Instantiate a schedule for a subject from a template.
Returns list of created event IDs.
"""
template = await db.schedule_templates.find_one({"id": template_id})
subject = await db.subjects.find_one({"id": subject_id})
event_ids = []
        created_event_ids = {}  # event_code -> event id, for dependency wiring
for event_def in template["events"]:
# Calculate dates
scheduled_date = anchor_date + timedelta(days=event_def["day_offset"])
window_start = scheduled_date - timedelta(days=event_def["window_before_days"])
window_end = scheduled_date + timedelta(days=event_def["window_after_days"])
# Check dependency
depends_on_event_id = None
if event_def.get("depends_on"):
                depends_on_event_id = created_event_ids.get(event_def["depends_on"])
# Create event
event = {
"id": generate_uuid(),
"subject_id": subject_id,
"schedule_template_id": template_id,
"event_code": event_def["event_code"],
"event_name": event_def["name"],
"battery_version_id": event_def["battery_version_id"],
"anchor_date": anchor_date,
"scheduled_date": scheduled_date,
"window_start": window_start,
"window_end": window_end,
"depends_on_event_id": depends_on_event_id,
"status": EventStatus.SCHEDULED.value,
"created_at": datetime.utcnow()
}
await db.schedule_events.insert_one(event)
event_ids.append(event["id"])
completed_events[event_def["event_code"]] = event["id"]
# Schedule reminders
await self._schedule_reminders(event, template["reminders"])
return event_ids
async def _schedule_reminders(
self,
event: Dict,
reminder_defs: List[Dict]
) -> None:
"""Schedule all reminders for an event."""
for reminder_def in reminder_defs:
scheduled_for = datetime.combine(
event["window_start"],
datetime.min.time()
) + timedelta(hours=reminder_def["offset_hours"])
# Don't schedule reminders in the past
if scheduled_for <= datetime.utcnow():
continue
reminder = {
"id": generate_uuid(),
"subject_id": event["subject_id"],
"schedule_event_id": event["id"],
"channel": reminder_def["channel"],
"template_code": reminder_def["template"],
"scheduled_for": scheduled_for,
"status": "pending",
"created_at": datetime.utcnow()
}
await db.notifications.insert_one(reminder)
async def check_windows(self) -> None:
"""
Periodic task to update event statuses based on windows.
Called by scheduler (e.g., every hour).
"""
now = datetime.utcnow()
today = now.date()
# Open events where window has started
await db.schedule_events.update_many(
{
"status": EventStatus.SCHEDULED.value,
"window_start": {"$lte": today}
},
{"$set": {"status": EventStatus.OPEN.value}}
)
# Miss events where window has closed without completion
await db.schedule_events.update_many(
{
"status": EventStatus.OPEN.value,
"window_end": {"$lt": today}
},
{"$set": {"status": EventStatus.MISSED.value}}
)
async def mark_completed(
self,
event_id: str,
session_id: str
) -> None:
"""Mark event as completed when session finishes."""
await db.schedule_events.update_one(
{"id": event_id},
{
"$set": {
"status": EventStatus.COMPLETED.value,
"session_id": session_id,
"completed_at": datetime.utcnow()
}
}
)
# Cancel pending reminders
await db.notifications.update_many(
{
"schedule_event_id": event_id,
"status": "pending"
},
{"$set": {"status": "cancelled"}}
)
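To make the date arithmetic concrete, here is what a two-event template produces for an anchor of 2026-02-02; event codes and window sizes are illustrative:

```python
# Worked example of the window math in create_subject_schedule.
# Event codes, offsets, and window sizes are illustrative.
from datetime import date, timedelta

template_events = [
    {"event_code": "baseline", "day_offset": 0, "window_before_days": 0, "window_after_days": 3},
    {"event_code": "week_4", "day_offset": 28, "window_before_days": 3, "window_after_days": 7},
]
anchor = date(2026, 2, 2)
for e in template_events:
    scheduled = anchor + timedelta(days=e["day_offset"])
    window_start = scheduled - timedelta(days=e["window_before_days"])
    window_end = scheduled + timedelta(days=e["window_after_days"])
    print(e["event_code"], scheduled, window_start, window_end)

# baseline 2026-02-02 2026-02-02 2026-02-05
# week_4   2026-03-02 2026-02-27 2026-03-09
```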
Job Queue Implementation¶
# server/app/jobs/scheduler.py
from celery import Celery
from celery.schedules import crontab
from datetime import datetime, timedelta
from typing import Optional
# Celery configuration
app = Celery('cognitive_platform')
app.config_from_object('app.config.celery_config')
# Periodic tasks
app.conf.beat_schedule = {
'check-schedule-windows': {
'task': 'app.jobs.scheduler.check_windows',
'schedule': crontab(minute=0), # Every hour
},
'process-notifications': {
'task': 'app.jobs.scheduler.process_pending_notifications',
'schedule': crontab(minute='*/5'), # Every 5 minutes
},
'retry-failed-syncs': {
'task': 'app.jobs.scheduler.retry_failed_syncs',
'schedule': crontab(minute='*/15'), # Every 15 minutes
},
'cleanup-expired-sessions': {
'task': 'app.jobs.scheduler.cleanup_expired_sessions',
'schedule': crontab(hour=2, minute=0), # Daily at 2 AM
},
}
@app.task(bind=True, max_retries=3)
def check_windows(self):
"""Check and update schedule event windows."""
try:
from app.services.schedule import ScheduleService
import asyncio
service = ScheduleService()
asyncio.run(service.check_windows())
except Exception as e:
self.retry(exc=e, countdown=60)
@app.task(bind=True, max_retries=3)
def process_pending_notifications(self):
"""Process and send pending notifications."""
try:
import asyncio
asyncio.run(_process_notifications())
except Exception as e:
self.retry(exc=e, countdown=60)
async def _process_notifications():
"""Internal notification processing."""
now = datetime.utcnow()
# Get pending notifications due for sending
pending = await db.notifications.find({
"status": "pending",
"scheduled_for": {"$lte": now}
}).limit(100).to_list()
for notification in pending:
try:
            send_notification.delay(notification["id"])  # .delay() returns an AsyncResult synchronously; it is not awaitable
except Exception as e:
logger.error(f"Failed to queue notification {notification['id']}: {e}")
@app.task(bind=True, max_retries=5)
def send_notification(self, notification_id: str):
"""Send a single notification."""
try:
import asyncio
asyncio.run(_send_notification(notification_id))
except Exception as e:
# Exponential backoff
countdown = 60 * (2 ** self.request.retries)
self.retry(exc=e, countdown=countdown)
async def _send_notification(notification_id: str):
"""Internal notification sending."""
notification = await db.notifications.find_one({"id": notification_id})
if not notification or notification["status"] != "pending":
return
# Mark as sending
await db.notifications.update_one(
{"id": notification_id},
{"$set": {"status": "sending", "attempts": notification.get("attempts", 0) + 1}}
)
# Get subject and render template
subject = await db.subjects.find_one({"id": notification["subject_id"]})
template = await load_template(notification["template_code"], subject["preferred_language"])
# Get contact info
contact = await get_decrypted_contact(subject, notification["channel"])
# Send via appropriate channel
if notification["channel"] == "email":
result = await send_email(contact, template)
elif notification["channel"] == "sms":
result = await send_sms(contact, template)
elif notification["channel"] == "push":
result = await send_push(subject["id"], template)
else:
raise ValueError(f"Unknown channel: {notification['channel']}")
# Update status
await db.notifications.update_one(
{"id": notification_id},
{
"$set": {
"status": "sent" if result.success else "failed",
"sent_at": datetime.utcnow() if result.success else None,
"external_message_id": result.message_id,
"last_error": result.error
}
}
)
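The referenced `app.config.celery_config` module can stay small; a sketch with placeholder Redis URLs matching the stack table:

```python
# Sketch of app/config/celery_config.py; the URLs are deployment-specific
# placeholders for the Redis instance named in the technology stack.
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
accept_content = ["json"]
timezone = "UTC"
enable_utc = True
task_acks_late = True            # re-deliver if a worker dies mid-task
worker_prefetch_multiplier = 1   # fair scheduling for long notification batches
```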
Notification Templates¶
# server/app/services/notification_templates.py
from jinja2 import Environment, FileSystemLoader, TemplateNotFound
from typing import Dict
from dataclasses import dataclass

@dataclass
class RenderedTemplate:
    """Rendered notification content for one message."""
    subject: str
    body_html: str
    body_text: str
class NotificationTemplateService:
"""
Manages localized notification templates.
"""
def __init__(self):
self.env = Environment(
loader=FileSystemLoader('templates/notifications'),
autoescape=True
)
async def render(
self,
template_code: str,
language: str,
context: Dict
) -> RenderedTemplate:
"""Render a notification template with context."""
        # Load templates for the language, falling back to English
        try:
            template = self.env.get_template(f"{template_code}/{language}.html")
            subject_template = self.env.get_template(f"{template_code}/{language}_subject.txt")
        except TemplateNotFound:
            template = self.env.get_template(f"{template_code}/en.html")
            subject_template = self.env.get_template(f"{template_code}/en_subject.txt")
        # Render the HTML body once and derive the text variant from it
        body_html = template.render(**context)
        return RenderedTemplate(
            subject=subject_template.render(**context),
            body_html=body_html,
            body_text=self._html_to_text(body_html)
        )
# Example template: templates/notifications/reminder_24h/en.html
"""
<!DOCTYPE html>
<html>
<head>
<style>
.button { background-color: #4CAF50; color: white; padding: 14px 20px; text-decoration: none; border-radius: 4px; }
</style>
</head>
<body>
<h2>Assessment Reminder</h2>
<p>Hello,</p>
<p>This is a reminder that your {{ assessment_name }} assessment is due tomorrow.</p>
<p>Window: {{ window_start }} - {{ window_end }}</p>
<p>
<a href="{{ assessment_url }}" class="button">Start Assessment</a>
</p>
<p>If you have any questions, please contact your study coordinator.</p>
<p>Thank you for your participation!</p>
</body>
</html>
"""
# French version: templates/notifications/reminder_24h/fr.html
"""
<!DOCTYPE html>
<html>
<head>
<style>
.button { background-color: #4CAF50; color: white; padding: 14px 20px; text-decoration: none; border-radius: 4px; }
</style>
</head>
<body>
<h2>Rappel d'évaluation</h2>
<p>Bonjour,</p>
<p>Ceci est un rappel que votre évaluation {{ assessment_name }} est prévue pour demain.</p>
<p>Période: {{ window_start }} - {{ window_end }}</p>
<p>
<a href="{{ assessment_url }}" class="button">Commencer l'évaluation</a>
</p>
<p>Si vous avez des questions, veuillez contacter votre coordonnateur d'étude.</p>
<p>Merci de votre participation!</p>
</body>
</html>
"""
G. CAT Engine¶
IRT Implementation Approach¶
Given the complexity of IRT and CAT, this proposal recommends a hybrid approach:
- Use existing open-source IRT library for core estimation (catsim in Python)
- Build custom orchestration for the platform-specific requirements
- Implement offline-capable client with cached item bank
# server/app/services/cat_engine.py
import numpy as np
from catsim.initialization import RandomInitializer
from catsim.selection import MaxInfoSelector
from catsim.estimation import NumericalSearchEstimator
from catsim.stopping import MaxItemStopper, MinErrorStopper  # drop-in alternatives to check_stopping below
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
@dataclass
class CATItem:
item_id: str
text: Dict[str, str] # Localized text
options: List[Dict] # Response options
parameters: Dict # IRT parameters (a, b, c or thresholds)
content_area: Optional[str] # For content balancing
@dataclass
class CATState:
theta: float # Current ability estimate
se: float # Standard error
items_administered: List[str] # Item IDs
responses: List[int] # Response values
step: int # Current step
@dataclass
class CATConfig:
min_items: int = 4
max_items: int = 12
min_se: float = 0.3 # Stop when SE below this
initial_theta: float = 0.0
content_balancing: bool = False
content_constraints: Optional[Dict] = None
class CATEngine:
"""
Computer Adaptive Testing engine for PROMIS-style item banks.
Supports:
- Graded Response Model (GRM) for polytomous items
- Maximum information item selection
- Multiple stopping rules
- Content balancing (optional)
- Full decision audit trail
"""
    def __init__(self, config: CATConfig):
        self.config = config
        # catsim components retained for cross-validation; selection and
        # estimation below are implemented directly so the identical logic
        # can be mirrored in the offline TypeScript client
        self.initializer = RandomInitializer()
        self.selector = MaxInfoSelector()
        self.estimator = NumericalSearchEstimator()
def initialize(self, item_bank: List[CATItem]) -> CATState:
"""Initialize a new CAT session."""
return CATState(
theta=self.config.initial_theta,
se=float('inf'),
items_administered=[],
responses=[],
step=0
)
def select_next_item(
self,
state: CATState,
item_bank: List[CATItem],
administered_items: List[str]
) -> Tuple[CATItem, Dict]:
"""
Select the next item to administer.
Returns:
Tuple of (selected item, selection metadata)
"""
# Filter available items
available = [
item for item in item_bank
if item.item_id not in administered_items
]
if not available:
return None, {"reason": "no_items_available"}
        # Apply content balancing if configured
        if self.config.content_balancing:
            available = self._apply_content_constraints(available, state)
# Select item with maximum information at current theta
best_item = None
best_info = -float('inf')
for item in available:
info = self._calculate_information(item, state.theta)
if info > best_info:
best_info = info
best_item = item
return best_item, {
"theta_at_selection": state.theta,
"information": best_info,
"items_remaining": len(available)
}
def process_response(
self,
state: CATState,
item: CATItem,
response: int
) -> Tuple[CATState, Dict]:
"""
Process a response and update ability estimate.
Returns:
Tuple of (updated state, estimation metadata)
"""
# Add to administered
new_items = state.items_administered + [item.item_id]
new_responses = state.responses + [response]
# Re-estimate theta using all responses
        # Look up IRT parameters for every administered item
        item_params = [
            self._get_item_params(item_id)
            for item_id in new_items
        ]
# EAP or MLE estimation
new_theta, new_se = self._estimate_ability(
item_params,
new_responses
)
new_state = CATState(
theta=new_theta,
se=new_se,
items_administered=new_items,
responses=new_responses,
step=state.step + 1
)
return new_state, {
"theta_before": state.theta,
"theta_after": new_theta,
"se_before": state.se,
"se_after": new_se,
"response": response,
"item_parameters": item.parameters
}
def check_stopping(self, state: CATState) -> Tuple[bool, str]:
"""
Check if stopping criteria are met.
Returns:
Tuple of (should_stop, reason)
"""
# Maximum items
if state.step >= self.config.max_items:
return True, "max_items"
# Minimum SE achieved (after minimum items)
if state.step >= self.config.min_items and state.se <= self.config.min_se:
return True, "min_se"
return False, ""
def _calculate_information(self, item: CATItem, theta: float) -> float:
"""Calculate Fisher information for an item at theta."""
params = item.parameters
if "b" in params and not isinstance(params["b"], list):
# 2PL model
a, b = params["a"], params["b"]
p = 1 / (1 + np.exp(-a * (theta - b)))
return a**2 * p * (1 - p)
else:
# GRM model
a = params["a"]
bs = params["b"] # Threshold parameters
# Calculate category probabilities
probs = self._grm_probabilities(a, bs, theta)
# Information for GRM
info = 0
for k in range(len(probs)):
if probs[k] > 0:
# Derivative of log probability
info += (self._grm_derivative(a, bs, theta, k) ** 2) / probs[k]
return info
def _grm_probabilities(
self,
a: float,
bs: List[float],
theta: float
) -> List[float]:
"""Calculate GRM category probabilities."""
# Cumulative probabilities
cum_probs = [1.0] # P(X >= 0) = 1
for b in bs:
p = 1 / (1 + np.exp(-a * (theta - b)))
cum_probs.append(p)
cum_probs.append(0.0) # P(X >= K+1) = 0
# Category probabilities
probs = []
for k in range(len(cum_probs) - 1):
probs.append(cum_probs[k] - cum_probs[k + 1])
        return probs

    def _grm_derivative(
        self,
        a: float,
        bs: List[float],
        theta: float,
        k: int
    ) -> float:
        """
        Derivative of GRM category probability P_k w.r.t. theta:
        dP_k/dθ = dP*_k/dθ - dP*_{k+1}/dθ, with d/dθ P* = a·P*(1-P*).
        """
        def d_cum(j: int) -> float:
            # Boundary cumulatives (constant 1 and 0) have zero derivative
            if j == 0 or j == len(bs) + 1:
                return 0.0
            p = 1 / (1 + np.exp(-a * (theta - bs[j - 1])))
            return a * p * (1 - p)
        return d_cum(k) - d_cum(k + 1)
def _estimate_ability(
self,
item_params: List[Dict],
responses: List[int]
) -> Tuple[float, float]:
"""
Estimate ability using Expected A Posteriori (EAP).
"""
# Quadrature points
quad_points = np.linspace(-4, 4, 61)
# Prior (standard normal)
prior = np.exp(-quad_points**2 / 2) / np.sqrt(2 * np.pi)
# Likelihood
likelihood = np.ones_like(quad_points)
for params, response in zip(item_params, responses):
if isinstance(params["b"], list):
# GRM
for i, theta in enumerate(quad_points):
probs = self._grm_probabilities(params["a"], params["b"], theta)
likelihood[i] *= probs[response]
else:
# 2PL
for i, theta in enumerate(quad_points):
p = 1 / (1 + np.exp(-params["a"] * (theta - params["b"])))
likelihood[i] *= p if response == 1 else (1 - p)
# Posterior
posterior = likelihood * prior
posterior = posterior / np.sum(posterior)
# EAP estimate
theta = np.sum(quad_points * posterior)
# Posterior standard deviation (SE)
se = np.sqrt(np.sum((quad_points - theta)**2 * posterior))
return float(theta), float(se)
def theta_to_tscore(self, theta: float) -> float:
"""Convert IRT theta to T-score (mean=50, SD=10)."""
return 50 + (theta * 10)
# Pseudocode for CAT loop
"""
def run_cat_session(item_bank, config):
engine = CATEngine(config)
state = engine.initialize(item_bank)
traces = []
while True:
# Select next item
item, selection_meta = engine.select_next_item(
state, item_bank, state.items_administered
)
if item is None:
break
# Present item to user and get response
response = present_item_and_wait_for_response(item)
# Process response
state, estimation_meta = engine.process_response(state, item, response)
# Log trace
traces.append({
"step": state.step,
"item_id": item.item_id,
"response": response,
"theta": state.theta,
"se": state.se,
**selection_meta,
**estimation_meta
})
# Check stopping
should_stop, reason = engine.check_stopping(state)
traces[-1]["stopping_check"] = {"should_stop": should_stop, "reason": reason}
if should_stop:
break
return {
"final_theta": state.theta,
"final_se": state.se,
"tscore": engine.theta_to_tscore(state.theta),
"items_administered": state.items_administered,
"traces": traces
}
"""
Offline CAT Client¶
// client/src/cat/OfflineCATEngine.ts
/**
* Client-side CAT engine for offline operation.
*
* The item bank is cached locally, and all estimation
* happens on-device. Results are synced when online.
*/
interface CATItemBank {
bankId: string;
version: string;
irtModel: 'GRM' | '2PL';
items: CATItem[];
thetaToTScoreLookup: Map<number, number>;
}
interface CATItem {
itemId: string;
text: Record<string, string>;
options: Array<{
value: number;
text: Record<string, string>;
}>;
parameters: {
a: number;
b: number | number[]; // Single value for 2PL, array for GRM
c?: number; // Guessing parameter (3PL only)
};
contentArea?: string;
}
interface CATSession {
theta: number;
se: number;
itemsAdministered: string[];
responses: number[];
traces: CATTrace[];
}
interface CATTrace {
step: number;
itemId: string;
thetaBefore: number;
seBefore: number;
thetaAfter: number;
seAfter: number;
response: number;
information: number;
stoppingRuleChecked: string;
stoppingRuleMet: boolean;
}
interface CATConfig {
  minItems: number;
  maxItems: number;
  minSe: number;
  initialTheta: number;
}
interface CATResults {
  theta: number;
  se: number;
  tScore: number;
  itemsAdministered: string[];
  responses: number[];
  traces: CATTrace[];
  bankId: string;
  bankVersion: string;
}
class OfflineCATEngine {
private itemBank: CATItemBank;
private config: CATConfig;
private session: CATSession;
constructor(itemBank: CATItemBank, config: CATConfig) {
this.itemBank = itemBank;
this.config = config;
this.session = this.initializeSession();
}
private initializeSession(): CATSession {
return {
theta: this.config.initialTheta,
se: Infinity,
itemsAdministered: [],
responses: [],
traces: [],
};
}
/**
* Get next item to administer
*/
getNextItem(): CATItem | null {
// Check if we should stop
const [shouldStop] = this.checkStopping();
if (shouldStop) {
return null;
}
// Get available items
const available = this.itemBank.items.filter(
item => !this.session.itemsAdministered.includes(item.itemId)
);
if (available.length === 0) {
return null;
}
// Select item with maximum information
let bestItem: CATItem | null = null;
let bestInfo = -Infinity;
for (const item of available) {
const info = this.calculateInformation(item, this.session.theta);
if (info > bestInfo) {
bestInfo = info;
bestItem = item;
}
}
return bestItem;
}
/**
* Process a response and update estimates
*/
processResponse(itemId: string, response: number): void {
const item = this.itemBank.items.find(i => i.itemId === itemId);
if (!item) {
throw new Error(`Item not found: ${itemId}`);
}
const thetaBefore = this.session.theta;
const seBefore = this.session.se;
// Add to administered
this.session.itemsAdministered.push(itemId);
this.session.responses.push(response);
// Re-estimate theta
const [newTheta, newSe] = this.estimateAbility();
this.session.theta = newTheta;
this.session.se = newSe;
// Check stopping
const [shouldStop, reason] = this.checkStopping();
// Log trace
this.session.traces.push({
step: this.session.traces.length + 1,
itemId,
thetaBefore,
seBefore,
thetaAfter: newTheta,
seAfter: newSe,
response,
information: this.calculateInformation(item, thetaBefore),
stoppingRuleChecked: reason || 'none',
stoppingRuleMet: shouldStop,
});
}
/**
* Check if we should stop
*/
checkStopping(): [boolean, string] {
const n = this.session.itemsAdministered.length;
if (n >= this.config.maxItems) {
return [true, 'max_items'];
}
if (n >= this.config.minItems && this.session.se <= this.config.minSe) {
return [true, 'min_se'];
}
return [false, ''];
}
/**
* Get final results
*/
getResults(): CATResults {
return {
theta: this.session.theta,
se: this.session.se,
tScore: this.thetaToTScore(this.session.theta),
itemsAdministered: this.session.itemsAdministered,
responses: this.session.responses,
traces: this.session.traces,
bankId: this.itemBank.bankId,
bankVersion: this.itemBank.version,
};
}
/**
* Calculate Fisher information
*/
private calculateInformation(item: CATItem, theta: number): number {
const { a, b } = item.parameters;
if (typeof b === 'number') {
// 2PL model
const p = 1 / (1 + Math.exp(-a * (theta - b)));
return a * a * p * (1 - p);
} else {
// GRM model
const probs = this.grmProbabilities(a, b, theta);
let info = 0;
for (let k = 0; k < probs.length; k++) {
if (probs[k] > 0.001) {
const deriv = this.grmDerivative(a, b, theta, k);
info += (deriv * deriv) / probs[k];
}
}
return info;
}
}
/**
* GRM category probabilities
*/
private grmProbabilities(a: number, bs: number[], theta: number): number[] {
const cumProbs = [1];
for (const b of bs) {
cumProbs.push(1 / (1 + Math.exp(-a * (theta - b))));
}
cumProbs.push(0);
const probs: number[] = [];
for (let k = 0; k < cumProbs.length - 1; k++) {
probs.push(cumProbs[k] - cumProbs[k + 1]);
}
return probs;
  }
  /**
   * Derivative of a GRM category probability with respect to theta,
   * used by calculateInformation above. dP_k/dθ = dP*_k/dθ − dP*_{k+1}/dθ,
   * where each cumulative derivative is a·P*(1−P*).
   */
  private grmDerivative(a: number, bs: number[], theta: number, k: number): number {
    const dCum = (j: number): number => {
      // Boundary cumulatives (constant 1 and 0) have zero derivative
      if (j === 0 || j === bs.length + 1) return 0;
      const p = 1 / (1 + Math.exp(-a * (theta - bs[j - 1])));
      return a * p * (1 - p);
    };
    return dCum(k) - dCum(k + 1);
  }
/**
* EAP ability estimation
*/
private estimateAbility(): [number, number] {
// Quadrature points
const quadPoints: number[] = [];
for (let i = -40; i <= 40; i++) {
quadPoints.push(i / 10);
}
// Prior (standard normal)
const prior = quadPoints.map(t =>
Math.exp(-t * t / 2) / Math.sqrt(2 * Math.PI)
);
// Likelihood
const likelihood = quadPoints.map(() => 1);
for (let i = 0; i < this.session.itemsAdministered.length; i++) {
const item = this.itemBank.items.find(
it => it.itemId === this.session.itemsAdministered[i]
)!;
const response = this.session.responses[i];
const { a, b } = item.parameters;
for (let j = 0; j < quadPoints.length; j++) {
const theta = quadPoints[j];
if (typeof b === 'number') {
// 2PL
const p = 1 / (1 + Math.exp(-a * (theta - b)));
likelihood[j] *= response === 1 ? p : (1 - p);
} else {
// GRM
const probs = this.grmProbabilities(a, b, theta);
likelihood[j] *= probs[response] || 0.001;
}
}
}
// Posterior
const posterior = likelihood.map((l, i) => l * prior[i]);
const sum = posterior.reduce((a, b) => a + b, 0);
const normalized = posterior.map(p => p / sum);
// EAP
let theta = 0;
for (let i = 0; i < quadPoints.length; i++) {
theta += quadPoints[i] * normalized[i];
}
// SE
let variance = 0;
for (let i = 0; i < quadPoints.length; i++) {
variance += Math.pow(quadPoints[i] - theta, 2) * normalized[i];
}
const se = Math.sqrt(variance);
return [theta, se];
}
/**
* Convert theta to T-score
*/
private thetaToTScore(theta: number): number {
// Use lookup table if available
if (this.itemBank.thetaToTScoreLookup) {
const rounded = Math.round(theta * 100) / 100;
const lookup = this.itemBank.thetaToTScoreLookup.get(rounded);
if (lookup !== undefined) {
return lookup;
}
}
// Default linear transformation
return 50 + (theta * 10);
}
}
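Since estimation runs entirely on-device, the server should re-run the same EAP over the synced item/response trace before accepting scores. A sketch of that check; the tolerance is an assumption, sized to absorb the small difference between the client's 81-point quadrature grid and the server's 61 points:

```python
# Sketch: re-estimate theta/SE server-side from a synced CAT result and
# flag mismatches. TOLERANCE is an assumption, not a validated threshold.
from typing import Dict, List

TOLERANCE = 0.02

def verify_cat_result(engine: CATEngine, bank: List[CATItem], result: Dict) -> bool:
    params_by_id = {item.item_id: item.parameters for item in bank}
    item_params = [params_by_id[i] for i in result["itemsAdministered"]]
    # Reuses the server engine's EAP routine over the client-reported trace
    theta, se = engine._estimate_ability(item_params, result["responses"])
    ok = abs(theta - result["theta"]) <= TOLERANCE and abs(se - result["se"]) <= TOLERANCE
    if not ok:
        logger.warning(
            "Offline CAT estimate mismatch: client %.3f vs server %.3f",
            result["theta"], theta,
        )
    return ok
```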
H. Admin Builder UX¶
Lab.js-Inspired Builder Components¶
// admin/src/components/BatteryBuilder/BatteryBuilder.tsx
import React, { useState, useCallback } from 'react';
import {
DndContext,
closestCenter,
KeyboardSensor,
PointerSensor,
useSensor,
useSensors,
DragEndEvent,
} from '@dnd-kit/core';
import {
arrayMove,
SortableContext,
sortableKeyboardCoordinates,
verticalListSortingStrategy,
} from '@dnd-kit/sortable';
interface BatteryBuilderProps {
batteryId: string;
initialModules: ModuleEntry[];
availableModules: Module[];
onSave: (battery: BatteryDraft) => Promise<void>;
}
export function BatteryBuilder({
batteryId,
initialModules,
availableModules,
onSave,
}: BatteryBuilderProps) {
const [modules, setModules] = useState<ModuleEntry[]>(initialModules);
const [selectedModule, setSelectedModule] = useState<string | null>(null);
const [previewMode, setPreviewMode] = useState<'phone' | 'tablet' | 'desktop'>('tablet');
  const [isDirty, setIsDirty] = useState(false);
  const [previewOpen, setPreviewOpen] = useState(false);
  const [searchQuery, setSearchQuery] = useState('');
  // DeliveryConfig/defaultDeliveryConfig come from the battery config module
  const [deliveryConfig, setDeliveryConfig] = useState<DeliveryConfig>(defaultDeliveryConfig);
  // Derived values (registry helpers assumed alongside this component)
  const groupedModules = groupModulesByCategory(availableModules, searchQuery);
  const estimatedDuration = calculateEstimatedDuration(modules);
const sensors = useSensors(
useSensor(PointerSensor),
useSensor(KeyboardSensor, {
coordinateGetter: sortableKeyboardCoordinates,
})
);
const handleDragEnd = useCallback((event: DragEndEvent) => {
const { active, over } = event;
if (over && active.id !== over.id) {
setModules((items) => {
const oldIndex = items.findIndex((i) => i.id === active.id);
const newIndex = items.findIndex((i) => i.id === over.id);
return arrayMove(items, oldIndex, newIndex);
});
setIsDirty(true);
}
}, []);
const handleAddModule = useCallback((moduleId: string) => {
const module = availableModules.find((m) => m.id === moduleId);
if (!module) return;
const newEntry: ModuleEntry = {
id: crypto.randomUUID(),
moduleVersionId: module.latestVersionId,
order: modules.length + 1,
required: true,
branchCondition: null,
};
setModules([...modules, newEntry]);
setIsDirty(true);
}, [modules, availableModules]);
const handleRemoveModule = useCallback((entryId: string) => {
setModules(modules.filter((m) => m.id !== entryId));
setIsDirty(true);
}, [modules]);
const handleUpdateBranching = useCallback((
entryId: string,
condition: BranchCondition | null
) => {
setModules(modules.map((m) =>
m.id === entryId ? { ...m, branchCondition: condition } : m
));
setIsDirty(true);
  }, [modules]);
  const handleSave = useCallback(async () => {
    await onSave({ modules, deliveryConfig });
    setIsDirty(false);
  }, [modules, deliveryConfig, onSave]);
return (
<div className="battery-builder">
{/* Header */}
<div className="builder-header">
<h1>Battery Builder</h1>
<div className="header-actions">
<PreviewModeSelector
mode={previewMode}
onChange={setPreviewMode}
/>
<button
onClick={() => setPreviewOpen(true)}
className="btn-secondary"
>
Preview
</button>
<button
onClick={handleSave}
disabled={!isDirty}
className="btn-primary"
>
Save Draft
</button>
</div>
</div>
<div className="builder-content">
{/* Module Library (Left Panel) */}
<div className="module-library">
<h2>Available Modules</h2>
<input
type="search"
placeholder="Search modules..."
onChange={(e) => setSearchQuery(e.target.value)}
/>
<div className="module-categories">
{Object.entries(groupedModules).map(([category, mods]) => (
<ModuleCategory
key={category}
name={category}
modules={mods}
onAdd={handleAddModule}
/>
))}
</div>
</div>
{/* Timeline (Center) */}
<div className="battery-timeline">
<h2>Battery Timeline</h2>
<p className="timeline-info">
Drag to reorder. Click to configure branching.
</p>
<DndContext
sensors={sensors}
collisionDetection={closestCenter}
onDragEnd={handleDragEnd}
>
<SortableContext
items={modules.map((m) => m.id)}
strategy={verticalListSortingStrategy}
>
{modules.map((entry, index) => (
<SortableModuleCard
key={entry.id}
entry={entry}
index={index}
module={getModuleDetails(entry.moduleVersionId)}
isSelected={selectedModule === entry.id}
onSelect={() => setSelectedModule(entry.id)}
onRemove={() => handleRemoveModule(entry.id)}
/>
))}
</SortableContext>
</DndContext>
{modules.length === 0 && (
<div className="empty-timeline">
<p>Drag modules here to build your battery</p>
</div>
)}
{/* Estimated duration */}
<div className="timeline-summary">
<span>
{modules.length} modules • ~{estimatedDuration} minutes
</span>
</div>
</div>
{/* Configuration Panel (Right) */}
<div className="config-panel">
{selectedModule ? (
<ModuleConfigPanel
entry={modules.find((m) => m.id === selectedModule)!}
onUpdateBranching={handleUpdateBranching}
allModules={modules}
/>
) : (
<BatterySettingsPanel
batteryId={batteryId}
deliveryConfig={deliveryConfig}
onUpdateConfig={setDeliveryConfig}
/>
)}
</div>
</div>
{/* Preview Modal */}
<PreviewModal
isOpen={previewOpen}
onClose={() => setPreviewOpen(false)}
battery={{ modules, deliveryConfig }}
mode={previewMode}
/>
</div>
);
}
Branching Rules Editor¶
// admin/src/components/BatteryBuilder/BranchingEditor.tsx
interface BranchCondition {
type: 'score_threshold' | 'response_value' | 'age_range' | 'random' | 'compound';
// Type-specific fields
moduleCode?: string;
scoreType?: string;
threshold?: number;
comparison?: 'gt' | 'gte' | 'lt' | 'lte' | 'eq';
itemId?: string;
expectedValue?: string | number;
minAge?: number;
maxAge?: number;
probability?: number;
// For compound conditions
operator?: 'and' | 'or';
conditions?: BranchCondition[];
}
function BranchingEditor({
condition,
onChange,
availableModules,
}: BranchingEditorProps) {
  const [conditionType, setConditionType] = useState(condition?.type || '');
return (
<div className="branching-editor">
<h3>Conditional Display</h3>
<p className="help-text">
This module will only be shown if the condition is met.
</p>
<div className="condition-type-selector">
<label>Condition Type</label>
<select
value={conditionType}
onChange={(e) => setConditionType(e.target.value as any)}
>
<option value="">Always show</option>
<option value="score_threshold">Score threshold</option>
<option value="response_value">Response value</option>
<option value="age_range">Age range</option>
<option value="random">Random sample</option>
<option value="compound">Compound (AND/OR)</option>
</select>
</div>
{conditionType === 'score_threshold' && (
<ScoreThresholdEditor
condition={condition}
onChange={onChange}
modules={availableModules}
/>
)}
{conditionType === 'response_value' && (
<ResponseValueEditor
condition={condition}
onChange={onChange}
modules={availableModules}
/>
)}
{conditionType === 'age_range' && (
<AgeRangeEditor
condition={condition}
onChange={onChange}
/>
)}
{conditionType === 'random' && (
<RandomSampleEditor
condition={condition}
onChange={onChange}
/>
)}
{conditionType === 'compound' && (
<CompoundConditionEditor
condition={condition}
onChange={onChange}
modules={availableModules}
/>
)}
{/* Preview logic */}
{condition && (
<div className="condition-preview">
<h4>Logic Preview</h4>
<code>{conditionToReadableString(condition)}</code>
</div>
)}
</div>
);
}
function ScoreThresholdEditor({ condition, onChange, modules }: any) {
return (
<div className="score-threshold-editor">
<div className="form-group">
<label>Source Module</label>
<select
value={condition?.moduleCode || ''}
onChange={(e) => onChange({ ...condition, moduleCode: e.target.value })}
>
<option value="">Select module...</option>
{modules.map((m: any) => (
<option key={m.code} value={m.code}>{m.name}</option>
))}
</select>
</div>
<div className="form-group">
<label>Score Type</label>
<select
value={condition?.scoreType || ''}
onChange={(e) => onChange({ ...condition, scoreType: e.target.value })}
>
<option value="total">Total Score</option>
<option value="theta">Theta (CAT)</option>
<option value="tscore">T-Score</option>
<option value="percentile">Percentile</option>
</select>
</div>
<div className="form-row">
<div className="form-group">
<label>Comparison</label>
<select
value={condition?.comparison || 'gte'}
onChange={(e) => onChange({ ...condition, comparison: e.target.value })}
>
<option value="gt">Greater than</option>
<option value="gte">Greater than or equal</option>
<option value="lt">Less than</option>
<option value="lte">Less than or equal</option>
<option value="eq">Equal to</option>
</select>
</div>
<div className="form-group">
<label>Threshold</label>
<input
type="number"
value={condition?.threshold || 0}
onChange={(e) => onChange({ ...condition, threshold: Number(e.target.value) })}
/>
</div>
</div>
</div>
);
}
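The conditions authored here must be evaluated at runtime when a battery executes. A minimal server-side evaluator sketch covering the condition types above; seeding `random` per subject (so re-entry yields the same branch) and the optional `seed` field are assumptions:

```python
# Sketch of a runtime evaluator for BranchCondition documents authored in
# the editor above. Per-subject random seeding and the `seed` field are
# assumptions, not part of the authored schema.
import operator
import random

COMPARATORS = {
    "gt": operator.gt, "gte": operator.ge,
    "lt": operator.lt, "lte": operator.le, "eq": operator.eq,
}

def evaluate_condition(cond: dict, ctx: dict) -> bool:
    """ctx carries prior scores, responses, subject age, and subject id."""
    if cond is None:
        return True
    ctype = cond["type"]
    if ctype == "score_threshold":
        score = ctx["scores"][cond["moduleCode"]][cond["scoreType"]]
        return COMPARATORS[cond.get("comparison", "gte")](score, cond["threshold"])
    if ctype == "response_value":
        return ctx["responses"].get(cond["itemId"]) == cond["expectedValue"]
    if ctype == "age_range":
        return cond.get("minAge", 0) <= ctx["age"] <= cond.get("maxAge", 200)
    if ctype == "random":
        # Stable per subject: the same subject always lands in the same arm
        rng = random.Random(f"{ctx['subject_id']}:{cond.get('seed', '')}")
        return rng.random() < cond["probability"]
    if ctype == "compound":
        results = (evaluate_condition(c, ctx) for c in cond["conditions"])
        return all(results) if cond["operator"] == "and" else any(results)
    raise ValueError(f"Unknown condition type: {ctype}")
```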
Version/Release Workflow¶
// admin/src/components/BatteryBuilder/PublishWorkflow.tsx
interface PublishWorkflowProps {
battery: BatteryDraft;
existingVersions: BatteryVersion[];
onPublish: (version: string, notes: string) => Promise<void>;
}
function PublishWorkflow({
battery,
existingVersions,
onPublish,
}: PublishWorkflowProps) {
const [version, setVersion] = useState(suggestNextVersion(existingVersions));
const [releaseNotes, setReleaseNotes] = useState('');
const [validationResults, setValidationResults] = useState<ValidationResult | null>(null);
const [isValidating, setIsValidating] = useState(false);
const [isPublishing, setIsPublishing] = useState(false);
const handleValidate = async () => {
setIsValidating(true);
try {
const results = await validateBattery(battery);
setValidationResults(results);
} finally {
setIsValidating(false);
}
};
const handlePublish = async () => {
if (!validationResults?.isValid) return;
setIsPublishing(true);
try {
await onPublish(version, releaseNotes);
} finally {
setIsPublishing(false);
}
};
return (
<div className="publish-workflow">
<h2>Publish Battery Version</h2>
{/* Version selection */}
<div className="form-group">
<label>Version Number</label>
<input
type="text"
value={version}
onChange={(e) => setVersion(e.target.value)}
pattern="\d+\.\d+\.\d+"
placeholder="1.0.0"
/>
<p className="help-text">
Latest: {existingVersions[0]?.version || 'None'}
</p>
</div>
{/* Release notes */}
<div className="form-group">
<label>Release Notes</label>
<textarea
value={releaseNotes}
onChange={(e) => setReleaseNotes(e.target.value)}
placeholder="Describe changes in this version..."
rows={4}
/>
</div>
{/* Validation */}
<div className="validation-section">
<button
onClick={handleValidate}
disabled={isValidating}
className="btn-secondary"
>
{isValidating ? 'Validating...' : 'Validate Battery'}
</button>
{validationResults && (
<ValidationResultsPanel results={validationResults} />
)}
</div>
{/* Publish button */}
<div className="publish-actions">
<button
onClick={handlePublish}
disabled={!validationResults?.isValid || isPublishing}
className="btn-primary"
>
{isPublishing ? 'Publishing...' : 'Publish Version'}
</button>
</div>
{/* Warning about immutability */}
<div className="warning-box">
<strong>Important:</strong> Published versions are immutable.
Once published, this battery configuration cannot be changed.
Create a new version for any modifications.
</div>
</div>
);
}
async function validateBattery(battery: BatteryDraft): Promise<ValidationResult> {
const errors: ValidationError[] = [];
const warnings: ValidationWarning[] = [];
// Check for empty battery
if (battery.modules.length === 0) {
errors.push({
type: 'error',
code: 'EMPTY_BATTERY',
message: 'Battery must contain at least one module',
});
}
// Check module versions exist and are published
for (const entry of battery.modules) {
const version = await getModuleVersion(entry.moduleVersionId);
if (!version) {
errors.push({
type: 'error',
code: 'INVALID_MODULE_VERSION',
message: `Module version ${entry.moduleVersionId} not found`,
});
} else if (version.status !== 'published') {
errors.push({
type: 'error',
code: 'UNPUBLISHED_MODULE',
message: `Module "${version.name}" is not published`,
});
}
}
// Check branching references
for (const entry of battery.modules) {
if (entry.branchCondition?.moduleCode) {
const referencedModule = battery.modules.find(
(m) => getModuleDetails(m.moduleVersionId).code === entry.branchCondition!.moduleCode
);
if (!referencedModule) {
errors.push({
type: 'error',
code: 'INVALID_BRANCH_REFERENCE',
message: `Branch condition references module "${entry.branchCondition.moduleCode}" which is not in the battery`,
});
}
}
}
// Check for circular dependencies
const circularDeps = detectCircularDependencies(battery.modules);
if (circularDeps.length > 0) {
errors.push({
type: 'error',
code: 'CIRCULAR_DEPENDENCY',
message: `Circular dependency detected: ${circularDeps.join(' -> ')}`,
});
}
// Warnings
if (battery.modules.length > 10) {
warnings.push({
type: 'warning',
code: 'LONG_BATTERY',
message: `Battery has ${battery.modules.length} modules. Consider if this is too long for participants.`,
});
}
const estimatedDuration = calculateEstimatedDuration(battery.modules);
if (estimatedDuration > 60) {
warnings.push({
type: 'warning',
code: 'LONG_DURATION',
message: `Estimated duration is ${estimatedDuration} minutes. This may cause fatigue.`,
});
}
return {
isValid: errors.length === 0,
errors,
warnings,
};
}
Minimal Viable Features & Phased Roadmap¶
Phase 1: MVP Builder (4-6 weeks)

- [ ] Module registry browsing and search
- [ ] Drag-and-drop timeline ordering
- [ ] Basic module configuration (required/optional)
- [ ] Save draft functionality
- [ ] Simple publish workflow
- [ ] Tablet preview mode

Phase 2: Enhanced Builder (4-6 weeks)

- [ ] Branching rules editor (score threshold, response value)
- [ ] Version comparison view
- [ ] Multi-language preview
- [ ] Device-specific preview modes
- [ ] Delivery configuration UI
- [ ] Validation with error/warning display

Phase 3: Advanced Features (4-6 weeks)

- [ ] Compound branching conditions (AND/OR)
- [ ] Module duplication and templates
- [ ] Battery import/export (JSON)
- [ ] Visual branching flow diagram
- [ ] A/B testing configuration
- [ ] Collaborative editing (real-time)
I. Security, Privacy & Compliance¶
Threat Model¶
| Threat | Impact | Likelihood | Mitigation |
|---|---|---|---|
| Token leakage (access tokens exposed) | High - unauthorized session access | Medium | Short-lived tokens (2h), single-use, hash storage |
| REDCap token exposure | Critical - full REDCap access | Low | Server-side only, encrypted at rest, audit logging |
| Replay attacks | Medium - duplicate submissions | Medium | Idempotency keys, event sequence validation |
| Data tampering (client-side) | High - invalid research data | Medium | Server-side validation, content hashing, audit trails |
| Offline device loss | Medium - local data exposure | Medium | Encryption at rest (WebCrypto), PIN/biometric unlock |
| Session hijacking | High - impersonation | Low | HTTPS only, secure cookies, device binding |
| SQL injection | Critical - data breach | Low | Parameterized queries, ORM usage |
| XSS | Medium - session theft | Low | CSP headers, input sanitization, React auto-escaping |
| Unauthorized access | High - data breach | Medium | RBAC, site scoping, audit logging |
Security Controls¶
# server/app/security/controls.py
from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer
import hashlib
import hmac
import json
import time
from datetime import datetime
from typing import Optional
# 1. Access Token Security
class AccessTokenManager:
"""
Secure access token management for assessment sessions.
Tokens are:
- Cryptographically random (256 bits)
- Single-use (cannot be reused after session completion)
- Time-limited (configurable, default 2 hours)
- Hashed in storage (SHA-256)
"""
def generate_token(self) -> tuple[str, str]:
"""Generate a new access token and its hash."""
import secrets
token = secrets.token_urlsafe(32) # 256 bits
token_hash = hashlib.sha256(token.encode()).hexdigest()
return token, token_hash
def verify_token(self, token: str, stored_hash: str) -> bool:
"""Verify a token against its stored hash."""
computed_hash = hashlib.sha256(token.encode()).hexdigest()
return hmac.compare_digest(computed_hash, stored_hash)
# 2. Signed URL Generation
class SignedURLGenerator:
"""
Generate signed URLs for battery delivery.
URLs include:
- Session ID
- Expiration timestamp
- Content hash (battery version)
- HMAC signature
"""
def __init__(self, secret_key: str):
self.secret_key = secret_key.encode()
def generate_url(
self,
session_id: str,
battery_hash: str,
expires_in_seconds: int = 7200
) -> str:
expires_at = int(time.time()) + expires_in_seconds
# Build payload
payload = f"{session_id}:{battery_hash}:{expires_at}"
# Generate signature
signature = hmac.new(
self.secret_key,
payload.encode(),
hashlib.sha256
).hexdigest()[:16] # Truncate for URL friendliness
return f"/assess/{session_id}?h={battery_hash[:8]}&e={expires_at}&s={signature}"
def verify_url(
self,
session_id: str,
battery_hash: str,
expires_at: int,
signature: str
) -> bool:
# Check expiration
if time.time() > expires_at:
return False
# Verify signature
payload = f"{session_id}:{battery_hash}:{expires_at}"
expected_signature = hmac.new(
self.secret_key,
payload.encode(),
hashlib.sha256
).hexdigest()[:16]
return hmac.compare_digest(signature, expected_signature)
# 3. Device Fingerprinting (Optional)
class DeviceFingerprint:
"""
Optional device binding for controlled delivery.
Collects non-PII device characteristics to detect
if a session is being accessed from a different device.
"""
@staticmethod
def compute(request: Request, client_info: dict) -> str:
"""Compute a device fingerprint from request and client info."""
components = [
request.headers.get("user-agent", ""),
client_info.get("screen_width", ""),
client_info.get("screen_height", ""),
client_info.get("timezone", ""),
client_info.get("language", ""),
# Note: NOT including IP address (too volatile)
]
combined = "|".join(str(c) for c in components)
return hashlib.sha256(combined.encode()).hexdigest()[:16]
# 4. Audit Logger
class AuditLogger:
"""
Immutable audit logging with integrity verification.
"""
async def log(
self,
action: str,
user_id: Optional[str] = None,
session_token_hash: Optional[str] = None,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
details: Optional[dict] = None,
request: Optional[Request] = None
) -> None:
# Get previous entry hash for chain
previous = await db.audit_log.find_one(
sort=[("id", -1)]
)
previous_hash = previous["entry_hash"] if previous else "genesis"
# Build entry
entry = {
"user_id": user_id,
"session_token_hash": session_token_hash,
"ip_address": request.client.host if request else None,
"user_agent": request.headers.get("user-agent") if request else None,
"action": action,
"resource_type": resource_type,
"resource_id": resource_id,
"details": details or {},
"previous_hash": previous_hash,
"created_at": datetime.utcnow()
}
# Compute entry hash
entry_content = json.dumps(entry, sort_keys=True, default=str)
entry["entry_hash"] = hashlib.sha256(entry_content.encode()).hexdigest()
await db.audit_log.insert_one(entry)

# 5. Security Headers Middleware
from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Add security headers to all responses."""

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)

        # Content Security Policy
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline'; "  # jsPsych needs inline
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: blob:; "
            "connect-src 'self' https://api.sendgrid.com; "
            "frame-ancestors 'none';"
        )

        # Other security headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        # Legacy header; modern browsers ignore it and rely on CSP instead
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"

        # HSTS (in production)
        if not settings.debug:
            response.headers["Strict-Transport-Security"] = (
                "max-age=31536000; includeSubDomains"
            )

        return response
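
As a usage sketch, the route below shows how verify_url ties into battery delivery. The url_signer instance, the settings.url_signing_key attribute, and the get_session / serve_battery_bundle helpers are illustrative names, not components defined above. Note that the full battery hash must come from the session record, since the URL carries only a truncated prefix.

# Sketch: verifying a signed assessment URL before serving the bundle.
from fastapi import APIRouter, HTTPException

router = APIRouter()
url_signer = SignedURLGenerator(secret_key=settings.url_signing_key)  # assumed setting

@router.get("/assess/{session_id}")
async def load_assessment(session_id: str, h: str, e: int, s: str):
    session = await get_session(session_id)  # hypothetical lookup helper
    if session is None or not session.battery_hash.startswith(h):
        raise HTTPException(status_code=404)
    if not url_signer.verify_url(session_id, session.battery_hash, e, s):
        raise HTTPException(status_code=403, detail="Link expired or invalid")
    return await serve_battery_bundle(session)  # hypothetical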
Client-Side Security¶
// client/src/security/OfflineEncryption.ts

/**
 * Client-side encryption for offline data.
 *
 * Uses WebCrypto API for:
 * - AES-GCM encryption of stored data
 * - Key derivation from user PIN (optional)
 */
class OfflineEncryption {
  private key: CryptoKey | null = null;

  /**
   * Initialize encryption with a derived key.
   *
   * For maximum security, derive from a user-provided PIN.
   * For convenience mode, use a device-bound key.
   */
  async initialize(mode: 'pin' | 'device', pin?: string): Promise<void> {
    if (mode === 'pin' && pin) {
      this.key = await this.deriveKeyFromPin(pin);
    } else {
      this.key = await this.getOrCreateDeviceKey();
    }
  }

  private async deriveKeyFromPin(pin: string): Promise<CryptoKey> {
    const encoder = new TextEncoder();
    const pinData = encoder.encode(pin);

    // Import PIN as key material
    const keyMaterial = await crypto.subtle.importKey(
      'raw',
      pinData,
      'PBKDF2',
      false,
      ['deriveBits', 'deriveKey']
    );

    // Get or create salt
    let salt = localStorage.getItem('encryption_salt');
    if (!salt) {
      const saltArray = crypto.getRandomValues(new Uint8Array(16));
      salt = btoa(String.fromCharCode(...saltArray));
      localStorage.setItem('encryption_salt', salt);
    }
    const saltBytes = Uint8Array.from(atob(salt), c => c.charCodeAt(0));

    // Derive key using PBKDF2
    return crypto.subtle.deriveKey(
      {
        name: 'PBKDF2',
        salt: saltBytes,
        iterations: 100000,
        hash: 'SHA-256',
      },
      keyMaterial,
      { name: 'AES-GCM', length: 256 },
      false,
      ['encrypt', 'decrypt']
    );
  }
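
  /**
   * NOTE: a device-bound key kept in localStorage protects offline data
   * against casual inspection only; anyone with access to the browser
   * profile can recover it. PIN mode (above) is the stronger option.
   */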
  private async getOrCreateDeviceKey(): Promise<CryptoKey> {
    // Try to load existing key
    const storedKey = localStorage.getItem('device_key');
    if (storedKey) {
      const keyData = Uint8Array.from(atob(storedKey), c => c.charCodeAt(0));
      return crypto.subtle.importKey(
        'raw',
        keyData,
        { name: 'AES-GCM', length: 256 },
        false,
        ['encrypt', 'decrypt']
      );
    }

    // Generate new key
    const key = await crypto.subtle.generateKey(
      { name: 'AES-GCM', length: 256 },
      true,
      ['encrypt', 'decrypt']
    );

    // Export and store
    const exported = await crypto.subtle.exportKey('raw', key);
    const exportedStr = btoa(String.fromCharCode(...new Uint8Array(exported)));
    localStorage.setItem('device_key', exportedStr);

    return key;
  }

  async encrypt(data: string): Promise<string> {
    if (!this.key) throw new Error('Encryption not initialized');

    const encoder = new TextEncoder();
    const iv = crypto.getRandomValues(new Uint8Array(12));

    const encrypted = await crypto.subtle.encrypt(
      { name: 'AES-GCM', iv },
      this.key,
      encoder.encode(data)
    );

    // Combine IV and ciphertext. Note: the String.fromCharCode spread can
    // overflow the call stack on large payloads; chunk if needed.
    const combined = new Uint8Array(iv.length + encrypted.byteLength);
    combined.set(iv);
    combined.set(new Uint8Array(encrypted), iv.length);
    return btoa(String.fromCharCode(...combined));
  }

  async decrypt(encryptedData: string): Promise<string> {
    if (!this.key) throw new Error('Encryption not initialized');

    const combined = Uint8Array.from(atob(encryptedData), c => c.charCodeAt(0));
    const iv = combined.slice(0, 12);
    const ciphertext = combined.slice(12);

    const decrypted = await crypto.subtle.decrypt(
      { name: 'AES-GCM', iv },
      this.key,
      ciphertext
    );

    const decoder = new TextDecoder();
    return decoder.decode(decrypted);
  }
}
Compliance Checklist¶
PHIPA/HIPAA Alignment:

- [ ] Data minimization: Only collect necessary data
- [ ] Encryption in transit: HTTPS/TLS 1.3 required
- [ ] Encryption at rest: Database encryption, client-side encryption
- [ ] Access controls: RBAC with site scoping
- [ ] Audit trails: Immutable, tamper-evident logs
- [ ] Data retention: Configurable retention policies
- [ ] Breach notification: Alerting for suspicious activity
21 CFR Part 11 Alignment (if required):

- [ ] Electronic signatures: User authentication for data submission
- [ ] Audit trails: Timestamped, immutable records
- [ ] System validation: Documented testing procedures
- [ ] Data integrity: Checksums, version control
- [ ] Access controls: Role-based, time-limited
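The audit-trail items above are checkable in code: because AuditLogger chains entry hashes, the whole log can be re-verified offline. A minimal sketch, assuming entries are read back in insertion order, database-assigned columns (e.g., id) are stripped first, and serialization matches the writer exactly:

# Sketch: spot-check the audit-log hash chain written by AuditLogger.
import hashlib
import json

def verify_audit_chain(entries: list[dict]) -> bool:
    previous_hash = "genesis"
    for entry in entries:
        if entry["previous_hash"] != previous_hash:
            return False  # an entry was removed, inserted, or reordered
        # Recompute the hash over everything except the hash itself
        content = {k: v for k, v in entry.items() if k != "entry_hash"}
        computed = hashlib.sha256(
            json.dumps(content, sort_keys=True, default=str).encode()
        ).hexdigest()
        if computed != entry["entry_hash"]:
            return False  # entry contents were altered after writing
        previous_hash = entry["entry_hash"]
    return True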
J. Concrete Implementation Plan¶
Phase 1: MVP Foundation (8-10 weeks)¶
Goal: Basic battery authoring, execution, data capture, and minimal portal.
Sprint 1-2: Core Infrastructure¶
- [ ] Set up monorepo structure (client, admin, server)
- [ ] Configure PostgreSQL schema (core tables)
- [ ] Set up Redis for caching/queue
- [ ] Implement authentication (JWT + refresh)
- [ ] Create base API structure with FastAPI
- [ ] Set up CI/CD pipeline
Sprint 3-4: Module Registry & Battery Authoring¶
- [ ] Module registration API
- [ ] Module version management
- [ ] Battery CRUD operations
- [ ] Battery version compilation
- [ ] Basic admin UI for module browsing
- [ ] Simple timeline builder (no drag-drop yet)
Sprint 5-6: Battery Runner & Data Capture¶
- [ ] BatteryOrchestrator implementation
- [ ] Wrap existing jsPsych tasks as modules
- [ ] IndexedDB persistence layer
- [ ] Event capture with timing metadata
- [ ] Basic sync queue
- [ ] Session management APIs
Sprint 7-8: Minimal Portal¶
- [ ] User/role management
- [ ] Site management
- [ ] Subject listing (site-scoped)
- [ ] Session history view
- [ ] Basic score display
MVP Deliverables:

- Admin can register modules and create batteries
- Participant can complete a battery via URL
- Data is captured with offline resilience
- Portal shows completion status and scores
Phase 2: Integration & Operations (8-10 weeks)¶
Goal: REDCap integration, scheduling, reminders, reporting.
Sprint 9-10: REDCap Integration¶
- [ ] Secure token vault
- [ ] Field mapping configuration UI
- [ ] Push scores to REDCap
- [ ] Pull subject metadata (optional)
- [ ] Sync logging and retry queue
- [ ] Reconciliation reports
Sprint 11-12: Scheduling & Reminders¶
- [ ] Schedule template configuration
- [ ] Subject schedule instantiation
- [ ] Celery/ARQ job queue setup
- [ ] Notification service (email via SendGrid)
- [ ] SMS integration (Twilio)
- [ ] Reminder templates with localization
Sprint 13-14: Enhanced Portal & Reporting¶
- [ ] Dashboard with metrics
- [ ] Overdue assessment alerts
- [ ] Subject detail view with timeline
- [ ] PDF report generation
- [ ] JSON/FHIR export
- [ ] Audit log viewer
Sprint 15-16: Enhanced Builder¶
- [ ] Drag-and-drop timeline
- [ ] Branching rules editor
- [ ] Device preview modes
- [ ] Version comparison
- [ ] Validation workflow
Phase 2 Deliverables:

- Automated data flow to REDCap
- Longitudinal study scheduling
- Participant reminders
- Clinical reports for EMR
Phase 3: Advanced Features (8-10 weeks)¶
Goal: CAT support, enhanced builder, compliance features.
Sprint 17-18: CAT Engine¶
- [ ] Item bank management
- [ ] IRT estimation engine (catsim integration)
- [ ] Offline CAT client
- [ ] CAT trace logging
- [ ] Theta-to-T-score conversion
- [ ] CAT module type in builder
Sprint 19-20: Enhanced Builder & Compliance¶
- [ ] Compound branching conditions
- [ ] Visual flow diagram
- [ ] Battery templates
- [ ] Enhanced audit logging
- [ ] Consent management
- [ ] Data retention policies
Sprint 21-22: Mobile App Wrapper¶
- [ ] Capacitor project setup
- [ ] iOS/Android builds
- [ ] Push notification integration
- [ ] Biometric unlock
- [ ] App store preparation
Sprint 23-24: Polish & Documentation¶
- [ ] Performance optimization
- [ ] Accessibility audit (WCAG 2.1 AA)
- [ ] Security penetration testing
- [ ] User documentation
- [ ] API documentation
- [ ] Deployment guides
Phase 3 Deliverables:

- PROMIS CAT assessments
- Native mobile apps
- Compliance-ready platform
- Complete documentation
Key Risks & Mitigations¶
| Risk | Impact | Mitigation |
|---|---|---|
| jsPsych wrapper complexity | Timeline delay | Start with 3-5 tasks, iterate |
| CAT engine accuracy | Data quality | Validate against published PROMIS CAT |
| Offline sync edge cases | Data loss | Extensive testing, idempotent endpoints |
| REDCap API variability | Integration failures | Per-site configuration, graceful degradation |
| Mobile app store approval | Deployment delay | Early submission, compliance documentation |
| Timing accuracy concerns | Validity questions | Telemetry collection, QA flagging |
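Of these mitigations, "idempotent endpoints" has a concrete code shape worth pinning down early. A minimal sketch, with illustrative table and helper names (`session_events`, an asyncpg-style `db` connection): each trial event carries a client-minted UUID, and replayed offline sync batches are absorbed by a uniqueness constraint.

# Sketch: idempotent batch event ingest. The session_events table and its
# UNIQUE(event_id) constraint are illustrative; `db` is an assumed asyncpg-
# style connection. Replaying the same offline batch inserts nothing twice.
import json
from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter()

class TrialEvent(BaseModel):
    event_id: str   # UUID minted on the client when the event is captured
    trial_index: int
    payload: dict

@router.post("/api/sessions/{session_id}/events")
async def ingest_events(session_id: str, events: list[TrialEvent]) -> dict:
    for event in events:
        await db.execute(
            """
            INSERT INTO session_events (session_id, event_id, trial_index, payload)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (event_id) DO NOTHING
            """,
            session_id, event.event_id, event.trial_index, json.dumps(event.payload),
        )
    # Safe to acknowledge the full batch: duplicates were silently skipped
    return {"accepted": len(events)}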
Open Questions for Discussion¶
- CAT Implementation: Should we use catsim (Python) server-side only, or implement a TypeScript port for full offline capability?
- Mobile Strategy: PWA only, or native apps (Capacitor) from the start? Native adds complexity but enables push notifications and better offline support.
- Multi-tenancy: Single deployment serving multiple studies, or study-specific deployments? The choice affects isolation and compliance scope.
- Identity Provider: Integrate with existing institutional IdP (SAML/OIDC), or standalone authentication?
- REDCap as Primary: For studies using REDCap, should REDCap be the source of truth for subject enrollment, or our platform?
- Consent Flow: Built-in consent module, or assume consent is handled externally (e.g., in REDCap)?
Appendix: API Endpoint Summary¶
# Core Assessment APIs
POST /api/sessions # Start a new assessment session
GET /api/sessions/{id} # Get session status
POST /api/sessions/{id}/events # Submit trial events (batch)
POST /api/sessions/{id}/complete # Mark session complete
GET /api/batteries/{id}/bundle # Download battery bundle
# Admin APIs
GET /api/modules # List modules
POST /api/modules # Register module
GET /api/modules/{id}/versions # List versions
POST /api/modules/{id}/versions # Create version
POST /api/modules/{id}/versions/{v}/publish # Publish version
GET /api/batteries # List batteries
POST /api/batteries # Create battery
GET /api/batteries/{id}/versions # List versions
POST /api/batteries/{id}/versions # Create version
POST /api/batteries/{id}/versions/{v}/publish # Publish version
# Portal APIs
GET /api/studies # List studies
GET /api/studies/{id}/sites # List sites
GET /api/studies/{id}/subjects # List subjects (site-scoped)
GET /api/subjects/{id} # Get subject detail
GET /api/subjects/{id}/sessions # Get sessions
GET /api/subjects/{id}/scores # Get scores
POST /api/subjects/{id}/schedule # Create schedule
GET /api/reports/subject/{id} # Generate subject report
GET /api/exports/study/{id} # Export study data
# Integration APIs
POST /api/redcap/sync/{sessionId} # Push to REDCap
GET /api/redcap/mappings/{studyId} # Get field mappings
PUT /api/redcap/mappings/{studyId} # Update field mappings
# Notification APIs
GET /api/notifications/pending # List pending notifications
POST /api/notifications/send # Manually trigger notification
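For orientation, a minimal client call against the batch event endpoint, shown in Python for brevity. The payload shape is illustrative (the canonical schema lives with the API definition), and session_id / session_token are assumed to come from POST /api/sessions.

# Sketch: submitting a batch of trial events (payload shape illustrative).
import uuid
import httpx

event = {
    "event_id": str(uuid.uuid4()),  # client-minted ID enables idempotent retry
    "trial_index": 0,
    "payload": {"rt_ms": 412, "correct": True},
}

response = httpx.post(
    f"https://metricis.app/api/sessions/{session_id}/events",
    json=[event],
    headers={"Authorization": f"Bearer {session_token}"},
    timeout=10.0,
)
response.raise_for_status()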
Document End
This architecture proposal is ready for review. Please provide feedback on specific sections or areas requiring further detail.