Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/en/connectors/source/Hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ Reads data from Apache Hbase.
| end_rowkey | string | No | - |
| start_row_inclusive | boolean | No | true |
| end_row_inclusive | boolean | No | false |
| start_timestamp | long | No | - |
| end_timestamp | long | No | - |
| common-options | | No | - |

### zookeeper_quorum [string]
Expand Down Expand Up @@ -92,6 +94,19 @@ Whether to include the end row in the scan range. When set to false, the end row
- **Both false (start_row_inclusive=false, end_row_inclusive=false)**: This may cause **data loss** at split boundaries, as the boundary rows will be excluded from all splits.
- **Both true (start_row_inclusive=true, end_row_inclusive=true)**: This may cause **duplicate data** at split boundaries, as the boundary rows will be included in multiple adjacent splits.

### start_timestamp

Start timestamp (inclusive) for scan time range. Unit: milliseconds since epoch. The time range follows [start, end). If only start_timestamp is set, the end is treated as open-ended.

### end_timestamp

End timestamp (exclusive) for scan time range. Unit: milliseconds since epoch. The time range follows [start, end). If only end_timestamp is set, the start is treated as open-ended.

**Notes:**

- `start_timestamp` / `end_timestamp` must be >= 0. If both are set, `start_timestamp` must be < `end_timestamp` (time range is [start, end), so `start_timestamp == end_timestamp` produces an empty scan).
- When `start_rowkey` / `end_rowkey` and `start_timestamp` / `end_timestamp` are configured together, both the rowkey range and the time range constraints are applied (intersection).

### common-options

