Skip to content

enhance: add delete rows into delete msg header and more metric #41952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion internal/flushcommon/pipeline/flow_graph_dd_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@
zap.Int32("msgType", int32(msg.Type())),
zap.Uint64("timetick", manualFlushMsg.ManualFlushMessage.TimeTick()),
zap.Uint64("flushTs", manualFlushMsg.ManualFlushMessage.Header().FlushTs),
zap.Int64s("segmentIDs", manualFlushMsg.ManualFlushMessage.Header().SegmentIds),
)
logger.Info("receive manual flush message")
if err := ddn.msgHandler.HandleManualFlush(manualFlushMsg.ManualFlushMessage); err != nil {
Expand Down Expand Up @@ -302,7 +303,12 @@
if header.GetCollectionId() != ddn.collectionID {
continue
}
logger := log.With(zap.String("vchannel", ddn.Name()))
logger := log.With(
zap.String("vchannel", ddn.Name()),
zap.Int32("msgType", int32(msg.Type())),
zap.Uint64("timetick", schemaMsg.SchemaChangeMessage.TimeTick()),
zap.Int64s("segmentIDs", schemaMsg.SchemaChangeMessage.Header().FlushedSegmentIds),
)

Check warning on line 311 in internal/flushcommon/pipeline/flow_graph_dd_node.go

View check run for this annotation

Codecov / codecov/patch

internal/flushcommon/pipeline/flow_graph_dd_node.go#L306-L311

Added lines #L306 - L311 were not covered by tests
logger.Info("receive schema change message")
body, err := schemaMsg.SchemaChangeMessage.Body()
if err != nil {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/proxy/task_delete_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (dt *deleteTaskByStreamingService) Execute(ctx context.Context) (err error)
msg, err := message.NewDeleteMessageBuilderV1().
WithHeader(&message.DeleteMessageHeader{
CollectionId: dt.collectionID,
Rows: uint64(deleteMsg.NumRows),
}).
WithBody(deleteMsg.DeleteRequest).
WithVChannel(vchannel).
Expand Down
4 changes: 2 additions & 2 deletions internal/streamingcoord/client/broadcast/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
func TestBroadcast(t *testing.T) {
s := newMockServer(t, 0)
bs := NewGRPCBroadcastService(walimplstest.WALName, s)
msg, _ := message.NewDropCollectionMessageBuilderV1().
msg := message.NewDropCollectionMessageBuilderV1().
WithHeader(&message.DropCollectionMessageHeader{}).
WithBody(&msgpb.DropCollectionRequest{}).
WithBroadcast([]string{"v1"}, message.NewCollectionNameResourceKey("r1")).
BuildBroadcast()
MustBuildBroadcast()
_, err := bs.Broadcast(context.Background(), msg)
assert.NoError(t, err)
err = bs.Ack(context.Background(), types.BroadcastAckRequest{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ func TestFlushMsgHandler_HandleFlush(t *testing.T) {
wbMgr := writebuffer.NewMockBufferManager(t)
wbMgr.EXPECT().SealSegments(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock err"))

msg, err := message.NewFlushMessageBuilderV2().
WithVChannel(vchannel).
msg := message.NewFlushMessageBuilderV2().
WithBroadcast([]string{vchannel}).
WithHeader(&message.FlushMessageHeader{
CollectionId: 0,
SegmentId: 1,
}).
WithBody(&message.FlushMessageBody{}).
BuildMutable()
assert.NoError(t, err)
MustBuildBroadcast().
WithBroadcastID(1).
SplitIntoMutableMessage()[0]

handler := newMsgHandler(wbMgr)
msgID := mock_message.NewMockMessageID(t)
Expand All @@ -72,15 +73,16 @@ func TestFlushMsgHandler_HandleManualFlush(t *testing.T) {
wbMgr.EXPECT().SealSegments(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock err"))
wbMgr.EXPECT().FlushChannel(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock err"))

msg, err := message.NewManualFlushMessageBuilderV2().
WithVChannel(vchannel).
msg := message.NewManualFlushMessageBuilderV2().
WithBroadcast([]string{vchannel}).
WithHeader(&message.ManualFlushMessageHeader{
CollectionId: 0,
FlushTs: 1000,
}).
WithBody(&message.ManualFlushMessageBody{}).
BuildMutable()
assert.NoError(t, err)
MustBuildBroadcast().
WithBroadcastID(1).
SplitIntoMutableMessage()[0]

handler := newMsgHandler(wbMgr)
msgID := mock_message.NewMockMessageID(t)
Expand Down Expand Up @@ -115,20 +117,21 @@ func TestFlushMsgHandler_HandlSchemaChange(t *testing.T) {
wbMgr := writebuffer.NewMockBufferManager(t)
wbMgr.EXPECT().SealSegments(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("mock err"))

msg, err := message.NewSchemaChangeMessageBuilderV2().
WithVChannel(vchannel).
msg := message.NewSchemaChangeMessageBuilderV2().
WithBroadcast([]string{vchannel}).
WithHeader(&message.SchemaChangeMessageHeader{
CollectionId: 0,
FlushedSegmentIds: []int64{1},
}).
WithBody(&message.SchemaChangeMessageBody{}).
BuildMutable()
assert.NoError(t, err)
MustBuildBroadcast().
WithBroadcastID(1).
SplitIntoMutableMessage()[0]

handler := newMsgHandler(wbMgr)
msgID := mock_message.NewMockMessageID(t)
im := message.MustAsImmutableCollectionSchemaChangeV2(msg.IntoImmutableMessage(msgID))
err = handler.HandleSchemaChange(context.Background(), im)
err := handler.HandleSchemaChange(context.Background(), im)
assert.Error(t, err)

// test normal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newOldVersionImmutableMessage(
case *msgstream.InsertMsg:
mutableMessage = newV1InsertMsgFromV0(underlyingMsg, uint64(len(msg.Payload())))
case *msgstream.DeleteMsg:
mutableMessage = newV1DeleteMsgFromV0(underlyingMsg)
mutableMessage = newV1DeleteMsgFromV0(underlyingMsg, uint64(underlyingMsg.NumRows))
case *msgstream.TimeTickMsg:
mutableMessage = newV1TimeTickMsgFromV0(underlyingMsg)
case *msgstream.CreatePartitionMsg:
Expand Down Expand Up @@ -131,11 +131,12 @@ func newV1InsertMsgFromV0(msg *msgstream.InsertMsg, binarySize uint64) message.M
}

// newV1DeleteMsgFromV0 creates a new delete message from the old version delete message.
func newV1DeleteMsgFromV0(msg *msgstream.DeleteMsg) message.MutableMessage {
func newV1DeleteMsgFromV0(msg *msgstream.DeleteMsg, rows uint64) message.MutableMessage {
mutableMessage, err := message.NewDeleteMessageBuilderV1().
WithVChannel(msg.ShardName).
WithHeader(&message.DeleteMessageHeader{
CollectionId: msg.CollectionID,
Rows: rows,
}).
WithBody(msg.DeleteRequest).
BuildMutable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (impl *shardInterceptor) handleDeleteMessage(ctx context.Context, msg messa
return nil, status.NewUnrecoverableError(err.Error())
}

impl.shardManager.ApplyDelete(deleteMessage)
return appendOp(ctx, msg)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func TestShardInterceptor(t *testing.T) {

shardManager.EXPECT().CheckIfCollectionExists(mock.Anything).Unset()
shardManager.EXPECT().CheckIfCollectionExists(mock.Anything).Return(nil)
shardManager.EXPECT().ApplyDelete(mock.Anything).Return(nil)
msgID, err = i.DoAppend(ctx, msg, appender)
assert.NoError(t, err)
assert.NotNil(t, msgID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type ShardManager interface {

AssignSegment(req *AssignSegmentRequest) (*AssignSegmentResult, error)

ApplyDelete(msg message.MutableDeleteMessageV1) error

WaitUntilGrowingSegmentReady(collectionID int64, partitonID int64) (<-chan struct{}, error)

FlushAndFenceSegmentAllocUntil(collectionID int64, timetick uint64) ([]int64, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,25 @@ func (m *shardManagerImpl) AssignSegment(req *AssignSegmentRequest) (*AssignSegm
m.mu.Lock()
defer m.mu.Unlock()

if pm, ok := m.partitionManagers[req.PartitionID]; ok {
return pm.AssignSegment(req)
} else {
pm, ok := m.partitionManagers[req.PartitionID]
if !ok {
return nil, ErrPartitionNotFound
}
result, err := pm.AssignSegment(req)
if err == nil {
m.metrics.ObserveInsert(req.InsertMetrics.Rows, req.InsertMetrics.BinarySize)
return result, nil
}
return nil, err
}

// ApplyDelete: TODO move the L0 flush operation here.
func (m *shardManagerImpl) ApplyDelete(msg message.MutableDeleteMessageV1) error {
m.mu.Lock()
defer m.mu.Unlock()

m.metrics.ObserveDelete(msg.Header().GetRows())
return nil
}

// WaitUntilGrowingSegmentReady waits until the growing segment is ready.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (m *StatsManager) selectSegmentsWithTimePolicy() map[int64]policy.SealPolic
// selectSegmentsUntilLessThanLWM selects segments until the total size is less than the threshold.
func (m *StatsManager) selectSegmentsUntilLessThanLWM() []int64 {
m.mu.Lock()
restSpace := m.totalStats.BinarySize - uint64(m.cfg.growingBytesLWM)
restSpace := int64(m.totalStats.BinarySize) - m.cfg.growingBytesLWM
m.mu.Unlock()

if restSpace <= 0 {
Expand All @@ -363,7 +363,7 @@ func (m *StatsManager) selectSegmentsUntilLessThanLWM() []int64 {
})
for restSpace > 0 && statsHeap.Len() > 0 {
nextOne := statsHeap.Pop()
restSpace -= nextOne.binarySize
restSpace -= int64(nextOne.binarySize)
segmentIDs = append(segmentIDs, nextOne.segmentID)
}
return segmentIDs
Expand Down
22 changes: 22 additions & 0 deletions internal/streamingnode/server/wal/metricsutil/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func NewSegmentAssignMetrics(pchannel string) *SegmentAssignMetrics {
flushedTotal: metrics.WALSegmentFlushedTotal.MustCurryWith(constLabel),
partitionTotal: metrics.WALPartitionTotal.With(constLabel),
collectionTotal: metrics.WALCollectionTotal.With(constLabel),

insertRowsTotal: metrics.WALInsertRowsTotal.With(constLabel),
insertBytes: metrics.WALInsertBytes.With(constLabel),
deleteRowsTotal: metrics.WALDeleteRowsTotal.With(constLabel),
}
}

Expand All @@ -37,6 +41,10 @@ type SegmentAssignMetrics struct {
flushedTotal *prometheus.CounterVec
partitionTotal prometheus.Gauge
collectionTotal prometheus.Gauge

insertRowsTotal prometheus.Counter
insertBytes prometheus.Counter
deleteRowsTotal prometheus.Counter
}

// ObserveOnAllocating observe a allocating operation and return a guard function.
Expand All @@ -55,6 +63,17 @@ func (m *SegmentAssignMetrics) ObseveOnFlushing() func() {
}
}

// ObserveInsert observe a insert operation
func (m *SegmentAssignMetrics) ObserveInsert(rows uint64, bytes uint64) {
m.insertRowsTotal.Add(float64(rows))
m.insertBytes.Add(float64(bytes))
}

// ObserveDelete observe a insert operation
func (m *SegmentAssignMetrics) ObserveDelete(rows uint64) {
m.deleteRowsTotal.Add(float64(rows))
}

// ObserveCreateSegment increments the total number of growing segment.
func (m *SegmentAssignMetrics) ObserveCreateSegment() {
m.allocTotal.Inc()
Expand Down Expand Up @@ -87,4 +106,7 @@ func (m *SegmentAssignMetrics) Close() {
metrics.WALSegmentBytes.Delete(m.constLabel)
metrics.WALPartitionTotal.Delete(m.constLabel)
metrics.WALCollectionTotal.Delete(m.constLabel)
metrics.WALInsertRowsTotal.Delete(m.constLabel)
metrics.WALDeleteRowsTotal.Delete(m.constLabel)
metrics.WALInsertBytes.Delete(m.constLabel)
}
19 changes: 19 additions & 0 deletions pkg/metrics/streaming_service_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ var (
Buckets: secondsBuckets,
}, WALChannelLabelName, WALTxnStateLabelName)

// Rows level counter.
WALInsertRowsTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "insert_rows_total",
Help: "Rows of growing insert on wal",
}, WALChannelLabelName)

WALInsertBytes = newWALGaugeVec(prometheus.GaugeOpts{
Name: "insert_bytes",
Help: "Bytes of growing insert on wal",
}, WALChannelLabelName)

WALDeleteRowsTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "delete_rows_total",
Help: "Rows of growing delete on wal",
}, WALChannelLabelName)

// Segment related metrics
WALGrowingSegmentRowsTotal = newWALGaugeVec(prometheus.GaugeOpts{
Name: "growing_segment_rows_total",
Expand Down Expand Up @@ -495,6 +511,9 @@ func registerWAL(registry *prometheus.Registry) {
registry.MustRegister(WALTimeTickSyncTimeTick)
registry.MustRegister(WALInflightTxn)
registry.MustRegister(WALTxnDurationSeconds)
registry.MustRegister(WALInsertRowsTotal)
registry.MustRegister(WALInsertBytes)
registry.MustRegister(WALDeleteRowsTotal)
registry.MustRegister(WALGrowingSegmentBytes)
registry.MustRegister(WALGrowingSegmentRowsTotal)
registry.MustRegister(WALGrowingSegmentHWMBytes)
Expand Down
1 change: 1 addition & 0 deletions pkg/proto/messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ message SegmentAssignment {
// DeleteMessageHeader
message DeleteMessageHeader {
int64 collection_id = 1;
uint64 rows = 2;
}

// FlushMessageHeader just nothing.
Expand Down
Loading
Loading