ADR-005: Schema Evolution Strategy

Status

Accepted - 2026-01-23

Context

Cryptocurrency exchanges frequently modify their API responses, adding new fields, deprecating old ones, or restructuring data. The K2 Reference Data Platform must handle schema changes without breaking existing queries, pipelines, or downstream consumers.

Types of Schema Changes

1. Additive Changes (most common):

  • Exchange adds marginTradingAllowed field to /exchangeInfo
  • New field riskLimits appears in instrument specs
  • Additional metadata like insuranceFundAddress

2. Modification Changes:

  • Field renamed: tickSize → priceIncrement
  • Type changed: maxLeverage from INT to DECIMAL
  • Enum expanded: status adds new value PAUSE_TRADING

3. Removal Changes (rare):

  • Field deprecated and removed
  • Entire instrument type no longer supported
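One way to detect these change types automatically is to diff consecutive API payloads. The sketch below is hypothetical (the helper names are not part of the platform) and only looks at top-level fields of flat JSON objects. A rename such as tickSize → priceIncrement shows up as one removal plus one addition, and an enum expansion like PAUSE_TRADING is invisible because the field type stays a string.

```python
# Hypothetical helper (not part of the K2 codebase): classify top-level
# differences between two JSON payloads into the change types above.
from typing import Any


def json_type(value: Any) -> str:
    # bool is checked first because bool is a subclass of int in Python
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "decimal"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        return "object"
    return "null"


def classify_changes(old: dict, new: dict) -> dict:
    """Return added, type-changed and removed top-level field names."""
    old_keys, new_keys = set(old), set(new)
    modified = sorted(
        key for key in old_keys & new_keys
        # Skip nulls: a missing value is not a type change
        if old[key] is not None and new[key] is not None
        and json_type(old[key]) != json_type(new[key])
    )
    return {
        "additive": sorted(new_keys - old_keys),
        "modification": modified,
        "removal": sorted(old_keys - new_keys),
    }
```

For example, a payload where maxLeverage goes from 20 to 20.5 and tickSize becomes priceIncrement yields maxLeverage under "modification" and the rename as a removal/addition pair.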

Layer-Specific Requirements

| Layer | Change Tolerance | Backward Compatibility | Forward Compatibility |
|-------|------------------|------------------------|-----------------------|
| Bronze | ✅ Full (raw JSON) | N/A (append-only) | ✅ Yes (JSON flexible) |
| Silver | ⚠️ Controlled | ✅ Required | ⚠️ Preferred |
| Gold | ✅ Stable | ✅ Required | ❌ Not critical |
| API | ⚠️ Versioned | ✅ Required | ❌ Breaks clients |

Iceberg Schema Evolution Capabilities

Apache Iceberg (Format Version 2) supports:

  • ✅ Add columns (nullable)
  • ✅ Delete columns (soft delete, data preserved)
  • ✅ Rename columns (metadata-only operation)
  • ✅ Promote types (INT → LONG, FLOAT → DOUBLE, DECIMAL(P,S) → DECIMAL(P',S) with P' > P)
  • ✅ Reorder columns
  • ❌ Change column type incompatibly (LONG → INT, or INT → DECIMAL as in the maxLeverage example)
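Before running an ALTER TABLE that changes a column type, it helps to check the change against Iceberg's promotion rules. A minimal sketch, assuming Iceberg type names (int, long, float, double, decimal(P,S)) rather than Spark SQL names; the function name is hypothetical. Under these rules the maxLeverage INT → DECIMAL change is not a valid in-place promotion, so it would need a new column populated alongside the old one.

```python
# Sketch of Iceberg's type-promotion rules (format v1/v2), using Iceberg
# type names: int -> long, float -> double, decimal(P,S) -> decimal(P',S), P' > P.
import re

DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")
WIDENINGS = {("int", "long"), ("float", "double")}


def is_allowed_promotion(old_type: str, new_type: str) -> bool:
    old_type, new_type = old_type.strip().lower(), new_type.strip().lower()
    if old_type == new_type:
        return True  # unchanged type
    if (old_type, new_type) in WIDENINGS:
        return True
    old_dec, new_dec = DECIMAL.fullmatch(old_type), DECIMAL.fullmatch(new_type)
    if old_dec and new_dec:
        old_precision, old_scale = map(int, old_dec.groups())
        new_precision, new_scale = map(int, new_dec.groups())
        # Only precision may grow; scale must stay the same
        return new_scale == old_scale and new_precision > old_precision
    return False
```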

Avro Schema Registry Compatibility Modes

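| Mode | Add Field | Remove Field | Rename Field | Change Type |
|------|-----------|--------------|--------------|-------------|
| BACKWARD (default) | ⚠️ Only with default | ✅ Yes | ❌ No | ⚠️ Promotions only (e.g., int → long) |
| FORWARD | ✅ Yes | ⚠️ Only fields with default | ❌ No | ❌ No |
| FULL | ⚠️ Only with default | ⚠️ Only fields with default | ❌ No | ❌ No |
| NONE | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes |

BACKWARD means consumers using the new schema can read data written with the previous schema (upgrade consumers first). FORWARD means consumers using the previous schema can read data written with the new schema (upgrade producers first). FULL requires both.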

Decision

Implement BACKWARD compatibility with vendor-specific escape hatches.

Schema Evolution Policy by Layer

Bronze Layer: Full Forward Compatibility

Strategy: Store complete API responses as JSON (no schema enforcement)

Schema:

CREATE TABLE refdata.bronze_instruments_binance (
    ingestion_id STRING NOT NULL,           -- MD5 hash (deterministic)
    ingestion_timestamp TIMESTAMP NOT NULL, -- System time
    api_endpoint STRING NOT NULL,           -- '/api/v3/exchangeInfo'
    api_response_raw STRING NOT NULL,       -- Full JSON (NEVER truncate)
    response_size_bytes INT,
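    http_status_code INT
) USING iceberg
PARTITIONED BY (days(ingestion_timestamp))
LOCATION 's3a://refdata-warehouse/bronze/instruments/binance';

-- Iceberg does not enforce primary keys; declare ingestion_id as the
-- identifier field instead (requires the Iceberg Spark SQL extensions)
ALTER TABLE refdata.bronze_instruments_binance SET IDENTIFIER FIELDS ingestion_id;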
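The ADR states only that ingestion_id is a deterministic MD5 hash. The sketch below assumes the hash covers the endpoint plus the raw response body (an assumption, not the documented input), and shows how a Bronze row could be assembled with the response stored verbatim. The function name is hypothetical.

```python
# Hypothetical Bronze row builder. ASSUMPTION: ingestion_id is the MD5 of the
# endpoint plus the raw response body; the ADR only says "MD5 hash (deterministic)".
import hashlib
from datetime import datetime, timezone


def build_bronze_row(api_endpoint: str, raw_body: str, http_status: int) -> dict:
    body_bytes = raw_body.encode("utf-8")
    # Same endpoint + same body -> same id, so retries and replays deduplicate
    ingestion_id = hashlib.md5(
        api_endpoint.encode("utf-8") + b"\n" + body_bytes
    ).hexdigest()
    return {
        "ingestion_id": ingestion_id,
        "ingestion_timestamp": datetime.now(timezone.utc),
        "api_endpoint": api_endpoint,
        "api_response_raw": raw_body,  # stored verbatim, never truncated
        "response_size_bytes": len(body_bytes),
        "http_status_code": http_status,
    }
```

Because the timestamp is excluded from the hash, two polls that return an identical response produce the same id; if every poll must be kept as a separate row, the timestamp would have to be part of the hashed input.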

Rationale:

  • ✅ Future-proof: Any new fields are automatically preserved
  • ✅ No data loss: Full API response always available
  • ✅ Replay-friendly: Can re-parse Bronze with updated Silver schema
  • ✅ Debugging: Inspect exact API response for any timestamp

Retention: 7 days (sufficient for replay, not archival)

Silver Layer: Backward Compatibility with Vendor Data

Strategy: Structured columns for common fields + JSON escape hatch for exchange-specific data

Schema:

CREATE TABLE refdata.silver_instruments (
    instrument_sk STRING NOT NULL,  -- Surrogate key (uniqueness via dbt unique_key; Iceberg does not enforce PRIMARY KEY)

    -- Natural key
    exchange STRING NOT NULL,
    symbol STRING NOT NULL,

    -- Bitemporal dimensions
    valid_from TIMESTAMP NOT NULL,
    valid_to TIMESTAMP,
    record_created_at TIMESTAMP NOT NULL,
    record_updated_at TIMESTAMP,

    -- ⭐ CORE FIELDS (common across all exchanges)
    instrument_type STRING,        -- 'spot', 'perpetual', 'future'
    base_asset STRING,
    quote_asset STRING,
    status STRING,                 -- 'active', 'suspended', 'delisted'

    -- Contract specifications
    tick_size DECIMAL(38, 18),
    lot_size DECIMAL(38, 18),
    min_notional DECIMAL(38, 18),
    max_leverage INT,

    -- Perpetual-specific (NULL for spot)
    funding_interval_hours INT,
    settlement_asset STRING,

    -- ⭐ VENDOR DATA ESCAPE HATCH
    -- Stores exchange-specific fields not promoted to columns
    vendor_data STRING,            -- JSON map of additional fields

    -- Audit trail
    source_system STRING NOT NULL,
    source_record_id STRING,
    change_reason STRING,
    changed_by STRING,

    -- Data quality
    is_validated BOOLEAN DEFAULT TRUE,
    validation_errors STRING       -- JSON array if validation failed

) USING iceberg
PARTITIONED BY (exchange, months(valid_from))
LOCATION 's3a://refdata-warehouse/silver/instruments'
TBLPROPERTIES (
    'format-version' = '2',
    'write.format.default' = 'parquet'
);

Example Vendor Data:

// Binance-specific fields not in core schema
{
  "marginTradingAllowed": true,
  "riskLimits": [
    {"notionalValue": 50000, "maintenanceMarginRate": 0.02},
    {"notionalValue": 100000, "maintenanceMarginRate": 0.025}
  ],
  "insuranceFund": "0x1234...",
  "ocoAllowed": true,
  "quoteOrderQtyMarketAllowed": true
}

Handling New Fields:

  1. Initial: New field appears in Bronze → Store in vendor_data JSON
  2. Monitor: Track usage; if 2+ exchanges have it, consider promotion
  3. Promote: Add column to Silver schema (nullable, backward compatible)
  4. Migrate: Backfill from vendor_data if needed
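The steps above can be sketched as two small functions: one that splits a per-instrument payload into core columns plus a vendor_data JSON string (so an unknown field lands in vendor_data without a schema change), and one that later promotes a field out of vendor_data into its own column. The CORE_FIELDS mapping is an illustrative subset and value normalization (e.g., mapping raw status values to 'active') is omitted.

```python
# Sketch of the vendor_data escape hatch. CORE_FIELDS (source key -> Silver
# column) is an illustrative subset, not the production mapping.
import json

CORE_FIELDS = {
    "symbol": "symbol",
    "baseAsset": "base_asset",
    "quoteAsset": "quote_asset",
    "status": "status",
}


def split_instrument(payload: dict, core_fields: dict = CORE_FIELDS) -> dict:
    """Step 1: known fields become columns; any other field goes to vendor_data."""
    row = {column: payload.get(source) for source, column in core_fields.items()}
    extras = {key: value for key, value in payload.items() if key not in core_fields}
    row["vendor_data"] = json.dumps(extras, sort_keys=True) if extras else None
    return row


def promote_field(row: dict, source_field: str, column: str) -> dict:
    """Steps 3-4: move one field out of vendor_data into its own column."""
    extras = json.loads(row["vendor_data"]) if row.get("vendor_data") else {}
    promoted = dict(row)  # leave the input row untouched
    promoted[column] = extras.pop(source_field, None)
    promoted["vendor_data"] = json.dumps(extras, sort_keys=True) if extras else None
    return promoted
```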

Schema Evolution Workflow:

-- Step 1: Exchange adds new field "marginTradingAllowed"
-- Initially stored in vendor_data (no schema change)

-- Step 2: After validation, promote to column
ALTER TABLE refdata.silver_instruments
ADD COLUMN IF NOT EXISTS margin_trading_allowed BOOLEAN;

-- Step 3: Update DBT model to extract from Bronze
-- dbt/models/silver/silver_instruments.sql
SELECT
    -- ... existing fields
    CAST(JSON_EXTRACT_SCALAR(api_response_raw, '$.marginTradingAllowed') AS BOOLEAN) AS margin_trading_allowed,
    -- ... vendor_data (excluding promoted fields)
FROM {{ ref('bronze_instruments_binance') }}

-- Step 4: Old queries continue to work (column is nullable)

Gold Layer: Stable Schema

Strategy: Minimize changes; Gold is the public contract for analysts

Schema:

CREATE TABLE refdata.gold_symbology_master (
    canonical_id STRING NOT NULL,   -- Unique key (Iceberg does not enforce PRIMARY KEY)
    base_asset STRING NOT NULL,
    quote_asset STRING NOT NULL,
    instrument_class STRING NOT NULL,

    -- Exchange mappings (add columns as new exchanges supported)
    binance_symbol STRING,
    kraken_symbol STRING,
    bybit_symbol STRING,      -- NULL until Bybit ingestion is ready
    coinbase_symbol STRING,   -- NULL until Coinbase ingestion is ready

    parent_canonical_id STRING,
    first_seen_at TIMESTAMP NOT NULL,
    last_seen_at TIMESTAMP,
    is_active BOOLEAN DEFAULT TRUE,

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP,
    metadata STRING            -- JSON for extensibility

) USING iceberg
PARTITIONED BY (truncate(1, base_asset));

Adding New Exchange:

-- Backward compatible: existing queries unaffected
ALTER TABLE refdata.gold_symbology_master
ADD COLUMN IF NOT EXISTS okx_symbol STRING;

-- Update DBT model to populate new column

API Layer: Versioned Endpoints

Strategy: API versioning with deprecation notices

Versioning Scheme: /v{major}/resource

  • v1: Initial release
  • v2: Breaking changes (rename field, remove field, change type)
  • v1 remains available for 6 months after v2 release

Example Breaking Change:

# OLD: /v1/instruments (returns tick_size as STRING)
{
  "tick_size": "0.01",
  "lot_size": "0.00001"
}

# NEW: /v2/instruments (returns tick_size as a JSON number)
{
  "tick_size": 0.01,  # Number, not string
  "lot_size": 0.00001
}

# Deprecation header in v1 response
X-API-Deprecation: "v1 will be sunset on <sunset-date>; migrate to /v2/instruments"

Non-Breaking Changes (add field):

# v1 can add optional fields (backward compatible)
{
  "tick_size": "0.01",
  "lot_size": "0.00001",
  "margin_trading_allowed": true  # NEW field, clients can ignore
}
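Additive changes are only non-breaking if clients tolerate unknown fields. A minimal sketch of such a tolerant reader, assuming a hypothetical Instrument type on the client side: it keeps only the fields it knows and parses both the v1 string form and the v2 number form into Decimal.

```python
# Sketch of a tolerant API client: unknown fields are ignored so additive
# changes to /v1/instruments don't break it. The Instrument type is hypothetical.
from dataclasses import dataclass, fields
from decimal import Decimal


@dataclass(frozen=True)
class Instrument:
    tick_size: Decimal
    lot_size: Decimal


def parse_instrument(payload: dict) -> Instrument:
    known = {f.name for f in fields(Instrument)}
    # Drop fields this client doesn't know about (e.g. margin_trading_allowed)
    data = {key: value for key, value in payload.items() if key in known}
    # str() first so v1 strings ("0.01") and v2 numbers (0.01) both parse
    return Instrument(**{key: Decimal(str(value)) for key, value in data.items()})
```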

Avro Schema Registry Configuration

Compatibility Mode: BACKWARD

Rationale:

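  • ✅ Can add fields with defaults (Bronze ingestion can evolve)
  • ✅ Consumers on the new schema can read messages written with the previous schema (missing fields take their defaults); old consumers also tolerate added fields because Avro skips fields the reader does not know
  • ⚠️ BACKWARD still allows field removal, so removals need review: deleting a field that old consumers still read breaks them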

Schema Evolution Example:

// Version 1: Initial schema
{
  "type": "record",
  "name": "BinanceInstrument",
  "namespace": "refdata.instruments",
  "fields": [
    {"name": "symbol", "type": "string"},
    {"name": "baseAsset", "type": "string"},
    {"name": "quoteAsset", "type": "string"},
    {"name": "tickSize", "type": "string"}
  ]
}

// Version 2: Add marginTradingAllowed (backward compatible)
{
  "type": "record",
  "name": "BinanceInstrument",
  "namespace": "refdata.instruments",
  "fields": [
    {"name": "symbol", "type": "string"},
    {"name": "baseAsset", "type": "string"},
    {"name": "quoteAsset", "type": "string"},
    {"name": "tickSize", "type": "string"},
    {"name": "marginTradingAllowed", "type": ["null", "boolean"], "default": null, "doc": "Added in v2"}
  ]
}
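To make the BACKWARD rule concrete, the sketch below applies a simplified version of it to abbreviated copies of the two schemas above (two of the four original fields kept). It is an illustration only: the real Schema Registry check also handles aliases, nested records, unions and type promotion, whereas this sketch flags any type change.

```python
# Simplified BACKWARD check on top-level record fields. Unlike the real
# Schema Registry check, it ignores aliases, nested records and type
# promotion (any type change is flagged).
V1 = {
    "type": "record",
    "name": "BinanceInstrument",
    "fields": [
        {"name": "symbol", "type": "string"},
        {"name": "tickSize", "type": "string"},
    ],
}
V2 = {
    **V1,
    "fields": V1["fields"]
    + [{"name": "marginTradingAllowed", "type": ["null", "boolean"], "default": None}],
}


def backward_violations(new_schema: dict, old_schema: dict) -> list:
    """Problems a reader using new_schema would hit on data written with old_schema."""
    old_fields = {field["name"]: field for field in old_schema["fields"]}
    problems = []
    for field in new_schema["fields"]:
        old = old_fields.get(field["name"])
        if old is None and "default" not in field:
            # Old messages lack this field and there is no default to fill in
            problems.append(f"new field '{field['name']}' has no default")
        elif old is not None and old["type"] != field["type"]:
            problems.append(f"field '{field['name']}' changed type")
    # Fields dropped from new_schema are fine: the reader skips them
    return problems
```

With these definitions, backward_violations(V2, V1) returns an empty list, while adding a field without a default is reported.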

Registration:

# scripts/register_schemas.py
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

# Set compatibility mode
sr_client.set_compatibility(
    subject_name='refdata-binance-instrument-raw-value',
    level='BACKWARD'
)

# Register new schema version
with open('config/schemas/binance_instrument_v2.avsc') as f:
    schema = Schema(f.read(), schema_type='AVRO')
    schema_id = sr_client.register_schema(
        subject_name='refdata-binance-instrument-raw-value',
        schema=schema
    )
    print(f"Registered schema ID {schema_id}")  # register_schema() returns the schema ID, not the version

DBT Incremental Model Strategy

Handling Schema Changes in DBT:

-- dbt/models/silver/silver_instruments.sql
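{# ⭐ on_schema_change='append_new_columns' adds new model columns to the
   target table; existing rows are not backfilled (they stay NULL) #}
{{
    config(
        materialized='incremental',
        unique_key='instrument_sk',
        on_schema_change='append_new_columns'
    )
}}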

SELECT
    MD5(exchange || '|' || symbol || '|' || CAST(valid_from AS STRING)) AS instrument_sk,  -- delimiter avoids key collisions
    exchange,
    symbol,
    valid_from,
    valid_to,
    record_created_at,
    record_updated_at,

    -- Core fields
    JSON_EXTRACT_SCALAR(api_response_raw, '$.baseAsset') AS base_asset,
    JSON_EXTRACT_SCALAR(api_response_raw, '$.quoteAsset') AS quote_asset,
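    -- Assumes PRICE_FILTER (which carries tickSize) is the first filters entry;
    -- selecting by filterType is safer if the order changes
    CAST(JSON_EXTRACT_SCALAR(api_response_raw, '$.filters[0].tickSize') AS DECIMAL(38,18)) AS tick_size,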

    -- NEW: Promote marginTradingAllowed from vendor_data
    CAST(JSON_EXTRACT_SCALAR(api_response_raw, '$.marginTradingAllowed') AS BOOLEAN) AS margin_trading_allowed,

    -- Vendor data: remaining fields not promoted
    {{ build_vendor_data_json() }} AS vendor_data

FROM {{ ref('bronze_instruments_binance') }}

{% if is_incremental() %}
WHERE ingestion_timestamp > (SELECT MAX(record_created_at) FROM {{ this }})
{% endif %}
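The effect of append_new_columns on existing data is easy to misread, so here is a toy model of it in plain Python (an illustration of the behavior, not dbt internals): new model columns are appended to the target, rows already in the table receive NULL for them, and columns that disappear from the model are kept.

```python
# Toy model (plain Python, not dbt internals) of an incremental MERGE with
# on_schema_change='append_new_columns'.
def merge_incremental(target_columns, target_rows, new_rows, unique_key):
    model_columns = list(new_rows[0]) if new_rows else []
    # New model columns are appended; columns missing from the model are kept
    columns = list(target_columns) + [c for c in model_columns if c not in target_columns]
    # Existing rows gain the new columns as None: nothing is backfilled
    merged = {row[unique_key]: {c: row.get(c) for c in columns} for row in target_rows}
    for row in new_rows:
        merged[row[unique_key]] = {c: row.get(c) for c in columns}  # upsert on unique_key
    return columns, list(merged.values())
```

This is why older rows get a newly promoted column from vendor_data (or by re-parsing Bronze within its retention window) rather than from the incremental run itself.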

DBT Macro for Vendor Data:

-- dbt/macros/build_vendor_data_json.sql
{% macro build_vendor_data_json() %}
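{# Hardcoded list of fields NOT promoted to columns. Fields missing from this
   list are not carried into vendor_data, so new exchange fields must be added here #}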
JSON_BUILD_OBJECT(
    'riskLimits', JSON_EXTRACT(api_response_raw, '$.riskLimits'),
    'insuranceFund', JSON_EXTRACT_SCALAR(api_response_raw, '$.insuranceFund'),
    'ocoAllowed', JSON_EXTRACT_SCALAR(api_response_raw, '$.ocoAllowed')
    -- Add more as needed
)
{% endmacro %}

Consequences

Positive

No Data Loss: Bronze preserves full API responses, so anything within the 7-day retention window can be re-parsed

Backward Compatibility: Existing queries continue to work when new fields added

Forward Compatibility (Bronze): Future API changes automatically captured in JSON

Controlled Evolution (Silver): Structured columns for common fields, JSON escape hatch for rest

Replay-Friendly: Can re-run DBT against Bronze to backfill new columns for the last 7 days; older rows are backfilled from vendor_data

API Stability: Versioned endpoints prevent breaking client applications

Negative

Manual Promotion: Must monitor vendor_data and manually promote frequently-used fields

Schema Drift: Over time, Silver may accumulate many nullable columns (mitigated by periodic review)

Query Complexity: Querying vendor_data requires JSON extraction (slower than native columns)

Version Management: API versioning adds operational overhead (maintaining v1 + v2)

Mitigation Strategies

1. Automated Field Monitoring:

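# scripts/monitor_vendor_data_fields.py
# Alert if a field appears (non-null) in vendor_data in more than 70% of records
import json
import logging
from collections import Counter

logger = logging.getLogger(__name__)
PROMOTION_THRESHOLD = 0.7


def alert(message):
    # Placeholder: route to the team's alerting channel
    logger.warning(message)


def check_vendor_data_fields(silver_instruments):
    """silver_instruments: sequence of Silver records with a .vendor_data attribute."""
    total_records = len(silver_instruments)
    if total_records == 0:
        return
    field_counts = Counter()
    for record in silver_instruments:
        if record.vendor_data:
            # Count each field once per record; ignore explicit JSON nulls
            data = json.loads(record.vendor_data)
            field_counts.update(k for k, v in data.items() if v is not None)

    for field, count in field_counts.items():
        prevalence = count / total_records
        if prevalence > PROMOTION_THRESHOLD:
            alert(f"Field '{field}' appears in {prevalence:.0%} of records; consider promotion")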

2. Periodic Schema Review:

  • Quarterly review of Silver schema
  • Identify nullable columns with <5% population → candidate for removal
  • Identify vendor_data fields with >70% population → candidate for promotion

3. API Deprecation Policy:

  • New version: Announce 3 months in advance
  • Deprecation period: 6 months (both versions live)
  • Sunset: Remove old version after 6 months
  • Monitor: Track v1 usage during deprecation period
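The policy dates can be derived mechanically from the new version's release date. A small sketch, assuming dates are computed with calendar-month arithmetic (clamped to the end of short months); the function names are hypothetical.

```python
# Sketch of the deprecation calendar described above: announce 3 months
# before the new major version ships, keep both versions live for 6 months,
# then sunset the old one.
import calendar
from datetime import date


def add_months(d: date, months: int) -> date:
    """Calendar-aware month arithmetic that clamps to the end of short months."""
    index = d.month - 1 + months
    year, month = d.year + index // 12, index % 12 + 1
    day = min(d.day, calendar.monthrange(year, month)[1])
    return date(year, month, day)


def deprecation_timeline(release: date) -> dict:
    return {
        "announce": add_months(release, -3),
        "release": release,
        "sunset": add_months(release, 6),
    }


def deprecation_header(sunset: date, new_path: str) -> tuple:
    # Mirrors the X-API-Deprecation header used by the v1 API
    return ("X-API-Deprecation", f"v1 will be sunset on {sunset.isoformat()}; migrate to {new_path}")
```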

4. DBT Test for Schema Drift:

-- tests/test_no_unpopulated_columns.sql
-- Alert if any column has <5% non-null values (likely unused)
WITH column_stats AS (
    SELECT
        'tick_size' AS column_name,
        COUNT(*) AS total_rows,
        COUNT(tick_size) AS non_null_count
    FROM {{ ref('silver_instruments') }}
    UNION ALL
    SELECT
        'margin_trading_allowed',
        COUNT(*),
        COUNT(margin_trading_allowed)
    FROM {{ ref('silver_instruments') }}
    -- Add more columns
)
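SELECT
    column_name,
    CAST(non_null_count AS DOUBLE) / total_rows AS population_rate  -- avoid integer division
FROM column_stats
WHERE CAST(non_null_count AS DOUBLE) / total_rows < 0.05
-- dbt fails the test if any rows are returned (no trailing semicolon:
-- dbt wraps test SQL in a subquery)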

Alternatives Considered

Alternative 1: FORWARD Compatibility (Rejected)

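Approach: Guarantee that consumers on the previous schema can read data written with the new schema (fields may be added; only fields with defaults may be removed; producers upgrade first)

Pros:

  • ✅ Old consumers keep working when producers add fields

Cons:

  • ❌ No replay guarantee: a consumer upgraded to the new schema may fail on older messages, e.g. when a field was added without a default
  • ❌ Forces producers to upgrade before consumers

Why Rejected: Does not guarantee that updated consumers can read the historical messages produced before an exchange added a field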

Alternative 2: FULL Compatibility (Rejected)

Approach: All changes must be both backward and forward compatible

Pros:

  • ✅ Maximum compatibility

Cons:

  • ❌ Too restrictive: only fields with defaults can be added or removed, so deprecated required fields can never be dropped
  • ❌ Forces nullable fields with defaults (clutters schema)

Why Rejected: Too rigid for practical use

Alternative 3: No Compatibility Checks (NONE) (Rejected)

Approach: Allow any schema change

Pros:

  • ✅ Maximum flexibility

Cons:

  • ❌ Breaks consumers: No protection against breaking changes
  • ❌ Debugging nightmares: Incompatible messages in Kafka

Why Rejected: Too risky for production system

Alternative 4: Store Everything in vendor_data (Rejected)

Approach: Minimal Silver schema, most fields in JSON

Pros:

  • ✅ Maximum forward compatibility

Cons:

  • ❌ Poor query performance: JSON extraction slower than native columns
  • ❌ No type safety: All values are strings in JSON
  • ❌ Difficult for analysts: Must know JSON structure

Why Rejected: Sacrifices performance and usability

Implementation Checklist

  • Bronze Iceberg tables with JSON column (api_response_raw)
  • Avro schemas with BACKWARD compatibility (config/schemas/*.avsc)
  • Schema Registry registration script (scripts/register_schemas.py)
  • Silver schema with vendor_data column
  • DBT macro for build_vendor_data_json()
  • DBT config on_schema_change='append_new_columns'
  • Monitoring script for vendor_data field prevalence
  • API versioning middleware (deprecation headers)
  • API documentation for version policy
  • DBT test for unpopulated columns
  • Runbook: How to promote field from vendor_data to column
  • Runbook: How to handle breaking API changes

References

Related ADRs

  • ADR-001: Bitemporal Modeling (late corrections are a form of schema evolution)
  • ADR-002: Ingestion Strategy (Bronze ingestion must handle new fields)
  • ADR-003: DBT vs Spark (DBT handles schema evolution via on_schema_change)