Skip to content

Commit 4b4f333

Browse files
authored
refactor: remove flushIntervalMs parameter and scheduled flusher logic (#129)
* refactor: remove flushIntervalMs parameter and scheduled flusher logic - Remove FLUSH_INTERVAL config option from OBKVHBase2ConnectorOptions - Remove scheduled executor and timer-based flushing logic from OBKVHBase2SinkFunction - Remove unused failureThrowable and checkErrorAndRethrow() method - Update documentation to remove flushIntervalMs parameter - Update OBKVHBase2DynamicTableFactory to remove FLUSH_INTERVAL option - Data is now flushed only when buffer size is reached or on checkpoint This simplifies the connector by removing periodic flushing mechanism and relying on buffer-based and checkpoint-based flushing only. * Enhancement: remove useless numPendingRequests * perf: split mutationList into separate putBuffer and deleteBuffer - Replace List<Object> mutationList with typed List<Put> and List<Delete> buffers - Remove instanceof checks and type casting in sync() method - Add dedicated bufferLock object for synchronization - Check combined buffer size for flush trigger - Eliminate O(n) traversal with type classification on every flush Performance improvements: - Zero type checking overhead during flush operations - Better type safety at compile time - Clearer code intent and structure * fix: use synchronized(this) instead of bufferLock to fix serialization - Remove bufferLock field to avoid NotSerializableException - Use synchronized(this) for all buffer operations - No need to initialize lock in open() method - Fixes test failure in OBKVHBase2ConnectorITCase This is a cleaner solution that avoids serialization issues while maintaining the same thread safety guarantees.
1 parent a1f2617 commit 4b4f333

File tree

5 files changed

+76
-156
lines changed

5 files changed

+76
-156
lines changed

docs/sink/flink-connector-obkv-hbase2.md

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -283,32 +283,31 @@ CREATE TABLE t_sink (
283283

284284
## Configuration Options
285285

286-
| Option Name | Required | Default | Type | Description |
287-
|----------------------|----------|---------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
288-
| connector | Yes | | String | Must be set to 'obkv-hbase2' to use this connector. |
289-
| schema-name | Yes | | String | The database name in OceanBase. |
290-
| table-name | Yes | | String | HBase table name (without column family suffix). |
291-
| username | Yes | | String | Username. |
292-
| password | Yes | | String | Password. |
293-
| odp-mode | No | false | Boolean | Whether to connect to OBKV via ODP. Set to 'true' to connect via ODP, otherwise connect via config url. |
294-
| url | No | | String | Cluster config url, can be queried by `SHOW PARAMETERS LIKE 'obconfig_url'`. Required when 'odp-mode' is 'false'. |
295-
| sys.username | No | | String | Username of sys tenant. Required when 'odp-mode' is 'false'. |
296-
| sys.password | No | | String | Password of sys tenant. Required when 'odp-mode' is 'false'. |
297-
| odp-ip | No | | String | IP address of ODP. Required when 'odp-mode' is 'true'. |
298-
| odp-port | No | 2885 | Integer | RPC port of ODP. Optional when 'odp-mode' is 'true'. |
299-
| hbase.properties | No | | String | Properties to configure 'obkv-hbase-client-java', separated by semicolons. Format: 'key1=value1;key2=value2'. |
300-
| columnFamily | No | f | String | HBase column family name. |
301-
| rowkeyDelimiter | No | : | String | Delimiter for composite primary keys. |
302-
| writePkValue | No | false | Boolean | Whether to write primary key values as column values in HBase. |
303-
| bufferSize | No | 5000 | Integer | Buffer size for batch writing. |
304-
| flushIntervalMs | No | 2000 | Duration | Flush interval for batch writing (milliseconds). Set to '0' to disable scheduled flushing. |
305-
| ignoreNullWhenUpdate | No | true | Boolean | Whether to ignore null values when updating. When set to 'true', columns with null values are skipped for partial updates; when set to 'false', null values are written to HBase. |
306-
| ignoreDelete | No | false | Boolean | Whether to ignore delete operations. When set to 'true', delete operations are not executed. |
307-
| excludeUpdateColumns | No | | String | Column names to exclude from updates, separated by commas. These columns will not be updated. |
308-
| dynamicColumnSink | No | false | Boolean | Whether to enable dynamic column mode. When enabled, non-PK columns must be exactly 2 columns (columnKey and columnValue), both must be VARCHAR type. |
309-
| tsColumn | No | | String | Timestamp column name. When specified, the value of this column will be used as the timestamp for all columns. If this is set, 'tsMap' will be ignored. |
310-
| tsMap | No | | String | Timestamp mapping configuration. Format: 'tsColumn0:column0;tsColumn0:column1;tsColumn1:column2', meaning column0 and column1 use tsColumn0's value as timestamp, column2 uses tsColumn1's value as timestamp. |
311-
| tsInMills | No | true | Boolean | Whether timestamp unit is milliseconds. When set to 'false', timestamp unit is seconds. |
286+
| Option Name | Required | Default | Type | Description |
287+
|----------------------|----------|---------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
288+
| connector | Yes | | String | Must be set to 'obkv-hbase2' to use this connector. |
289+
| schema-name | Yes | | String | The database name in OceanBase. |
290+
| table-name | Yes | | String | HBase table name (without column family suffix). |
291+
| username | Yes | | String | Username. |
292+
| password | Yes | | String | Password. |
293+
| odp-mode | No | false | Boolean | Whether to connect to OBKV via ODP. Set to 'true' to connect via ODP, otherwise connect via config url. |
294+
| url | No | | String | Cluster config url, can be queried by `SHOW PARAMETERS LIKE 'obconfig_url'`. Required when 'odp-mode' is 'false'. |
295+
| sys.username | No | | String | Username of sys tenant. Required when 'odp-mode' is 'false'. |
296+
| sys.password | No | | String | Password of sys tenant. Required when 'odp-mode' is 'false'. |
297+
| odp-ip | No | | String | IP address of ODP. Required when 'odp-mode' is 'true'. |
298+
| odp-port | No | 2885 | Integer | RPC port of ODP. Optional when 'odp-mode' is 'true'. |
299+
| hbase.properties | No | | String | Properties to configure 'obkv-hbase-client-java', separated by semicolons. Format: 'key1=value1;key2=value2'. |
300+
| columnFamily | No | f | String | HBase column family name. |
301+
| rowkeyDelimiter | No | : | String | Delimiter for composite primary keys. |
302+
| writePkValue | No | false | Boolean | Whether to write primary key values as column values in HBase. |
303+
| bufferSize | No | 5000 | Integer | Buffer size for batch writing. |
304+
| ignoreNullWhenUpdate | No | true | Boolean | Whether to ignore null values when updating. When set to 'true', columns with null values are skipped for partial updates; when set to 'false', null values are written to HBase. |
305+
| ignoreDelete | No | false | Boolean | Whether to ignore delete operations. When set to 'true', delete operations are not executed. |
306+
| excludeUpdateColumns | No | | String | Column names to exclude from updates, separated by commas. These columns will not be updated. |
307+
| dynamicColumnSink | No | false | Boolean | Whether to enable dynamic column mode. When enabled, non-PK columns must be exactly 2 columns (columnKey and columnValue), both must be VARCHAR type. |
308+
| tsColumn | No | | String | Timestamp column name. When specified, the value of this column will be used as the timestamp for all columns. If this is set, 'tsMap' will be ignored. |
309+
| tsMap | No | | String | Timestamp mapping configuration. Format: 'tsColumn0:column0;tsColumn0:column1;tsColumn1:column2', meaning column0 and column1 use tsColumn0's value as timestamp, column2 uses tsColumn1's value as timestamp. |
310+
| tsInMills | No | true | Boolean | Whether timestamp unit is milliseconds. When set to 'false', timestamp unit is seconds. |
312311

313312
## Core Features
314313

0 commit comments

Comments
 (0)