Skip to content

Latest commit

 

History

History
854 lines (682 loc) · 25.1 KB

File metadata and controls

854 lines (682 loc) · 25.1 KB

数据库设计文档

一、PostgreSQL元数据库设计

1.1 设计原则

  • 范式设计:遵循第三范式,减少数据冗余
  • JSONB灵活性:使用JSONB存储动态配置
  • 性能优化:合理建立索引
  • 审计追踪:记录创建时间、更新时间
  • 软删除:重要数据使用软删除
  • 多租户隔离:基于tenant_id的数据隔离

1.2 ER图

┌──────────────┐         ┌──────────────────┐         ┌──────────────────┐
│   tenants    │────────<│   sync_tasks     │>────────│  table_mappings  │
│              │  1   n  │                  │  1   n  │                  │
└──────────────┘         └─────────┬────────┘         └──────────────────┘
                                   │
                                   │ 1
                                   │
                                   │ n
                         ┌─────────▼────────┐
                         │  sync_offsets    │
                         │                  │
                         └──────────────────┘
                                   │
                                   │ 1
                                   │
                                   │ n
                         ┌─────────▼────────┐
                         │  sync_metrics    │
                         │                  │
                         └──────────────────┘

二、表结构设计

2.1 租户表 (tenants)

用途:管理多租户信息和配额

CREATE TABLE tenants (
    -- 主键
    tenant_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- 基本信息
    tenant_name VARCHAR(100) NOT NULL,
    tenant_code VARCHAR(50) NOT NULL UNIQUE,
    description TEXT,

    -- 联系信息
    contact_name VARCHAR(100),
    contact_email VARCHAR(255),
    contact_phone VARCHAR(20),

    -- 配额管理
    max_connectors INTEGER DEFAULT 10,
    max_tasks_per_connector INTEGER DEFAULT 8,
    max_throughput_tps INTEGER DEFAULT 10000,

    -- 状态管理
    status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE',
        -- 枚举值: ACTIVE, SUSPENDED, INACTIVE

    -- 配置信息
    config JSONB DEFAULT '{}',
        -- 示例: {"alert_email": "admin@example.com", "timezone": "UTC"}

    -- 审计字段
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    created_by VARCHAR(100),
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_by VARCHAR(100),
    deleted_at TIMESTAMP,

    -- 约束
    CONSTRAINT chk_tenant_status CHECK (status IN ('ACTIVE', 'SUSPENDED', 'INACTIVE'))
);

-- 索引
CREATE INDEX idx_tenants_status ON tenants(status) WHERE deleted_at IS NULL;
CREATE INDEX idx_tenants_code ON tenants(tenant_code);

-- 注释
COMMENT ON TABLE tenants IS '租户信息表';
COMMENT ON COLUMN tenants.tenant_id IS '租户唯一标识';
COMMENT ON COLUMN tenants.max_connectors IS '最大允许创建的connector数量';
COMMENT ON COLUMN tenants.config IS '租户级别的配置信息(JSONB格式)';

2.2 同步任务表 (sync_tasks)

用途:存储同步任务的核心配置

