Skip to content

Commit 07510ed

Browse files
lm-yljlimin
and
limin
authored
[Fix][Connector-Redis] Redis did not write successfully, but the task did not fail (#9055)
Co-authored-by: limin <[email protected]>
1 parent cac2de1 commit 07510ed

File tree

8 files changed

+553
-16
lines changed

8 files changed

+553
-16
lines changed

Diff for: seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java

+39-15
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import org.apache.seatunnel.api.table.type.RowKind;
2121
import org.apache.seatunnel.common.utils.JsonUtils;
2222
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
23+
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
24+
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode;
2325

2426
import org.apache.commons.collections4.CollectionUtils;
2527

2628
import redis.clients.jedis.Jedis;
2729
import redis.clients.jedis.Pipeline;
2830
import redis.clients.jedis.Response;
31+
import redis.clients.jedis.exceptions.JedisException;
2932

3033
import java.util.ArrayList;
3134
import java.util.List;
@@ -142,69 +145,76 @@ public List<List<String>> batchGetZset(List<String> keys) {
142145
@Override
143146
public void batchWriteString(
144147
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
148+
List<Response<?>> responses = new ArrayList<>();
145149
Pipeline pipelined = jedis.pipelined();
146150
int size = keys.size();
147151
for (int i = 0; i < size; i++) {
148152
RowKind rowKind = rowKinds.get(i);
149153
String key = keys.get(i);
150154
String value = values.get(i);
151155
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
152-
pipelined.del(key);
156+
responses.add(pipelined.del(key));
153157
} else {
154-
pipelined.set(key, value);
158+
responses.add(pipelined.set(key, value));
155159
if (expireSeconds > 0) {
156-
pipelined.expire(key, expireSeconds);
160+
responses.add(pipelined.expire(key, expireSeconds));
157161
}
158162
}
159163
}
160164
pipelined.sync();
165+
processResponses(responses);
161166
}
162167

163168
@Override
164169
public void batchWriteList(
165170
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
171+
List<Response<?>> responses = new ArrayList<>();
166172
Pipeline pipelined = jedis.pipelined();
167173
int size = keys.size();
168174
for (int i = 0; i < size; i++) {
169175
RowKind rowKind = rowKinds.get(i);
170176
String key = keys.get(i);
171177
String value = values.get(i);
172178
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
173-
pipelined.lrem(key, 1, value);
179+
responses.add(pipelined.lrem(key, 1, value));
174180
} else {
175-
pipelined.lpush(key, value);
181+
responses.add(pipelined.lpush(key, value));
176182
if (expireSeconds > 0) {
177-
pipelined.expire(key, expireSeconds);
183+
responses.add(pipelined.expire(key, expireSeconds));
178184
}
179185
}
180186
}
181187
pipelined.sync();
188+
processResponses(responses);
182189
}
183190

184191
@Override
185192
public void batchWriteSet(
186193
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
194+
List<Response<?>> responses = new ArrayList<>();
187195
Pipeline pipelined = jedis.pipelined();
188196
int size = keys.size();
189197
for (int i = 0; i < size; i++) {
190198
RowKind rowKind = rowKinds.get(i);
191199
String key = keys.get(i);
192200
String value = values.get(i);
193201
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
194-
pipelined.srem(key, value);
202+
responses.add(pipelined.srem(key, value));
195203
} else {
196-
pipelined.sadd(key, value);
204+
responses.add(pipelined.sadd(key, value));
197205
if (expireSeconds > 0) {
198-
pipelined.expire(key, expireSeconds);
206+
responses.add(pipelined.expire(key, expireSeconds));
199207
}
200208
}
201209
}
202210
pipelined.sync();
211+
processResponses(responses);
203212
}
204213

205214
@Override
206215
public void batchWriteHash(
207216
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
217+
List<Response<?>> responses = new ArrayList<>();
208218
Pipeline pipelined = jedis.pipelined();
209219
int size = keys.size();
210220
for (int i = 0; i < size; i++) {
@@ -214,36 +224,50 @@ public void batchWriteHash(
214224
Map<String, String> fieldsMap = JsonUtils.toMap(value);
215225
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
216226
for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
217-
pipelined.hdel(key, entry.getKey());
227+
responses.add(pipelined.hdel(key, entry.getKey()));
218228
}
219229
} else {
220-
pipelined.hset(key, fieldsMap);
230+
responses.add(pipelined.hset(key, fieldsMap));
221231
if (expireSeconds > 0) {
222-
pipelined.expire(key, expireSeconds);
232+
responses.add(pipelined.expire(key, expireSeconds));
223233
}
224234
}
225235
}
226236
pipelined.sync();
237+
processResponses(responses);
227238
}
228239

