Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/en/connectors/sink/Hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ The zookeeper cluster host of hbase, example: "hadoop001:2181,hadoop002:2181,had
### table [string]

The table name you want to write, example: "seatunnel"
If your table is under a custom namespace, use `namespace:table` (for example, `ns1:seatunnel_test`); if omitted, SeaTunnel will write to HBase's default namespace (`default`).

### rowkey_column [list]

Expand Down
1 change: 1 addition & 0 deletions docs/en/connectors/source/Hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The zookeeper quorum for Hbase cluster hosts, e.g., "hadoop001:2181,hadoop002:21
### table [string]

The name of the table to write to, e.g., "seatunnel".
If your table lives in a custom namespace, use the `namespace:table` form (for example, `ns1:seatunnel_test`); when the namespace is omitted SeaTunnel will read from HBase's default namespace (`default`).

### schema [config]

Expand Down
1 change: 1 addition & 0 deletions docs/zh/connectors/sink/Hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ hbase的zookeeper集群主机, 示例: "hadoop001:2181,hadoop002:2181,hadoop003:
### table [string]

要写入的表名, 例如: "seatunnel"
如果表在自定义 namespace 下,请使用 `namespace:table` 形式(如 `ns1:seatunnel_test`);未填写 namespace 时,SeaTunnel 会写入到 HBase 默认命名空间 `default`。

### rowkey_column [list]

Expand Down
1 change: 1 addition & 0 deletions docs/zh/connectors/source/Hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ hbase的zookeeper集群主机,例如:“hadoop001:2181,hadoop002:2181,hadoop
### table [string]

要写入的表名,例如:“seatunnel”
如果表在自定义 namespace 下,请使用 `namespace:table` 形式(如 `ns1:seatunnel_test`);未填写 namespace 时,SeaTunnel 会使用 HBase 的默认命名空间 `default`。

### schema [config]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,9 @@ public ResultScanner scan(
String[] columnNameSplit = columnName.split(":");
scan.addColumn(Bytes.toBytes(columnNameSplit[0]), Bytes.toBytes(columnNameSplit[1]));
}
String namespace = hbaseParameters.getNamespace();
return this.connection
.getTable(TableName.valueOf(hbaseParameters.getTable()))
.getTable(TableName.valueOf(namespace, hbaseParameters.getTable()))
.getScanner(scan);
}

Expand All @@ -392,4 +393,8 @@ public ResultScanner scan(
public RegionLocator getRegionLocator(String tableName) throws IOException {
return this.connection.getRegionLocator(TableName.valueOf(tableName));
}

public RegionLocator getRegionLocator(String namespace, String tableName) throws IOException {
return this.connection.getRegionLocator(TableName.valueOf(namespace, tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import org.apache.hadoop.hbase.NamespaceDescriptor;

import lombok.Builder;
import lombok.Getter;

Expand All @@ -30,6 +32,8 @@
@Getter
public class HbaseParameters implements Serializable {

public static final String DEFAULT_NAMESPACE = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;

private String zookeeperQuorum;

private String namespace;
Expand Down Expand Up @@ -91,7 +95,7 @@ public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
builder.table(table.substring(colonIndex + 1));
} else {
builder.table(table);
builder.namespace("default");
builder.namespace(DEFAULT_NAMESPACE);
}

// required parameters
Expand Down Expand Up @@ -125,6 +129,7 @@ public static HbaseParameters buildWithSourceConfig(ReadonlyConfig pluginConfig)
builder.table(table.substring(colonIndex + 1));
} else {
builder.table(table);
builder.namespace(DEFAULT_NAMESPACE);
}

if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_EXTRA_CONFIG).isPresent()) {
Expand Down Expand Up @@ -157,4 +162,11 @@ public static HbaseParameters buildWithSourceConfig(ReadonlyConfig pluginConfig)
}
return builder.build();
}

