Skip to content

Commit 0567f51

Browse files
authored
fix: streamingnode get stucked when stop (#42501)
issue: #42498 - fix: sealed segment cannot be flushed after upgrading - fix: get mvcc panic when upgrading - ignore the L0 segment when graceful stop of querynode. --------- Signed-off-by: chyezh <[email protected]>
1 parent 35c1752 commit 0567f51

File tree

9 files changed

+31
-4
lines changed

9 files changed

+31
-4
lines changed

configs/milvus.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,6 +1204,9 @@ streaming:
12041204
# It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration
12051205
backoffInitialInterval: 50ms
12061206
backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default
1207+
# The timeout of wal balancer operation, 10s by default.
1208+
# If the operation exceeds this timeout, it will be canceled.
1209+
operationTimeout: 10s
12071210
balancePolicy:
12081211
name: vchannelFair # The multiplier of balance task trigger backoff, 2 by default
12091212
vchannelFair:

internal/datacoord/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID Uni
7979
)
8080
for _, s := range segments {
8181
if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
82-
(s.GetState() != commonpb.SegmentState_Growing && s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
83-
// empty growing segment don't have dml position and start position
82+
((s.GetState() != commonpb.SegmentState_Growing && s.GetState() != commonpb.SegmentState_Sealed) && s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
83+
// empty growing and sealed segment don't have dml position and start position
8484
// and it should be recovered for streamingnode, so we add the state-filter here.
8585
continue
8686
}

internal/querynodev2/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,9 @@ func (node *QueryNode) Stop() error {
562562
if node.pipelineManager != nil {
563563
channelNum = node.pipelineManager.Num()
564564
}
565+
if len(sealedSegments) == 0 && len(growingSegments) == 0 && channelNum == 0 {
566+
break outer
567+
}
565568

566569
select {
567570
case <-timeoutCh:

internal/streamingcoord/server/balancer/balancer_impl.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,21 +315,26 @@ func (b *balancerImpl) applyBalanceResultToStreamingNode(ctx context.Context, mo
315315

316316
// different channel can be execute concurrently.
317317
g, _ := errgroup.WithContext(ctx)
318+
opTimeout := paramtable.Get().StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse()
318319
// generate balance operations and applied them.
319320
for _, channel := range modifiedChannels {
320321
channel := channel
321322
g.Go(func() error {
322323
// all history channels should be remove from related nodes.
323324
for _, assignment := range channel.AssignHistories() {
324-
if err := resource.Resource().StreamingNodeManagerClient().Remove(ctx, assignment); err != nil {
325+
opCtx, cancel := context.WithTimeout(ctx, opTimeout)
326+
defer cancel()
327+
if err := resource.Resource().StreamingNodeManagerClient().Remove(opCtx, assignment); err != nil {
325328
b.Logger().Warn("fail to remove channel", zap.String("assignment", assignment.String()), zap.Error(err))
326329
return err
327330
}
328331
b.Logger().Info("remove channel success", zap.String("assignment", assignment.String()))
329332
}
330333

331334
// assign the channel to the target node.
332-
if err := resource.Resource().StreamingNodeManagerClient().Assign(ctx, channel.CurrentAssignment()); err != nil {
335+
opCtx, cancel := context.WithTimeout(ctx, opTimeout)
336+
defer cancel()
337+
if err := resource.Resource().StreamingNodeManagerClient().Assign(opCtx, channel.CurrentAssignment()); err != nil {
333338
b.Logger().Warn("fail to assign channel", zap.String("assignment", channel.CurrentAssignment().String()), zap.Error(err))
334339
return err
335340
}

internal/streamingnode/client/handler/handler_client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ var (
3131
_ HandlerClient = (*handlerClientImpl)(nil)
3232
ErrClientClosed = errors.New("handler client is closed")
3333
ErrClientAssignmentNotReady = errors.New("handler client assignment not ready")
34+
ErrReadOnlyWAL = errors.New("wal is read only")
3435
)
3536

3637
type (

internal/streamingnode/client/handler/handler_client_impl.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ func (hc *handlerClientImpl) GetLatestMVCCTimestampIfLocal(ctx context.Context,
5959
if err != nil {
6060
return 0, err
6161
}
62+
if w.Channel().AccessMode != types.AccessModeRW {
63+
return 0, ErrReadOnlyWAL
64+
}
6265
return w.GetLatestMVCCTimestamp(ctx, vchannel)
6366
}
6467

internal/streamingnode/server/walmanager/wal_lifetime.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ func (w *walLifetime) Close() {
9393
logger := log.With(zap.String("current", toStateString(currentState)))
9494
if oldWAL := currentState.GetWAL(); oldWAL != nil {
9595
oldWAL.Close()
96+
w.statePair.SetCurrentState(newUnavailableCurrentState(currentState.Term(), nil))
9697
logger.Info("close current term wal done at wal life time close")
9798
}
9899
logger.Info("wal lifetime closed")

pkg/util/paramtable/component_param.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5572,6 +5572,7 @@ type streamingConfig struct {
55725572
WALBalancerTriggerInterval ParamItem `refreshable:"true"`
55735573
WALBalancerBackoffInitialInterval ParamItem `refreshable:"true"`
55745574
WALBalancerBackoffMultiplier ParamItem `refreshable:"true"`
5575+
WALBalancerOperationTimeout ParamItem `refreshable:"true"`
55755576

55765577
// balancer Policy
55775578
WALBalancerPolicyName ParamItem `refreshable:"true"`
@@ -5637,6 +5638,15 @@ It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDura
56375638
Export: true,
56385639
}
56395640
p.WALBalancerBackoffMultiplier.Init(base.mgr)
5641+
p.WALBalancerOperationTimeout = ParamItem{
5642+
Key: "streaming.walBalancer.operationTimeout",
5643+
Version: "2.6.0",
5644+
Doc: `The timeout of wal balancer operation, 10s by default.
5645+
If the operation exceeds this timeout, it will be canceled.`,
5646+
DefaultValue: "10s",
5647+
Export: true,
5648+
}
5649+
p.WALBalancerOperationTimeout.Init(base.mgr)
56405650

56415651
p.WALBalancerPolicyName = ParamItem{
56425652
Key: "streaming.walBalancer.balancePolicy.name",

pkg/util/paramtable/component_param_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,7 @@ func TestComponentParam(t *testing.T) {
628628
assert.Equal(t, 0.01, params.StreamingCfg.WALBalancerPolicyVChannelFairAntiAffinityWeight.GetAsFloat())
629629
assert.Equal(t, 0.01, params.StreamingCfg.WALBalancerPolicyVChannelFairRebalanceTolerance.GetAsFloat())
630630
assert.Equal(t, 3, params.StreamingCfg.WALBalancerPolicyVChannelFairRebalanceMaxStep.GetAsInt())
631+
assert.Equal(t, 10*time.Second, params.StreamingCfg.WALBalancerOperationTimeout.GetAsDurationByParse())
631632
assert.Equal(t, 1.0, params.StreamingCfg.WALBroadcasterConcurrencyRatio.GetAsFloat())
632633
assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
633634
assert.Equal(t, 30*time.Second, params.StreamingCfg.WALWriteAheadBufferKeepalive.GetAsDurationByParse())

0 commit comments

Comments
 (0)