CREATE TABLE sync_tasks (
    -- 主键
    task_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- 租户关联
    tenant_id UUID NOT NULL REFERENCES tenants(tenant_id) ON DELETE CASCADE,

    -- 任务基本信息
    task_name VARCHAR(200) NOT NULL,
    task_code VARCHAR(100) NOT NULL,
    description TEXT,

    -- 源数据库配置
    source_db_type VARCHAR(50) NOT NULL,
        -- 枚举值: MYSQL, ORACLE, SQLSERVER, POSTGRESQL
    source_connection_config JSONB NOT NULL,
        -- 示例: {
        --   "host": "mysql.example.com",
        --   "port": 3306,
        --   "database": "mydb",
        --   "username": "dbuser",
        --   "password": "encrypted_password",
        --   "ssl": true
        -- }

    -- 目标数据库配置
    target_db_type VARCHAR(50) NOT NULL,
    target_connection_config JSONB NOT NULL,

    -- Debezium Connector配置
    connector_name VARCHAR(200) UNIQUE,
    connector_config JSONB NOT NULL,
        -- 示例: {
        --   "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        --   "database.server.id": "184054",
        --   "snapshot.mode": "initial",
        --   "database.include.list": "inventory",
        --   ...
        -- }

    -- 同步模式
    sync_mode VARCHAR(20) NOT NULL DEFAULT 'FULL_INCREMENTAL',
        -- 枚举值: FULL_ONLY, INCREMENTAL_ONLY, FULL_INCREMENTAL

    -- 任务状态
    status VARCHAR(20) NOT NULL DEFAULT 'CREATED',
        -- 枚举值: CREATED, RUNNING, PAUSED, STOPPED, FAILED, COMPLETED

    -- 健康状态
    health_status VARCHAR(20) DEFAULT 'UNKNOWN',
        -- 枚举值: HEALTHY, DEGRADED, UNHEALTHY, UNKNOWN

    -- 错误信息
    last_error TEXT,
    error_count INTEGER DEFAULT 0,

    -- 统计信息
    total_records_synced BIGINT DEFAULT 0,
    last_sync_time TIMESTAMP,

    -- 告警配置
    alert_config JSONB DEFAULT '{}',
        -- 示例: {
        --   "lag_threshold": 100000,
        --   "error_rate_threshold": 0.01,
        --   "alert_emails": ["admin@example.com"]
        -- }

    -- 调度配置
    schedule_config JSONB DEFAULT '{}',
        -- 示例: {"cron": "0 0 * * *", "timezone": "UTC"}

    -- 审计字段
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    created_by VARCHAR(100),
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_by VARCHAR(100),
    deleted_at TIMESTAMP,

    -- 约束
    CONSTRAINT uk_tenant_task_code UNIQUE (tenant_id, task_code),
    CONSTRAINT chk_sync_task_source_db_type
        CHECK (source_db_type IN ('MYSQL', 'ORACLE', 'SQLSERVER', 'POSTGRESQL')),
    CONSTRAINT chk_sync_task_target_db_type
        CHECK (target_db_type IN ('MYSQL', 'ORACLE', 'SQLSERVER', 'POSTGRESQL')),
    CONSTRAINT chk_sync_task_mode
        CHECK (sync_mode IN ('FULL_ONLY', 'INCREMENTAL_ONLY', 'FULL_INCREMENTAL')),
    CONSTRAINT chk_sync_task_status
        CHECK (status IN ('CREATED', 'RUNNING', 'PAUSED', 'STOPPED', 'FAILED', 'COMPLETED'))
);

-- 索引
CREATE INDEX idx_sync_tasks_tenant ON sync_tasks(tenant_id) WHERE deleted_at IS NULL;
CREATE INDEX idx_sync_tasks_status ON sync_tasks(status) WHERE deleted_at IS NULL;
CREATE INDEX idx_sync_tasks_health ON sync_tasks(health_status);
CREATE INDEX idx_sync_tasks_connector ON sync_tasks(connector_name);
CREATE INDEX idx_sync_tasks_source_db ON sync_tasks(source_db_type);
CREATE INDEX idx_sync_tasks_target_db ON sync_tasks(target_db_type);

-- GIN索引用于JSONB查询
CREATE INDEX idx_sync_tasks_source_config ON sync_tasks USING GIN (source_connection_config);
CREATE INDEX idx_sync_tasks_target_config ON sync_tasks USING GIN (target_connection_config);

-- 注释
COMMENT ON TABLE sync_tasks IS '数据同步任务表';
COMMENT ON COLUMN sync_tasks.connector_name IS 'Kafka Connect中的connector名称';
COMMENT ON COLUMN sync_tasks.connector_config IS 'Debezium connector完整配置';

2.3 表映射配置表 (table_mappings)

用途:定义源表到目标表的映射关系

