Commit 59ab274
fix: use flusher and recovery checkpoint together to determine the truncate position (#41934)
issue: #41544

- unify the log fields of messages
- use the minimum of the flusher and recovery storage checkpoints as the truncate position

Signed-off-by: chyezh <[email protected]>

1 parent b099926 · commit 59ab274
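The core of the change: the WAL may only be truncated up to a position that both the flusher and the recovery storage have durably checkpointed, i.e. the minimum of the two. A minimal, runnable sketch of that selection rule (plain integers stand in for the MQ-specific MessageID type; none of these names come from the Milvus codebase):

package main

import "fmt"

// checkpoint is an illustrative stand-in for recovery.WALCheckpoint.
type checkpoint struct {
	messageID int64
}

// truncatePosition picks the safe truncation point: the older (smaller) of
// the flusher and recovery checkpoints. While the flusher has not reported
// a checkpoint yet, there is no safe point and nothing may be truncated.
func truncatePosition(flusher, recovery *checkpoint) *checkpoint {
	if flusher == nil {
		return nil
	}
	if flusher.messageID <= recovery.messageID {
		return flusher
	}
	return recovery
}

func main() {
	fmt.Println(truncatePosition(&checkpoint{messageID: 80}, &checkpoint{messageID: 100}).messageID)  // 80: flusher lags
	fmt.Println(truncatePosition(&checkpoint{messageID: 120}, &checkpoint{messageID: 100}).messageID) // 100: recovery lags
}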

File tree: 10 files changed, +86 −13 lines

internal/mocks/streamingnode/server/wal/mock_recovery/mock_RecoveryStorage.go

Lines changed: 35 additions & 0 deletions
Some generated files are not rendered by default.

internal/streamingnode/server/flusher/flusherimpl/wal_flusher.go

Lines changed: 9 additions & 1 deletion
@@ -7,6 +7,7 @@ import (
 	"github.com/samber/lo"
 	"go.uber.org/zap"

+	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/internal/flushcommon/broker"
 	"github.com/milvus-io/milvus/internal/flushcommon/util"
 	"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
@@ -149,7 +150,14 @@ func (impl *WALFlusherImpl) buildFlusherComponents(ctx context.Context, l wal.WA
 	broker := broker.NewCoordBroker(mixc, paramtable.GetNodeID())
 	chunkManager := resource.Resource().ChunkManager()

-	cpUpdater := util.NewChannelCheckpointUpdater(broker)
+	cpUpdater := util.NewChannelCheckpointUpdaterWithCallback(broker, func(mp *msgpb.MsgPosition) {
+		messageID := adaptor.MustGetMessageIDFromMQWrapperIDBytes(l.WALName(), mp.MsgID)
+		impl.rs.UpdateFlusherCheckpoint(&recovery.WALCheckpoint{
+			MessageID: messageID,
+			TimeTick:  mp.Timestamp,
+			Magic:     recovery.RecoveryMagicStreamingInitialized,
+		})
+	})
 	go cpUpdater.Start()

 	fc := &flusherComponents{
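The hunk above hangs a callback on the channel checkpoint updater: whenever a channel checkpoint is reported, the flusher converts the msgpb.MsgPosition into a recovery.WALCheckpoint and forwards it to the recovery storage. A runnable sketch of that callback pattern (the types here are simplified stand-ins, not the Milvus APIs):

package main

import "fmt"

// position stands in for msgpb.MsgPosition (illustrative only).
type position struct {
	timestamp uint64
}

// checkpointUpdater invokes a hook after each checkpoint update, letting a
// listener (here: the recovery storage) observe positions without the
// updater knowing anything about the listener.
type checkpointUpdater struct {
	callback func(pos *position)
}

func (u *checkpointUpdater) update(pos *position) {
	// ... the real updater would persist the channel checkpoint first ...
	if u.callback != nil {
		u.callback(pos)
	}
}

func main() {
	u := &checkpointUpdater{callback: func(pos *position) {
		fmt.Println("forwarding flusher checkpoint at timetick", pos.timestamp)
	}}
	u.update(&position{timestamp: 42})
}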

internal/streamingnode/server/wal/metricsutil/append.go

Lines changed: 5 additions & 5 deletions
@@ -67,8 +67,8 @@ func (m *AppendMetrics) StartAppendGuard() *AppendMetricsGuard {
 func (m *AppendMetrics) IntoLogFields() []zap.Field {
 	fields := []zap.Field{
 		log.FieldMessage(m.msg),
-		zap.Duration("append_duration", m.appendDuration),
-		zap.Duration("impl_append_duration", m.implAppendDuration),
+		zap.Duration("appendDuration", m.appendDuration),
+		zap.Duration("implAppendDuration", m.implAppendDuration),
 	}
 	for name, ims := range m.interceptors {
 		for i, im := range ims {
@@ -80,10 +80,10 @@ func (m *AppendMetrics) IntoLogFields() []zap.Field {
 	if m.err != nil {
 		fields = append(fields, zap.Error(m.err))
 	} else {
-		fields = append(fields, zap.String("message_id", m.result.MessageID.String()))
-		fields = append(fields, zap.Uint64("time_tick", m.result.TimeTick))
+		fields = append(fields, zap.String("messageID", m.result.MessageID.String()))
+		fields = append(fields, zap.Uint64("timetick", m.result.TimeTick))
 		if m.result.TxnCtx != nil {
-			fields = append(fields, zap.Int64("txn_id", int64(m.result.TxnCtx.TxnID)))
+			fields = append(fields, zap.Int64("txnID", int64(m.result.TxnCtx.TxnID)))
 		}
 	}
 	return fields
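This hunk is the log-field unification mentioned in the issue: snake_case keys become camelCase so append logs use a single naming convention. A small hedged example of the resulting style with zap (the values are made up):

package main

import (
	"time"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	// camelCase field keys, matching the convention adopted in AppendMetrics.
	logger.Info("append done",
		zap.Duration("appendDuration", 3*time.Millisecond),
		zap.String("messageID", "rmq-42"),
		zap.Uint64("timetick", 1024),
	)
}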

internal/streamingnode/server/wal/recovery/checkpoint.go

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@ import (
 )

 const (
-	recoveryMagicStreamingInitialized int64 = 1 // the vchannel info is set into the catalog.
+	RecoveryMagicStreamingInitialized int64 = 1 // the vchannel info is set into the catalog.
 	// the checkpoint is set into the catalog.
 )
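Renaming the constant to start with an uppercase letter exports it from the recovery package; that is what lets wal_flusher.go, which lives in a different package, reference it as recovery.RecoveryMagicStreamingInitialized in the hunk above.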

internal/streamingnode/server/wal/recovery/recovery_background_task.go

Lines changed: 13 additions & 1 deletion
@@ -123,10 +123,22 @@ func (rs *recoveryStorageImpl) persistDirtySnapshot(ctx context.Context, snapsho

 	// sample the checkpoint for truncator to make wal truncation.
 	rs.metrics.ObServePersistedMetrics(snapshot.Checkpoint.TimeTick)
-	rs.truncator.SampleCheckpoint(snapshot.Checkpoint)
+	rs.sampleTruncateCheckpoint(snapshot.Checkpoint)
 	return
 }

+func (rs *recoveryStorageImpl) sampleTruncateCheckpoint(checkpoint *WALCheckpoint) {
+	if rs.flusherCheckpoint == nil {
+		return
+	}
+	// use the smaller one to truncate the wal.
+	if rs.flusherCheckpoint.MessageID.LTE(checkpoint.MessageID) {
+		rs.truncator.SampleCheckpoint(rs.flusherCheckpoint)
+	} else {
+		rs.truncator.SampleCheckpoint(checkpoint)
+	}
+}
+
 // dropAllVirtualChannel drops all virtual channels that are in the dropped state.
 // TODO: DropVirtualChannel will be called twice here,
 // call it in recovery storage is used to promise the drop virtual channel must be called after recovery.

internal/streamingnode/server/wal/recovery/recovery_persisted.go

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ func (r *recoveryStorageImpl) initializeRecoverInfo(ctx context.Context, channel
 			Id: untilMessage.LastConfirmedMessageID().Marshal(),
 		},
 		TimeTick:      untilMessage.TimeTick(),
-		RecoveryMagic: recoveryMagicStreamingInitialized,
+		RecoveryMagic: RecoveryMagicStreamingInitialized,
 	}
 	if err := resource.Resource().StreamingNodeCatalog().SaveConsumeCheckpoint(ctx, channelInfo.Name, checkpoint); err != nil {
 		return nil, errors.Wrap(err, "failed to save checkpoint to catalog")

internal/streamingnode/server/wal/recovery/recovery_persisted_test.go

Lines changed: 2 additions & 2 deletions
@@ -46,7 +46,7 @@ func TestInitRecoveryInfoFromMeta(t *testing.T) {
 			Id: rmq.NewRmqID(1).Marshal(),
 		},
 		TimeTick:      1,
-		RecoveryMagic: recoveryMagicStreamingInitialized,
+		RecoveryMagic: RecoveryMagicStreamingInitialized,
 	}, nil)
 	resource.InitForTest(t, resource.OptStreamingNodeCatalog(snCatalog))
 	walName := "rocksmq"
@@ -58,7 +58,7 @@ func TestInitRecoveryInfoFromMeta(t *testing.T) {
 	err := rs.recoverRecoveryInfoFromMeta(context.Background(), walName, channel, lastConfirmed.IntoImmutableMessage(rmq.NewRmqID(1)))
 	assert.NoError(t, err)
 	assert.NotNil(t, rs.checkpoint)
-	assert.Equal(t, recoveryMagicStreamingInitialized, rs.checkpoint.Magic)
+	assert.Equal(t, RecoveryMagicStreamingInitialized, rs.checkpoint.Magic)
 	assert.True(t, rs.checkpoint.MessageID.EQ(rmq.NewRmqID(1)))
 }

internal/streamingnode/server/wal/recovery/recovery_storage.go

Lines changed: 4 additions & 0 deletions
@@ -61,6 +61,10 @@ type RecoveryStorage interface {
 	// ObserveMessage observes the message from the WAL.
 	ObserveMessage(msg message.ImmutableMessage)

+	// UpdateFlusherCheckpoint updates the checkpoint of flusher.
+	// TODO: should be removed in future, after merge the flusher logic into recovery storage.
+	UpdateFlusherCheckpoint(checkpoint *WALCheckpoint)
+
 	// Close closes the recovery storage.
 	Close()
 }

internal/streamingnode/server/wal/recovery/recovery_storage_impl.go

Lines changed: 14 additions & 0 deletions
@@ -82,6 +82,7 @@ type recoveryStorageImpl struct {
 	segments   map[int64]*segmentRecoveryInfo
 	vchannels  map[string]*vchannelRecoveryInfo
 	checkpoint *WALCheckpoint
+	flusherCheckpoint *WALCheckpoint
 	dirtyCounter int // records the message count since last persist snapshot.
 	// used to trigger the recovery persist operation.
 	persistNotifier chan struct{}
@@ -90,6 +91,19 @@ type recoveryStorageImpl struct {
 	metrics *recoveryMetrics
 }

+// UpdateFlusherCheckpoint updates the checkpoint of flusher.
+// TODO: should be removed in future, after merge the flusher logic into recovery storage.
+func (r *recoveryStorageImpl) UpdateFlusherCheckpoint(checkpoint *WALCheckpoint) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.flusherCheckpoint == nil || r.flusherCheckpoint.MessageID.LTE(checkpoint.MessageID) {
+		r.flusherCheckpoint = checkpoint
+		r.Logger().Info("update checkpoint of flusher", zap.String("messageID", checkpoint.MessageID.String()))
+		return
+	}
+	r.Logger().Warn("update illegal checkpoint of flusher", zap.String("current", r.flusherCheckpoint.MessageID.String()), zap.String("target", checkpoint.MessageID.String()))
+}
+
 // ObserveMessage is called when a new message is observed.
 func (r *recoveryStorageImpl) ObserveMessage(msg message.ImmutableMessage) {
 	r.mu.Lock()
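UpdateFlusherCheckpoint only lets the flusher checkpoint move forward; LTE also admits a repeated report of the same position, while a checkpoint that moves backwards is logged and dropped, since accepting it could let truncation outrun the flusher. A runnable sketch of that guard (simplified stand-in types, not the Milvus ones):

package main

import "fmt"

// ckpt stands in for WALCheckpoint; an int64 stands in for MessageID.
type ckpt struct {
	id int64
}

type storage struct {
	flusherCkpt *ckpt
}

// updateFlusherCheckpoint mirrors the forward-only guard: equal or newer
// checkpoints are accepted, older ones are rejected.
func (s *storage) updateFlusherCheckpoint(c *ckpt) {
	if s.flusherCkpt == nil || s.flusherCkpt.id <= c.id {
		s.flusherCkpt = c
		return
	}
	fmt.Printf("rejected stale checkpoint %d (current %d)\n", c.id, s.flusherCkpt.id)
}

func main() {
	s := &storage{}
	s.updateFlusherCheckpoint(&ckpt{id: 5})
	s.updateFlusherCheckpoint(&ckpt{id: 3}) // out of order: rejected
	fmt.Println(s.flusherCkpt.id)           // still 5
}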

internal/streamingnode/server/wal/recovery/wal_truncator_test.go

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ func TestTruncator(t *testing.T) {
 	truncator := newSamplingTruncator(&WALCheckpoint{
 		MessageID: rmq.NewRmqID(1),
 		TimeTick:  1,
-		Magic:     recoveryMagicStreamingInitialized,
+		Magic:     RecoveryMagicStreamingInitialized,
 	}, w, newRecoveryStorageMetrics(types.PChannelInfo{Name: "test", Term: 1}))

 	for i := 0; i < 20; i++ {
@@ -32,7 +32,7 @@ func TestTruncator(t *testing.T) {
 		truncator.SampleCheckpoint(&WALCheckpoint{
 			MessageID: rmq.NewRmqID(int64(i)),
 			TimeTick:  tsoutil.ComposeTSByTime(time.Now(), 0),
-			Magic:     recoveryMagicStreamingInitialized,
+			Magic:     RecoveryMagicStreamingInitialized,
 		})
 	}
 }
