Skip to content

Commit 606b778

Browse files
committed
save
1 parent 54d530f commit 606b778

File tree

1 file changed

+67
-38
lines changed

1 file changed

+67
-38
lines changed

db/recsplit/recsplit.go

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ type recsplitScratch struct {
8282
primaryAggrBound uint16
8383
secondaryAggrBound uint16
8484
bytesPerRec int // Bytes per record in offset encoding
85+
86+
// Input fields used by the parallel path: producer fills these, worker reads them.
87+
bucketIdx uint64
88+
bucket []uint64
89+
offsets []uint64
8590
}
8691

8792
// bucketResult contains the output of processing a single bucket.
@@ -112,13 +117,6 @@ func (br *bucketResult) Reset() {
112117
br.err = nil
113118
}
114119

115-
// bucketTask is a unit of work sent to a parallel worker.
116-
type bucketTask struct {
117-
bucketIdx uint64
118-
bucket []uint64 // worker takes ownership and may modify in-place
119-
offsets []uint64 // worker takes ownership and may modify in-place
120-
}
121-
122120
// RecSplit is the implementation of Recursive Split algorithm for constructing perfect hash mapping, described in
123121
// https://arxiv.org/pdf/1910.06416.pdf Emmanuel Esposito, Thomas Mueller Graf, and Sebastiano Vigna.
124122
// Recsplit: Minimal perfect hashing via recursive splitting. In 2020 Proceedings of the Symposium on Algorithm Engineering and Experiments (ALENEX),
@@ -1151,22 +1149,32 @@ func newWorkerScratch(rs *RecSplit) *recsplitScratch {
11511149
secondaryAggrBound: rs.scratch.secondaryAggrBound,
11521150
bytesPerRec: rs.scratch.bytesPerRec,
11531151
trace: rs.scratch.trace,
1152+
bucket: make([]uint64, 0, rs.bucketSize),
1153+
offsets: make([]uint64, 0, rs.bucketSize),
11541154
}
11551155
}
11561156

