21
21
public class StreamAcknowledgementManager {
22
22
private static final Logger LOG = LoggerFactory .getLogger (StreamAcknowledgementManager .class );
23
23
private static final int CHECKPOINT_RECORD_INTERVAL = 50 ;
24
+ private static final int NO_ACK_PARTITION_TIME_OUT_SECONDS = 900 ; // 15 minutes
24
25
private final ConcurrentLinkedQueue <CheckpointStatus > checkpoints = new ConcurrentLinkedQueue <>();
25
26
private final ConcurrentHashMap <String , CheckpointStatus > ackStatus = new ConcurrentHashMap <>();
26
27
private final AcknowledgementSetManager acknowledgementSetManager ;
@@ -42,7 +43,7 @@ public class StreamAcknowledgementManager {
42
43
public static final String NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "negativeAcknowledgementSets" ;
43
44
public static final String RECORDS_CHECKPOINTED = "recordsCheckpointed" ;
44
45
public static final String NO_DATA_EXTEND_LEASE_COUNT = "noDataExtendLeaseCount" ;
45
- public static final String GIVE_UP_PARTITION_COUNT = "giveupPartitionCount " ;
46
+ public static final String GIVE_UP_PARTITION_COUNT = "giveUpPartitionCount " ;
46
47
47
48
48
49
public StreamAcknowledgementManager (final AcknowledgementSetManager acknowledgementSetManager ,
@@ -85,29 +86,34 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
85
86
checkpointStatus = checkpoints .peek ();
86
87
ackCount ++;
87
88
// at high TPS each ack contains 100 records. This should checkpoint every 100*50 = 5000 records.
88
- if (ackCount % CHECKPOINT_RECORD_INTERVAL == 0 ) {
89
+ if ((ackCount % CHECKPOINT_RECORD_INTERVAL == 0 ) ||
90
+ (System .currentTimeMillis () - lastCheckpointTime >= checkPointIntervalInMs )){
89
91
checkpoint (lastCheckpointStatus .getResumeToken (), lastCheckpointStatus .getRecordCount ());
92
+ lastCheckpointTime = System .currentTimeMillis ();
90
93
}
91
94
} while (checkpointStatus != null && checkpointStatus .isPositiveAcknowledgement ());
92
95
checkpoint (lastCheckpointStatus .getResumeToken (), lastCheckpointStatus .getRecordCount ());
93
96
lastCheckpointTime = System .currentTimeMillis ();
94
97
}
95
98
} else {
96
99
LOG .debug ("Checkpoint not complete for resume token {}" , checkpointStatus .getResumeToken ());
97
- final Duration ackWaitDuration = Duration . between ( Instant . ofEpochMilli ( checkpointStatus . getCreateTimestamp ()), Instant . now ());
100
+ // negative ack
98
101
if (checkpointStatus .isNegativeAcknowledgement ()) {
99
- // Give up partition and should interrupt parent thread to stop processing stream
100
- if (lastCheckpointStatus != null && lastCheckpointStatus .isPositiveAcknowledgement ()) {
101
- checkpoint (lastCheckpointStatus .getResumeToken (), lastCheckpointStatus .getRecordCount ());
102
- }
103
- LOG .warn ("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition." , checkpointStatus .getResumeToken ());
104
- partitionCheckpoint .giveUpPartition ();
105
- this .giveupPartitionCount .increment ();
102
+ LOG .warn ("Negative Acknowledgement received for the checkpoint {}. Giving up partition." , checkpointStatus .getResumeToken ());
103
+ giveUpPartition (lastCheckpointStatus );
106
104
break ;
105
+ } else {
106
+ final Duration ackWaitDuration = Duration .between (Instant .ofEpochMilli (checkpointStatus .getCreateTimestamp ()), Instant .now ());
107
+ // no ack received within timeout period
108
+ if (!ackWaitDuration .minusSeconds (NO_ACK_PARTITION_TIME_OUT_SECONDS ).isNegative ()) {
109
+ LOG .warn ("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition." , checkpointStatus .getResumeToken ());
110
+ giveUpPartition (lastCheckpointStatus );
111
+ break ;
112
+ }
107
113
}
108
114
}
109
115
} else {
110
- if (System .currentTimeMillis () - lastCheckpointTime >= checkPointIntervalInMs ) {
116
+ if (System .currentTimeMillis () - lastCheckpointTime >= checkPointIntervalInMs ) { // 1 min
111
117
partitionCheckpoint .extendLease ();
112
118
this .noDataExtendLeaseCount .increment ();
113
119
lastCheckpointTime = System .currentTimeMillis ();
@@ -128,10 +134,19 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
128
134
executorService .shutdown ();
129
135
}
130
136
137
+ private void giveUpPartition (final CheckpointStatus lastCheckpointStatus ) {
138
+ // Give up partition and should interrupt parent thread to stop processing stream
139
+ if (lastCheckpointStatus != null && lastCheckpointStatus .isPositiveAcknowledgement ()) {
140
+ checkpoint (lastCheckpointStatus .getResumeToken (), lastCheckpointStatus .getRecordCount ());
141
+ }
142
+ partitionCheckpoint .giveUpPartition ();
143
+ this .giveupPartitionCount .increment ();
144
+ }
145
+
131
146
private void checkpoint (final String resumeToken , final long recordCount ) {
132
147
LOG .debug ("Perform regular checkpointing for resume token {} at record count {}" , resumeToken , recordCount );
133
148
partitionCheckpoint .checkpoint (resumeToken , recordCount );
134
- this .recordsCheckpointed .increment (recordCount );
149
+ this .recordsCheckpointed .increment ();
135
150
}
136
151
137
152
Optional <AcknowledgementSet > createAcknowledgementSet (final String resumeToken , final long recordNumber ) {
0 commit comments