Skip to content

Commit 1da9bd6

Browse files
authored
[E2E][HBase]Refactor hbase e2e (#6859)
1 parent 2cc82bd commit 1da9bd6

File tree

7 files changed

+220
-95
lines changed

7 files changed

+220
-95
lines changed

Diff for: seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/format/HBaseDeserializationFormat.java

+2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ private Object deserializeValue(SeaTunnelDataType<?> typeInfo, byte[] cell) {
5858

5959
switch (typeInfo.getSqlType()) {
6060
case TINYINT:
61+
return cell[0];
6162
case SMALLINT:
63+
return (short) ((cell[0] & 0xFF) << 8 | (cell[1] & 0xFF));
6264
case INT:
6365
return Bytes.toInt(cell);
6466
case BOOLEAN:

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/pom.xml

+12-2
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,24 @@
2626
<name>SeaTunnel : E2E : Connector V2 : Hbase</name>
2727

2828
<dependencies>
29-
29+
<dependency>
30+
<groupId>org.apache.seatunnel</groupId>
31+
<artifactId>connector-fake</artifactId>
32+
<version>${project.version}</version>
33+
<scope>test</scope>
34+
</dependency>
3035
<dependency>
3136
<groupId>org.apache.seatunnel</groupId>
3237
<artifactId>connector-hbase</artifactId>
3338
<version>${project.version}</version>
3439
<scope>test</scope>
3540
</dependency>
36-
41+
<dependency>
42+
<groupId>org.apache.seatunnel</groupId>
43+
<artifactId>connector-assert</artifactId>
44+
<version>${project.version}</version>
45+
<scope>test</scope>
46+
</dependency>
3747
</dependencies>
3848

3949
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.seatunnel.e2e.connector.hbase;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hbase.HBaseConfiguration;
23+
import org.apache.hadoop.hbase.TableName;
24+
import org.apache.hadoop.hbase.client.Admin;
25+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
26+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
27+
import org.apache.hadoop.hbase.client.Connection;
28+
import org.apache.hadoop.hbase.client.ConnectionFactory;
29+
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
30+
import org.apache.hadoop.hbase.util.Bytes;
31+
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
import org.testcontainers.containers.GenericContainer;
35+
import org.testcontainers.containers.output.Slf4jLogConsumer;
36+
import org.testcontainers.lifecycle.Startables;
37+
import org.testcontainers.utility.DockerImageName;
38+
import org.testcontainers.utility.DockerLoggerFactory;
39+
40+
import java.io.IOException;
41+
import java.net.InetAddress;
42+
import java.net.UnknownHostException;
43+
import java.util.ArrayList;
44+
import java.util.Arrays;
45+
import java.util.List;
46+
import java.util.Objects;
47+
import java.util.stream.Stream;
48+
49+
import static org.apache.seatunnel.e2e.common.container.TestContainer.NETWORK;
50+
51+
public class HbaseCluster {
52+
53+
private static final Logger LOG = LoggerFactory.getLogger(HbaseCluster.class);
54+
55+
private static final int ZOOKEEPER_PORT = 2181;
56+
private static final int MASTER_PORT = 16000;
57+
private static final int REGION_PORT = 16020;
58+
private static final String HOST = "hbase_e2e";
59+
60+
private static final String DOCKER_NAME = "jcjabouille/hbase-standalone:2.4.9";
61+
private static final DockerImageName HBASE_DOCKER_IMAGE = DockerImageName.parse(DOCKER_NAME);
62+
63+
private Connection connection;
64+
private GenericContainer<?> hbaseContainer;
65+
66+
public Connection startService() throws IOException {
67+
String hostname = InetAddress.getLocalHost().getHostName();
68+
hbaseContainer =
69+
new GenericContainer<>(HBASE_DOCKER_IMAGE)
70+
.withNetwork(NETWORK)
71+
.withNetworkAliases(HOST)
72+
.withExposedPorts(MASTER_PORT)
73+
.withExposedPorts(REGION_PORT)
74+
.withExposedPorts(ZOOKEEPER_PORT)
75+
.withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostname))
76+
.withEnv("HBASE_MASTER_PORT", String.valueOf(MASTER_PORT))
77+
.withEnv("HBASE_REGION_PORT", String.valueOf(REGION_PORT))
78+
.withEnv(
79+
"HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT",
80+
String.valueOf(ZOOKEEPER_PORT))
81+
.withEnv("HBASE_ZOOKEEPER_QUORUM", HOST)
82+
.withLogConsumer(
83+
new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_NAME)));
84+
hbaseContainer.setPortBindings(
85+
Arrays.asList(
86+
String.format("%s:%s", MASTER_PORT, MASTER_PORT),
87+
String.format("%s:%s", REGION_PORT, REGION_PORT),
88+
String.format("%s:%s", ZOOKEEPER_PORT, ZOOKEEPER_PORT)));
89+
Startables.deepStart(Stream.of(hbaseContainer)).join();
90+
LOG.info("HBase container started");
91+
92+
String zookeeperQuorum = getZookeeperQuorum();
93+
LOG.info("Successfully start hbase service, zookeeper quorum: {}", zookeeperQuorum);
94+
Configuration configuration = HBaseConfiguration.create();
95+
configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
96+
configuration.set("hbase.security.authentication", "simple");
97+
configuration.set("hbase.rpc.timeout", "10000");
98+
configuration.set("hbase.master.port", String.valueOf(MASTER_PORT));
99+
configuration.set("hbase.regionserver.port", String.valueOf(REGION_PORT));
100+
connection = ConnectionFactory.createConnection(configuration);
101+
return connection;
102+
}
103+
104+
public void createTable(String tableName, List<String> list) throws IOException {
105+
TableDescriptorBuilder tableDesc =
106+
TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
107+
108+
List<ColumnFamilyDescriptor> colFamilyList = new ArrayList<>();
109+
for (String columnFamilys : list) {
110+
ColumnFamilyDescriptorBuilder c =
111+
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamilys));
112+
colFamilyList.add(c.build());
113+
}
114+
tableDesc.setColumnFamilies(colFamilyList);
115+
Admin hbaseAdmin = connection.getAdmin();
116+
hbaseAdmin.createTable(tableDesc.build());
117+
}
118+
119+
public void stopService() throws IOException {
120+
if (Objects.nonNull(connection)) {
121+
connection.close();
122+
}
123+
if (Objects.nonNull(hbaseContainer)) {
124+
hbaseContainer.close();
125+
}
126+
hbaseContainer = null;
127+
}
128+
129+
public static String getZookeeperQuorum() {
130+
String host = null;
131+
try {
132+
host = InetAddress.getLocalHost().getHostAddress();
133+
} catch (UnknownHostException e) {
134+
throw new RuntimeException(e);
135+
}
136+
return String.format("%s:%s", host, ZOOKEEPER_PORT);
137+
}
138+
}

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java

