Skip to content

Commit 038b7b2

Browse files
author
zengyi
committed
[Improve][Connector-V2][Hbase] Deprecate legacy RegionLocator API and harden split enumeration
1 parent 6f1989e commit 038b7b2

File tree

4 files changed

+105
-16
lines changed

4 files changed

+105
-16
lines changed

seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,10 +418,13 @@ private static void applyTimeRange(Scan scan, HbaseParameters hbaseParameters)
418418
/**
419419
* Get a RegionLocator.
420420
*
421-
* @param tableName table name
421+
* @param tableName table name (preferably fully qualified as {@code namespace:table})
422422
* @return RegionLocator
423423
* @throws IOException exception
424+
* @deprecated Use {@link #getRegionLocator(String, String)} instead to avoid relying on the
425+
* default namespace behavior.
424426
*/
427+
@Deprecated
425428
public RegionLocator getRegionLocator(String tableName) throws IOException {
426429
return this.connection.getRegionLocator(TableName.valueOf(tableName));
427430
}

seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumerator.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,14 @@ public class HbaseSourceSplitEnumerator
6262

6363
public HbaseSourceSplitEnumerator(
6464
Context<HbaseSourceSplit> context, HbaseParameters hbaseParameters) {
65-
this(
66-
context,
67-
hbaseParameters,
68-
new HashSet<>(),
69-
HbaseClient.createInstance(hbaseParameters));
65+
this(context, hbaseParameters, new HashSet<>(), null);
7066
}
7167

7268
public HbaseSourceSplitEnumerator(
7369
Context<HbaseSourceSplit> context,
7470
HbaseParameters hbaseParameters,
7571
HbaseSourceState sourceState) {
76-
this(
77-
context,
78-
hbaseParameters,
79-
sourceState.getAssignedSplits(),
80-
HbaseClient.createInstance(hbaseParameters));
72+
this(context, hbaseParameters, sourceState.getAssignedSplits(), null);
8173
}
8274

8375
@VisibleForTesting
@@ -101,7 +93,7 @@ private HbaseSourceSplitEnumerator(
10193
Context<HbaseSourceSplit> context,
10294
HbaseParameters hbaseParameters,
10395
Set<HbaseSourceSplit> assignedSplit) {
104-
this(context, hbaseParameters, assignedSplit, HbaseClient.createInstance(hbaseParameters));
96+
this(context, hbaseParameters, assignedSplit, null);
10597
}
10698

