Skip to content

Commit a22088a

Browse files
authored
enhance: [StorageV2] Make packed reader use correct path (#41919)
Related to #39173 This PR - Use updated path with bucketName for packedReader - Update milvus-storage commit to report reader/writer initialization failure, see also milvus-io/milvus-storage#192 --------- Signed-off-by: Congqi Xia <[email protected]>
1 parent e4f04fd commit a22088a

File tree

5 files changed

+13
-7
lines changed

5 files changed

+13
-7
lines changed

internal/core/thirdparty/milvus-storage/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# Update milvus-storage_VERSION for the first occurrence
1515
milvus_add_pkg_config("milvus-storage")
1616
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
17-
set( milvus-storage_VERSION d04ae52 )
17+
set( milvus-storage_VERSION 1238e21 )
1818
set( GIT_REPOSITORY "https://github.com/milvus-io/milvus-storage.git")
1919
message(STATUS "milvus-storage repo: ${GIT_REPOSITORY}")
2020
message(STATUS "milvus-storage version: ${milvus-storage_VERSION}")

internal/datanode/index/task_index.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
299299
if buildIndexParams.StorageVersion == storage.StorageV2 {
300300
buildIndexParams.SegmentInsertFiles = GetSegmentInsertFiles(
301301
it.req.GetInsertLogs(),
302-
it.req.GetStorageConfig().RootPath,
302+
it.req.GetStorageConfig(),
303303
it.req.GetCollectionID(),
304304
it.req.GetPartitionID(),
305305
it.req.GetSegmentID())

internal/datanode/index/task_stats.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ func buildIndexParams(
605605
if req.GetStorageVersion() == storage.StorageV2 {
606606
params.SegmentInsertFiles = GetSegmentInsertFiles(
607607
req.GetInsertLogs(),
608-
req.GetStorageConfig().RootPath,
608+
req.GetStorageConfig(),
609609
req.GetCollectionID(),
610610
req.GetPartitionID(),
611611
req.GetTargetSegmentID())

internal/datanode/index/util.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@ package index
2828
import "C"
2929

3030
import (
31+
"path"
32+
3133
"github.com/cockroachdb/errors"
3234

3335
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
3436
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
3537
"github.com/milvus-io/milvus/pkg/v2/common"
3638
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
3739
"github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb"
40+
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
3841
"github.com/milvus-io/milvus/pkg/v2/util/hardware"
3942
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
4043
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@@ -100,14 +103,17 @@ func CalculateNodeSlots() int64 {
100103
return totalSlot
101104
}
102105

103-
func GetSegmentInsertFiles(fieldBinlogs []*datapb.FieldBinlog, rootPath string, collectionID int64, partitionID int64, segmentID int64) *indexcgopb.SegmentInsertFiles {
106+
func GetSegmentInsertFiles(fieldBinlogs []*datapb.FieldBinlog, storageConfig *indexpb.StorageConfig, collectionID int64, partitionID int64, segmentID int64) *indexcgopb.SegmentInsertFiles {
104107
insertLogs := make([]*indexcgopb.FieldInsertFiles, 0)
105108
for _, insertLog := range fieldBinlogs {
106109
filePaths := make([]string, 0)
107110
columnGroupID := insertLog.GetFieldID()
108111
for _, binlog := range insertLog.GetBinlogs() {
109-
filePaths = append(filePaths,
110-
metautil.BuildInsertLogPath(rootPath, collectionID, partitionID, segmentID, columnGroupID, binlog.GetLogID()))
112+
filePath := metautil.BuildInsertLogPath(storageConfig.GetRootPath(), collectionID, partitionID, segmentID, columnGroupID, binlog.GetLogID())
113+
if storageConfig.StorageType != "local" {
114+
filePath = path.Join(storageConfig.GetBucketName(), filePath)
115+
}
116+
filePaths = append(filePaths, filePath)
111117
}
112118
insertLogs = append(insertLogs, &indexcgopb.FieldInsertFiles{
113119
FilePaths: filePaths,

internal/storage/rw.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s
207207
for j, binlog := range binlogs {
208208
logPath := binlog.GetLogPath()
209209
if paramtable.Get().CommonCfg.StorageType.GetValue() != "local" {
210-
path.Join(rwOptions.bucketName, logPath)
210+
logPath = path.Join(rwOptions.bucketName, logPath)
211211
}
212212
paths[j] = append(paths[j], logPath)
213213
}

0 commit comments

Comments
 (0)