229240
@Override
230241
public void batchWriteZset(
231242
List<RowKind> rowKinds, List<String> keys, List<String> values, long expireSeconds) {
243+
List<Response<?>> responses = new ArrayList<>();
232244
Pipeline pipelined = jedis.pipelined();
233245
int size = keys.size();
234246
for (int i = 0; i < size; i++) {
235247
RowKind rowKind = rowKinds.get(i);
236248
String key = keys.get(i);
237249
String value = values.get(i);
238250
if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) {
239-
pipelined.zrem(key, value);
251+
responses.add(pipelined.zrem(key, value));
240252
} else {
241-
pipelined.zadd(key, 1, value);
253+
responses.add(pipelined.zadd(key, 1, value));
242254
if (expireSeconds > 0) {
243-
pipelined.expire(key, expireSeconds);
255+
responses.add(pipelined.expire(key, expireSeconds));
244256
}
245257
}
246258
}
247259
pipelined.sync();
260+
processResponses(responses);
261+
}
262+
263+
private void processResponses(List<Response<?>> responseList) {
264+
try {
265+
for (Response<?> response : responseList) {
266+
// If the response is an exception object, it will be thrown
267+
response.get();
268+
}
269+
} catch (JedisException e) {
270+
throw new RedisConnectorException(RedisErrorCode.GET_RESPONSE_FAILED, e);
271+
}
248272
}
249273
}

Diff for: seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020

