Skip to content

Commit b49b948

Browse files
add constructor with TableIdFilter and fix checkstyle issues
1 parent d836435 commit b49b948

5 files changed

Lines changed: 742 additions & 48 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/DeleteRowsEventDataDeserializer.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18+
1819
package com.github.shyiko.mysql.binlog.event.deserialization;
1920

2021
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
@@ -32,18 +33,26 @@
3233
/**
3334
* Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}.
3435
*
35-
* <p>Line 65-70: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables.
36+
* <p>Line 52-56: Add a new constructor with {@link TableIdFilter} supplied.
37+
*
38+
* <p>Line 70-74: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables.
3639
*/
3740
public class DeleteRowsEventDataDeserializer
3841
extends AbstractRowsEventDataDeserializer<DeleteRowsEventData> {
3942

4043
private boolean mayContainExtraInformation;
4144

4245
/** the table id filter to skip further deserialization of unsubscribed table ids. */
43-
private TableIdFilter tableIdFilter = TableIdFilter.all();
46+
private final TableIdFilter tableIdFilter;
4447

4548
public DeleteRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
49+
this(tableMapEventByTableId, TableIdFilter.all());
50+
}
51+
52+
public DeleteRowsEventDataDeserializer(
53+
Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
4654
super(tableMapEventByTableId);
55+
this.tableIdFilter = tableIdFilter;
4756
}
4857