+37-72
Original file line numberDiff line numberDiff line change
@@ -19,88 +19,63 @@
1919

2020
import org.apache.seatunnel.e2e.common.TestResource;
2121
import org.apache.seatunnel.e2e.common.TestSuiteBase;
22+
import org.apache.seatunnel.e2e.common.container.EngineType;
2223
import org.apache.seatunnel.e2e.common.container.TestContainer;
24+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2325

24-
import org.apache.hadoop.conf.Configuration;
2526
import org.apache.hadoop.hbase.Cell;
2627
import org.apache.hadoop.hbase.CellUtil;
27-
import org.apache.hadoop.hbase.HBaseConfiguration;
2828
import org.apache.hadoop.hbase.TableName;
2929
import org.apache.hadoop.hbase.client.Admin;
30-
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
31-
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
3230
import org.apache.hadoop.hbase.client.Connection;
33-
import org.apache.hadoop.hbase.client.ConnectionFactory;
31+
import org.apache.hadoop.hbase.client.Delete;
3432
import org.apache.hadoop.hbase.client.Result;
3533
import org.apache.hadoop.hbase.client.ResultScanner;
3634
import org.apache.hadoop.hbase.client.Scan;
3735
import org.apache.hadoop.hbase.client.Table;
38-
import org.apache.hadoop.hbase.client.TableDescriptor;
39-
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
40-
import org.apache.hadoop.hbase.io.compress.Compression;
4136
import org.apache.hadoop.hbase.util.Bytes;
4237