public String getNamespace() {
if (namespace == null || namespace.trim().isEmpty()) {
return DEFAULT_NAMESPACE;
}
return namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.hbase.util.HBaseUtil;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

Expand Down Expand Up @@ -59,14 +62,22 @@ public class HbaseSourceSplitEnumerator

public HbaseSourceSplitEnumerator(
Context<HbaseSourceSplit> context, HbaseParameters hbaseParameters) {
this(context, hbaseParameters, new HashSet<>());
this(
context,
hbaseParameters,
new HashSet<>(),
HbaseClient.createInstance(hbaseParameters));
}

public HbaseSourceSplitEnumerator(
Context<HbaseSourceSplit> context,
HbaseParameters hbaseParameters,
HbaseSourceState sourceState) {
this(context, hbaseParameters, sourceState.getAssignedSplits());
this(
context,
hbaseParameters,
sourceState.getAssignedSplits(),
HbaseClient.createInstance(hbaseParameters));
}

@VisibleForTesting
Expand Down Expand Up @@ -218,59 +229,85 @@ private void assignSplit(int taskId) {
public Set<HbaseSourceSplit> getTableSplits() {

try {
RegionLocator regionLocator = hbaseClient.getRegionLocator(hbaseParameters.getTable());
byte[][] startKeys = regionLocator.getStartKeys();
byte[][] endKeys = regionLocator.getEndKeys();
List<HbaseSourceSplit> splits = new ArrayList<>();
boolean isBinaryRowkey = hbaseParameters.isBinaryRowkey();
byte[] userStartRowkey =
HBaseUtil.convertRowKey(hbaseParameters.getStartRowkey(), isBinaryRowkey);
byte[] userEndRowkey =
HBaseUtil.convertRowKey(hbaseParameters.getEndRowkey(), isBinaryRowkey);
HBaseUtil.validateRowKeyRange(userStartRowkey, userEndRowkey);

int i = 0;
while (i < startKeys.length) {
byte[] regionStartKey = startKeys[i];
byte[] regionEndKey = endKeys[i];
if (userEndRowkey.length > 0
&& Bytes.compareTo(userEndRowkey, regionStartKey) <= 0
&& Bytes.compareTo(regionStartKey, HConstants.EMPTY_BYTE_ARRAY) != 0) {
i++;
continue;
}
String namespace = hbaseParameters.getNamespace();
TableName tableName = TableName.valueOf(namespace, hbaseParameters.getTable());
log.info("Enumerating HBase source splits for table [{}]", tableName.getNameAsString());
if (!hbaseClient.tableExists(tableName.getNameAsString())) {
String errorMsg =
String.format(
"HBase table [%s] does not exist", tableName.getNameAsString());
log.error(errorMsg);
throw new HbaseConnectorException(
HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION, errorMsg);
}

if (userStartRowkey.length > 0
&& Bytes.compareTo(userStartRowkey, regionEndKey) >= 0
&& Bytes.compareTo(regionEndKey, HConstants.EMPTY_BYTE_ARRAY) != 0) {
try (RegionLocator regionLocator =
hbaseClient.getRegionLocator(namespace, hbaseParameters.getTable())) {
byte[][] startKeys = regionLocator.getStartKeys();
byte[][] endKeys = regionLocator.getEndKeys();
if (startKeys.length == 0 || endKeys.length == 0) {
String errorMsg =
String.format(
"No region information found for HBase table [%s], please check whether the table exists "
+ "and current user has permission to access it",
tableName.getNameAsString());
log.error(errorMsg);
throw new HbaseConnectorException(
HbaseConnectorErrorCode.TABLE_QUERY_EXCEPTION, errorMsg);
}
List<HbaseSourceSplit> splits = new ArrayList<>();
boolean isBinaryRowkey = hbaseParameters.isBinaryRowkey();
byte[] userStartRowkey =
HBaseUtil.convertRowKey(hbaseParameters.getStartRowkey(), isBinaryRowkey);
byte[] userEndRowkey =
HBaseUtil.convertRowKey(hbaseParameters.getEndRowkey(), isBinaryRowkey);
HBaseUtil.validateRowKeyRange(userStartRowkey, userEndRowkey);

int i = 0;
while (i < startKeys.length) {
byte[] regionStartKey = startKeys[i];
byte[] regionEndKey = endKeys[i];
if (userEndRowkey.length > 0
&& Bytes.compareTo(userEndRowkey, regionStartKey) <= 0
&& Bytes.compareTo(regionStartKey, HConstants.EMPTY_BYTE_ARRAY) != 0) {
i++;
continue;
}

if (userStartRowkey.length > 0
&& Bytes.compareTo(userStartRowkey, regionEndKey) >= 0
&& Bytes.compareTo(regionEndKey, HConstants.EMPTY_BYTE_ARRAY) != 0) {
i++;
continue;
}
byte[] splitStartKey =
userStartRowkey.length > 0
&& (Bytes.compareTo(
regionStartKey,
HConstants.EMPTY_BYTE_ARRAY)
== 0
|| Bytes.compareTo(
userStartRowkey, regionStartKey)
> 0)
? userStartRowkey
: regionStartKey;

byte[] splitEndKey =
userEndRowkey.length > 0
&& (Bytes.compareTo(
regionEndKey,
HConstants.EMPTY_BYTE_ARRAY)
== 0
|| Bytes.compareTo(userEndRowkey, regionEndKey)
< 0)
? userEndRowkey
: regionEndKey;

splits.add(new HbaseSourceSplit(i, splitStartKey, splitEndKey));
i++;
continue;
}
byte[] splitStartKey =
userStartRowkey.length > 0
&& (Bytes.compareTo(
regionStartKey,
HConstants.EMPTY_BYTE_ARRAY)
== 0
|| Bytes.compareTo(userStartRowkey, regionStartKey)
> 0)
? userStartRowkey
: regionStartKey;

byte[] splitEndKey =
userEndRowkey.length > 0
&& (Bytes.compareTo(
regionEndKey,
HConstants.EMPTY_BYTE_ARRAY)
== 0
|| Bytes.compareTo(userEndRowkey, regionEndKey) < 0)
? userEndRowkey
: regionEndKey;

splits.add(new HbaseSourceSplit(i, splitStartKey, splitEndKey));
i++;
return new HashSet<>(splits);
}
return new HashSet<>(splits);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.hbase.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

public class HbaseParametersTest {

@Test
public void testBuildWithSourceConfigWithoutNamespace() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), "127.0.0.1:2181");
configMap.put(HbaseBaseOptions.TABLE.key(), "tbl");
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);

HbaseParameters parameters = HbaseParameters.buildWithSourceConfig(readonlyConfig);
Assertions.assertEquals(HbaseParameters.DEFAULT_NAMESPACE, parameters.getNamespace());
Assertions.assertEquals("tbl", parameters.getTable());
}

@Test
public void testBuildWithSourceConfigWithNamespace() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), "127.0.0.1:2181");
configMap.put(HbaseBaseOptions.TABLE.key(), "test:tbl");
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);

HbaseParameters parameters = HbaseParameters.buildWithSourceConfig(readonlyConfig);
Assertions.assertEquals("test", parameters.getNamespace());
Assertions.assertEquals("tbl", parameters.getTable());
}
}
Loading