10799
private HbaseSourceSplitEnumerator(
@@ -227,10 +219,10 @@ private void assignSplit(int taskId) {
227219

228220
@VisibleForTesting
229221
public Set<HbaseSourceSplit> getTableSplits() {
230-
222+
String namespace = hbaseParameters.getNamespace();
223+
TableName tableName = TableName.valueOf(namespace, hbaseParameters.getTable());
231224
try {
232-
String namespace = hbaseParameters.getNamespace();
233-
TableName tableName = TableName.valueOf(namespace, hbaseParameters.getTable());
225+
HbaseClient hbaseClient = getHbaseClient();
234226
log.info("Enumerating HBase source splits for table [{}]", tableName.getNameAsString());
235227
if (!hbaseClient.tableExists(tableName.getNameAsString())) {
236228
String errorMsg =
@@ -309,8 +301,21 @@ public Set<HbaseSourceSplit> getTableSplits() {
309301
return new HashSet<>(splits);
310302
}
311303
} catch (IOException e) {
312-
throw new RuntimeException(e);
304+
String errorMsg =
305+
String.format(
306+
"Failed to enumerate splits for HBase table [%s]",
307+
tableName.getNameAsString());
308+
log.error(errorMsg, e);
309+
throw new HbaseConnectorException(
310+
HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION, errorMsg, e);
311+
}
312+
}
313+
314+
private synchronized HbaseClient getHbaseClient() {
315+
if (hbaseClient == null) {
316+
hbaseClient = HbaseClient.createInstance(hbaseParameters);
313317
}
318+
return hbaseClient;
314319
}
315320

316321
/** Hash algorithm for assigning splits to readers */

seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParametersTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,75 @@ void testBuildWithSourceConfigReadsTimeRange() {
6666
assertEquals(1000L, parameters.getStartTimestamp());
6767
assertEquals(2000L, parameters.getEndTimestamp());
6868
}
69+
70+
@Test
71+
void testGetNamespaceReturnsDefaultWhenNull() {
72+
HbaseParameters parameters =
73+
HbaseParameters.builder()
74+
.namespace(null)
75+
.table("tbl")
76+
.zookeeperQuorum("127.0.0.1:2181")
77+
.build();
78+
assertEquals(HbaseParameters.DEFAULT_NAMESPACE, parameters.getNamespace());
79+
}
80+
81+
@Test
82+
void testBuildWithSourceConfigWithLeadingColonUsesDefaultNamespace() {
83+
Map<String, Object> configMap = new HashMap<>();
84+
configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), "127.0.0.1:2181");
85+
configMap.put(HbaseBaseOptions.TABLE.key(), ":tbl");
86+
87+
HbaseParameters parameters =
88+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
89+
assertEquals(HbaseParameters.DEFAULT_NAMESPACE, parameters.getNamespace());
90+
assertEquals("tbl", parameters.getTable());
91+
}
92+
93+
@Test
94+
void testBuildWithSourceConfigWithMultipleColons() {
95+
Map<String, Object> configMap = new HashMap<>();
96+
configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), "127.0.0.1:2181");
97+
configMap.put(HbaseBaseOptions.TABLE.key(), "ns:tbl:extra");
98+
99+
HbaseParameters parameters =
100+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
101+
assertEquals("ns", parameters.getNamespace());
102+
assertEquals("tbl:extra", parameters.getTable());
103+
}
104+
105+
@Test
106+
void testBuildWithSourceConfigWithSpaces() {
107+
Map<String, Object> configMap = new HashMap<>();
108+
configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), "127.0.0.1:2181");
109+
configMap.put(HbaseBaseOptions.TABLE.key(), " ns : tbl ");
110+
111+
HbaseParameters parameters =
112+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
113+
assertEquals(" ns ", parameters.getNamespace());
114+
assertEquals(" tbl ", parameters.getTable());
115+
}
116+
117+
@Test
118+
void testBuildWithSourceConfigWithEmptyTableName() {
119+
Map<String, Object> configMap = new HashMap<>();
120+
configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), "127.0.0.1:2181");
121+
configMap.put(HbaseBaseOptions.TABLE.key(), "test:");
122+
123+
HbaseParameters parameters =
124+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
125+
assertEquals("test", parameters.getNamespace());
126+
assertEquals("", parameters.getTable());
127+
}
128+
129+
@Test
130+
void testBuildWithSourceConfigWithoutNamespaceKeepsSpacesInTableName() {
131+
Map<String, Object> configMap = new HashMap<>();
132+
configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), "127.0.0.1:2181");
133+
configMap.put(HbaseBaseOptions.TABLE.key(), " tbl ");
134+
135+
HbaseParameters parameters =
136+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(configMap));
137+
assertEquals(HbaseParameters.DEFAULT_NAMESPACE, parameters.getNamespace());
138+
assertEquals(" tbl ", parameters.getTable());
139+
}
69140
}

seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceSplitEnumeratorTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,16 @@ void testGetTableSplitsWithNoRegionInfo() throws IOException {
139139
assertThrows(HbaseConnectorException.class, () -> enumerator.getTableSplits());
140140
}
141141

142+
@Test
143+
void testGetTableSplitsWrapsIOExceptionAsHbaseConnectorException() throws IOException {
144+
when(hbaseClient.getRegionLocator(HbaseParameters.DEFAULT_NAMESPACE, "test_table"))
145+
.thenThrow(new IOException("region locator error"));
146+
147+
HbaseConnectorException exception =
148+
assertThrows(HbaseConnectorException.class, () -> enumerator.getTableSplits());
149+
assertTrue(exception.getCause() instanceof IOException);
150+
}
151+
142152
@Test
143153
void testGetTableSplitsWithUserDefinedRowKeyRange() throws IOException {
144154
// Simulate a table with 4 regions but user only wants data from "row100" to "row300"

0 commit comments

Comments
 (0)