CREATE TABLE table_mappings (
    -- 主键
    mapping_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- 任务关联
    task_id UUID NOT NULL REFERENCES sync_tasks(task_id) ON DELETE CASCADE,

    -- 源表信息
    source_schema VARCHAR(100),
    source_table VARCHAR(100) NOT NULL,
    source_table_fqn VARCHAR(255) GENERATED ALWAYS AS
        (COALESCE(source_schema || '.', '') || source_table) STORED,

    -- 目标表信息
    target_schema VARCHAR(100),
    target_table VARCHAR(100) NOT NULL,
    target_table_fqn VARCHAR(255) GENERATED ALWAYS AS
        (COALESCE(target_schema || '.', '') || target_table) STORED,

    -- 列映射规则
    column_mappings JSONB NOT NULL DEFAULT '[]',
        -- 示例: [
        --   {
        --     "source_column": "customer_id",
        --     "target_column": "cust_id",
        --     "source_type": "BIGINT",
        --     "target_type": "VARCHAR(20)",
        --     "nullable": false,
        --     "default_value": null
        --   },
        --   ...
        -- ]

    -- 主键映射
    primary_key_mapping JSONB,
        -- 示例: {
        --   "source_columns": ["id"],
        --   "target_columns": ["customer_id"]
        -- }

    -- 过滤条件
    row_filter_condition TEXT,
        -- 示例: "status = 'ACTIVE' AND created_at > '2024-01-01'"

    -- 列过滤(要同步的列)
    column_filter JSONB DEFAULT '[]',
        -- 示例: ["id", "name", "email", "created_at"]

    -- 转换规则
    transform_rules JSONB DEFAULT '[]',
        -- 示例: [
        --   {
        --     "column": "phone",
        --     "transform_type": "MASK",
        --     "params": {"pattern": "***-****-####"}
        --   },
        --   {
        --     "column": "price",
        --     "transform_type": "MULTIPLY",
        --     "params": {"factor": 1.1}
        --   }
        -- ]

    -- 冲突处理策略
    conflict_resolution VARCHAR(20) DEFAULT 'UPSERT',
        -- 枚举值: UPSERT, INSERT_ONLY, UPDATE_ONLY, IGNORE

    -- 批量配置
    batch_size INTEGER DEFAULT 1000,
    batch_timeout_ms INTEGER DEFAULT 5000,

    -- 状态
    is_enabled BOOLEAN DEFAULT true,

    -- 统计信息
    total_rows_synced BIGINT DEFAULT 0,
    last_sync_time TIMESTAMP,

    -- 审计字段
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    created_by VARCHAR(100),
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_by VARCHAR(100),
    deleted_at TIMESTAMP,

    -- 约束
    CONSTRAINT uk_task_source_table UNIQUE (task_id, source_schema, source_table),
    CONSTRAINT chk_mapping_conflict_resolution
        CHECK (conflict_resolution IN ('UPSERT', 'INSERT_ONLY', 'UPDATE_ONLY', 'IGNORE'))
);

-- 索引
CREATE INDEX idx_table_mappings_task ON table_mappings(task_id) WHERE deleted_at IS NULL;
CREATE INDEX idx_table_mappings_source ON table_mappings(source_table_fqn);
CREATE INDEX idx_table_mappings_target ON table_mappings(target_table_fqn);
CREATE INDEX idx_table_mappings_enabled ON table_mappings(is_enabled) WHERE deleted_at IS NULL;

-- GIN索引
CREATE INDEX idx_table_mappings_columns ON table_mappings USING GIN (column_mappings);

-- 注释
COMMENT ON TABLE table_mappings IS '表映射配置表';
COMMENT ON COLUMN table_mappings.column_mappings IS '列级别的字段映射规则(JSONB数组)';
COMMENT ON COLUMN table_mappings.transform_rules IS '数据转换规则';

2.4 转换脚本表 (transform_scripts)

用途:存储自定义的数据转换脚本

