Skip to content

Commit 2054982

Browse files
authored
Add support for Iceberg decimals (#537)
Write decimals as actual Iceberg decimals (and not byte arrays) when Kafka source connector is configured to use `precise` decimal handling mode.
1 parent 780ac54 commit 2054982

File tree

3 files changed

+114
-2
lines changed

3 files changed

+114
-2
lines changed

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/RecordConverter.java

+12
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.slf4j.LoggerFactory;
2929

3030
import java.io.IOException;
31+
import java.math.BigDecimal;
32+
import java.math.BigInteger;
3133
import java.nio.ByteBuffer;
3234
import java.time.LocalDate;
3335
import java.time.LocalDateTime;
@@ -205,6 +207,16 @@ private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
205207
return node.floatValue();
206208
case DOUBLE: // double is represented in 64 bits
207209
return node.asDouble();
210+
case DECIMAL: {
211+
BigDecimal decimalVal = null;
212+
try {
213+
int scale = ((Types.DecimalType) field.type()).scale();
214+
decimalVal = new BigDecimal(new BigInteger(node.binaryValue()), scale);
215+
} catch (IOException e) {
216+
throw new RuntimeException("Failed to convert decimal value to iceberg value, field: " + field.name(), e);
217+
}
218+
return decimalVal;
219+
}
208220
case BOOLEAN:
209221
return node.asBoolean();
210222
case UUID:

debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/SchemaConverter.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ private IcebergSchemaInfo debeziumFieldToIcebergField(JsonNode fieldSchema, Stri
9696
return schemaData;
9797
default:
9898
// its primitive field
99-
final Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType, fieldTypeName));
99+
final Types.NestedField field = Types.NestedField.of(schemaData.nextFieldId().getAndIncrement(), !isPkField, fieldName, icebergPrimitiveField(fieldName, fieldType, fieldTypeName, fieldSchema));
100100
schemaData.fields().add(field);
101101
if (isPkField) schemaData.identifierFieldIds().add(field.fieldId());
102102
return schemaData;
@@ -182,7 +182,7 @@ public Schema icebergSchema() {
182182

183183
}
184184

185-
private Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType, String fieldTypeName) {
185+
private Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldType, String fieldTypeName, JsonNode fieldSchema) {
186186
// Debezium Temporal types: https://debezium.io/documentation//reference/connectors/postgresql.html#postgresql-temporal-types
187187
switch (fieldType) {
188188
case "int8":
@@ -247,6 +247,16 @@ private Type.PrimitiveType icebergPrimitiveField(String fieldName, String fieldT
247247
case "uuid":
248248
return Types.UUIDType.get();
249249
case "bytes":
250+
// With `decimal.handling.mode` set to `precise` debezium relational source connector would encode decimals
251+
// as byte arrays. We want to write them out as Iceberg decimals.
252+
if (fieldTypeName.equals("org.apache.kafka.connect.data.Decimal")) {
253+
JsonNode params = fieldSchema.get("parameters");
254+
if (!params.isNull() && !params.isEmpty()) {
255+
int precision = params.get("connect.decimal.precision").asInt();
256+
int scale = params.get("scale").asInt();
257+
return Types.DecimalType.of(precision, scale);
258+
}
259+
}
250260
return Types.BinaryType.get();
251261
default:
252262
// default to String type
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
*
3+
* * Copyright memiiso Authors.
4+
* *
5+
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
*/
8+
9+
package io.debezium.server.iceberg;
10+
11+
import com.google.common.collect.Lists;
12+
import io.debezium.relational.RelationalDatabaseConnectorConfig;
13+
import io.debezium.server.iceberg.testresources.CatalogJdbc;
14+
import io.debezium.server.iceberg.testresources.S3Minio;
15+
import io.debezium.server.iceberg.testresources.SourcePostgresqlDB;
16+
import io.quarkus.test.common.QuarkusTestResource;
17+
import io.quarkus.test.junit.QuarkusTest;
18+
import io.quarkus.test.junit.QuarkusTestProfile;
19+
import io.quarkus.test.junit.TestProfile;
20+
import org.apache.iceberg.catalog.Namespace;
21+
import org.apache.iceberg.catalog.TableIdentifier;
22+
import org.apache.iceberg.data.Record;
23+
import org.apache.iceberg.io.CloseableIterable;
24+
import org.apache.spark.sql.Dataset;
25+
import org.apache.spark.sql.Row;
26+
import org.apache.spark.sql.types.DataTypes;
27+
import org.awaitility.Awaitility;
28+
import org.junit.jupiter.api.Assertions;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.time.Duration;
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
import java.util.Objects;
35+
36+
import static io.debezium.server.iceberg.TestConfigSource.ICEBERG_CATALOG_TABLE_NAMESPACE;
37+
import static org.junit.jupiter.api.Assertions.assertEquals;
38+
import static org.mockito.Mockito.when;
39+
40+
/**
41+
* Integration test that verifies basic reading from PostgreSQL database and writing to iceberg destination.
42+
*
43+
* @author Ismail Simsek
44+
*/
45+
@QuarkusTest
46+
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
47+
@QuarkusTestResource(value = SourcePostgresqlDB.class, restrictToAnnotatedClass = true)
48+
@QuarkusTestResource(value = CatalogJdbc.class, restrictToAnnotatedClass = true)
49+
@TestProfile(IcebergChangeConsumerDecimalTest.TestProfile.class)
50+
public class IcebergChangeConsumerDecimalTest extends BaseSparkTest {
51+
52+
@Test
53+
public void testConsumingNumerics() throws Exception {
54+
assertEquals(sinkType, "iceberg");
55+
String sql = "\n" +
56+
" DROP TABLE IF EXISTS inventory.data_types;\n" +
57+
" CREATE TABLE IF NOT EXISTS inventory.data_types (\n" +
58+
" c_id INTEGER ,\n" +
59+
" c_decimal DECIMAL(18,6)\n" +
60+
" );";
61+
SourcePostgresqlDB.runSQL(sql);
62+
sql = "INSERT INTO inventory.data_types (c_id, c_decimal) " +
63+
"VALUES (1, '1234566.34456'::decimal)";
64+
SourcePostgresqlDB.runSQL(sql);
65+
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
66+
try {
67+
Dataset<Row> df = getTableData("testc.inventory.data_types");
68+
df.show(false);
69+
70+
Assertions.assertEquals(1, df.count());
71+
Assertions.assertEquals(1, df.filter("c_id = 1 AND c_decimal = CAST('1234566.344560' AS DECIMAL(18,6))").count(), "c_decimal not matching");
72+
return true;
73+
} catch (Exception e) {
74+
e.printStackTrace();
75+
return false;
76+
}
77+
});
78+
}
79+
80+
public static class TestProfile implements QuarkusTestProfile {
81+
@Override
82+
public Map<String, String> getConfigOverrides() {
83+
Map<String, String> config = new HashMap<>();
84+
config.put("debezium.sink.iceberg.destination-regexp", "\\d");
85+
config.put("debezium.source.decimal.handling.mode", "precise");
86+
return config;
87+
}
88+
}
89+
90+
}

0 commit comments

Comments
 (0)