4338
import org.junit.jupiter.api.AfterAll;
4439
import org.junit.jupiter.api.Assertions;
4540
import org.junit.jupiter.api.BeforeAll;
46-
import org.junit.jupiter.api.Disabled;
4741
import org.junit.jupiter.api.TestTemplate;
4842
import org.testcontainers.containers.Container;
49-
import org.testcontainers.containers.GenericContainer;
50-
import org.testcontainers.containers.output.Slf4jLogConsumer;
51-
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
52-
import org.testcontainers.lifecycle.Startables;
53-
import org.testcontainers.utility.DockerImageName;
54-
import org.testcontainers.utility.DockerLoggerFactory;
5543

5644
import lombok.extern.slf4j.Slf4j;
5745

5846
import java.io.IOException;
59-
import java.time.Duration;
6047
import java.util.ArrayList;
48+
import java.util.Arrays;
6149
import java.util.Objects;
62-
import java.util.stream.Stream;
6350

6451
@Slf4j
65-
@Disabled(
66-
"Hbase docker e2e case need user add mapping information of between container id and ip address in hosts file")
52+
@DisabledOnContainer(
53+
value = {},
54+
type = {EngineType.SEATUNNEL},
55+
disabledReason = "The hbase container authentication configuration is incorrect.")
6756
public class HbaseIT extends TestSuiteBase implements TestResource {
6857

69-
private static final String IMAGE = "harisekhon/hbase:latest";
70-
71-
private static final int PORT = 2181;
72-
73-
private static final String HOST = "hbase-e2e";
74-
7558
private static final String TABLE_NAME = "seatunnel_test";
7659

7760
private static final String FAMILY_NAME = "info";
7861

79-
private final Configuration hbaseConfiguration = HBaseConfiguration.create();
80-
8162
private Connection hbaseConnection;
8263

8364
private Admin admin;
8465

8566
private TableName table;
8667

87-
private GenericContainer<?> hbaseContainer;
68+
private HbaseCluster hbaseCluster;
8869

8970
@BeforeAll
9071
@Override
9172
public void startUp() throws Exception {
92-
hbaseContainer =
93-
new GenericContainer<>(DockerImageName.parse(IMAGE))
94-
.withNetwork(NETWORK)
95-
.withNetworkAliases(HOST)
96-
.withExposedPorts(PORT)
97-
.withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
98-
.waitingFor(
99-
new HostPortWaitStrategy()
100-
.withStartupTimeout(Duration.ofMinutes(2)));
101-
Startables.deepStart(Stream.of(hbaseContainer)).join();
102-
log.info("Hbase container started");
103-
this.initialize();
73+
hbaseCluster = new HbaseCluster();
74+
hbaseConnection = hbaseCluster.startService();
75+
// Create table for hbase sink test
76+
log.info("initial");
77+
hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME));
78+
table = TableName.valueOf(TABLE_NAME);
10479
}
10580

10681
@AfterAll
@@ -109,59 +84,37 @@ public void tearDown() throws Exception {
10984
if (Objects.nonNull(admin)) {
11085
admin.close();
11186
}
112-
if (Objects.nonNull(hbaseConnection)) {
113-
hbaseConnection.close();
114-
}
115-
if (Objects.nonNull(hbaseContainer)) {
116-
hbaseContainer.close();
117-
}
118-
}
119-
120-
private void initialize() throws IOException {
121-
hbaseConfiguration.set("hbase.zookeeper.quorum", HOST + ":" + PORT);
122-
hbaseConnection = ConnectionFactory.createConnection(hbaseConfiguration);
123-
admin = hbaseConnection.getAdmin();
124-
table = TableName.valueOf(TABLE_NAME);
125-
ColumnFamilyDescriptor familyDescriptor =
126-
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_NAME.getBytes())
127-
.setCompressionType(Compression.Algorithm.SNAPPY)
128-
.setCompactionCompressionType(Compression.Algorithm.SNAPPY)
129-
.build();
130-
TableDescriptor tableDescriptor =
131-
TableDescriptorBuilder.newBuilder(table).setColumnFamily(familyDescriptor).build();
132-
admin.createTable(tableDescriptor);
133-
log.info("Hbase table has been initialized");
87+
hbaseCluster.stopService();
13488
}
13589