CREATE TABLE transform_scripts (
    -- 主键
    script_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- 映射关联
    mapping_id UUID NOT NULL REFERENCES table_mappings(mapping_id) ON DELETE CASCADE,

    -- 脚本信息
    script_name VARCHAR(200) NOT NULL,
    script_type VARCHAR(20) NOT NULL,
        -- 枚举值: GROOVY, JAVASCRIPT, SQL, PYTHON

    -- 触发时机
    trigger_phase VARCHAR(20) NOT NULL,
        -- 枚举值: BEFORE_TRANSFORM, AFTER_TRANSFORM, BEFORE_WRITE, AFTER_WRITE

    -- 脚本内容
    script_content TEXT NOT NULL,
        -- 示例 (Groovy):
        -- """
        -- def transform(record) {
        --     record.fullName = record.firstName + ' ' + record.lastName
        --     record.updatedAt = new Date()
        --     return record
        -- }
        -- """

    -- 执行顺序
    execution_order INTEGER DEFAULT 0,

    -- 脚本参数
    script_params JSONB DEFAULT '{}',
        -- 示例: {"timezone": "UTC", "currency_rate": 6.5}

    -- 状态
    is_enabled BOOLEAN DEFAULT true,

    -- 错误处理
    error_handling VARCHAR(20) DEFAULT 'LOG_AND_CONTINUE',
        -- 枚举值: LOG_AND_CONTINUE, THROW_EXCEPTION, SKIP_RECORD

    -- 超时设置
    timeout_ms INTEGER DEFAULT 5000,

    -- 统计信息
    execution_count BIGINT DEFAULT 0,
    last_execution_time TIMESTAMP,
    average_execution_time_ms NUMERIC(10, 2),

    -- 审计字段
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    created_by VARCHAR(100),
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_by VARCHAR(100),
    deleted_at TIMESTAMP,

    -- 约束
    CONSTRAINT chk_script_type
        CHECK (script_type IN ('GROOVY', 'JAVASCRIPT', 'SQL', 'PYTHON')),
    CONSTRAINT chk_trigger_phase
        CHECK (trigger_phase IN ('BEFORE_TRANSFORM', 'AFTER_TRANSFORM', 'BEFORE_WRITE', 'AFTER_WRITE')),
    CONSTRAINT chk_error_handling
        CHECK (error_handling IN ('LOG_AND_CONTINUE', 'THROW_EXCEPTION', 'SKIP_RECORD'))
);

-- 索引
CREATE INDEX idx_transform_scripts_mapping ON transform_scripts(mapping_id) WHERE deleted_at IS NULL;
CREATE INDEX idx_transform_scripts_order ON transform_scripts(execution_order);
CREATE INDEX idx_transform_scripts_enabled ON transform_scripts(is_enabled);

-- 注释
COMMENT ON TABLE transform_scripts IS '数据转换脚本表';
COMMENT ON COLUMN transform_scripts.script_content IS '脚本代码内容';
COMMENT ON COLUMN transform_scripts.execution_order IS '多个脚本的执行顺序(升序)';

2.5 同步偏移量表 (sync_offsets)

用途:存储Debezium的offset信息,用于断点续传

CREATE TABLE sync_offsets (
    -- 主键
    offset_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- 任务关联
    task_id UUID NOT NULL REFERENCES sync_tasks(task_id) ON DELETE CASCADE,

    -- Partition信息
    source_partition JSONB NOT NULL,
        -- 示例: {
        --   "server": "mysql-server-1",
        --   "database": "inventory"
        -- }

    -- Offset信息
    source_offset JSONB NOT NULL,
        -- 示例 (MySQL): {
        --   "file": "mysql-bin.000003",
        --   "pos": 154,
        --   "row": 1,
        --   "server_id": 223344,
        --   "event": 2,
        --   "ts_sec": 1706234567
        -- }
        -- 示例 (Oracle): {
        --   "scn": "123456789",
        --   "commit_scn": "123456790"
        -- }

    -- 快照信息
    snapshot_offset JSONB,
        -- 示例: {
        --   "last_snapshot_record": true,
        --   "snapshot_completed": true
        -- }

    -- 时间戳
    last_updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    -- 约束
    CONSTRAINT uk_task_partition UNIQUE (task_id, source_partition)
);

-- 索引
CREATE INDEX idx_sync_offsets_task ON sync_offsets(task_id);
CREATE INDEX idx_sync_offsets_updated ON sync_offsets(last_updated DESC);

