Skip to content

Commit 916c654

Browse files
committed
feat: redis support tableNodeConfig
1 parent 3e0d610 commit 916c654

File tree

8 files changed

+107
-34
lines changed

8 files changed

+107
-34
lines changed

connectors/redis-connector/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
<properties>
6262
<java.version>8</java.version>
6363
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
64-
<tapdata.pdk.api.version>2.0.0-SNAPSHOT</tapdata.pdk.api.version>
64+
<tapdata.pdk.api.version>2.0.6-SNAPSHOT</tapdata.pdk.api.version>
6565
</properties>
6666

6767
<build>

connectors/redis-connector/src/main/java/io/tapdata/connector/redis/RedisConfig.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.tapdata.connector.redis.constant.DeployModeEnum;
44
import io.tapdata.entity.utils.BeanUtils;
5+
import io.tapdata.entity.utils.DataMap;
56
import io.tapdata.entity.utils.InstanceFactory;
67
import io.tapdata.kit.EmptyKit;
78
import org.apache.commons.lang3.StringUtils;
@@ -40,6 +41,7 @@ public class RedisConfig {
4041
private Boolean oneKey = false;
4142
private String schemaKey = "-schema-key-";
4243
private long rateLimit = 5000L;
44+
private Map<String, DataMap> tableConfig;
4345

4446
private final static String DATA_BASE ="database";
4547

@@ -168,6 +170,10 @@ public String getValueType() {
168170
return valueType;
169171
}
170172

173+
public String getValueType(String key) {
174+
return getTableConfigValue(key, "valueType", valueType);
175+
}
176+
171177
public void setValueType(String valueType) {
172178
this.valueType = valueType;
173179
}
@@ -192,6 +198,10 @@ public String getValueData() {
192198
return valueData;
193199
}
194200

201+
public String getValueData(String key) {
202+
return getTableConfigValue(key, "valueData", valueData);
203+
}
204+
195205
public void setValueData(String valueData) {
196206
this.valueData = valueData;
197207
}
@@ -200,6 +210,10 @@ public String getValueJoinString() {
200210
return valueJoinString;
201211
}
202212

213+
public String getValueJoinString(String key) {
214+
return getTableConfigValue(key, "valueJoinString", valueJoinString);
215+
}
216+
203217
public void setValueJoinString(String valueJoinString) {
204218
this.valueJoinString = valueJoinString;
205219
}
@@ -216,6 +230,10 @@ public Boolean getCsvFormat() {
216230
return csvFormat;
217231
}
218232

233+
public Boolean getCsvFormat(String key) {
234+
return getTableConfigValue(key, "csvFormat", csvFormat);
235+
}
236+
219237
public void setCsvFormat(Boolean csvFormat) {
220238
this.csvFormat = csvFormat;
221239
}
@@ -224,6 +242,10 @@ public Boolean getListHead() {
224242
return listHead;
225243
}
226244

245+
public Boolean getListHead(String key) {
246+
return getTableConfigValue(key, "listHead", listHead);
247+
}
248+
227249
public void setListHead(Boolean listHead) {
228250
this.listHead = listHead;
229251
}
@@ -232,6 +254,10 @@ public String getKeyExpression() {
232254
return keyExpression;
233255
}
234256

257+
public String getKeyExpression(String key) {
258+
return getTableConfigValue(key, "keyExpression", keyExpression);
259+
}
260+
235261
public void setKeyExpression(String keyExpression) {
236262
this.keyExpression = keyExpression;
237263
}
@@ -264,6 +290,10 @@ public Boolean getOneKey() {
264290
return oneKey;
265291
}
266292

293+
public Boolean getOneKey(String key) {
294+
return getTableConfigValue(key, "oneKey", oneKey);
295+
}
296+
267297
public void setOneKey(Boolean oneKey) {
268298
this.oneKey = oneKey;
269299
}
@@ -283,4 +313,19 @@ public long getRateLimit() {
283313
public void setRateLimit(long rateLimit) {
284314
this.rateLimit = rateLimit;
285315
}
316+
317+
public Map<String, DataMap> getTableConfig() {
318+
return tableConfig;
319+
}
320+
321+
public void setTableConfig(Map<String, DataMap> tableConfig) {
322+
this.tableConfig = tableConfig;
323+
}
324+
325+
private <T> T getTableConfigValue(String key, String propertyName, T defaultValue) {
326+
if (tableConfig != null && tableConfig.containsKey(key)) {
327+
return tableConfig.get(key).getValue(propertyName, defaultValue);
328+
}
329+
return defaultValue;
330+
}
286331
}

connectors/redis-connector/src/main/java/io/tapdata/connector/redis/RedisConnector.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public void initConnection(TapConnectionContext connectionContext) throws Throwa
8989
connectorContext.getStateMap().put("firstConnectorId", firstConnectorId);
9090
}
9191
redisConfig.load(connectionContext.getNodeConfig());
92+
redisConfig.setTableConfig(connectionContext.getTableNodeConfig());
9293
});
9394
this.redisContext = new RedisContext(redisConfig);
9495
this.redisExceptionCollector = new RedisExceptionCollector();
@@ -140,7 +141,7 @@ public ConnectionOptions connectionTest(TapConnectionContext connectionContext,
140141

141142
private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEvent> tapRecordEvents, TapTable tapTable, Consumer<WriteListResult<TapRecordEvent>> writeListResultConsumer) throws Throwable {
142143
AbstractRedisRecordWriter recordWriter;
143-
switch (ValueTypeEnum.fromString(redisConfig.getValueType())) {
144+
switch (ValueTypeEnum.fromString(redisConfig.getValueType(tapTable.getId()))) {
144145
case LIST:
145146
recordWriter = new ListRedisRecordWriter(redisContext, tapTable);
146147
break;
@@ -162,13 +163,13 @@ private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEve
162163
}
163164

164165
private void clearTable(TapConnectorContext tapConnectorContext, TapClearTableEvent tapClearTableEvent) {
165-
if (redisConfig.getOneKey()) {
166+
if (redisConfig.getOneKey(tapClearTableEvent.getTableId())) {
166167
cleanOneKey(tapClearTableEvent.getTableId());
167168
}
168169
}
169170

170171
private void dropTable(TapConnectorContext tapConnectorContext, TapDropTableEvent tapDropTableEvent) {
171-
if (redisConfig.getOneKey()) {
172+
if (redisConfig.getOneKey(tapDropTableEvent.getTableId())) {
172173
cleanOneKey(tapDropTableEvent.getTableId());
173174
}
174175
}
@@ -179,30 +180,30 @@ private void cleanOneKey(String keyName) {
179180
}
180181
try (CommonJedis jedis = redisContext.getJedis()) {
181182
jedis.del(keyName);
182-
if (ValueTypeEnum.fromString(redisConfig.getValueType()) == ValueTypeEnum.HASH) {
183+
if (ValueTypeEnum.fromString(redisConfig.getValueType(keyName)) == ValueTypeEnum.HASH) {
183184
jedis.hdel(redisConfig.getSchemaKey(), keyName);
184185
}
185186
}
186187
}
187188

188189
private void createTable(TapConnectorContext tapConnectorContext, TapCreateTableEvent createTableEvent) {
189-
switch (ValueTypeEnum.fromString(redisConfig.getValueType())) {
190+
String keyName = createTableEvent.getTableId();
191+
switch (ValueTypeEnum.fromString(redisConfig.getValueType(keyName))) {
190192
case STRING:
191193
case SET:
192194
case ZSET:
193195
return;
194196
}
195-
if (!redisConfig.getListHead()) {
197+
if (!redisConfig.getListHead(keyName)) {
196198
return;
197199
}
198200
try (CommonJedis jedis = redisContext.getJedis()) {
199201
List<String> fieldList = createTableEvent.getTable().getNameFieldMap().entrySet().stream().sorted(Comparator.comparing(v ->
200202
EmptyKit.isNull(v.getValue().getPos()) ? 99999 : v.getValue().getPos())).map(Map.Entry::getKey).collect(Collectors.toList());
201-
if (redisConfig.getOneKey()) {
202-
String keyName = createTableEvent.getTableId();
203-
if (ValueTypeEnum.fromString(redisConfig.getValueType()) == ValueTypeEnum.LIST) {
203+
if (redisConfig.getOneKey(keyName)) {
204+
if (ValueTypeEnum.fromString(redisConfig.getValueType(keyName)) == ValueTypeEnum.LIST) {
204205
jedis.del(keyName);
205-
jedis.rpush(keyName, String.join(EmptyKit.isEmpty(redisConfig.getValueJoinString()) ? "," : redisConfig.getValueJoinString(), fieldList));
206+
jedis.rpush(keyName, String.join(EmptyKit.isEmpty(redisConfig.getValueJoinString(keyName)) ? "," : redisConfig.getValueJoinString(keyName), fieldList));
206207
} else {
207208
jedis.hset(redisConfig.getSchemaKey(), keyName, fieldList.stream().map(v -> csvFormat(v, ",")).collect(Collectors.joining(",")));
208209
}

connectors/redis-connector/src/main/java/io/tapdata/connector/redis/writer/AbstractRedisRecordWriter.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ public abstract class AbstractRedisRecordWriter {
2727
protected final CommonJedis jedis;
2828
protected final RedisConfig redisConfig;
2929
protected final TapTable tapTable;
30+
protected final String valueData;
31+
protected final String valueJoinString;
32+
protected final Boolean csvFormat;
33+
protected final String keyExpression;
34+
protected final Boolean oneKey;
3035
protected final List<String> fieldList;
3136
protected final List<String> keyFieldList;
3237
protected static final JsonParser jsonParser = InstanceFactory.instance(JsonParser.class); //json util
@@ -35,6 +40,11 @@ public AbstractRedisRecordWriter(RedisContext redisContext, TapTable tapTable) {
3540
this.redisConfig = redisContext.getRedisConfig();
3641
this.jedis = redisContext.getJedis();
3742
this.tapTable = tapTable;
43+
this.valueData = redisConfig.getValueData(tapTable.getId());
44+
this.valueJoinString = redisConfig.getValueJoinString(tapTable.getId());
45+
this.csvFormat = redisConfig.getCsvFormat(tapTable.getId());
46+
this.keyExpression = redisConfig.getKeyExpression(tapTable.getId());
47+
this.oneKey = redisConfig.getOneKey(tapTable.getId());
3848
this.fieldList = tapTable.getNameFieldMap().entrySet().stream().sorted(Comparator.comparing(v ->
3949
EmptyKit.isNull(v.getValue().getPos()) ? 99999 : v.getValue().getPos())).map(Map.Entry::getKey).collect(Collectors.toList());
4050
this.keyFieldList = getKeyFieldList();
@@ -92,8 +102,8 @@ public void write(List<TapRecordEvent> tapRecordEvents, Consumer<WriteListResult
92102
protected abstract void handleDeleteEvent(TapDeleteRecordEvent event, RedisPipeline pipelined);
93103

94104
protected String getRedisKey(Map<String, Object> value) {
95-
if (EmptyKit.isNotBlank(redisConfig.getKeyExpression())) {
96-
String key = redisConfig.getKeyExpression();
105+
if (EmptyKit.isNotBlank(keyExpression)) {
106+
String key = keyExpression;
97107
for (String field : fieldList) {
98108
Object obj = value.get(field);
99109
key = key.replaceAll("\\$\\{" + field + "}", EmptyKit.isNull(obj) ? "null" : String.valueOf(obj));
@@ -117,7 +127,7 @@ protected String getRedisKey(Map<String, Object> value) {
117127

118128
protected List<String> getKeyFieldList() {
119129
List<String> keyFieldList = new ArrayList<>();
120-
String expression = redisConfig.getKeyExpression();
130+
String expression = keyExpression;
121131
if (EmptyKit.isBlank(expression)) {
122132
return keyFieldList;
123133
}
@@ -137,12 +147,12 @@ protected String getTextValue(Map<String, Object> value) {
137147
return fieldList.stream().map(v -> {
138148
Object obj = value.get(v);
139149
String str = EmptyKit.isNull(obj) ? "null" : String.valueOf(obj);
140-
if (redisConfig.getCsvFormat()) {
141-
return csvFormat(str, redisConfig.getValueJoinString());
150+
if (csvFormat) {
151+
return csvFormat(str, valueJoinString);
142152
} else {
143153
return str;
144154
}
145-
}).collect(Collectors.joining(redisConfig.getValueJoinString()));
155+
}).collect(Collectors.joining(valueJoinString));
146156
}
147157

148158
private String csvFormat(String str, String delimiter) {

connectors/redis-connector/src/main/java/io/tapdata/connector/redis/writer/HashRedisRecordWriter.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ public HashRedisRecordWriter(RedisContext redisContext, TapTable tapTable) {
2525
protected void handleInsertEvent(TapInsertRecordEvent event, RedisPipeline pipelined) {
2626
Map<String, Object> value = event.getAfter();
2727
String fieldName = getRedisKey(value);
28-
if (redisConfig.getOneKey()) {
29-
String strValue = ValueDataEnum.JSON.getType().equals(redisConfig.getValueData()) ? getJsonValue(value) : getTextValue(value);
28+
if (oneKey) {
29+
String strValue = ValueDataEnum.JSON.getType().equals(valueData) ? getJsonValue(value) : getTextValue(value);
3030
pipelined.hset(keyName, fieldName, strValue);
3131
} else {
3232
pipelined.hmset(fieldName, toStringMap(value));
@@ -43,9 +43,9 @@ protected void handleUpdateEvent(TapUpdateRecordEvent event, RedisPipeline pipel
4343
} else {
4444
keyFieldList.forEach(v -> lastBefore.put(v, afterValue.get(v)));
4545
}
46-
if (redisConfig.getOneKey()) {
46+
if (oneKey) {
4747
pipelined.hdel(keyName, getRedisKey(lastBefore));
48-
String strValue = ValueDataEnum.JSON.getType().equals(redisConfig.getValueData()) ? getJsonValue(afterValue) : getTextValue(afterValue);
48+
String strValue = ValueDataEnum.JSON.getType().equals(valueData) ? getJsonValue(afterValue) : getTextValue(afterValue);
4949
pipelined.hset(keyName, getRedisKey(afterValue), strValue);
5050
} else {
5151
pipelined.del(getRedisKey(lastBefore));
@@ -57,8 +57,8 @@ protected void handleUpdateEvent(TapUpdateRecordEvent event, RedisPipeline pipel
5757
protected void handleDeleteEvent(TapDeleteRecordEvent event, RedisPipeline pipelined) {
5858
Map<String, Object> value = event.getBefore();
5959
String fieldName = getRedisKey(value);
60-
if (redisConfig.getOneKey()) {
61-
String strValue = ValueDataEnum.JSON.getType().equals(redisConfig.getValueData()) ? getJsonValue(value) : getTextValue(value);
60+
if (oneKey) {
61+
String strValue = ValueDataEnum.JSON.getType().equals(valueData) ? getJsonValue(value) : getTextValue(value);
6262
pipelined.hdel(keyName, fieldName, strValue);
6363
} else {
6464
pipelined.del(fieldName);

connectors/redis-connector/src/main/java/io/tapdata/connector/redis/writer/ListRedisRecordWriter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ public ListRedisRecordWriter(RedisContext redisContext, TapTable tapTable) {
2424
@Override
2525
protected void handleInsertEvent(TapInsertRecordEvent event, RedisPipeline pipelined) {
2626
Map<String, Object> value = event.getAfter();
27-
String strValue = ValueDataEnum.JSON.getType().equals(redisConfig.getValueData()) ? getJsonValue(value) : getTextValue(value);
28-
if (!redisConfig.getOneKey()) {
27+
String strValue = ValueDataEnum.JSON.getType().equals(valueData) ? getJsonValue(value) : getTextValue(value);
28+
if (!oneKey) {
2929
keyName = getRedisKey(value);
3030
}
3131
pipelined.rpush(keyName, strValue);
@@ -34,7 +34,7 @@ protected void handleInsertEvent(TapInsertRecordEvent event, RedisPipeline pipel
3434
@Override
3535
protected void handleUpdateEvent(TapUpdateRecordEvent event, RedisPipeline pipelined) throws Exception {
3636
Map<String, Object> afterValue = event.getAfter();
37-
String newValue = ValueDataEnum.JSON.getType().equals(redisConfig.getValueData()) ? getJsonValue(afterValue) : getTextValue(afterValue);
37+
String newValue = ValueDataEnum.JSON.getType().equals(valueData) ? getJsonValue(afterValue) : getTextValue(afterValue);
3838
if (null == event.getBefore()) {
3939
throw new Exception("Redis update failed reason before data is null");
4040
}
@@ -45,8 +45,8 @@ protected void handleUpdateEvent(TapUpdateRecordEvent event, RedisPipeline pipel
4545
} else {
4646
keyFieldList.forEach(v -> lastBefore.put(v, afterValue.get(v)));
4747
}
48-
String oldValue = ValueDataEnum.JSON.getType().equals(redisConfig.getValueData()) ? getJsonValue(lastBefore) : getTextValue(lastBefore);
49-
if (!redisConfig.getOneKey()) {
48+
String oldValue = ValueDataEnum.JSON.getType().equals(valueData) ? getJsonValue(lastBefore) : getTextValue(lastBefore);
49+
if (!oneKey) {
5050
String newKeyName = getRedisKey(afterValue);
5151
String oldKeyName = getRedisKey(lastBefore);
5252
if (newKeyName.equals(oldKeyName)) {
@@ -63,8 +63,8 @@ protected void handleUpdateEvent(TapUpdateRecordEvent event, RedisPipeline pipel
6363
@Override
6464
protected void handleDeleteEvent(TapDeleteRecordEvent event, RedisPipeline pipelined) {
6565
Map<String, Object> value = event.getBefore();
66-
String oldValue = ValueDataEnum.JSON.getType().equals(redisConfig.getValueData()) ? getJsonValue(value) : getTextValue(value);
67-
if (redisConfig.getOneKey()) {
66+
String oldValue = ValueDataEnum.JSON.getType().equals(valueData) ? getJsonValue(value) : getTextValue(value);
67+
if (oneKey) {
6868
pipelined.lrem(keyName, 1, oldValue);
6969
} else {
7070
pipelined.lrem(getRedisKey(value), 1, oldValue);

connectors/redis-connector/src/main/java/io/tapdata/connector/redis/writer/StringRedisRecordWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public StringRedisRecordWriter(RedisContext redisContext, TapTable tapTable) {
2121
@Override
2222
protected void handleInsertEvent(TapInsertRecordEvent event, RedisPipeline pipelined) {
2323
Map<String, Object> value = event.getAfter();
24-
String strValue = ValueDataEnum.JSON.getType().equals(redisConfig.getValueData()) ? getJsonValue(value) : getTextValue(value);
24+
String strValue = ValueDataEnum.JSON.getType().equals(valueData) ? getJsonValue(value) : getTextValue(value);
2525
pipelined.set(getRedisKey(value), strValue);
2626
}
2727

@@ -36,7 +36,7 @@ protected void handleUpdateEvent(TapUpdateRecordEvent event, RedisPipeline pipel
3636
keyFieldList.forEach(v -> lastBefore.put(v, afterValue.get(v)));
3737
}
3838
pipelined.del(getRedisKey(lastBefore));
39-
String strValue = ValueDataEnum.JSON.getType().equals(redisConfig.getValueData()) ? getJsonValue(afterValue) : getTextValue(afterValue);
39+
String strValue = ValueDataEnum.JSON.getType().equals(valueData) ? getJsonValue(afterValue) : getTextValue(afterValue);
4040
pipelined.set(getRedisKey(afterValue), strValue);
4141
}
4242

0 commit comments

Comments
 (0)