Description
STEPS
In a Spring application, I set up the following configuration to use common wildcard stream-load table properties, with the aim of loading multiple tables:
// create load manager
StreamLoadTableProperties tableProps = StreamLoadTableProperties.builder()
        .database("*")
        .table("*")
        .streamLoadDataFormat(StreamLoadDataFormat.JSON)
        .build();
StreamLoadProperties properties = new StreamLoadProperties.Builder()
        .jdbcUrl(jdbcDataSourceConfig.getUrl())
        .loadUrls(jdbcDataSourceConfig.getJdbcProperties().getProperty("loadUrl"))
        .username(jdbcDataSourceConfig.getUser())
        .password(jdbcDataSourceConfig.getPassword())
        .enableTransaction()
        .defaultTableProperties(tableProps)
        .maxRetries(0)
        .build();
StreamLoadManagerV2 streamLoadManager = new StreamLoadManagerV2(properties, true);
streamLoadManager.init();
// load
streamLoadManager.write(null, SINK_DB, SINK_TBL, rows);
streamLoadManager.flush();
I assumed that using * would match every table and serve as the default. My data is JSON, but I got the following error:
errorLog: Error: Target column count: 12 doesn't match source value column count: 1. Column separator: '\t', Row delimiter: '\n'. Row: [{...}]
It looks like the JSON format setting is not taking effect.
Then I set:
StreamLoadTableProperties tableProps = StreamLoadTableProperties.builder()
        .database("*")
        .table("*")
        .streamLoadDataFormat(StreamLoadDataFormat.JSON)
        .addProperty("format", "json")
        .addProperty("strip_outer_array", "true")
        .addProperty("ignore_json_size", "true")
        .build();
It still fails with the same error.
BUGS
I looked into the source code and found two possible bugs.
Bug 1. Properties are not copied
// StreamLoadManagerV2.java
protected TableRegion getCacheRegion(String uniqueKey, String database, String table) {
    if (uniqueKey == null) {
        uniqueKey = StreamLoadUtils.getTableUniqueKey(database, table); // I pass null as uniqueKey
    }
    ...
    StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey, database, table); // bug 1 here
    region = new TransactionTableRegion(uniqueKey, database, table, this,
            tableProperties, streamLoader, labelGenerator, maxRetries, retryIntervalInMs); // bug 2 here
// StreamLoadProperties.java
public StreamLoadTableProperties getTableProperties(String uniqueKey, String database, String table) {
    StreamLoadTableProperties tableProperties = tablePropertiesMap.getOrDefault(uniqueKey, defaultTableProperties);
    if (!tableProperties.getDatabase().equals(database) || !tableProperties.getTable().equals(table)) { // the default is "*"
        StreamLoadTableProperties.Builder tablePropertiesBuilder = StreamLoadTableProperties.builder();
        tablePropertiesBuilder = tablePropertiesBuilder.copyFrom(tableProperties).database(database).table(table); // copied here
        return tablePropertiesBuilder.build();
    } else {
        return tableProperties;
    }
}
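To make the lookup above concrete, here is a simplified, self-contained model of what I expected it to do. `TableProps`, `PropsLookup.resolve` and the unique key string are hypothetical stand-ins, not SDK code: an exact entry is looked up by unique key, the `*`/`*` defaults are the fallback, and the fallback is re-targeted to the concrete database and table through a copy that keeps every property.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for StreamLoadTableProperties (not the SDK class).
final class TableProps {
    final String database;
    final String table;
    final Map<String, String> properties;

    TableProps(String database, String table, Map<String, String> properties) {
        this.database = database;
        this.table = table;
        this.properties = new HashMap<>(properties); // defensive copy
    }

    // Copy all fields, including the free-form properties, and override database/table.
    TableProps retarget(String newDatabase, String newTable) {
        return new TableProps(newDatabase, newTable, properties);
    }
}

final class PropsLookup {
    // Same shape as getTableProperties(): exact entry by unique key, else the defaults.
    static TableProps resolve(Map<String, TableProps> byUniqueKey, TableProps defaults,
                              String uniqueKey, String database, String table) {
        TableProps found = byUniqueKey.getOrDefault(uniqueKey, defaults);
        if (!found.database.equals(database) || !found.table.equals(table)) {
            // The "*"/"*" defaults land here and are re-targeted to the concrete table.
            return found.retarget(database, table);
        }
        return found;
    }

    public static void main(String[] args) {
        TableProps defaults = new TableProps("*", "*",
                Map.of("format", "json", "strip_outer_array", "true"));
        TableProps resolved = resolve(new HashMap<>(), defaults, "hypothetical-key", "db", "tbl");
        System.out.println(resolved.database + "." + resolved.table + " " + resolved.properties);
    }
}
```

With this behavior, the wildcard defaults would reach every table with `format: json` intact, which is what I assumed the SDK does.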
// StreamLoadTableProperties.java
public Builder copyFrom(StreamLoadTableProperties streamLoadTableProperties) {
    database(streamLoadTableProperties.getDatabase());
    table(streamLoadTableProperties.getTable());
    columns(streamLoadTableProperties.getColumns());
    streamLoadDataFormat(streamLoadTableProperties.getDataFormat());
    chunkLimit(streamLoadTableProperties.getChunkLimit());
    maxBufferRows(streamLoadTableProperties.getMaxBufferRows());
    tableProperties.putAll(streamLoadTableProperties.getTableProperties());
    commonProperties.putAll(streamLoadTableProperties.getCommonProperties());
    return this;
}
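The failure mode I suspect can be illustrated with a small hypothetical builder model. It assumes, without my having confirmed it, that `addProperty()` writes to a map that `copyFrom()` does not transfer; `PropsBuilder`, `copyFromPartial` and `copyFromAll` are illustrative names, not SDK methods.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical builder model (not the SDK's Builder): three maps, and
// addProperty() writes to a map that the partial copy never transfers.
final class PropsBuilder {
    final Map<String, String> tableProperties = new HashMap<>();
    final Map<String, String> commonProperties = new HashMap<>();
    final Map<String, String> properties = new HashMap<>();

    PropsBuilder addProperty(String key, String value) {
        properties.put(key, value);
        return this;
    }

    // Suspected behavior: only two of the three maps are copied.
    PropsBuilder copyFromPartial(PropsBuilder other) {
        tableProperties.putAll(other.tableProperties);
        commonProperties.putAll(other.commonProperties);
        return this;
    }

    // Candidate fix: copy every map so addProperty() values survive.
    PropsBuilder copyFromAll(PropsBuilder other) {
        copyFromPartial(other);
        properties.putAll(other.properties);
        return this;
    }

    // Union of all maps, standing in for the headers that would be sent.
    Map<String, String> effective() {
        Map<String, String> merged = new HashMap<>(tableProperties);
        merged.putAll(commonProperties);
        merged.putAll(properties);
        return merged;
    }

    public static void main(String[] args) {
        PropsBuilder source = new PropsBuilder().addProperty("format", "json");
        System.out.println("partial copy: " + new PropsBuilder().copyFromPartial(source).effective());
        System.out.println("full copy: " + new PropsBuilder().copyFromAll(source).effective());
    }
}
```

If the real builder is laid out like this, the wildcard defaults would lose `format` and friends on the way to the concrete table, which matches the CSV-style error above.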
The properties do not seem to be copied in this case. When I added properties specific to the target table, loading worked:
StreamLoadTableProperties specificTableProps = StreamLoadTableProperties.builder()
        .database(SINK_DB)
        .table(SINK_TBL)
        .uniqueKey(StreamLoadUtils.getTableUniqueKey(SINK_DB, SINK_TBL))
        .streamLoadDataFormat(StreamLoadDataFormat.JSON)
        .addProperty("format", "json")
        .addProperty("strip_outer_array", "true")
        .addProperty("ignore_json_size", "true")
        .build();
StreamLoadProperties properties = new StreamLoadProperties.Builder()
        .jdbcUrl(jdbcDataSourceConfig.getUrl())
        .loadUrls(jdbcDataSourceConfig.getJdbcProperties().getProperty("loadUrl"))
        .username(jdbcDataSourceConfig.getUser())
        .password(jdbcDataSourceConfig.getPassword())
        .enableTransaction()
        .defaultTableProperties(tableProps)
        .addTableProperties(specificTableProps) // the new table-specific properties
        .maxRetries(0)
        .build();
Bug 2. StreamLoadDataFormat.JSON does not seem to be set as a header
According to the docs (/docs/sql-reference/sql-statements/loading_unloading/STREAM_LOAD/), the format is passed as a request header:
-T <file_path>
-H "format: CSV | JSON"
// TransactionTableRegion.java
private void initHeaders(StreamLoadTableProperties properties) {
    headers.putAll(properties.getProperties());
    if (properties.getDataFormat() instanceof StreamLoadDataFormat.CSVFormat && compressionType.isPresent()) {
    ...
// No "format: json" header seems to be set for JSON data, so I have to add addProperty("format", "json") myself
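One way `initHeaders()` could cover this is to derive the `format` header from the configured data format whenever the user has not set it. The sketch below is a hypothetical model (`LoadFormat` and `HeaderInit` are not SDK types); it leaves CSV alone because, consistent with the `'\t'` column separator in the error above, Stream Load treats a request without a `format` header as CSV.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StreamLoadDataFormat (not the SDK type).
enum LoadFormat { CSV, JSON }

final class HeaderInit {
    // Possible initHeaders() behavior: start from the user's properties, then add a
    // "format" header derived from the data format unless the user already set one.
    static Map<String, String> initHeaders(LoadFormat format, Map<String, String> userProperties) {
        Map<String, String> headers = new HashMap<>(userProperties);
        if (format == LoadFormat.JSON) {
            // An explicit user value wins (exact key match only in this sketch).
            headers.putIfAbsent("format", "json");
        }
        // Nothing is added for CSV: a request without a format header is loaded as CSV.
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(initHeaders(LoadFormat.JSON, Map.of()));
        System.out.println(initHeaders(LoadFormat.CSV, Map.of()));
    }
}
```

Using `putIfAbsent` keeps the workaround above (`addProperty("format", "json")`) working unchanged while making `streamLoadDataFormat(JSON)` sufficient on its own.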
// DefaultStreamLoader.java
protected void initDefaultHeaders(StreamLoadProperties properties) {
    Map<String, String> headers = new HashMap<>(properties.getHeaders());
    ...
protected StreamLoadResponse sendToSR(TableRegion region) {
    ...
    httpPut.setHeaders(defaultHeaders);
    for (Map.Entry<String, String> entry : region.getHeaders().entrySet()) {
        httpPut.removeHeaders(entry.getKey());
        httpPut.addHeader(entry.getKey(), entry.getValue());
    }
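The request headers are set directly from the default headers and then from the table properties headers, which replace any default header with the same name. So, if we want to use JSON format globally, we can call addHeader() on the StreamLoadProperties builder, and the header will end up in the default headers.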
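The merge order in `sendToSR()` can be modeled with plain maps to show why a global `addHeader("format", "json")` works: defaults apply to every table, and a table-level header of the same name replaces the default. `HeaderMerge` is a hypothetical helper, not SDK code; since HTTP header names are case-insensitive, the model uses a case-insensitive map.

```java
import java.util.Map;
import java.util.TreeMap;

final class HeaderMerge {
    // Hypothetical model of sendToSR(): start from the default headers, then let each
    // region (table) header replace any default header with the same name.
    static Map<String, String> requestHeaders(Map<String, String> defaultHeaders,
                                              Map<String, String> regionHeaders) {
        Map<String, String> merged = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        merged.putAll(defaultHeaders);
        merged.putAll(regionHeaders); // meant to mirror removeHeaders(name) + addHeader(name, value)
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Map.of("format", "json", "strip_outer_array", "true");
        System.out.println(requestHeaders(defaults, Map.of()));              // global default applies
        System.out.println(requestHeaders(defaults, Map.of("Format", "csv"))); // table header overrides
    }
}
```

This is why setting the header once on `StreamLoadProperties` reaches every table, while the wildcard table properties, which depend on the copy in bug 1, do not.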
Please review the above and check whether this is a logic bug or intended behavior.
If needed, I can submit a pull request.