Skip to content

mysql5.7同步数据到postgresql14会出现精度丢失 #10198

@chenweihua

Description

@chenweihua

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

数据源值为(经度 纬度):126.752251 44.916103
同步到pg后为(经度 纬度):126.752250 44.916103(经度最后一位由 1 变为 0,纬度未变)

源值:"Longitude":126.752251
"Latitude":44.916103,生成的插入语句为:INSERT INTO "map"."public"."t_station" ("id", "station_code", "station_name", "province_id", "city_id", "county_id", "address", "longitude", "latitude", "create_time", "status", "update_time", "area_id", "area_name", "source_table") VALUES (('1'::int4), ('....'), ('....'), ('43'::int4), ('4309'::int4), ('430902'::int4),
('....'), ('126.75225000'::numeric), ('44.91610300'::numeric) ……(语句后略)。可以看到写入的 longitude 为 126.75225000,与源值 126.752251 存在差异。

SeaTunnel Version

v2.3.12

SeaTunnel Config

# Job-level settings for this reproduction: a streaming job named
# "map_station" running with parallelism 1.
env {
  execution.parallelism = 1
  job.name = "map_station"
  job.mode = "STREAMING"
  # Checkpoint tuning (values are presumably milliseconds — TODO confirm)
  checkpoint.interval = 120000
  checkpoint.timeout = 1800000
  checkpoint.min.pause.between = 60000
  checkpoint.max.concurrent = 1
  checkpoint.tolerable.failed.num = 3
  # Engine tuning. These two keys repeat the interval/timeout values set under
  # checkpoint.* above.
  # NOTE(review): confirm which of the two key families the engine actually reads.
  seatunnel.engine.checkpoint.interval = 120000
  seatunnel.engine.checkpoint.timeout = 1800000
}

# MySQL-CDC source: reads logistics.t_station and publishes it under the name
# "mysql_cdc_source" (plugin_output).
source {
  MySQL-CDC {
    plugin_output = "mysql_cdc_source"
    url = "jdbc:mysql://127.0.0.1:3306/logistics?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    port = 3306
    username = "root"
    password = "password"
    table-names = ["logistics.t_station"]
    # Heartbeat settings
    heartbeat-interval = 30000
    # Connection timeout settings
    # NOTE(review): connect-timeout here and connect.timeout further down are
    # two differently spelled keys with different values — confirm which one
    # (if either) the connector honours.
    connect-timeout = 120000
    socket-timeout = 240000 
    # CDC settings
    server-id = 12720048
    startup.mode = "initial"
    # Connection settings
    connect.timeout = "30s"
    connection.pool.size = 10
    # Incremental sync settings
    debezium.snapshot.mode = "initial"
    debezium.snapshot.locking.mode = "minimal"
    # Retry settings
    max-retries = 5
    retry-backoff = 2000
     # NOTE(review): "deciaml" looks like a misspelling. Debezium documents
     # precise / double / string for decimal.handling.mode — confirm the
     # intended value and how an unrecognised value is treated.
     debezium.decimal.handling.mode = "deciaml"
     # NOTE(review): float.handling.mode and double.handling.mode do not appear
     # to be documented Debezium MySQL options — confirm they have any effect.
     debezium.float.handling.mode = "float"
     debezium.double.handling.mode = "double"
     debezium.time.precision.mode = "adaptive"
     debezium.database.history.skip.unparseable.ddl = true
     debezium.value.converter.encoding = "UTF-8"
     debezium.key.converter.encoding = "UTF-8"
  }
}

# Sql transform: renames the source columns to snake_case and casts
# Longitude / Latitude to DECIMAL(12,6); the result is published as
# "t_station_transformed".
transform {
  Sql {
    # NOTE(review): the source block uses plugin_output and the sink uses
    # plugin_input, while this block uses source_table_name /
    # result_table_name — confirm both spellings are accepted by the
    # SeaTunnel version in use.
    source_table_name = "mysql_cdc_source"
    result_table_name = "t_station_transformed"
    # NOTE(review): the query selects FROM tb_station_source, which does not
    # match source_table_name ("mysql_cdc_source") — confirm which table name
    # the Sql transform expects.
    # NOTE(review): Longitude is cast directly to DECIMAL(12,6) and rounded,
    # whereas Latitude is first cast to STRING and then to DECIMAL(12,6). In
    # the report only the longitude value changed (126.752251 -> 126.752250),
    # so the differing cast path may be related — to be confirmed against the
    # MySQL column types, which are not shown here.
    # NOTE(review): the aliases produced here (code, name, area) differ from
    # the columns in the reported INSERT (station_code, station_name, area_id,
    # source_table) — the posted config may not be the exact one that was run.
    query = """
      SELECT 
        `Id` as id,
        `Code` as code,
        `Name` as name,
        `ProvinceId` as province_id,
        `CityId` as city_id,
        `RegionId` as county_id,
        `Address` as address,
        CASE 
          WHEN Longitude IS NOT NULL THEN
            ROUND(CAST(Longitude  AS DECIMAL(12,6)), 6)
            
          ELSE 
            NULL 
        END as longitude,
        CASE 
          WHEN `Latitude` IS NOT NULL THEN 
            CAST(CAST(`Latitude` AS STRING) AS DECIMAL(12,6))
          ELSE NULL 
        END as latitude,
        `CreateTime` as create_time,
        `Status` as status,
        `ModifyTime` as update_time,
        `area` as area,
        `areaName` as area_name
      FROM tb_station_source
    """
  }
  
}

# JDBC sink: upserts the transformed rows into public.t_station of the
# PostgreSQL database "map", keyed on "id".
sink {
  jdbc {
    plugin_input ="t_station_transformed"
    url = "jdbc:postgresql://127.0.0.1:5432/map?connectTimeout=60&socketTimeout=600"
    # HikariCP connection pool core settings
    connection_check_timeout_sec = 30
    transaction_timeout_sec=600
    auto_commit=true
    # NOTE(review): the mapping declares NUMERIC(10,6) while the Sql transform
    # casts to DECIMAL(12,6), and the reported INSERT shows 8 fractional digits
    # ('126.75225000'::numeric) — confirm this option is recognised and applied.
    column.type.mapping = "longitude=NUMERIC(10,6),latitude=NUMERIC(10,6)"
    # Disable automatic type conversion so that precision is not lost.
    # NOTE(review): the value below is false, which contradicts the stated
    # intent of this comment — confirm the intended setting.
    disable.type.convert = false
    string_column_to_numeric = true
    # NOTE(review): this block sets the same pool options twice, once in
    # camelCase and once in snake_case, with at least one conflicting value
    # (leakDetectionThreshold=2000 vs leak_detection_threshold_ms = 60000) —
    # confirm which spelling is actually read.
    properties {
      connectionTimeout=3000
      idleTimeout=600000
      # NOTE(review): a boolean is given here while the neighbouring pool
      # timings are numeric — presumably a duration is expected; verify.
      keepaliveTime=true
      maxLifetime=1800000
      connectionTestQuery="select 1"
      minimumIdle=1
      maximumPoolSize=5
      initializationFailTimeout=1
      validationTimeout=5000
      leakDetectionThreshold=2000
      idle_timeout_ms = 600000
      max_lifetime_ms = 1800000
      validation_timeout_ms = 5000
      leak_detection_threshold_ms = 60000
      connection_pool_size        = 5
      connection_validation_query = "SELECT 1"
      reWriteBatchedInserts = true
      socket_timeout = 300
      login_timeout = 30
      tcp_keep_alive = true
    }
    driver = "org.postgresql.Driver"
    username = "test"
    password = "password"
    database="map"
    table = "public.t_station"
    generate_sink_sql = true    
    # Primary key settings, used for the upsert operation
    primary_keys = ["id"]
    support_upsert_by_insert_only = false
    write_mode = "upsert"    
    # Batch write tuning
    batch_size = 1000
    batch_interval_ms = 1000
    # Session retry settings
    max_retries = 5
    retry_backoff_multiplier_ms = 2
    retry_backoff_initial_ms = 1000
    retry_backoff_max_ms = 30000
    # Write timeout settings
    write_timeout = 120000
    # Transaction settings
    # NOTE(review): transaction_timeout_sec=600 is also set above; the two keys
    # carry different values (units presumably differ) — confirm which applies.
    transaction.timeout = 60000
 }
}

Running Command

bin/seatunnel.sh --config

Error Exception

没有异常

Zeta or Flink or Spark Version

zeta模式

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions