Skip to content

Commit 5805a97

Browse files
craig[bot]stevendanna
craig[bot]
andcommitted
Merge #146270
146270: kvserver: remove UnregisterFromReplica callback r=tbg a=stevendanna The UnregisterFromReplica callback required obtaining the rangefeedMu mutex from inside the rangefeed scheduler worker. This is a problem because the current holder of the rangefeedMu (specifically (*ScheduledProcessor).Register) may require a response from the very same scheduler worker to make progress and release rangefeedMu. Here, we solve that by removing this UnregisterFromReplica callback completely. This callback was responsible for removing the processor from the replica when the processor had been shut down. But, nearly every code path that calls Stop() already removes the processor from the replica itself. The only case where this wasn't true is when the processor stops itself because it has no more active registrations. Not removing the processor from the replica in that case has two consequences: 1. We may hold onto memory related to the ScheduledProcessor struct, 2. The replica does extra work because the rangefeed processor is set. Here, we choose to ignore (1) as a minor problem since most ranges that have a rangefeed started once will have one started again in the future. We solve (2) by making the `stopping` state variable accessible from outside the processor and consulting it before using the rangefeed processor on the replica. The assumption here is that the atomic load is cheap in comparison to the work we need to do when the processor is present. Fixes #144828 Epic: none Release note (bug fix): Fix a bug that could lead to a node stall. Co-authored-by: Steven Danna <[email protected]>
2 parents 995ee1d + 9de8f59 commit 5805a97

File tree

4 files changed

+53
-31
lines changed

4 files changed

+53
-31
lines changed

pkg/kv/kvserver/rangefeed/processor.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,6 @@ type Config struct {
104104
// for low-volume system ranges, since the worker pool is small (default 2).
105105
// Only has an effect when Scheduler is used.
106106
Priority bool
107-
108-
// UnregisterFromReplica is a callback provided from the
109-
// replica that this processor can call when shutting down to
110-
// remove itself from the replica.
111-
UnregisterFromReplica func(Processor)
112107
}
113108

114109
// SetDefaults initializes unset fields in Config to values
@@ -166,6 +161,9 @@ type Processor interface {
166161
// It is not valid to restart a processor after it has been stopped.
167162
StopWithErr(pErr *kvpb.Error)
168163

164+
// Returns true if a stop event has already been processed by this processor.
165+
Stopping() bool
166+
169167
// Lifecycle of registrations.
170168

171169
// Register registers the stream over the specified span of keys.

pkg/kv/kvserver/rangefeed/scheduled_processor.go

+21-10
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,20 @@ type ScheduledProcessor struct {
5757

5858
requestQueue chan request
5959
eventC chan *event
60-
// If true, processor is not processing data anymore and waiting for registrations
61-
// to be complete.
62-
stopping bool
60+
61+
// stopping, when true, indicates that a stop request has been processed.
62+
// After this is set, all subsequent registration requests will fail fast and
63+
// any events will be ignored. This is only ever set via stopInternal, but it
64+
// is an atomic to allow it to be read outside this processor. This is used in
65+
// the replica to handle the case where we have stopped ourselves but are
66+
// still attached to the replica. It should be preferred over stoppedC for all
67+
// uses other than unblocking request waiters since stoppedC is not closed
68+
// until the processor is fully closed up.
69+
stopping atomic.Bool
70+
// stoppedC is closed during the final steps of processor shutdown. This is
71+
// used to unblock anyone who might be waiting on a response. Callsites that
72+
// need to decide whether or not to process a request should consult stopping
73+
// instead.
6374
stoppedC chan struct{}
6475

6576
// stopper passed by start that is used for firing up async work from scheduler.
@@ -185,7 +196,7 @@ func (p *ScheduledProcessor) processEvents(ctx context.Context) {
185196
for max := len(p.eventC); max > 0; max-- {
186197
select {
187198
case e := <-p.eventC:
188-
if !p.stopping {
199+
if !p.stopping.Load() {
189200
// If we are stopping, there's no need to forward any remaining
190201
// data since registrations already have errors set.
191202
p.consumeEvent(ctx, e)
@@ -255,10 +266,6 @@ func (p *ScheduledProcessor) cleanup() {
255266
p.taskCancel()
256267
close(p.stoppedC)
257268
p.MemBudget.Close(ctx)
258-
if p.UnregisterFromReplica != nil {
259-
p.UnregisterFromReplica(p)
260-
}
261-
262269
}
263270

264271
// Stop shuts down the processor and closes all registrations. Safe to call on
@@ -278,6 +285,10 @@ func (p *ScheduledProcessor) StopWithErr(pErr *kvpb.Error) {
278285
p.sendStop(pErr)
279286
}
280287

288+
func (p *ScheduledProcessor) Stopping() bool {
289+
return p.stopping.Load()
290+
}
291+
281292
// DisconnectSpanWithErr disconnects all rangefeed registrations that overlap
282293
// the given span with the given error.
283294
func (p *ScheduledProcessor) DisconnectSpanWithErr(span roachpb.Span, pErr *kvpb.Error) {
@@ -299,7 +310,7 @@ func (p *ScheduledProcessor) stopInternal(ctx context.Context, pErr *kvpb.Error)
299310
p.reg.DisconnectAllOnShutdown(ctx, pErr)
300311
// First set stopping flag to ensure that once all registrations are removed
301312
// processor should stop.
302-
p.stopping = true
313+
p.stopping.Store(true)
303314
p.scheduler.StopProcessor()
304315
}
305316

@@ -349,7 +360,7 @@ func (p *ScheduledProcessor) Register(
349360
}
350361

351362
filter := runRequest(p, func(ctx context.Context, p *ScheduledProcessor) *Filter {
352-
if p.stopping {
363+
if p.stopping.Load() {
353364
return nil
354365
}
355366
if !p.Span.AsRawSpanWithNoLocals().Contains(r.getSpan()) {

pkg/kv/kvserver/replica.go

+3
Original file line numberDiff line numberDiff line change
@@ -1046,6 +1046,9 @@ type Replica struct {
10461046
// Requires Replica.raftMu be held when providing logical ops and
10471047
// informing the processor of closed timestamp updates. This properly
10481048
// synchronizes updates that are linearized and driven by the Raft log.
1049+
//
1050+
// proc should only be accessed via getRangefeedProcessorAndFilter or
1051+
// getRangefeedProcessor in nearly all cases.
10491052
proc rangefeed.Processor
10501053
// opFilter is a best-effort filter that informs the raft processing
10511054
// goroutine of which logical operations the rangefeed processor is

pkg/kv/kvserver/replica_rangefeed.go

+26-16
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,18 @@ func (r *Replica) RangeFeed(
348348
func (r *Replica) getRangefeedProcessorAndFilter() (rangefeed.Processor, *rangefeed.Filter) {
349349
r.rangefeedMu.RLock()
350350
defer r.rangefeedMu.RUnlock()
351-
return r.rangefeedMu.proc, r.rangefeedMu.opFilter
351+
p := r.rangefeedMu.proc
352+
if p != nil && p.Stopping() {
353+
// This is here only to try to preserve existing behaviour when fixing
354+
// #144828. Nearly all call paths that stop the processor immediately remove
355+
// the processor from the replica. The only call path where this isn't true
356+
// is when the processor stops itself after finding that all of its registrations
357+
// have been removed. Thus, we check stopping here to avoid doing work on
358+
// this stopped-but-not-yet-removed processor.
359+
return nil, r.rangefeedMu.opFilter
360+
} else {
361+
return p, r.rangefeedMu.opFilter
362+
}
352363
}
353364

354365
func (r *Replica) getRangefeedProcessor() rangefeed.Processor {
@@ -485,21 +496,20 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
485496
desc := r.Desc()
486497
tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r, span: desc.RSpan()}
487498
cfg := rangefeed.Config{
488-
AmbientContext: r.AmbientContext,
489-
Clock: r.Clock(),
490-
Stopper: r.store.stopper,
491-
Settings: r.store.ClusterSettings(),
492-
RangeID: r.RangeID,
493-
Span: desc.RSpan(),
494-
TxnPusher: &tp,
495-
PushTxnsAge: r.store.TestingKnobs().RangeFeedPushTxnsAge,
496-
EventChanCap: defaultEventChanCap,
497-
EventChanTimeout: defaultEventChanTimeout,
498-
Metrics: r.store.metrics.RangeFeedMetrics,
499-
MemBudget: feedBudget,
500-
Scheduler: r.store.getRangefeedScheduler(),
501-
Priority: isSystemSpan, // only takes effect when Scheduler != nil
502-
UnregisterFromReplica: r.unsetRangefeedProcessor,
499+
AmbientContext: r.AmbientContext,
500+
Clock: r.Clock(),
501+
Stopper: r.store.stopper,
502+
Settings: r.store.ClusterSettings(),
503+
RangeID: r.RangeID,
504+
Span: desc.RSpan(),
505+
TxnPusher: &tp,
506+
PushTxnsAge: r.store.TestingKnobs().RangeFeedPushTxnsAge,
507+
EventChanCap: defaultEventChanCap,
508+
EventChanTimeout: defaultEventChanTimeout,
509+
Metrics: r.store.metrics.RangeFeedMetrics,
510+
MemBudget: feedBudget,
511+
Scheduler: r.store.getRangefeedScheduler(),
512+
Priority: isSystemSpan, // only takes effect when Scheduler != nil
503513
}
504514
p = rangefeed.NewProcessor(cfg)
505515

0 commit comments

Comments
 (0)