Skip to content

Commit afbd17a

Browse files
committed
enhance: add delete rows into delete msg header and more metric
- add delete rows into delete messsage header - add more insert/delete metrics - fix non-broadcast message has broadcast header Signed-off-by: chyezh <[email protected]>
1 parent 8a85bc4 commit afbd17a

File tree

19 files changed

+317
-176
lines changed

19 files changed

+317
-176
lines changed

internal/flushcommon/pipeline/flow_graph_dd_node.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
274274
zap.Int32("msgType", int32(msg.Type())),
275275
zap.Uint64("timetick", manualFlushMsg.ManualFlushMessage.TimeTick()),
276276
zap.Uint64("flushTs", manualFlushMsg.ManualFlushMessage.Header().FlushTs),
277+
zap.Int64s("segmentIDs", manualFlushMsg.ManualFlushMessage.Header().SegmentIds),
277278
)
278279
logger.Info("receive manual flush message")
279280
if err := ddn.msgHandler.HandleManualFlush(manualFlushMsg.ManualFlushMessage); err != nil {
@@ -302,7 +303,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
302303
if header.GetCollectionId() != ddn.collectionID {
303304
continue
304305
}
305-
logger := log.With(zap.String("vchannel", ddn.Name()))
306+
logger := log.With(
307+
zap.String("vchannel", ddn.Name()),
308+
zap.Int32("msgType", int32(msg.Type())),
309+
zap.Uint64("timetick", schemaMsg.SchemaChangeMessage.TimeTick()),
310+
zap.Int64s("segmentIDs", schemaMsg.SchemaChangeMessage.Header().FlushedSegmentIds),
311+
)
306312
logger.Info("receive schema change message")
307313
body, err := schemaMsg.SchemaChangeMessage.Body()
308314
if err != nil {

internal/mocks/streamingnode/server/wal/interceptors/shard/mock_shards/mock_ShardManager.go

Lines changed: 46 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/proxy/task_delete_streaming.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func (dt *deleteTaskByStreamingService) Execute(ctx context.Context) (err error)
4949
msg, err := message.NewDeleteMessageBuilderV1().
5050
WithHeader(&message.DeleteMessageHeader{
5151
CollectionId: dt.collectionID,
52+
Rows: uint64(deleteMsg.NumRows),
5253
}).
5354
WithBody(deleteMsg.DeleteRequest).
5455
WithVChannel(vchannel).

internal/streamingcoord/client/broadcast/broadcast_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ import (
2222
func TestBroadcast(t *testing.T) {
2323
s := newMockServer(t, 0)
2424
bs := NewGRPCBroadcastService(walimplstest.WALName, s)
25-
msg, _ := message.NewDropCollectionMessageBuilderV1().
25+
msg := message.NewDropCollectionMessageBuilderV1().
2626
WithHeader(&message.DropCollectionMessageHeader{}).
2727
WithBody(&msgpb.DropCollectionRequest{}).
2828
WithBroadcast([]string{"v1"}, message.NewCollectionNameResourceKey("r1")).
29-
BuildBroadcast()
29+
MustBuildBroadcast()
3030
_, err := bs.Broadcast(context.Background(), msg)
3131
assert.NoError(t, err)
3232
err = bs.Ack(context.Background(), types.BroadcastAckRequest{

internal/streamingnode/server/flusher/flusherimpl/msg_handler_impl_test.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,16 @@ func TestFlushMsgHandler_HandleFlush(t *testing.T) {
3838
wbMgr := writebuffer.NewMockBufferManager(t)
3939
wbMgr.EXPECT().SealSegments(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock err"))
4040

41-
msg, err := message.NewFlushMessageBuilderV2().
42-
WithVChannel(vchannel).
41+
msg := message.NewFlushMessageBuilderV2().
42+
WithBroadcast([]string{vchannel}).
4343
WithHeader(&message.FlushMessageHeader{
4444
CollectionId: 0,
4545
SegmentId: 1,
4646
}).
4747
WithBody(&message.FlushMessageBody{}).
48-
BuildMutable()
49-
assert.NoError(t, err)
48+
MustBuildBroadcast().
49+
WithBroadcastID(1).
50+
SplitIntoMutableMessage()[0]
5051

5152
handler := newMsgHandler(wbMgr)
5253
msgID := mock_message.NewMockMessageID(t)
@@ -72,15 +73,16 @@ func TestFlushMsgHandler_HandleManualFlush(t *testing.T) {
7273
wbMgr.EXPECT().SealSegments(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock err"))
7374
wbMgr.EXPECT().FlushChannel(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock err"))
7475

75-
msg, err := message.NewManualFlushMessageBuilderV2().
76-
WithVChannel(vchannel).
76+
msg := message.NewManualFlushMessageBuilderV2().
77+
WithBroadcast([]string{vchannel}).
7778
WithHeader(&message.ManualFlushMessageHeader{
7879
CollectionId: 0,
7980
FlushTs: 1000,
8081
}).
8182
WithBody(&message.ManualFlushMessageBody{}).
82-
BuildMutable()
83-
assert.NoError(t, err)
83+
MustBuildBroadcast().
84+
WithBroadcastID(1).
85+
SplitIntoMutableMessage()[0]
8486

8587
handler := newMsgHandler(wbMgr)
8688
msgID := mock_message.NewMockMessageID(t)
@@ -115,20 +117,21 @@ func TestFlushMsgHandler_HandlSchemaChange(t *testing.T) {
115117
wbMgr := writebuffer.NewMockBufferManager(t)
116118
wbMgr.EXPECT().SealSegments(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock err"))
117119

118-
msg, err := message.NewSchemaChangeMessageBuilderV2().
119-
WithVChannel(vchannel).
120+
msg := message.NewSchemaChangeMessageBuilderV2().
121+
WithBroadcast([]string{vchannel}).
120122
WithHeader(&message.SchemaChangeMessageHeader{
121123
CollectionId: 0,
122124
FlushedSegmentIds: []int64{1},
123125
}).
124126
WithBody(&message.SchemaChangeMessageBody{}).
125-
BuildMutable()
126-
assert.NoError(t, err)
127+
MustBuildBroadcast().
128+
WithBroadcastID(1).
129+
SplitIntoMutableMessage()[0]
127130

128131
handler := newMsgHandler(wbMgr)
129132
msgID := mock_message.NewMockMessageID(t)
130133
im := message.MustAsImmutableCollectionSchemaChangeV2(msg.IntoImmutableMessage(msgID))
131-
err = handler.HandleSchemaChange(context.Background(), im)
134+
err := handler.HandleSchemaChange(context.Background(), im)
132135
assert.Error(t, err)
133136

134137
// test normal

internal/streamingnode/server/wal/adaptor/old_version_message.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func newOldVersionImmutableMessage(
4242
case *msgstream.InsertMsg:
4343
mutableMessage = newV1InsertMsgFromV0(underlyingMsg, uint64(len(msg.Payload())))
4444
case *msgstream.DeleteMsg:
45-
mutableMessage = newV1DeleteMsgFromV0(underlyingMsg)
45+
mutableMessage = newV1DeleteMsgFromV0(underlyingMsg, uint64(underlyingMsg.NumRows))
4646
case *msgstream.TimeTickMsg:
4747
mutableMessage = newV1TimeTickMsgFromV0(underlyingMsg)
4848
case *msgstream.CreatePartitionMsg:
@@ -131,11 +131,12 @@ func newV1InsertMsgFromV0(msg *msgstream.InsertMsg, binarySize uint64) message.M
131131
}
132132

133133
// newV1DeleteMsgFromV0 creates a new delete message from the old version delete message.
134-
func newV1DeleteMsgFromV0(msg *msgstream.DeleteMsg) message.MutableMessage {
134+
func newV1DeleteMsgFromV0(msg *msgstream.DeleteMsg, rows uint64) message.MutableMessage {
135135
mutableMessage, err := message.NewDeleteMessageBuilderV1().
136136
WithVChannel(msg.ShardName).
137137
WithHeader(&message.DeleteMessageHeader{
138138
CollectionId: msg.CollectionID,
139+
Rows: rows,
139140
}).
140141
WithBody(msg.DeleteRequest).
141142
BuildMutable()

internal/streamingnode/server/wal/interceptors/shard/shard_interceptor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ func (impl *shardInterceptor) handleDeleteMessage(ctx context.Context, msg messa
201201
return nil, status.NewUnrecoverableError(err.Error())
202202
}
203203

204+
impl.shardManager.ApplyDelete(deleteMessage)
204205
return appendOp(ctx, msg)
205206
}
206207

internal/streamingnode/server/wal/interceptors/shard/shard_interceptor_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ func TestShardInterceptor(t *testing.T) {
218218

219219
shardManager.EXPECT().CheckIfCollectionExists(mock.Anything).Unset()
220220
shardManager.EXPECT().CheckIfCollectionExists(mock.Anything).Return(nil)
221+
shardManager.EXPECT().ApplyDelete(mock.Anything).Return(nil)
221222
msgID, err = i.DoAppend(ctx, msg, appender)
222223
assert.NoError(t, err)
223224
assert.NotNil(t, msgID)

internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_interface.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ type ShardManager interface {
3838

3939
AssignSegment(req *AssignSegmentRequest) (*AssignSegmentResult, error)
4040

41+
ApplyDelete(msg message.MutableDeleteMessageV1) error
42+
4143
WaitUntilGrowingSegmentReady(collectionID int64, partitonID int64) (<-chan struct{}, error)
4244

4345
FlushAndFenceSegmentAllocUntil(collectionID int64, timetick uint64) ([]int64, error)

internal/streamingnode/server/wal/interceptors/shard/shards/shard_manager_segment.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,25 @@ func (m *shardManagerImpl) AssignSegment(req *AssignSegmentRequest) (*AssignSegm
122122
m.mu.Lock()
123123
defer m.mu.Unlock()
124124

125-
if pm, ok := m.partitionManagers[req.PartitionID]; ok {
126-
return pm.AssignSegment(req)
127-
} else {
125+
pm, ok := m.partitionManagers[req.PartitionID]
126+
if !ok {
128127
return nil, ErrPartitionNotFound
129128
}
129+
result, err := pm.AssignSegment(req)
130+
if err == nil {
131+
m.metrics.ObserveInsert(req.InsertMetrics.Rows, req.InsertMetrics.BinarySize)
132+
return result, nil
133+
}
134+
return nil, err
135+
}
136+
137+
// ApplyDelete: TODO move the L0 flush operation here.
138+
func (m *shardManagerImpl) ApplyDelete(msg message.MutableDeleteMessageV1) error {
139+
m.mu.Lock()
140+
defer m.mu.Unlock()
141+
142+
m.metrics.ObserveDelete(msg.Header().GetRows())
143+
return nil
130144
}
131145

132146
// WaitUntilGrowingSegmentReady waits until the growing segment is ready.

internal/streamingnode/server/wal/interceptors/shard/stats/stats_manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ func (m *StatsManager) selectSegmentsWithTimePolicy() map[int64]policy.SealPolic
349349
// selectSegmentsUntilLessThanLWM selects segments until the total size is less than the threshold.
350350
func (m *StatsManager) selectSegmentsUntilLessThanLWM() []int64 {
351351
m.mu.Lock()
352-
restSpace := m.totalStats.BinarySize - uint64(m.cfg.growingBytesLWM)
352+
restSpace := int64(m.totalStats.BinarySize) - m.cfg.growingBytesLWM
353353
m.mu.Unlock()
354354

355355
if restSpace <= 0 {
@@ -363,7 +363,7 @@ func (m *StatsManager) selectSegmentsUntilLessThanLWM() []int64 {
363363
})
364364
for restSpace > 0 && statsHeap.Len() > 0 {
365365
nextOne := statsHeap.Pop()
366-
restSpace -= nextOne.binarySize
366+
restSpace -= int64(nextOne.binarySize)
367367
segmentIDs = append(segmentIDs, nextOne.segmentID)
368368
}
369369
return segmentIDs

internal/streamingnode/server/wal/metricsutil/segment.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ func NewSegmentAssignMetrics(pchannel string) *SegmentAssignMetrics {
2222
flushedTotal: metrics.WALSegmentFlushedTotal.MustCurryWith(constLabel),
2323
partitionTotal: metrics.WALPartitionTotal.With(constLabel),
2424
collectionTotal: metrics.WALCollectionTotal.With(constLabel),
25+
26+
insertRowsTotal: metrics.WALInsertRowsTotal.With(constLabel),
27+
insertBytes: metrics.WALInsertBytes.With(constLabel),
28+
deleteRowsTotal: metrics.WALDeleteRowsTotal.With(constLabel),
2529
}
2630
}
2731

@@ -37,6 +41,10 @@ type SegmentAssignMetrics struct {
3741
flushedTotal *prometheus.CounterVec
3842
partitionTotal prometheus.Gauge
3943
collectionTotal prometheus.Gauge
44+
45+
insertRowsTotal prometheus.Counter
46+
insertBytes prometheus.Counter
47+
deleteRowsTotal prometheus.Counter
4048
}
4149

4250
// ObserveOnAllocating observe a allocating operation and return a guard function.
@@ -55,6 +63,17 @@ func (m *SegmentAssignMetrics) ObseveOnFlushing() func() {
5563
}
5664
}
5765

66+
// ObserveInsert observe a insert operation
67+
func (m *SegmentAssignMetrics) ObserveInsert(rows uint64, bytes uint64) {
68+
m.insertRowsTotal.Add(float64(rows))
69+
m.insertBytes.Add(float64(bytes))
70+
}
71+
72+
// ObserveDelete observe a insert operation
73+
func (m *SegmentAssignMetrics) ObserveDelete(rows uint64) {
74+
m.deleteRowsTotal.Add(float64(rows))
75+
}
76+
5877
// ObserveCreateSegment increments the total number of growing segment.
5978
func (m *SegmentAssignMetrics) ObserveCreateSegment() {
6079
m.allocTotal.Inc()
@@ -87,4 +106,7 @@ func (m *SegmentAssignMetrics) Close() {
87106
metrics.WALSegmentBytes.Delete(m.constLabel)
88107
metrics.WALPartitionTotal.Delete(m.constLabel)
89108
metrics.WALCollectionTotal.Delete(m.constLabel)
109+
metrics.WALInsertRowsTotal.Delete(m.constLabel)
110+
metrics.WALDeleteRowsTotal.Delete(m.constLabel)
111+
metrics.WALInsertBytes.Delete(m.constLabel)
90112
}

pkg/metrics/streaming_service_metrics.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,22 @@ var (
231231
Buckets: secondsBuckets,
232232
}, WALChannelLabelName, WALTxnStateLabelName)
233233

234+
// Rows level counter.
235+
WALInsertRowsTotal = newWALGaugeVec(prometheus.GaugeOpts{
236+
Name: "insert_rows_total",
237+
Help: "Rows of growing insert on wal",
238+
}, WALChannelLabelName)
239+
240+
WALInsertBytes = newWALGaugeVec(prometheus.GaugeOpts{
241+
Name: "insert_bytes",
242+
Help: "Bytes of growing insert on wal",
243+
}, WALChannelLabelName)
244+
245+
WALDeleteRowsTotal = newWALGaugeVec(prometheus.GaugeOpts{
246+
Name: "delete_rows_total",
247+
Help: "Rows of growing delete on wal",
248+
}, WALChannelLabelName)
249+
234250
// Segment related metrics
235251
WALGrowingSegmentRowsTotal = newWALGaugeVec(prometheus.GaugeOpts{
236252
Name: "growing_segment_rows_total",
@@ -495,6 +511,9 @@ func registerWAL(registry *prometheus.Registry) {
495511
registry.MustRegister(WALTimeTickSyncTimeTick)
496512
registry.MustRegister(WALInflightTxn)
497513
registry.MustRegister(WALTxnDurationSeconds)
514+
registry.MustRegister(WALInsertRowsTotal)
515+
registry.MustRegister(WALInsertBytes)
516+
registry.MustRegister(WALDeleteRowsTotal)
498517
registry.MustRegister(WALGrowingSegmentBytes)
499518
registry.MustRegister(WALGrowingSegmentRowsTotal)
500519
registry.MustRegister(WALGrowingSegmentHWMBytes)

pkg/proto/messages.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ message SegmentAssignment {
137137
// DeleteMessageHeader
138138
message DeleteMessageHeader {
139139
int64 collection_id = 1;
140+
uint64 rows = 2;
140141
}
141142

142143
// FlushMessageHeader just nothing.

0 commit comments

Comments
 (0)