diff --git a/flink-join/README.md b/flink-join/README.md
new file mode 100644
index 00000000..d4d98e33
--- /dev/null
+++ b/flink-join/README.md
@@ -0,0 +1,88 @@
+# Flink Foreign Key Joins
+
+This example demonstrates how two Debezium change data topics can be joined via Flink.
+
+The source database contains two tables, `customers` and `addresses`, with a foreign key relationship from the latter to the former,
+i.e. a customer can have multiple addresses.
+
+Using Flink, the change events for the parent `customers` table are represented as a dynamic table defined with `CREATE TABLE`, while the child `addresses` are consumed as a stream of POJOs that is first aggregated by the foreign key.
+Each insertion, update or deletion of a record on either side re-triggers the join.
+
+## Building
+
+Prepare the Java components by first performing a Maven build:
+
+```console
+$ mvn clean install
+```
+
+## Environment
+
+Set up the necessary environment variables:
+
+```console
+$ export DEBEZIUM_VERSION=1.2
+```
+
+The `DEBEZIUM_VERSION` variable specifies which version of the Debezium artifacts should be used.
+
+## Start the demo
+
+Start all Debezium components:
+
+```console
+$ docker-compose up connect
+```
+
+This starts the Kafka Connect service and all dependent services defined in the `docker-compose.yaml` file.
+
+## Configure the Debezium connector
+
+Register the connector to stream changes from the inventory database:
+
+```console
+$ http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json
+HTTP/1.1 201 Created
+```
+
+## Run the Flink Job
+
+To run the Flink job in local mode, simply compile and start the job class:
+
+```console
+$ mvn clean install
+$ mvn exec:java \
+    -Dexec.mainClass="io.debezium.examples.flink.join.FlinkJoinTableStream" \
+    -Dexec.classpathScope=compile
+```
+
+Running the Flink job against a remote cluster is a little more involved. The simplest approach is to create a Docker Compose session cluster (`docker-compose up jobmanager`), copy the Flink Kafka connector dependency into the cluster's `lib` directory, and then follow the instructions for submitting this project's jar as a job - see the [Flink docs](https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#session-cluster-with-docker-compose).
+
+## Review the outcome
+
+Examine the joined events using _kafkacat_:
+
+```console
+$ docker run --tty --rm \
+    --network flink-join-network \
+    debezium/tooling:1.1 \
+    kafkacat -b kafka:9092 -C -o beginning -q \
+    -t customers-with-addresses | jq .
+```
+
+## Useful Commands
+
+Getting a session in the Postgres database:
+
+```console
+$ docker run --tty --rm -i \
+    --network flink-join-network \
+    debezium/tooling:1.1 \
+    bash -c 'pgcli postgresql://postgres:postgres@postgres:5432/postgres'
+```
+
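+To trigger the join from the `addresses` side, you can insert another address for an existing customer. The values below are made up, but they follow the schema created by `inventory-addresses.sql`:
+
+```sql
+insert into inventory.addresses values (default, 1001, '99 Granville St.', 'Vancouver', '90214', 'Canada');
+```
+
+E.g.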
to update a customer record: + +```sql +update inventory.customers set first_name = 'Sarah' where id = 1001; +``` diff --git a/flink-join/docker-compose.yaml b/flink-join/docker-compose.yaml new file mode 100644 index 00000000..a22b6c53 --- /dev/null +++ b/flink-join/docker-compose.yaml @@ -0,0 +1,76 @@ +version: '3.5' + +services: + + zookeeper: + image: debezium/zookeeper:${DEBEZIUM_VERSION} + ports: + - 2181:2181 + - 2888:2888 + - 3888:3888 + networks: + - my-network + kafka: + image: debezium/kafka:${DEBEZIUM_VERSION} + ports: + - 9092:9092 + links: + - zookeeper + environment: + - ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=100 + networks: + - my-network + + postgres: + image: debezium/example-postgres:${DEBEZIUM_VERSION} + ports: + - 5432:5432 + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + volumes: + - ./inventory-addresses.sql:/docker-entrypoint-initdb.d/zzz.sql + networks: + - my-network + + connect: + image: debezium/connect:${DEBEZIUM_VERSION} + ports: + - 8083:8083 + links: + - kafka + - postgres + environment: + - BOOTSTRAP_SERVERS=kafka:9092 + - GROUP_ID=1 + - CONFIG_STORAGE_TOPIC=my_connect_configs + - OFFSET_STORAGE_TOPIC=my_connect_offsets + - STATUS_STORAGE_TOPIC=my_connect_statuses + networks: + - my-network + + jobmanager: + image: flink:1.11.2-scala_2.11 + ports: + - "8081:8081" + command: jobmanager + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + + taskmanager: + image: flink:1.11.2-scala_2.11 + depends_on: + - jobmanager + command: taskmanager + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + taskmanager.numberOfTaskSlots: 2 + +networks: + my-network: + name: flink-join-network diff --git a/flink-join/inventory-addresses.sql b/flink-join/inventory-addresses.sql new file mode 100644 index 00000000..8c4ffae9 --- /dev/null +++ b/flink-join/inventory-addresses.sql @@ -0,0 +1,18 @@ +CREATE TABLE inventory.addresses ( + id SERIAL NOT NULL PRIMARY KEY, + customer_id INTEGER NOT NULL, + street VARCHAR(255) NOT NULL, + city VARCHAR(255) NOT NULL, + zipcode VARCHAR(255) NOT NULL, + country VARCHAR(255) NOT NULL, + FOREIGN KEY (customer_id) REFERENCES inventory.customers(id) +); +ALTER SEQUENCE inventory.addresses_id_seq RESTART WITH 100001; +ALTER TABLE inventory.addresses REPLICA IDENTITY FULL; + +INSERT INTO inventory.addresses +VALUES (default, 1001, '42 Main Street', 'Hamburg', '90210', 'Canada'), + (default, 1001, '11 Post Dr.', 'Berlin', '90211', 'Canada'), + (default, 1002, '12 Rodeo Dr.', 'Los Angeles', '90212', 'US'), + (default, 1002, '1 Debezium Plaza', 'Monterey', '90213', 'US'), + (default, 1002, '2 Debezium Plaza', 'Monterey', '90213', 'US'); diff --git a/flink-join/pom.xml b/flink-join/pom.xml new file mode 100644 index 00000000..0ff6edd5 --- /dev/null +++ b/flink-join/pom.xml @@ -0,0 +1,71 @@ + + + 4.0.0 + + io.debezium.examples.flink.join + flink-join + 1.0.0-SNAPSHOT + + 1.8 + 1.8 + UTF-8 + UTF-8 + 2.22.0 + + + + org.apache.flink + flink-table-api-java-bridge_2.11 + 1.11.2 + provided + + + org.apache.flink + flink-table-planner-blink_2.11 + 1.11.2 + provided + + + org.apache.flink + flink-streaming-scala_2.11 + 1.11.2 + provided + + + org.apache.flink + flink-sql-connector-kafka_2.11 + 1.11.2 + + + org.apache.flink + flink-table-common + 1.11.2 + provided + + + org.apache.flink + flink-json + 1.11.2 + provided + + + org.apache.flink + flink-clients_2.11 + 1.11.2 + + + + + + maven-surefire-plugin + ${surefire-plugin.version} + + + 
org.jboss.logmanager.LogManager + + + + + + diff --git a/flink-join/register-postgres.json b/flink-join/register-postgres.json new file mode 100644 index 00000000..6cae5838 --- /dev/null +++ b/flink-join/register-postgres.json @@ -0,0 +1,16 @@ +{ + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "postgres", + "database.password": "postgres", + "database.dbname" : "postgres", + "database.server.name": "dbserver1", + "schema.whitelist": "inventory", + "decimal.handling.mode" : "string", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false" +} diff --git a/flink-join/src/main/java/io/debezium/examples/flink/join/FlinkJoin.java b/flink-join/src/main/java/io/debezium/examples/flink/join/FlinkJoin.java new file mode 100644 index 00000000..a6350f88 --- /dev/null +++ b/flink-join/src/main/java/io/debezium/examples/flink/join/FlinkJoin.java @@ -0,0 +1,95 @@ +package io.debezium.examples.flink.join; + +import java.util.Properties; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.formats.json.JsonRowSerializationSchema; +import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** + * Performs an inner join of a customer and an address + */ +public class FlinkJoin { + + public static String TOPIC_OUT = "customer-with-address"; + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + tableEnv.executeSql("CREATE TABLE customers (\n" + + " id INT PRIMARY KEY,\n" + + " first_name STRING,\n" + + " last_name STRING,\n" + + " email STRING\n" + + ") WITH (\n" + + " 'connector' = 'kafka',\n" + + " 'topic' = 'dbserver1.inventory.customers',\n" + + " 'properties.bootstrap.servers' = 'localhost:9092',\n" + + " 'properties.group.id' = '1',\n" + + " 'format' = 'debezium-json',\n" + + " 'scan.startup.mode' = 'earliest-offset'\n" + + ")"); + + tableEnv.executeSql("CREATE TABLE addresses (\n" + + " id BIGINT PRIMARY KEY,\n" + + " customer_id INT,\n" + + " street STRING,\n" + + " city STRING,\n" + + " zipcode STRING,\n" + + " country STRING\n" + + ") WITH (\n" + + " 'connector' = 'kafka',\n" + + " 'topic' = 'dbserver1.inventory.addresses',\n" + + " 'properties.bootstrap.servers' = 'localhost:9092',\n" + + " 'properties.group.id' = '1',\n" + + " 'format' = 'debezium-json',\n" + + " 'scan.startup.mode' = 'earliest-offset'\n" + + ")"); + + Table addressWithEmail = tableEnv.sqlQuery("select c.id, c.email, 
a.country, a.id as address_id " +
+                "from customers as c inner join addresses as a on c.id = a.customer_id");
+
+        Properties properties = new Properties();
+        properties.setProperty("bootstrap.servers", "localhost:9092");
+
+        //-- coming soon in flink, we should be able to output a changelog/cdc stream
+        //DebeziumJsonSerializationSchema schema ...
+
+        //we cannot directly write this table result, which has a retract stream as input,
+        //into an output kafka table, so we create an output stream to direct into the output topic.
+        //we have to filter the update-before rows as we want an unwrapped result
+        DataStream<Tuple2<Boolean, Row>> output = tableEnv.toRetractStream(addressWithEmail, Row.class)
+                .filter((t) -> t.f1.getKind() != RowKind.UPDATE_BEFORE);
+
+        //we have to use the appropriate type information for serialization
+        TypeInformation<Row> rowType = ((TupleTypeInfo<?>) output.getType()).getTypeAt(1);
+        JsonRowSerializationSchema rowSerialization = JsonRowSerializationSchema.builder().withTypeInfo(rowType).build();
+        //output the key as the first field
+        JsonRowSerializationSchema keySerialization = JsonRowSerializationSchema.builder()
+                .withTypeInfo(new RowTypeInfo(Types.INT)).build();
+
+        //only output the value on upsert events, null on delete
+        FlinkKafkaProducer<Tuple2<Boolean, Row>> kafkaProducer = new FlinkKafkaProducer<Tuple2<Boolean, Row>>(TOPIC_OUT,
+                ((record, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT,
+                        keySerialization.serialize(record.f1),
+                        record.f0 ? rowSerialization.serialize(record.f1) : null)),
+                properties,
+                Semantic.EXACTLY_ONCE);
+
+        output.addSink(kafkaProducer);
+
+        env.execute("Debezium Join");
+    }
+
+}
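The `FlinkJoin` class above is a variant that defines both sides as Debezium-backed SQL tables and performs the join with a SQL query, writing to its own `customer-with-address` topic. The README only shows how to run `FlinkJoinTableStream`, but the same exec-maven-plugin invocation should work for this class as well (a sketch, assuming the module has been built as described in the README):

```console
$ mvn exec:java \
    -Dexec.mainClass="io.debezium.examples.flink.join.FlinkJoin" \
    -Dexec.classpathScope=compile
```

Its output can then be inspected with the kafkacat command from the README, substituting `-t customer-with-address` as the topic.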
diff --git a/flink-join/src/main/java/io/debezium/examples/flink/join/FlinkJoinTableStream.java b/flink-join/src/main/java/io/debezium/examples/flink/join/FlinkJoinTableStream.java
new file mode 100644
index 00000000..73c0a5fd
--- /dev/null
+++ b/flink-join/src/main/java/io/debezium/examples/flink/join/FlinkJoinTableStream.java
@@ -0,0 +1,175 @@
+package io.debezium.examples.flink.join;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.stream.StreamSupport;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
+import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
+import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import io.debezium.examples.flink.join.model.Address;
+
+public class FlinkJoinTableStream {
+
+    public interface Mapper<T> extends Serializable {
+
+        T map(ObjectMapper mapper, JsonNode object);
+
+    }
+
+    /**
+     * Specialized unwrap of Debezium payloads that conveys the key, whether it's an upsert or a delete, and the full record.
+     * This is similar to the handling of Flink's debezium-json tables, but does not emit both a before and an after row for updates.
+     */
+    public static class DebeziumUnwrap extends RichMapFunction<ObjectNode, Tuple3<Long, Boolean, Address>> {
+
+        ObjectMapper objectMapper = new ObjectMapper();
+        Mapper<Long> keyMapper;
+        Mapper<Address> valueMapper;
+
+        public DebeziumUnwrap(Mapper<Long> keyMapper, Mapper<Address> valueMapper) {
+            this.keyMapper = keyMapper;
+            this.valueMapper = valueMapper;
+        }
+
+        @Override
+        public Tuple3<Long, Boolean, Address> map(ObjectNode r) throws Exception {
+            JsonNode jsonNode = r.get("value");
+            String op = jsonNode.get("op").asText();
+            JsonNode value = jsonNode.get("after");
+            boolean upsert = true;
+            if ("d".equals(op)) {
+                value = jsonNode.get("before");
+                upsert = false;
+            }
+            return Tuple3.of(keyMapper.map(objectMapper, r.get("key")), upsert, valueMapper.map(objectMapper, value));
+        }
+    }
+
+    /**
+     * Specialized aggregate that collects rows by key "id" and outputs them by "customer_id".
+     * The input matches the output of the DebeziumUnwrap above, with the expectation that the stream is partitioned by "customer_id".
+     */
+    public static class AddressAgg extends RichMapFunction<Tuple3<Long, Boolean, Address>, Tuple2<Integer, Address[]>> {
+
+        private transient MapState<Long, Address> map;
+
+        @Override
+        public Tuple2<Integer, Address[]> map(Tuple3<Long, Boolean, Address> value) throws Exception {
+            if (value.f1) {
+                map.put(value.f2.id, value.f2);
+            } else {
+                map.remove(value.f2.id);
+            }
+
+            return Tuple2.of(value.f2.customer_id,
+                    StreamSupport.stream(map.values().spliterator(), false).toArray(Address[]::new));
+        }
+
+        @Override
+        public void open(Configuration config) {
+            map = getRuntimeContext().getMapState(new MapStateDescriptor<>("values", Long.class, Address.class));
+        }
+    }
+
+    public static String TOPIC_OUT = "customers-with-addresses";
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        //add customers as a table
+        tableEnv.executeSql("CREATE TABLE customers (\n" +
+                "  id INT PRIMARY KEY,\n" +
+                "  first_name STRING,\n" +
+                "  last_name STRING,\n" +
+                "  email STRING\n" +
+                ") WITH (\n" +
+                "  'connector' = 'kafka',\n" +
+                "  'topic' = 'dbserver1.inventory.customers',\n" +
+                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
+                "  'properties.group.id' = '1',\n" +
+                "  'format' = 'debezium-json',\n" +
+                "  'scan.startup.mode' = 'earliest-offset'\n" +
+                ")");
+
+        //add addresses as a stream
+        Properties properties = new Properties();
+        properties.setProperty("bootstrap.servers", "localhost:9092");
+        properties.setProperty("group.id", "1");
+
+        FlinkKafkaConsumer<ObjectNode> function = new FlinkKafkaConsumer<>("dbserver1.inventory.addresses",
+                new JSONKeyValueDeserializationSchema(false), properties);
+        function.setStartFromEarliest();
+        DataStream<ObjectNode> addressStream = env.addSource(function);
+
+        //consume the generic json stream with specialized debezium handling
+        //to build what is effectively an append stream of address aggregates
+        DataStream<Tuple2<Integer, Address[]>> agg = addressStream
+                .filter((r) -> r.has("value")) //skip tombstones
+                .map(new DebeziumUnwrap((mapper, n) -> n.asLong(),
+                        (mapper, n) -> mapper.convertValue(n, Address.class)),
+                        Types.TUPLE(Types.LONG, Types.BOOLEAN, TypeInformation.of(Address.class)))
+                .keyBy((t) -> t.f2.customer_id)
+                .map(new AddressAgg());
+
+        //join customers to the table created from the aggregation. we're using the table api without idle state expiration
+        //to process the join indefinitely without a window
+        Table join = tableEnv.from("customers")
+                .leftOuterJoin(tableEnv.fromDataStream(agg, $("customer_id"), $("addresses")), $("id").isEqual($("customer_id")))
+                .dropColumns($("customer_id")); //drop the id from the right side
+
+        //we cannot directly write this table result, which has a retract stream as input,
+        //into an output kafka table, so we create an output stream to direct into the output topic
+
+        //we don't have to filter update-before events as they don't make it through this join,
+        //but that isn't great because it seems to give us ephemeral deletes on some modifications
+        DataStream<Tuple2<Boolean, Row>> output = tableEnv.toRetractStream(join, Row.class);
+
+        //we have to use the appropriate type information for serialization:
+        //under the covers flink has substituted Row instances for the pojos,
+        //and the stream type reflects that
+        TypeInformation<Row> rowType = ((TupleTypeInfo<?>) output.getType()).getTypeAt(1);
+        JsonRowSerializationSchema rowSerialization = JsonRowSerializationSchema.builder().withTypeInfo(rowType).build();
+        //output the key as the first field
+        JsonRowSerializationSchema keySerialization = JsonRowSerializationSchema.builder()
+                .withTypeInfo(new RowTypeInfo(Types.INT)).build();
+
+        //only output the value on upsert events, null on delete
+        FlinkKafkaProducer<Tuple2<Boolean, Row>> kafkaProducer = new FlinkKafkaProducer<Tuple2<Boolean, Row>>(TOPIC_OUT,
+                ((record, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT,
+                        keySerialization.serialize(record.f1),
+                        record.f0 ? rowSerialization.serialize(record.f1) : null)),
+                properties,
+                Semantic.EXACTLY_ONCE);
+
+        output.addSink(kafkaProducer);
+
+        env.execute("Debezium Join");
+    }
+
+}
diff --git a/flink-join/src/main/java/io/debezium/examples/flink/join/model/Address.java b/flink-join/src/main/java/io/debezium/examples/flink/join/model/Address.java
new file mode 100644
index 00000000..c1c39e99
--- /dev/null
+++ b/flink-join/src/main/java/io/debezium/examples/flink/join/model/Address.java
@@ -0,0 +1,35 @@
+package io.debezium.examples.flink.join.model;
+
+public class Address {
+
+    public long id;
+    public int customer_id;
+    public String street;
+    public String city;
+    public String zipcode;
+    public String country;
+
+    @Override
+    public String toString() {
+        return "Address [id=" + id + ", customer_id=" + customer_id + ", street=" + street + ", city=" + city
+                + ", zipcode=" + zipcode + ", country=" + country + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(id);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof Address)) {
+            return false;
+        }
+        Address other = (Address) obj;
+        return other.id == id;
+    }
+
+}
diff --git a/kstreams-fk-join/README.md b/kstreams-fk-join/README.md
index 25a221d2..1e212464 100644
--- a/kstreams-fk-join/README.md
+++ b/kstreams-fk-join/README.md
@@ -47,7 +47,7 @@ This executes all configurations set forth by the `docker-compose.yaml` file.
## Configure the Debezium connector -Register the connector that to stream outbox changes from the order service: +Register the connector to stream outbox changes from the order service: ```console $ http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json diff --git a/kstreams-fk-join/aggregator/src/main/java/io/debezium/examples/kstreams/fkjoin/util/JsonObjectSerde.java b/kstreams-fk-join/aggregator/src/main/java/io/debezium/examples/kstreams/fkjoin/util/JsonObjectSerde.java deleted file mode 100644 index 403fa1ae..00000000 --- a/kstreams-fk-join/aggregator/src/main/java/io/debezium/examples/kstreams/fkjoin/util/JsonObjectSerde.java +++ /dev/null @@ -1,84 +0,0 @@ -package io.debezium.examples.kstreams.fkjoin.util; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Map; - -import javax.json.Json; -import javax.json.JsonObject; -import javax.json.JsonReader; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serializer; - -/** - * A {@link Serde} that (de-)serializes JSON. - */ -public class JsonObjectSerde implements Serde { - - public JsonObjectSerde() { - } - - @Override - public void configure(Map configs, boolean isKey) { - } - - @Override - public void close() { - } - - @Override - public Serializer serializer() { - return new JsonSerializer(); - } - - @Override - public Deserializer deserializer() { - return new JsonDeserializer(); - } - - private final class JsonDeserializer implements Deserializer { - - @Override - public void configure(Map configs, boolean isKey) { - } - - @Override - public JsonObject deserialize(String topic, byte[] data) { - if (data == null) { - return null; - } - - try (JsonReader reader = Json.createReader(new ByteArrayInputStream(data))) { - return reader.readObject(); - } - } - - @Override - public void close() { - } - } - - private final class JsonSerializer implements Serializer { - - @Override - public void configure(Map configs, boolean isKey) { - } - - @Override - public byte[] serialize(String topic, JsonObject data) { - try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { - Json.createWriter(output).writeObject(data); - return output.toByteArray(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() { - } - } -}