@@ -116,6 +116,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
116
116
protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl ();
117
117
118
118
protected volatile boolean fetchSchemaInProgress = false ;
119
+ private volatile boolean waitForCursorRewinding = false ;
119
120
120
121
public PersistentReplicator (String localCluster , PersistentTopic localTopic , ManagedCursor cursor ,
121
122
String remoteCluster , String remoteTopic ,
@@ -143,9 +144,15 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
143
144
144
145
@ Override
145
146
protected void setProducerAndTriggerReadEntries (Producer <byte []> producer ) {
146
- // Rewind the cursor to be sure to read again all non-acked messages sent while restarting.
147
- cursor .rewind ();
148
- cursor .cancelPendingReadRequest ();
147
+ waitForCursorRewinding = true ;
148
+
149
+ // Repeat until there are no read operations in progress
150
+ if (STATE_UPDATER .get (this ) == State .Starting && HAVE_PENDING_READ_UPDATER .get (this ) == TRUE
151
+ && !cursor .cancelPendingReadRequest ()) {
152
+ brokerService .getPulsar ().getExecutor ()
153
+ .schedule (() -> setProducerAndTriggerReadEntries (producer ), 10 , TimeUnit .MILLISECONDS );
154
+ return ;
155
+ }
149
156
150
157
/**
151
158
* 1. Try change state to {@link Started}.
@@ -158,6 +165,7 @@ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
158
165
if (!(producer instanceof ProducerImpl )) {
159
166
log .error ("[{}] The partitions count between two clusters is not the same, the replicator can not be"
160
167
+ " created successfully: {}" , replicatorId , state );
168
+ waitForCursorRewinding = false ;
161
169
doCloseProducerAsync (producer , () -> {});
162
170
throw new ClassCastException (producer .getClass ().getName () + " can not be cast to ProducerImpl" );
163
171
}
@@ -168,6 +176,11 @@ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
168
176
backOff .reset ();
169
177
// activate cursor: so, entries can be cached.
170
178
this .cursor .setActive ();
179
+
180
+ // Rewind the cursor to be sure to read again all non-acked messages sent while restarting
181
+ cursor .rewind ();
182
+ waitForCursorRewinding = false ;
183
+
171
184
// read entries
172
185
readMoreEntries ();
173
186
} else {
@@ -183,6 +196,7 @@ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
183
196
log .error ("[{}] Replicator state is not expected, so close the producer. Replicator state: {}" ,
184
197
replicatorId , changeStateRes .getRight ());
185
198
}
199
+ waitForCursorRewinding = false ;
186
200
// Close the producer if change the state fail.
187
201
doCloseProducerAsync (producer , () -> {});
188
202
}
@@ -296,6 +310,11 @@ protected void readMoreEntries() {
296
310
297
311
// Schedule read
298
312
if (HAVE_PENDING_READ_UPDATER .compareAndSet (this , FALSE , TRUE )) {
313
+ if (waitForCursorRewinding ) {
314
+ log .info ("[{}] Skip the reading because repl producer is starting" , replicatorId );
315
+ HAVE_PENDING_READ_UPDATER .set (this , FALSE );
316
+ return ;
317
+ }
299
318
if (log .isDebugEnabled ()) {
300
319
log .debug ("[{}] Schedule read of {} messages or {} bytes" , replicatorId , messagesToRead ,
301
320
bytesToRead );
0 commit comments