Common parameters for Source plugins, refer to [Common Source Options](../common-options/source-common-options.md).
Expand All @@ -109,6 +124,8 @@ source {
is_binary_rowkey = false
start_rowkey = "B"
end_rowkey = "C"
start_timestamp = 1700000000000
end_timestamp = 1700003600000
schema = {
columns = [
{
Expand Down
17 changes: 17 additions & 0 deletions docs/zh/connectors/source/Hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import ChangeLog from '../changelog/connector-hbase.md';
| end_rowkey | string | 否 | - |
| start_row_inclusive | boolean | 否 | true |
| end_row_inclusive | boolean | 否 | false |
| start_timestamp | long | 否 | - |
| end_timestamp | long | 否 | - |
| common-options | | 否 | - |

### zookeeper_quorum [string]
Expand Down Expand Up @@ -92,6 +94,19 @@ HBase 的行键既可以是文本字符串,也可以是二进制数据。在 S
- **都设置为 false (start_row_inclusive=false, end_row_inclusive=false)**: 这可能会导致**数据丢失**,因为边界行会被所有 split 排除在外。
- **都设置为 true (start_row_inclusive=true, end_row_inclusive=true)**: 这可能会导致**数据重复**,因为边界行会被相邻的多个 split 重复包含。

### start_timestamp

时间范围扫描的起始时间戳(包含)。单位为毫秒(epoch)。时间范围遵循 [start, end) 左闭右开约定。如果只设置 start_timestamp,则最大值视为无限上界。

### end_timestamp

时间范围扫描的结束时间戳(不包含)。单位为毫秒(epoch)。时间范围遵循 [start, end) 左闭右开约定。如果只设置 end_timestamp,则最小值视为无限下界。

**说明:**

- `start_timestamp` / `end_timestamp` 必须大于等于 0;若两者同时配置,需要满足 `start_timestamp < end_timestamp`(遵循 [start, end) 约定,`start_timestamp == end_timestamp` 将导致空扫描)。
- 当 `start_rowkey` / `end_rowkey` 与 `start_timestamp` / `end_timestamp` 同时配置时,会同时应用行键范围与时间范围限制,最终返回两者的交集。

### 常用选项

Source 插件常用参数,具体请参考 [Source 常用选项](../common-options/source-common-options.md)
Expand All @@ -109,6 +124,8 @@ source {
is_binary_rowkey = false
start_rowkey = "B"
end_rowkey = "C"
start_timestamp = 1700000000000
end_timestamp = 1700003600000
schema = {
columns = [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.hbase.client;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;

import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
Expand Down Expand Up @@ -367,7 +368,18 @@ public void mutate(Put put) throws IOException {
public ResultScanner scan(
HbaseSourceSplit split, HbaseParameters hbaseParameters, List<String> columnNames)
throws IOException {
Scan scan = buildScan(split, hbaseParameters, columnNames);
return this.connection
.getTable(TableName.valueOf(hbaseParameters.getTable()))
.getScanner(scan);
}

@VisibleForTesting
static Scan buildScan(
HbaseSourceSplit split, HbaseParameters hbaseParameters, List<String> columnNames)
throws IOException {
Scan scan = new Scan();
applyTimeRange(scan, hbaseParameters);
scan.withStartRow(split.getStartRow(), hbaseParameters.isStartRowInclusive());
scan.withStopRow(split.getEndRow(), hbaseParameters.isEndRowInclusive());
scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
Expand All @@ -377,9 +389,30 @@ public ResultScanner scan(
String[] columnNameSplit = columnName.split(":");
scan.addColumn(Bytes.toBytes(columnNameSplit[0]), Bytes.toBytes(columnNameSplit[1]));
}
return this.connection
.getTable(TableName.valueOf(hbaseParameters.getTable()))
.getScanner(scan);
return scan;
}

private static void applyTimeRange(Scan scan, HbaseParameters hbaseParameters)
throws IOException {
Long startTimestamp = hbaseParameters.getStartTimestamp();
Long endTimestamp = hbaseParameters.getEndTimestamp();
if (startTimestamp == null && endTimestamp == null) {
return;
}

if (startTimestamp != null && startTimestamp < 0) {
throw new IllegalArgumentException("start_timestamp can't be negative");
}
if (endTimestamp != null && endTimestamp < 0) {
throw new IllegalArgumentException("end_timestamp can't be negative");
}

long min = startTimestamp == null ? 0L : startTimestamp;
long max = endTimestamp == null ? Long.MAX_VALUE : endTimestamp;
if (min >= max) {
throw new IllegalArgumentException("start_timestamp must be less than end_timestamp");
}
scan.setTimeRange(min, max);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public class HbaseParameters implements Serializable {

private String endRowkey;

private Long startTimestamp;

private Long endTimestamp;

private Map<String, String> familyNames;

private String versionColumn;
Expand Down Expand Up @@ -155,6 +159,13 @@ public static HbaseParameters buildWithSourceConfig(ReadonlyConfig pluginConfig)
if (pluginConfig.getOptional(HbaseSourceOptions.END_ROW_INCLUSIVE).isPresent()) {
builder.endRowInclusive(pluginConfig.get(HbaseSourceOptions.END_ROW_INCLUSIVE));
}

if (pluginConfig.getOptional(HbaseSourceOptions.START_TIMESTAMP).isPresent()) {
builder.startTimestamp(pluginConfig.get(HbaseSourceOptions.START_TIMESTAMP));
}
if (pluginConfig.getOptional(HbaseSourceOptions.END_TIMESTAMP).isPresent()) {
builder.endTimestamp(pluginConfig.get(HbaseSourceOptions.END_TIMESTAMP));
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ public class HbaseSourceOptions extends HbaseBaseOptions {
.withDescription(
"Whether to include the end row in the scan. Default is false (exclusive), following the left-closed-right-open convention.");

public static final Option<Long> START_TIMESTAMP =
Options.key("start_timestamp")
.longType()
.noDefaultValue()
.withDescription(
"Start timestamp (inclusive) for scan time range in milliseconds since epoch.");

public static final Option<Long> END_TIMESTAMP =
Options.key("end_timestamp")
.longType()
.noDefaultValue()
.withDescription(
"End timestamp (exclusive) for scan time range in milliseconds since epoch.");

public static final Option<Boolean> IS_BINARY_ROW_KEY =
Options.key("is_binary_rowkey")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
Expand All @@ -46,6 +47,18 @@ public OptionRule optionRule() {
return OptionRule.builder()
.required(HbaseSourceOptions.ZOOKEEPER_QUORUM)
.required(HbaseSourceOptions.TABLE)
.optional(
HbaseBaseOptions.HBASE_EXTRA_CONFIG,
HbaseSourceOptions.HBASE_CACHING_CONFIG,
HbaseSourceOptions.HBASE_BATCH_CONFIG,
HbaseSourceOptions.HBASE_CACHE_BLOCKS_CONFIG,
HbaseSourceOptions.IS_BINARY_ROW_KEY,
HbaseSourceOptions.START_ROW_KEY,
HbaseSourceOptions.END_ROW_KEY,
HbaseSourceOptions.START_ROW_INCLUSIVE,
HbaseSourceOptions.END_ROW_INCLUSIVE,
HbaseSourceOptions.START_TIMESTAMP,
HbaseSourceOptions.END_TIMESTAMP)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.hbase.client;

import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
import org.apache.seatunnel.connectors.seatunnel.hbase.source.HbaseSourceSplit;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;

import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class HbaseClientTest {

@Test
void testBuildScanWithTimeRange() throws Exception {
HbaseParameters hbaseParameters =
HbaseParameters.builder().startTimestamp(1000L).endTimestamp(3000L).build();
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));

Scan scan = HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score"));

TimeRange timeRange = scan.getTimeRange();
assertEquals(1000L, timeRange.getMin());
assertEquals(3000L, timeRange.getMax());
}

@Test
void testBuildScanWithOnlyStartTimestamp() throws Exception {
HbaseParameters hbaseParameters = HbaseParameters.builder().startTimestamp(1000L).build();
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));

Scan scan = HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score"));

TimeRange timeRange = scan.getTimeRange();
assertEquals(1000L, timeRange.getMin());
assertEquals(Long.MAX_VALUE, timeRange.getMax());
}

@Test
void testBuildScanWithOnlyEndTimestamp() throws Exception {
HbaseParameters hbaseParameters = HbaseParameters.builder().endTimestamp(2000L).build();
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));

Scan scan = HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score"));

TimeRange timeRange = scan.getTimeRange();
assertEquals(0L, timeRange.getMin());
assertEquals(2000L, timeRange.getMax());
}

@Test
void testBuildScanWithInvalidTimeRange() {
HbaseParameters hbaseParameters =
HbaseParameters.builder().startTimestamp(3000L).endTimestamp(1000L).build();
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));

assertThrows(
IllegalArgumentException.class,
() -> HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score")));
}

@Test
void testBuildScanWithNegativeMinTimestamp() {
HbaseParameters hbaseParameters =
HbaseParameters.builder().startTimestamp(-1L).endTimestamp(1000L).build();
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));

assertThrows(
IllegalArgumentException.class,
() -> HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score")));
}

@Test
void testBuildScanWithNegativeMaxTimestamp() {
HbaseParameters hbaseParameters = HbaseParameters.builder().endTimestamp(-1L).build();
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));

assertThrows(
IllegalArgumentException.class,
() -> HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score")));
}

@Test
void testBuildScanWithEqualTimeRange() {
HbaseParameters hbaseParameters =
HbaseParameters.builder().startTimestamp(1000L).endTimestamp(1000L).build();
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));

assertThrows(
IllegalArgumentException.class,
() -> HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.hbase.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class HbaseParametersTest {

@Test
void testBuildWithSourceConfigReadsTimeRange() {
Map<String, Object> config = new HashMap<>();
config.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), "127.0.0.1:2181");
config.put(HbaseBaseOptions.TABLE.key(), "test_table");
config.put(HbaseSourceOptions.START_TIMESTAMP.key(), 1000L);
config.put(HbaseSourceOptions.END_TIMESTAMP.key(), 2000L);

HbaseParameters parameters =
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(config));

assertEquals(1000L, parameters.getStartTimestamp());
assertEquals(2000L, parameters.getEndTimestamp());
}
}
Loading