-- GIN索引
CREATE INDEX idx_sync_offsets_partition ON sync_offsets USING GIN (source_partition);
CREATE INDEX idx_sync_offsets_offset ON sync_offsets USING GIN (source_offset);

-- 注释
COMMENT ON TABLE sync_offsets IS 'CDC偏移量存储表(用于断点续传)';
COMMENT ON COLUMN sync_offsets.source_partition IS 'Kafka Connect partition信息';
COMMENT ON COLUMN sync_offsets.source_offset IS 'CDC消费位置信息';

2.6 同步监控指标表 (sync_metrics)

用途:存储同步任务的监控指标

CREATE TABLE sync_metrics (
    -- 主键
    metric_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- 任务关联
    task_id UUID NOT NULL REFERENCES sync_tasks(task_id) ON DELETE CASCADE,

    -- 指标类型
    metric_type VARCHAR(50) NOT NULL,
        -- 枚举值: THROUGHPUT, LATENCY, LAG, ERROR_RATE, CPU, MEMORY

    -- 指标值
    metric_value JSONB NOT NULL,
        -- 示例 (THROUGHPUT): {
        --   "records_per_second": 1500.5,
        --   "bytes_per_second": 1048576
        -- }
        -- 示例 (LATENCY): {
        --   "p50": 50,
        --   "p95": 200,
        --   "p99": 500,
        --   "max": 1200
        -- }
        -- 示例 (LAG): {
        --   "consumer_lag": 10000,
        --   "time_lag_ms": 5000
        -- }

    -- 时间维度
    time_window VARCHAR(20),
        -- 枚举值: REALTIME, 1MIN, 5MIN, 15MIN, 1HOUR, 1DAY

    -- 记录时间
    recorded_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,

    -- 分区键(用于时间序列分区)
    recorded_date DATE GENERATED ALWAYS AS (recorded_at::DATE) STORED
);

-- 时间序列分区
CREATE INDEX idx_sync_metrics_task_time ON sync_metrics(task_id, recorded_at DESC);
CREATE INDEX idx_sync_metrics_type ON sync_metrics(metric_type, recorded_at DESC);
CREATE INDEX idx_sync_metrics_date ON sync_metrics(recorded_date);

-- GIN索引
CREATE INDEX idx_sync_metrics_value ON sync_metrics USING GIN (metric_value);

-- 注释
COMMENT ON TABLE sync_metrics IS '同步任务监控指标表';
COMMENT ON COLUMN sync_metrics.metric_value IS '指标值(JSONB格式,支持复杂数据结构)';
COMMENT ON COLUMN sync_metrics.time_window IS '指标聚合的时间窗口';

-- 分区表(按月分区)
-- 注意:实际使用时建议使用PostgreSQL 10+的声明式分区
-- CREATE TABLE sync_metrics_2025_01 PARTITION OF sync_metrics
-- FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

2.7 告警记录表 (alert_records)

用途:记录告警历史