13690
@TestTemplate
13791
public void testHbaseSink(TestContainer container) throws IOException, InterruptedException {
138-
Container.ExecResult execResult = container.executeJob("/fake-to-hbase.conf");
139-
Assertions.assertEquals(0, execResult.getExitCode());
92+
deleteData(table);
93+
Container.ExecResult sinkExecResult = container.executeJob("/fake-to-hbase.conf");
94+
Assertions.assertEquals(0, sinkExecResult.getExitCode());
14095
Table hbaseTable = hbaseConnection.getTable(table);
14196
Scan scan = new Scan();
142-
ArrayList<Result> results = new ArrayList<>();
14397
ResultScanner scanner = hbaseTable.getScanner(scan);
98+
ArrayList<Result> results = new ArrayList<>();
14499
for (Result result : scanner) {
145100
results.add(result);
146101
}
147102
Assertions.assertEquals(results.size(), 5);
148-
}
149-
150-
@TestTemplate
151-
public void testHbaseSource(TestContainer container) throws IOException, InterruptedException {
152-
Container.ExecResult execResult = container.executeJob("/hbase-to-assert.conf");
153-
Assertions.assertEquals(0, execResult.getExitCode());
103+
scanner.close();
104+
Container.ExecResult sourceExecResult = container.executeJob("/hbase-to-assert.conf");
105+
Assertions.assertEquals(0, sourceExecResult.getExitCode());
154106
}
155107

156108
@TestTemplate
157109
public void testHbaseSinkWithArray(TestContainer container)
158110
throws IOException, InterruptedException {
111+
deleteData(table);
159112
Container.ExecResult execResult = container.executeJob("/fake-to-hbase-array.conf");
160113
Assertions.assertEquals(0, execResult.getExitCode());
161114
Table hbaseTable = hbaseConnection.getTable(table);
162115
Scan scan = new Scan();
163-
ArrayList<Result> results = new ArrayList<>();
164116
ResultScanner scanner = hbaseTable.getScanner(scan);
117+
ArrayList<Result> results = new ArrayList<>();
165118
for (Result result : scanner) {
166119
String rowKey = Bytes.toString(result.getRow());
167120
for (Cell cell : result.listCells()) {
@@ -177,5 +130,17 @@ public void testHbaseSinkWithArray(TestContainer container)
177130
results.add(result);
178131
}
179132
Assertions.assertEquals(results.size(), 3);
133+
scanner.close();
134+
}
135+
136+
private void deleteData(TableName table) throws IOException {
137+
Table hbaseTable = hbaseConnection.getTable(table);
138+
Scan scan = new Scan();
139+
ResultScanner scanner = hbaseTable.getScanner(scan);
140+
// Delete the data generated by the test
141+
for (Result result = scanner.next(); result != null; result = scanner.next()) {
142+
Delete deleteRow = new Delete(result.getRow());
143+
hbaseTable.delete(deleteRow);
144+
}
180145
}
181146
}

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-array.conf

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ source {
4949

5050
sink {
5151
Hbase {
52-
zookeeper_quorum = "hbase-e2e:2181"
52+
zookeeper_quorum = "hbase_e2e:2181"
5353
table = "seatunnel_test"
5454
rowkey_column = ["name"]
5555
family_name {

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase.conf

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ source {
4040

4141
sink {
4242
Hbase {
43-
zookeeper_quorum = "hbase-e2e:2181"
43+
zookeeper_quorum = "hbase_e2e:2181"
4444
table = "seatunnel_test"
4545
rowkey_column = ["name"]
4646
family_name {

0 commit comments

Comments
 (0)