2121
public enum RedisErrorCode implements SeaTunnelErrorCode {
2222
GET_REDIS_VERSION_INFO_FAILED("RedisErrorCode-01", "Failed to get the redis version"),
23-
INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config");
23+
INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config"),
24+
GET_RESPONSE_FAILED("RedisErrorCode-03", "Failed to get the write response");
2425

2526
private final String code;
2627
private final String description;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seatunnel.e2e.connector.redis;
18+
19+
import org.apache.seatunnel.e2e.common.TestResource;
20+
import org.apache.seatunnel.e2e.common.TestSuiteBase;
21+
import org.apache.seatunnel.e2e.common.container.TestContainer;
22+
23+
import org.junit.jupiter.api.AfterAll;
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.BeforeAll;
26+
import org.junit.jupiter.api.TestTemplate;
27+
import org.testcontainers.containers.GenericContainer;
28+
import org.testcontainers.containers.output.Slf4jLogConsumer;
29+
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
30+
import org.testcontainers.lifecycle.Startables;
31+
import org.testcontainers.utility.DockerImageName;
32+
import org.testcontainers.utility.DockerLoggerFactory;
33+
34+
import lombok.extern.slf4j.Slf4j;
35+
import redis.clients.jedis.Jedis;
36+
37+
import java.time.Duration;
38+
import java.util.Objects;
39+
import java.util.stream.Stream;
40+
41+
@Slf4j
42+
public class RedisMasterAndSlaveIT extends TestSuiteBase implements TestResource {
43+
private static RedisContainerInfo masterContainerInfo;
44+
private static RedisContainerInfo slaveContainerInfo;
45+
private static GenericContainer<?> master;
46+
private static GenericContainer<?> slave;
47+
private Jedis slaveJedis;
48+
49+
@BeforeAll
50+
@Override
51+
public void startUp() throws Exception {
52+
masterContainerInfo =
53+
new RedisContainerInfo("redis-e2e-master", 6379, "SeaTunnel", "redis:7");
54+
master =
55+
new GenericContainer<>(DockerImageName.parse(masterContainerInfo.getImageName()))
56+
.withNetwork(NETWORK)
57+
.withNetworkAliases(masterContainerInfo.getHost())
58+
.withExposedPorts(masterContainerInfo.getPort())
59+
.withLogConsumer(
60+
new Slf4jLogConsumer(
61+
DockerLoggerFactory.getLogger(
62+
masterContainerInfo.getImageName())))
63+
.withCommand(
64+
String.format(
65+
"redis-server --requirepass %s",
66+
masterContainerInfo.getPassword()))
67+
.waitingFor(
68+
new HostPortWaitStrategy()
69+
.withStartupTimeout(Duration.ofMinutes(2)));
70+
master.start();
71+
log.info("Redis master container started");
72+
73+
slaveContainerInfo =
74+
new RedisContainerInfo("redis-e2e-slave", 6379, "SeaTunnel", "redis:7");
75+
slave =
76+
new GenericContainer<>(DockerImageName.parse(slaveContainerInfo.getImageName()))
77+
.withNetwork(NETWORK)
78+
.withNetworkAliases(slaveContainerInfo.getHost())
79+
.withExposedPorts(slaveContainerInfo.getPort())
80+
.withLogConsumer(
81+
new Slf4jLogConsumer(
82+
DockerLoggerFactory.getLogger(
83+
slaveContainerInfo.getImageName())))
84+
.withCommand(
85+
String.format(
86+
"redis-server --requirepass %s --slaveof %s %s --masterauth %s",
87+
slaveContainerInfo.getPassword(),
88+
masterContainerInfo.getHost(),
89+
masterContainerInfo.getPort(),
90+
masterContainerInfo.getPassword()))
91+
.waitingFor(
92+
new HostPortWaitStrategy()
93+
.withStartupTimeout(Duration.ofMinutes(2)));
94+
slave.start();
95+
log.info("Redis slave container started");
96+
Startables.deepStart(Stream.of(master, slave)).join();
97+
this.initSlaveJedis();
98+
}
99+
100+
private void initSlaveJedis() {
101+
Jedis jedis = new Jedis(slave.getHost(), slave.getFirstMappedPort());
102+
jedis.auth(slaveContainerInfo.getPassword());
103+
jedis.ping();
104+
this.slaveJedis = jedis;
105+
}
106+
107+
@AfterAll
108+
@Override
109+
public void tearDown() throws Exception {
110+
if (Objects.nonNull(slaveJedis)) {
111+
slaveJedis.close();
112+
}
113+
114+
if (Objects.nonNull(slave)) {
115+
slave.close();
116+
}
117+
if (Objects.nonNull(master)) {
118+
master.close();
119+
}
120+
}
121+
122+
@TestTemplate
123+
public void testWriteKeyToReadOnlyRedis(TestContainer container) {
124+
try {
125+
container.executeJob("/fake-to-redis-test-readonly-key.conf");
126+
} catch (Exception e) {
127+
String containerLogs = container.getServerLogs();
128+
Assertions.assertTrue(
129+
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
130+
}
131+
Assertions.assertEquals(null, slaveJedis.get("key_check"));
132+
}
133+
134+
@TestTemplate
135+
public void testWriteListToReadOnlyRedis(TestContainer container) {
136+
try {
137+
container.executeJob("/fake-to-redis-test-readonly-list.conf");
138+
} catch (Exception e) {
139+
String containerLogs = container.getServerLogs();
140+
Assertions.assertTrue(
141+
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
142+
}
143+
Assertions.assertEquals(0, slaveJedis.llen("list_check"));
144+
}
145+
146+
@TestTemplate
147+
public void testWriteSetToReadOnlyRedis(TestContainer container) {
148+
try {
149+
container.executeJob("/fake-to-redis-test-readonly-set.conf");
150+
} catch (Exception e) {
151+
String containerLogs = container.getServerLogs();
152+
Assertions.assertTrue(
153+
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
154+
}
155+
Assertions.assertEquals(0, slaveJedis.scard("set_check"));
156+
}
157+
158+
@TestTemplate
159+
public void testWriteZSetToReadOnlyRedis(TestContainer container) {
160+
try {
161+
container.executeJob("/fake-to-redis-test-readonly-zset.conf");
162+
} catch (Exception e) {
163+
String containerLogs = container.getServerLogs();
164+
Assertions.assertTrue(
165+
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
166+
}
167+
Assertions.assertEquals(0, slaveJedis.zcard("zset_check"));
168+
}
169+
170+
@TestTemplate
171+
public void testWriteHashToReadOnlyRedis(TestContainer container) {
172+
try {
173+
container.executeJob("/fake-to-redis-test-readonly-hash.conf");
174+
} catch (Exception e) {
175+
String containerLogs = container.getServerLogs();
176+
Assertions.assertTrue(
177+
containerLogs.contains("redis.clients.jedis.exceptions.JedisDataException"));
178+
}
179+
Assertions.assertEquals(0, slaveJedis.hlen("hash_check"));
180+
}
181+
}

0 commit comments

Comments
 (0)