Skip to content

Commit a12c29a

Browse files
[FLINK-39200][mysql] Skip insert/update/delete binlog deserialization of unsubscribed tables in MySql CDC binary log client.
1 parent c05af15 commit a12c29a

5 files changed

Lines changed: 343 additions & 6 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.github.shyiko.mysql.binlog.event.deserialization;
19+
20+
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
21+
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
22+
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
23+
24+
import java.io.IOException;
25+
import java.io.Serializable;
26+
import java.util.BitSet;
27+
import java.util.Collections;
28+
import java.util.LinkedList;
29+
import java.util.List;
30+
import java.util.Map;
31+
32+
/**
33+
* Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}.
34+
*
35+
* <p>Line 65-70: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables.
36+
*/
37+
public class DeleteRowsEventDataDeserializer
38+
extends AbstractRowsEventDataDeserializer<DeleteRowsEventData> {
39+
40+
private boolean mayContainExtraInformation;
41+
42+
/** the table id filter to skip further deserialization of unsubscribed table ids. */
43+
private TableIdFilter tableIdFilter = TableIdFilter.all();
44+
45+
public DeleteRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
46+
super(tableMapEventByTableId);
47+
}
48+
49+
public DeleteRowsEventDataDeserializer setMayContainExtraInformation(
50+
boolean mayContainExtraInformation) {
51+
this.mayContainExtraInformation = mayContainExtraInformation;
52+
return this;
53+
}
54+
55+
public DeleteRowsEventDataDeserializer setTableIdFilter(TableIdFilter tableIdFilter) {
56+
this.tableIdFilter = tableIdFilter;
57+
return this;
58+
}
59+
60+
@Override
61+
public DeleteRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
62+
DeleteRowsEventData eventData = new DeleteRowsEventData();
63+
eventData.setTableId(inputStream.readLong(6));
64+
65+
// skip further deserialization if the table id is unsubscribed
66+
if (!tableIdFilter.test(eventData.getTableId())) {
67+
eventData.setIncludedColumns(null);
68+
eventData.setRows(Collections.emptyList());
69+
return eventData;
70+
}
71+
72+
inputStream.readInteger(2); // reserved
73+
if (mayContainExtraInformation) {
74+
int extraInfoLength = inputStream.readInteger(2);
75+
inputStream.skip(extraInfoLength - 2);
76+
}
77+
int numberOfColumns = inputStream.readPackedInteger();
78+
eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true));
79+
eventData.setRows(
80+
deserializeRows(
81+
eventData.getTableId(), eventData.getIncludedColumns(), inputStream));
82+
return eventData;
83+
}
84+
85+
private List<Serializable[]> deserializeRows(
86+
long tableId, BitSet includedColumns, ByteArrayInputStream inputStream)
87+
throws IOException {
88+
List<Serializable[]> result = new LinkedList<Serializable[]>();
89+
while (inputStream.available() > 0) {
90+
result.add(deserializeRow(tableId, includedColumns, inputStream));
91+
}
92+
return result;
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.github.shyiko.mysql.binlog.event.deserialization;
20+
21+
import java.util.function.Predicate;
22+
23+
/** The filter used for skipping the binlog deserialization of unsubscribed table. */
24+
public interface TableIdFilter extends Predicate<Long> {
25+
26+
@Override
27+
boolean test(Long tableId);
28+
29+
static TableIdFilter all() {
30+
return tableId -> true;
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2013 Stanley Shyiko
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.shyiko.mysql.binlog.event.deserialization;
17+
18+
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
19+
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
20+
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
21+
22+
import java.io.IOException;
23+
import java.io.Serializable;
24+
import java.util.AbstractMap;
25+
import java.util.ArrayList;
26+
import java.util.BitSet;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.Map;
30+
31+
/**
32+
* Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}.
33+
*
34+
* <p>Line 64-69: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables.
35+
*/
36+
public class UpdateRowsEventDataDeserializer
37+
extends AbstractRowsEventDataDeserializer<UpdateRowsEventData> {
38+
39+
private boolean mayContainExtraInformation;
40+
41+
/** the table id filter to skip further deserialization of unsubscribed table ids. */
42+
private TableIdFilter tableIdFilter = TableIdFilter.all();
43+
44+
public UpdateRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
45+
super(tableMapEventByTableId);
46+
}
47+
48+
public UpdateRowsEventDataDeserializer setMayContainExtraInformation(
49+
boolean mayContainExtraInformation) {
50+
this.mayContainExtraInformation = mayContainExtraInformation;
51+
return this;
52+
}
53+
54+
public UpdateRowsEventDataDeserializer setTableIdFilter(TableIdFilter tableIdFilter) {
55+
this.tableIdFilter = tableIdFilter;
56+
return this;
57+
}
58+
59+
@Override
60+
public UpdateRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
61+
UpdateRowsEventData eventData = new UpdateRowsEventData();
62+
eventData.setTableId(inputStream.readLong(6));
63+
64+
// skip further deserialization if the table id is unsubscribed
65+
if (!tableIdFilter.test(eventData.getTableId())) {
66+
eventData.setIncludedColumns(null);
67+
eventData.setRows(Collections.emptyList());
68+
return eventData;
69+
}
70+
71+
inputStream.skip(2); // reserved
72+
if (mayContainExtraInformation) {
73+
int extraInfoLength = inputStream.readInteger(2);
74+
inputStream.skip(extraInfoLength - 2);
75+
}
76+
int numberOfColumns = inputStream.readPackedInteger();
77+
eventData.setIncludedColumnsBeforeUpdate(inputStream.readBitSet(numberOfColumns, true));
78+
eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true));
79+
eventData.setRows(deserializeRows(eventData, inputStream));
80+
return eventData;
81+
}
82+
83+
private List<Map.Entry<Serializable[], Serializable[]>> deserializeRows(
84+
UpdateRowsEventData eventData, ByteArrayInputStream inputStream) throws IOException {
85+
long tableId = eventData.getTableId();
86+
BitSet includedColumnsBeforeUpdate = eventData.getIncludedColumnsBeforeUpdate(),
87+
includedColumns = eventData.getIncludedColumns();
88+
List<Map.Entry<Serializable[], Serializable[]>> rows =
89+
new ArrayList<Map.Entry<Serializable[], Serializable[]>>();
90+
while (inputStream.available() > 0) {
91+
rows.add(
92+
new AbstractMap.SimpleEntry<Serializable[], Serializable[]>(
93+
deserializeRow(tableId, includedColumnsBeforeUpdate, inputStream),
94+
deserializeRow(tableId, includedColumns, inputStream)));
95+
}
96+
return rows;
97+
}
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2013 Stanley Shyiko
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.shyiko.mysql.binlog.event.deserialization;
17+
18+
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
19+
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
20+
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
21+
22+
import java.io.IOException;
23+
import java.io.Serializable;
24+
import java.util.BitSet;
25+
import java.util.Collections;
26+
import java.util.LinkedList;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
/**
31+
* Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}.
32+
*
33+
* <p>Line 64-68: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables.
34+
*/
35+
public class WriteRowsEventDataDeserializer
36+
extends AbstractRowsEventDataDeserializer<WriteRowsEventData> {
37+
38+
private boolean mayContainExtraInformation;
39+
40+
/** the table id filter to skip further deserialization of unsubscribed table ids. */
41+
private TableIdFilter tableIdFilter = TableIdFilter.all();
42+
43+
public WriteRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
44+
super(tableMapEventByTableId);
45+
}
46+
47+
public WriteRowsEventDataDeserializer setMayContainExtraInformation(
48+
boolean mayContainExtraInformation) {
49+
this.mayContainExtraInformation = mayContainExtraInformation;
50+
return this;
51+
}
52+
53+
public WriteRowsEventDataDeserializer setTableIdFilter(TableIdFilter tableIdFilter) {
54+
this.tableIdFilter = tableIdFilter;
55+
return this;
56+
}
57+
58+
@Override
59+
public WriteRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
60+
WriteRowsEventData eventData = new WriteRowsEventData();
61+
eventData.setTableId(inputStream.readLong(6));
62+
63+
// skip further deserialization if the table id is unsubscribed
64+
if (!tableIdFilter.test(eventData.getTableId())) {
65+
eventData.setIncludedColumns(null);
66+
eventData.setRows(Collections.emptyList());
67+
return eventData;
68+
}
69+
70+
inputStream.skip(2); // reserved
71+
if (mayContainExtraInformation) {
72+
int extraInfoLength = inputStream.readInteger(2);
73+
inputStream.skip(extraInfoLength - 2);
74+
}
75+
int numberOfColumns = inputStream.readPackedInteger();
76+
eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true));
77+
eventData.setRows(
78+
deserializeRows(
79+
eventData.getTableId(), eventData.getIncludedColumns(), inputStream));
80+
return eventData;
81+
}
82+
83+
private List<Serializable[]> deserializeRows(
84+
long tableId, BitSet includedColumns, ByteArrayInputStream inputStream)
85+
throws IOException {
86+
List<Serializable[]> result = new LinkedList<Serializable[]>();
87+
while (inputStream.available() > 0) {
88+
result.add(deserializeRow(tableId, includedColumns, inputStream));
89+
}
90+
return result;
91+
}
92+
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
2525
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
2626
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
27+
import com.github.shyiko.mysql.binlog.event.deserialization.TableIdFilter;
2728
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
2829
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
2930
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
@@ -94,6 +95,9 @@
9495
*
9596
* <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when
9697
* processing LinkedList rows in handleChange method. See FLINK-38846.
98+
*
99+
* <p>Line 317, 1358-1366 : Use a {@link TableIdFilter} to skip binlog deserialization of unmatched
100+
* tables.
97101
*/
98102
public class MySqlStreamingChangeEventSource
99103
implements StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> {
@@ -311,30 +315,38 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
311315
}
312316
};
313317