4958
public DeleteRowsEventDataDeserializer setMayContainExtraInformation(
@@ -52,11 +61,6 @@ public DeleteRowsEventDataDeserializer setMayContainExtraInformation(
5261
return this;
5362
}
5463

55-
public DeleteRowsEventDataDeserializer setTableIdFilter(TableIdFilter tableIdFilter) {
56-
this.tableIdFilter = tableIdFilter;
57-
return this;
58-
}
59-
6064
@Override
6165
public DeleteRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
6266
DeleteRowsEventData eventData = new DeleteRowsEventData();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/UpdateRowsEventDataDeserializer.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
/*
2-
* Copyright 2013 Stanley Shyiko
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
39
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
10+
* http://www.apache.org/licenses/LICENSE-2.0
911
*
1012
* Unless required by applicable law or agreed to in writing, software
1113
* distributed under the License is distributed on an "AS IS" BASIS,
1214
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1315
* See the License for the specific language governing permissions and
1416
* limitations under the License.
1517
*/
18+
1619
package com.github.shyiko.mysql.binlog.event.deserialization;
1720

1821
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
@@ -31,18 +34,26 @@
3134
/**
3235
* Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}.
3336
*
34-
* <p>Line 64-69: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables.
37+
* <p>Line 53-57: Add a new constructor with {@link TableIdFilter} supplied.
38+
*
39+
* <p>Line 71-75: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables.
3540
*/
3641
public class UpdateRowsEventDataDeserializer
3742
extends AbstractRowsEventDataDeserializer<UpdateRowsEventData> {
3843

3944
private boolean mayContainExtraInformation;
4045

4146
/** the table id filter to skip further deserialization of unsubscribed table ids. */
42-
private TableIdFilter tableIdFilter = TableIdFilter.all();
47+
private final TableIdFilter tableIdFilter;
4348

4449
public UpdateRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
50+
this(tableMapEventByTableId, TableIdFilter.all());
51+
}
52+
53+
public UpdateRowsEventDataDeserializer(
54+
Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
4555
super(tableMapEventByTableId);
56+
this.tableIdFilter = tableIdFilter;
4657
}
4758

4859
public UpdateRowsEventDataDeserializer setMayContainExtraInformation(
@@ -51,11 +62,6 @@ public UpdateRowsEventDataDeserializer setMayContainExtraInformation(
5162
return this;
5263
}
5364

54-
public UpdateRowsEventDataDeserializer setTableIdFilter(TableIdFilter tableIdFilter) {
55-
this.tableIdFilter = tableIdFilter;
56-
return this;
57-
}
58-
5965
@Override
6066
public UpdateRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
6167
UpdateRowsEventData eventData = new UpdateRowsEventData();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/WriteRowsEventDataDeserializer.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
/*
2-
* Copyright 2013 Stanley Shyiko
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
39
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
10+
* http://www.apache.org/licenses/LICENSE-2.0
911
*
1012
* Unless required by applicable law or agreed to in writing, software
1113
* distributed under the License is distributed on an "AS IS" BASIS,
1214
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1315
* See the License for the specific language governing permissions and
1416
* limitations under the License.
1517
*/
18+
1619
package com.github.shyiko.mysql.binlog.event.deserialization;
1720

1821
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
@@ -30,18 +33,26 @@
3033
/**
3134
* Copied from mysql-binlog-connector 0.27.2 to add a {@link TableIdFilter}.
3235
*
33-
* <p>Line 64-68: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables.
36+
* <p>Line 52-56: Add a new constructor with {@link TableIdFilter} supplied.
37+
*
38+
* <p>Line 70-74: Use a {@link TableIdFilter} to skip the binlog deserialization of unwanted tables.
3439
*/
3540
public class WriteRowsEventDataDeserializer
3641
extends AbstractRowsEventDataDeserializer<WriteRowsEventData> {
3742

3843
private boolean mayContainExtraInformation;
3944

4045
/** the table id filter to skip further deserialization of unsubscribed table ids. */
41-
private TableIdFilter tableIdFilter = TableIdFilter.all();
46+
private final TableIdFilter tableIdFilter;
4247

4348
public WriteRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
49+
this(tableMapEventByTableId, TableIdFilter.all());
50+
}
51+
52+
public WriteRowsEventDataDeserializer(
53+
Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
4454
super(tableMapEventByTableId);
55+
this.tableIdFilter = tableIdFilter;
4556
}
4657

4758
public WriteRowsEventDataDeserializer setMayContainExtraInformation(
@@ -50,11 +61,6 @@ public WriteRowsEventDataDeserializer setMayContainExtraInformation(
5061
return this;
5162
}
5263

53-
public WriteRowsEventDataDeserializer setTableIdFilter(TableIdFilter tableIdFilter) {
54-
this.tableIdFilter = tableIdFilter;
55-
return this;
56-
}
57-
5864
@Override
5965
public WriteRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
6066
WriteRowsEventData eventData = new WriteRowsEventData();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -327,31 +327,25 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
327327
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
328328
eventDeserializer.setEventDataDeserializer(
329329
EventType.WRITE_ROWS,
330-
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)
331-
.setTableIdFilter(tableIdFilter));
330+
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId, tableIdFilter));
332331
eventDeserializer.setEventDataDeserializer(
333332
EventType.UPDATE_ROWS,
334-
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)
335-
.setTableIdFilter(tableIdFilter));
333+
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId, tableIdFilter));
336334
eventDeserializer.setEventDataDeserializer(
337335
EventType.DELETE_ROWS,
338-
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)
339-
.setTableIdFilter(tableIdFilter));
336+
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId, tableIdFilter));
340337
eventDeserializer.setEventDataDeserializer(
341338
EventType.EXT_WRITE_ROWS,
342-
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)
343-
.setMayContainExtraInformation(true)
344-
.setTableIdFilter(tableIdFilter));
339+
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId, tableIdFilter)
340+
.setMayContainExtraInformation(true));
345341
eventDeserializer.setEventDataDeserializer(
346342
EventType.EXT_UPDATE_ROWS,
347-
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)
348-
.setMayContainExtraInformation(true)
349-
.setTableIdFilter(tableIdFilter));
343+
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId, tableIdFilter)
344+
.setMayContainExtraInformation(true));
350345
eventDeserializer.setEventDataDeserializer(
351346
EventType.EXT_DELETE_ROWS,
352-
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)
353-
.setMayContainExtraInformation(true)
354-
.setTableIdFilter(tableIdFilter));
347+
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId, tableIdFilter)
348+
.setMayContainExtraInformation(true));
355349
client.setEventDeserializer(eventDeserializer);
356350
}
357351

0 commit comments

Comments
 (0)