Description
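Hello, after I start bireme, a data transformation error occurs when syncing from debezium to PG. Could you please take a look? I have been struggling with this for two days. Thanks!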
------------------------------------------------------------------------------------------
kafka-avro-console-consumer output:
{"before":null,"after":{"debezium1.kafka_test_db.kafka_test_table.Value":{"name":{"string":"zhangsan"},"age":{"int":18}}},"source":{"version":{"string":"0.9.2.Final"},"connector":{"string":"mysql"},"name":"debezium1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":9899,"row":0,"snapshot":{"boolean":true},"thread":null,"db":{"string":"kafka_test_db"},"table":{"string":"kafka_test_table"},"query":null},"op":"c","ts_ms":{"long":1553828086387}}
------------------------------------------------------------------------------------------
Error in bireme.out:
12:51:00 INFO Bireme - initialize Bireme daemon
12:51:00 INFO class cn.hashdata.bireme.Config - Configures:
pipeline thread pool size = 5
transform thread pool size = 10
merge thread pool size = 10
merge interval = 10000
batch size = 50000
loader connection size = 1
loader task queue size = 2
loaders count = 1
reporter = console
report interval = 15
state server addr = 0.0.0.0
state server port = 6666
target database url = jdbc:postgresql://127.0.0.1:5432/syncdb
12:51:00 INFO class cn.hashdata.bireme.Config - Data Source:
Type: DEBEZIUM Name: debezium1
12:51:00 INFO Bireme - start Bireme daemon.
12:51:00 INFO Bireme - Start getting metadata of target tables from target database.
12:51:00 INFO class cn.hashdata.bireme.GetPrimaryKeys - MySQL、Greenplum table check completed, the state is okay!
12:51:00 INFO class cn.hashdata.bireme.GetPrimaryKeys - Greenplum table primary key check is completed, the state is okay!
12:51:00 INFO Bireme - Finish getting metadata of target tables from target database.
12:51:00 INFO Bireme - Start establishing connections for loaders.
12:51:01 INFO Bireme - Finishing establishing 1 connections for loaders.
12:51:01 INFO Scheduler - Scheduler start working.
12:51:01 ERROR Debezium-debezium1-debezium1.kafka_test_db.kafka_test_table - Dispatch failed. Message: Transform failed.
12:51:01 ERROR Debezium-debezium1-debezium1.kafka_test_db.kafka_test_table - Stack Trace:
cn.hashdata.bireme.BiremeException: Transform failed.
at cn.hashdata.bireme.Dispatcher.dispatch(Dispatcher.java:60) ~[bireme-2.0.0-alpha-1.jar:?]
at cn.hashdata.bireme.pipeline.PipeLine.startDispatch(PipeLine.java:150) [bireme-2.0.0-alpha-1.jar:?]
at cn.hashdata.bireme.pipeline.PipeLine.executePipeline(PipeLine.java:106) [bireme-2.0.0-alpha-1.jar:?]
at cn.hashdata.bireme.pipeline.PipeLine.call(PipeLine.java:87) [bireme-2.0.0-alpha-1.jar:?]
at cn.hashdata.bireme.pipeline.PipeLine.call(PipeLine.java:39) [bireme-2.0.0-alpha-1.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_201]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_201]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: com.google.gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 2 column 2 path $
at com.google.gson.JsonParser.parse(JsonParser.java:65) ~[gson-2.8.0.jar:?]
at com.google.gson.JsonParser.parse(JsonParser.java:45) ~[gson-2.8.0.jar:?]
at cn.hashdata.bireme.pipeline.DebeziumPipeLine$DebeziumTransformer.transform(DebeziumPipeLine.java:89) ~[bireme-2.0.0-alpha-1.jar:?]
at cn.hashdata.bireme.pipeline.KafkaPipeLine$KafkaTransformer.fillRowSet(KafkaPipeLine.java:113) ~[bireme-2.0.0-alpha-1.jar:?]
at cn.hashdata.bireme.pipeline.PipeLine$Transformer.call(PipeLine.java:273) ~[bireme-2.0.0-alpha-1.jar:?]
at cn.hashdata.bireme.pipeline.PipeLine$Transformer.call(PipeLine.java:248) ~[bireme-2.0.0-alpha-1.jar:?]
... 4 more
Caused by: com.google.gson.stream.MalformedJsonException: Use JsonReader.setLenient(true) to accept malformed JSON at line 2 column 2 path $
at com.google.gson.stream.JsonReader.syntaxError(JsonReader.java:1559) ~[gson-2.8.0.jar:?]
at com.google.gson.stream.JsonReader.checkLenient(JsonReader.java:1401) ~[gson-2.8.0.jar:?]
at com.google.gson.stream.JsonReader.doPeek(JsonReader.java:542) ~[gson-2.8.0.jar:?]
at com.google.gson.stream.JsonReader.peek(JsonReader.java:425) ~[gson-2.8.0.jar:?]
at com.google.gson.JsonParser.parse(JsonParser.java:60) ~[gson-2.8.0.jar:?]
at com.google.gson.JsonParser.parse(JsonParser.java:45) ~[gson-2.8.0.jar:?]
at cn.hashdata.bireme.pipeline.DebeziumPipeLine$DebeziumTransformer.transform(DebeziumPipeLine.java:89) ~[bireme-2.0.0-alpha-1.jar:?]
at cn.hashdata.bireme.pipeline.KafkaPipeLine$KafkaTransformer.fillRowSet(KafkaPipeLine.java:113) ~[bireme-2.0.0-alpha-1.jar:?]
at cn.hashdata.bireme.pipeline.PipeLine$Transformer.call(PipeLine.java:273) ~[bireme-2.0.0-alpha-1.jar:?]
at cn.hashdata.bireme.pipeline.PipeLine$Transformer.call(PipeLine.java:248) ~[bireme-2.0.0-alpha-1.jar:?]
... 4 more
12:51:01 INFO Scheduler - All pipeline stop.
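
A possible way to narrow this down, offered as an assumption rather than a confirmed diagnosis: the topic above is read with kafka-avro-console-consumer, while the trace shows Gson's JsonParser being applied to the record value, so it may help to check what the raw value bytes actually look like. The sketch below is a hypothetical helper (the class name `PayloadSniffer` and method `classify` are not part of bireme or Debezium). It only relies on two general facts: the Confluent Avro wire format starts with a zero magic byte followed by a 4-byte big-endian schema ID, and JSON text starts with `{` or `[`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadSniffer {
    /** Hypothetical helper: guesses the encoding of a raw Kafka record value. */
    static String classify(byte[] value) {
        if (value == null || value.length == 0) {
            return "empty";
        }
        // Confluent wire format: magic byte 0x00, then a 4-byte big-endian schema ID.
        if (value[0] == 0x00 && value.length >= 5) {
            int schemaId = ByteBuffer.wrap(value, 1, 4).getInt();
            return "confluent-avro (schema id " + schemaId + ")";
        }
        // JSON text starts with '{' or '[' once surrounding whitespace is trimmed.
        String text = new String(value, StandardCharsets.UTF_8).trim();
        if (text.startsWith("{") || text.startsWith("[")) {
            return "json-like";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        // Sample bytes made up for illustration, not taken from the topic above.
        byte[] avro = {0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x02};
        byte[] json = "{\"op\":\"c\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(classify(avro));
        System.out.println(classify(json));
    }
}
```

If the values in the topic classify as Avro rather than JSON, that would be consistent with the MalformedJsonException above, though the log alone does not prove it.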