Skip to content

Commit af60771

Browse files
authored
Merge pull request #2 from SkyAPM/external-seg
Add external segment streaming support with deduplication
2 parents 3750bbb + 2772e8a commit af60771

File tree

12 files changed

+2686
-6
lines changed

12 files changed

+2686
-6
lines changed

.github/workflows/lint.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@ jobs:
1313
- name: golangci-lint
1414
uses: golangci/[email protected]
1515
with:
16-
version: v1.61.0
16+
version: v1.64.8

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
test:
99
strategy:
1010
matrix:
11-
go-version: [1.23.x]
11+
go-version: [1.24.x]
1212
platform: [ubuntu-latest, macos-latest, windows-latest]
1313
runs-on: ${{ matrix.platform }}
1414
steps:

config.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ type Config struct {
3939

4040
SearchStartFunc func(size uint64) error
4141
SearchEndFunc func(size uint64)
42+
43+
// External segment streaming options
44+
EnableExternalSegments bool
45+
EnableDeduplication bool
46+
ExternalSegmentTempDir string
4247
}
4348

4449
// WithVirtualField allows you to describe a field that
@@ -87,6 +92,24 @@ func (config Config) WithPrepareMergeCallback(f func(src []*roaring.Bitmap, segm
8792
return config
8893
}
8994

95+
func (config Config) WithExternalSegments(tempDir string, enableDeduplication bool) Config {
96+
config.EnableExternalSegments = true
97+
config.EnableDeduplication = enableDeduplication
98+
config.ExternalSegmentTempDir = tempDir
99+
100+
// Propagate to index config
101+
config.indexConfig.EnableExternalSegments = true
102+
config.indexConfig.EnableDeduplication = enableDeduplication
103+
config.indexConfig.ExternalSegmentTempDir = tempDir
104+
105+
return config
106+
}
107+
108+
// DefaultExternalSegmentConfig provides sensible defaults for external segment streaming
109+
func DefaultExternalSegmentConfig(tempDir string) Config {
110+
return Config{}.WithExternalSegments(tempDir, true)
111+
}
112+
90113
func DefaultConfig(path string) Config {
91114
indexConfig := index.DefaultConfig(path)
92115
return defaultConfig(indexConfig)
@@ -124,6 +147,12 @@ func defaultConfig(indexConfig index.Config) Config {
124147
}
125148
return rv.DefaultSimilarity.ComputeNorm(length)
126149
})
150+
151+
// Propagate external segment settings from main config to index config
152+
indexConfig.EnableExternalSegments = rv.EnableExternalSegments
153+
indexConfig.EnableDeduplication = rv.EnableDeduplication
154+
indexConfig.ExternalSegmentTempDir = rv.ExternalSegmentTempDir
155+
127156
rv.indexConfig = indexConfig
128157

129158
return rv

external_segment_example.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Copyright (c) 2020 The Bluge Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bluge
16+
17+
import (
18+
"fmt"
19+
"io"
20+
"log"
21+
)
22+
23+
// ExampleExternalSegmentStreaming demonstrates how to use the external segment streaming feature
24+
func ExampleExternalSegmentStreaming() {
25+
// Configure writer with external segment support
26+
config := DefaultConfig("./index").WithExternalSegments("./temp_segments", true)
27+
28+
writer, err := OpenWriter(config)
29+
if err != nil {
30+
log.Printf("failed to open writer: %v", err)
31+
return
32+
}
33+
defer writer.Close()
34+
35+
// Enable external segment receiver
36+
receiver, err := writer.EnableExternalSegments()
37+
if err != nil {
38+
log.Printf("failed to enable external segments: %v", err)
39+
return
40+
}
41+
42+
// Start receiving a segment
43+
err = receiver.StartSegment()
44+
if err != nil {
45+
log.Printf("failed to start segment: %v", err)
46+
return
47+
}
48+
49+
// Simulate streaming chunks from an external source
50+
// In a real scenario, this would come from a network stream, file, etc.
51+
var externalSegmentData []byte // This would be your actual segment data
52+
chunkSize := 64 * 1024 // 64KB chunks
53+
54+
for len(externalSegmentData) > 0 {
55+
chunkEnd := chunkSize
56+
if chunkEnd > len(externalSegmentData) {
57+
chunkEnd = len(externalSegmentData)
58+
}
59+
60+
chunk := externalSegmentData[:chunkEnd]
61+
externalSegmentData = externalSegmentData[chunkEnd:]
62+
63+
err = receiver.WriteChunk(chunk)
64+
if err != nil {
65+
log.Printf("failed to write chunk: %v", err)
66+
return
67+
}
68+
}
69+
70+
// Signal completion of segment streaming
71+
err = receiver.CompleteSegment()
72+
if err != nil {
73+
log.Printf("failed to complete segment: %v", err)
74+
return
75+
}
76+
77+
fmt.Printf("External segment streamed successfully\n")
78+
fmt.Printf("Status: %v\n", receiver.Status())
79+
fmt.Printf("Bytes received: %d\n", receiver.BytesReceived())
80+
}
81+
82+
// ExampleExternalSegmentStreamingFromReader demonstrates streaming from an io.Reader
83+
func ExampleExternalSegmentStreamingFromReader(segmentReader io.Reader) error {
84+
// Configure writer with external segment support
85+
config := DefaultConfig("./index").WithExternalSegments("./temp_segments", true)
86+
87+
writer, err := OpenWriter(config)
88+
if err != nil {
89+
return err
90+
}
91+
defer writer.Close()
92+
93+
// Enable external segment receiver
94+
receiver, err := writer.EnableExternalSegments()
95+
if err != nil {
96+
return err
97+
}
98+
99+
// Start receiving a segment
100+
err = receiver.StartSegment()
101+
if err != nil {
102+
return err
103+
}
104+
105+
// Stream data from reader
106+
chunkSize := 64 * 1024 // 64KB chunks
107+
buffer := make([]byte, chunkSize)
108+
109+
for {
110+
n, err := segmentReader.Read(buffer)
111+
if err != nil && err != io.EOF {
112+
if recoverErr := receiver.RecoverFromFailure(); recoverErr != nil {
113+
return fmt.Errorf("failed to recover from failure: %w, original error: %w", recoverErr, err)
114+
}
115+
return err
116+
}
117+
118+
if n == 0 {
119+
break // End of data
120+
}
121+
122+
// Write chunk
123+
err = receiver.WriteChunk(buffer[:n])
124+
if err != nil {
125+
return err
126+
}
127+
128+
if err == io.EOF {
129+
break
130+
}
131+
}
132+
133+
// Signal completion
134+
err = receiver.CompleteSegment()
135+
if err != nil {
136+
return err
137+
}
138+
139+
fmt.Printf("External segment streamed successfully from reader\n")
140+
return nil
141+
}

0 commit comments

Comments
 (0)