Skip to content

Commit 6313a45

Browse files
authored
Revert #22057 (#22115)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
1 parent d63bf18 commit 6313a45

File tree

4 files changed

+18
-90
lines changed

4 files changed

+18
-90
lines changed

internal/datacoord/compaction_trigger.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
365365
break
366366
}
367367

368+
group.segments = FilterInIndexedSegments(t.handler, t.indexCoord, group.segments...)
369+
368370
isDiskIndex, err := t.updateSegmentMaxSize(group.segments)
369371
if err != nil {
370372
log.Warn("failed to update segment max size", zap.Error(err))

internal/datacoord/compaction_trigger_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ func Test_compactionTrigger_force(t *testing.T) {
438438
case <-time.After(2 * time.Second):
439439
hasPlan = false
440440
}
441-
assert.Equal(t, true, hasPlan)
441+
assert.Equal(t, false, hasPlan)
442442
})
443443

444444
t.Run(tt.name+" with meta error", func(t *testing.T) {
@@ -1033,8 +1033,13 @@ func Test_compactionTrigger_noplan(t *testing.T) {
10331033
err := tr.triggerCompaction()
10341034
assert.Equal(t, tt.wantErr, err != nil)
10351035
spy := (tt.fields.compactionHandler).(*spyCompactionHandler)
1036-
plan := <-spy.spyChan
1037-
assert.Equal(t, len(plan.SegmentBinlogs), 4)
1036+
select {
1037+
case val := <-spy.spyChan:
1038+
assert.Fail(t, "we expect no compaction generated", val)
1039+
return
1040+
case <-time.After(3 * time.Second):
1041+
return
1042+
}
10381043
})
10391044
}
10401045
}

internal/datacoord/handler.go

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -137,24 +137,14 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
137137
unIndexedIDs.Insert(s.GetID())
138138
}
139139
}
140-
hasUnIndexed := true
141-
for hasUnIndexed {
142-
hasUnIndexed = false
143-
for id := range unIndexedIDs {
144-
// Indexed segments are compacted to a raw segment,
145-
// replace it with the indexed ones
146-
if len(segmentInfos[id].GetCompactionFrom()) > 0 {
147-
unIndexedIDs.Remove(id)
148-
for _, segID := range segmentInfos[id].GetCompactionFrom() {
149-
if indexed.Contain(segID) {
150-
indexedIDs.Insert(segID)
151-
} else {
152-
unIndexedIDs.Insert(segID)
153-
hasUnIndexed = true
154-
}
155-
}
156-
droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
157-
}
140+
for id := range unIndexedIDs {
141+
// Indexed segments are compacted to a raw segment,
142+
// replace it with the indexed ones
143+
if len(segmentInfos[id].GetCompactionFrom()) > 0 &&
144+
indexed.Contain(segmentInfos[id].GetCompactionFrom()...) {
145+
unIndexedIDs.Remove(id)
146+
indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...)
147+
droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
158148
}
159149
}
160150

internal/datacoord/server_test.go

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2644,75 +2644,6 @@ func TestGetRecoveryInfo(t *testing.T) {
26442644
assert.Equal(t, UniqueID(8), resp.GetChannels()[0].GetDroppedSegmentIds()[0])
26452645
})
26462646

2647-
t.Run("with continuous compaction", func(t *testing.T) {
2648-
svr := newTestServer(t, nil)
2649-
defer closeTestServer(t, svr)
2650-
2651-
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
2652-
return newMockRootCoordService(), nil
2653-
}
2654-
2655-
svr.meta.AddCollection(&collectionInfo{
2656-
ID: 0,
2657-
Schema: newTestSchema(),
2658-
})
2659-
2660-
err := svr.meta.UpdateChannelCheckpoint("vchan1", &internalpb.MsgPosition{
2661-
ChannelName: "vchan1",
2662-
Timestamp: 0,
2663-
})
2664-
assert.NoError(t, err)
2665-
2666-
seg1 := createSegment(9, 0, 0, 100, 30, "vchan1", commonpb.SegmentState_Dropped)
2667-
seg2 := createSegment(10, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
2668-
seg3 := createSegment(11, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
2669-
seg3.CompactionFrom = []int64{9, 10}
2670-
seg4 := createSegment(12, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Dropped)
2671-
seg5 := createSegment(13, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed)
2672-
seg5.CompactionFrom = []int64{11, 12}
2673-
err = svr.meta.AddSegment(NewSegmentInfo(seg1))
2674-
assert.Nil(t, err)
2675-
err = svr.meta.AddSegment(NewSegmentInfo(seg2))
2676-
assert.Nil(t, err)
2677-
err = svr.meta.AddSegment(NewSegmentInfo(seg3))
2678-
assert.Nil(t, err)
2679-
err = svr.meta.AddSegment(NewSegmentInfo(seg4))
2680-
assert.Nil(t, err)
2681-
err = svr.meta.AddSegment(NewSegmentInfo(seg5))
2682-
assert.Nil(t, err)
2683-
mockResp := &indexpb.GetIndexInfoResponse{
2684-
Status: &commonpb.Status{},
2685-
SegmentInfo: map[int64]*indexpb.SegmentInfo{
2686-
seg4.ID: {
2687-
CollectionID: seg4.CollectionID,
2688-
SegmentID: seg4.ID,
2689-
EnableIndex: true,
2690-
IndexInfos: []*indexpb.IndexFilePathInfo{
2691-
{
2692-
SegmentID: seg4.ID,
2693-
FieldID: 2,
2694-
},
2695-
},
2696-
},
2697-
},
2698-
}
2699-
svr.indexCoord = mocks.NewMockIndexCoord(t)
2700-
svr.indexCoord.(*mocks.MockIndexCoord).EXPECT().GetIndexInfos(mock.Anything, mock.Anything).Return(mockResp, nil)
2701-
2702-
req := &datapb.GetRecoveryInfoRequest{
2703-
CollectionID: 0,
2704-
PartitionID: 0,
2705-
}
2706-
resp, err := svr.GetRecoveryInfo(context.TODO(), req)
2707-
assert.Nil(t, err)
2708-
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
2709-
assert.NotNil(t, resp.GetChannels()[0].SeekPosition)
2710-
assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp())
2711-
assert.Len(t, resp.GetChannels()[0].GetDroppedSegmentIds(), 0)
2712-
assert.ElementsMatch(t, []UniqueID{9, 10}, resp.GetChannels()[0].GetUnflushedSegmentIds())
2713-
assert.ElementsMatch(t, []UniqueID{12}, resp.GetChannels()[0].GetFlushedSegmentIds())
2714-
})
2715-
27162647
t.Run("with closed server", func(t *testing.T) {
27172648
svr := newTestServer(t, nil)
27182649
closeTestServer(t, svr)

0 commit comments

Comments
 (0)