CREATE TABLE alert_records (
    -- 主键
    alert_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- 任务关联
    task_id UUID REFERENCES sync_tasks(task_id) ON DELETE SET NULL,

    -- 告警信息
    alert_type VARCHAR(50) NOT NULL,
        -- 枚举值: HIGH_LAG, HIGH_ERROR_RATE, CONNECTOR_FAILED,
        --        HIGH_LATENCY, RESOURCE_EXHAUSTED

    alert_level VARCHAR(20) NOT NULL,
        -- 枚举值: P0, P1, P2, P3

    alert_title VARCHAR(255) NOT NULL,
    alert_message TEXT NOT NULL,

    -- 告警详情
    alert_details JSONB,
        -- 示例: {
        --   "current_lag": 150000,
        --   "threshold": 100000,
        --   "duration_seconds": 300
        -- }

    -- 状态
    status VARCHAR(20) DEFAULT 'OPEN',
        -- 枚举值: OPEN, ACKNOWLEDGED, RESOLVED, IGNORED

    -- 处理信息
    acknowledged_at TIMESTAMP,
    acknowledged_by VARCHAR(100),
    resolved_at TIMESTAMP,
    resolved_by VARCHAR(100),
    resolution_note TEXT,

    -- 时间
    triggered_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- 索引
CREATE INDEX idx_alert_records_task ON alert_records(task_id);
CREATE INDEX idx_alert_records_status ON alert_records(status) WHERE status = 'OPEN';
CREATE INDEX idx_alert_records_type ON alert_records(alert_type);
CREATE INDEX idx_alert_records_level ON alert_records(alert_level);
CREATE INDEX idx_alert_records_triggered ON alert_records(triggered_at DESC);

-- 注释
COMMENT ON TABLE alert_records IS '告警记录表';

2.8 操作审计表 (audit_logs)

用途:记录所有关键操作

CREATE TABLE audit_logs (
    -- 主键
    log_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- 租户
    tenant_id UUID REFERENCES tenants(tenant_id) ON DELETE SET NULL,

    -- 操作信息
    operation_type VARCHAR(50) NOT NULL,
        -- 枚举值: CREATE_TASK, UPDATE_TASK, DELETE_TASK, START_TASK,
        --        STOP_TASK, UPDATE_MAPPING, etc.

    resource_type VARCHAR(50) NOT NULL,
        -- 枚举值: TASK, MAPPING, SCRIPT, TENANT

    resource_id UUID,

    -- 操作详情
    operation_details JSONB,
        -- 示例: {
        --   "before": {...},
        --   "after": {...},
        --   "changes": [...]
        -- }

    -- 操作结果
    status VARCHAR(20) NOT NULL,
        -- 枚举值: SUCCESS, FAILED, PARTIAL

    error_message TEXT,

    -- 操作者信息
    operator VARCHAR(100),
    operator_ip VARCHAR(45),
    user_agent TEXT,

    -- 时间
    operated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- 索引
CREATE INDEX idx_audit_logs_tenant ON audit_logs(tenant_id);
CREATE INDEX idx_audit_logs_operation ON audit_logs(operation_type);
CREATE INDEX idx_audit_logs_resource ON audit_logs(resource_type, resource_id);
CREATE INDEX idx_audit_logs_time ON audit_logs(operated_at DESC);

-- 注释
COMMENT ON TABLE audit_logs IS '操作审计日志表';

三、数据字典

3.1 枚举值定义

数据库类型 (DatabaseType)

MYSQL       - MySQL 5.7+
ORACLE      - Oracle 11g+
SQLSERVER   - SQL Server 2017+
POSTGRESQL  - PostgreSQL 12+

任务状态 (TaskStatus)

CREATED     - 已创建,未启动
RUNNING     - 运行中
PAUSED      - 已暂停
STOPPED     - 已停止
FAILED      - 失败
COMPLETED   - 已完成(全量同步)

健康状态 (HealthStatus)

HEALTHY     - 健康(延迟正常、无错误)
DEGRADED    - 降级(有轻微延迟或少量错误)
UNHEALTHY   - 不健康(严重延迟或大量错误)
UNKNOWN     - 未知(刚创建或连接失败)

同步模式 (SyncMode)

FULL_ONLY           - 仅全量同步
INCREMENTAL_ONLY    - 仅增量同步
FULL_INCREMENTAL    - 全量+增量

告警级别 (AlertLevel)

P0 - 紧急(服务中断)
P1 - 严重(影响数据同步)
P2 - 重要(性能问题)
P3 - 一般(提示信息)

四、初始化脚本

4.1 创建数据库

-- 创建数据库
CREATE DATABASE dbsync_metadata
    WITH
    OWNER = dbsync_user
    ENCODING = 'UTF8'
    LC_COLLATE = 'en_US.UTF-8'
    LC_CTYPE = 'en_US.UTF-8'
    TEMPLATE = template0;

-- 连接到数据库
\c dbsync_metadata;

-- 启用扩展
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pg_stat_statements";
CREATE EXTENSION IF NOT EXISTS "pg_trgm";  -- 用于文本模糊搜索

4.2 创建所有表

-- 按依赖顺序执行建表语句
\i 01_create_tenants.sql
\i 02_create_sync_tasks.sql
\i 03_create_table_mappings.sql
\i 04_create_transform_scripts.sql
\i 05_create_sync_offsets.sql
\i 06_create_sync_metrics.sql
\i 07_create_alert_records.sql
\i 08_create_audit_logs.sql

4.3 初始化数据

-- 插入默认租户
INSERT INTO tenants (tenant_id, tenant_name, tenant_code, status)
VALUES
    (gen_random_uuid(), 'Default Tenant', 'default', 'ACTIVE');

-- 创建默认管理员用户(如果有用户表)
-- INSERT INTO users ...

五、性能优化

5.1 分区策略

sync_metrics表按月分区

-- 创建父表
CREATE TABLE sync_metrics (
    ...
) PARTITION BY RANGE (recorded_date);

-- 创建分区
CREATE TABLE sync_metrics_2025_01 PARTITION OF sync_metrics
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE sync_metrics_2025_02 PARTITION OF sync_metrics
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- 自动创建分区的函数
CREATE OR REPLACE FUNCTION create_monthly_partitions(
    table_name TEXT,
    start_date DATE,
    end_date DATE
) RETURNS VOID AS $$
DECLARE
    partition_date DATE;
    partition_name TEXT;
    start_range TEXT;
    end_range TEXT;
BEGIN
    partition_date := DATE_TRUNC('month', start_date);

    WHILE partition_date < end_date LOOP
        partition_name := table_name || '_' || TO_CHAR(partition_date, 'YYYY_MM');
        start_range := TO_CHAR(partition_date, 'YYYY-MM-DD');
        end_range := TO_CHAR(partition_date + INTERVAL '1 month', 'YYYY-MM-DD');

        EXECUTE format(
            'CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
            partition_name, table_name, start_range, end_range
        );

        partition_date := partition_date + INTERVAL '1 month';
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- 使用示例
SELECT create_monthly_partitions('sync_metrics', '2025-01-01'::DATE, '2026-01-01'::DATE);

5.2 索引优化

-- 复合索引(常用查询组合)
CREATE INDEX idx_sync_tasks_tenant_status
    ON sync_tasks(tenant_id, status)
    WHERE deleted_at IS NULL;

-- 部分索引(只索引活跃数据)
CREATE INDEX idx_active_mappings
    ON table_mappings(task_id, is_enabled)
    WHERE deleted_at IS NULL AND is_enabled = true;

-- 表达式索引
CREATE INDEX idx_sync_tasks_source_host
    ON sync_tasks((source_connection_config->>'host'));

5.3 数据清理策略

-- 清理旧的监控数据(保留3个月)
CREATE OR REPLACE FUNCTION cleanup_old_metrics() RETURNS VOID AS $$
BEGIN
    DELETE FROM sync_metrics
    WHERE recorded_at < NOW() - INTERVAL '3 months';

    -- 清理旧的审计日志(保留1年)
    DELETE FROM audit_logs
    WHERE operated_at < NOW() - INTERVAL '1 year';
END;
$$ LANGUAGE plpgsql;

-- 创建定时任务
CREATE EXTENSION IF NOT EXISTS pg_cron;
SELECT cron.schedule('cleanup-metrics', '0 2 * * *', 'SELECT cleanup_old_metrics()');

六、备份和恢复

6.1 备份策略

# 全量备份
pg_dump -h localhost -U dbsync_user -d dbsync_metadata \
    -F c -f dbsync_metadata_$(date +%Y%m%d).dump

# 仅备份schema
pg_dump -h localhost -U dbsync_user -d dbsync_metadata \
    -s -f dbsync_metadata_schema.sql

# 仅备份数据
pg_dump -h localhost -U dbsync_user -d dbsync_metadata \
    -a -f dbsync_metadata_data.sql

6.2 恢复策略

# 恢复完整数据库
pg_restore -h localhost -U dbsync_user -d dbsync_metadata \
    -c dbsync_metadata_20250130.dump

# 恢复单个表
pg_restore -h localhost -U dbsync_user -d dbsync_metadata \
    -t sync_tasks dbsync_metadata_20250130.dump

文档版本:v1.0 最后更新:2025-01-30