Skip to content

Commit 130d4f5

Browse files
authored
Support intermediate sync() calls in multi node pipelines (#3226)
* Support intermediate sync() calls in multi node pipelines * improve
1 parent cbda0d6 commit 130d4f5

File tree

2 files changed

+253
-186
lines changed

2 files changed

+253
-186
lines changed

src/main/java/redis/clients/jedis/MultiNodePipelineBase.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,14 @@ public abstract class MultiNodePipelineBase implements PipelineCommands, Pipelin
4141

4242
private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses;
4343
private final Map<HostAndPort, Connection> connections;
44-
private volatile boolean synced;
44+
private volatile boolean syncing = false;
4545

4646
private final CommandObjects commandObjects;
4747
private GraphCommandObjects graphCommandObjects;
4848

4949
public MultiNodePipelineBase(CommandObjects commandObjects) {
5050
pipelinedResponses = new LinkedHashMap<>();
5151
connections = new LinkedHashMap<>();
52-
synced = false;
5352
this.commandObjects = commandObjects;
5453
}
5554

@@ -90,16 +89,15 @@ public void close() {
9089
try {
9190
sync();
9291
} finally {
93-
for (Connection connection : connections.values()) {
94-
IOUtils.closeQuietly(connection);
95-
}
92+
connections.values().forEach(IOUtils::closeQuietly);
9693
}
9794
}
9895

9996
public final void sync() {
100-
if (synced) {
97+
if (syncing) {
10198
return;
10299
}
100+
syncing = true;
103101

104102
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
105103
= pipelinedResponses.entrySet().iterator();
@@ -121,7 +119,8 @@ public final void sync() {
121119
IOUtils.closeQuietly(connection);
122120
}
123121
}
124-
synced = true;
122+
123+
syncing = false;
125124
}
126125

127126
@Override

0 commit comments

Comments
 (0)