REDCap SDK Design Document¶
Production-Grade Python API Interface for REDCap Integration
Version: 1.0 | Date: 2026-01-22
1. Executive Summary¶
- Ecosystem maturity: R has the most mature REDCap tooling (REDCapR, redcapAPI, REDCapTidieR), while Python relies primarily on PyCap with limited alternatives
- Core abstraction pattern: All major libraries use a connection/project object that encapsulates credentials and provides namespaced method access
- Tidy data model (REDCapTidieR's "supertibble") represents the most analysis-friendly output format for complex longitudinal/repeating projects
- Type casting is critical: redcapAPI's exportRecordsTyped demonstrates metadata-driven validation and casting as essential for production use
- Diff-based writes (redcap-toolbox pattern) significantly reduce API load for incremental sync workflows
- Batching strategies vary: REDCapR uses explicit batch glossaries; d3b-redcap-api uses intelligent chunking; redcapAPI uses a batch.size parameter
- Validation before write is a core principle in redcapAPI, preventing common import errors through pre-flight checks
- Token security patterns are well-documented in REDCapR and rccola: never log tokens, use environment variables, support least privilege
- Longitudinal/repeating handling requires consistent merge keys: (record_id, redcap_event_name, redcap_repeat_instrument, redcap_repeat_instance)
- Error taxonomy should distinguish transport, auth, validation, and API-specific errors with structured payloads
- Hierarchical data organization (d3b pattern) provides an intermediate representation useful for analysis workflows
- File operations require streaming support for large files, checksum validation, and safe filename handling
- Operational concerns: retry/backoff, rate limiting, correlation IDs, and structured logging are essential for production
- Documentation quality directly correlates with adoption: REDCapR's troubleshooting guide is exemplary
- Our design synthesizes best patterns: PyCap's minimalism, redcapAPI's validation, REDCapTidieR's tidy outputs, redcap-toolbox's efficiency
2. Ecosystem Review¶
2.1 R Packages¶
REDCapR¶
Repository: OuhscBbmc/REDCapR
Core Abstraction: Functional API with connection objects. Primary functions like redcap_read() accept a redcap_uri and token directly or via credential helpers.
Key Functions:
| Function | Purpose |
|----------|---------|
| redcap_read() | Batched record export with automatic stacking |
| redcap_read_oneshot() | Single-call record export |
| redcap_read_eav_oneshot() | Export in entity-attribute-value format |
| redcap_write() | Batched record import |
| redcap_metadata_read/write() | Data dictionary operations |
| redcap_file_download/upload_oneshot() | File attachment operations |
| sanitize_token() | Token validation and cleaning |
| retrieve_credential_local() | Secure local credential storage |
Batching Strategy: Uses create_batch_glossary() to build a lookup table that drives batched requests; each batch processes a subset of records/fields to avoid server timeouts.
Error Handling: Returns structured results with success boolean, raw data, and outcome messages. The troubleshooting guide provides a systematic debugging approach: server → network → library → application.
Security Posture:
- Default SSL certificate verification
- sanitize_token() validates token format before use
- Credential helpers support local encrypted storage
- Documentation explicitly addresses token hygiene
Strengths: Excellent documentation, robust batching, comprehensive troubleshooting guide, mature codebase (MIT license, active maintenance).
redcapAPI¶
Repository: vubiostat/redcapAPI
Core Abstraction: Object-oriented via redcapConnection objects. Methods operate on the connection.
Key Functions:
| Function | Purpose |
|----------|---------|
| exportRecordsTyped() | Type-cast record export with validation |
| importRecords() | Validated record import with pre-flight checks |
| exportMetaData() | Data dictionary export |
| fieldValidationAndCasting | Customizable validation/casting framework |
Type Casting System (via exportRecordsTyped):
- Metadata-driven field typing
- Inversion of control: users can override any casting decision
- Validation functions: valRx (regex), valChoice (choices), valSkip (bypass)
- Cast functions: castRaw, castCode, castLabel, default_cast_no_factor
- Missing data detection precedes validation
- reviewInvalidRecords() generates validation reports with hotlinks
Import Validation (importRecords):
1. Verify all variables exist in data dictionary
2. Confirm record ID presence and position
3. Remove calculated fields automatically
4. Validate date field types (character, POSIXct, Date)
5. Check values against validation limits
6. Write validation failures to logfile
Batching: batch.size parameter controls records per API call.
Strengths: Most sophisticated validation/type-casting system, extensive vignettes, analysis-ready outputs.
REDCapTidieR¶
Repository: CRAN package
Core Abstraction: Returns a "supertibble" — a tibble where each row represents one REDCap instrument.
Key Design:
supertibble
├── redcap_form_name # Instrument identifier
├── redcap_form_label # Human-readable name
├── redcap_data # Nested tibble of observations
├── redcap_metadata # Nested tibble of field definitions
├── row_count # Quick data quality metrics
├── col_count
├── pct_missing
└── form_complete_pct
Longitudinal/Repeating Handling:
- Each instrument's data tibble contains appropriate merge keys
- Non-repeating instruments: (record_id, redcap_event_name)
- Repeating instruments: (record_id, redcap_event_name, redcap_repeat_instrument, redcap_repeat_instance)
- Preserves relational integrity through nesting rather than wide joins
Example: 734 heroes × 5,966 power records cleanly separated rather than cartesian-joined.
Strengths: Most analysis-friendly output format, excellent for complex projects, avoids wide-table explosion.
REDCapDM¶
Publication: PMC10905808
Core Abstraction: Clinical data management workflow tool.
Key Components:
| Function | Purpose |
|----------|---------|
| redcap_data() | Import via API or files |
| rd_transform() | Multi-step preprocessing pipeline |
| rd_query() | Generate missing data/discrepancy reports |
| rd_event() | Detect missing events (REDCap gap) |
| check_queries() | Track query resolution over time |
Transformation Pipeline (rd_transform):
1. Recalculate computed fields for verification
2. Convert checkbox variables to interpretable names
3. Replace variables with factor-format versions
4. Convert branching logic to R syntax
5. Remove completion status/timestamp variables
Query Management: Structured tracking of data quality issues with comparison between successive reports.
Strengths: Purpose-built for clinical trial workflows, addresses real-world data management needs.
rccola¶
Repository: cran/rccola
Purpose: Secure credential management for REDCap tokens.
Key Patterns:
- Environment variable storage
- OS keychain integration
- Separation of read vs. write tokens (least privilege)
- Token validation before use
2.2 Python Packages¶
PyCap¶
Repository: redcap-tools/PyCap
Design Philosophy: "Minimal interface exposing all required and optional API parameters... doesn't do anything fancy behind the scenes."
Core Abstraction: Single Project class instantiated with (api_url, api_key).
API Surface:
project = Project(api_url, api_key)
# Records
project.export_records(format_type, records, fields, forms, events, ...)
project.import_records(to_import, import_format, returnFormat)
project.delete_records(records, arm, instrument, event, repeat_instance)
# Metadata
project.export_metadata(format_type, fields, forms)
project.import_metadata(to_import, import_format)
# Files
project.export_file(record, field, event, repeat_instance) → (bytes, dict)
project.import_file(record, field, file_name, file_object, event)
project.delete_file(record, field, event)
# File Repository
project.export_file_repository(folder_id, format_type)
project.export_file_from_repository(doc_id)
project.create_folder_in_repository(name, folder_id, dag_id, role_id)
project.delete_file_from_repository(doc_id)
# Events/Arms (longitudinal)
project.export_events(format_type, arms)
project.import_events(to_import)
project.delete_events(events)
project.export_arms(format_type, arms)
project.import_arms(to_import)
project.delete_arms(arms)
# Users/DAGs
project.export_users(format_type)
project.import_users(to_import)
project.delete_users(users)
project.export_dags(format_type)
project.export_user_dag_assignment(format_type)
# Project Info
project.export_project_info(format_type)
project.export_logging(format_type, log_type, user, record, ...)
project.export_version()
# Properties
project.is_longitudinal # bool
project.def_field # primary key field name
project.field_names # list
project.forms # list
project.metadata # dict
Format Support: JSON, CSV, XML, pandas DataFrame (via df_kwargs).
Repeating Instruments: repeat_instance parameter supported throughout.
Error Handling: RedcapError for API failures, ValueError for invalid inputs.
Strengths: Comprehensive endpoint coverage, clear 1:1 mapping to REDCap API, well-maintained (healthy release cadence, 190 stars, 1,729 weekly downloads).
Weaknesses: No built-in type casting, no validation before write, no batching, no tidy output option.
redcaplite¶
Repository: PyPI
Design Philosophy: "Lightweight, user-friendly Python client... minimal dependencies to keep your environment lean."
Core Abstraction: RedcapClient class with 40+ methods organized as Export/Import/Delete.
Key Features:
- Full type hints
- Pandas integration via pd_read_csv_kwargs
- Minimal dependencies
- Comprehensive test coverage
Trade-offs: Focuses on common endpoints rather than complete API coverage.
redcap-toolbox¶
Repository: PyPI
Core Innovation: Diff-based imports to minimize API load.
Key Methods:
| Command | Purpose |
|---------|---------|
| download_redcap | Export with optional survey timestamps |
| download_redcap_report | Report-based export |
| split_redcap_data | Partition by event/instrument |
| update_redcap_diff | Apply differential updates |
Diff Strategy:
1. Export current state to "original" file
2. Create "modified" cache with changes
3. update_redcap_diff computes minimal delta
4. Only changed fields transmitted to API
Authentication: Environment variables (REDCAP_API_URL, REDCAP_API_TOKEN).
Strengths: Significant API load reduction for incremental sync workflows.
d3b-redcap-api-python¶
Repository: d3b-center/d3b-redcap-api-python
Architecture: Three-tier design:
1. Low-level transport: Generic HTTP handlers
2. API operations: 1:1 REDCap endpoint methods
3. High-level structuring: Hierarchical data organization
Key Innovation: get_records_tree() transforms flat API responses into nested structures organized by event → instrument → record_id → instance.
Batching: get_records employs intelligent chunking to prevent timeout failures on large datasets.
Pandas Integration: to_df(), all_dfs() convert tree structures to DataFrames.
Motivation: Created when PyCap had maintenance gaps; focuses on hierarchical data organization.
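The nested shape that get_records_tree() produces can be illustrated with a short sketch (a hypothetical helper, not d3b's actual code), assuming a flat JSON export as input:
from collections import defaultdict

def build_records_tree(rows: list[dict]) -> dict:
    """Illustrative nesting: event -> instrument -> record_id -> instance -> fields."""
    tree: dict = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
    for row in rows:
        event = row.get("redcap_event_name") or "_classic"
        instrument = row.get("redcap_repeat_instrument") or "_non_repeating"
        instance = row.get("redcap_repeat_instance") or 1
        tree[event][instrument][row["record_id"]][instance] = {
            k: v for k, v in row.items()
            if k != "record_id" and not k.startswith("redcap_")
        }
    return tree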
3. Capability Matrix¶
| Feature | REDCapR | redcapAPI | REDCapTidieR | REDCapDM | PyCap | redcaplite | redcap-toolbox | d3b-python |
|---|---|---|---|---|---|---|---|---|
| Auth/Connection | Native | Native | Via REDCapR | Via redcapAPI | Native | Native | Env vars | Native |
| Read Records | Native | Native | Native | Native | Native | Native | Native | Native |
| Write Records | Native | Native | No | No | Native | Native | Diff-based | Native |
| Delete Records | Native | Partial | No | No | Native | Native | No | Native |
| Metadata Export | Native | Native | Native | Native | Native | Native | No | Native |
| Metadata Import | Native | Native | No | No | Native | Native | No | Native |
| Instruments List | Native | Native | Native | Native | Native | Native | Partial | Native |
| Events (longitudinal) | Native | Native | Native | Native | Native | Native | Native | Native |
| Arms | Native | Native | Native | Native | Native | Native | No | Native |
| DAGs | Partial | Native | No | No | Native | Native | No | Native |
| File Upload | Native | Native | No | No | Native | Native | No | Native |
| File Download | Native | Native | No | No | Native | Native | No | Native |
| File Repository | Native | Partial | No | No | Native | Partial | No | Partial |
| Users | Partial | Native | No | No | Native | Native | No | Native |
| Logs | No | Native | No | No | Native | Partial | No | Native |
| Reports | Native | Native | No | No | Native | Native | Native | Native |
| Type Casting | Partial | Native | Via parent | Via parent | No | Partial | No | No |
| Validation | Basic | Native | Via parent | Native | No | Basic | No | No |
| Tidy Outputs | No | No | Native | Partial | No | No | Partial | Partial |
| Longitudinal Handling | Basic | Basic | Native | Native | Basic | Basic | Native | Native |
| Repeating Instruments | Basic | Basic | Native | Native | Basic | Basic | Basic | Native |
| Diff-based Writes | No | No | No | No | No | No | Native | No |
| Batching | Native | Native | Via parent | Via parent | No | No | No | Native |
| Retry/Backoff | Partial | Partial | No | No | No | No | No | No |
| Rate Limiting | No | No | No | No | No | No | No | No |
| Structured Logging | No | No | No | No | No | No | No | No |
| Correlation IDs | No | No | No | No | No | No | No | No |
| Query Management | No | No | No | Native | No | No | No | No |
Legend: Native = Built-in support | Partial = Limited support | No = Not supported | Via parent = Depends on underlying package
4. Design Principles for the New Interface¶
4.1 Core Principles¶
- Safe by Default
  - Tokens never logged or exposed in error messages
  - SSL verification enabled by default
  - Validation before write operations
  - Least privilege guidance (separate read/write tokens)
- Consistent Mental Model
  - Single Client class with namespaced endpoint groups
  - Uniform method signatures across endpoint families
  - Predictable return types with explicit format options
- Efficient by Design
  - Automatic batching for large operations
  - Optional diff-based writes for incremental sync
  - Connection pooling and request reuse
  - Configurable rate limiting
- Explicit Schema Handling
  - Metadata-driven type casting
  - Validation framework with customizable rules
  - Clear missing value semantics
- Analysis-Ready Outputs
  - Tidy (per-instrument) output as first-class option
  - Consistent merge keys for longitudinal/repeating data
  - DataFrame integration without forcing pandas dependency
- Observable Operations
  - Structured logging with correlation IDs
  - Request/response metrics
  - Deterministic error taxonomy
4.2 Design Decisions¶
| Decision | Choice | Rationale |
|---|---|---|
| HTTP library | httpx | Async support, connection pooling, modern API |
| Validation | Pydantic v2 | Industry standard, excellent performance |
| DataFrames | Optional pandas | Don't force dependency; provide integration |
| Batching | Automatic with override | Sensible defaults, expert escape hatch |
| Type casting | Opt-in strict mode | Flexibility for edge cases |
| Tidy output | Explicit method | Clear intent, avoid surprise transformations |
5. Proposed Public API (Python)¶
5.1 Client Initialization¶
import os
from redcap_sdk import Client, ClientConfig
# Minimal initialization
client = Client(
url="https://redcap.institution.edu/api/",
token="your-api-token"
)
# Full configuration
config = ClientConfig(
timeout=30.0,
max_retries=3,
backoff_factor=0.5,
rate_limit_per_minute=60,
verify_ssl=True,
ca_bundle="/path/to/certs.pem", # optional
batch_size=500,
enable_diff_writes=True,
log_level="INFO",
)
client = Client(
url="https://redcap.institution.edu/api/",
token=os.environ["REDCAP_TOKEN"], # recommended pattern
config=config
)
# Context manager for cleanup
async with Client(url, token) as client:
records = await client.records.export()
5.2 Records Namespace¶
class RecordsAPI:
def export(
self,
*,
format: Literal["json", "csv", "xml", "df"] = "df",
records: list[str] | None = None,
fields: list[str] | None = None,
forms: list[str] | None = None,
events: list[str] | None = None,
raw_or_label: Literal["raw", "label", "both"] = "raw",
export_checkbox_labels: bool = False,
export_survey_fields: bool = False,
export_data_access_groups: bool = False,
filter_logic: str | None = None,
date_range_begin: datetime | None = None,
date_range_end: datetime | None = None,
typed: bool = True, # Apply metadata-driven type casting
cast_overrides: dict[str, Callable] | None = None,
validation_mode: Literal["strict", "permissive", "skip"] = "permissive",
) -> pd.DataFrame | list[dict] | str:
"""
Export records from the project.
Args:
format: Output format. "df" returns pandas DataFrame.
records: Specific record IDs to export. None = all records.
fields: Specific fields to export. None = all fields.
forms: Specific forms/instruments to export. None = all forms.
events: Specific events (longitudinal). None = all events.
raw_or_label: Return raw codes, labels, or both.
export_checkbox_labels: Include checkbox option labels.
export_survey_fields: Include survey timestamp/identifier fields.
export_data_access_groups: Include DAG assignment.
filter_logic: REDCap filter logic expression.
date_range_begin: Filter by record creation date.
date_range_end: Filter by record creation date.
typed: Apply type casting based on metadata.
cast_overrides: Custom casting functions by field name or type.
validation_mode: How to handle validation failures.
Returns:
Records in requested format.
Raises:
AuthError: Invalid or expired token.
ApiError: REDCap API error response.
ValidationError: Data fails validation (strict mode).
"""
...
def export_tidy(
self,
*,
forms: list[str] | None = None,
events: list[str] | None = None,
typed: bool = True,
include_metadata: bool = True,
) -> TidyBundle:
"""
Export records as tidy per-instrument tables (REDCapTidieR-style).
Returns:
TidyBundle with one DataFrame per instrument, plus metadata.
"""
...
def import_(
self,
data: pd.DataFrame | list[dict],
*,
overwrite_behavior: Literal["normal", "overwrite"] = "normal",
return_content: Literal["count", "ids", "auto_ids"] = "count",
force_auto_number: bool = False,
validate: bool = True,
date_format: str = "YMD",
) -> ImportResult:
"""
Import records to the project.
Args:
data: Records to import.
overwrite_behavior: "normal" preserves existing data; "overwrite" replaces.
return_content: What to return (count, IDs, or auto-generated IDs).
force_auto_number: Let REDCap assign record IDs.
validate: Run pre-flight validation against metadata.
date_format: Expected date format in data.
Returns:
ImportResult with count/IDs and any validation warnings.
Raises:
ValidationError: Data fails pre-flight validation.
ApiError: REDCap rejects the import.
"""
...
def import_diff(
self,
current: pd.DataFrame,
modified: pd.DataFrame,
*,
key_fields: list[str] | None = None,
) -> ImportResult:
"""
Import only changed records (diff-based, redcap-toolbox pattern).
Args:
current: Current state of records (from previous export).
modified: Desired state of records.
key_fields: Fields that define record identity.
Default: [record_id, redcap_event_name,
redcap_repeat_instrument, redcap_repeat_instance]
Returns:
ImportResult with count of actually modified records.
"""
...
def delete(
self,
records: list[str],
*,
arm: str | None = None,
instrument: str | None = None,
event: str | None = None,
repeat_instance: int | None = None,
delete_logging: bool = False,
) -> int:
"""
Delete records (development projects only).
Returns:
Count of deleted records.
"""
...
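A typical round trip with this namespace might look as follows (illustrative sketch; the "age" field and its cleanup rule are hypothetical):
async with Client(url, token) as client:
    # Typed, tidy export: one DataFrame per instrument
    bundle = await client.records.export_tidy(forms=["demographics"])
    demo = bundle["demographics"]

    # Clean locally, then push only the changed cells
    cleaned = demo.copy()
    cleaned.loc[cleaned["age"] < 0, "age"] = None  # hypothetical fix
    result = await client.records.import_diff(current=demo, modified=cleaned)
    print(result.count, "records changed")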
5.3 Metadata Namespace¶
class MetadataAPI:
def export(
self,
*,
format: Literal["json", "df"] = "df",
fields: list[str] | None = None,
forms: list[str] | None = None,
) -> pd.DataFrame | list[dict]:
"""Export project data dictionary."""
...
def import_(
self,
metadata: pd.DataFrame | list[dict],
) -> int:
"""Import/update data dictionary. Returns count of fields."""
...
def get_field_schema(
self,
field_name: str,
) -> FieldSchema:
"""
Get parsed schema for a specific field.
Returns:
FieldSchema with type, validation, choices, branching logic.
"""
...
def get_type_caster(self) -> TypeCaster:
"""
Get a TypeCaster configured from project metadata.
Returns:
TypeCaster that can transform raw API data to typed DataFrames.
"""
...
5.4 Files Namespace¶
class FilesAPI:
def download(
self,
record: str,
field: str,
*,
event: str | None = None,
repeat_instance: int | None = None,
validate_checksum: bool = True,
) -> FileDownload:
"""
Download file attachment from a record.
Returns:
FileDownload with content (bytes), filename, mime_type, size.
"""
...
def download_streaming(
self,
record: str,
field: str,
destination: Path | BinaryIO,
*,
event: str | None = None,
repeat_instance: int | None = None,
chunk_size: int = 8192,
) -> FileMetadata:
"""
Stream large file to disk or file-like object.
Returns:
FileMetadata with filename, mime_type, size, checksum.
"""
...
def upload(
self,
record: str,
field: str,
file_path: Path | None = None,
file_object: BinaryIO | None = None,
filename: str | None = None,
*,
event: str | None = None,
repeat_instance: int | None = None,
) -> None:
"""
Upload file attachment to a record.
Args:
file_path: Path to file on disk.
file_object: File-like object (provide filename if using this).
filename: Override filename (required if using file_object).
"""
...
def delete(
self,
record: str,
field: str,
*,
event: str | None = None,
repeat_instance: int | None = None,
) -> None:
"""Delete file attachment from a record."""
...
5.5 File Repository Namespace¶
class FileRepositoryAPI:
def list(
self,
*,
folder_id: int | None = None,
format: Literal["json", "df"] = "df",
) -> pd.DataFrame | list[dict]:
"""List files and folders in repository."""
...
def download(
self,
doc_id: int,
destination: Path | None = None,
) -> FileDownload | Path:
"""Download file from repository."""
...
def upload(
self,
file_path: Path,
*,
folder_id: int | None = None,
) -> int:
"""Upload file to repository. Returns doc_id."""
...
def create_folder(
self,
name: str,
*,
parent_folder_id: int | None = None,
dag_id: int | None = None,
role_id: int | None = None,
) -> int:
"""Create folder in repository. Returns folder_id."""
...
def delete(self, doc_id: int) -> None:
"""Delete file from repository."""
...
5.6 Events/Arms Namespace (Longitudinal)¶
class EventsAPI:
def export(
self,
*,
arms: list[str] | None = None,
format: Literal["json", "df"] = "df",
) -> pd.DataFrame | list[dict]:
"""Export events for longitudinal project."""
...
def import_(self, events: pd.DataFrame | list[dict]) -> int:
"""Import events. Returns count."""
...
def delete(self, events: list[str]) -> int:
"""Delete events. Returns count."""
...
class ArmsAPI:
def export(
self,
*,
arms: list[str] | None = None,
format: Literal["json", "df"] = "df",
) -> pd.DataFrame | list[dict]:
"""Export study arms."""
...
def import_(self, arms: pd.DataFrame | list[dict]) -> int:
"""Import arms. Returns count."""
...
def delete(self, arms: list[str]) -> int:
"""Delete arms. Returns count."""
...
5.7 Instruments Namespace¶
class InstrumentsAPI:
def list(
self,
*,
format: Literal["json", "df"] = "df",
) -> pd.DataFrame | list[dict]:
"""List all instruments/forms in project."""
...
def export_mapping(
self,
*,
arms: list[str] | None = None,
format: Literal["json", "df"] = "df",
) -> pd.DataFrame | list[dict]:
"""Export instrument-event mapping (longitudinal)."""
...
def import_mapping(
self,
mapping: pd.DataFrame | list[dict],
) -> int:
"""Import instrument-event mapping. Returns count."""
...
def export_pdf(
self,
instrument: str | None = None,
*,
record: str | None = None,
event: str | None = None,
all_records: bool = False,
compact_display: bool = False,
) -> bytes:
"""Export instrument as PDF."""
...
5.8 Users/DAGs Namespace¶
class UsersAPI:
def export(
self,
*,
format: Literal["json", "df"] = "df",
) -> pd.DataFrame | list[dict]:
"""Export users with permissions."""
...
def import_(self, users: pd.DataFrame | list[dict]) -> int:
"""Import/update users. Returns count."""
...
def delete(self, users: list[str]) -> int:
"""Delete users. Returns count."""
...
class DagsAPI:
def export(
self,
*,
format: Literal["json", "df"] = "df",
) -> pd.DataFrame | list[dict]:
"""Export Data Access Groups."""
...
def import_(self, dags: pd.DataFrame | list[dict]) -> int:
"""Import DAGs. Returns count."""
...
def delete(self, dags: list[str]) -> int:
"""Delete DAGs. Returns count."""
...
def export_user_assignment(
self,
*,
format: Literal["json", "df"] = "df",
) -> pd.DataFrame | list[dict]:
"""Export user-DAG assignments."""
...
def import_user_assignment(
self,
assignments: pd.DataFrame | list[dict],
) -> int:
"""Import user-DAG assignments. Returns count."""
...
def switch(self, dag: str | None) -> None:
"""Switch current user's DAG context. None = no DAG filter."""
...
5.9 Project Namespace¶
class ProjectAPI:
def info(self) -> ProjectInfo:
"""
Get project information.
Returns:
ProjectInfo with title, is_longitudinal, has_repeating_instruments,
record_autonumbering_enabled, etc.
"""
...
def export_xml(self, *, include_records: bool = False) -> bytes:
"""Export entire project as XML (REDCap XML format)."""
...
def generate_next_record_name(self) -> str:
"""Generate next available record ID."""
...
class LoggingAPI:
def export(
self,
*,
format: Literal["json", "df"] = "df",
log_type: Literal["export", "manage", "user", "record", "record_add",
"record_edit", "record_delete", "lock_record",
"page_view"] | None = None,
user: str | None = None,
record: str | None = None,
dag: str | None = None,
begin_time: datetime | None = None,
end_time: datetime | None = None,
) -> pd.DataFrame | list[dict]:
"""Export audit logs with filtering."""
...
5.10 Reports Namespace¶
class ReportsAPI:
def export(
self,
report_id: int,
*,
format: Literal["json", "csv", "xml", "df"] = "df",
raw_or_label: Literal["raw", "label", "both"] = "raw",
export_checkbox_labels: bool = False,
typed: bool = True,
) -> pd.DataFrame | list[dict] | str:
"""Export a saved report by ID."""
...
5.11 Surveys Namespace¶
class SurveysAPI:
def export_participant_list(
self,
instrument: str,
*,
event: str | None = None,
format: Literal["json", "df"] = "df",
) -> pd.DataFrame | list[dict]:
"""Export survey participant list."""
...
def export_link(
self,
record: str,
instrument: str,
*,
event: str | None = None,
repeat_instance: int | None = None,
) -> str:
"""Get survey link for specific record."""
...
def export_return_code(
self,
record: str,
instrument: str,
*,
event: str | None = None,
repeat_instance: int | None = None,
) -> str:
"""Get survey return code for record."""
...
def export_queue_link(self, record: str) -> str:
"""Get survey queue link for record."""
...
5.12 Repeating Instruments Namespace¶
class RepeatingAPI:
def export_settings(
self,
*,
format: Literal["json", "df"] = "df",
) -> pd.DataFrame | list[dict]:
"""Export repeating instruments/events settings."""
...
def import_settings(
self,
settings: pd.DataFrame | list[dict],
) -> int:
"""Import repeating instruments/events settings."""
...
6. Data Model & Typing Strategy¶
6.1 Core Data Types¶
from dataclasses import dataclass
from typing import Literal, Any
from datetime import datetime, date, time
import pandas as pd
@dataclass
class TidyBundle:
"""REDCapTidieR-style output: one DataFrame per instrument."""
instruments: dict[str, pd.DataFrame] # form_name -> data
metadata: dict[str, pd.DataFrame] # form_name -> field definitions
# Convenience accessors
def __getitem__(self, form_name: str) -> pd.DataFrame:
return self.instruments[form_name]
def summary(self) -> pd.DataFrame:
"""Return summary table (like REDCapTidieR supertibble)."""
rows = []
for form_name, df in self.instruments.items():
rows.append({
"form_name": form_name,
"row_count": len(df),
"col_count": len(df.columns),
"pct_missing": df.isna().mean().mean() * 100,
})
return pd.DataFrame(rows)
@dataclass
class FieldSchema:
"""Parsed schema for a single field from metadata."""
field_name: str
field_label: str
field_type: Literal["text", "notes", "calc", "dropdown", "radio",
"checkbox", "yesno", "truefalse", "file",
"slider", "descriptive", "sql"]
validation_type: str | None # "date_ymd", "integer", "number", "email", etc.
validation_min: Any | None
validation_max: Any | None
choices: dict[str, str] | None # code -> label for dropdowns/radios
branching_logic: str | None
required: bool
identifier: bool # PHI identifier
form_name: str
def get_python_type(self) -> type:
"""Return appropriate Python type for this field."""
type_map = {
("text", None): str,
("text", "integer"): int,
("text", "number"): float,
("text", "date_ymd"): date,
("text", "datetime_ymd"): datetime,
("text", "time"): time,
("text", "email"): str,
("notes", None): str,
("calc", None): float,
("dropdown", None): str, # or Categorical
("radio", None): str,
("checkbox", None): bool, # individual checkbox field
("yesno", None): bool,
("truefalse", None): bool,
("slider", None): int,
("file", None): str, # filename
}
return type_map.get((self.field_type, self.validation_type), str)
@dataclass
class ImportResult:
"""Result of an import operation."""
count: int
ids: list[str] | None = None
warnings: list[str] | None = None
validation_report: pd.DataFrame | None = None
@dataclass
class FileDownload:
"""Result of a file download."""
content: bytes
filename: str
mime_type: str | None
size: int
checksum: str | None = None
@dataclass
class FileMetadata:
"""Metadata for a file (without content)."""
filename: str
mime_type: str | None
size: int
checksum: str | None = None
@dataclass
class ProjectInfo:
"""Project-level information."""
project_id: int
project_title: str
is_longitudinal: bool
has_repeating_instruments: bool
has_repeating_events: bool
record_autonumbering_enabled: bool
surveys_enabled: bool
scheduling_enabled: bool
purpose: int
purpose_other: str | None
creation_time: datetime
production_time: datetime | None
in_production: bool
project_language: str
missing_data_codes: str | None
6.2 Type Casting System¶
from typing import Any, Callable, Protocol
class CastFunction(Protocol):
"""Protocol for field casting functions."""
def __call__(
self,
value: str,
field_name: str,
schema: FieldSchema,
) -> Any:
...
class TypeCaster:
"""
Metadata-driven type caster for REDCap data.
Inspired by redcapAPI's exportRecordsTyped validation/casting framework.
"""
def __init__(
self,
metadata: pd.DataFrame,
*,
        na_values: frozenset[str] = frozenset({"", "NA", "NaN", "-999"}),
strict: bool = False,
cast_overrides: dict[str, CastFunction] | None = None,
):
self._metadata = metadata
self._schemas = self._parse_schemas(metadata)
self._na_values = na_values
self._strict = strict
self._overrides = cast_overrides or {}
self._validation_errors: list[dict] = []
def cast_dataframe(
self,
df: pd.DataFrame,
*,
report_errors: bool = True,
) -> pd.DataFrame:
"""
Cast all columns in DataFrame according to metadata.
Args:
df: Raw DataFrame from API.
report_errors: Collect validation errors for review.
Returns:
DataFrame with typed columns.
"""
result = df.copy()
for col in result.columns:
if col in self._schemas:
schema = self._schemas[col]
cast_fn = self._overrides.get(col) or self._get_caster(schema)
result[col] = result[col].apply(
lambda v: self._cast_value(v, col, schema, cast_fn)
)
return result
def get_validation_report(self) -> pd.DataFrame:
"""Return report of all validation errors encountered."""
return pd.DataFrame(self._validation_errors)
def _cast_value(
self,
value: Any,
field_name: str,
schema: FieldSchema,
cast_fn: CastFunction,
) -> Any:
# Handle NA values first
if pd.isna(value) or str(value).strip() in self._na_values:
return None
try:
return cast_fn(str(value), field_name, schema)
except (ValueError, TypeError) as e:
self._validation_errors.append({
"field": field_name,
"value": value,
"expected_type": schema.get_python_type().__name__,
"error": str(e),
})
if self._strict:
raise ValidationError(f"Field '{field_name}' failed validation: {e}")
return value # Return raw value in permissive mode
def _get_caster(self, schema: FieldSchema) -> CastFunction:
"""Get appropriate casting function for field type."""
# Implementation follows redcapAPI patterns
...
# Built-in cast functions (like redcapAPI's castRaw, castCode, castLabel)
def cast_raw(value: str, field_name: str, schema: FieldSchema) -> str:
"""Return value unchanged."""
return value
def cast_code(value: str, field_name: str, schema: FieldSchema) -> str:
"""Return coded value (for dropdowns/radios)."""
return value
def cast_label(value: str, field_name: str, schema: FieldSchema) -> str:
"""Return label for coded value."""
if schema.choices and value in schema.choices:
return schema.choices[value]
return value
def cast_integer(value: str, field_name: str, schema: FieldSchema) -> int:
"""Cast to integer with validation."""
result = int(float(value)) # Handle "1.0" -> 1
if schema.validation_min is not None and result < schema.validation_min:
raise ValueError(f"Value {result} below minimum {schema.validation_min}")
if schema.validation_max is not None and result > schema.validation_max:
raise ValueError(f"Value {result} above maximum {schema.validation_max}")
return result
def cast_number(value: str, field_name: str, schema: FieldSchema) -> float:
"""Cast to float with validation."""
result = float(value)
if schema.validation_min is not None and result < schema.validation_min:
raise ValueError(f"Value {result} below minimum {schema.validation_min}")
if schema.validation_max is not None and result > schema.validation_max:
raise ValueError(f"Value {result} above maximum {schema.validation_max}")
return result
def cast_date(value: str, field_name: str, schema: FieldSchema) -> date:
"""Cast to date with format detection."""
# Handle multiple formats: YYYY-MM-DD, MM/DD/YYYY, etc.
formats = ["%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y"]
for fmt in formats:
try:
return datetime.strptime(value, fmt).date()
except ValueError:
continue
raise ValueError(f"Cannot parse date: {value}")
def cast_checkbox(value: str, field_name: str, schema: FieldSchema) -> bool:
"""Cast checkbox to boolean."""
return value in ("1", "Checked", "Yes", "TRUE", "True", "true")
6.3 Checkbox Handling¶
REDCap checkboxes are complex: a single checkbox field expands to multiple columns (field___1, field___2, etc.).
def reshape_checkboxes(
df: pd.DataFrame,
metadata: pd.DataFrame,
*,
mode: Literal["wide", "long", "combined"] = "wide",
) -> pd.DataFrame:
"""
Handle checkbox field transformation.
Args:
mode:
- "wide": Keep as separate columns (default REDCap export)
- "long": Pivot to (record_id, field, choice, checked)
- "combined": Combine into single column with list of checked values
"""
...
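A minimal sketch of the "combined" mode, assuming the standard field___&lt;code&gt; column naming (the helper name is hypothetical):
def _combine_checkboxes(
    df: pd.DataFrame,
    field: str,
    choices: dict[str, str],
) -> pd.Series:
    """Collapse field___<code> columns into one column of checked labels."""
    cols = [f"{field}___{code}" for code in choices if f"{field}___{code}" in df.columns]
    return df[cols].apply(
        lambda row: [
            choices[col.rsplit("___", 1)[1]]
            for col, val in row.items()
            if str(val) in ("1", "Checked")
        ],
        axis=1,
    )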
7. Longitudinal/Repetition Handling¶
7.1 Canonical Internal Representation¶
All records have a composite key:
@dataclass
class RecordKey:
"""Canonical record identifier for any REDCap project type."""
record_id: str
redcap_event_name: str | None = None # Longitudinal projects
redcap_repeat_instrument: str | None = None # Repeating instruments
redcap_repeat_instance: int | None = None # Instance number (1-indexed)
def as_tuple(self) -> tuple:
return (
self.record_id,
self.redcap_event_name,
self.redcap_repeat_instrument,
self.redcap_repeat_instance,
)
@classmethod
def merge_columns(cls) -> list[str]:
"""Return column names for merge operations."""
return [
"record_id",
"redcap_event_name",
"redcap_repeat_instrument",
"redcap_repeat_instance",
]
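Because every table carries the same keys, tidy instruments join without surprises; a pandas sketch (visits and demographics are hypothetical instrument tables from a TidyBundle):
# Non-repeating tables lack the repeat columns, so join on the shared subset.
shared_keys = [
    k for k in RecordKey.merge_columns()
    if k in visits.columns and k in demographics.columns
]
merged = visits.merge(demographics, on=shared_keys, how="left")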
7.2 Project Type Detection¶
from enum import Enum

class ProjectType(Enum):
CLASSIC = "classic" # Simple, no events, no repeating
LONGITUDINAL = "longitudinal" # Events, no repeating
REPEATING = "repeating" # No events, repeating instruments
LONGITUDINAL_REPEATING = "longitudinal_repeating" # Both
def detect_project_type(project_info: ProjectInfo) -> ProjectType:
"""Detect project type from info."""
if project_info.is_longitudinal and project_info.has_repeating_instruments:
return ProjectType.LONGITUDINAL_REPEATING
elif project_info.is_longitudinal:
return ProjectType.LONGITUDINAL
elif project_info.has_repeating_instruments:
return ProjectType.REPEATING
else:
return ProjectType.CLASSIC
7.3 Tidy Output Generation¶
def create_tidy_bundle(
records: pd.DataFrame,
metadata: pd.DataFrame,
project_type: ProjectType,
) -> TidyBundle:
"""
Transform flat records into tidy per-instrument tables.
This follows REDCapTidieR's approach:
- Each instrument becomes its own DataFrame
- Merge keys are consistent across instruments
- No cartesian explosion from wide joins
"""
instruments: dict[str, pd.DataFrame] = {}
instrument_metadata: dict[str, pd.DataFrame] = {}
# Group metadata by form
form_groups = metadata.groupby("form_name")
for form_name, form_meta in form_groups:
# Get fields for this form (plus key columns)
form_fields = form_meta["field_name"].tolist()
# Determine appropriate key columns based on project type
key_cols = _get_key_columns(form_name, project_type, metadata)
# Select relevant columns
select_cols = key_cols + [f for f in form_fields if f in records.columns]
form_df = records[select_cols].copy()
# Drop rows where all form fields are NA (no data for this form)
data_cols = [c for c in form_fields if c in form_df.columns]
form_df = form_df.dropna(subset=data_cols, how="all")
# Handle repeating: keep only rows with matching repeat_instrument
if "redcap_repeat_instrument" in form_df.columns:
is_repeating = form_meta["repeating"].any()
if is_repeating:
form_df = form_df[
form_df["redcap_repeat_instrument"] == form_name
]
else:
form_df = form_df[
form_df["redcap_repeat_instrument"].isna()
]
instruments[form_name] = form_df.reset_index(drop=True)
instrument_metadata[form_name] = form_meta
return TidyBundle(
instruments=instruments,
metadata=instrument_metadata,
)
def _get_key_columns(
form_name: str,
project_type: ProjectType,
metadata: pd.DataFrame,
) -> list[str]:
"""Determine appropriate key columns for an instrument."""
keys = ["record_id"]
if project_type in (ProjectType.LONGITUDINAL, ProjectType.LONGITUDINAL_REPEATING):
keys.append("redcap_event_name")
if project_type in (ProjectType.REPEATING, ProjectType.LONGITUDINAL_REPEATING):
# Check if this specific form is repeating
form_meta = metadata[metadata["form_name"] == form_name]
if form_meta["repeating"].any():
keys.extend(["redcap_repeat_instrument", "redcap_repeat_instance"])
return keys
7.4 Pivot/Widen Helpers¶
def pivot_to_wide(
tidy_df: pd.DataFrame,
*,
id_cols: list[str] = ["record_id"],
pivot_col: str = "redcap_event_name",
value_cols: list[str] | None = None,
) -> pd.DataFrame:
"""
Pivot longitudinal data from long to wide format.
Example: record_id, event, score -> record_id, baseline_score, followup_score
"""
if value_cols is None:
value_cols = [c for c in tidy_df.columns if c not in id_cols + [pivot_col]]
return tidy_df.pivot_table(
index=id_cols,
columns=pivot_col,
values=value_cols,
aggfunc="first",
).reset_index()
def nest_repeating(
tidy_df: pd.DataFrame,
*,
group_cols: list[str] = ["record_id", "redcap_event_name"],
) -> pd.DataFrame:
"""
Nest repeating instances into list columns.
Useful for creating nested JSON or working with hierarchical data.
"""
return tidy_df.groupby(group_cols).apply(
lambda g: g.drop(columns=group_cols).to_dict("records")
).reset_index(name="instances")
8. Write Strategies¶
8.1 Full Replace vs Normal¶
class WriteMode(Enum):
NORMAL = "normal" # Blanks don't overwrite existing data
OVERWRITE = "overwrite" # Blanks replace existing data
def import_records(
client: Client,
data: pd.DataFrame,
*,
mode: WriteMode = WriteMode.NORMAL,
validate: bool = True,
) -> ImportResult:
"""
Import records with specified overwrite behavior.
NORMAL mode: Only non-blank values update the database.
Existing values preserved if new value is blank.
OVERWRITE mode: All values (including blanks) update the database.
Use with caution - can delete data.
"""
...
8.2 Diff-Based Import¶
Inspired by redcap-toolbox's approach:
def compute_diff(
current: pd.DataFrame,
modified: pd.DataFrame,
*,
key_columns: list[str] | None = None,
) -> DiffResult:
"""
Compute minimal changes needed to transform current -> modified.
Args:
current: Current state (from previous export).
modified: Desired state.
key_columns: Columns that identify records.
Default: RecordKey.merge_columns()
Returns:
DiffResult with:
- records_to_add: New records not in current
- records_to_update: Changed records (only changed fields)
- records_to_delete: Records in current but not modified
- unchanged_count: Records with no changes
"""
if key_columns is None:
key_columns = [c for c in RecordKey.merge_columns() if c in current.columns]
# Create composite keys
current_keys = set(current[key_columns].apply(tuple, axis=1))
modified_keys = set(modified[key_columns].apply(tuple, axis=1))
# Identify adds, deletes, potential updates
to_add_keys = modified_keys - current_keys
to_delete_keys = current_keys - modified_keys
to_check_keys = current_keys & modified_keys
# For potential updates, compare field by field
updates = []
unchanged = 0
for key in to_check_keys:
key_filter = _make_key_filter(key_columns, key)
current_row = current.loc[key_filter].iloc[0]
modified_row = modified.loc[key_filter].iloc[0]
changed_fields = {}
for col in modified.columns:
if col in key_columns:
continue
if not _values_equal(current_row[col], modified_row[col]):
changed_fields[col] = modified_row[col]
if changed_fields:
update_row = {c: key[i] for i, c in enumerate(key_columns)}
update_row.update(changed_fields)
updates.append(update_row)
else:
unchanged += 1
return DiffResult(
records_to_add=modified[modified[key_columns].apply(tuple, axis=1).isin(to_add_keys)],
records_to_update=pd.DataFrame(updates) if updates else pd.DataFrame(),
records_to_delete=list(to_delete_keys),
unchanged_count=unchanged,
)
@dataclass
class DiffResult:
"""Result of computing differences between datasets."""
records_to_add: pd.DataFrame
records_to_update: pd.DataFrame
records_to_delete: list[tuple]
unchanged_count: int
@property
def has_changes(self) -> bool:
return (
len(self.records_to_add) > 0 or
len(self.records_to_update) > 0 or
len(self.records_to_delete) > 0
)
def summary(self) -> str:
return (
f"Add: {len(self.records_to_add)}, "
f"Update: {len(self.records_to_update)}, "
f"Delete: {len(self.records_to_delete)}, "
f"Unchanged: {self.unchanged_count}"
)
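compute_diff relies on two helpers not shown above; minimal sketches follow. DataFrame.loc accepts a callable, which lets the same filter apply to both frames (NA-valued key parts would need pd.isna handling in a full implementation):
def _make_key_filter(key_columns: list[str], key: tuple):
    """Return a callable mask usable with .loc on either frame."""
    def _filter(df: pd.DataFrame) -> pd.Series:
        mask = pd.Series(True, index=df.index)
        for col, val in zip(key_columns, key):
            mask &= df[col] == val
        return mask
    return _filter

def _values_equal(a: Any, b: Any) -> bool:
    """Treat NA == NA; otherwise compare as strings (REDCap returns strings)."""
    a_na, b_na = bool(pd.isna(a)), bool(pd.isna(b))
    if a_na or b_na:
        return a_na and b_na
    return str(a) == str(b)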
async def import_diff(
client: Client,
current: pd.DataFrame,
modified: pd.DataFrame,
*,
apply_deletes: bool = False, # Safety: require explicit opt-in
) -> ImportResult:
"""
Apply only changed records to minimize API load.
This is the recommended approach for incremental sync workflows.
"""
diff = compute_diff(current, modified)
results = []
if len(diff.records_to_add) > 0:
result = await client.records.import_(diff.records_to_add)
results.append(("add", result))
if len(diff.records_to_update) > 0:
result = await client.records.import_(
diff.records_to_update,
overwrite_behavior="overwrite", # Only updating specific fields
)
results.append(("update", result))
if apply_deletes and diff.records_to_delete:
# Warning: deletes are destructive and often not supported
for key in diff.records_to_delete:
await client.records.delete([key[0]]) # record_id
results.append(("delete", len(diff.records_to_delete)))
return ImportResult(
count=sum(r[1].count if hasattr(r[1], 'count') else r[1] for r in results),
warnings=[f"Diff summary: {diff.summary()}"],
)
8.3 Validation Before Write¶
Following redcapAPI's importRecords patterns:
class ImportValidator:
"""Pre-flight validation for record imports."""
def __init__(self, metadata: pd.DataFrame, project_info: ProjectInfo):
self._metadata = metadata
self._project_info = project_info
self._schemas = {
row["field_name"]: FieldSchema.from_row(row)
for _, row in metadata.iterrows()
}
def validate(self, data: pd.DataFrame) -> ValidationReport:
"""
Run all validation checks.
Checks (following redcapAPI patterns):
1. All columns exist in data dictionary
2. Record ID column present and first
3. No calculated fields included
4. Date fields have correct types
5. Values within validation limits
6. Required fields populated
7. Choices valid for dropdowns/radios
"""
errors = []
warnings = []
# Check 1: Column existence
valid_fields = set(self._metadata["field_name"])
for col in data.columns:
if col not in valid_fields and not self._is_system_field(col):
errors.append(ValidationError(
field=col,
error_type="unknown_field",
message=f"Field '{col}' not in data dictionary",
))
# Check 2: Record ID presence
record_id_field = self._project_info.def_field or "record_id"
if record_id_field not in data.columns:
errors.append(ValidationError(
field=record_id_field,
error_type="missing_record_id",
message="Record ID field not found in data",
))
# Check 3: Calculated fields
calc_fields = self._metadata[self._metadata["field_type"] == "calc"]["field_name"]
for col in data.columns:
if col in calc_fields.values:
warnings.append(ValidationWarning(
field=col,
warning_type="calculated_field",
message=f"Calculated field '{col}' will be ignored",
))
# Check 4-7: Per-field validation
for col in data.columns:
if col in self._schemas:
field_errors = self._validate_column(data[col], self._schemas[col])
errors.extend(field_errors)
return ValidationReport(
valid=len(errors) == 0,
errors=errors,
warnings=warnings,
)
def _validate_column(
self,
series: pd.Series,
schema: FieldSchema,
) -> list[ValidationError]:
"""Validate a single column against its schema."""
errors = []
for idx, value in series.items():
if pd.isna(value):
if schema.required:
errors.append(ValidationError(
field=schema.field_name,
row=idx,
error_type="required_missing",
message=f"Required field missing at row {idx}",
))
continue
# Type validation
try:
schema.get_python_type()(value)
except (ValueError, TypeError):
errors.append(ValidationError(
field=schema.field_name,
row=idx,
value=value,
error_type="type_mismatch",
message=f"Cannot convert '{value}' to {schema.get_python_type().__name__}",
))
# Range validation
if schema.validation_min is not None or schema.validation_max is not None:
try:
num_val = float(value)
                    if schema.validation_min is not None and num_val < schema.validation_min:
errors.append(ValidationError(
field=schema.field_name,
row=idx,
value=value,
error_type="below_minimum",
message=f"Value {value} below minimum {schema.validation_min}",
))
                    if schema.validation_max is not None and num_val > schema.validation_max:
errors.append(ValidationError(
field=schema.field_name,
row=idx,
value=value,
error_type="above_maximum",
message=f"Value {value} above maximum {schema.validation_max}",
))
except ValueError:
pass
# Choice validation
if schema.choices and str(value) not in schema.choices:
errors.append(ValidationError(
field=schema.field_name,
row=idx,
value=value,
error_type="invalid_choice",
message=f"Value '{value}' not in valid choices: {list(schema.choices.keys())}",
))
return errors
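The _is_system_field helper referenced in Check 1 could be sketched as a method on ImportValidator (the exact field list is an assumption):
    def _is_system_field(self, field: str) -> bool:
        """Sketch: pass through REDCap system columns and *_complete status fields."""
        system_fields = {
            "redcap_event_name",
            "redcap_repeat_instrument",
            "redcap_repeat_instance",
            "redcap_data_access_group",
        }
        return field in system_fields or field.endswith("_complete")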
9. Files & Large Payloads¶
9.1 File Download with Streaming¶
async def download_file_streaming(
client: Client,
record: str,
field: str,
destination: Path,
*,
event: str | None = None,
repeat_instance: int | None = None,
chunk_size: int = 8192,
progress_callback: Callable[[int, int], None] | None = None,
) -> FileMetadata:
"""
Stream large file to disk without loading into memory.
Args:
destination: Path to save file.
chunk_size: Bytes per chunk.
progress_callback: Called with (bytes_downloaded, total_bytes).
Returns:
FileMetadata with filename, size, checksum.
"""
import hashlib
async with client._http.stream(
"POST",
client._url,
data={
"token": client._token,
"content": "file",
"action": "export",
"record": record,
"field": field,
"event": event,
"repeat_instance": repeat_instance,
},
) as response:
response.raise_for_status()
# Extract filename from Content-Disposition header
content_disp = response.headers.get("Content-Disposition", "")
filename = _parse_filename(content_disp) or f"{record}_{field}"
# Sanitize filename for safety
filename = _sanitize_filename(filename)
total_size = int(response.headers.get("Content-Length", 0))
downloaded = 0
hasher = hashlib.sha256()
with open(destination, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size):
f.write(chunk)
hasher.update(chunk)
downloaded += len(chunk)
if progress_callback:
progress_callback(downloaded, total_size)
return FileMetadata(
filename=filename,
mime_type=response.headers.get("Content-Type"),
size=downloaded,
checksum=hasher.hexdigest(),
)
def _sanitize_filename(filename: str) -> str:
"""
Sanitize filename to prevent path traversal and other issues.
"""
    import os
    import re
# Remove path components
filename = os.path.basename(filename)
# Remove dangerous characters
filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)
# Limit length
if len(filename) > 255:
name, ext = os.path.splitext(filename)
filename = name[:255-len(ext)] + ext
return filename
def _parse_filename(content_disposition: str) -> str | None:
"""Parse filename from Content-Disposition header."""
import re
match = re.search(r'filename[*]?=["\']?([^"\';]+)["\']?', content_disposition)
return match.group(1) if match else None
9.2 File Upload with Validation¶
async def upload_file(
client: Client,
record: str,
field: str,
file_path: Path,
*,
event: str | None = None,
repeat_instance: int | None = None,
validate_field_type: bool = True,
) -> None:
"""
Upload file to record.
Args:
validate_field_type: Verify field is a file field before upload.
Raises:
ValidationError: Field is not a file field.
ApiError: Upload rejected by REDCap.
"""
if validate_field_type:
schema = await client.metadata.get_field_schema(field)
if schema.field_type != "file":
raise ValidationError(
f"Field '{field}' is type '{schema.field_type}', not 'file'"
)
with open(file_path, "rb") as f:
files = {
"file": (file_path.name, f, _guess_mime_type(file_path)),
}
data = {
"token": client._token,
"content": "file",
"action": "import",
"record": record,
"field": field,
"event": event,
"repeat_instance": repeat_instance,
}
response = await client._http.post(
client._url,
data=data,
files=files,
)
_check_response(response)
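The _guess_mime_type helper referenced above can be a thin wrapper over the standard library (sketch):
import mimetypes

def _guess_mime_type(file_path: Path) -> str:
    """Best-effort MIME type from the filename, with a generic binary fallback."""
    mime, _ = mimetypes.guess_type(file_path.name)
    return mime or "application/octet-stream"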
9.3 Batched Record Operations¶
async def export_records_batched(
client: Client,
*,
batch_size: int = 500,
records: list[str] | None = None,
**kwargs,
) -> pd.DataFrame:
"""
Export records in batches to avoid timeouts on large datasets.
Follows REDCapR's batch glossary pattern.
"""
if records is None:
# First, get list of all record IDs
all_records = await client.records.export(
fields=[client.project.def_field],
format="df",
)
records = all_records[client.project.def_field].unique().tolist()
# Create batch glossary
batches = [
records[i:i + batch_size]
for i in range(0, len(records), batch_size)
]
results = []
for batch_records in batches:
batch_df = await client.records.export(
records=batch_records,
**kwargs,
)
results.append(batch_df)
return pd.concat(results, ignore_index=True)
async def import_records_batched(
client: Client,
data: pd.DataFrame,
*,
batch_size: int = 500,
**kwargs,
) -> ImportResult:
"""
Import records in batches.
"""
total_count = 0
all_ids = []
all_warnings = []
for i in range(0, len(data), batch_size):
batch = data.iloc[i:i + batch_size]
result = await client.records.import_(batch, **kwargs)
total_count += result.count
if result.ids:
all_ids.extend(result.ids)
if result.warnings:
all_warnings.extend(result.warnings)
return ImportResult(
count=total_count,
ids=all_ids if all_ids else None,
warnings=all_warnings if all_warnings else None,
)
10. Errors, Retries, Rate Limits, Observability¶
10.1 Error Taxonomy¶
class RedcapSdkError(Exception):
"""Base exception for all SDK errors."""
pass
class TransportError(RedcapSdkError):
"""Network-level error (connection, timeout, DNS)."""
def __init__(self, message: str, cause: Exception | None = None):
super().__init__(message)
self.cause = cause
class AuthError(RedcapSdkError):
"""Authentication failure (invalid token, expired, insufficient permissions)."""
def __init__(self, message: str, status_code: int | None = None):
super().__init__(message)
self.status_code = status_code
class ApiError(RedcapSdkError):
"""REDCap API returned an error response."""
def __init__(
self,
message: str,
status_code: int,
error_code: str | None = None,
raw_response: str | None = None,
):
super().__init__(message)
self.status_code = status_code
self.error_code = error_code
self.raw_response = raw_response
class ValidationError(RedcapSdkError):
"""Data validation failure."""
def __init__(
self,
message: str,
field: str | None = None,
value: Any = None,
report: "ValidationReport | None" = None,
):
super().__init__(message)
self.field = field
self.value = value
self.report = report
class RateLimitError(RedcapSdkError):
"""Rate limit exceeded."""
def __init__(self, message: str, retry_after: float | None = None):
super().__init__(message)
self.retry_after = retry_after
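Callers branch on the taxonomy instead of parsing message strings; an illustrative sketch (assumes a configured client and logger):
try:
    records = await client.records.export(validation_mode="strict")
except AuthError:
    raise  # re-acquire credentials out of band; never log the token
except RateLimitError as e:
    await asyncio.sleep(e.retry_after or 60.0)  # then retry
except ValidationError as e:
    print(e.report)  # structured report, not a raw message
except ApiError as e:
    logger.error("REDCap rejected request", status_code=e.status_code)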
10.2 Retry Policy¶
from dataclasses import dataclass
from typing import Any, Callable
import asyncio
import random
@dataclass
class RetryPolicy:
"""Configuration for retry behavior."""
max_retries: int = 3
backoff_factor: float = 0.5 # Wait = factor * (2 ** attempt)
backoff_max: float = 60.0 # Maximum wait time
backoff_jitter: float = 0.1 # Randomization factor
    retry_statuses: frozenset[int] = frozenset({429, 500, 502, 503, 504})
retry_exceptions: tuple[type, ...] = (TransportError,)
def get_wait_time(self, attempt: int) -> float:
"""Calculate wait time for given attempt number."""
wait = min(
self.backoff_factor * (2 ** attempt),
self.backoff_max,
)
# Add jitter
jitter = wait * self.backoff_jitter * random.random()
return wait + jitter
async def with_retry(
func: Callable,
policy: RetryPolicy,
*,
correlation_id: str | None = None,
) -> Any:
"""
Execute function with retry policy.
"""
last_exception = None
for attempt in range(policy.max_retries + 1):
try:
return await func()
except policy.retry_exceptions as e:
last_exception = e
if attempt < policy.max_retries:
wait_time = policy.get_wait_time(attempt)
logger.warning(
"Request failed, retrying",
attempt=attempt + 1,
max_retries=policy.max_retries,
wait_time=wait_time,
correlation_id=correlation_id,
error=str(e),
)
await asyncio.sleep(wait_time)
else:
raise
except ApiError as e:
if e.status_code in policy.retry_statuses:
last_exception = e
if attempt < policy.max_retries:
wait_time = policy.get_wait_time(attempt)
if e.status_code == 429:
# Rate limit - use Retry-After if available
wait_time = getattr(e, "retry_after", wait_time) or wait_time
await asyncio.sleep(wait_time)
else:
raise
else:
raise
raise last_exception
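Usage wraps the request coroutine in a zero-argument callable (sketch; generate_correlation_id is defined in section 10.4):
policy = RetryPolicy(max_retries=5, backoff_factor=1.0)
records = await with_retry(
    lambda: client.records.export(format="json"),
    policy,
    correlation_id=generate_correlation_id(),
)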
10.3 Rate Limiting¶
import asyncio
import time
from asyncio import Lock
class RateLimiter:
"""
Token bucket rate limiter.
"""
def __init__(
self,
rate_per_minute: int = 60,
burst_size: int | None = None,
):
self.rate_per_minute = rate_per_minute
self.burst_size = burst_size or rate_per_minute
self._tokens = self.burst_size
self._last_update = time.monotonic()
self._lock = Lock()
async def acquire(self) -> None:
"""Wait until a token is available."""
async with self._lock:
now = time.monotonic()
elapsed = now - self._last_update
self._last_update = now
# Refill tokens
self._tokens = min(
self.burst_size,
self._tokens + elapsed * (self.rate_per_minute / 60),
)
if self._tokens < 1:
# Wait for refill
wait_time = (1 - self._tokens) / (self.rate_per_minute / 60)
await asyncio.sleep(wait_time)
self._tokens = 0
else:
self._tokens -= 1
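The client gates every call through acquire(); a sketch of the wiring:
limiter = RateLimiter(rate_per_minute=60)

async def rate_limited_request(client: Client, **kwargs):
    await limiter.acquire()  # blocks until a token is available
    return await client._request(**kwargs)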
10.4 Structured Logging¶
import logging
import time
import uuid
from contextvars import ContextVar
from typing import Any
# Correlation ID for request tracing
correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")
class StructuredLogger:
"""
Structured logger with correlation ID support.
"""
def __init__(self, name: str):
self._logger = logging.getLogger(name)
def _log(self, level: int, message: str, **kwargs: Any) -> None:
correlation_id = correlation_id_var.get()
extra = {
"correlation_id": correlation_id,
**kwargs,
}
# Never log tokens
if "token" in extra:
extra["token"] = "[REDACTED]"
self._logger.log(level, message, extra=extra)
def info(self, message: str, **kwargs: Any) -> None:
self._log(logging.INFO, message, **kwargs)
def warning(self, message: str, **kwargs: Any) -> None:
self._log(logging.WARNING, message, **kwargs)
def error(self, message: str, **kwargs: Any) -> None:
self._log(logging.ERROR, message, **kwargs)
def debug(self, message: str, **kwargs: Any) -> None:
self._log(logging.DEBUG, message, **kwargs)
def generate_correlation_id() -> str:
"""Generate a new correlation ID."""
return str(uuid.uuid4())[:8]
# Usage in client
class Client:
async def _request(self, **kwargs) -> Any:
correlation_id = generate_correlation_id()
correlation_id_var.set(correlation_id)
self._logger.info(
"API request",
method=kwargs.get("method", "POST"),
content=kwargs.get("data", {}).get("content"),
correlation_id=correlation_id,
)
start = time.monotonic()
try:
result = await self._do_request(**kwargs)
elapsed = time.monotonic() - start
self._logger.info(
"API response",
elapsed_ms=elapsed * 1000,
correlation_id=correlation_id,
)
return result
except Exception as e:
elapsed = time.monotonic() - start
self._logger.error(
"API error",
elapsed_ms=elapsed * 1000,
error=str(e),
correlation_id=correlation_id,
)
raise
11. Security & Credential Handling¶
11.1 Token Security Principles¶
Following patterns from REDCapR, rccola, and redcapAPI:
- Never log tokens: All logging must redact token values
- Environment variables preferred: `REDCAP_TOKEN`, `REDCAP_API_URL`
- Validate before use: Check token format, test connectivity
- Least privilege: Recommend separate tokens for read vs. write operations
11.2 Token Provider Interface¶
import os
from typing import Protocol
class TokenProvider(Protocol):
"""Protocol for token retrieval."""
def get_token(self, project_name: str | None = None) -> str:
"""Retrieve token for project."""
...
class EnvironmentTokenProvider:
"""
Get token from environment variable.
Recommended pattern - tokens never in code.
"""
def __init__(self, env_var: str = "REDCAP_TOKEN"):
self._env_var = env_var
def get_token(self, project_name: str | None = None) -> str:
env_var = self._env_var
if project_name:
# Allow project-specific tokens: REDCAP_TOKEN_PROJECT1
env_var = f"{self._env_var}_{project_name.upper()}"
token = os.environ.get(env_var)
if not token:
raise AuthError(f"Token not found in environment variable: {env_var}")
return sanitize_token(token)
class KeychainTokenProvider:
"""
Get token from OS keychain (macOS Keychain, Windows Credential Manager).
Inspired by rccola patterns.
"""
def __init__(self, service_name: str = "redcap-sdk"):
self._service_name = service_name
try:
import keyring
self._keyring = keyring
except ImportError:
raise ImportError("Install keyring package: pip install keyring")
def get_token(self, project_name: str | None = None) -> str:
key = project_name or "default"
token = self._keyring.get_password(self._service_name, key)
if not token:
raise AuthError(f"Token not found in keychain for: {key}")
return sanitize_token(token)
def set_token(self, token: str, project_name: str | None = None) -> None:
"""Store token in keychain."""
key = project_name or "default"
self._keyring.set_password(self._service_name, key, token)
class VaultTokenProvider:
"""
Get token from HashiCorp Vault.
For enterprise deployments.
"""
def __init__(
self,
vault_url: str,
vault_token: str | None = None,
        secret_path: str = "redcap",  # relative to the KV v2 mount (hvac defaults to mount_point="secret")
):
self._vault_url = vault_url
self._vault_token = vault_token or os.environ.get("VAULT_TOKEN")
self._secret_path = secret_path
def get_token(self, project_name: str | None = None) -> str:
import hvac # HashiCorp Vault client
client = hvac.Client(url=self._vault_url, token=self._vault_token)
path = self._secret_path
if project_name:
path = f"{path}/{project_name}"
secret = client.secrets.kv.v2.read_secret_version(path=path)
token = secret["data"]["data"].get("token")
if not token:
raise AuthError(f"Token not found in Vault at: {path}")
return sanitize_token(token)
11.3 Token Validation¶
import re
def sanitize_token(token: str) -> str:
"""
Validate and sanitize API token.
Following REDCapR's sanitize_token() pattern.
"""
# Remove whitespace
token = token.strip()
# REDCap tokens are 32-character hex strings
if not re.match(r"^[A-Fa-f0-9]{32}$", token):
raise AuthError(
"Invalid token format. REDCap tokens should be 32 hexadecimal characters."
)
return token.upper() # Normalize to uppercase
async def verify_token(client: Client) -> bool:
"""
Verify token is valid by making a lightweight API call.
"""
try:
await client.project.info()
return True
except AuthError:
return False
11.4 Least Privilege Guidance¶
from dataclasses import dataclass

@dataclass
class TokenPermissions:
"""
Document expected permissions for a token.
REDCap allows different permission levels per token.
"""
# Export permissions
export_records: bool = False
export_logging: bool = False
export_file_repository: bool = False
# Import permissions
import_records: bool = False
import_files: bool = False
# Delete permissions (dangerous)
delete_records: bool = False
# Management permissions
manage_users: bool = False
manage_dags: bool = False
@classmethod
def read_only(cls) -> "TokenPermissions":
"""Minimal read-only permissions."""
return cls(export_records=True)
@classmethod
def read_write(cls) -> "TokenPermissions":
"""Standard read-write permissions."""
return cls(
export_records=True,
import_records=True,
import_files=True,
)
# Example usage documentation:
"""
## Token Security Best Practices
1. **Use separate tokens for different purposes**:
- Read-only token for analytics/reporting
- Write token for data entry applications
- Admin token for user management (rarely needed)
2. **Store tokens securely**:
```python
# Preferred: Environment variable
client = Client(url, token=os.environ["REDCAP_TOKEN"])
# Alternative: OS keychain
provider = KeychainTokenProvider()
client = Client(url, token=provider.get_token("my_project"))
# Enterprise: HashiCorp Vault
provider = VaultTokenProvider(vault_url="https://vault.company.com")
client = Client(url, token=provider.get_token("my_project"))
```
3. **Never commit tokens to version control**:
- Add `.env` to `.gitignore`
- Use `.env.example` for documentation
4. **Rotate tokens periodically**:
- REDCap allows regenerating tokens
- Update stored tokens after rotation
5. **Audit token usage**:
- REDCap logs API calls with token identifier
- Review logs for unexpected access patterns
"""
12. Implementation Blueprint¶
12.1 Package Structure¶
redcap_sdk/
├── __init__.py # Public exports
├── client.py # Main Client class
├── config.py # ClientConfig, settings
├── errors.py # Exception hierarchy
├── types.py # Pydantic models, dataclasses
│
├── api/ # Namespace implementations
│ ├── __init__.py
│ ├── base.py # BaseAPI with common logic
│ ├── records.py # RecordsAPI
│ ├── metadata.py # MetadataAPI
│ ├── files.py # FilesAPI, FileRepositoryAPI
│ ├── events.py # EventsAPI, ArmsAPI
│ ├── instruments.py # InstrumentsAPI
│ ├── users.py # UsersAPI, DagsAPI, UserRolesAPI
│ ├── project.py # ProjectAPI, LoggingAPI
│ ├── reports.py # ReportsAPI
│ ├── surveys.py # SurveysAPI
│ └── repeating.py # RepeatingAPI
│
├── typing/ # Type casting system
│ ├── __init__.py
│ ├── caster.py # TypeCaster class
│ ├── functions.py # Built-in cast functions
│ ├── validation.py # Validation functions
│ └── schemas.py # FieldSchema, parsing
│
├── transform/ # Data transformation
│ ├── __init__.py
│ ├── tidy.py # TidyBundle, create_tidy_bundle
│ ├── checkbox.py # Checkbox handling
│ ├── diff.py # Diff computation
│ └── longitudinal.py # Pivot/widen helpers
│
├── security/ # Token handling
│ ├── __init__.py
│ ├── providers.py # Token provider classes
│ ├── sanitize.py # Token validation
│ └── audit.py # Security logging
│
├── http/ # Transport layer
│ ├── __init__.py
│ ├── transport.py # HTTP client wrapper
│ ├── retry.py # Retry policy
│ └── rate_limit.py # Rate limiter
│
├── logging/ # Observability
│ ├── __init__.py
│ ├── structured.py # Structured logger
│ └── correlation.py # Correlation ID management
│
└── utils/ # Utilities
├── __init__.py
├── batching.py # Batch helpers
└── files.py # File utilities
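As a sketch of the public surface, `redcap_sdk/__init__.py` might re-export the main entry points (names drawn from the modules and error taxonomy above):
# redcap_sdk/__init__.py (sketch)
from .client import Client
from .config import ClientConfig
from .errors import ApiError, AuthError, TransportError, ValidationError
from .types import TidyBundle

__all__ = [
    "Client",
    "ClientConfig",
    "ApiError",
    "AuthError",
    "TransportError",
    "ValidationError",
    "TidyBundle",
]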
12.2 Minimal Working Pseudocode¶
Creating a Client¶
# redcap_sdk/client.py
from typing import TYPE_CHECKING

from .config import ClientConfig
from .http.transport import HttpTransport
from .logging.structured import StructuredLogger
from .security.sanitize import sanitize_token
if TYPE_CHECKING:
from .api.records import RecordsAPI
from .api.metadata import MetadataAPI
from .api.files import FilesAPI
class Client:
"""
Main entry point for REDCap SDK.
Example:
async with Client(url, token) as client:
records = await client.records.export()
"""
def __init__(
self,
url: str,
token: str,
config: ClientConfig | None = None,
):
self._url = url.rstrip("/")
self._token = sanitize_token(token)
self._config = config or ClientConfig()
self._http = HttpTransport(
timeout=self._config.timeout,
verify_ssl=self._config.verify_ssl,
ca_bundle=self._config.ca_bundle,
retry_policy=self._config.retry_policy,
rate_limiter=self._config.rate_limiter,
)
self._logger = StructuredLogger("redcap_sdk")
# Lazy-initialized namespaces
self._records: RecordsAPI | None = None
self._metadata: MetadataAPI | None = None
self._files: FilesAPI | None = None
# ... other namespaces
@property
def records(self) -> "RecordsAPI":
if self._records is None:
from .api.records import RecordsAPI
self._records = RecordsAPI(self)
return self._records
@property
def metadata(self) -> "MetadataAPI":
if self._metadata is None:
from .api.metadata import MetadataAPI
self._metadata = MetadataAPI(self)
return self._metadata
@property
def files(self) -> "FilesAPI":
if self._files is None:
from .api.files import FilesAPI
self._files = FilesAPI(self)
return self._files
async def _request(
self,
content: str,
action: str | None = None,
**params,
) -> dict | str | bytes:
"""Make API request with standard parameters."""
data = {
"token": self._token,
"content": content,
"format": params.pop("format", "json"),
}
if action:
data["action"] = action
data.update(params)
return await self._http.post(self._url, data=data)
async def __aenter__(self) -> "Client":
return self
async def __aexit__(self, *args) -> None:
await self._http.close()
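The `ClientConfig` consumed above could be a plain dataclass; a minimal sketch, where the defaults are assumptions rather than decided values:
# redcap_sdk/config.py (sketch; defaults are assumptions)
from dataclasses import dataclass, field

from .http.rate_limit import RateLimiter
from .http.retry import RetryPolicy

@dataclass
class ClientConfig:
    """Settings consumed by HttpTransport in the Client above."""
    timeout: float = 30.0
    verify_ssl: bool = True
    ca_bundle: str | None = None
    retry_policy: RetryPolicy = field(default_factory=RetryPolicy)
    rate_limiter: RateLimiter | None = None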
Exporting Records with Type Casting¶
# redcap_sdk/api/records.py
import pandas as pd
from typing import Literal
from ..types import TidyBundle
from ..typing.caster import TypeCaster
class RecordsAPI:
def __init__(self, client: "Client"):
self._client = client
self._caster: TypeCaster | None = None
async def export(
self,
*,
format: Literal["json", "csv", "xml", "df"] = "df",
records: list[str] | None = None,
fields: list[str] | None = None,
forms: list[str] | None = None,
events: list[str] | None = None,
typed: bool = True,
cast_overrides: dict | None = None,
validation_mode: Literal["strict", "permissive", "skip"] = "permissive",
**kwargs,
) -> pd.DataFrame | list[dict] | str:
"""Export records with optional type casting."""
# Build request parameters
params = {}
if records:
params["records"] = records
if fields:
params["fields"] = fields
if forms:
params["forms"] = forms
if events:
params["events"] = events
# Make API call
api_format = "json" if format == "df" else format
raw_data = await self._client._request(
content="record",
format=api_format,
**params,
**kwargs,
)
# Return raw if not DataFrame
if format != "df":
return raw_data
# Convert to DataFrame
df = pd.DataFrame(raw_data)
# Apply type casting if requested
if typed and len(df) > 0:
caster = await self._get_caster(cast_overrides, validation_mode)
df = caster.cast_dataframe(df)
return df
async def _get_caster(
self,
overrides: dict | None,
mode: str,
) -> TypeCaster:
"""Get or create type caster from metadata."""
if self._caster is None or overrides:
metadata = await self._client.metadata.export(format="df")
self._caster = TypeCaster(
metadata,
cast_overrides=overrides,
strict=(mode == "strict"),
)
return self._caster
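A usage sketch of a typed export; the override semantics shown are illustrative, not a final API commitment:
# Usage sketch
df = await client.records.export(
    typed=True,
    validation_mode="strict",
    cast_overrides={"phone": str},  # hypothetical: keep a text field as str
)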
Returning Tidy Per-Instrument Tables¶
# redcap_sdk/api/records.py (continued)
async def export_tidy(
self,
*,
forms: list[str] | None = None,
events: list[str] | None = None,
typed: bool = True,
include_metadata: bool = True,
) -> TidyBundle:
"""
Export records as tidy per-instrument tables.
Returns a TidyBundle where each instrument has its own DataFrame
with appropriate merge keys.
"""
# Get all records
records_df = await self.export(
format="df",
forms=forms,
events=events,
typed=typed,
)
# Get metadata
metadata_df = await self._client.metadata.export(format="df")
if forms:
metadata_df = metadata_df[metadata_df["form_name"].isin(forms)]
# Get project info for type detection
project_info = await self._client.project.info()
project_type = detect_project_type(project_info)
# Create tidy bundle
from ..transform.tidy import create_tidy_bundle
return create_tidy_bundle(
records_df,
metadata_df,
project_type,
include_metadata=include_metadata,
)
# redcap_sdk/transform/tidy.py
def create_tidy_bundle(
records: pd.DataFrame,
metadata: pd.DataFrame,
project_type: ProjectType,
*,
include_metadata: bool = True,
) -> TidyBundle:
"""Transform flat records into tidy per-instrument tables."""
instruments: dict[str, pd.DataFrame] = {}
instrument_metadata: dict[str, pd.DataFrame] = {}
# Group metadata by form
for form_name in metadata["form_name"].unique():
form_meta = metadata[metadata["form_name"] == form_name]
form_fields = form_meta["field_name"].tolist()
# Determine key columns
key_cols = ["record_id"]
if project_type in (ProjectType.LONGITUDINAL, ProjectType.LONGITUDINAL_REPEATING):
if "redcap_event_name" in records.columns:
key_cols.append("redcap_event_name")
is_repeating = _is_repeating_form(form_name, metadata)
if is_repeating:
if "redcap_repeat_instrument" in records.columns:
key_cols.extend(["redcap_repeat_instrument", "redcap_repeat_instance"])
# Select columns for this form
available_fields = [f for f in form_fields if f in records.columns]
select_cols = key_cols + available_fields
select_cols = list(dict.fromkeys(select_cols)) # Dedupe preserving order
form_df = records[select_cols].copy()
# Filter to relevant rows
if "redcap_repeat_instrument" in form_df.columns:
if is_repeating:
form_df = form_df[form_df["redcap_repeat_instrument"] == form_name]
else:
form_df = form_df[form_df["redcap_repeat_instrument"].isna()]
# Drop empty rows
data_cols = [c for c in available_fields if c in form_df.columns]
if data_cols:
form_df = form_df.dropna(subset=data_cols, how="all")
instruments[form_name] = form_df.reset_index(drop=True)
if include_metadata:
instrument_metadata[form_name] = form_meta
return TidyBundle(
instruments=instruments,
metadata=instrument_metadata,
)
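The two helpers referenced above, `detect_project_type` and `_is_repeating_form`, are sketched below; the project-info field names and the `is_repeating` metadata column are assumptions:
# redcap_sdk/transform/tidy.py (helper sketches)
from enum import Enum

import pandas as pd

class ProjectType(str, Enum):
    CLASSIC = "classic"
    LONGITUDINAL = "longitudinal"
    REPEATING = "repeating"
    LONGITUDINAL_REPEATING = "longitudinal_repeating"

def detect_project_type(project_info: dict) -> ProjectType:
    """Classify a project from its project-info payload (field names assumed)."""
    longitudinal = bool(int(project_info.get("is_longitudinal", 0)))
    repeating = bool(int(project_info.get("has_repeating_instruments_or_events", 0)))
    if longitudinal and repeating:
        return ProjectType.LONGITUDINAL_REPEATING
    if longitudinal:
        return ProjectType.LONGITUDINAL
    if repeating:
        return ProjectType.REPEATING
    return ProjectType.CLASSIC

def _is_repeating_form(form_name: str, metadata: pd.DataFrame) -> bool:
    """Whether a form is configured as repeating.

    Sketch only: assumes the caller merged a boolean `is_repeating` column
    into the metadata frame (e.g., from the repeating-instruments endpoint);
    the data dictionary alone does not carry this flag.
    """
    if "is_repeating" not in metadata.columns:
        return False
    rows = metadata[metadata["form_name"] == form_name]
    return bool(rows["is_repeating"].any())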
Performing a Diff-Based Update¶
# redcap_sdk/transform/diff.py
from dataclasses import dataclass
import pandas as pd
@dataclass
class DiffResult:
records_to_add: pd.DataFrame
records_to_update: pd.DataFrame
records_to_delete: list[tuple]
unchanged_count: int
@property
def has_changes(self) -> bool:
return (
len(self.records_to_add) > 0 or
len(self.records_to_update) > 0 or
len(self.records_to_delete) > 0
)
def compute_diff(
current: pd.DataFrame,
modified: pd.DataFrame,
*,
key_columns: list[str] | None = None,
) -> DiffResult:
"""Compute minimal changes between current and modified states."""
# Default key columns
if key_columns is None:
potential_keys = [
"record_id",
"redcap_event_name",
"redcap_repeat_instrument",
"redcap_repeat_instance",
]
key_columns = [k for k in potential_keys if k in current.columns and k in modified.columns]
# Create tuple keys for comparison
current_keyed = current.set_index(key_columns)
modified_keyed = modified.set_index(key_columns)
current_keys = set(current_keyed.index)
modified_keys = set(modified_keyed.index)
# Identify changes
to_add_keys = modified_keys - current_keys
to_delete_keys = current_keys - modified_keys
to_check_keys = current_keys & modified_keys
# Find actual updates (changed values)
updates = []
unchanged = 0
for key in to_check_keys:
current_row = current_keyed.loc[key]
modified_row = modified_keyed.loc[key]
# Compare all columns
changed_cols = {}
for col in modified_keyed.columns:
if col in current_keyed.columns:
if not _values_equal(current_row[col], modified_row[col]):
changed_cols[col] = modified_row[col]
if changed_cols:
# Build update row with key columns + changed values only
update_row = dict(zip(key_columns, key if isinstance(key, tuple) else (key,)))
update_row.update(changed_cols)
updates.append(update_row)
else:
unchanged += 1
return DiffResult(
records_to_add=modified_keyed.loc[list(to_add_keys)].reset_index() if to_add_keys else pd.DataFrame(),
records_to_update=pd.DataFrame(updates) if updates else pd.DataFrame(),
records_to_delete=list(to_delete_keys),
unchanged_count=unchanged,
)
def _values_equal(a, b) -> bool:
"""Compare values accounting for NA."""
if pd.isna(a) and pd.isna(b):
return True
if pd.isna(a) or pd.isna(b):
return False
return a == b
# Usage in RecordsAPI
async def import_diff(
self,
current: pd.DataFrame,
modified: pd.DataFrame,
*,
key_fields: list[str] | None = None,
apply_deletes: bool = False,
) -> ImportResult:
"""Import only changed records."""
diff = compute_diff(current, modified, key_columns=key_fields)
self._client._logger.info(
"Diff computed",
adds=len(diff.records_to_add),
updates=len(diff.records_to_update),
deletes=len(diff.records_to_delete),
unchanged=diff.unchanged_count,
)
total_count = 0
if len(diff.records_to_add) > 0:
result = await self.import_(diff.records_to_add)
total_count += result.count
if len(diff.records_to_update) > 0:
result = await self.import_(
diff.records_to_update,
overwrite_behavior="overwrite",
)
total_count += result.count
if apply_deletes and diff.records_to_delete:
for key in diff.records_to_delete:
record_id = key[0] if isinstance(key, tuple) else key
await self.delete([str(record_id)])
total_count += 1
return ImportResult(
count=total_count,
warnings=[f"Diff: {len(diff.records_to_add)} adds, {len(diff.records_to_update)} updates, "
f"{len(diff.records_to_delete)} deletes, {diff.unchanged_count} unchanged"],
)
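A usage sketch of the incremental sync round-trip (`apply_local_edits` is a hypothetical local transformation):
# Usage sketch
current = await client.records.export(format="df")
modified = apply_local_edits(current.copy())  # hypothetical local edits
result = await client.records.import_diff(current, modified)
print(result.warnings[0])  # e.g. "Diff: 2 adds, 5 updates, 0 deletes, 143 unchanged"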
Uploading/Downloading a File¶
# redcap_sdk/api/files.py
from pathlib import Path
from typing import BinaryIO
import hashlib
from ..types import FileDownload, FileMetadata
class FilesAPI:
def __init__(self, client: "Client"):
self._client = client
async def download(
self,
record: str,
field: str,
*,
event: str | None = None,
repeat_instance: int | None = None,
validate_checksum: bool = True,
) -> FileDownload:
"""Download file attachment from record."""
params = {
"record": record,
"field": field,
}
if event:
params["event"] = event
if repeat_instance:
params["repeat_instance"] = repeat_instance
response = await self._client._http.post_raw(
self._client._url,
data={
"token": self._client._token,
"content": "file",
"action": "export",
**params,
},
)
# Parse headers for metadata
content_disp = response.headers.get("Content-Disposition", "")
filename = self._parse_filename(content_disp) or f"{record}_{field}"
filename = self._sanitize_filename(filename)
content = await response.aread()
checksum = hashlib.sha256(content).hexdigest() if validate_checksum else None
return FileDownload(
content=content,
filename=filename,
mime_type=response.headers.get("Content-Type"),
size=len(content),
checksum=checksum,
)
async def download_streaming(
self,
record: str,
field: str,
destination: Path,
*,
event: str | None = None,
repeat_instance: int | None = None,
chunk_size: int = 8192,
progress_callback=None,
) -> FileMetadata:
"""Stream large file to disk."""
params = {
"record": record,
"field": field,
}
if event:
params["event"] = event
if repeat_instance:
params["repeat_instance"] = repeat_instance
async with self._client._http.stream_post(
self._client._url,
data={
"token": self._client._token,
"content": "file",
"action": "export",
**params,
},
) as response:
content_disp = response.headers.get("Content-Disposition", "")
filename = self._parse_filename(content_disp) or f"{record}_{field}"
filename = self._sanitize_filename(filename)
total_size = int(response.headers.get("Content-Length", 0))
downloaded = 0
hasher = hashlib.sha256()
with open(destination, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size):
f.write(chunk)
hasher.update(chunk)
downloaded += len(chunk)
if progress_callback:
progress_callback(downloaded, total_size)
return FileMetadata(
filename=filename,
mime_type=response.headers.get("Content-Type"),
size=downloaded,
checksum=hasher.hexdigest(),
)
async def upload(
self,
record: str,
field: str,
file_path: Path | None = None,
file_object: BinaryIO | None = None,
filename: str | None = None,
*,
event: str | None = None,
repeat_instance: int | None = None,
) -> None:
"""Upload file to record."""
if file_path is None and file_object is None:
raise ValueError("Must provide either file_path or file_object")
if file_object is not None and filename is None:
raise ValueError("filename required when using file_object")
if file_path:
filename = filename or file_path.name
file_object = open(file_path, "rb")
should_close = True
else:
should_close = False
try:
files = {
"file": (filename, file_object, self._guess_mime_type(filename)),
}
data = {
"token": self._client._token,
"content": "file",
"action": "import",
"record": record,
"field": field,
}
if event:
data["event"] = event
if repeat_instance:
data["repeat_instance"] = repeat_instance
await self._client._http.post_multipart(
self._client._url,
data=data,
files=files,
)
finally:
if should_close and file_object:
file_object.close()
async def delete(
self,
record: str,
field: str,
*,
event: str | None = None,
repeat_instance: int | None = None,
) -> None:
"""Delete file from record."""
params = {
"record": record,
"field": field,
}
if event:
params["event"] = event
if repeat_instance:
params["repeat_instance"] = repeat_instance
await self._client._request(
content="file",
action="delete",
**params,
)
@staticmethod
def _parse_filename(content_disposition: str) -> str | None:
import re
match = re.search(r'filename[*]?=["\']?([^"\';]+)["\']?', content_disposition)
return match.group(1) if match else None
@staticmethod
def _sanitize_filename(filename: str) -> str:
import re
import os
filename = os.path.basename(filename)
filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)
return filename[:255]
@staticmethod
def _guess_mime_type(filename: str) -> str:
import mimetypes
mime_type, _ = mimetypes.guess_type(filename)
return mime_type or "application/octet-stream"
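A usage sketch tying the three operations together (record IDs and field names are illustrative):
# Usage sketch
from pathlib import Path

async with Client(url, token) as client:
    # Small file: buffered download with checksum
    dl = await client.files.download("101", "consent_doc")
    Path(dl.filename).write_bytes(dl.content)

    # Large file: stream to disk with progress reporting
    meta = await client.files.download_streaming(
        "101", "mri_scan",
        destination=Path("mri_101.dat"),
        progress_callback=lambda done, total: print(f"{done}/{total or '?'} bytes"),
    )

    # Upload from a local path
    await client.files.upload("101", "consent_doc", file_path=Path("consent.pdf"))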
13. Test & Documentation Plan¶
13.1 Test Plan¶
Unit Tests¶
# tests/unit/test_type_casting.py
from datetime import date

import pytest
import pandas as pd

from redcap_sdk.typing.caster import TypeCaster
from redcap_sdk.typing.functions import cast_integer, cast_date
from redcap_sdk.typing.schemas import FieldSchema
class TestTypeCasting:
"""Unit tests for type casting edge cases."""
def test_cast_integer_from_float_string(self):
"""Handle '1.0' -> 1 conversion."""
result = cast_integer("1.0", "age", mock_schema)
assert result == 1
assert isinstance(result, int)
def test_cast_integer_validation_min(self):
"""Reject values below minimum."""
schema = FieldSchema(validation_min=0, ...)
with pytest.raises(ValueError, match="below minimum"):
cast_integer("-1", "age", schema)
def test_cast_date_multiple_formats(self):
"""Parse common date formats."""
assert cast_date("2024-01-15", "dob", mock_schema) == date(2024, 1, 15)
assert cast_date("01/15/2024", "dob", mock_schema) == date(2024, 1, 15)
def test_na_values_handling(self):
"""NA values should become None."""
caster = TypeCaster(metadata, na_values={"", "-999", "NA"})
df = pd.DataFrame({"age": ["25", "-999", "NA", ""]})
result = caster.cast_dataframe(df)
assert result["age"].tolist() == [25, None, None, None]
def test_checkbox_expansion(self):
"""Checkbox fields expand to multiple columns."""
# field___1, field___2, field___3
...
def test_permissive_mode_preserves_invalid(self):
"""Invalid values preserved in permissive mode."""
caster = TypeCaster(metadata, strict=False)
df = pd.DataFrame({"age": ["25", "unknown"]})
result = caster.cast_dataframe(df)
assert result["age"].tolist() == [25, "unknown"]
assert len(caster.get_validation_report()) == 1
# tests/unit/test_diff.py
class TestDiffComputation:
"""Unit tests for diff-based updates."""
def test_detect_new_records(self):
current = pd.DataFrame({"record_id": [1, 2], "value": [10, 20]})
modified = pd.DataFrame({"record_id": [1, 2, 3], "value": [10, 20, 30]})
diff = compute_diff(current, modified)
assert len(diff.records_to_add) == 1
assert diff.records_to_add["record_id"].iloc[0] == 3
def test_detect_changed_values(self):
current = pd.DataFrame({"record_id": [1], "value": [10]})
modified = pd.DataFrame({"record_id": [1], "value": [15]})
diff = compute_diff(current, modified)
assert len(diff.records_to_update) == 1
assert diff.records_to_update["value"].iloc[0] == 15
def test_unchanged_records_not_included(self):
current = pd.DataFrame({"record_id": [1], "value": [10]})
modified = pd.DataFrame({"record_id": [1], "value": [10]})
diff = compute_diff(current, modified)
assert len(diff.records_to_update) == 0
assert diff.unchanged_count == 1
def test_handles_na_values(self):
current = pd.DataFrame({"record_id": [1], "value": [None]})
modified = pd.DataFrame({"record_id": [1], "value": [None]})
diff = compute_diff(current, modified)
assert diff.unchanged_count == 1
# tests/unit/test_tidy.py
class TestTidyBundle:
"""Unit tests for tidy output generation."""
def test_separate_instruments(self):
"""Each instrument becomes its own DataFrame."""
...
def test_repeating_instruments_filtered(self):
"""Repeating instrument data only includes relevant rows."""
...
def test_key_columns_appropriate_for_project_type(self):
"""Key columns match project type."""
...
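The placeholder names `mock_schema` and `metadata` used above could live in a shared helper module imported by the tests; the `FieldSchema` constructor shown here is an assumption:
# tests/unit/fixtures.py (sketch; FieldSchema signature assumed)
import pandas as pd
from redcap_sdk.typing.schemas import FieldSchema

# Placeholder schema used by the casting tests
mock_schema = FieldSchema(field_name="age", field_type="text", validation="integer")

# Minimal data dictionary for TypeCaster construction
metadata = pd.DataFrame({
    "field_name": ["record_id", "age"],
    "form_name": ["demographics", "demographics"],
    "field_type": ["text", "text"],
    "text_validation_type_or_show_slider_number": ["", "integer"],
})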
Contract Tests (Mocked API)¶
# tests/contract/test_api_contract.py
import pandas as pd
import pytest
from unittest.mock import AsyncMock
from redcap_sdk import Client

# Run the async tests in this module under pytest-asyncio
pytestmark = pytest.mark.asyncio
@pytest.fixture
def mock_http():
"""Mock HTTP transport returning realistic API responses."""
mock = AsyncMock()
return mock
@pytest.fixture
def client(mock_http):
"""Client with mocked transport."""
client = Client("https://redcap.example.edu/api/", "ABCD" * 8)
client._http = mock_http
return client
class TestRecordsContract:
"""Contract tests for Records API."""
async def test_export_records_request_format(self, client, mock_http):
"""Verify export_records sends correct request."""
mock_http.post.return_value = [{"record_id": "1", "age": "25"}]
await client.records.export(records=["1"], fields=["age"])
mock_http.post.assert_called_once()
call_data = mock_http.post.call_args[1]["data"]
assert call_data["content"] == "record"
assert call_data["records"] == ["1"]
assert call_data["fields"] == ["age"]
async def test_import_records_request_format(self, client, mock_http):
"""Verify import_records sends correct request."""
mock_http.post.return_value = {"count": 1}
await client.records.import_(
pd.DataFrame({"record_id": ["1"], "age": [25]}),
overwrite_behavior="normal",
)
call_data = mock_http.post.call_args[1]["data"]
assert call_data["content"] == "record"
assert call_data["overwriteBehavior"] == "normal"
class TestFilesContract:
"""Contract tests for Files API."""
async def test_download_file_request_format(self, client, mock_http):
"""Verify file download sends correct request."""
mock_response = AsyncMock()
mock_response.headers = {"Content-Disposition": 'filename="test.pdf"'}
mock_response.aread.return_value = b"file content"
mock_http.post_raw.return_value = mock_response
result = await client.files.download("1", "consent_doc")
assert result.filename == "test.pdf"
assert result.content == b"file content"
Golden-File Tests¶
# tests/golden/test_tidy_outputs.py
import json
from pathlib import Path
import pandas as pd
import pytest
GOLDEN_DIR = Path(__file__).parent / "golden_files"
class TestTidyOutputGolden:
"""Golden-file tests for tidy output consistency."""
@pytest.mark.parametrize("project_type", [
"classic",
"longitudinal",
"repeating",
"longitudinal_repeating",
])
def test_tidy_output_matches_golden(self, project_type):
"""Tidy output matches expected golden file."""
# Load input data
records = pd.read_csv(GOLDEN_DIR / f"{project_type}_records.csv")
metadata = pd.read_csv(GOLDEN_DIR / f"{project_type}_metadata.csv")
# Generate tidy output
bundle = create_tidy_bundle(records, metadata, ProjectType(project_type))
# Compare to golden
golden_path = GOLDEN_DIR / f"{project_type}_tidy.json"
if golden_path.exists():
expected = json.loads(golden_path.read_text())
for form_name, expected_df in expected.items():
pd.testing.assert_frame_equal(
bundle[form_name],
pd.DataFrame(expected_df),
)
else:
# Generate golden file (first run)
golden = {
name: df.to_dict("records")
for name, df in bundle.instruments.items()
}
golden_path.write_text(json.dumps(golden, indent=2))
pytest.skip("Golden file generated")
13.2 Documentation Plan¶
1. Quickstart Guide¶
Basic Usage¶
import os
from redcap_sdk import Client
# Create client (token from environment)
client = Client(
url="https://redcap.yourinstitution.edu/api/",
token=os.environ["REDCAP_TOKEN"],
)
# Export all records, then a subset of records and fields
async with client:
    df = await client.records.export()
    print(f"Exported {len(df)} records")
    subset = await client.records.export(
        records=["101", "102"],
        fields=["record_id", "age", "gender"],
    )
With Type Casting¶
# Automatic type casting based on metadata
df = await client.records.export(typed=True)
# Check for validation issues
caster = await client.metadata.get_type_caster()
report = caster.get_validation_report()
if len(report) > 0:
print("Validation warnings:", report)
Tidy Output (One Table Per Instrument)¶
# Get separate DataFrames for each instrument
bundle = await client.records.export_tidy()
# Access individual instruments
demographics = bundle["demographics"]
vitals = bundle["vitals"]
# Summary statistics
print(bundle.summary())
2. Longitudinal & Repeating Guide¶
Understanding REDCap Data Structures¶
Classic Projects¶
- Single record ID identifies each participant
- No events, no repeating
Longitudinal Projects¶
- Records have multiple events (visits)
- Key: `(record_id, redcap_event_name)`
Repeating Instruments¶
- Some forms can have multiple instances
- Key: `(record_id, redcap_repeat_instrument, redcap_repeat_instance)`
Longitudinal + Repeating¶
- Both events AND repeating instruments
- Key: `(record_id, redcap_event_name, redcap_repeat_instrument, redcap_repeat_instance)`
Recommended Approach: Tidy Output¶
# Get tidy output - automatically handles complexity
bundle = await client.records.export_tidy()
# Demographics (non-repeating): one row per record per event
demographics = bundle["demographics"]
# Columns: record_id, redcap_event_name, ...
# Medications (repeating): one row per instance
medications = bundle["medications"]
# Columns: record_id, redcap_event_name, redcap_repeat_instrument, redcap_repeat_instance, ...
Merging Tidy Tables¶
# Merge demographics with medications
merged = demographics.merge(
medications,
on=["record_id", "redcap_event_name"],
how="left",
suffixes=("", "_med"),
)
Pivoting to Wide Format¶
from redcap_sdk.transform import pivot_to_wide
# Wide format: one row per participant
wide = pivot_to_wide(
demographics,
id_cols=["record_id"],
pivot_col="redcap_event_name",
)
# Columns: record_id, baseline_age, followup_age, ...
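A minimal sketch of the helper, assuming it lives in `redcap_sdk/transform/longitudinal.py` and builds on `pandas.DataFrame.pivot`:
# redcap_sdk/transform/longitudinal.py (sketch)
import pandas as pd

def pivot_to_wide(
    df: pd.DataFrame,
    id_cols: list[str],
    pivot_col: str,
) -> pd.DataFrame:
    """Widen long-format data to one row per id, prefixing columns by pivot value."""
    value_cols = [c for c in df.columns if c not in id_cols + [pivot_col]]
    wide = df.pivot(index=id_cols, columns=pivot_col, values=value_cols)
    # Flatten the (value, event) column MultiIndex into "<event>_<value>" names
    wide.columns = [f"{event}_{value}" for value, event in wide.columns]
    return wide.reset_index()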
3. Security & Tokens Guide¶
Token Best Practices¶
1. Never Hardcode Tokens¶
# BAD - token in code
client = Client(url, token="ABC123...")
# GOOD - token from environment
client = Client(url, token=os.environ["REDCAP_TOKEN"])
2. Use Separate Tokens for Different Purposes¶
Request tokens with minimal required permissions:
- Read-only token: For analytics and reporting
- Write token: For data entry applications
- Admin token: Only when managing users (rare)
3. Token Storage Options¶
Environment Variables (Recommended)
OS Keychain
from redcap_sdk.security import KeychainTokenProvider
provider = KeychainTokenProvider()
provider.set_token("your-token", project_name="my_study")
# Later...
token = provider.get_token("my_study")
HashiCorp Vault (Enterprise)
from redcap_sdk.security import VaultTokenProvider
provider = VaultTokenProvider(
vault_url="https://vault.company.com",
secret_path="secret/data/redcap/my_study",
)
token = provider.get_token()
4. Token Validation¶
from redcap_sdk.security import sanitize_token, verify_token
# Validate format before use
token = sanitize_token(raw_token)
# Verify token works
if await verify_token(client):
print("Token is valid")
4. Troubleshooting Guide¶
*Inspired by REDCapR's troubleshooting guide*
Systematic Debugging Approach¶
When API calls fail, work through these layers:
1. Server & Authorization¶
- [ ] Is your REDCap account active?
- [ ] Is your email verified in REDCap?
- [ ] Does your token have the required permissions?
- [ ] Is the project in production status (if required)?
**Test in the REDCap API Playground first** - if it works there, the issue is on your end.
2. Network Communication¶
- [ ] Can you reach the REDCap server? `ping redcap.institution.edu`
- [ ] Are you on the right network (VPN required)?
- [ ] Is there a firewall blocking the connection?
**Test with curl:**
curl -X POST https://redcap.institution.edu/api/ \
  -d "token=YOUR_TOKEN" \
  -d "content=version"
3. SDK Level¶
- [ ] Is the SDK installed correctly?
- [ ] Are you using the correct URL (include /api/)?
- [ ] Is your token 32 hexadecimal characters?
Minimal test:
async with Client(url, token) as client:
version = await client.project.export_version()
print(f"REDCap version: {version}")
4. Application Level¶
- [ ] Are field names spelled correctly?
- [ ] Do requested records exist?
- [ ] Is the date format correct?
Common Error Messages¶
| Error | Likely Cause | Solution |
|---|---|---|
| `AuthError: Invalid token` | Token incorrect or expired | Regenerate token in REDCap |
| `ApiError: 403` | Insufficient permissions | Request additional API rights |
| `TransportError: Connection timeout` | Network issue | Check VPN, firewall |
| `ValidationError: Field 'xyz' not found` | Typo in field name | Check data dictionary |
Enable Debug Logging¶
import logging
logging.basicConfig(level=logging.DEBUG)
# Or just for the SDK
logging.getLogger("redcap_sdk").setLevel(logging.DEBUG)
14. Open Questions / Assumptions¶
Assumptions Made¶
- REDCap API version: Design assumes REDCap 10.0+ with standard API endpoints. Older versions may lack some features (e.g., the file repository).
- Pandas optional: Design keeps pandas as an optional dependency. Users who don't need DataFrames can use JSON/dict outputs.
- Async-first: Design uses async/await for all I/O operations. A sync wrapper can be added for simpler use cases.
- Token format: Assumed standard 32-character hexadecimal tokens. Some institutions may have different formats.
- SSL verification: Enabled by default. Some institutional REDCap instances may have certificate issues requiring custom CA bundles.
Open Questions¶
- Sync vs Async API: Should we provide both sync and async interfaces, or async-only with an optional sync wrapper?
  - Recommendation: Async-only core, with an `asyncio.run()` convenience for simple scripts.
- DataFrame library: Should we support alternatives to pandas (polars, pyarrow)?
  - Recommendation: Start with pandas; add polars support if requested.
- Caching strategy: Should the SDK cache metadata/project info between calls?
  - Recommendation: Optional caching with configurable TTL, disabled by default.
- Batch size defaults: What is the optimal default batch size for most REDCap instances?
  - Recommendation: 500 records (a balance between API calls and memory), configurable.
- Error recovery for partial imports: How should partial failures in batch imports be handled?
  - Recommendation: Return partial results with a list of failures, and let the user decide.
- Backwards compatibility with PyCap: Should we aim for API compatibility to ease migration?
  - Recommendation: No - design for clarity over compatibility, but provide a migration guide.
- R interface: Should we provide an R wrapper (reticulate-based)?
  - Recommendation: Not in v1.0; R users have mature options (redcapAPI, REDCapR).
References¶
R Packages¶
- REDCapR Reference
- REDCapR Troubleshooting
- redcapAPI CRAN Manual
- redcapAPI exportRecordsTyped
- redcapAPI importRecords
- REDCapTidieR Vignette
- REDCapDM Paper
- rccola GitHub