- 通过holo-client读写Hologres
holo-client适用于大批量数据写入(批量、实时同步至holo)和高QPS点查(维表关联)场景。holo-client基于JDBC实现,使用时请确认实例剩余可用连接数。
- 查看最大连接数
select instance_max_connections(); -- holo实例版本需大于等于1.3.22,否则请参考官网文档实例规格与连接数的计算方式- 查看已使用连接数
select count(*) from pg_stat_activity where backend_type='client backend';- Maven
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>holo-client</artifactId>
<version>2.7.1</version>
</dependency>- Gradle
implementation 'com.alibaba.hologres:holo-client:2.7.1'
- HoloClient最多会同时启动Max(writeThreadSize,readThreadSize)个连接
- idle超过connectionMaxIdleMs会被释放
- 在存活链接不足与处理请求量时,会自动创建新链接
建议项目中创建HoloClient单例,通过writeThreadSize和readThreadSize控制读写的并发(每并发占用1个JDBC连接,空闲超过connectionMaxIdleMs将被自动回收)
// 配置参数,url格式为 jdbc:postgresql://host:port/db
HoloConfig config = new HoloConfig();
config.setJdbcUrl(url);
config.setUsername(username);
config.setPassword(password);
try (HoloClient client = new HoloClient(config)) {
TableSchema schema0 = client.getTableSchema("t0");
Put put = new Put(schema0);
put.setObject("id", 1);
put.setObject("name", "name0");
put.setObject("address", "address0");
client.put(put);
...
//强制提交所有未提交put请求;HoloClient内部也会根据WriteBatchSize、WriteBatchByteSize、writeMaxIntervalMs三个参数自动提交
//client.flush();
} catch (HoloClientException e) {
}注1:若分区已存在,不论DynamicPartition为何值,写入数据都将插入到正确的分区表中;若分区不存在,DynamicPartition设置为true时,将会自动创建不存在的分区,否则抛出异常 注2: 写入分区表在HOLO 0.9及以后版本才能获得较好的性能,0.8建议先写到临时表,再通过insert into xxx select ...的方式写入到分区表
// 配置参数,url格式为 jdbc:postgresql://host:port/db
HoloConfig config = new HoloConfig();
config.setJdbcUrl(url);
config.setUsername(username);
config.setPassword(password);
config.setDynamicPartition(true); //当分区不存在时,将自动创建分区
try (HoloClient client = new HoloClient(config)) {
//create table t0(id int not null,region text not null,name text,primary key(id,region)) partition by list(region)
TableSchema schema0 = client.getTableSchema("t0");
Put put = new Put(schema0);
put.setObject("id", 1);
put.setObject("region", "SH");
put.setObject("name", "name0");
client.put(put);
...
//强制提交所有未提交put请求;HoloClient内部也会根据WriteBatchSize、WriteBatchByteSize、writeMaxIntervalMs三个参数自动提交
//client.flush();
} catch (HoloClientException e) {
}// 配置参数,url格式为 jdbc:postgresql://host:port/db
HoloConfig config = new HoloConfig();
config.setJdbcUrl(url);
config.setUsername(username);
config.setPassword(password);
config.setOnConflictAction(WriteMode.INSERT_OR_REPLACE);//配置主键冲突时策略
try (HoloClient client = new HoloClient(config)) {
//create table t0(id int not null,name0 text,address text,primary key(id))
TableSchema schema0 = client.getTableSchema("t0");
Put put = new Put(schema0);
put.setObject("id", 1);
put.setObject("name0", "name0");
put.setObject("address", "address0");
client.put(put);
...
put = new Put(schema0);
put.setObject(0, 1);
put.setObject(1, "newName");
put.setObject(2, "newAddress");
client.put(put);
...
//强制提交所有未提交put请求;HoloClient内部也会根据WriteBatchSize、WriteBatchByteSize、writeMaxIntervalMs三个参数自动提交
//client.flush();
} catch (HoloClientException e) {
}// 配置参数,url格式为 jdbc:postgresql://host:port/db
HoloConfig config = new HoloConfig();
config.setJdbcUrl(url);
config.setUsername(username);
config.setPassword(password);
config.setOnConflictAction(WriteMode.INSERT_OR_UPDATE);//局部更新时,配置主键冲突时策略为INSERT_OR_UPDATE
try (HoloClient client = new HoloClient(config)) {
//create table t0(id int not null,name0 text,address text,primary key(id))
TableSchema schema0 = client.getTableSchema("t0");
/*
表的字段为id,name0,address 只put id 和 name0两个字段,当主键冲突则更新对应字段,否则写入
*/
Put put = new Put(schema0);
put.setObject("id", 1);
put.setObject("name0", "name0");
client.put(put);
...
put = new Put(schema0);
put.setObject(0, 1);
put.setObject(1, "newName");
client.put(put);
...
//强制提交所有未提交put请求;HoloClient内部也会根据WriteBatchSize、WriteBatchByteSize、writeMaxIntervalMs三个参数自动提交
//client.flush();
} catch (HoloClientException e) {
}// 配置参数,url格式为 jdbc:postgresql://host:port/db
HoloConfig config = new HoloConfig();
config.setJdbcUrl(url);
config.setUsername(username);
config.setPassword(password);
config.setOnConflictAction(WriteMode.INSERT_OR_REPLACE);//配置主键冲突时策略
try (HoloClient client = new HoloClient(config)) {
//create table t0(id int not null,name0 text,address text,primary key(id))
TableSchema schema0 = client.getTableSchema("t0");
Put put = new Put(schema0);
put.getRecord().setType(Put.MutationType.DELETE);
put.setObject("id", 1);
client.put(put);
...
//强制提交所有未提交put请求;HoloClient内部也会根据WriteBatchSize、WriteBatchByteSize、writeMaxIntervalMs三个参数自动提交
//client.flush();
} catch (HoloClientException e) {
}当表有主键时,通过Checkandput接口,可以实现条件更新。例如下方示例代码,表示仅在新数据的update_time比表中原值大时才会进行写入。此接口可以在上游数据乱序时保证写入的有序性。checkandput接口同样也支持设置MutationType为delete,仅在满足check条件时删除数据。
// 配置参数,url格式为 jdbc:postgresql://host:port/db
HoloConfig config = new HoloConfig();
config.setJdbcUrl(url);
config.setUsername(username);
config.setPassword(password);
config.setOnConflictAction(WriteMode.INSERT_OR_UPDATE); //Checkandput接口要求WriteMode必须为INSERT_OR_UPDATE或者INSERT_OR_REPLACE
try (HoloClient client = new HoloClient(config)) {
TableSchema schema0 = client.getTableSchema("t0");
CheckAndPut put = new CheckAndPut(schema,
"update_time", // checkColumnName: 需要比较的字段名,目前仅支持配置单个字段
CheckCompareOp.GREATER, // checkOperator: 比较操作符,目前支持GREATER, GREATER_OR_EQUAL, EQUAL, NOT_EQUAL, LESS, LESS_OR_EQUAL, IS_NULL, IS_NOT_NULL
null, // checkValue: 需要比较的值,设置时表示使用此常量值与表中原有数据比较. 一般不需要设置,会直接与put中相应字段的值(put.setObject传入)做比较
"1970-01-01 00:08:00" // nullValue: 当表中当前值为null时,将null值视做nullValue。相当于sql `coalesce(old.column1, nullValue)`.
);
put.setObject("id", 1);
put.setObject("state", "ok");
put.setObject("update_time", "2020-01-01 00:00:00");
client.checkAndPut(put);
...
//强制提交所有未提交put请求;HoloClient内部也会根据WriteBatchSize、WriteBatchByteSize、writeMaxIntervalMs三个参数自动提交
//client.flush();
} catch (HoloClientException e) {
}fixed copy为hologres1.3.x 引入. fixed copy与HoloClient.put,以及普通copy的差异如下:
| 对比项 | 普通copy | fixed copy | HoloClient.put |
|---|---|---|---|
| 锁类型 | 表锁 | 行锁 | 行锁 |
| 数据可见性 | copy命令执行结束后可见 | 写入即可见 | 写入即可见 |
| 事务性 | 有,copy失败回滚 | 无,失败已经插入的数据不会回滚 | 有,单行事务 |
| 支持的主键冲突策略 | NONE(冲突则报错) | INSERT_OR_UPDATE INSERT_OR_IGNORE |
INSERT_OR_UPDATE INSERT_OR_IGNORE INSERT_OR_REPLACE |
| 是否支持delete | 否 | 否 | 是 |
| 性能 | 非常高 | 很高 | 高 |
- 实时写入场景
- 写入有delete 使用HoloClient.put方法写入.
- 无delete时,建议使用fixed copy写入,相比HoloClient.put方法,fixed copy方式可以更高的吞吐(因为是流模式),更低的数据延时,更低的客户端内存消耗(因为不攒批).
对于无delete的实时写入场景,建议都使用fixed copy写入.
import com.alibaba.hologres.client.copy.CopyFormat;
import com.alibaba.hologres.client.copy.CopyMode;
import com.alibaba.hologres.client.copy.in.CopyInWrapper;
import com.alibaba.hologres.client.model.OnConflictAction;
import com.alibaba.hologres.client.model.TableSchema;
import com.alibaba.hologres.client.Put;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.List;
public class CopyDemo {
public static void main(String[] args) throws Exception {
//注意,jdbcurl里需要是jdbc:hologres
String jdbcUrl = "jdbc:hologres://host:port/db";
String username = "";
String password = "";
/*
CREATE TABLE copy_demo (id INT NOT NULL, name TEXT NOT NULL, address TEXT, PRIMARY KEY(id));
*/
String tableName = "copy_demo";
// 支持写入部分列, 列在List里的顺序一定要和建表保持一致.
List<String> columns = new ArrayList<>();
columns.add("id");
columns.add("name");
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
CopyInWrapper copyIn =
new CopyInWrapper(
conn,
tableName,
columns,
CopyFormat.BINARY, /*底层是否走二进制协议,二进制格式的速率更快,否则为文本模式.*/
CopyMode.STREAM, // STREAM表示流模式,即fixed copy.*/
OnConflictAction.INSERT_OR_UPDATE,
1024 * 1024 * 10 // maxCellBufferSize,需要保证能放下一行数据,否则会写入失败.
)) {
TableSchema schema = copyIn.getSchema();
for (int i = 0; i < 10; ++i) {
Put put = new Put(schema);
/*
* 一定要和CopyInWrapper的columns保持一致,不然会错列.
* 如果一列出现在columns中,这一列一定要调用setObject(index, obj);
* 否则,这一列一定不能调用setObject.
*/
put.setObject("id", i);
put.setObject("name", "name0");
/*
* 如果有脏数据,写入失败的报错很难定位具体行.
* 此时可以启用RecordChecker做事前校验,找到有问题的数据.
* RecordChecker会对写入性能造成一定影响,非排查环节不建议开启.
*/
// RecordChecker.check(put.getRecord());
/*
*putRecord既将record发送给hologres引擎,并立即返回,
*引擎会在第一时间尝试写入存储,普遍状态下数据会在5-20ms后可查.
*当CopyInWrapper的close方法执行完成并且没有任何错误抛出,意味着所有数据均已写入完成可以查询.
*/
copyIn.putRecord(put.getRecord());
}
// 程序结束之前需要调用close, 保证数据完全写入; demo使用了try-with-resources,无需手动close
// copyIn.close();
}
}
}Hologres 3.1版本起,fixed copy支持写入分区父表.
Holo-Client 2.7.0 版本起支持通过 CopyInStageWrapper 将数据以 Arrow 格式写入 Hologres 内部 Stage,再通过生成的 INSERT 语句将数据从 Stage 加载到目标表。
该方式适合需要先暂存再批量导入的场景,支持 INSERT_OR_UPDATE / INSERT_OR_IGNORE 等主键冲突策略,数据在 INSERT 执行前不可见,具有普通 copy 的原子性。
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Put;
import com.alibaba.hologres.client.copy.CopyUtil;
import com.alibaba.hologres.client.copy.in.CopyInStageWrapper;
import com.alibaba.hologres.client.copy.in.arrow.RecordArrowWriter;
import com.alibaba.hologres.client.model.OnConflictAction;
import com.alibaba.hologres.client.model.TableSchema;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class CopyStageDemo {
public static void main(String[] args) throws Exception {
// 注意:jdbcUrl需使用jdbc:hologres协议
String jdbcUrl = "jdbc:hologres://host:port/db";
String username = "";
String password = "";
/*
CREATE TABLE copy_stage_demo (id INT NOT NULL, name TEXT NOT NULL, address TEXT, PRIMARY KEY(id));
*/
String tableName = "copy_stage_demo";
HoloConfig config = new HoloConfig();
config.setJdbcUrl(jdbcUrl);
config.setUsername(username);
config.setPassword(password);
config.setRegion("local");
// 创建临时Stage名称
String stageName = "temp_stage_" + System.currentTimeMillis();
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
HoloClient client = new HoloClient(config)) {
// 创建内部Stage,TTL为7200秒(2小时),超时后自动清理
String createStageSql =
"call hologres.hg_create_internal_stage('"
+ stageName
+ "', 'default_group', 7200);";
try (java.sql.Statement stmt = conn.createStatement()) {
stmt.execute(createStageSql);
}
// 获取表结构
TableSchema schema = client.getTableSchema(tableName);
// 定义要写入的列
List<String> columns = new ArrayList<>();
columns.add("id");
columns.add("name");
columns.add("address");
// 使用RecordArrowWriter和CopyInStageWrapper写入数据到Stage
try (RecordArrowWriter arrowWriter =
new RecordArrowWriter(
schema,
columns,
8192 // maxBatchSize,每8192行数据组成一个Arrow RecordBatch
);
CopyInStageWrapper<com.alibaba.hologres.client.model.Record> copyInStage =
new CopyInStageWrapper<>(
config,
stageName,
"data_file", // 文件名前缀
arrowWriter,
64 * 1024 * 1024 // fileSizeLimit,每个文件大小64 MB
)) {
for (int i = 0; i < 10; ++i) {
Put put = new Put(schema);
// 字段顺序须与columns保持一致
put.setObject("id", i);
put.setObject("name", "name" + i);
put.setObject("address", "address" + i);
copyInStage.putRecord(put.getRecord());
}
// 程序结束之前需要调用close,保证数据完全写入
// demo使用了try-with-resources,无需手动close
// copyInStage.close();
}
// 生成从Stage写入目标表的INSERT语句,并执行
String insertSql =
CopyUtil.buildInsertTableSelectFromStageSql(
schema,
columns,
Collections.singletonList(stageName),
OnConflictAction.INSERT_OR_UPDATE);
try (java.sql.Statement stmt = conn.createStatement()) {
stmt.execute(insertSql);
}
// 验证写入结果
try (java.sql.Statement stmt = conn.createStatement();
java.sql.ResultSet rs = stmt.executeQuery("select * from " + tableName)) {
while (rs.next()) {
System.out.println(
"id: " + rs.getInt(1)
+ ", name: " + rs.getString(2)
+ ", address: " + rs.getString(3));
}
}
} finally {
// 清理临时Stage(不清理也会根据TTL自动清理)
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
java.sql.Statement stmt = conn.createStatement()) {
stmt.execute("call hologres.hg_drop_internal_stage('" + stageName + "');");
} catch (Exception e) {
System.err.println("清理Stage时出错: " + e.getMessage());
}
}
}
}// 配置参数,url格式为 jdbc:postgresql://host:port/db
HoloConfig config = new HoloConfig();
config.setJdbcUrl(url);
config.setUsername(username);
config.setPassword(password);
try (HoloClient client = new HoloClient(config)) {
//create table t0(id int not null,name0 text,address text,primary key(id))
TableSchema schema0 = client.getTableSchema("t0");
Get get = Get.newBuilder(schema).setPrimaryKey("id", 0).build(); // where id=1;
client.get(get).thenAcceptAsync((record)->{
// do something after get result
});
} catch (HoloClientException e) {
}
// 配置参数,url格式为 jdbc:postgresql://host:port/db
HoloConfig config = new HoloConfig();
config.setJdbcUrl(url);
config.setUsername(username);
config.setPassword(password);
try (HoloClient client = new HoloClient(config)) {
//create table t0(id int not null,name0 text,address text,primary key(id))
TableSchema schema0 = client.getTableSchema("t0");
Scan scan = Scan.newBuilder(schema).addEqualFilter("id", 102).addRangeFilter("name", "3", "4").withSelectedColumn("address").build();
//等同于select address from t0 where id=102 and name>=3 and name<4 order by id;
int size = 0;
try (RecordScanner rs = client.scan(scan)) {
while (rs.next()) {
Record record = rs.getRecord();
//handle record
}
}
//不排序
scan = Scan.newBuilder(schema).addEqualFilter("id", 102).addRangeFilter("name", "3", "4").withSelectedColumn("address").setSortKeys(SortKeys.NONE).build();
//等同于select address from t0 where id=102 and name>=3 and name<4;
size = 0;
try (RecordScanner rs = client.scan(scan)) {
while (rs.next()) {
Record record = rs.getRecord();
//handle record
}
} catch (HoloClientException e) {
}
} Holo-Client 2.5.8 版本开始支持批量读取, 通过copy方式读取arrow数据,并转换为Record,同时可以选择是否启用压缩
import com.alibaba.hologres.client.copy.CopyFormat;
import com.alibaba.hologres.client.copy.out.CopyOutWrapper;
import com.alibaba.hologres.client.model.Record;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class CopyOutDemo {
public static void main(String[] args) throws Exception {
//注意,jdbcurl里需要是jdbc:hologres
String jdbcUrl = "jdbc:hologres://host:port/db";
String username = "";
String password = "";
/*
CREATE TABLE copy_demo (id INT NOT NULL, name TEXT NOT NULL, address TEXT, PRIMARY KEY(id));
* */
String tableName = "copy_demo";
// 支持写入部分列, 列在List里的顺序一定要和建表保持一致.
List<String> columns = new ArrayList<>();
columns.add("id");
columns.add("name");
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
CopyOutWrapper copyOut =
new CopyOutWrapper(
conn,
tableName,
columns,
CopyFormat.ARROW_LZ4, /* Copy Out 支持arrow以及arrow_lz4两种格式.*/
Collections.emptyList(), /* Copy Out 支持设置shard list, 这样可以启动多个线程并发读取不同的shard;不设置时读取整表.*/
"where id >= 0", /* Copy Out 支持设置filter字符串.*/
1024 * 1024 * 10 // maxCellBufferSize,需要保证能放下一行数据,否则会读取失败.
)) {
while (copyOut.hasNextBatch()) {
List<Record> records = copyOut.getRecords();
for (Record record : records) {
//handle record
}
}
}
}
}Hologres V1.1版本之后,支持使用holo-client进行表的Binlog消费。 Binlog相关知识可以参考文档 订阅Hologres Binlog , 使用Holo-client消费Binlog的建表、权限等准备工作和注意事项可以参考 通过JDBC消费Hologres Binlog
import com.alibaba.hologres.client.BinlogShardGroupReader;
import com.alibaba.hologres.client.Command;
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Subscribe;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
import com.alibaba.hologres.client.model.binlog.BinlogRecord;
import java.util.HashMap;
import java.util.Map;
public class HoloBinlogExample {
public static BinlogShardGroupReader reader;
public static void main(String[] args) throws Exception {
String username = "";
String password = "";
String url = "jdbc:postgresql://ip:port/database";
String tableName = "test_message_src";
String slotName = "hg_replication_slot_1";
// 创建client的参数
HoloConfig holoConfig = new HoloConfig();
holoConfig.setJdbcUrl(url);
holoConfig.setUsername(username);
holoConfig.setPassword(password);
holoConfig.setBinlogReadBatchSize(128);
holoConfig.setBinlogIgnoreDelete(true);
holoConfig.setBinlogIgnoreBeforeUpdate(true);
holoConfig.setBinlogHeartBeatIntervalMs(5000L);
HoloClient client = new HoloClient(holoConfig);
// 获取表的shard数
int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));
// 使用map保存每个shard的消费进度, 初始化为0
Map<Integer, Long> shardIdToLsn = new HashMap<>(shardCount);
for (int i = 0; i < shardCount; i++) {
shardIdToLsn.put(i, 0L);
}
// 消费binlog的请求,2.1版本前tableName和slotName为必要参数,2.1版本起仅需传入tableName
// Subscribe有StartTimeBuilder和OffsetBuilder两种,此处以前者为例
Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName, slotName)
.setBinlogReadStartTime("2021-01-01 12:00:00")
.build();
// 创建binlog reader
reader = client.binlogSubscribe(subscribe);
BinlogRecord record;
int retryCount = 0;
long count = 0;
while(true) {
try {
if (reader.isCanceled()) {
// 根据保存的消费点位重新创建reader
reader = client.binlogSubscribe(subscribe);
}
while ((record = reader.getBinlogRecord()) != null) {
// 消费到最新
if (record instanceof BinlogHeartBeatRecord) {
// do something
continue;
}
// 处理读取到的binlog record,这里只做打印
System.out.println(record);
// 处理之后保存消费点位,异常时可以从此点位恢复
shardIdToLsn.put(record.getShardId(), record.getBinlogLsn());
count++;
// 读取成功,重置重试次数
retryCount = 0;
}
} catch (HoloClientException e) {
if (++retryCount > 10) {
throw new RuntimeException(e);
}
// 发生异常时推荐打印warn级别日志
System.out.println(String.format("binlog read failed because %s and retry %s times", e.getMessage(), retryCount));
// 重试期间进行一定时间的等待
Thread.sleep(5000L * retryCount);
// 用OffsetBuilder创建Subscribe,从而为每个shard指定起始消费点位
Subscribe.OffsetBuilder subscribeBuilder = Subscribe.newOffsetBuilder(tableName, slotName);
for (int i = 0; i < shardCount; i++) {
// BinlogOffset通过setSequence指定lsn,通过setTimestamp指定时间,两者同时指定lsn优先级大于时间戳
// 这里根据shardIdToLsn这个Map中保存的消费进度进行恢复
subscribeBuilder.addShardStartOffset(i, new BinlogOffset().setSequence(shardIdToLsn.get(i)));
}
subscribe = subscribeBuilder.build();
// 关闭reader
reader.cancel();
}
}
}
}这里只简单展示接口的使用,完整代码可以参考上方消费普通表。
/*
CREATE TABLE test_message_src (
id int NOT NULL,
name text,
kind text,
PRIMARY KEY (id, kind)
)
PARTITION BY LIST (kind) WITH (binlog_level = 'replica');
CREATE TABLE test_message_src_kind1 partition of test_message_src for values in ('kind1');
CREATE TABLE test_message_src_kind2 partition of test_message_src for values in ('kind2');
CREATE TABLE test_message_src_kind3 partition of test_message_src for values in ('kind3');
INSERT INTO test_message_src_kind1 SELECT generate_series(1, 100), 'foo' , 'kind1';
INSERT INTO test_message_src_kind2 SELECT generate_series(1, 100), 'foo' , 'kind2';
INSERT INTO test_message_src_kind3 SELECT generate_series(1, 100), 'foo' , 'kind3';
*/
// 创建client的参数
HoloConfig holoConfig = new HoloConfig();
holoConfig.setJdbcUrl(url);
holoConfig.setUsername(username);
holoConfig.setPassword(password);
holoConfig.setUseFixedFe(true);
holoConfig.setBinlogReadBatchSize(128);
// 分区表订阅模式, STATIC表示消费指定的分区,且消费过程中无法调整
holoConfig.setBinlogPartitionSubscribeMode(BinlogPartitionSubscribeMode.STATIC);
holoConfig.setBinlogHeartBeatIntervalMs(1000L);
HoloClient client = new HoloClient(holoConfig);
// 消费binlog的请求
// Subscribe有StartTimeBuilder和OffsetBuilder两种,此处以前者为例
Subscribe subscribe = Subscribe.newStartTimeBuilder(tableName)
.addPartitionValuesToSubscribe(new String[]{"kind2", "kind3"}) // 仅消费kind2和kind3这两个分区
.setBinlogReadStartTime("2021-01-01 12:00:00")
.build();
// 创建binlog reader
reader = client.partitionBinlogSubscribe(subscribe);
BinlogRecord record;
while ((record = reader.getBinlogRecord()) != null) {
// 消费到最新
if (record instanceof BinlogHeartBeatRecord) {
// do something
continue;
}
// 处理读取到的binlog record,这里只做打印
System.out.println(Arrays.toString(record.getValues()));
}/*
-- 必须开启动态分区功能
CREATE TABLE test_message_src (
id int NOT NULL,
name text,
kind text,
PRIMARY KEY (id, kind)
)
PARTITION BY LIST (kind) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true',
auto_partitioning_time_unit = 'DAY',
auto_partitioning_num_precreate = '2'
);
-- 假设今天是2024-10-12, 消费时所有子表必须存在
CREATE TABLE test_message_src_20241010 PARTITION OF test_message_src FOR VALUES IN ('20241010');
CREATE TABLE test_message_src_20241011 PARTITION OF test_message_src FOR VALUES IN ('20241011');
CREATE TABLE test_message_src_20241012 PARTITION OF test_message_src FOR VALUES IN ('20241012');
INSERT INTO test_message_src_20241010 SELECT generate_series(1, 100), 'foo', '20241010';
INSERT INTO test_message_src_20241011 SELECT generate_series(1, 100), 'foo', '20241011';
INSERT INTO test_message_src_20241012 SELECT generate_series(1, 100), 'foo', '20241012';
*/
// 创建client的参数
HoloConfig holoConfig = new HoloConfig();
holoConfig.setJdbcUrl(url);
holoConfig.setUsername(username);
holoConfig.setPassword(password);
holoConfig.setUseFixedFe(true);
holoConfig.setBinlogReadBatchSize(128);
// 分区表订阅模式, DYNAMIC表示动态消费分区子表,会在新的一天到来时开启对新分区的消费
holoConfig.setBinlogPartitionSubscribeMode(BinlogPartitionSubscribeMode.DYNAMIC);
// 上一个分区表允许的数据迟到时间
holoConfig.setBinlogPartitionLatenessTimeoutSecond(60000);
holoConfig.setBinlogHeartBeatIntervalMs(1000L);
HoloClient client = new HoloClient(holoConfig);
int shardCount = Command.getShardCount(client, client.getTableSchema(tableName));
// 消费binlog的请求
// Subscribe有StartTimeBuilder和OffsetBuilder两种,此处以后者为例
Subscribe.OffsetBuilder builder = Subscribe.newOffsetBuilder(tableName);
for (int i = 0; i < shardCount; i++) {
// 父表仅需要shard信息
builder.addShardStartOffset(i, new BinlogOffset());
// 从20241011这个分区的最早binlog开始消费
builder.addShardStartOffsetForPartition("test_message_src_20241011", i, new BinlogOffset());
}
// 创建binlog reader
reader = client.partitionBinlogSubscribe(builder.build());
BinlogRecord record;
while ((record = reader.getBinlogRecord()) != null) {
// 消费到最新
if (record instanceof BinlogHeartBeatRecord) {
// do something
continue;
}
// 处理读取到的binlog record,这里只做打印
System.out.println(Arrays.toString(record.getValues()));
}public void doPut(HoloClient client, Put put) throws HoloClientException {
try {
client.put(put);
} catch (HoloClientWithDetailsException e) {
for(int i=0;i<e.size();++i){
//写入失败的记录
Record failedRecord = e.getFailRecord(i);
//写入失败的原因
HoloClientException cause = e.getException(i);
//脏数据处理逻辑
}
} catch (HoloClientException e) {
//非HoloClientWithDetailsException的异常一般是fatal的
throw e;
}
}
public void doFlush(HoloClient client) throws HoloClientException {
try {
client.flush();
} catch (HoloClientWithDetailsException e) {
for(int i=0;i<e.size();++i){
//写入失败的记录
Record failedRecord = e.getFailRecord(i);
//写入失败的原因
HoloClientException cause = e.getException(i);
//脏数据处理逻辑
}
} catch (HoloClientException e) {
//非HoloClientWithDetailsException的异常一般是fatal的
throw e;
}
}HoloConfig config = new HoloConfig();
config.setJdbcUrl(url);
config.setUsername(username);
config.setPassword(password);
try (HoloClient client = new HoloClient(config)) {
client.sql(conn -> {
try (Statement stat = conn.createStatement()) {
stat.execute("create table t0(id int)");
}
return null;
}).get();
} catch (HoloClientException e) {
}holoclient写入本质上只将一批Put放入多个队列后,在内存中合并成Batch提交给Worker,worker将batch拆分为若干条sql后提交提交。 1.X版本,写入的sql格式如下
insert into t0 (c0,c1,c2) values (?,?,?),(?,?,?) on conflict2.X版本,默认写入sql格式如下(需要hologres实例版本>=且不存在数组列)
insert into to (c0,c1,c2) select unnest(?),unnest(?),unnest(?) on conflict写入所拼sql上,增加对某个字段的check条件.详见Fixed Plan加速SQL执行文档中的带有条件判断的Upsert.
insert into test_check_and_insert(pk,c1,update_time) as old values(?, ?, ?) on conflict (pk)
do update set c1 = excluded.c1, update_time = excluded.update_time where excluded.update_time > old.update_time;unnest格式相比multi values有如下优点:
- ?个数等于列数,不再会因攒批过大导致?个数超过Short.MAX_VALUE
- batchSize不稳定时,不会产生多个PreparedStatement,节省服务端内存
- HoloConfig配置
- 删除rewriteSqlMaxBatchSize,目前会自动选择最大的Size
- 删除reWriteBatchedDeletes,自动开启
- 删除binlogReadStartTime,在Subscribe.Builder中指定
- 删除binlogReadTimeoutSeconds,在Subscribe.Builder中指定
- 包名
- org.postgresql.model.全部切换至com.alibaba.hologres.client.model.
- 类型
- Record.type从org.postgresql.core.SqlCommandType切换至com.alibaba.hologres.client.Put.MutationType
- binlog消费时,jdbc无法识别jdbc:postgresql://,必须格式为jdbc:hologres:// bug引入版本2.1.0,bug修复版本2.1.1
- 调用HoloClient.put接口,当holo引擎版本<1.1.38时会错误使用高版本才支持的insert into select unnest模式,导致写入失败 bug引入版本2.1.0,bug修复版本2.1.4
- binlog消费调用commit方法时,尚未消费到数据的shard会错误的将已消费lsn更新为-1 bug引入版本2.1.0,bug修复版本2.1.5
- 调用Put方法时表发生增减列,导致写入失败 bug引入版本1.X, bug修复版本2.1.5
- 当readWriteThread和writeThreadSize都为1时,getTableSchema可能触发死锁 bug引入版本1.X, bug修复版本2.2.0
- binlog消费时,存在内存泄露问题,bug引入版本2.1.0,bug修复版本2.2.10
- 分区表drop column之后,自动创建子分区时,可能拿到错误的分区值,bug引入版本1.X,bug修复版本2.2.11
- 当用户表名中有下划线时,可能获取到错误的字段信息,bug引入版本1.X,bug修复版本2.2.12
| 版本 | 新功能 | 缺陷修复 |
|---|---|---|
| 2.4.0 | • 新建连接时会设置 LOGIN_TIMEOUT 为 60s • 对实例只读状态的报错增加重试 • table schema 发生变化时,会强制 flush • 支持设置连接最大存活时间,默认为 24h • 支持激进模式,开启后如果连接空闲,不等攒批满就触发 flush,在流量小时可以降低写入延迟 |
• 修复 binlog 读取 timestamp 类型微秒精度丢失的问题 • 修复查询含非 utf8 字符时抛出异常的问题 |
| 2.4.1 | • 丰富脏数据信息,提高异常信息的可读性 | |
| 2.4.2 | • 支持 checkandput 接口,仅在数据满足条件时才进行写入 • 连接池复用时,会根据新的 HoloConfig 往大了更新连接池的线程数 |
|
| 2.5.0 | • 当连接的 host 对应多个 ip 时,支持 load balance | • 升级 pgjdbc 到 42.2.26.2,修复消费 binlog 时吞异常导致的 unexpected type 问题 • 修复 worker pool 复用死锁问题(2.4.2 修复不彻底,再次调整) |
| 2.5.4 | • 升级 pgjdbc 到 42.3.10 • binlog 消费支持分区父表 • copy 模式支持 time、timetz 类型 • 新增 copyMode,分为 STREAM、BULK_LOAD、BULK_LOAD_ON_CONFLICT |
• 修复 scan 操作通过 clustering key 排序不生效的问题 |
| 2.6.3 | • 支持 copy out 批量导出,支持对结果进行压缩 • Hologres 3.1 版本起,轻量级连接 fixed 支持直连 • 支持 AKv4 认证,默认开启 • 连接最大存活时间调整为 1 小时,会在连接空闲时自动重建连接 • Hologres 3.2 版本起,消费 binlog 支持列裁剪以及数据压缩 • Put 接口写入分区表时,默认直接写入父表 |
|
| 2.6.4 | • copy out(CopyOutWrapper)支持 arrow 格式读取 jsonb 类型 • copy 接口优化:简化 CopyInWrapper/CopyOutWrapper 的构造方式 |
• 修复 copy out arrow 格式处理 const array 的问题 • 修复读写时 TimeZone 未考虑夏令时(getRawOffset)导致时区偏移不准确的问题 |
| 2.6.6 | • 消费 Binlog 时支持传入带时区的 startTime,确保不受客户端时区影响 • 升级 pgjdbc 到 42.3.10.2,fixed fe 模式下支持读写 jsonb 类型 |
|
| 2.6.7 | • 修复 decimal 类型的 distribution key 计算 shard 时报错的问题 | |
| 2.6.9 | • 支持 Binlog Filter 功能,可在消费时对 Binlog 进行过滤 • BinlogFilter 支持序列化 • date 类型写入数据及创建分区时同时支持 YYYYMMDD 和 YYYY-MM-DD 两种格式 • 连接信息中包含 backend pid,方便排查问题 |
|
| 2.6.11 | • fixed copy(CopyInWrapper)增加对 binaryrow format 下 jsonb 类型的支持 • copy 支持所有 format 可通过传入 schema version 让服务端进行严格一致性检查 • copy 写入性能优化:binaryrow 直接写入 copy cellBuffer,减少一次内存申请和拷贝 • 消费 Binlog 支持设置可选参数 consumerGroup • Put 接口支持表达式写入(conflictUpdateSet / conflictWhere),可在 on conflict 时执行自定义表达式 • CopyInWrapper 支持通过字段名(而非下标)设置字段值,内部自动按 schema 顺序排列 • 写入部分列时,不包含 default 字段的场景也可以走 unnest 高性能写入路径 • 轻量级连接直连 URL 去掉 warehouse 和多余参数 • 去除 binaryrow format 客户端对 text/char/bpchar 类型的冗余检查 |
|
| 2.6.12 | • 修复 Scan 查询时 rangeFilter start 为 null 时 SQL 拼写错误的问题 • 升级依赖版本,修复 CVE 漏洞 |
|
| 2.6.13 | • Scan 查询 addRangeFilter 支持指定 start/end key 的开闭区间(isStartInclude/isEndInclude 参数) | |
| 2.6.14 | • 修复 worker 关闭时未停止 scanner 导致资源泄漏的问题 • 升级依赖版本,修复 CVE 漏洞 |
|
| 2.6.15 | • 增加关闭表达式 GUC 开关,insert 表达式支持版本调整为 Hologres 4.0+ • holo-client 关闭时,其发起的 scanner 也强制关闭 • 优化 PK 重复引起的攒批逻辑,不再触发无意义的 resize,并增加攒批不满原因的 debug 日志 • CheckAndPut 去掉通过列名构造的构造函数,接口更简洁 |
|
| 2.6.17 | • copy out(CopyOutWrapper)支持传入 schema,可按需读取指定列 • TableSchema 新增透出 global index 信息 • GetTableSchema 优化:获取前后的 schemaVersion 一致性校验,避免频繁 DDL 时字段和 schemaVersion 对不上 |
• 修复消费 Binlog 时,部分列场景下 null 值 index 设置错误导致错列的问题 • 修复 fatal 异常时 worker 无法正常关闭问题:worker 现在会继续运行并将后续积压的 action 都标记为失败,避免 HoloClient 无法关闭 |
| 2.7.0 | • 新增 CopyInStageWrapper,支持通过 fixed copy 将数据写入 Stage(外部存储),并提供生成 INSERT 语句的工具方法 • Arrow 依赖版本升级到 17.0,并对 copy out 相关实现进行适配 |
• 修复获取 index 信息时意外抛出异常的问题,改为返回空列表 |
| 2.7.1 | • insert overwrite stage 支持写入部分列 • 支持写入限流:新增 writeRps 参数,可限制每秒最多写入的记录数,同时作用于 HoloClient.put 和 fixed copy 写入 |
|
| 2.7.2 | • COPY CSV 的 NULLSTRING 支持使用\N • 优化 COPY STAGE 链路,避免频繁创建对象 |
| 参数名 | 默认值 | 说明 | 引入版本 |
|---|---|---|---|
| jdbcUrl | 无 | 必填 | 1.2.3 |
| username | 无 | 必填 | 1.2.3 |
| password | 无 | 必填 | 1.2.3 |
| appName | holo-client | jdbc的applicationName参数 | 1.2.9.1 |
| 参数名 | 默认值 | 说明 | 引入版本 |
|---|---|---|---|
| dynamicPartition | false | 若为true,当分区不存在时自动创建分区 | 1.2.3 |
| useFixedFe | false | 当hologres引擎版本>=1.3,开启FixedFe后,Get/Put将不消耗连接数(beta功能),连接池大小为writeThreadSize和readThreadSize | 2.2.0 |
| connectionSizeWhenUseFixedFe | 1 | 仅useFixedFe=true时生效,表示除了Get/Put之外的调用使用的连接池大小 | 2.2.0 |
| sslMode | disable | 是否启动传输加密,详见Hologres启用SSL传输加密。取值为disable、require、verify-ca、verify-full,后两种需要通过sslRootCertLocation参数配置CA证书的路径 | 2.3.0 |
| sslRootCertLocation | 无 | sslMode=verify-ca或verify-full时必填,CA证书的路径 | 2.3.0 |
| useAkv4 | true | 启用Akv4认证 | 2.3.0 |
| region | 无 | 启用Akv4认证需要设置Region,默认会尝试解析endpoint获取,如果使用ip地址等无法解析,需要手动指定 | 2.3.0 |
| 参数名 | 默认值 | 说明 | 引入版本 |
|---|---|---|---|
| writeThreadSize | 1 | 处理HoloClient.put方法请求的最大连接数 | |
| onConflictAction | INSERT_OR_REPLACE | 当INSERT目标表为有主键的表时采用不同策略 INSERT_OR_IGNORE 当主键冲突时,不写入 INSERT_OR_UPDATE 当主键冲突时,更新相应列 INSERT_OR_REPLACE当主键冲突时,更新所有列 (2.5.6及之前版本,此参数名为WriteMode) |
1.2.3 |
| writeBatchSize | 512 | 每个写入线程的最大批次大小,在经过WriteMode合并后的Put数量达到writeBatchSize时进行一次批量提交 | 1.2.3 |
| writeBatchByteSize | 2097152(2 * 1024 * 1024) | 每个写入线程的最大批次bytes大小,单位为Byte,默认2MB, 在经过WriteMode合并后的Put数据字节数达到writeBatchByteSize时进行一次批量提交 |
1.2.3 |
| writeBatchTotalByteSize | 20971520(20 * 1024 * 1024) | 所有表最大批次bytes大小,单位为Byte,默认20MB,在经过WriteMode合并后的Put数据字节数达到writeBatchByteSize时进行一次批量提交 | 1.2.8.1 |
| writeMaxIntervalMs | 10000 | 距离上次提交超过writeMaxIntervalMs会触发一次批量提交 | 1.2.4 |
| writerShardCountResizeIntervalMs | 30s | 主动调用flush时,触发resize,两次resize间隔不短于writerShardCountResizeIntervalMs | 1.2.10.1 |
| inputNumberAsEpochMsForDatetimeColumn | false | 当Number写入Date/timestamp/timestamptz列时,若为true,将number视作ApochMs | 1.2.5 |
| inputStringAsEpochMsForDatetimeColumn | false | 当String写入Date/timestamp/timestamptz列时,若为true,将String视作ApochMs | 1.2.6 |
| removeU0000InTextColumnValue | true | 当写入Text/Varchar列时,若为true,剔除字符串中的\u0000 | 1.2.10.1 |
| enableDefaultForNotNullColumn | true | 启用时,not null且未在表上设置default的字段传入null时,将以默认值写入. String 默认“”,Number 默认0,Date/timestamp/timestamptz 默认1970-01-01 00:00:00 | 1.2.6 |
| defaultTimeStampText | null | enableDefaultForNotNullColumn=true时,Date/timestamp/timestamptz的默认值 | 1.2.6 |
| useLegacyPutHandler | false | true时,写入sql格式为insert into xxx(c0,c1,...) values (?,?,...),... on conflict; false时优先使用sql格式为insert into xxx(c0,c1,...) select unnest(?),unnest(?),... on conflict | 2.0.1 |
| maxRowsPerSql | Integer.MAX_VALUE | useLegacyPutHandler=false,且通过unnest形式写入时,每条sql的最大行数 | 2.0.1 |
| maxBytesPerSql | Long.MAX_VALUE | useLegacyPutHandler=false,且通过unnest形式写入时,每条sql的最大字节数 | 2.0.1 |
| enableAffectedRows | false | 开启时 若用户用holoclient.sql执行statement.executeUpdate将会返回正确的affectrow计数,但对于行存表进行holoclient.put会有性能下降 | 2.2.5 |
| enableGenerateBinlog | true | 关闭时,通过当前holo-client写入的数据不会生成binlog | 2.2.11 |
| enableDeduplication | true | 写入时是否对攒批数据做去重,设置为false表示不会去重,如果数据重复非常严重,性能最差相当于writeBatchSize设置为1的逐条写入. | 2.3.0 |
| enableAggressive | false | 写入激进模式,开启后如果连接空闲,不等攒批满就会触发flush,在流量小时可以降低写入延迟. | 2.4.0 |
| writeRps | -1 | 写入限流,每秒最多写入的记录数(rows per second)。-1 表示不限流,同时对 HoloClient.put 和 fixed copy(CopyInWrapper/CopyInStageWrapper)写入生效. | 2.7.1 |
| 参数名 | 默认值 | 说明 | 引入版本 |
|---|---|---|---|
| readThreadSize | 1 | 点查并发线程数(每个并发占用1个数据库连接) | 1.2.4 |
| readBatchSize | 128 | 点查最大批次大小 | 1.2.3 |
| readBatchQueueSize | 256 | 点查请求缓冲队列大小 | 1.2.4 |
| scanFetchSize | 256 | Scan操作一次fetch的行数 | 1.2.9.1 |
| scanTimeoutSeconds | 256 | Scan操作的超时时间 | 1.2.9.1 |
| readTimeoutMilliseconds | 0 | Get操作的超时时间,0表示不超时 | 2.1.5 |
| readRetryCount | 1 | Get操作的尝试次数,1表示不重试 | 2.1.5 |
| 参数名 | 默认值 | 说明 | 引入版本 |
|---|---|---|---|
| retryCount | 3 | 当连接故障时,写入和查询的重试次数 | 1.2.3 |
| retrySleepInitMs | 1000 | 每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs | 1.2.3 |
| retrySleepStepMs | 10000 | 每次重试的等待时间=retrySleepInitMs+retry*retrySleepStepMs | 1.2.3 |
| connectionMaxIdleMs | 60000 | 写入线程和点查线程数据库连接的最大Idle时间,超过连接将在空闲时被释放,使用时重建 | 1.2.4 |
| connectionMaxAliveMs | 86400000(24 * 60 * 60 * 1000L) | 写入线程和点查线程数据库连接的最大存活时间,超过连接将在空闲时被释放,使用时重建 | 2.4.0 |
| metaCacheTTL | 1 min | getTableSchema信息的本地缓存时间 | 1.2.6 |
| metaAutoRefreshFactor | 4 | 当tableSchema cache剩余存活时间短于 metaCacheTTL/metaAutoRefreshFactor 将自动刷新cache | 1.2.10.1 |
| 参数名 | 默认值 | 说明 | 引入版本 |
|---|---|---|---|
| binlogReadBatchSize | 1024 | 从每个shard单次获取的Binlog最大批次大小 | 1.2.16.5 |
| binlogHeartBeatIntervalMs | -1 | binlogRead 发送BinlogHeartBeatRecord的间隔. -1表示不发送, 当binlog没有新数据,每间隔binlogHeartBeatIntervalMs会下发一条BinlogHeartBeatRecord,此record的timestamp表示截止到这个时间的数据都已经消费完成. |
2.1.0 |
| binlogIgnoreDelete | false | 是否忽略消费Delete类型的binlog | 1.2.16.5 |
| binlogIgnoreBeforeUpdate | false | 是否忽略消费BeforeUpdate类型的binlog | 1.2.16.5 |
| binlogPartitionSubscribeMode | false | 取值如下 STATIC: 消费分区父表时,同时消费多张子表,子表在消费过程中无法新增或移除。默认消费此父表的所有子表。 DYNAMIC: 动态消费分区父表,要求分区父表必须开启动态分区管理。动态分区管理功能会按照时间单元自动创建分区子表,DYNAMIC 模式会按照从旧到新的顺序消费各个子表。当消费到次新子表时,会在新的时间单元到来时,开启最新子表的消费。 |
2.5.4 |
| partitionValuesToSubscribe | 空数组 | STATIC模式默认消费此父表的所有子表, 可以通过此参数指定需要消费的分区值数组 | 2.5.4 |
| binlogPartitionLatenessTimeoutSecond | 60 | DYNAMIC 模式消费分区父表时,容忍的数据迟到时间。DYNAMIC 模式会在新的一天到来时开启当前时间对应的最新子表的消费,但不会立刻关闭前一个分区,而是会持续监听lateness-timeout 配置的时间,以保证可以读取到上一个分区的迟到数据。例如: 对20240920 这张子表, 其消费会在 2024-09-21 00:01:00 关闭,而不是在2024-09-21 00:00:00 就停止消费。 | 2.5.4 |
消费分区父表目前有如下要求:
- 子表名称必须严格由父表名 + 下划线 + 分区值组成, 即{parent_table}_{partition_value} ,非此格式的子表可能无法消费到,对 DYNAMIC 模式,分区值格式与动态分区的时间单元有关,如 20240910。
- 对于 DYNAMIC 模式,要求分区父表必须开启动态分区管理。并且子表预创建参数auto_partitioning.num_precreate必须大于 1,否则作业在尝试消费最新子表时发现表不存在会抛出异常。
- 考虑 jdbc 模式消费 binlog 存在连接数的限制,消费分区父表需要使用 jdbc_fixed 模式,要求 Hologres 实例版本大于等于 2.1.27。
- INSERT_OR_INGORE
-- 当表有PK列时,等价生成如下sql
INSERT INTO t0 (pk, c0, c1, c2) values (?, ?, ?, ?) ON CONFLICT(pk) DO NOTHING;
-- 当表无PK列时,等价生成如下sql
INSERT INTO t0 (c0, c1, c2) values (?, ?, ?);- INSERT_OR_UPDATE
-- 当表有PK列时,等价生成如下sql
INSERT INTO t0 (pk, c0, c1, c2) values (?, ?, ?) ON CONFLICT(pk) DO UPDATE SET c0 = excluded.c0, c1 = excluded.c1, c2 = excluded.c2;
-- 当表无PK列时,等价生成如下sql
INSERT INTO t0 (c0, c1, c2) values (?, ?, ?);- INSERT_OR_REPLACE INSERT_OR_REPLACE相比INSERT_OR_UPDATE最大的区别是,UPDATE只有显式调用过Put.setObject的列才会参与到SQL中,而REPLACE模式下,没有调用put.setObject列等同于调用过一次put.setObject(index, null),所有列都会参与到sql中