Skip to content

Commit 402291d

Browse files
yzeng1618zengyi
andauthored
[Feature] [Connector-V2][HBase] Support time-range scan with min/max timestamp in HBaseSource (#10318)
Co-authored-by: zengyi <zengyi@chinatelecom.cn>
1 parent c2316a2 commit 402291d

File tree

10 files changed

+350
-4
lines changed

10 files changed

+350
-4
lines changed

docs/en/connectors/source/Hbase.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ Reads data from Apache Hbase.
3333
| end_rowkey | string | No | - |
3434
| start_row_inclusive | boolean | No | true |
3535
| end_row_inclusive | boolean | No | false |
36+
| start_timestamp | long | No | - |
37+
| end_timestamp | long | No | - |
3638
| common-options | | No | - |
3739

3840
### zookeeper_quorum [string]
@@ -92,6 +94,19 @@ Whether to include the end row in the scan range. When set to false, the end row
9294
- **Both false (start_row_inclusive=false, end_row_inclusive=false)**: This may cause **data loss** at split boundaries, as the boundary rows will be excluded from all splits.
9395
- **Both true (start_row_inclusive=true, end_row_inclusive=true)**: This may cause **duplicate data** at split boundaries, as the boundary rows will be included in multiple adjacent splits.
9496

97+
### start_timestamp
98+
99+
Start timestamp (inclusive) for scan time range. Unit: milliseconds since epoch. The time range follows [start, end). If only start_timestamp is set, the end is treated as open-ended.
100+
101+
### end_timestamp
102+
103+
End timestamp (exclusive) for scan time range. Unit: milliseconds since epoch. The time range follows [start, end). If only end_timestamp is set, the start is treated as open-ended.
104+
105+
**Notes:**
106+
107+
- `start_timestamp` / `end_timestamp` must be >= 0. If both are set, `start_timestamp` must be < `end_timestamp` (time range is [start, end), so `start_timestamp == end_timestamp` produces an empty scan).
108+
- When `start_rowkey` / `end_rowkey` and `start_timestamp` / `end_timestamp` are configured together, both the rowkey range and the time range constraints are applied (intersection).
109+
95110
### common-options
96111

97112
Common parameters for Source plugins, refer to [Common Source Options](../common-options/source-common-options.md).
@@ -109,6 +124,8 @@ source {
109124
is_binary_rowkey = false
110125
start_rowkey = "B"
111126
end_rowkey = "C"
127+
start_timestamp = 1700000000000
128+
end_timestamp = 1700003600000
112129
schema = {
113130
columns = [
114131
{

docs/zh/connectors/source/Hbase.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import ChangeLog from '../changelog/connector-hbase.md';
3333
| end_rowkey | string || - |
3434
| start_row_inclusive | boolean || true |
3535
| end_row_inclusive | boolean || false |
36+
| start_timestamp | long || - |
37+
| end_timestamp | long || - |
3638
| common-options | || - |
3739

3840
### zookeeper_quorum [string]
@@ -92,6 +94,19 @@ HBase 的行键既可以是文本字符串,也可以是二进制数据。在 S
9294
- **都设置为 false (start_row_inclusive=false, end_row_inclusive=false)**: 这可能会导致**数据丢失**,因为边界行会被所有 split 排除在外。
9395
- **都设置为 true (start_row_inclusive=true, end_row_inclusive=true)**: 这可能会导致**数据重复**,因为边界行会被相邻的多个 split 重复包含。
9496

97+
### start_timestamp
98+
99+
时间范围扫描的起始时间戳(包含)。单位为毫秒(epoch)。时间范围遵循 [start, end) 左闭右开约定。如果只设置 start_timestamp,则最大值视为无限上界。
100+
101+
### end_timestamp
102+
103+
时间范围扫描的结束时间戳(不包含)。单位为毫秒(epoch)。时间范围遵循 [start, end) 左闭右开约定。如果只设置 end_timestamp,则最小值视为无限下界。
104+
105+
**说明:**
106+
107+
- `start_timestamp` / `end_timestamp` 必须大于等于 0;若两者同时配置,需要满足 `start_timestamp < end_timestamp`(遵循 [start, end) 约定,`start_timestamp == end_timestamp` 将导致空扫描)。
108+
-`start_rowkey` / `end_rowkey``start_timestamp` / `end_timestamp` 同时配置时,会同时应用行键范围与时间范围限制,最终返回两者的交集。
109+
95110
### 常用选项
96111

97112
Source 插件常用参数,具体请参考 [Source 常用选项](../common-options/source-common-options.md)
@@ -109,6 +124,8 @@ source {
109124
is_binary_rowkey = false
110125
start_rowkey = "B"
111126
end_rowkey = "C"
127+
start_timestamp = 1700000000000
128+
end_timestamp = 1700003600000
112129
schema = {
113130
columns = [
114131
{

seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.hbase.client;
1919

20+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
2021
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
2122

2223
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
@@ -365,7 +366,18 @@ public void mutate(Put put) throws IOException {
365366
public ResultScanner scan(
366367
HbaseSourceSplit split, HbaseParameters hbaseParameters, List<String> columnNames)
367368
throws IOException {
369+
Scan scan = buildScan(split, hbaseParameters, columnNames);
370+
return this.connection
371+
.getTable(TableName.valueOf(hbaseParameters.getTable()))
372+
.getScanner(scan);
373+
}
374+
375+
@VisibleForTesting
376+
static Scan buildScan(
377+
HbaseSourceSplit split, HbaseParameters hbaseParameters, List<String> columnNames)
378+
throws IOException {
368379
Scan scan = new Scan();
380+
applyTimeRange(scan, hbaseParameters);
369381
scan.withStartRow(split.getStartRow(), hbaseParameters.isStartRowInclusive());
370382
scan.withStopRow(split.getEndRow(), hbaseParameters.isEndRowInclusive());
371383
scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
@@ -375,9 +387,30 @@ public ResultScanner scan(
375387
String[] columnNameSplit = columnName.split(":");
376388
scan.addColumn(Bytes.toBytes(columnNameSplit[0]), Bytes.toBytes(columnNameSplit[1]));
377389
}
378-
return this.connection
379-
.getTable(TableName.valueOf(hbaseParameters.getTable()))
380-
.getScanner(scan);
390+
return scan;
391+
}
392+
393+
private static void applyTimeRange(Scan scan, HbaseParameters hbaseParameters)
394+
throws IOException {
395+
Long startTimestamp = hbaseParameters.getStartTimestamp();
396+
Long endTimestamp = hbaseParameters.getEndTimestamp();
397+
if (startTimestamp == null && endTimestamp == null) {
398+
return;
399+
}
400+
401+
if (startTimestamp != null && startTimestamp < 0) {
402+
throw new IllegalArgumentException("start_timestamp can't be negative");
403+
}
404+
if (endTimestamp != null && endTimestamp < 0) {
405+
throw new IllegalArgumentException("end_timestamp can't be negative");
406+
}
407+
408+
long min = startTimestamp == null ? 0L : startTimestamp;
409+
long max = endTimestamp == null ? Long.MAX_VALUE : endTimestamp;
410+
if (min >= max) {
411+
throw new IllegalArgumentException("start_timestamp must be less than end_timestamp");
412+
}
413+
scan.setTimeRange(min, max);
381414
}
382415

383416
/**

seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public class HbaseParameters implements Serializable {
4646

4747
private String endRowkey;
4848

49+
private Long startTimestamp;
50+
51+
private Long endTimestamp;
52+
4953
private Map<String, String> familyNames;
5054

5155
private String versionColumn;
@@ -155,6 +159,13 @@ public static HbaseParameters buildWithSourceConfig(ReadonlyConfig pluginConfig)
155159
if (pluginConfig.getOptional(HbaseSourceOptions.END_ROW_INCLUSIVE).isPresent()) {
156160
builder.endRowInclusive(pluginConfig.get(HbaseSourceOptions.END_ROW_INCLUSIVE));
157161
}
162+
163+
if (pluginConfig.getOptional(HbaseSourceOptions.START_TIMESTAMP).isPresent()) {
164+
builder.startTimestamp(pluginConfig.get(HbaseSourceOptions.START_TIMESTAMP));
165+
}
166+
if (pluginConfig.getOptional(HbaseSourceOptions.END_TIMESTAMP).isPresent()) {
167+
builder.endTimestamp(pluginConfig.get(HbaseSourceOptions.END_TIMESTAMP));
168+
}
158169
return builder.build();
159170
}
160171
}

seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSourceOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,20 @@ public class HbaseSourceOptions extends HbaseBaseOptions {
4848
.withDescription(
4949
"Whether to include the end row in the scan. Default is false (exclusive), following the left-closed-right-open convention.");
5050

51+
public static final Option<Long> START_TIMESTAMP =
52+
Options.key("start_timestamp")
53+
.longType()
54+
.noDefaultValue()
55+
.withDescription(
56+
"Start timestamp (inclusive) for scan time range in milliseconds since epoch.");
57+
58+
public static final Option<Long> END_TIMESTAMP =
59+
Options.key("end_timestamp")
60+
.longType()
61+
.noDefaultValue()
62+
.withDescription(
63+
"End timestamp (exclusive) for scan time range in milliseconds since epoch.");
64+
5165
public static final Option<Boolean> IS_BINARY_ROW_KEY =
5266
Options.key("is_binary_rowkey")
5367
.booleanType()

seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.seatunnel.api.table.factory.Factory;
2727
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
2828
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
29+
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseBaseOptions;
2930
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
3031
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSourceOptions;
3132
import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
@@ -46,6 +47,18 @@ public OptionRule optionRule() {
4647
return OptionRule.builder()
4748
.required(HbaseSourceOptions.ZOOKEEPER_QUORUM)
4849
.required(HbaseSourceOptions.TABLE)
50+
.optional(
51+
HbaseBaseOptions.HBASE_EXTRA_CONFIG,
52+
HbaseSourceOptions.HBASE_CACHING_CONFIG,
53+
HbaseSourceOptions.HBASE_BATCH_CONFIG,
54+
HbaseSourceOptions.HBASE_CACHE_BLOCKS_CONFIG,
55+
HbaseSourceOptions.IS_BINARY_ROW_KEY,
56+
HbaseSourceOptions.START_ROW_KEY,
57+
HbaseSourceOptions.END_ROW_KEY,
58+
HbaseSourceOptions.START_ROW_INCLUSIVE,
59+
HbaseSourceOptions.END_ROW_INCLUSIVE,
60+
HbaseSourceOptions.START_TIMESTAMP,
61+
HbaseSourceOptions.END_TIMESTAMP)
4962
.build();
5063
}
5164

seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClientTest.java

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.hbase.client;
1919

2020
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
21+
import org.apache.seatunnel.connectors.seatunnel.hbase.source.HbaseSourceSplit;
2122

2223
import org.apache.hadoop.hbase.HBaseConfiguration;
2324
import org.apache.hadoop.hbase.TableName;
@@ -29,17 +30,22 @@
2930
import org.apache.hadoop.hbase.client.ResultScanner;
3031
import org.apache.hadoop.hbase.client.Scan;
3132
import org.apache.hadoop.hbase.client.Table;
33+
import org.apache.hadoop.hbase.io.TimeRange;
34+
import org.apache.hadoop.hbase.util.Bytes;
3235

3336
import org.junit.jupiter.api.Test;
3437
import org.mockito.Mockito;
3538

3639
import java.lang.reflect.Constructor;
40+
import java.util.Arrays;
3741

42+
import static org.junit.jupiter.api.Assertions.assertEquals;
3843
import static org.junit.jupiter.api.Assertions.assertFalse;
44+
import static org.junit.jupiter.api.Assertions.assertThrows;
3945
import static org.junit.jupiter.api.Assertions.assertTrue;
4046
import static org.mockito.ArgumentMatchers.any;
4147

42-
class HbaseClientTest {
48+
public class HbaseClientTest {
4349

4450
@Test
4551
void testIsExistsDataReturnsFalseWhenScannerNextReturnsNull() throws Exception {
@@ -86,4 +92,84 @@ private HbaseClient newHbaseClient(Connection connection) throws Exception {
8692
constructor.setAccessible(true);
8793
return constructor.newInstance(connection, hbaseParameters);
8894
}
95+
96+
@Test
97+
void testBuildScanWithTimeRange() throws Exception {
98+
HbaseParameters hbaseParameters =
99+
HbaseParameters.builder().startTimestamp(1000L).endTimestamp(3000L).build();
100+
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));
101+
102+
Scan scan = HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score"));
103+
104+
TimeRange timeRange = scan.getTimeRange();
105+
assertEquals(1000L, timeRange.getMin());
106+
assertEquals(3000L, timeRange.getMax());
107+
}
108+
109+
@Test
110+
void testBuildScanWithOnlyStartTimestamp() throws Exception {
111+
HbaseParameters hbaseParameters = HbaseParameters.builder().startTimestamp(1000L).build();
112+
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));
113+
114+
Scan scan = HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score"));
115+
116+
TimeRange timeRange = scan.getTimeRange();
117+
assertEquals(1000L, timeRange.getMin());
118+
assertEquals(Long.MAX_VALUE, timeRange.getMax());
119+
}
120+
121+
@Test
122+
void testBuildScanWithOnlyEndTimestamp() throws Exception {
123+
HbaseParameters hbaseParameters = HbaseParameters.builder().endTimestamp(2000L).build();
124+
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));
125+
126+
Scan scan = HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score"));
127+
128+
TimeRange timeRange = scan.getTimeRange();
129+
assertEquals(0L, timeRange.getMin());
130+
assertEquals(2000L, timeRange.getMax());
131+
}
132+
133+
@Test
134+
void testBuildScanWithInvalidTimeRange() {
135+
HbaseParameters hbaseParameters =
136+
HbaseParameters.builder().startTimestamp(3000L).endTimestamp(1000L).build();
137+
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));
138+
139+
assertThrows(
140+
IllegalArgumentException.class,
141+
() -> HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score")));
142+
}
143+
144+
@Test
145+
void testBuildScanWithNegativeMinTimestamp() {
146+
HbaseParameters hbaseParameters =
147+
HbaseParameters.builder().startTimestamp(-1L).endTimestamp(1000L).build();
148+
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));
149+
150+
assertThrows(
151+
IllegalArgumentException.class,
152+
() -> HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score")));
153+
}
154+
155+
@Test
156+
void testBuildScanWithNegativeMaxTimestamp() {
157+
HbaseParameters hbaseParameters = HbaseParameters.builder().endTimestamp(-1L).build();
158+
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));
159+
160+
assertThrows(
161+
IllegalArgumentException.class,
162+
() -> HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score")));
163+
}
164+
165+
@Test
166+
void testBuildScanWithEqualTimeRange() {
167+
HbaseParameters hbaseParameters =
168+
HbaseParameters.builder().startTimestamp(1000L).endTimestamp(1000L).build();
169+
HbaseSourceSplit split = new HbaseSourceSplit(0, Bytes.toBytes("a"), Bytes.toBytes("b"));
170+
171+
assertThrows(
172+
IllegalArgumentException.class,
173+
() -> HbaseClient.buildScan(split, hbaseParameters, Arrays.asList("info:score")));
174+
}
89175
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.hbase.config;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
27+
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
29+
public class HbaseParametersTest {
30+
31+
@Test
32+
void testBuildWithSourceConfigReadsTimeRange() {
33+
Map<String, Object> config = new HashMap<>();
34+
config.put(HbaseBaseOptions.ZOOKEEPER_QUORUM.key(), "127.0.0.1:2181");
35+
config.put(HbaseBaseOptions.TABLE.key(), "test_table");
36+
config.put(HbaseSourceOptions.START_TIMESTAMP.key(), 1000L);
37+
config.put(HbaseSourceOptions.END_TIMESTAMP.key(), 2000L);
38+
39+
HbaseParameters parameters =
40+
HbaseParameters.buildWithSourceConfig(ReadonlyConfig.fromMap(config));
41+
42+
assertEquals(1000L, parameters.getStartTimestamp());
43+
assertEquals(2000L, parameters.getEndTimestamp());
44+
}
45+
}

0 commit comments

Comments
 (0)