Skip to content

Commit b119ac5

Browse files
authored
enhance: add wal access mode options (#40617)
Issue: #40532. Signed-off-by: chyezh <[email protected]>
1 parent 03b63bf commit b119ac5

File tree

14 files changed

+418
-127
lines changed

14 files changed

+418
-127
lines changed

internal/streamingcoord/server/balancer/channel/pchannel.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ func newPChannelMeta(name string) *PChannelMeta {
1212
return &PChannelMeta{
1313
inner: &streamingpb.PChannelMeta{
1414
Channel: &streamingpb.PChannelInfo{
15-
Name: name,
16-
Term: 1,
15+
Name: name,
16+
Term: 1,
17+
AccessMode: streamingpb.PChannelAccessMode(types.AccessModeRW),
1718
},
1819
Node: nil,
1920
State: streamingpb.PChannelMetaState_PCHANNEL_META_STATE_UNINITIALIZED,
@@ -70,8 +71,9 @@ func (c *PChannelMeta) AssignHistories() []types.PChannelInfoAssigned {
7071
for _, h := range c.inner.Histories {
7172
history = append(history, types.PChannelInfoAssigned{
7273
Channel: types.PChannelInfo{
73-
Name: c.inner.GetChannel().GetName(),
74-
Term: h.Term,
74+
Name: c.inner.GetChannel().GetName(),
75+
Term: h.Term,
76+
AccessMode: types.AccessMode(h.AccessMode),
7577
},
7678
Node: types.NewStreamingNodeInfoFromProto(h.Node),
7779
})
@@ -114,8 +116,9 @@ func (m *mutablePChannel) TryAssignToServerID(streamingNode types.StreamingNodeI
114116
if m.inner.State != streamingpb.PChannelMetaState_PCHANNEL_META_STATE_UNINITIALIZED {
115117
// if the channel is already initialized, add the history.
116118
m.inner.Histories = append(m.inner.Histories, &streamingpb.PChannelAssignmentLog{
117-
Term: m.inner.Channel.Term,
118-
Node: m.inner.Node,
119+
Term: m.inner.Channel.Term,
120+
Node: m.inner.Node,
121+
AccessMode: m.inner.Channel.AccessMode,
119122
})
120123
}
121124

internal/streamingnode/server/wal/adaptor/opener.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (o *openerAdaptorImpl) Open(ctx context.Context, opt *wal.OpenOption) (wal.
4545
defer o.lifetime.Done()
4646

4747
id := o.idAllocator.Allocate()
48-
logger := o.logger.With(zap.Any("channel", opt.Channel), zap.Int64("id", id))
48+
logger := o.logger.With(zap.String("channel", opt.Channel.String()), zap.Int64("id", id))
4949

5050
l, err := o.opener.Open(ctx, &walimpls.OpenOption{
5151
Channel: opt.Channel,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package adaptor
2+
3+
import (
4+
"context"
5+
6+
"go.uber.org/zap"
7+
8+
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
9+
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
10+
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
11+
"github.com/milvus-io/milvus/pkg/v2/log"
12+
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
13+
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
14+
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls"
15+
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
16+
)
17+
18+
var _ wal.WAL = (*roWALAdaptorImpl)(nil)
19+
20+
type roWALAdaptorImpl struct {
21+
log.Binder
22+
lifetime *typeutil.Lifetime
23+
available chan struct{}
24+
idAllocator *typeutil.IDAllocator
25+
roWALImpls walimpls.ROWALImpls
26+
scannerRegistry scannerRegistry
27+
scanners *typeutil.ConcurrentMap[int64, wal.Scanner]
28+
cleanup func()
29+
scanMetrics *metricsutil.ScanMetrics
30+
}
31+
32+
func (w *roWALAdaptorImpl) WALName() string {
33+
return w.roWALImpls.WALName()
34+
}
35+
36+
// Channel returns the channel info of wal.
37+
func (w *roWALAdaptorImpl) Channel() types.PChannelInfo {
38+
return w.roWALImpls.Channel()
39+
}
40+
41+
func (w *roWALAdaptorImpl) GetLatestMVCCTimestamp(ctx context.Context, vchannel string) (uint64, error) {
42+
panic("we cannot acquire lastest mvcc timestamp from a read only wal")
43+
}
44+
45+
// Append writes a record to the log.
46+
func (w *roWALAdaptorImpl) Append(ctx context.Context, msg message.MutableMessage) (*wal.AppendResult, error) {
47+
panic("we cannot append message into a read only wal")
48+
}
49+
50+
// Append a record to the log asynchronously.
51+
func (w *roWALAdaptorImpl) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(*wal.AppendResult, error)) {
52+
panic("we cannot append message into a read only wal")
53+
}
54+
55+
// Read returns a scanner for reading records from the wal.
56+
func (w *roWALAdaptorImpl) Read(ctx context.Context, opts wal.ReadOption) (wal.Scanner, error) {
57+
if !w.lifetime.Add(typeutil.LifetimeStateWorking) {
58+
return nil, status.NewOnShutdownError("wal is on shutdown")
59+
}
60+
defer w.lifetime.Done()
61+
62+
name, err := w.scannerRegistry.AllocateScannerName()
63+
if err != nil {
64+
return nil, err
65+
}
66+
// wrap the scanner with cleanup function.
67+
id := w.idAllocator.Allocate()
68+
s := newScannerAdaptor(
69+
name,
70+
w.roWALImpls,
71+
opts,
72+
w.scanMetrics.NewScannerMetrics(),
73+
func() { w.scanners.Remove(id) })
74+
w.scanners.Insert(id, s)
75+
return s, nil
76+
}
77+
78+
// IsAvailable returns whether the wal is available.
79+
func (w *roWALAdaptorImpl) IsAvailable() bool {
80+
select {
81+
case <-w.available:
82+
return false
83+
default:
84+
return true
85+
}
86+
}
87+
88+
// Available returns a channel that will be closed when the wal is shut down.
89+
func (w *roWALAdaptorImpl) Available() <-chan struct{} {
90+
return w.available
91+
}
92+
93+
// Close overrides Scanner Close function.
94+
func (w *roWALAdaptorImpl) Close() {
95+
// begin to close the wal.
96+
w.Logger().Info("wal begin to close...")
97+
w.lifetime.SetState(typeutil.LifetimeStateStopped)
98+
w.lifetime.Wait()
99+
close(w.available)
100+
101+
w.Logger().Info("wal begin to close scanners...")
102+
103+
// close all wal instances.
104+
w.scanners.Range(func(id int64, s wal.Scanner) bool {
105+
s.Close()
106+
log.Info("close scanner by wal adaptor", zap.Int64("id", id), zap.Any("channel", w.Channel()))
107+
return true
108+
})
109+
110+
w.Logger().Info("scanner close done, close inner wal...")
111+
w.roWALImpls.Close()
112+
113+
w.Logger().Info("call wal cleanup function...")
114+
w.cleanup()
115+
w.Logger().Info("wal closed")
116+
117+
// close all metrics.
118+
w.scanMetrics.Close()
119+
}

internal/streamingnode/server/wal/adaptor/scanner_adaptor.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
1010
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
11+
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/wab"
1112
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
1213
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/utility"
1314
"github.com/milvus-io/milvus/pkg/v2/log"
@@ -24,7 +25,7 @@ var _ wal.Scanner = (*scannerAdaptorImpl)(nil)
2425
// newScannerAdaptor creates a new scanner adaptor.
2526
func newScannerAdaptor(
2627
name string,
27-
l walimpls.WALImpls,
28+
l walimpls.ROWALImpls,
2829
readOption wal.ReadOption,
2930
scanMetrics *metricsutil.ScannerMetrics,
3031
cleanup func(),
@@ -58,7 +59,7 @@ func newScannerAdaptor(
5859
type scannerAdaptorImpl struct {
5960
*helper.ScannerHelper
6061
logger *log.MLogger
61-
innerWAL walimpls.WALImpls
62+
innerWAL walimpls.ROWALImpls
6263
readOption wal.ReadOption
6364
filterFunc func(message.ImmutableMessage) bool
6465
reorderBuffer *utility.ReOrderByTimeTickBuffer // support time tick reorder.
@@ -122,9 +123,12 @@ func (s *scannerAdaptorImpl) execute() {
122123

123124
// produceEventLoop produces the message from the wal and write ahead buffer.
124125
func (s *scannerAdaptorImpl) produceEventLoop(msgChan chan<- message.ImmutableMessage) error {
125-
wb, err := resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).WriteAheadBuffer(s.Context())
126-
if err != nil {
127-
return err
126+
var wb wab.ROWriteAheadBuffer
127+
var err error
128+
if s.Channel().AccessMode == types.AccessModeRW {
129+
if wb, err = resource.Resource().TimeTickInspector().MustGetOperator(s.Channel()).WriteAheadBuffer(s.Context()); err != nil {
130+
return err
131+
}
128132
}
129133

130134
scanner := newSwithableScanner(s.Name(), s.logger, s.innerWAL, wb, s.readOption.DeliverPolicy, msgChan)

internal/streamingnode/server/wal/adaptor/scanner_switchable.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ var (
2626
func newSwithableScanner(
2727
scannerName string,
2828
logger *log.MLogger,
29-
innerWAL walimpls.WALImpls,
29+
innerWAL walimpls.ROWALImpls,
3030
writeAheadBuffer wab.ROWriteAheadBuffer,
3131
deliverPolicy options.DeliverPolicy,
3232
msgChan chan<- message.ImmutableMessage,
@@ -55,7 +55,7 @@ type switchableScanner interface {
5555
type switchableScannerImpl struct {
5656
scannerName string
5757
logger *log.MLogger
58-
innerWAL walimpls.WALImpls
58+
innerWAL walimpls.ROWALImpls
5959
msgChan chan<- message.ImmutableMessage
6060
writeAheadBuffer wab.ROWriteAheadBuffer
6161
}
@@ -144,7 +144,8 @@ func (s *catchupScanner) consumeWithScanner(ctx context.Context, scanner walimpl
144144
if err := s.HandleMessage(ctx, msg); err != nil {
145145
return nil, err
146146
}
147-
if msg.MessageType() != message.MessageTypeTimeTick {
147+
if msg.MessageType() != message.MessageTypeTimeTick || s.writeAheadBuffer == nil {
148+
// If there's no write ahead buffer, we cannot switch into tailing mode, so skip the checking.
148149
continue
149150
}
150151
// Here's a timetick message from the scanner, make tailing read if we catch up the writeahead buffer.

0 commit comments

Comments
 (0)