Skip to content

Commit 9d58bc0

Browse files
authored
[Fix][Connector-V2][Hbase] Fix ERROR_WHEN_DATA_EXISTS NPE on empty table (#10336)
1 parent 443a12a commit 9d58bc0

File tree

3 files changed

+107
-9
lines changed
  • seatunnel-connectors-v2/connector-hbase/src
  • seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase

3 files changed

+107
-9
lines changed

seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -310,15 +310,13 @@ public void truncateTable(String databaseName, String tableName) {
310310
* @return true if the table has data, false otherwise
311311
*/
312312
public boolean isExistsData(String databaseName, String tableName) {
313-
try {
314-
Table table = connection.getTable(TableName.valueOf(databaseName, tableName));
315-
Scan scan = new Scan();
316-
scan.setCaching(1);
317-
scan.setLimit(1);
318-
try (ResultScanner scanner = table.getScanner(scan)) {
319-
Result result = scanner.next();
320-
return !result.isEmpty();
321-
}
313+
Scan scan = new Scan();
314+
scan.setCaching(1);
315+
scan.setLimit(1);
316+
try (Table table = connection.getTable(TableName.valueOf(databaseName, tableName));
317+
ResultScanner scanner = table.getScanner(scan)) {
318+
Result result = scanner.next();
319+
return result != null && !result.isEmpty();
322320
} catch (IOException e) {
323321
throw new HbaseConnectorException(
324322
HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.hbase.client;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
21+
22+
import org.apache.hadoop.hbase.HBaseConfiguration;
23+
import org.apache.hadoop.hbase.TableName;
24+
import org.apache.hadoop.hbase.client.Admin;
25+
import org.apache.hadoop.hbase.client.BufferedMutator;
26+
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
27+
import org.apache.hadoop.hbase.client.Connection;
28+
import org.apache.hadoop.hbase.client.Result;
29+
import org.apache.hadoop.hbase.client.ResultScanner;
30+
import org.apache.hadoop.hbase.client.Scan;
31+
import org.apache.hadoop.hbase.client.Table;
32+
33+
import org.junit.jupiter.api.Test;
34+
import org.mockito.Mockito;
35+
36+
import java.lang.reflect.Constructor;
37+
38+
import static org.junit.jupiter.api.Assertions.assertFalse;
39+
import static org.junit.jupiter.api.Assertions.assertTrue;
40+
import static org.mockito.ArgumentMatchers.any;
41+
42+
class HbaseClientTest {
43+
44+
@Test
45+
void testIsExistsDataReturnsFalseWhenScannerNextReturnsNull() throws Exception {
46+
Connection connection = Mockito.mock(Connection.class);
47+
Table table = Mockito.mock(Table.class);
48+
ResultScanner scanner = Mockito.mock(ResultScanner.class);
49+
Mockito.when(connection.getTable(any(TableName.class))).thenReturn(table);
50+
Mockito.when(table.getScanner(any(Scan.class))).thenReturn(scanner);
51+
Mockito.when(scanner.next()).thenReturn(null);
52+
53+
HbaseClient client = newHbaseClient(connection);
54+
55+
assertFalse(client.isExistsData("ns", "tbl"));
56+
}
57+
58+
@Test
59+
void testIsExistsDataReturnsTrueWhenScannerHasResult() throws Exception {
60+
Connection connection = Mockito.mock(Connection.class);
61+
Table table = Mockito.mock(Table.class);
62+
ResultScanner scanner = Mockito.mock(ResultScanner.class);
63+
Result result = Mockito.mock(Result.class);
64+
Mockito.when(result.isEmpty()).thenReturn(false);
65+
Mockito.when(connection.getTable(any(TableName.class))).thenReturn(table);
66+
Mockito.when(table.getScanner(any(Scan.class))).thenReturn(scanner);
67+
Mockito.when(scanner.next()).thenReturn(result);
68+
69+
HbaseClient client = newHbaseClient(connection);
70+
71+
assertTrue(client.isExistsData("ns", "tbl"));
72+
}
73+
74+
private HbaseClient newHbaseClient(Connection connection) throws Exception {
75+
HbaseClient.hbaseConfiguration = HBaseConfiguration.create();
76+
Mockito.when(connection.getAdmin()).thenReturn(Mockito.mock(Admin.class));
77+
Mockito.when(connection.getBufferedMutator(any(BufferedMutatorParams.class)))
78+
.thenReturn(Mockito.mock(BufferedMutator.class));
79+
HbaseParameters hbaseParameters = Mockito.mock(HbaseParameters.class);
80+
Mockito.when(hbaseParameters.getNamespace()).thenReturn("ns");
81+
Mockito.when(hbaseParameters.getTable()).thenReturn("tbl");
82+
Mockito.when(hbaseParameters.getWriteBufferSize()).thenReturn(1);
83+
84+
Constructor<HbaseClient> constructor =
85+
HbaseClient.class.getDeclaredConstructor(Connection.class, HbaseParameters.class);
86+
constructor.setAccessible(true);
87+
return constructor.newInstance(connection, hbaseParameters);
88+
}
89+
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,17 @@ public void testHbaseSinkWithErrorWhenDataExists(TestContainer container)
166166
Assertions.assertEquals(1, execResult.getExitCode());
167167
}
168168

169+
@TestTemplate
170+
public void testHbaseSinkWithErrorWhenDataExistsOnEmptyTable(TestContainer container)
171+
throws IOException, InterruptedException {
172+
deleteData(table);
173+
Assertions.assertEquals(0, countData(table));
174+
Container.ExecResult execResult =
175+
container.executeJob("/fake_to_hbase_with_error_when_data_exists.conf");
176+
Assertions.assertEquals(0, execResult.getExitCode());
177+
Assertions.assertEquals(5, countData(table));
178+
}
179+
169180
@TestTemplate
170181
public void testHbaseSinkWithRecreateSchema(TestContainer container)
171182
throws IOException, InterruptedException {

0 commit comments

Comments
 (0)