11571157
// recsplitBucketWorker is the goroutine body for parallel bucket processing.
11581158
// It consumes bucketTask values from tasks, computes the recsplit encoding, and
11591159
// sends *bucketResult values (in any order) to results.
1160-
func recsplitBucketWorker(ctx context.Context, tasks <-chan *bucketTask, results chan<- *bucketResult, sc *recsplitScratch) {
1161-
for task := range tasks {
1160+
// The scratch sc carries the input bucket data (sc.bucket, sc.offsets, sc.bucketIdx) as well
1161+
// as the per-worker computation buffers. After processing, sc is returned to free for reuse.
1162+
func recsplitBucketWorker(ctx context.Context, tasks <-chan *recsplitScratch, results chan<- *bucketResult, free chan<- *recsplitScratch) {
1163+
for sc := range tasks {
11621164
result := getBucketResult()
1163-
result.bucketIdx = task.bucketIdx
1164-
result.bucketSize = len(task.bucket)
1165+
result.bucketIdx = sc.bucketIdx
1166+
result.bucketSize = len(sc.bucket)
11651167

1166-
if len(task.bucket) > 1 {
1167-
for i, key := range task.bucket[1:] {
1168-
if key == task.bucket[i] {
1168+
if len(sc.bucket) > 1 {
1169+
for i, key := range sc.bucket[1:] {
1170+
if key == sc.bucket[i] {
11691171
result.err = fmt.Errorf("%w: %x", ErrCollision, key)
1172+
sc.bucket = sc.bucket[:0]
1173+
sc.offsets = sc.offsets[:0]
1174+
select {
1175+
case free <- sc:
1176+
case <-ctx.Done():
1177+
}
11701178
select {
11711179
case results <- result:
11721180
case <-ctx.Done():
@@ -1175,10 +1183,17 @@ func recsplitBucketWorker(ctx context.Context, tasks <-chan *bucketTask, results
11751183
return
11761184
}
11771185
}
1178-
result.preAlloc(len(task.bucket), sc.bytesPerRec)
1179-
sc.preAlloc(len(task.bucket))
1186+
result.preAlloc(len(sc.bucket), sc.bytesPerRec)
1187+
sc.preAlloc(len(sc.bucket))
11801188
var err error
1181-
sc.unaryBuf, err = recsplit(0, task.bucket, task.offsets, sc.unaryBuf[:0], sc, result)
1189+
sc.unaryBuf, err = recsplit(0, sc.bucket, sc.offsets, sc.unaryBuf[:0], sc, result)
1190+
result.unaryBuf = append(result.unaryBuf[:0], sc.unaryBuf...)
1191+
sc.bucket = sc.bucket[:0]
1192+
sc.offsets = sc.offsets[:0]
1193+
select {
1194+
case free <- sc:
1195+
case <-ctx.Done():
1196+
}
11821197
if err != nil {
11831198
result.err = err
11841199
select {
@@ -1188,13 +1203,18 @@ func recsplitBucketWorker(ctx context.Context, tasks <-chan *bucketTask, results
11881203
}
11891204
return
11901205
}
1191-
result.unaryBuf = append(result.unaryBuf[:0], sc.unaryBuf...)
11921206
} else {
1193-
result.preAlloc(len(task.bucket), sc.bytesPerRec)
1194-
for _, offset := range task.offsets {
1207+
result.preAlloc(len(sc.bucket), sc.bytesPerRec)
1208+
for _, offset := range sc.offsets {
11951209
binary.BigEndian.PutUint64(sc.numBuf[:], offset)
11961210
result.offsetData = append(result.offsetData, sc.numBuf[8-sc.bytesPerRec:]...)
11971211
}
1212+
sc.bucket = sc.bucket[:0]
1213+
sc.offsets = sc.offsets[:0]
1214+
select {
1215+
case free <- sc:
1216+
case <-ctx.Done():
1217+
}
11981218
}
11991219

12001220
select {
@@ -1245,23 +1265,24 @@ func (rs *RecSplit) buildWithWorkers(ctx context.Context) error {
12451265
defer cancel()
12461266

12471267
numWorkers := rs.workers
1248-
taskCh := make(chan *bucketTask, numWorkers*2)
1249-
resultCh := make(chan *bucketResult, numWorkers*2)
1250-
1251-
// Keep worker scratches so we can harvest their golombRice tables after they finish.
1252-
// Workers lazily extend golombRice as they encounter different bucket sizes;
1253-
// the longest table covers the highest m seen by any worker.
1268+
// freeScratchCh acts as a pool: N scratches circulate between producer and workers.
1269+
// Each scratch carries both the input bucket data and the per-worker computation buffers,
1270+
// so no separate bucketTask allocation is needed.
1271+
freeScratchCh := make(chan *recsplitScratch, numWorkers)
12541272
workerScratches := make([]*recsplitScratch, numWorkers)
12551273
for i := range workerScratches {
12561274
workerScratches[i] = newWorkerScratch(rs)
1275+
freeScratchCh <- workerScratches[i]
12571276
}
1277+
taskCh := make(chan *recsplitScratch, numWorkers)
1278+
resultCh := make(chan *bucketResult, numWorkers*2)
1279+
12581280
var wg sync.WaitGroup
1259-
for _, sc := range workerScratches {
1260-
sc := sc
1281+
for range numWorkers {
12611282
wg.Add(1)
12621283
go func() {
12631284
defer wg.Done()
1264-
recsplitBucketWorker(ctx, taskCh, resultCh, sc)
1285+
recsplitBucketWorker(ctx, taskCh, resultCh, freeScratchCh)
12651286
}()
12661287
}
12671288
go func() {
@@ -1279,34 +1300,42 @@ func (rs *RecSplit) buildWithWorkers(ctx context.Context) error {
12791300
rs.scratch.golombRice = best
12801301
}()
12811302

1282-
// Producer: iterate ETL collector, send one task per bucket.
1303+
// Producer: iterate ETL collector, send one scratch per bucket.
12831304
producerErrCh := make(chan error, 1)
12841305
go func() {
12851306
defer close(taskCh)
12861307
var curBucketIdx uint64 = math.MaxUint64
1287-
var bucket, offsets []uint64
1308+
var sc *recsplitScratch
12881309
err := rs.bucketCollector.Load(nil, "", func(k, v []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
12891310
// k is 4-byte BigEndian bucketIdx (uint32) + 8-byte fingerprint; v is the offset.
12901311
bucketIdx := uint64(binary.BigEndian.Uint32(k))
12911312
if curBucketIdx != bucketIdx {
12921313
if curBucketIdx != math.MaxUint64 {
1314+
sc.bucketIdx = curBucketIdx
12931315
select {
1294-
case taskCh <- &bucketTask{bucketIdx: curBucketIdx, bucket: bucket, offsets: offsets}:
1316+
case taskCh <- sc:
12951317
case <-ctx.Done():
12961318
return ctx.Err()
12971319
}
1298-
bucket = nil
1299-
offsets = nil
1320+
sc = nil
13001321
}
13011322
curBucketIdx = bucketIdx
13021323
}
1303-
bucket = append(bucket, binary.BigEndian.Uint64(k[4:]))
1304-
offsets = append(offsets, binary.BigEndian.Uint64(v))
1324+
if sc == nil {
1325+
select {
1326+
case sc = <-freeScratchCh:
1327+
case <-ctx.Done():
1328+
return ctx.Err()
1329+
}
1330+
}
1331+
sc.bucket = append(sc.bucket, binary.BigEndian.Uint64(k[4:]))
1332+
sc.offsets = append(sc.offsets, binary.BigEndian.Uint64(v))
13051333
return nil
13061334
}, etl.TransformArgs{Quit: ctx.Done()})
1307-
if err == nil && len(bucket) > 0 {
1335+
if err == nil && sc != nil && len(sc.bucket) > 0 {
1336+
sc.bucketIdx = curBucketIdx
13081337
select {
1309-
case taskCh <- &bucketTask{bucketIdx: curBucketIdx, bucket: bucket, offsets: offsets}:
1338+
case taskCh <- sc:
13101339
case <-ctx.Done():
13111340
err = ctx.Err()
13121341
}

0 commit comments

Comments (0)