
REDCap SDK Design Document

Production-Grade Python API Interface for REDCap Integration

Version: 1.0 | Date: 2026-01-22


1. Executive Summary

  • Ecosystem maturity: R has the most mature REDCap tooling (REDCapR, redcapAPI, REDCapTidieR), while Python relies primarily on PyCap with limited alternatives
  • Core abstraction pattern: All major libraries use a connection/project object that encapsulates credentials and provides namespaced method access
  • Tidy data model (REDCapTidieR's "supertibble") represents the most analysis-friendly output format for complex longitudinal/repeating projects
  • Type casting is critical: redcapAPI's exportRecordsTyped demonstrates metadata-driven validation and casting as essential for production use
  • Diff-based writes (redcap-toolbox pattern) significantly reduce API load for incremental sync workflows
  • Batching strategies vary: REDCapR uses explicit batch glossaries; d3b-redcap-api uses intelligent chunking; redcapAPI uses batch.size parameter
  • Validation before write is a core principle in redcapAPI, preventing common import errors through pre-flight checks
  • Token security patterns are well-documented in REDCapR and rccola: never log tokens, use environment variables, support least privilege
  • Longitudinal/repeating handling requires consistent merge keys: (record_id, redcap_event_name, redcap_repeat_instrument, redcap_repeat_instance)
  • Error taxonomy should distinguish transport, auth, validation, and API-specific errors with structured payloads
  • Hierarchical data organization (d3b pattern) provides an intermediate representation useful for analysis workflows
  • File operations require streaming support for large files, checksum validation, and safe filename handling
  • Operational concerns: retry/backoff, rate limiting, correlation IDs, and structured logging are essential for production
  • Documentation quality directly correlates with adoption: REDCapR's troubleshooting guide is exemplary
  • Our design synthesizes best patterns: PyCap's minimalism, redcapAPI's validation, REDCapTidieR's tidy outputs, redcap-toolbox's efficiency

2. Ecosystem Review

2.1 R Packages

REDCapR

Repository: OuhscBbmc/REDCapR

Core Abstraction: Functional API with connection objects. Primary functions like redcap_read() accept a redcap_uri and token directly or via credential helpers.

Key Functions:

| Function | Purpose |
|----------|---------|
| redcap_read() | Batched record export with automatic stacking |
| redcap_read_oneshot() | Single-call record export |
| redcap_read_eav_oneshot() | Export in entity-attribute-value format |
| redcap_write() | Batched record import |
| redcap_metadata_read/write() | Data dictionary operations |
| redcap_file_download/upload_oneshot() | File attachment operations |
| sanitize_token() | Token validation and cleaning |
| retrieve_credential_local() | Secure local credential storage |

Batching Strategy: Uses create_batch_glossary() to create a dataset guiding batch operations. Batches process subsets of records/fields to avoid timeouts.

Error Handling: Returns structured results with success boolean, raw data, and outcome messages. The troubleshooting guide provides a systematic debugging approach: server → network → library → application.

Security Posture:

  • Default SSL certificate verification
  • sanitize_token() validates token format before use
  • Credential helpers support local encrypted storage
  • Documentation explicitly addresses token hygiene
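The token-sanitization idea translates directly to Python. A minimal sketch, assuming REDCap API tokens follow their usual 32-character hexadecimal format (the function name mirrors REDCapR's sanitize_token() but is otherwise hypothetical):

```python
import re

# REDCap API tokens are 32-character hex strings; strip whitespace and
# fail fast on anything else, without echoing the token in the error.
_TOKEN_PATTERN = re.compile(r"^[0-9A-Fa-f]{32}$")

def sanitize_token(token: str) -> str:
    cleaned = token.strip()
    if not _TOKEN_PATTERN.match(cleaned):
        raise ValueError("Token does not match the expected 32-character hex format")
    return cleaned
```

Rejecting malformed tokens client-side turns a confusing server error into an immediate, token-free diagnostic.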

Strengths: Excellent documentation, robust batching, comprehensive troubleshooting guide, mature codebase (MIT license, active maintenance).


redcapAPI

Repository: vubiostat/redcapAPI

Core Abstraction: Object-oriented via redcapConnection objects. Methods operate on the connection.

Key Functions:

| Function | Purpose |
|----------|---------|
| exportRecordsTyped() | Type-cast record export with validation |
| importRecords() | Validated record import with pre-flight checks |
| exportMetaData() | Data dictionary export |
| fieldValidationAndCasting | Customizable validation/casting framework |

Type Casting System (via exportRecordsTyped):

  • Metadata-driven field typing
  • Inversion of control: users can override any casting decision
  • Validation functions: valRx (regex), valChoice (choices), valSkip (bypass)
  • Cast functions: castRaw, castCode, castLabel, default_cast_no_factor
  • Missing data detection precedes validation
  • reviewInvalidRecords() generates validation reports with hotlinks
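The validation-then-cast split can be sketched in Python for a single dropdown field. These helpers are hypothetical analogues of redcapAPI's valChoice/castRaw/castLabel, not ports of them:

```python
# Choices map parsed from the field's metadata (code -> label); illustrative values.
choices = {"1": "Female", "2": "Male", "3": "Other"}

def val_choice(value: str, choices: dict[str, str]) -> bool:
    """Validation step: is the stored code a known choice?"""
    return value in choices

def cast_raw(value: str, choices: dict[str, str]) -> str:
    """Keep the stored code as-is."""
    return value

def cast_label(value: str, choices: dict[str, str]) -> str:
    """Map the stored code to its human-readable label; pass unknown codes through."""
    return choices.get(value, value)
```

Separating validation from casting is what lets users override either step independently, which is the inversion-of-control point above.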

Import Validation (importRecords):

  1. Verify all variables exist in data dictionary
  2. Confirm record ID presence and position
  3. Remove calculated fields automatically
  4. Validate date field types (character, POSIXct, Date)
  5. Check values against validation limits
  6. Write validation failures to logfile
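A minimal Python sketch of the first three pre-flight steps, assuming a data dictionary represented as a field-name-to-field-type mapping (names are illustrative, not redcapAPI's API):

```python
def preflight(records: list[dict], dictionary: dict[str, str],
              record_id: str = "record_id") -> list[dict]:
    """Check rows against the data dictionary before import."""
    known = set(dictionary)
    cleaned = []
    for row in records:
        unknown = set(row) - known
        if unknown:
            raise ValueError(f"Fields not in data dictionary: {sorted(unknown)}")
        if record_id not in row:
            raise ValueError("Every row must carry the record ID field")
        # REDCap computes calc fields server-side, so they must not be imported.
        cleaned.append({k: v for k, v in row.items() if dictionary[k] != "calc"})
    return cleaned
```

Catching these problems client-side yields one precise error instead of a partially applied import.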

Batching: batch.size parameter controls records per API call.

Strengths: Most sophisticated validation/type-casting system, extensive vignettes, analysis-ready outputs.


REDCapTidieR

Repository: CRAN package

Core Abstraction: Returns a "supertibble" — a tibble where each row represents one REDCap instrument.

Key Design:

supertibble
├── redcap_form_name      # Instrument identifier
├── redcap_form_label     # Human-readable name
├── redcap_data           # Nested tibble of observations
├── redcap_metadata       # Nested tibble of field definitions
├── row_count             # Quick data quality metrics
├── col_count
├── pct_missing
└── form_complete_pct

Longitudinal/Repeating Handling:

  • Each instrument's data tibble contains appropriate merge keys
  • Non-repeating instruments: (record_id, redcap_event_name)
  • Repeating instruments: (record_id, redcap_event_name, redcap_repeat_instrument, redcap_repeat_instance)
  • Preserves relational integrity through nesting rather than wide joins
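The merge-key convention can be expressed as a small composite-key helper; a sketch using plain dicts (the function names are illustrative):

```python
# The four fields that define row identity across instruments.
MERGE_KEYS = ("record_id", "redcap_event_name",
              "redcap_repeat_instrument", "redcap_repeat_instance")

def merge_key(row: dict) -> tuple:
    # Keys absent on non-repeating instruments default to "" so the tuple
    # shape stays constant across instruments.
    return tuple(row.get(k, "") for k in MERGE_KEYS)

def join_instruments(left: list[dict], right: list[dict]) -> list[dict]:
    """Left-join two instrument tables on the full composite key."""
    index = {merge_key(r): r for r in right}
    return [{**l, **index.get(merge_key(l), {})} for l in left]
```

Joining on the full tuple rather than record_id alone is what prevents the cartesian-join explosion described below.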

Example: 734 heroes × 5,966 power records cleanly separated rather than cartesian-joined.

Strengths: Most analysis-friendly output format, excellent for complex projects, avoids wide-table explosion.


REDCapDM

Publication: PMC10905808

Core Abstraction: Clinical data management workflow tool.

Key Components:

| Function | Purpose |
|----------|---------|
| redcap_data() | Import via API or files |
| rd_transform() | Multi-step preprocessing pipeline |
| rd_query() | Generate missing data/discrepancy reports |
| rd_event() | Detect missing events (REDCap gap) |
| check_queries() | Track query resolution over time |

Transformation Pipeline (rd_transform):

  1. Recalculate computed fields for verification
  2. Convert checkbox variables to interpretable names
  3. Replace variables with factor-format versions
  4. Convert branching logic to R syntax
  5. Remove completion status/timestamp variables
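Step 2 relies on REDCap's convention of exporting each checkbox option as its own column named field___code with "0"/"1" values. A hypothetical sketch of the renaming (the slug rule is an assumption, not rd_transform's actual behavior):

```python
def checkbox_columns(field: str, choices: dict[str, str]) -> dict[str, str]:
    """Map exported checkbox column names to interpretable ones,
    e.g. meds___1 -> meds_aspirin."""
    def slug(label: str) -> str:
        return label.lower().replace(" ", "_")
    return {f"{field}___{code}": f"{field}_{slug(label)}"
            for code, label in choices.items()}
```

Renaming by label keeps downstream analysis code readable without consulting the data dictionary for every code.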

Query Management: Structured tracking of data quality issues with comparison between successive reports.

Strengths: Purpose-built for clinical trial workflows, addresses real-world data management needs.


rccola

Repository: cran/rccola

Purpose: Secure credential management for REDCap tokens.

Key Patterns:

  • Environment variable storage
  • OS keychain integration
  • Separation of read vs. write tokens (least privilege)
  • Token validation before use
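The environment-variable pattern in Python, as a minimal sketch (the variable name and function are illustrative):

```python
import os

def load_token(var: str = "REDCAP_API_TOKEN") -> str:
    """Read the API token from the environment; never hard-code it in source."""
    token = os.environ.get(var, "").strip()
    if not token:
        raise RuntimeError(f"{var} is not set; export it in your shell or secrets manager")
    return token
```

Keeping separate variables for read and write tokens (e.g. one per project and privilege level) extends the same pattern to least-privilege workflows.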


2.2 Python Packages

PyCap

Repository: redcap-tools/PyCap

Design Philosophy: "Minimal interface exposing all required and optional API parameters... doesn't do anything fancy behind the scenes."

Core Abstraction: Single Project class instantiated with (api_url, api_key).

API Surface:

project = Project(api_url, api_key)

# Records
project.export_records(format_type, records, fields, forms, events, ...)
project.import_records(to_import, import_format, returnFormat)
project.delete_records(records, arm, instrument, event, repeat_instance)

# Metadata
project.export_metadata(format_type, fields, forms)
project.import_metadata(to_import, import_format)

# Files
project.export_file(record, field, event, repeat_instance) → (bytes, dict)
project.import_file(record, field, file_name, file_object, event)
project.delete_file(record, field, event)

# File Repository
project.export_file_repository(folder_id, format_type)
project.export_file_from_repository(doc_id)
project.create_folder_in_repository(name, folder_id, dag_id, role_id)
project.delete_file_from_repository(doc_id)

# Events/Arms (longitudinal)
project.export_events(format_type, arms)
project.import_events(to_import)
project.delete_events(events)
project.export_arms(format_type, arms)
project.import_arms(to_import)
project.delete_arms(arms)

# Users/DAGs
project.export_users(format_type)
project.import_users(to_import)
project.delete_users(users)
project.export_dags(format_type)
project.export_user_dag_assignment(format_type)

# Project Info
project.export_project_info(format_type)
project.export_logging(format_type, log_type, user, record, ...)
project.export_version()

# Properties
project.is_longitudinal  # bool
project.def_field        # primary key field name
project.field_names      # list
project.forms            # list
project.metadata         # dict

Format Support: JSON, CSV, XML, pandas DataFrame (via df_kwargs).

Repeating Instruments: repeat_instance parameter supported throughout.

Error Handling: RedcapError for API failures, ValueError for invalid inputs.

Strengths: Comprehensive endpoint coverage, clear 1:1 mapping to REDCap API, well-maintained (healthy release cadence, 190 stars, 1,729 weekly downloads).

Weaknesses: No built-in type casting, no validation before write, no batching, no tidy output option.


redcaplite

Repository: PyPI

Design Philosophy: "Lightweight, user-friendly Python client... minimal dependencies to keep your environment lean."

Core Abstraction: RedcapClient class with 40+ methods organized as Export/Import/Delete.

Key Features:

  • Full type hints
  • Pandas integration via pd_read_csv_kwargs
  • Minimal dependencies
  • Comprehensive test coverage

Trade-offs: Focuses on common endpoints rather than complete API coverage.


redcap-toolbox

Repository: PyPI

Core Innovation: Diff-based imports to minimize API load.

Key Methods:

| Command | Purpose |
|---------|---------|
| download_redcap | Export with optional survey timestamps |
| download_redcap_report | Report-based export |
| split_redcap_data | Partition by event/instrument |
| update_redcap_diff | Apply differential updates |

Diff Strategy:

  1. Export current state to an "original" file
  2. Create a "modified" cache with changes
  3. update_redcap_diff computes the minimal delta
  4. Only changed fields are transmitted to the API
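The delta computation in step 3 can be sketched as follows: compare original and modified rows keyed on record identity, and emit only the cells that changed plus the key fields REDCap needs to address each row (a sketch of the idea, not redcap-toolbox's implementation):

```python
KEYS = ("record_id", "redcap_event_name",
        "redcap_repeat_instrument", "redcap_repeat_instance")

def compute_diff(original: list[dict], modified: list[dict]) -> list[dict]:
    """Return minimal import payload: changed cells plus identifying keys."""
    before = {tuple(r.get(k, "") for k in KEYS): r for r in original}
    delta = []
    for row in modified:
        key = tuple(row.get(k, "") for k in KEYS)
        old = before.get(key, {})
        changed = {f: v for f, v in row.items()
                   if f not in KEYS and old.get(f) != v}
        if changed:  # untouched rows produce no API traffic at all
            delta.append({**{k: row.get(k, "") for k in KEYS}, **changed})
    return delta
```

Rows absent from the original are treated as new and emitted in full, which is the behavior an incremental sync needs.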

Authentication: Environment variables (REDCAP_API_URL, REDCAP_API_TOKEN).

Strengths: Significant API load reduction for incremental sync workflows.


d3b-redcap-api-python

Repository: d3b-center/d3b-redcap-api-python

Architecture: Three-tier design:

  1. Low-level transport: Generic HTTP handlers
  2. API operations: 1:1 REDCap endpoint methods
  3. High-level structuring: Hierarchical data organization

Key Innovation: get_records_tree() transforms flat API responses into nested structures organized by event → instrument → record_id → instance.
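A sketch of that flattening-to-nesting transformation, folding flat export rows into the event → instrument → record_id → instance hierarchy (illustrative code, not d3b's implementation):

```python
def records_tree(rows: list[dict]) -> dict:
    """Nest flat REDCap export rows as event -> instrument -> record -> instance."""
    tree: dict = {}
    for row in rows:
        event = row.get("redcap_event_name", "")
        form = row.get("redcap_repeat_instrument", "")
        record = row["record_id"]
        instance = row.get("redcap_repeat_instance", "1")
        (tree.setdefault(event, {})
             .setdefault(form, {})
             .setdefault(record, {}))[instance] = row
    return tree
```

The nested form makes per-event or per-instrument slices cheap, which is what the to_df()/all_dfs() converters below build on.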

Batching: get_records employs intelligent chunking to prevent timeout failures on large datasets.

Pandas Integration: to_df(), all_dfs() convert tree structures to DataFrames.

Motivation: Created when PyCap had maintenance gaps; focuses on hierarchical data organization.


3. Capability Matrix

| Feature | REDCapR | redcapAPI | REDCapTidieR | REDCapDM | PyCap | redcaplite | redcap-toolbox | d3b-python |
|---------|---------|-----------|--------------|----------|-------|------------|----------------|------------|
| Auth/Connection | Native | Native | Via REDCapR | Via redcapAPI | Native | Native | Env vars | Native |
| Read Records | Native | Native | Native | Native | Native | Native | Native | Native |
| Write Records | Native | Native | No | No | Native | Native | Diff-based | Native |
| Delete Records | Native | Partial | No | No | Native | Native | No | Native |
| Metadata Export | Native | Native | Native | Native | Native | Native | No | Native |
| Metadata Import | Native | Native | No | No | Native | Native | No | Native |
| Instruments List | Native | Native | Native | Native | Native | Native | Partial | Native |
| Events (longitudinal) | Native | Native | Native | Native | Native | Native | Native | Native |
| Arms | Native | Native | Native | Native | Native | Native | No | Native |
| DAGs | Partial | Native | No | No | Native | Native | No | Native |
| File Upload | Native | Native | No | No | Native | Native | No | Native |
| File Download | Native | Native | No | No | Native | Native | No | Native |
| File Repository | Native | Partial | No | No | Native | Partial | No | Partial |
| Users | Partial | Native | No | No | Native | Native | No | Native |
| Logs | No | Native | No | No | Native | Partial | No | Native |
| Reports | Native | Native | No | No | Native | Native | Native | Native |
| Type Casting | Partial | Native | Via parent | Via parent | No | Partial | No | No |
| Validation | Basic | Native | Via parent | Native | No | Basic | No | No |
| Tidy Outputs | No | No | Native | Partial | No | No | Partial | Partial |
| Longitudinal Handling | Basic | Basic | Native | Native | Basic | Basic | Native | Native |
| Repeating Instruments | Basic | Basic | Native | Native | Basic | Basic | Basic | Native |
| Diff-based Writes | No | No | No | No | No | No | Native | No |
| Batching | Native | Native | Via parent | Via parent | No | No | No | Native |
| Retry/Backoff | Partial | Partial | No | No | No | No | No | No |
| Rate Limiting | No | No | No | No | No | No | No | No |
| Structured Logging | No | No | No | No | No | No | No | No |
| Correlation IDs | No | No | No | No | No | No | No | No |
| Query Management | No | No | No | Native | No | No | No | No |

Legend: Native = Built-in support | Partial = Limited support | Basic = Rudimentary support | No = Not supported | Via parent = Depends on underlying package


4. Design Principles for the New Interface

4.1 Core Principles

  1. Safe by Default
     • Tokens never logged or exposed in error messages
     • SSL verification enabled by default
     • Validation before write operations
     • Least-privilege guidance (separate read/write tokens)

  2. Consistent Mental Model
     • Single Client class with namespaced endpoint groups
     • Uniform method signatures across endpoint families
     • Predictable return types with explicit format options

  3. Efficient by Design
     • Automatic batching for large operations
     • Optional diff-based writes for incremental sync
     • Connection pooling and request reuse
     • Configurable rate limiting

  4. Explicit Schema Handling
     • Metadata-driven type casting
     • Validation framework with customizable rules
     • Clear missing value semantics

  5. Analysis-Ready Outputs
     • Tidy (per-instrument) output as a first-class option
     • Consistent merge keys for longitudinal/repeating data
     • DataFrame integration without forcing a pandas dependency

  6. Observable Operations
     • Structured logging with correlation IDs
     • Request/response metrics
     • Deterministic error taxonomy
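The automatic-batching principle reduces to chunking record IDs and stacking per-batch results. A minimal sketch with an illustrative fetch callable standing in for the real API call:

```python
from typing import Callable, Iterable

def batched(ids: list[str], size: int) -> Iterable[list[str]]:
    """Yield successive chunks of at most `size` record IDs."""
    for i in range(0, len(ids), size):
        yield ids[i:i + size]

def export_in_batches(ids: list[str],
                      fetch: Callable[[list[str]], list[dict]],
                      size: int = 500) -> list[dict]:
    """Export records chunk by chunk to avoid server-side timeouts."""
    out: list[dict] = []
    for chunk in batched(ids, size):
        out.extend(fetch(chunk))  # retry/backoff would wrap this call
    return out
```

The batch size becomes the expert escape hatch: the default keeps requests small enough to avoid timeouts, while callers with fast servers can raise it.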

4.2 Design Decisions

| Decision | Choice | Rationale |
|----------|--------|-----------|
| HTTP library | httpx | Async support, connection pooling, modern API |
| Validation | Pydantic v2 | Industry standard, excellent performance |
| DataFrames | Optional pandas | Don't force dependency; provide integration |
| Batching | Automatic with override | Sensible defaults, expert escape hatch |
| Type casting | Opt-in strict mode | Flexibility for edge cases |
| Tidy output | Explicit method | Clear intent, avoid surprise transformations |

5. Proposed Public API (Python)

5.1 Client Initialization

from redcap_sdk import Client, ClientConfig

# Minimal initialization
client = Client(
    url="https://redcap.institution.edu/api/",
    token="your-api-token"
)

# Full configuration
config = ClientConfig(
    timeout=30.0,
    max_retries=3,
    backoff_factor=0.5,
    rate_limit_per_minute=60,
    verify_ssl=True,
    ca_bundle="/path/to/certs.pem",  # optional
    batch_size=500,
    enable_diff_writes=True,
    log_level="INFO",
)

client = Client(
    url="https://redcap.institution.edu/api/",
    token=os.environ["REDCAP_TOKEN"],  # recommended pattern
    config=config
)

# Async context manager for connection cleanup
async with Client(url, token) as client:
    records = await client.records.export()

5.2 Records Namespace

class RecordsAPI:
    def export(
        self,
        *,
        format: Literal["json", "csv", "xml", "df"] = "df",
        records: list[str] | None = None,
        fields: list[str] | None = None,
        forms: list[str] | None = None,
        events: list[str] | None = None,
        raw_or_label: Literal["raw", "label", "both"] = "raw",
        export_checkbox_labels: bool = False,
        export_survey_fields: bool = False,
        export_data_access_groups: bool = False,
        filter_logic: str | None = None,
        date_range_begin: datetime | None = None,
        date_range_end: datetime | None = None,
        typed: bool = True,  # Apply metadata-driven type casting
        cast_overrides: dict[str, Callable] | None = None,
        validation_mode: Literal["strict", "permissive", "skip"] = "permissive",
    ) -> pd.DataFrame | list[dict] | str:
        """
        Export records from the project.

        Args:
            format: Output format. "df" returns pandas DataFrame.
            records: Specific record IDs to export. None = all records.
            fields: Specific fields to export. None = all fields.
            forms: Specific forms/instruments to export. None = all forms.
            events: Specific events (longitudinal). None = all events.
            raw_or_label: Return raw codes, labels, or both.
            export_checkbox_labels: Include checkbox option labels.
            export_survey_fields: Include survey timestamp/identifier fields.
            export_data_access_groups: Include DAG assignment.
            filter_logic: REDCap filter logic expression.
            date_range_begin: Filter by record creation date.
            date_range_end: Filter by record creation date.
            typed: Apply type casting based on metadata.
            cast_overrides: Custom casting functions by field name or type.
            validation_mode: How to handle validation failures.

        Returns:
            Records in requested format.

        Raises:
            AuthError: Invalid or expired token.
            ApiError: REDCap API error response.
            ValidationError: Data fails validation (strict mode).
        """
        ...

    def export_tidy(
        self,
        *,
        forms: list[str] | None = None,
        events: list[str] | None = None,
        typed: bool = True,
        include_metadata: bool = True,
    ) -> TidyBundle:
        """
        Export records as tidy per-instrument tables (REDCapTidieR-style).

        Returns:
            TidyBundle with one DataFrame per instrument, plus metadata.
        """
        ...

    def import_(
        self,
        data: pd.DataFrame | list[dict],
        *,
        overwrite_behavior: Literal["normal", "overwrite"] = "normal",
        return_content: Literal["count", "ids", "auto_ids"] = "count",
        force_auto_number: bool = False,
        validate: bool = True,
        date_format: str = "YMD",
    ) -> ImportResult:
        """
        Import records to the project.

        Args:
            data: Records to import.
            overwrite_behavior: "normal" preserves existing data; "overwrite" replaces.
            return_content: What to return (count, IDs, or auto-generated IDs).
            force_auto_number: Let REDCap assign record IDs.
            validate: Run pre-flight validation against metadata.
            date_format: Expected date format in data.

        Returns:
            ImportResult with count/IDs and any validation warnings.

        Raises:
            ValidationError: Data fails pre-flight validation.
            ApiError: REDCap rejects the import.
        """
        ...

    def import_diff(
        self,
        current: pd.DataFrame,
        modified: pd.DataFrame,
        *,
        key_fields: list[str] | None = None,
    ) -> ImportResult:
        """
        Import only changed records (diff-based, redcap-toolbox pattern).

        Args:
            current: Current state of records (from previous export).
            modified: Desired state of records.
            key_fields: Fields that define record identity.
                        Default: [record_id, redcap_event_name,
                                  redcap_repeat_instrument, redcap_repeat_instance]

        Returns:
            ImportResult with count of actually modified records.
        """
        ...

    def delete(
        self,
        records: list[str],
        *,
        arm: str | None = None,
        instrument: str | None = None,
        event: str | None = None,
        repeat_instance: int | None = None,
        delete_logging: bool = False,
    ) -> int:
        """
        Delete records (development projects only).

        Returns:
            Count of deleted records.
        """
        ...

5.3 Metadata Namespace

class MetadataAPI:
    def export(
        self,
        *,
        format: Literal["json", "df"] = "df",
        fields: list[str] | None = None,
        forms: list[str] | None = None,
    ) -> pd.DataFrame | list[dict]:
        """Export project data dictionary."""
        ...

    def import_(
        self,
        metadata: pd.DataFrame | list[dict],
    ) -> int:
        """Import/update data dictionary. Returns count of fields."""
        ...

    def get_field_schema(
        self,
        field_name: str,
    ) -> FieldSchema:
        """
        Get parsed schema for a specific field.

        Returns:
            FieldSchema with type, validation, choices, branching logic.
        """
        ...

    def get_type_caster(self) -> TypeCaster:
        """
        Get a TypeCaster configured from project metadata.

        Returns:
            TypeCaster that can transform raw API data to typed DataFrames.
        """
        ...

5.4 Files Namespace

class FilesAPI:
    def download(
        self,
        record: str,
        field: str,
        *,
        event: str | None = None,
        repeat_instance: int | None = None,
        validate_checksum: bool = True,
    ) -> FileDownload:
        """
        Download file attachment from a record.

        Returns:
            FileDownload with content (bytes), filename, mime_type, size.
        """
        ...

    def download_streaming(
        self,
        record: str,
        field: str,
        destination: Path | BinaryIO,
        *,
        event: str | None = None,
        repeat_instance: int | None = None,
        chunk_size: int = 8192,
    ) -> FileMetadata:
        """
        Stream large file to disk or file-like object.

        Returns:
            FileMetadata with filename, mime_type, size, checksum.
        """
        ...

    def upload(
        self,
        record: str,
        field: str,
        file_path: Path | None = None,
        file_object: BinaryIO | None = None,
        filename: str | None = None,
        *,
        event: str | None = None,
        repeat_instance: int | None = None,
    ) -> None:
        """
        Upload file attachment to a record.

        Args:
            file_path: Path to file on disk.
            file_object: File-like object (provide filename if using this).
            filename: Override filename (required if using file_object).
        """
        ...

    def delete(
        self,
        record: str,
        field: str,
        *,
        event: str | None = None,
        repeat_instance: int | None = None,
    ) -> None:
        """Delete file attachment from a record."""
        ...

5.5 File Repository Namespace

class FileRepositoryAPI:
    def list(
        self,
        *,
        folder_id: int | None = None,
        format: Literal["json", "df"] = "df",
    ) -> pd.DataFrame | list[dict]:
        """List files and folders in repository."""
        ...

    def download(
        self,
        doc_id: int,
        destination: Path | None = None,
    ) -> FileDownload | Path:
        """Download file from repository."""
        ...

    def upload(
        self,
        file_path: Path,
        *,
        folder_id: int | None = None,
    ) -> int:
        """Upload file to repository. Returns doc_id."""
        ...

    def create_folder(
        self,
        name: str,
        *,
        parent_folder_id: int | None = None,
        dag_id: int | None = None,
        role_id: int | None = None,
    ) -> int:
        """Create folder in repository. Returns folder_id."""
        ...

    def delete(self, doc_id: int) -> None:
        """Delete file from repository."""
        ...

5.6 Events/Arms Namespace (Longitudinal)

class EventsAPI:
    def export(
        self,
        *,
        arms: list[str] | None = None,
        format: Literal["json", "df"] = "df",
    ) -> pd.DataFrame | list[dict]:
        """Export events for longitudinal project."""
        ...

    def import_(self, events: pd.DataFrame | list[dict]) -> int:
        """Import events. Returns count."""
        ...

    def delete(self, events: list[str]) -> int:
        """Delete events. Returns count."""
        ...


class ArmsAPI:
    def export(
        self,
        *,
        arms: list[str] | None = None,
        format: Literal["json", "df"] = "df",
    ) -> pd.DataFrame | list[dict]:
        """Export study arms."""
        ...

    def import_(self, arms: pd.DataFrame | list[dict]) -> int:
        """Import arms. Returns count."""
        ...

    def delete(self, arms: list[str]) -> int:
        """Delete arms. Returns count."""
        ...

5.7 Instruments Namespace

class InstrumentsAPI:
    def list(
        self,
        *,
        format: Literal["json", "df"] = "df",
    ) -> pd.DataFrame | list[dict]:
        """List all instruments/forms in project."""
        ...

    def export_mapping(
        self,
        *,
        arms: list[str] | None = None,
        format: Literal["json", "df"] = "df",
    ) -> pd.DataFrame | list[dict]:
        """Export instrument-event mapping (longitudinal)."""
        ...

    def import_mapping(
        self,
        mapping: pd.DataFrame | list[dict],
    ) -> int:
        """Import instrument-event mapping. Returns count."""
        ...

    def export_pdf(
        self,
        instrument: str | None = None,
        *,
        record: str | None = None,
        event: str | None = None,
        all_records: bool = False,
        compact_display: bool = False,
    ) -> bytes:
        """Export instrument as PDF."""
        ...

5.8 Users/DAGs Namespace

class UsersAPI:
    def export(
        self,
        *,
        format: Literal["json", "df"] = "df",
    ) -> pd.DataFrame | list[dict]:
        """Export users with permissions."""
        ...

    def import_(self, users: pd.DataFrame | list[dict]) -> int:
        """Import/update users. Returns count."""
        ...

    def delete(self, users: list[str]) -> int:
        """Delete users. Returns count."""
        ...


class DagsAPI:
    def export(
        self,
        *,
        format: Literal["json", "df"] = "df",
    ) -> pd.DataFrame | list[dict]:
        """Export Data Access Groups."""
        ...

    def import_(self, dags: pd.DataFrame | list[dict]) -> int:
        """Import DAGs. Returns count."""
        ...

    def delete(self, dags: list[str]) -> int:
        """Delete DAGs. Returns count."""
        ...

    def export_user_assignment(
        self,
        *,
        format: Literal["json", "df"] = "df",
    ) -> pd.DataFrame | list[dict]:
        """Export user-DAG assignments."""
        ...

    def import_user_assignment(
        self,
        assignments: pd.DataFrame | list[dict],
    ) -> int:
        """Import user-DAG assignments. Returns count."""
        ...

    def switch(self, dag: str | None) -> None:
        """Switch current user's DAG context. None = no DAG filter."""
        ...

5.9 Project Namespace

class ProjectAPI:
    def info(self) -> ProjectInfo:
        """
        Get project information.

        Returns:
            ProjectInfo with title, is_longitudinal, has_repeating_instruments,
            record_autonumbering_enabled, etc.
        """
        ...

    def export_xml(self, *, include_records: bool = False) -> bytes:
        """Export entire project as XML (REDCap XML format)."""
        ...

    def generate_next_record_name(self) -> str:
        """Generate next available record ID."""
        ...


class LoggingAPI:
    def export(
        self,
        *,
        format: Literal["json", "df"] = "df",
        log_type: Literal["export", "manage", "user", "record", "record_add",
                          "record_edit", "record_delete", "lock_record",
                          "page_view"] | None = None,
        user: str | None = None,
        record: str | None = None,
        dag: str | None = None,
        begin_time: datetime | None = None,
        end_time: datetime | None = None,
    ) -> pd.DataFrame | list[dict]:
        """Export audit logs with filtering."""
        ...

5.10 Reports Namespace

class ReportsAPI:
    def export(
        self,
        report_id: int,
        *,
        format: Literal["json", "csv", "xml", "df"] = "df",
        raw_or_label: Literal["raw", "label", "both"] = "raw",
        export_checkbox_labels: bool = False,
        typed: bool = True,
    ) -> pd.DataFrame | list[dict] | str:
        """Export a saved report by ID."""
        ...

5.11 Surveys Namespace

class SurveysAPI:
    def export_participant_list(
        self,
        instrument: str,
        *,
        event: str | None = None,
        format: Literal["json", "df"] = "df",
    ) -> pd.DataFrame | list[dict]:
        """Export survey participant list."""
        ...

    def export_link(
        self,
        record: str,
        instrument: str,
        *,
        event: str | None = None,
        repeat_instance: int | None = None,
    ) -> str:
        """Get survey link for specific record."""
        ...

    def export_return_code(
        self,
        record: str,
        instrument: str,
        *,
        event: str | None = None,
        repeat_instance: int | None = None,
    ) -> str:
        """Get survey return code for record."""
        ...

    def export_queue_link(self, record: str) -> str:
        """Get survey queue link for record."""
        ...

5.12 Repeating Instruments Namespace

class RepeatingAPI:
    def export_settings(
        self,
        *,
        format: Literal["json", "df"] = "df",
    ) -> pd.DataFrame | list[dict]:
        """Export repeating instruments/events settings."""
        ...

    def import_settings(
        self,
        settings: pd.DataFrame | list[dict],
    ) -> int:
        """Import repeating instruments/events settings."""
        ...

6. Data Model & Typing Strategy

6.1 Core Data Types

from dataclasses import dataclass
from typing import Literal, Any
from datetime import datetime, date, time
import pandas as pd


@dataclass
class TidyBundle:
    """REDCapTidieR-style output: one DataFrame per instrument."""

    instruments: dict[str, pd.DataFrame]  # form_name -> data
    metadata: dict[str, pd.DataFrame]     # form_name -> field definitions

    # Convenience accessors
    def __getitem__(self, form_name: str) -> pd.DataFrame:
        return self.instruments[form_name]

    def summary(self) -> pd.DataFrame:
        """Return summary table (like REDCapTidieR supertibble)."""
        rows = []
        for form_name, df in self.instruments.items():
            rows.append({
                "form_name": form_name,
                "row_count": len(df),
                "col_count": len(df.columns),
                "pct_missing": df.isna().mean().mean() * 100,
            })
        return pd.DataFrame(rows)


@dataclass
class FieldSchema:
    """Parsed schema for a single field from metadata."""

    field_name: str
    field_label: str
    field_type: Literal["text", "notes", "calc", "dropdown", "radio",
                        "checkbox", "yesno", "truefalse", "file",
                        "slider", "descriptive", "sql"]
    validation_type: str | None  # "date_ymd", "integer", "number", "email", etc.
    validation_min: Any | None
    validation_max: Any | None
    choices: dict[str, str] | None  # code -> label for dropdowns/radios
    branching_logic: str | None
    required: bool
    identifier: bool  # PHI identifier
    form_name: str

    def get_python_type(self) -> type:
        """Return appropriate Python type for this field."""
        type_map = {
            ("text", None): str,
            ("text", "integer"): int,
            ("text", "number"): float,
            ("text", "date_ymd"): date,
            ("text", "datetime_ymd"): datetime,
            ("text", "time"): time,
            ("text", "email"): str,
            ("notes", None): str,
            ("calc", None): float,
            ("dropdown", None): str,  # or Categorical
            ("radio", None): str,
            ("checkbox", None): bool,  # individual checkbox field
            ("yesno", None): bool,
            ("truefalse", None): bool,
            ("slider", None): int,
            ("file", None): str,  # filename
        }
        return type_map.get((self.field_type, self.validation_type), str)


@dataclass
class ImportResult:
    """Result of an import operation."""

    count: int
    ids: list[str] | None = None
    warnings: list[str] | None = None
    validation_report: pd.DataFrame | None = None


@dataclass
class FileDownload:
    """Result of a file download."""

    content: bytes
    filename: str
    mime_type: str | None
    size: int
    checksum: str | None = None


@dataclass
class FileMetadata:
    """Metadata for a file (without content)."""

    filename: str
    mime_type: str | None
    size: int
    checksum: str | None = None


@dataclass
class ProjectInfo:
    """Project-level information."""

    project_id: int
    project_title: str
    def_field: str  # Name of the record ID field (used later as the default export/merge key)
    is_longitudinal: bool
    has_repeating_instruments: bool
    has_repeating_events: bool
    record_autonumbering_enabled: bool
    surveys_enabled: bool
    scheduling_enabled: bool
    purpose: int
    purpose_other: str | None
    creation_time: datetime
    production_time: datetime | None
    in_production: bool
    project_language: str
    missing_data_codes: str | None

6.2 Type Casting System

from typing import Callable, Protocol
from abc import ABC, abstractmethod


class CastFunction(Protocol):
    """Protocol for field casting functions."""

    def __call__(
        self,
        value: str,
        field_name: str,
        schema: FieldSchema,
    ) -> Any:
        ...


class TypeCaster:
    """
    Metadata-driven type caster for REDCap data.

    Inspired by redcapAPI's exportRecordsTyped validation/casting framework.
    """

    def __init__(
        self,
        metadata: pd.DataFrame,
        *,
        na_values: frozenset[str] = frozenset({"", "NA", "NaN", "-999"}),  # immutable: a mutable default set would be shared across instances
        strict: bool = False,
        cast_overrides: dict[str, CastFunction] | None = None,
    ):
        self._metadata = metadata
        self._schemas = self._parse_schemas(metadata)
        self._na_values = na_values
        self._strict = strict
        self._overrides = cast_overrides or {}
        self._validation_errors: list[dict] = []

    def cast_dataframe(
        self,
        df: pd.DataFrame,
        *,
        report_errors: bool = True,
    ) -> pd.DataFrame:
        """
        Cast all columns in DataFrame according to metadata.

        Args:
            df: Raw DataFrame from API.
            report_errors: Collect validation errors for review.

        Returns:
            DataFrame with typed columns.
        """
        result = df.copy()

        for col in result.columns:
            if col in self._schemas:
                schema = self._schemas[col]
                cast_fn = self._overrides.get(col) or self._get_caster(schema)
                result[col] = result[col].apply(
                    lambda v: self._cast_value(v, col, schema, cast_fn)
                )

        return result

    def get_validation_report(self) -> pd.DataFrame:
        """Return report of all validation errors encountered."""
        return pd.DataFrame(self._validation_errors)

    def _cast_value(
        self,
        value: Any,
        field_name: str,
        schema: FieldSchema,
        cast_fn: CastFunction,
    ) -> Any:
        # Handle NA values first
        if pd.isna(value) or str(value).strip() in self._na_values:
            return None

        try:
            return cast_fn(str(value), field_name, schema)
        except (ValueError, TypeError) as e:
            self._validation_errors.append({
                "field": field_name,
                "value": value,
                "expected_type": schema.get_python_type().__name__,
                "error": str(e),
            })
            if self._strict:
                raise ValidationError(f"Field '{field_name}' failed validation: {e}") from e
            return value  # Return raw value in permissive mode

    def _get_caster(self, schema: FieldSchema) -> CastFunction:
        """Get appropriate casting function for field type."""
        # Implementation follows redcapAPI patterns
        ...


# Built-in cast functions (like redcapAPI's castRaw, castCode, castLabel)
def cast_raw(value: str, field_name: str, schema: FieldSchema) -> str:
    """Return value unchanged."""
    return value


def cast_code(value: str, field_name: str, schema: FieldSchema) -> str:
    """Return coded value (for dropdowns/radios)."""
    return value


def cast_label(value: str, field_name: str, schema: FieldSchema) -> str:
    """Return label for coded value."""
    if schema.choices and value in schema.choices:
        return schema.choices[value]
    return value


def cast_integer(value: str, field_name: str, schema: FieldSchema) -> int:
    """Cast to integer with validation."""
    result = int(float(value))  # Handle "1.0" -> 1
    if schema.validation_min is not None and result < schema.validation_min:
        raise ValueError(f"Value {result} below minimum {schema.validation_min}")
    if schema.validation_max is not None and result > schema.validation_max:
        raise ValueError(f"Value {result} above maximum {schema.validation_max}")
    return result


def cast_number(value: str, field_name: str, schema: FieldSchema) -> float:
    """Cast to float with validation."""
    result = float(value)
    if schema.validation_min is not None and result < schema.validation_min:
        raise ValueError(f"Value {result} below minimum {schema.validation_min}")
    if schema.validation_max is not None and result > schema.validation_max:
        raise ValueError(f"Value {result} above maximum {schema.validation_max}")
    return result


def cast_date(value: str, field_name: str, schema: FieldSchema) -> date:
    """Cast to date with format detection."""
    # Handle multiple formats: YYYY-MM-DD, MM/DD/YYYY, etc.
    formats = ["%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y"]
    for fmt in formats:
        try:
            return datetime.strptime(value, fmt).date()
        except ValueError:
            continue
    raise ValueError(f"Cannot parse date: {value}")


def cast_checkbox(value: str, field_name: str, schema: FieldSchema) -> bool:
    """Cast checkbox to boolean."""
    return value in ("1", "Checked", "Yes", "TRUE", "True", "true")
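The `_get_caster` dispatch elided above might be a lookup keyed the same way as `get_python_type()`. A minimal self-contained sketch (with condensed stand-ins for the cast functions defined above; the `schema` argument is unused in the stand-ins):

```python
from datetime import datetime, date

# Condensed stand-ins for the module-level cast functions
def cast_raw(value, field_name, schema):
    return value

def cast_integer(value, field_name, schema):
    return int(float(value))  # handles "1.0" -> 1

def cast_number(value, field_name, schema):
    return float(value)

def cast_date(value, field_name, schema):
    return datetime.strptime(value, "%Y-%m-%d").date()

def cast_checkbox(value, field_name, schema):
    return value in ("1", "Checked", "Yes", "TRUE", "True", "true")

# (field_type, validation_type) -> caster, mirroring get_python_type()
_CASTERS = {
    ("text", "integer"): cast_integer,
    ("text", "number"): cast_number,
    ("text", "date_ymd"): cast_date,
    ("checkbox", None): cast_checkbox,
    ("yesno", None): cast_checkbox,
}

def get_caster(field_type, validation_type):
    """Unmapped combinations fall back to cast_raw (pass-through)."""
    return _CASTERS.get((field_type, validation_type), cast_raw)
```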

6.3 Checkbox Handling

REDCap checkboxes are complex: a single checkbox field expands to multiple columns (field___1, field___2, etc.).

def reshape_checkboxes(
    df: pd.DataFrame,
    metadata: pd.DataFrame,
    *,
    mode: Literal["wide", "long", "combined"] = "wide",
) -> pd.DataFrame:
    """
    Handle checkbox field transformation.

    Args:
        mode:
            - "wide": Keep as separate columns (default REDCap export)
            - "long": Pivot to (record_id, field, choice, checked)
            - "combined": Combine into single column with list of checked values
    """
    ...
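A minimal sketch of the "combined" mode on toy data (column names are illustrative; assumes REDCap's `field___<code>` naming convention):

```python
import pandas as pd

def combine_checkbox_columns(df: pd.DataFrame, field: str, choices: list[str]) -> pd.DataFrame:
    """Collapse field___<code> columns into one list-of-checked-codes column."""
    cols = [f"{field}___{code}" for code in choices]
    out = df.copy()
    out[field] = out[cols].apply(
        lambda row: [c for c, v in zip(choices, row) if str(v) == "1"],
        axis=1,
    )
    return out.drop(columns=cols)

df = pd.DataFrame({
    "record_id": ["1", "2"],
    "race___1": ["1", "0"],
    "race___2": ["1", "1"],
})
combined = combine_checkbox_columns(df, "race", ["1", "2"])
```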

7. Longitudinal/Repetition Handling

7.1 Canonical Internal Representation

All records have a composite key:

@dataclass
class RecordKey:
    """Canonical record identifier for any REDCap project type."""

    record_id: str
    redcap_event_name: str | None = None        # Longitudinal projects
    redcap_repeat_instrument: str | None = None  # Repeating instruments
    redcap_repeat_instance: int | None = None    # Instance number (1-indexed)

    def as_tuple(self) -> tuple:
        return (
            self.record_id,
            self.redcap_event_name,
            self.redcap_repeat_instrument,
            self.redcap_repeat_instance,
        )

    @classmethod
    def merge_columns(cls) -> list[str]:
        """Return column names for merge operations."""
        return [
            "record_id",
            "redcap_event_name",
            "redcap_repeat_instrument",
            "redcap_repeat_instance",
        ]
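Why the full composite key matters: joining two repeating instruments on `record_id` alone produces a cartesian product across instances, while joining on the complete key aligns them (toy data; field names illustrative):

```python
import pandas as pd

vitals = pd.DataFrame({
    "record_id": ["1", "1"],
    "redcap_repeat_instance": [1, 2],
    "heart_rate": [72, 80],
})
labs = pd.DataFrame({
    "record_id": ["1", "1"],
    "redcap_repeat_instance": [1, 2],
    "glucose": [90, 95],
})

# Joining on record_id alone: every vitals instance pairs with every labs instance
exploded = vitals.merge(labs, on="record_id")  # 2 x 2 = 4 rows

# Joining on the composite key: instances stay aligned
aligned = vitals.merge(labs, on=["record_id", "redcap_repeat_instance"])  # 2 rows
```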

7.2 Project Type Detection

from enum import Enum


class ProjectType(Enum):
    CLASSIC = "classic"                    # Simple, no events, no repeating
    LONGITUDINAL = "longitudinal"          # Events, no repeating
    REPEATING = "repeating"                # No events, repeating instruments
    LONGITUDINAL_REPEATING = "longitudinal_repeating"  # Both


def detect_project_type(project_info: ProjectInfo) -> ProjectType:
    """Detect project type from info."""
    if project_info.is_longitudinal and project_info.has_repeating_instruments:
        return ProjectType.LONGITUDINAL_REPEATING
    elif project_info.is_longitudinal:
        return ProjectType.LONGITUDINAL
    elif project_info.has_repeating_instruments:
        return ProjectType.REPEATING
    else:
        return ProjectType.CLASSIC

7.3 Tidy Output Generation

def create_tidy_bundle(
    records: pd.DataFrame,
    metadata: pd.DataFrame,
    project_type: ProjectType,
) -> TidyBundle:
    """
    Transform flat records into tidy per-instrument tables.

    This follows REDCapTidieR's approach:
    - Each instrument becomes its own DataFrame
    - Merge keys are consistent across instruments
    - No cartesian explosion from wide joins
    """
    instruments: dict[str, pd.DataFrame] = {}
    instrument_metadata: dict[str, pd.DataFrame] = {}

    # Group metadata by form
    form_groups = metadata.groupby("form_name")

    for form_name, form_meta in form_groups:
        # Get fields for this form (plus key columns)
        form_fields = form_meta["field_name"].tolist()

        # Determine appropriate key columns based on project type
        key_cols = _get_key_columns(form_name, project_type, metadata)

        # Select relevant columns
        select_cols = key_cols + [f for f in form_fields if f in records.columns]
        form_df = records[select_cols].copy()

        # Drop rows where all form fields are NA (no data for this form)
        data_cols = [c for c in form_fields if c in form_df.columns]
        form_df = form_df.dropna(subset=data_cols, how="all")

        # Handle repeating: keep only rows with matching repeat_instrument
        if "redcap_repeat_instrument" in form_df.columns:
            is_repeating = form_meta["repeating"].any()
            if is_repeating:
                form_df = form_df[
                    form_df["redcap_repeat_instrument"] == form_name
                ]
            else:
                form_df = form_df[
                    form_df["redcap_repeat_instrument"].isna()
                ]

        instruments[form_name] = form_df.reset_index(drop=True)
        instrument_metadata[form_name] = form_meta

    return TidyBundle(
        instruments=instruments,
        metadata=instrument_metadata,
    )


def _get_key_columns(
    form_name: str,
    project_type: ProjectType,
    metadata: pd.DataFrame,
) -> list[str]:
    """Determine appropriate key columns for an instrument."""
    keys = ["record_id"]

    if project_type in (ProjectType.LONGITUDINAL, ProjectType.LONGITUDINAL_REPEATING):
        keys.append("redcap_event_name")

    if project_type in (ProjectType.REPEATING, ProjectType.LONGITUDINAL_REPEATING):
        # Check if this specific form is repeating
        form_meta = metadata[metadata["form_name"] == form_name]
        if form_meta["repeating"].any():
            keys.extend(["redcap_repeat_instrument", "redcap_repeat_instance"])

    return keys

7.4 Pivot/Widen Helpers

def pivot_to_wide(
    tidy_df: pd.DataFrame,
    *,
    id_cols: list[str] = ["record_id"],
    pivot_col: str = "redcap_event_name",
    value_cols: list[str] | None = None,
) -> pd.DataFrame:
    """
    Pivot longitudinal data from long to wide format.

    Example: record_id, event, score -> record_id, baseline_score, followup_score
    """
    if value_cols is None:
        value_cols = [c for c in tidy_df.columns if c not in id_cols + [pivot_col]]

    return tidy_df.pivot_table(
        index=id_cols,
        columns=pivot_col,
        values=value_cols,
        aggfunc="first",
    ).reset_index()
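On toy data, the pivot behaves like this (event names are illustrative; the MultiIndex columns produced by `pivot_table` are flattened for readability):

```python
import pandas as pd

long_df = pd.DataFrame({
    "record_id": ["1", "1", "2", "2"],
    "redcap_event_name": ["baseline_arm_1", "followup_arm_1"] * 2,
    "score": [10, 12, 8, 9],
})

wide = long_df.pivot_table(
    index=["record_id"],
    columns="redcap_event_name",
    values=["score"],
    aggfunc="first",
).reset_index()

# Flatten ("score", "baseline_arm_1") -> "score_baseline_arm_1"
wide.columns = ["_".join(c).strip("_") for c in wide.columns]
```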


def nest_repeating(
    tidy_df: pd.DataFrame,
    *,
    group_cols: list[str] = ["record_id", "redcap_event_name"],
) -> pd.DataFrame:
    """
    Nest repeating instances into list columns.

    Useful for creating nested JSON or working with hierarchical data.
    """
    return tidy_df.groupby(group_cols).apply(
        lambda g: g.drop(columns=group_cols).to_dict("records")
    ).reset_index(name="instances")

8. Write Strategies

8.1 Full Replace vs Normal

class WriteMode(Enum):
    NORMAL = "normal"      # Blanks don't overwrite existing data
    OVERWRITE = "overwrite"  # Blanks replace existing data


def import_records(
    client: Client,
    data: pd.DataFrame,
    *,
    mode: WriteMode = WriteMode.NORMAL,
    validate: bool = True,
) -> ImportResult:
    """
    Import records with specified overwrite behavior.

    NORMAL mode: Only non-blank values update the database.
                 Existing values preserved if new value is blank.

    OVERWRITE mode: All values (including blanks) update the database.
                    Use with caution - can delete data.
    """
    ...
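For reference, `WriteMode` maps directly onto the REDCap import API's `overwriteBehavior` parameter. A sketch of the request payload (field names follow the documented record-import request; the helper name and token handling are simplified assumptions):

```python
import json

def build_import_payload(token: str, records: list[dict], mode: str = "normal") -> dict:
    """Form fields for a record import request (mode: 'normal' | 'overwrite')."""
    return {
        "token": token,
        "content": "record",
        "format": "json",
        "type": "flat",
        "overwriteBehavior": mode,
        "data": json.dumps(records),
        "returnContent": "count",
    }

payload = build_import_payload("TOKEN", [{"record_id": "1", "age": ""}], mode="overwrite")
```

With `overwriteBehavior=overwrite`, the blank `age` above clears any stored value; under `normal` it would be ignored.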

8.2 Diff-Based Import

Inspired by redcap-toolbox's approach:

def compute_diff(
    current: pd.DataFrame,
    modified: pd.DataFrame,
    *,
    key_columns: list[str] | None = None,
) -> DiffResult:
    """
    Compute minimal changes needed to transform current -> modified.

    Args:
        current: Current state (from previous export).
        modified: Desired state.
        key_columns: Columns that identify records.
                     Default: RecordKey.merge_columns()

    Returns:
        DiffResult with:
        - records_to_add: New records not in current
        - records_to_update: Changed records (only changed fields)
        - records_to_delete: Records in current but not modified
        - unchanged_count: Records with no changes
    """
    if key_columns is None:
        key_columns = [c for c in RecordKey.merge_columns() if c in current.columns]

    # Create composite keys
    current_keys = set(current[key_columns].apply(tuple, axis=1))
    modified_keys = set(modified[key_columns].apply(tuple, axis=1))

    # Identify adds, deletes, potential updates
    to_add_keys = modified_keys - current_keys
    to_delete_keys = current_keys - modified_keys
    to_check_keys = current_keys & modified_keys

    # For potential updates, compare field by field
    updates = []
    unchanged = 0

    for key in to_check_keys:
        key_filter = _make_key_filter(key_columns, key)
        current_row = current.loc[key_filter].iloc[0]
        modified_row = modified.loc[key_filter].iloc[0]

        changed_fields = {}
        for col in modified.columns:
            if col in key_columns:
                continue
            if not _values_equal(current_row[col], modified_row[col]):
                changed_fields[col] = modified_row[col]

        if changed_fields:
            update_row = {c: key[i] for i, c in enumerate(key_columns)}
            update_row.update(changed_fields)
            updates.append(update_row)
        else:
            unchanged += 1

    return DiffResult(
        records_to_add=modified[modified[key_columns].apply(tuple, axis=1).isin(to_add_keys)],
        records_to_update=pd.DataFrame(updates) if updates else pd.DataFrame(),
        records_to_delete=list(to_delete_keys),
        unchanged_count=unchanged,
    )


@dataclass
class DiffResult:
    """Result of computing differences between datasets."""

    records_to_add: pd.DataFrame
    records_to_update: pd.DataFrame
    records_to_delete: list[tuple]
    unchanged_count: int

    @property
    def has_changes(self) -> bool:
        return (
            len(self.records_to_add) > 0 or
            len(self.records_to_update) > 0 or
            len(self.records_to_delete) > 0
        )

    def summary(self) -> str:
        return (
            f"Add: {len(self.records_to_add)}, "
            f"Update: {len(self.records_to_update)}, "
            f"Delete: {len(self.records_to_delete)}, "
            f"Unchanged: {self.unchanged_count}"
        )


async def import_diff(
    client: Client,
    current: pd.DataFrame,
    modified: pd.DataFrame,
    *,
    apply_deletes: bool = False,  # Safety: require explicit opt-in
) -> ImportResult:
    """
    Apply only changed records to minimize API load.

    This is the recommended approach for incremental sync workflows.
    """
    diff = compute_diff(current, modified)

    results = []

    if len(diff.records_to_add) > 0:
        result = await client.records.import_(diff.records_to_add)
        results.append(("add", result))

    if len(diff.records_to_update) > 0:
        result = await client.records.import_(
            diff.records_to_update,
            # Diff rows carry only changed fields; overwrite mode is needed
            # so values changed *to* blank actually clear in REDCap
            overwrite_behavior="overwrite",
        )
        results.append(("update", result))

    if apply_deletes and diff.records_to_delete:
        # Warning: deletes are destructive and often not supported
        for key in diff.records_to_delete:
            await client.records.delete([key[0]])  # record_id
        results.append(("delete", len(diff.records_to_delete)))

    return ImportResult(
        count=sum(r[1].count if hasattr(r[1], 'count') else r[1] for r in results),
        warnings=[f"Diff summary: {diff.summary()}"],
    )

8.3 Validation Before Write

Following redcapAPI's importRecords patterns:

class ImportValidator:
    """Pre-flight validation for record imports."""

    def __init__(self, metadata: pd.DataFrame, project_info: ProjectInfo):
        self._metadata = metadata
        self._project_info = project_info
        self._schemas = {
            row["field_name"]: FieldSchema.from_row(row)
            for _, row in metadata.iterrows()
        }

    def validate(self, data: pd.DataFrame) -> ValidationReport:
        """
        Run all validation checks.

        Checks (following redcapAPI patterns):
        1. All columns exist in data dictionary
        2. Record ID column present and first
        3. No calculated fields included
        4. Date fields have correct types
        5. Values within validation limits
        6. Required fields populated
        7. Choices valid for dropdowns/radios
        """
        errors = []
        warnings = []

        # Check 1: Column existence
        valid_fields = set(self._metadata["field_name"])
        for col in data.columns:
            if col not in valid_fields and not self._is_system_field(col):
                errors.append(ValidationError(
                    field=col,
                    error_type="unknown_field",
                    message=f"Field '{col}' not in data dictionary",
                ))

        # Check 2: Record ID presence
        record_id_field = self._project_info.def_field or "record_id"
        if record_id_field not in data.columns:
            errors.append(ValidationError(
                field=record_id_field,
                error_type="missing_record_id",
                message="Record ID field not found in data",
            ))

        # Check 3: Calculated fields
        calc_fields = self._metadata[self._metadata["field_type"] == "calc"]["field_name"]
        for col in data.columns:
            if col in calc_fields.values:
                warnings.append(ValidationWarning(
                    field=col,
                    warning_type="calculated_field",
                    message=f"Calculated field '{col}' will be ignored",
                ))

        # Check 4-7: Per-field validation
        for col in data.columns:
            if col in self._schemas:
                field_errors = self._validate_column(data[col], self._schemas[col])
                errors.extend(field_errors)

        return ValidationReport(
            valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
        )

    def _validate_column(
        self,
        series: pd.Series,
        schema: FieldSchema,
    ) -> list[ValidationError]:
        """Validate a single column against its schema."""
        errors = []

        for idx, value in series.items():
            if pd.isna(value):
                if schema.required:
                    errors.append(ValidationError(
                        field=schema.field_name,
                        row=idx,
                        error_type="required_missing",
                        message=f"Required field missing at row {idx}",
                    ))
                continue

            # Type validation (date/time types cannot be constructed from a
            # string, so only constructor-checkable types are tested here;
            # temporal fields are validated by the cast functions instead)
            py_type = schema.get_python_type()
            if py_type in (int, float, str):
                try:
                    py_type(value)
                except (ValueError, TypeError):
                    errors.append(ValidationError(
                        field=schema.field_name,
                        row=idx,
                        value=value,
                        error_type="type_mismatch",
                        message=f"Cannot convert '{value}' to {py_type.__name__}",
                    ))

            # Range validation
            if schema.validation_min is not None or schema.validation_max is not None:
                try:
                    num_val = float(value)
                    if schema.validation_min is not None and num_val < float(schema.validation_min):
                        errors.append(ValidationError(
                            field=schema.field_name,
                            row=idx,
                            value=value,
                            error_type="below_minimum",
                            message=f"Value {value} below minimum {schema.validation_min}",
                        ))
                    if schema.validation_max is not None and num_val > float(schema.validation_max):
                        errors.append(ValidationError(
                            field=schema.field_name,
                            row=idx,
                            value=value,
                            error_type="above_maximum",
                            message=f"Value {value} above maximum {schema.validation_max}",
                        ))
                except ValueError:
                    pass

            # Choice validation
            if schema.choices and str(value) not in schema.choices:
                errors.append(ValidationError(
                    field=schema.field_name,
                    row=idx,
                    value=value,
                    error_type="invalid_choice",
                    message=f"Value '{value}' not in valid choices: {list(schema.choices.keys())}",
                ))

        return errors

9. Files & Large Payloads

9.1 File Download with Streaming

from pathlib import Path


async def download_file_streaming(
    client: Client,
    record: str,
    field: str,
    destination: Path,
    *,
    event: str | None = None,
    repeat_instance: int | None = None,
    chunk_size: int = 8192,
    progress_callback: Callable[[int, int], None] | None = None,
) -> FileMetadata:
    """
    Stream large file to disk without loading into memory.

    Args:
        destination: Path to save file.
        chunk_size: Bytes per chunk.
        progress_callback: Called with (bytes_downloaded, total_bytes).

    Returns:
        FileMetadata with filename, size, checksum.
    """
    import hashlib

    async with client._http.stream(
        "POST",
        client._url,
        data={
            "token": client._token,
            "content": "file",
            "action": "export",
            "record": record,
            "field": field,
            "event": event,
            "repeat_instance": repeat_instance,
        },
    ) as response:
        response.raise_for_status()

        # Extract filename from Content-Disposition header
        content_disp = response.headers.get("Content-Disposition", "")
        filename = _parse_filename(content_disp) or f"{record}_{field}"

        # Sanitize filename for safety
        filename = _sanitize_filename(filename)

        total_size = int(response.headers.get("Content-Length", 0))
        downloaded = 0
        hasher = hashlib.sha256()

        with open(destination, "wb") as f:
            async for chunk in response.aiter_bytes(chunk_size):
                f.write(chunk)
                hasher.update(chunk)
                downloaded += len(chunk)
                if progress_callback:
                    progress_callback(downloaded, total_size)

        return FileMetadata(
            filename=filename,
            mime_type=response.headers.get("Content-Type"),
            size=downloaded,
            checksum=hasher.hexdigest(),
        )


def _sanitize_filename(filename: str) -> str:
    """
    Sanitize filename to prevent path traversal and other issues.
    """
    import os
    import re
    # Remove path components (prevents path traversal)
    filename = os.path.basename(filename)
    # Remove dangerous characters
    filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)
    # Limit length
    if len(filename) > 255:
        name, ext = os.path.splitext(filename)
        filename = name[:255-len(ext)] + ext
    return filename


def _parse_filename(content_disposition: str) -> str | None:
    """Parse filename from Content-Disposition header.

    Handles the common quoted/unquoted forms; RFC 5987 extended syntax
    (filename*=UTF-8''...) is matched but not percent-decoded here.
    """
    import re
    match = re.search(r'filename[*]?=["\']?([^"\';]+)["\']?', content_disposition)
    return match.group(1) if match else None

9.2 File Upload with Validation

async def upload_file(
    client: Client,
    record: str,
    field: str,
    file_path: Path,
    *,
    event: str | None = None,
    repeat_instance: int | None = None,
    validate_field_type: bool = True,
) -> None:
    """
    Upload file to record.

    Args:
        validate_field_type: Verify field is a file field before upload.

    Raises:
        ValidationError: Field is not a file field.
        ApiError: Upload rejected by REDCap.
    """
    if validate_field_type:
        schema = await client.metadata.get_field_schema(field)
        if schema.field_type != "file":
            raise ValidationError(
                f"Field '{field}' is type '{schema.field_type}', not 'file'"
            )

    with open(file_path, "rb") as f:
        files = {
            "file": (file_path.name, f, _guess_mime_type(file_path)),
        }
        data = {
            "token": client._token,
            "content": "file",
            "action": "import",
            "record": record,
            "field": field,
            "event": event,
            "repeat_instance": repeat_instance,
        }

        response = await client._http.post(
            client._url,
            data=data,
            files=files,
        )
        _check_response(response)

9.3 Batched Record Operations

async def export_records_batched(
    client: Client,
    *,
    batch_size: int = 500,
    records: list[str] | None = None,
    **kwargs,
) -> pd.DataFrame:
    """
    Export records in batches to avoid timeouts on large datasets.

    Follows REDCapR's batch glossary pattern.
    """
    if records is None:
        # First, get list of all record IDs
        all_records = await client.records.export(
            fields=[client.project.def_field],
            format="df",
        )
        records = all_records[client.project.def_field].unique().tolist()

    # Create batch glossary
    batches = [
        records[i:i + batch_size]
        for i in range(0, len(records), batch_size)
    ]

    results = []
    for batch_records in batches:
        batch_df = await client.records.export(
            records=batch_records,
            **kwargs,
        )
        results.append(batch_df)

    return pd.concat(results, ignore_index=True)
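The batch glossary slicing used above, shown on a toy ID list:

```python
records = [f"R{i}" for i in range(1, 8)]  # 7 record IDs
batch_size = 3

# Same slicing as in export_records_batched: fixed-size chunks, last one shorter
batches = [
    records[i:i + batch_size]
    for i in range(0, len(records), batch_size)
]
```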


async def import_records_batched(
    client: Client,
    data: pd.DataFrame,
    *,
    batch_size: int = 500,
    **kwargs,
) -> ImportResult:
    """
    Import records in batches.
    """
    total_count = 0
    all_ids = []
    all_warnings = []

    for i in range(0, len(data), batch_size):
        batch = data.iloc[i:i + batch_size]
        result = await client.records.import_(batch, **kwargs)
        total_count += result.count
        if result.ids:
            all_ids.extend(result.ids)
        if result.warnings:
            all_warnings.extend(result.warnings)

    return ImportResult(
        count=total_count,
        ids=all_ids if all_ids else None,
        warnings=all_warnings if all_warnings else None,
    )

10. Errors, Retries, Rate Limits, Observability

10.1 Error Taxonomy

class RedcapSdkError(Exception):
    """Base exception for all SDK errors."""
    pass


class TransportError(RedcapSdkError):
    """Network-level error (connection, timeout, DNS)."""

    def __init__(self, message: str, cause: Exception | None = None):
        super().__init__(message)
        self.cause = cause


class AuthError(RedcapSdkError):
    """Authentication failure (invalid token, expired, insufficient permissions)."""

    def __init__(self, message: str, status_code: int | None = None):
        super().__init__(message)
        self.status_code = status_code


class ApiError(RedcapSdkError):
    """REDCap API returned an error response."""

    def __init__(
        self,
        message: str,
        status_code: int,
        error_code: str | None = None,
        raw_response: str | None = None,
    ):
        super().__init__(message)
        self.status_code = status_code
        self.error_code = error_code
        self.raw_response = raw_response


class ValidationError(RedcapSdkError):
    """Data validation failure."""

    def __init__(
        self,
        message: str,
        field: str | None = None,
        value: Any = None,
        report: "ValidationReport | None" = None,
    ):
        super().__init__(message)
        self.field = field
        self.value = value
        self.report = report


class RateLimitError(RedcapSdkError):
    """Rate limit exceeded."""

    def __init__(self, message: str, retry_after: float | None = None):
        super().__init__(message)
        self.retry_after = retry_after
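A sketch of how raw responses might map onto this taxonomy (this would back the `_check_response` call in section 9.2; minimal stand-ins for the exception classes keep the example self-contained, and the specific status-code mapping is an assumption, since REDCap reports most API errors via a 4xx/5xx status with a plain-text body):

```python
class AuthError(Exception):
    pass

class RateLimitError(Exception):
    def __init__(self, message, retry_after=None):
        super().__init__(message)
        self.retry_after = retry_after

class ApiError(Exception):
    def __init__(self, message, status_code, raw_response=None):
        super().__init__(message)
        self.status_code = status_code
        self.raw_response = raw_response

def check_response(status_code: int, body: str, headers: dict) -> None:
    """Raise the appropriate SDK error for a non-success response."""
    if status_code in (401, 403):
        # Never echo the token; the body excerpt is safe to surface
        raise AuthError(f"Authentication failed ({status_code}): {body[:200]}")
    if status_code == 429:
        retry_after = headers.get("Retry-After")
        raise RateLimitError(
            "Rate limit exceeded",
            retry_after=float(retry_after) if retry_after else None,
        )
    if status_code >= 400:
        raise ApiError(f"REDCap API error ({status_code})", status_code, raw_response=body)
```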

10.2 Retry Policy

from dataclasses import dataclass
from typing import Callable
import asyncio
import random

# `logger` below is assumed to be a structured (kwargs-style) logger,
# e.g. structlog, configured elsewhere in the package.


@dataclass
class RetryPolicy:
    """Configuration for retry behavior."""

    max_retries: int = 3
    backoff_factor: float = 0.5       # Wait = factor * (2 ** attempt)
    backoff_max: float = 60.0         # Maximum wait time
    backoff_jitter: float = 0.1       # Randomization factor
    retry_statuses: frozenset[int] = frozenset({429, 500, 502, 503, 504})
    retry_exceptions: tuple[type, ...] = (TransportError,)

    def get_wait_time(self, attempt: int) -> float:
        """Calculate wait time for given attempt number."""
        wait = min(
            self.backoff_factor * (2 ** attempt),
            self.backoff_max,
        )
        # Add jitter
        jitter = wait * self.backoff_jitter * random.random()
        return wait + jitter


async def with_retry(
    func: Callable,
    policy: RetryPolicy,
    *,
    correlation_id: str | None = None,
) -> Any:
    """
    Execute function with retry policy.
    """
    last_exception = None

    for attempt in range(policy.max_retries + 1):
        try:
            return await func()
        except policy.retry_exceptions as e:
            last_exception = e
            if attempt < policy.max_retries:
                wait_time = policy.get_wait_time(attempt)
                logger.warning(
                    "Request failed, retrying",
                    attempt=attempt + 1,
                    max_retries=policy.max_retries,
                    wait_time=wait_time,
                    correlation_id=correlation_id,
                    error=str(e),
                )
                await asyncio.sleep(wait_time)
            else:
                raise
        except ApiError as e:
            if e.status_code not in policy.retry_statuses:
                raise
            last_exception = e
            if attempt >= policy.max_retries:
                raise
            wait_time = policy.get_wait_time(attempt)
            if e.status_code == 429:
                # Rate limit - prefer the server's Retry-After when available
                wait_time = getattr(e, "retry_after", None) or wait_time
            logger.warning(
                "Retryable API error",
                attempt=attempt + 1,
                status_code=e.status_code,
                wait_time=wait_time,
                correlation_id=correlation_id,
            )
            await asyncio.sleep(wait_time)

    # Unreachable in practice: the final attempt returns or re-raises above
    raise last_exception
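With the defaults above, the deterministic part of the wait doubles per attempt until the cap; a standalone sketch of the resulting schedule (jitter omitted):

```python
# Deterministic part of RetryPolicy.get_wait_time: factor * 2**attempt, capped
def base_wait(attempt: int, factor: float = 0.5, cap: float = 60.0) -> float:
    return min(factor * (2 ** attempt), cap)

schedule = [base_wait(a) for a in range(8)]
# Attempts 0-7: 0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, then capped at 60.0
```

Jitter then adds up to `backoff_jitter * wait` on top, which de-synchronizes clients that would otherwise all retry at the same instant after a shared outage.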

10.3 Rate Limiting

import asyncio
import time
from asyncio import Lock


class RateLimiter:
    """
    Token bucket rate limiter.
    """

    def __init__(
        self,
        rate_per_minute: int = 60,
        burst_size: int | None = None,
    ):
        self.rate_per_minute = rate_per_minute
        self.burst_size = burst_size or rate_per_minute
        self._tokens = self.burst_size
        self._last_update = time.monotonic()
        self._lock = Lock()

    async def acquire(self) -> None:
        """Wait until a token is available."""
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_update
            self._last_update = now

            # Refill tokens
            self._tokens = min(
                self.burst_size,
                self._tokens + elapsed * (self.rate_per_minute / 60),
            )

            if self._tokens < 1:
                # Wait for a token to refill; the lock serializes waiting callers
                wait_time = (1 - self._tokens) / (self.rate_per_minute / 60)
                await asyncio.sleep(wait_time)
                # Reset the clock so the sleep is not double-counted as refill time
                self._last_update = time.monotonic()
                self._tokens = 0
            else:
                self._tokens -= 1
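The refill arithmetic can be sanity-checked in isolation; a sketch with an example burst size of 5 at 60 requests/minute (one token per second):

```python
# Token-bucket refill as used in RateLimiter.acquire: tokens grow by
# elapsed_seconds * (rate_per_minute / 60), capped at the burst size.
def refill(tokens: float, elapsed: float, rate_per_minute: int = 60, burst: int = 5) -> float:
    return min(burst, tokens + elapsed * (rate_per_minute / 60))

tokens = 5.0                      # Full burst available
tokens = refill(tokens - 1, 0.0)  # Spend one, no time elapsed -> 4.0
tokens = refill(tokens - 1, 0.5)  # Spend another after 0.5 s  -> 3.5
tokens = refill(tokens, 10.0)     # Idle for 10 s -> capped back at 5.0
```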

10.4 Structured Logging

import logging
import time
import uuid
from contextvars import ContextVar
from typing import Any

# Correlation ID for request tracing
correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")


class StructuredLogger:
    """
    Structured logger with correlation ID support.
    """

    def __init__(self, name: str):
        self._logger = logging.getLogger(name)

    def _log(self, level: int, message: str, **kwargs: Any) -> None:
        correlation_id = correlation_id_var.get()
        extra = {
            "correlation_id": correlation_id,
            **kwargs,
        }
        # Never log tokens
        if "token" in extra:
            extra["token"] = "[REDACTED]"

        self._logger.log(level, message, extra=extra)

    def info(self, message: str, **kwargs: Any) -> None:
        self._log(logging.INFO, message, **kwargs)

    def warning(self, message: str, **kwargs: Any) -> None:
        self._log(logging.WARNING, message, **kwargs)

    def error(self, message: str, **kwargs: Any) -> None:
        self._log(logging.ERROR, message, **kwargs)

    def debug(self, message: str, **kwargs: Any) -> None:
        self._log(logging.DEBUG, message, **kwargs)


def generate_correlation_id() -> str:
    """Generate a new correlation ID."""
    return str(uuid.uuid4())[:8]


# Usage in client
class Client:
    async def _request(self, **kwargs) -> Any:
        correlation_id = generate_correlation_id()
        correlation_id_var.set(correlation_id)

        self._logger.info(
            "API request",
            method=kwargs.get("method", "POST"),
            content=kwargs.get("data", {}).get("content"),
            correlation_id=correlation_id,
        )

        start = time.monotonic()
        try:
            result = await self._do_request(**kwargs)
            elapsed = time.monotonic() - start
            self._logger.info(
                "API response",
                elapsed_ms=elapsed * 1000,
                correlation_id=correlation_id,
            )
            return result
        except Exception as e:
            elapsed = time.monotonic() - start
            self._logger.error(
                "API error",
                elapsed_ms=elapsed * 1000,
                error=str(e),
                correlation_id=correlation_id,
            )
            raise
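The ContextVar is what lets concurrent requests keep distinct correlation IDs without threading them through every call; a minimal standalone demonstration of that isolation:

```python
import asyncio
from contextvars import ContextVar

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")

async def handle(request_id: str) -> str:
    correlation_id_var.set(request_id)   # Each task writes to its own context copy
    await asyncio.sleep(0)               # Yield so the tasks interleave
    return correlation_id_var.get()      # Still this task's own value

async def main() -> list[str]:
    # gather() wraps each coroutine in a Task with its own context snapshot
    return list(await asyncio.gather(handle("req-a"), handle("req-b")))

ids = asyncio.run(main())
# -> ["req-a", "req-b"]: neither task clobbered the other's ID
```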

11. Security & Credential Handling

11.1 Token Security Principles

Following patterns from REDCapR, rccola, and redcapAPI:

  1. Never log tokens: All logging must redact token values
  2. Environment variables preferred: REDCAP_TOKEN, REDCAP_API_URL
  3. Support secure storage: OS keychain, HashiCorp Vault, AWS Secrets Manager
  4. Validate before use: Check token format, test connectivity
  5. Least privilege: Recommend separate tokens for read vs. write operations

11.2 Token Provider Interface

import os
from typing import Protocol


class TokenProvider(Protocol):
    """Protocol for token retrieval."""

    def get_token(self, project_name: str | None = None) -> str:
        """Retrieve token for project."""
        ...


class EnvironmentTokenProvider:
    """
    Get token from environment variable.

    Recommended pattern - tokens never in code.
    """

    def __init__(self, env_var: str = "REDCAP_TOKEN"):
        self._env_var = env_var

    def get_token(self, project_name: str | None = None) -> str:
        env_var = self._env_var
        if project_name:
            # Allow project-specific tokens: REDCAP_TOKEN_PROJECT1
            env_var = f"{self._env_var}_{project_name.upper()}"

        token = os.environ.get(env_var)
        if not token:
            raise AuthError(f"Token not found in environment variable: {env_var}")

        return sanitize_token(token)


class KeychainTokenProvider:
    """
    Get token from OS keychain (macOS Keychain, Windows Credential Manager).

    Inspired by rccola patterns.
    """

    def __init__(self, service_name: str = "redcap-sdk"):
        self._service_name = service_name
        try:
            import keyring
            self._keyring = keyring
        except ImportError:
            raise ImportError("Install keyring package: pip install keyring")

    def get_token(self, project_name: str | None = None) -> str:
        key = project_name or "default"
        token = self._keyring.get_password(self._service_name, key)
        if not token:
            raise AuthError(f"Token not found in keychain for: {key}")
        return sanitize_token(token)

    def set_token(self, token: str, project_name: str | None = None) -> None:
        """Store token in keychain."""
        key = project_name or "default"
        self._keyring.set_password(self._service_name, key, token)


class VaultTokenProvider:
    """
    Get token from HashiCorp Vault.

    For enterprise deployments.
    """

    def __init__(
        self,
        vault_url: str,
        vault_token: str | None = None,
        secret_path: str = "redcap",  # Path under the KV v2 mount; hvac adds the mount prefix
    ):
        self._vault_url = vault_url
        self._vault_token = vault_token or os.environ.get("VAULT_TOKEN")
        self._secret_path = secret_path

    def get_token(self, project_name: str | None = None) -> str:
        import hvac  # HashiCorp Vault client

        client = hvac.Client(url=self._vault_url, token=self._vault_token)
        path = self._secret_path
        if project_name:
            path = f"{path}/{project_name}"

        secret = client.secrets.kv.v2.read_secret_version(path=path)
        token = secret["data"]["data"].get("token")
        if not token:
            raise AuthError(f"Token not found in Vault at: {path}")

        return sanitize_token(token)

11.3 Token Validation

import re


def sanitize_token(token: str) -> str:
    """
    Validate and sanitize API token.

    Following REDCapR's sanitize_token() pattern.
    """
    # Remove whitespace
    token = token.strip()

    # REDCap tokens are 32-character hex strings
    if not re.match(r"^[A-Fa-f0-9]{32}$", token):
        raise AuthError(
            "Invalid token format. REDCap tokens should be 32 hexadecimal characters."
        )

    return token.upper()  # Normalize to uppercase


async def verify_token(client: Client) -> bool:
    """
    Verify token is valid by making a lightweight API call.
    """
    try:
        await client.project.info()
        return True
    except AuthError:
        return False
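Concretely, sanitize_token strips whitespace, rejects anything that is not 32 hex characters, and normalizes case. The snippet below reproduces the function with a local AuthError stand-in so it runs standalone; the token value is made up:

```python
import re

class AuthError(ValueError):
    """Local stand-in for the SDK's AuthError."""

def sanitize_token(token: str) -> str:
    token = token.strip()
    if not re.match(r"^[A-Fa-f0-9]{32}$", token):
        raise AuthError("Invalid token format: expected 32 hexadecimal characters")
    return token.upper()

# Whitespace stripped, case normalized
clean = sanitize_token("  0123456789abcdef0123456789abcdef\n")
# -> "0123456789ABCDEF0123456789ABCDEF"

# Malformed input fails fast, before any network call
try:
    sanitize_token("not-a-token")
    rejected = False
except AuthError:
    rejected = True
```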

11.4 Least Privilege Guidance

@dataclass
class TokenPermissions:
    """
    Document expected permissions for a token.

    REDCap allows different permission levels per token.
    """

    # Export permissions
    export_records: bool = False
    export_logging: bool = False
    export_file_repository: bool = False

    # Import permissions
    import_records: bool = False
    import_files: bool = False

    # Delete permissions (dangerous)
    delete_records: bool = False

    # Management permissions
    manage_users: bool = False
    manage_dags: bool = False

    @classmethod
    def read_only(cls) -> "TokenPermissions":
        """Minimal read-only permissions."""
        return cls(export_records=True)

    @classmethod
    def read_write(cls) -> "TokenPermissions":
        """Standard read-write permissions."""
        return cls(
            export_records=True,
            import_records=True,
            import_files=True,
        )


# Example usage documentation:
"""
## Token Security Best Practices

1. **Use separate tokens for different purposes**:
   - Read-only token for analytics/reporting
   - Write token for data entry applications
   - Admin token for user management (rarely needed)

2. **Store tokens securely**:
   ```python
   # Preferred: Environment variable
   client = Client(url, token=os.environ["REDCAP_TOKEN"])

   # Alternative: OS keychain
   provider = KeychainTokenProvider()
   client = Client(url, token=provider.get_token("my_project"))

   # Enterprise: HashiCorp Vault
   provider = VaultTokenProvider(vault_url="https://vault.company.com")
   client = Client(url, token=provider.get_token("my_project"))
   ```

3. **Never commit tokens to version control**:
   - Add `.env` to `.gitignore`
   - Use `.env.example` for documentation

4. **Rotate tokens periodically**:
   - REDCap allows regenerating tokens
   - Update stored tokens after rotation

5. **Audit token usage**:
   - REDCap logs API calls with token identifier
   - Review logs for unexpected access patterns
"""

12. Implementation Blueprint

12.1 Package Structure

redcap_sdk/
├── __init__.py              # Public exports
├── client.py                # Main Client class
├── config.py                # ClientConfig, settings
├── errors.py                # Exception hierarchy
├── types.py                 # Pydantic models, dataclasses
│
├── api/                     # Namespace implementations
│   ├── __init__.py
│   ├── base.py              # BaseAPI with common logic
│   ├── records.py           # RecordsAPI
│   ├── metadata.py          # MetadataAPI
│   ├── files.py             # FilesAPI, FileRepositoryAPI
│   ├── events.py            # EventsAPI, ArmsAPI
│   ├── instruments.py       # InstrumentsAPI
│   ├── users.py             # UsersAPI, DagsAPI, UserRolesAPI
│   ├── project.py           # ProjectAPI, LoggingAPI
│   ├── reports.py           # ReportsAPI
│   ├── surveys.py           # SurveysAPI
│   └── repeating.py         # RepeatingAPI
│
├── typing/                  # Type casting system
│   ├── __init__.py
│   ├── caster.py            # TypeCaster class
│   ├── functions.py         # Built-in cast functions
│   ├── validation.py        # Validation functions
│   └── schemas.py           # FieldSchema, parsing
│
├── transform/               # Data transformation
│   ├── __init__.py
│   ├── tidy.py              # TidyBundle, create_tidy_bundle
│   ├── checkbox.py          # Checkbox handling
│   ├── diff.py              # Diff computation
│   └── longitudinal.py      # Pivot/widen helpers
│
├── security/                # Token handling
│   ├── __init__.py
│   ├── providers.py         # Token provider classes
│   ├── sanitize.py          # Token validation
│   └── audit.py             # Security logging
│
├── http/                    # Transport layer
│   ├── __init__.py
│   ├── transport.py         # HTTP client wrapper
│   ├── retry.py             # Retry policy
│   └── rate_limit.py        # Rate limiter
│
├── logging/                 # Observability
│   ├── __init__.py
│   ├── structured.py        # Structured logger
│   └── correlation.py       # Correlation ID management
│
└── utils/                   # Utilities
    ├── __init__.py
    ├── batching.py          # Batch helpers
    └── files.py             # File utilities

12.2 Minimal Working Pseudocode

Creating a Client

# redcap_sdk/client.py

from typing import TYPE_CHECKING

from .config import ClientConfig
from .http.transport import HttpTransport
from .logging.structured import StructuredLogger
from .security.sanitize import sanitize_token

if TYPE_CHECKING:
    from .api.records import RecordsAPI
    from .api.metadata import MetadataAPI
    from .api.files import FilesAPI


class Client:
    """
    Main entry point for REDCap SDK.

    Example:
        async with Client(url, token) as client:
            records = await client.records.export()
    """

    def __init__(
        self,
        url: str,
        token: str,
        config: ClientConfig | None = None,
    ):
        self._url = url.rstrip("/")
        self._token = sanitize_token(token)
        self._config = config or ClientConfig()

        self._http = HttpTransport(
            timeout=self._config.timeout,
            verify_ssl=self._config.verify_ssl,
            ca_bundle=self._config.ca_bundle,
            retry_policy=self._config.retry_policy,
            rate_limiter=self._config.rate_limiter,
        )
        self._logger = StructuredLogger("redcap_sdk")

        # Lazy-initialized namespaces
        self._records: RecordsAPI | None = None
        self._metadata: MetadataAPI | None = None
        self._files: FilesAPI | None = None
        # ... other namespaces

    @property
    def records(self) -> "RecordsAPI":
        if self._records is None:
            from .api.records import RecordsAPI
            self._records = RecordsAPI(self)
        return self._records

    @property
    def metadata(self) -> "MetadataAPI":
        if self._metadata is None:
            from .api.metadata import MetadataAPI
            self._metadata = MetadataAPI(self)
        return self._metadata

    @property
    def files(self) -> "FilesAPI":
        if self._files is None:
            from .api.files import FilesAPI
            self._files = FilesAPI(self)
        return self._files

    async def _request(
        self,
        content: str,
        action: str | None = None,
        **params,
    ) -> dict | str | bytes:
        """Make API request with standard parameters."""
        data = {
            "token": self._token,
            "content": content,
            "format": params.pop("format", "json"),
        }
        if action:
            data["action"] = action
        data.update(params)

        return await self._http.post(self._url, data=data)

    async def __aenter__(self) -> "Client":
        return self

    async def __aexit__(self, *args) -> None:
        await self._http.close()

Exporting Records with Type Casting

# redcap_sdk/api/records.py

import pandas as pd
from typing import Literal

from ..types import TidyBundle
from ..typing.caster import TypeCaster


class RecordsAPI:
    def __init__(self, client: "Client"):
        self._client = client
        self._caster: TypeCaster | None = None

    async def export(
        self,
        *,
        format: Literal["json", "csv", "xml", "df"] = "df",
        records: list[str] | None = None,
        fields: list[str] | None = None,
        forms: list[str] | None = None,
        events: list[str] | None = None,
        typed: bool = True,
        cast_overrides: dict | None = None,
        validation_mode: Literal["strict", "permissive", "skip"] = "permissive",
        **kwargs,
    ) -> pd.DataFrame | list[dict] | str:
        """Export records with optional type casting."""

        # Build request parameters
        params = {}
        if records:
            params["records"] = records
        if fields:
            params["fields"] = fields
        if forms:
            params["forms"] = forms
        if events:
            params["events"] = events

        # Make API call
        api_format = "json" if format == "df" else format
        raw_data = await self._client._request(
            content="record",
            format=api_format,
            **params,
            **kwargs,
        )

        # Return raw if not DataFrame
        if format != "df":
            return raw_data

        # Convert to DataFrame
        df = pd.DataFrame(raw_data)

        # Apply type casting if requested
        if typed and len(df) > 0:
            caster = await self._get_caster(cast_overrides, validation_mode)
            df = caster.cast_dataframe(df)

        return df

    async def _get_caster(
        self,
        overrides: dict | None,
        mode: str,
    ) -> TypeCaster:
        """Get or create type caster from metadata."""
        if overrides:
            # Override-specific casters are not cached, so later calls without
            # overrides still get the plain metadata-driven caster
            metadata = await self._client.metadata.export(format="df")
            return TypeCaster(metadata, cast_overrides=overrides, strict=(mode == "strict"))
        if self._caster is None:
            metadata = await self._client.metadata.export(format="df")
            self._caster = TypeCaster(metadata, strict=(mode == "strict"))
        return self._caster

Returning Tidy Per-Instrument Tables

# redcap_sdk/api/records.py (continued)

    async def export_tidy(
        self,
        *,
        forms: list[str] | None = None,
        events: list[str] | None = None,
        typed: bool = True,
        include_metadata: bool = True,
    ) -> TidyBundle:
        """
        Export records as tidy per-instrument tables.

        Returns a TidyBundle where each instrument has its own DataFrame
        with appropriate merge keys.
        """
        # Get all records
        records_df = await self.export(
            format="df",
            forms=forms,
            events=events,
            typed=typed,
        )

        # Get metadata
        metadata_df = await self._client.metadata.export(format="df")
        if forms:
            metadata_df = metadata_df[metadata_df["form_name"].isin(forms)]

        # Get project info for type detection
        project_info = await self._client.project.info()
        project_type = detect_project_type(project_info)

        # Create tidy bundle
        from ..transform.tidy import create_tidy_bundle

        return create_tidy_bundle(
            records_df,
            metadata_df,
            project_type,
            include_metadata=include_metadata,
        )


# redcap_sdk/transform/tidy.py

import pandas as pd

from ..types import ProjectType, TidyBundle

def create_tidy_bundle(
    records: pd.DataFrame,
    metadata: pd.DataFrame,
    project_type: ProjectType,
    *,
    include_metadata: bool = True,
) -> TidyBundle:
    """Transform flat records into tidy per-instrument tables."""

    instruments: dict[str, pd.DataFrame] = {}
    instrument_metadata: dict[str, pd.DataFrame] = {}

    # Group metadata by form
    for form_name in metadata["form_name"].unique():
        form_meta = metadata[metadata["form_name"] == form_name]
        form_fields = form_meta["field_name"].tolist()

        # Determine key columns
        key_cols = ["record_id"]
        if project_type in (ProjectType.LONGITUDINAL, ProjectType.LONGITUDINAL_REPEATING):
            if "redcap_event_name" in records.columns:
                key_cols.append("redcap_event_name")

        is_repeating = _is_repeating_form(form_name, metadata)
        if is_repeating:
            if "redcap_repeat_instrument" in records.columns:
                key_cols.extend(["redcap_repeat_instrument", "redcap_repeat_instance"])

        # Select columns for this form
        available_fields = [f for f in form_fields if f in records.columns]
        select_cols = key_cols + available_fields
        select_cols = list(dict.fromkeys(select_cols))  # Dedupe preserving order

        form_df = records[select_cols].copy()

        # Filter to relevant rows
        if "redcap_repeat_instrument" in form_df.columns:
            if is_repeating:
                form_df = form_df[form_df["redcap_repeat_instrument"] == form_name]
            else:
                form_df = form_df[form_df["redcap_repeat_instrument"].isna()]

        # Drop empty rows
        data_cols = [c for c in available_fields if c in form_df.columns]
        if data_cols:
            form_df = form_df.dropna(subset=data_cols, how="all")

        instruments[form_name] = form_df.reset_index(drop=True)
        if include_metadata:
            instrument_metadata[form_name] = form_meta

    return TidyBundle(
        instruments=instruments,
        metadata=instrument_metadata,
    )
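The same split can be illustrated without pandas. A plain-dict sketch for a classic project with one repeating form; the form names, fields, and rows are invented for the example:

```python
# Flat export rows: repeating-form rows carry the repeat columns, base rows don't
rows = [
    {"record_id": "1", "redcap_repeat_instrument": None, "redcap_repeat_instance": None,
     "age": "42", "visit_weight": None},
    {"record_id": "1", "redcap_repeat_instrument": "visits", "redcap_repeat_instance": 1,
     "age": None, "visit_weight": "70"},
    {"record_id": "1", "redcap_repeat_instrument": "visits", "redcap_repeat_instance": 2,
     "age": None, "visit_weight": "71"},
]
form_fields = {"demographics": ["age"], "visits": ["visit_weight"]}
repeating_forms = {"visits"}

tables: dict[str, list[dict]] = {}
for form, fields in form_fields.items():
    keys = ["record_id"]
    if form in repeating_forms:
        keys += ["redcap_repeat_instrument", "redcap_repeat_instance"]
    selected = []
    for row in rows:
        if form in repeating_forms:
            if row["redcap_repeat_instrument"] != form:
                continue  # Skip base rows and other repeating forms
        elif row["redcap_repeat_instrument"] is not None:
            continue      # Skip repeating rows for non-repeating forms
        selected.append({k: row[k] for k in keys + fields})
    tables[form] = selected
```

Each table keeps exactly the merge keys its form needs, which is what makes later joins across instruments unambiguous.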

Performing a Diff-Based Update

# redcap_sdk/transform/diff.py

from dataclasses import dataclass
import pandas as pd


@dataclass
class DiffResult:
    records_to_add: pd.DataFrame
    records_to_update: pd.DataFrame
    records_to_delete: list[tuple]
    unchanged_count: int

    @property
    def has_changes(self) -> bool:
        return (
            len(self.records_to_add) > 0 or
            len(self.records_to_update) > 0 or
            len(self.records_to_delete) > 0
        )


def compute_diff(
    current: pd.DataFrame,
    modified: pd.DataFrame,
    *,
    key_columns: list[str] | None = None,
) -> DiffResult:
    """Compute minimal changes between current and modified states."""

    # Default key columns
    if key_columns is None:
        potential_keys = [
            "record_id",
            "redcap_event_name",
            "redcap_repeat_instrument",
            "redcap_repeat_instance",
        ]
        key_columns = [k for k in potential_keys if k in current.columns and k in modified.columns]

    # Create tuple keys for comparison
    current_keyed = current.set_index(key_columns)
    modified_keyed = modified.set_index(key_columns)

    current_keys = set(current_keyed.index)
    modified_keys = set(modified_keyed.index)

    # Identify changes
    to_add_keys = modified_keys - current_keys
    to_delete_keys = current_keys - modified_keys
    to_check_keys = current_keys & modified_keys

    # Find actual updates (changed values)
    updates = []
    unchanged = 0

    for key in to_check_keys:
        current_row = current_keyed.loc[key]
        modified_row = modified_keyed.loc[key]

        # Compare all columns
        changed_cols = {}
        for col in modified_keyed.columns:
            if col in current_keyed.columns:
                if not _values_equal(current_row[col], modified_row[col]):
                    changed_cols[col] = modified_row[col]

        if changed_cols:
            # Build update row with key columns + changed values only
            update_row = dict(zip(key_columns, key if isinstance(key, tuple) else (key,)))
            update_row.update(changed_cols)
            updates.append(update_row)
        else:
            unchanged += 1

    return DiffResult(
        records_to_add=modified_keyed.loc[list(to_add_keys)].reset_index() if to_add_keys else pd.DataFrame(),
        records_to_update=pd.DataFrame(updates) if updates else pd.DataFrame(),
        records_to_delete=list(to_delete_keys),
        unchanged_count=unchanged,
    )


def _values_equal(a, b) -> bool:
    """Compare values accounting for NA."""
    if pd.isna(a) and pd.isna(b):
        return True
    if pd.isna(a) or pd.isna(b):
        return False
    return a == b


# Usage in RecordsAPI
async def import_diff(
    self,
    current: pd.DataFrame,
    modified: pd.DataFrame,
    *,
    key_fields: list[str] | None = None,
    apply_deletes: bool = False,
) -> ImportResult:
    """Import only changed records."""

    diff = compute_diff(current, modified, key_columns=key_fields)

    self._client._logger.info(
        "Diff computed",
        adds=len(diff.records_to_add),
        updates=len(diff.records_to_update),
        deletes=len(diff.records_to_delete),
        unchanged=diff.unchanged_count,
    )

    total_count = 0

    if len(diff.records_to_add) > 0:
        result = await self.import_(diff.records_to_add)
        total_count += result.count

    if len(diff.records_to_update) > 0:
        result = await self.import_(
            diff.records_to_update,
            overwrite_behavior="overwrite",
        )
        total_count += result.count

    if apply_deletes and diff.records_to_delete:
        for key in diff.records_to_delete:
            record_id = key[0] if isinstance(key, tuple) else key
            await self.delete([str(record_id)])
            total_count += 1

    return ImportResult(
        count=total_count,
        warnings=[f"Diff: {len(diff.records_to_add)} adds, {len(diff.records_to_update)} updates, "
                  f"{len(diff.records_to_delete)} deletes, {diff.unchanged_count} unchanged"],
    )
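Underneath, compute_diff is set arithmetic over keyed rows. A pandas-free sketch of the same idea for a single record_id key; the data is invented:

```python
def diff_keyed(current: dict[str, dict], modified: dict[str, dict]):
    """current/modified map record_id -> row dict; returns (adds, updates, deletes, unchanged)."""
    adds = [{"record_id": k, **modified[k]} for k in sorted(modified.keys() - current.keys())]
    deletes = sorted(current.keys() - modified.keys())
    updates, unchanged = [], 0
    for k in sorted(current.keys() & modified.keys()):
        # Send only the columns whose values actually changed
        changed = {c: v for c, v in modified[k].items() if current[k].get(c) != v}
        if changed:
            updates.append({"record_id": k, **changed})
        else:
            unchanged += 1
    return adds, updates, deletes, unchanged

current = {"1": {"value": 10}, "2": {"value": 20}}
modified = {"1": {"value": 10}, "2": {"value": 25}, "3": {"value": 30}}
adds, updates, deletes, unchanged = diff_keyed(current, modified)
# -> one add (record 3), one minimal update (record 2), nothing deleted, record 1 untouched
```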

Uploading/Downloading a File

# redcap_sdk/api/files.py

from pathlib import Path
from typing import BinaryIO
import hashlib

from ..types import FileDownload, FileMetadata


class FilesAPI:
    def __init__(self, client: "Client"):
        self._client = client

    async def download(
        self,
        record: str,
        field: str,
        *,
        event: str | None = None,
        repeat_instance: int | None = None,
        validate_checksum: bool = True,
    ) -> FileDownload:
        """Download file attachment from record."""

        params = {
            "record": record,
            "field": field,
        }
        if event:
            params["event"] = event
        if repeat_instance:
            params["repeat_instance"] = repeat_instance

        response = await self._client._http.post_raw(
            self._client._url,
            data={
                "token": self._client._token,
                "content": "file",
                "action": "export",
                **params,
            },
        )

        # Parse headers for metadata
        content_disp = response.headers.get("Content-Disposition", "")
        filename = self._parse_filename(content_disp) or f"{record}_{field}"
        filename = self._sanitize_filename(filename)

        content = await response.aread()

        checksum = hashlib.sha256(content).hexdigest() if validate_checksum else None

        return FileDownload(
            content=content,
            filename=filename,
            mime_type=response.headers.get("Content-Type"),
            size=len(content),
            checksum=checksum,
        )

    async def download_streaming(
        self,
        record: str,
        field: str,
        destination: Path,
        *,
        event: str | None = None,
        repeat_instance: int | None = None,
        chunk_size: int = 8192,
        progress_callback=None,
    ) -> FileMetadata:
        """Stream large file to disk."""

        params = {
            "record": record,
            "field": field,
        }
        if event:
            params["event"] = event
        if repeat_instance:
            params["repeat_instance"] = repeat_instance

        async with self._client._http.stream_post(
            self._client._url,
            data={
                "token": self._client._token,
                "content": "file",
                "action": "export",
                **params,
            },
        ) as response:
            content_disp = response.headers.get("Content-Disposition", "")
            filename = self._parse_filename(content_disp) or f"{record}_{field}"
            filename = self._sanitize_filename(filename)

            total_size = int(response.headers.get("Content-Length", 0))
            downloaded = 0
            hasher = hashlib.sha256()

            with open(destination, "wb") as f:
                async for chunk in response.aiter_bytes(chunk_size):
                    f.write(chunk)
                    hasher.update(chunk)
                    downloaded += len(chunk)
                    if progress_callback:
                        progress_callback(downloaded, total_size)

            return FileMetadata(
                filename=filename,
                mime_type=response.headers.get("Content-Type"),
                size=downloaded,
                checksum=hasher.hexdigest(),
            )

    async def upload(
        self,
        record: str,
        field: str,
        file_path: Path | None = None,
        file_object: BinaryIO | None = None,
        filename: str | None = None,
        *,
        event: str | None = None,
        repeat_instance: int | None = None,
    ) -> None:
        """Upload file to record."""

        if file_path is None and file_object is None:
            raise ValueError("Must provide either file_path or file_object")

        if file_object is not None and filename is None:
            raise ValueError("filename required when using file_object")

        if file_path:
            filename = filename or file_path.name
            file_object = open(file_path, "rb")
            should_close = True
        else:
            should_close = False

        try:
            files = {
                "file": (filename, file_object, self._guess_mime_type(filename)),
            }
            data = {
                "token": self._client._token,
                "content": "file",
                "action": "import",
                "record": record,
                "field": field,
            }
            if event:
                data["event"] = event
            if repeat_instance:
                data["repeat_instance"] = repeat_instance

            await self._client._http.post_multipart(
                self._client._url,
                data=data,
                files=files,
            )
        finally:
            if should_close and file_object:
                file_object.close()

    async def delete(
        self,
        record: str,
        field: str,
        *,
        event: str | None = None,
        repeat_instance: int | None = None,
    ) -> None:
        """Delete file from record."""

        params = {
            "record": record,
            "field": field,
        }
        if event:
            params["event"] = event
        if repeat_instance:
            params["repeat_instance"] = repeat_instance

        await self._client._request(
            content="file",
            action="delete",
            **params,
        )

    @staticmethod
    def _parse_filename(content_disposition: str) -> str | None:
        import re
        match = re.search(r'filename[*]?=["\']?([^"\';]+)["\']?', content_disposition)
        return match.group(1) if match else None

    @staticmethod
    def _sanitize_filename(filename: str) -> str:
        import re
        import os
        filename = os.path.basename(filename)
        filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)
        return filename[:255]

    @staticmethod
    def _guess_mime_type(filename: str) -> str:
        import mimetypes
        mime_type, _ = mimetypes.guess_type(filename)
        return mime_type or "application/octet-stream"

13. Test & Documentation Plan

13.1 Test Plan

Unit Tests

# tests/unit/test_type_casting.py

import pytest
import pandas as pd
from datetime import date

from redcap_sdk.typing.caster import TypeCaster
from redcap_sdk.typing.functions import cast_integer, cast_date
from redcap_sdk.typing.schemas import FieldSchema


class TestTypeCasting:
    """Unit tests for type casting edge cases."""

    def test_cast_integer_from_float_string(self):
        """Handle '1.0' -> 1 conversion."""
        result = cast_integer("1.0", "age", mock_schema)
        assert result == 1
        assert isinstance(result, int)

    def test_cast_integer_validation_min(self):
        """Reject values below minimum."""
        schema = FieldSchema(validation_min=0, ...)
        with pytest.raises(ValueError, match="below minimum"):
            cast_integer("-1", "age", schema)

    def test_cast_date_multiple_formats(self):
        """Parse common date formats."""
        assert cast_date("2024-01-15", "dob", mock_schema) == date(2024, 1, 15)
        assert cast_date("01/15/2024", "dob", mock_schema) == date(2024, 1, 15)

    def test_na_values_handling(self):
        """NA values should become None."""
        caster = TypeCaster(metadata, na_values={"", "-999", "NA"})
        df = pd.DataFrame({"age": ["25", "-999", "NA", ""]})
        result = caster.cast_dataframe(df)
        assert result["age"].tolist() == [25, None, None, None]

    def test_checkbox_expansion(self):
        """Checkbox fields expand to multiple columns."""
        # field___1, field___2, field___3
        ...

    def test_permissive_mode_preserves_invalid(self):
        """Invalid values preserved in permissive mode."""
        caster = TypeCaster(metadata, strict=False)
        df = pd.DataFrame({"age": ["25", "unknown"]})
        result = caster.cast_dataframe(df)
        assert result["age"].tolist() == [25, "unknown"]
        assert len(caster.get_validation_report()) == 1
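The strict/permissive behavior these tests pin down can be sketched in isolation. This is a simplified stand-in, not the actual TypeCaster internals; the function name and signature are illustrative:

```python
def cast_value(raw, na_values, strict, issues):
    """Cast a raw string to int, honoring NA sentinels and permissive mode."""
    if raw in na_values:
        return None             # NA sentinel -> missing
    try:
        return int(float(raw))  # handles "1.0" -> 1
    except ValueError:
        if strict:
            raise               # strict mode: fail fast
        issues.append(raw)      # permissive: record the issue...
        return raw              # ...and preserve the original value
```

In permissive mode the invalid value survives the round trip and the issue list feeds the validation report.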


# tests/unit/test_diff.py

import pandas as pd


class TestDiffComputation:
    """Unit tests for diff-based updates."""

    def test_detect_new_records(self):
        current = pd.DataFrame({"record_id": [1, 2], "value": [10, 20]})
        modified = pd.DataFrame({"record_id": [1, 2, 3], "value": [10, 20, 30]})

        diff = compute_diff(current, modified)

        assert len(diff.records_to_add) == 1
        assert diff.records_to_add["record_id"].iloc[0] == 3

    def test_detect_changed_values(self):
        current = pd.DataFrame({"record_id": [1], "value": [10]})
        modified = pd.DataFrame({"record_id": [1], "value": [15]})

        diff = compute_diff(current, modified)

        assert len(diff.records_to_update) == 1
        assert diff.records_to_update["value"].iloc[0] == 15

    def test_unchanged_records_not_included(self):
        current = pd.DataFrame({"record_id": [1], "value": [10]})
        modified = pd.DataFrame({"record_id": [1], "value": [10]})

        diff = compute_diff(current, modified)

        assert len(diff.records_to_update) == 0
        assert diff.unchanged_count == 1

    def test_handles_na_values(self):
        current = pd.DataFrame({"record_id": [1], "value": [None]})
        modified = pd.DataFrame({"record_id": [1], "value": [None]})

        diff = compute_diff(current, modified)

        assert diff.unchanged_count == 1
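A minimal compute_diff consistent with these tests might look like the following. This is a sketch; DiffResult, the single-key assumption, and the NA-equality rule are simplifications of whatever the SDK ultimately ships:

```python
from dataclasses import dataclass
import pandas as pd


@dataclass
class DiffResult:
    records_to_add: pd.DataFrame
    records_to_update: pd.DataFrame
    unchanged_count: int


def compute_diff(current: pd.DataFrame, modified: pd.DataFrame,
                 key: str = "record_id") -> DiffResult:
    # Rows whose key is absent from the current extract are additions
    is_new = ~modified[key].isin(set(current[key]))
    to_add = modified[is_new].reset_index(drop=True)

    # Align the remaining rows on the key and compare cell by cell;
    # treat NA == NA as unchanged so missing values don't churn
    existing = modified[~is_new].set_index(key)
    base = current.set_index(key).reindex(existing.index)
    same = (existing.eq(base) | (existing.isna() & base.isna())).all(axis=1)
    to_update = existing[~same].reset_index()
    return DiffResult(to_add, to_update, int(same.sum()))
```

Only `records_to_add` and `records_to_update` are sent to the API, which is what makes the diff-based write path cheap for incremental syncs.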


# tests/unit/test_tidy.py

class TestTidyBundle:
    """Unit tests for tidy output generation."""

    def test_separate_instruments(self):
        """Each instrument becomes its own DataFrame."""
        ...

    def test_repeating_instruments_filtered(self):
        """Repeating instrument data only includes relevant rows."""
        ...

    def test_key_columns_appropriate_for_project_type(self):
        """Key columns match project type."""
        ...
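The key-column rule those tests check follows the merge keys listed in the executive summary. A standalone sketch (the ProjectType enum and function name are assumptions mirroring the design, not the shipped API):

```python
from enum import Enum


class ProjectType(Enum):
    CLASSIC = "classic"
    LONGITUDINAL = "longitudinal"
    REPEATING = "repeating"
    LONGITUDINAL_REPEATING = "longitudinal_repeating"


def key_columns(project_type: ProjectType, repeating_form: bool) -> list[str]:
    """Return the merge-key columns appropriate for a project/form combination."""
    cols = ["record_id"]
    if project_type in (ProjectType.LONGITUDINAL,
                        ProjectType.LONGITUDINAL_REPEATING):
        cols.append("redcap_event_name")
    if repeating_form:
        cols += ["redcap_repeat_instrument", "redcap_repeat_instance"]
    return cols
```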

Contract Tests (Mocked API)

# tests/contract/test_api_contract.py
# Async tests below assume pytest-asyncio (asyncio_mode = "auto").

import pandas as pd
import pytest
from unittest.mock import AsyncMock
from redcap_sdk import Client


@pytest.fixture
def mock_http():
    """Mock HTTP transport returning realistic API responses."""
    mock = AsyncMock()
    return mock


@pytest.fixture
def client(mock_http):
    """Client with mocked transport."""
    client = Client("https://redcap.example.edu/api/", "ABCD" * 8)
    client._http = mock_http
    return client


class TestRecordsContract:
    """Contract tests for Records API."""

    async def test_export_records_request_format(self, client, mock_http):
        """Verify export_records sends correct request."""
        mock_http.post.return_value = [{"record_id": "1", "age": "25"}]

        await client.records.export(records=["1"], fields=["age"])

        mock_http.post.assert_called_once()
        call_data = mock_http.post.call_args[1]["data"]
        assert call_data["content"] == "record"
        assert call_data["records"] == ["1"]
        assert call_data["fields"] == ["age"]

    async def test_import_records_request_format(self, client, mock_http):
        """Verify import_records sends correct request."""
        mock_http.post.return_value = {"count": 1}

        await client.records.import_(
            pd.DataFrame({"record_id": ["1"], "age": [25]}),
            overwrite_behavior="normal",
        )

        call_data = mock_http.post.call_args[1]["data"]
        assert call_data["content"] == "record"
        assert call_data["overwriteBehavior"] == "normal"


class TestFilesContract:
    """Contract tests for Files API."""

    async def test_download_file_request_format(self, client, mock_http):
        """Verify file download sends correct request."""
        mock_response = AsyncMock()
        mock_response.headers = {"Content-Disposition": 'filename="test.pdf"'}
        mock_response.aread.return_value = b"file content"
        mock_http.post_raw.return_value = mock_response

        result = await client.files.download("1", "consent_doc")

        assert result.filename == "test.pdf"
        assert result.content == b"file content"

Golden-File Tests

# tests/golden/test_tidy_outputs.py

import json
from pathlib import Path
import pandas as pd
import pytest

GOLDEN_DIR = Path(__file__).parent / "golden_files"


class TestTidyOutputGolden:
    """Golden-file tests for tidy output consistency."""

    @pytest.mark.parametrize("project_type", [
        "classic",
        "longitudinal",
        "repeating",
        "longitudinal_repeating",
    ])
    def test_tidy_output_matches_golden(self, project_type):
        """Tidy output matches expected golden file."""

        # Load input data
        records = pd.read_csv(GOLDEN_DIR / f"{project_type}_records.csv")
        metadata = pd.read_csv(GOLDEN_DIR / f"{project_type}_metadata.csv")

        # Generate tidy output
        bundle = create_tidy_bundle(records, metadata, ProjectType(project_type))

        # Compare to golden
        golden_path = GOLDEN_DIR / f"{project_type}_tidy.json"
        if golden_path.exists():
            expected = json.loads(golden_path.read_text())
            for form_name, expected_df in expected.items():
                pd.testing.assert_frame_equal(
                    bundle[form_name],
                    pd.DataFrame(expected_df),
                )
        else:
            # Generate golden file (first run)
            golden = {
                name: df.to_dict("records")
                for name, df in bundle.instruments.items()
            }
            golden_path.write_text(json.dumps(golden, indent=2))
            pytest.skip("Golden file generated")

13.2 Documentation Plan

1. Quickstart Guide

# Quickstart

## Installation

pip install redcap-sdk

Basic Usage

import os
from redcap_sdk import Client

# Create client (token from environment)
client = Client(
    url="https://redcap.yourinstitution.edu/api/",
    token=os.environ["REDCAP_TOKEN"],
)

# Export all records
async with client:
    df = await client.records.export()
    print(f"Exported {len(df)} records")

# Export specific records and fields
async with client:
    df = await client.records.export(
        records=["101", "102"],
        fields=["record_id", "age", "gender"],
    )

With Type Casting

# Automatic type casting based on metadata
df = await client.records.export(typed=True)

# Check for validation issues
caster = await client.metadata.get_type_caster()
report = caster.get_validation_report()
if len(report) > 0:
    print("Validation warnings:", report)

Tidy Output (One Table Per Instrument)

# Get separate DataFrames for each instrument
bundle = await client.records.export_tidy()

# Access individual instruments
demographics = bundle["demographics"]
vitals = bundle["vitals"]

# Summary statistics
print(bundle.summary())

2. Longitudinal & Repeating Guide

# Working with Longitudinal and Repeating Data

## Understanding REDCap Data Structures

### Classic Projects
- Single record ID identifies each participant
- No events, no repeating

### Longitudinal Projects
- Records have multiple events (visits)
- Key: `(record_id, redcap_event_name)`

### Repeating Instruments
- Some forms can have multiple instances
- Key: `(record_id, redcap_repeat_instrument, redcap_repeat_instance)`

### Longitudinal + Repeating
- Both events AND repeating instruments
- Key: `(record_id, redcap_event_name, redcap_repeat_instrument, redcap_repeat_instance)`

## Recommended Approach: Tidy Output

# Get tidy output - automatically handles complexity
bundle = await client.records.export_tidy()

# Demographics (non-repeating): one row per record per event
demographics = bundle["demographics"]
# Columns: record_id, redcap_event_name, ...

# Medications (repeating): one row per instance
medications = bundle["medications"]
# Columns: record_id, redcap_event_name, redcap_repeat_instrument, redcap_repeat_instance, ...

Merging Tidy Tables

# Merge demographics with medications
merged = demographics.merge(
    medications,
    on=["record_id", "redcap_event_name"],
    how="left",
    suffixes=("", "_med"),
)

Pivoting to Wide Format

from redcap_sdk.transform import pivot_to_wide

# Wide format: one row per participant
wide = pivot_to_wide(
    demographics,
    id_cols=["record_id"],
    pivot_col="redcap_event_name",
)
# Columns: record_id, baseline_age, followup_age, ...
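A pandas-based sketch of what pivot_to_wide does under the hood (illustrative; the actual redcap_sdk.transform implementation may differ):

```python
import pandas as pd


def pivot_to_wide(df: pd.DataFrame, id_cols: list[str],
                  pivot_col: str) -> pd.DataFrame:
    """Pivot long event-level rows to one wide row per id."""
    value_cols = [c for c in df.columns if c not in id_cols + [pivot_col]]
    wide = df.pivot(index=id_cols, columns=pivot_col, values=value_cols)
    # Flatten (value, event) column pairs into event_value names
    wide.columns = [f"{event}_{value}" for value, event in wide.columns]
    return wide.reset_index()
```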

3. Security & Tokens Guide

# Security & Token Management

## Token Best Practices

### 1. Never Hardcode Tokens

# BAD - token in code
client = Client(url, token="ABC123...")

# GOOD - token from environment
client = Client(url, token=os.environ["REDCAP_TOKEN"])

2. Use Separate Tokens for Different Purposes

Request tokens with minimal required permissions:

  • Read-only token: For analytics and reporting
  • Write token: For data entry applications
  • Admin token: Only when managing users (rare)

3. Token Storage Options

Environment Variables (Recommended)

export REDCAP_TOKEN=your-token-here

OS Keychain

from redcap_sdk.security import KeychainTokenProvider

provider = KeychainTokenProvider()
provider.set_token("your-token", project_name="my_study")

# Later...
token = provider.get_token("my_study")

HashiCorp Vault (Enterprise)

from redcap_sdk.security import VaultTokenProvider

provider = VaultTokenProvider(
    vault_url="https://vault.company.com",
    secret_path="secret/data/redcap/my_study",
)
token = provider.get_token()

4. Token Validation

from redcap_sdk.security import sanitize_token, verify_token

# Validate format before use
token = sanitize_token(raw_token)

# Verify token works
if await verify_token(client):
    print("Token is valid")
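sanitize_token can be as simple as a format check. A sketch assuming the standard 32-character hexadecimal token format (the real function may accept institution-specific formats):

```python
import re


def sanitize_token(raw_token: str) -> str:
    """Strip whitespace and enforce the 32-char hex token format."""
    token = raw_token.strip()
    if not re.fullmatch(r"[0-9A-Fa-f]{32}", token):
        raise ValueError("Token must be 32 hexadecimal characters")
    return token.upper()
```

Validating up front turns a confusing downstream API error into an immediate, actionable one.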

4. Troubleshooting Guide

# Troubleshooting API Calls

*Inspired by REDCapR's troubleshooting guide*

## Systematic Debugging Approach

When API calls fail, work through these layers:

### 1. Server & Authorization

- [ ] Is your REDCap account active?
- [ ] Is your email verified in REDCap?
- [ ] Does your token have the required permissions?
- [ ] Is the project in production status (if required)?

**Test in REDCap API Playground first** - if it works there, the issue is on your end.

### 2. Network Communication

- [ ] Can you reach the REDCap server? `ping redcap.institution.edu`
- [ ] Are you on the right network (VPN required)?
- [ ] Is there a firewall blocking the connection?

**Test with curl:**
curl -X POST https://redcap.institution.edu/api/ \
  -d "token=YOUR_TOKEN" \
  -d "content=version"

3. SDK Level

  • [ ] Is the SDK installed correctly?
  • [ ] Are you using the correct URL (include /api/)?
  • [ ] Is your token 32 hexadecimal characters?

Minimal test:

async with Client(url, token) as client:
    version = await client.project.export_version()
    print(f"REDCap version: {version}")

4. Application Level

  • [ ] Are field names spelled correctly?
  • [ ] Do requested records exist?
  • [ ] Is the date format correct?

Common Error Messages

| Error | Likely Cause | Solution |
|-------|--------------|----------|
| `AuthError: Invalid token` | Token incorrect or expired | Regenerate token in REDCap |
| `ApiError: 403` | Insufficient permissions | Request additional API rights |
| `TransportError: Connection timeout` | Network issue | Check VPN, firewall |
| `ValidationError: Field 'xyz' not found` | Typo in field name | Check data dictionary |

Enable Debug Logging

import logging
logging.basicConfig(level=logging.DEBUG)

# Or just for the SDK
logging.getLogger("redcap_sdk").setLevel(logging.DEBUG)


14. Open Questions / Assumptions

Assumptions Made

  1. REDCap API version: Design assumes REDCap 10.0+ with standard API endpoints. Older versions may lack some features (e.g., file repository).

  2. Pandas optional: Design keeps pandas as optional dependency. Users who don't need DataFrames can use JSON/dict outputs.

  3. Async-first: Design uses async/await for all I/O operations. A sync wrapper can be added for simpler use cases.

  4. Token format: Assumed standard 32-character hexadecimal tokens. Some institutions may have different formats.

  5. SSL verification: Enabled by default. Some institutional REDCap instances may have certificate issues requiring custom CA bundles.

Open Questions

  1. Sync vs Async API: Should we provide both sync and async interfaces, or async-only with optional sync wrapper?
     Recommendation: Async-only core, with asyncio.run() convenience for simple scripts.

  2. DataFrame library: Should we support alternatives to pandas (polars, pyarrow)?
     Recommendation: Start with pandas, add polars support if requested.

  3. Caching strategy: Should the SDK cache metadata/project info between calls?
     Recommendation: Optional caching with configurable TTL, disabled by default.

  4. Batch size defaults: What's the optimal default batch size for most REDCap instances?
     Recommendation: 500 records (balance between API calls and memory), configurable.

  5. Error recovery for partial imports: How to handle partial failures in batch imports?
     Recommendation: Return partial results with list of failures, let user decide.

  6. Backwards compatibility with PyCap: Should we aim for API compatibility to ease migration?
     Recommendation: No - design for clarity over compatibility, but provide migration guide.

  7. R interface: Should we provide an R wrapper (reticulate-based)?
     Recommendation: Not in v1.0; R users have mature options (redcapAPI, REDCapR).

References

R Packages

Python Packages

REDCap Resources