318+
final TableIdFilter tableIdFilter = getTableIdDeserializationFilter();
319+
314320
// Add our custom deserializers ...
315321
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
316322
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
317323
eventDeserializer.setEventDataDeserializer(
318324
EventType.WRITE_ROWS,
319-
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
325+
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)
326+
.setTableIdFilter(tableIdFilter));
320327
eventDeserializer.setEventDataDeserializer(
321328
EventType.UPDATE_ROWS,
322-
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
329+
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)
330+
.setTableIdFilter(tableIdFilter));
323331
eventDeserializer.setEventDataDeserializer(
324332
EventType.DELETE_ROWS,
325-
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
333+
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)
334+
.setTableIdFilter(tableIdFilter));
326335
eventDeserializer.setEventDataDeserializer(
327336
EventType.EXT_WRITE_ROWS,
328337
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)
329-
.setMayContainExtraInformation(true));
338+
.setMayContainExtraInformation(true)
339+
.setTableIdFilter(tableIdFilter));
330340
eventDeserializer.setEventDataDeserializer(
331341
EventType.EXT_UPDATE_ROWS,
332342
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)
333-
.setMayContainExtraInformation(true));
343+
.setMayContainExtraInformation(true)
344+
.setTableIdFilter(tableIdFilter));
334345
eventDeserializer.setEventDataDeserializer(
335346
EventType.EXT_DELETE_ROWS,
336347
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)
337-
.setMayContainExtraInformation(true));
348+
.setMayContainExtraInformation(true)
349+
.setTableIdFilter(tableIdFilter));
338350
client.setEventDeserializer(eventDeserializer);
339351
}
340352

@@ -1345,6 +1357,15 @@ protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
13451357
return null;
13461358
}
13471359

1360+
private TableIdFilter getTableIdDeserializationFilter() {
1361+
return tableId -> {
1362+
// since only subscribed table is recording schema, the result could be null
1363+
TableId table = taskContext.getSchema().getTableId(tableId);
1364+
return table != null
1365+
&& connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(table);
1366+
};
1367+
}
1368+
13481369
private void logStreamingSourceState() {
13491370
logStreamingSourceState(Level.ERROR);
13501371
}

0 commit comments

Comments
 (0)