Skip to content

Commit 942dbf2

Browse files
authored
Merge pull request #11029 from owncloud/search_scaling
feat: allow scaling of the search service
2 parents cb4d9fd + 620b9ad commit 942dbf2

File tree

6 files changed

+290
-56
lines changed

6 files changed

+290
-56
lines changed

services/search/pkg/config/engine.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ type Engine struct {
99
// EngineBleve configures the bleve engine
1010
type EngineBleve struct {
1111
Datapath string `yaml:"data_path" env:"SEARCH_ENGINE_BLEVE_DATA_PATH" desc:"The directory where the filesystem will store search data. If not defined, the root directory derives from $OCIS_BASE_DATA_PATH/search." introductionVersion:"pre5.0"`
12+
Scale bool `yaml:"scale" env:"SEARCH_ENGINE_BLEVE_SCALE" desc:"Enable scaling of the search index (bleve). If set to 'true', the instance of the search service will no longer have exclusive write access to the index. Note when scaling search, all instances of the search service must be set to true! For 'false', which is the default, the running search service has exclusive access to the index as long it is running. This locks out other search processes tying to access the index." introductionVersion:"%%NEXT%%"`
1213
}

services/search/pkg/engine/bleve.go

Lines changed: 85 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"errors"
66
"math"
77
"path"
8-
"path/filepath"
98
"reflect"
109
"strings"
1110
"time"
@@ -31,44 +30,44 @@ import (
3130
searchMessage "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0"
3231
searchService "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0"
3332
"github.com/owncloud/ocis/v2/services/search/pkg/content"
33+
bleveEngine "github.com/owncloud/ocis/v2/services/search/pkg/engine/bleve"
3434
searchQuery "github.com/owncloud/ocis/v2/services/search/pkg/query"
3535
)
3636

3737
// Bleve represents a search engine which utilizes bleve to search and store resources.
3838
type Bleve struct {
39-
index bleve.Index
39+
indexGetter bleveEngine.IndexGetter
4040
queryCreator searchQuery.Creator[query.Query]
4141
}
4242

43-
// NewBleveIndex returns a new bleve index
44-
// given path must exist.
45-
func NewBleveIndex(root string) (bleve.Index, error) {
46-
destination := filepath.Join(root, "bleve")
47-
index, err := bleve.Open(destination)
48-
if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) {
49-
m, err := BuildBleveMapping()
50-
if err != nil {
51-
return nil, err
52-
}
53-
index, err = bleve.New(destination, m)
54-
if err != nil {
55-
return nil, err
56-
}
57-
58-
return index, nil
59-
}
60-
61-
return index, err
62-
}
63-
6443
// NewBleveEngine creates a new Bleve instance
65-
func NewBleveEngine(index bleve.Index, queryCreator searchQuery.Creator[query.Query]) *Bleve {
44+
// If scalable is set to true, one connection to the index is created and
45+
// closed per operation, so multiple operations can be executed in parallel.
46+
// If set to false, only one write connection is created for the whole
47+
// service, which will lock the index for other processes. In this case,
48+
// you must close the engine yourself.
49+
func NewBleveEngine(indexGetter bleveEngine.IndexGetter, queryCreator searchQuery.Creator[query.Query]) *Bleve {
6650
return &Bleve{
67-
index: index,
51+
indexGetter: indexGetter,
6852
queryCreator: queryCreator,
6953
}
7054
}
7155

56+
// Close will get the index and close it. If the indexGetter is returning
57+
// new instances, this method will close just the new returned instance but
58+
// not any other instances that might be in use.
59+
//
60+
// This method is useful if "memory" and "persistent" (not "persistentScale")
61+
// index getters are used.
62+
func (b *Bleve) Close() error {
63+
// regardless of the implementation, we want to close the index
64+
bleveIndex, _, err := b.indexGetter.GetIndex()
65+
if err != nil {
66+
return err
67+
}
68+
return bleveIndex.Close()
69+
}
70+
7271
// BuildBleveMapping builds a bleve index mapping which can be used for indexing
7372
func BuildBleveMapping() (mapping.IndexMapping, error) {
7473
nameMapping := bleve.NewTextFieldMapping()
@@ -123,6 +122,12 @@ func BuildBleveMapping() (mapping.IndexMapping, error) {
123122
// Search executes a search request operation within the index.
124123
// Returns a SearchIndexResponse object or an error.
125124
func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error) {
125+
bleveIndex, closeFn, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true))
126+
if err != nil {
127+
return nil, err
128+
}
129+
defer closeFn()
130+
126131
createdQuery, err := b.queryCreator.Create(sir.Query)
127132
if err != nil {
128133
if searchQuery.IsValidationError(err) {
@@ -169,7 +174,7 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques
169174
}
170175

171176
bleveReq.Fields = []string{"*"}
172-
res, err := b.index.Search(bleveReq)
177+
res, err := bleveIndex.Search(bleveReq)
173178
if err != nil {
174179
return nil, err
175180
}
@@ -237,19 +242,31 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques
237242

238243
// Upsert indexes or stores Resource data fields.
239244
func (b *Bleve) Upsert(id string, r Resource) error {
240-
return b.index.Index(id, r)
245+
bleveIndex, closeFn, err := b.indexGetter.GetIndex()
246+
if err != nil {
247+
return err
248+
}
249+
defer closeFn()
250+
251+
return bleveIndex.Index(id, r)
241252
}
242253

243254
// Move updates the resource location and all of its necessary fields.
244255
func (b *Bleve) Move(id string, parentid string, target string) error {
245-
r, err := b.getResource(id)
256+
bleveIndex, closeFn, err := b.indexGetter.GetIndex()
257+
if err != nil {
258+
return err
259+
}
260+
defer closeFn()
261+
262+
r, err := b.getResource(bleveIndex, id)
246263
if err != nil {
247264
return err
248265
}
249266
currentPath := r.Path
250267
nextPath := utils.MakeRelativePath(target)
251268

252-
r, err = b.updateEntity(id, func(r *Resource) {
269+
r, err = b.updateEntity(bleveIndex, id, func(r *Resource) {
253270
r.Path = nextPath
254271
r.Name = path.Base(nextPath)
255272
r.ParentID = parentid
@@ -266,13 +283,13 @@ func (b *Bleve) Move(id string, parentid string, target string) error {
266283
bleveReq := bleve.NewSearchRequest(q)
267284
bleveReq.Size = math.MaxInt
268285
bleveReq.Fields = []string{"*"}
269-
res, err := b.index.Search(bleveReq)
286+
res, err := bleveIndex.Search(bleveReq)
270287
if err != nil {
271288
return err
272289
}
273290

274291
for _, h := range res.Hits {
275-
_, err := b.updateEntity(h.ID, func(r *Resource) {
292+
_, err := b.updateEntity(bleveIndex, h.ID, func(r *Resource) {
276293
r.Path = strings.Replace(r.Path, currentPath, nextPath, 1)
277294
})
278295
if err != nil {
@@ -289,29 +306,53 @@ func (b *Bleve) Move(id string, parentid string, target string) error {
289306
// instead of removing the resource it just marks it as deleted!
290307
// can be undone
291308
func (b *Bleve) Delete(id string) error {
292-
return b.setDeleted(id, true)
309+
bleveIndex, closeFn, err := b.indexGetter.GetIndex()
310+
if err != nil {
311+
return err
312+
}
313+
defer closeFn()
314+
315+
return b.setDeleted(bleveIndex, id, true)
293316
}
294317

295318
// Restore is the counterpart to Delete.
296319
// It restores the resource which makes it available again.
297320
func (b *Bleve) Restore(id string) error {
298-
return b.setDeleted(id, false)
321+
bleveIndex, closeFn, err := b.indexGetter.GetIndex()
322+
if err != nil {
323+
return err
324+
}
325+
defer closeFn()
326+
327+
return b.setDeleted(bleveIndex, id, false)
299328
}
300329

301330
// Purge removes a resource from the index, irreversible operation.
302331
func (b *Bleve) Purge(id string) error {
303-
return b.index.Delete(id)
332+
bleveIndex, closeFn, err := b.indexGetter.GetIndex()
333+
if err != nil {
334+
return err
335+
}
336+
defer closeFn()
337+
338+
return bleveIndex.Delete(id)
304339
}
305340

306341
// DocCount returns the number of resources in the index.
307342
func (b *Bleve) DocCount() (uint64, error) {
308-
return b.index.DocCount()
343+
bleveIndex, closeFn, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true))
344+
if err != nil {
345+
return 0, err
346+
}
347+
defer closeFn()
348+
349+
return bleveIndex.DocCount()
309350
}
310351

311-
func (b *Bleve) getResource(id string) (*Resource, error) {
352+
func (b *Bleve) getResource(bleveIndex bleve.Index, id string) (*Resource, error) {
312353
req := bleve.NewSearchRequest(bleve.NewDocIDQuery([]string{id}))
313354
req.Fields = []string{"*"}
314-
res, err := b.index.Search(req)
355+
res, err := bleveIndex.Search(req)
315356
if err != nil {
316357
return nil, err
317358
}
@@ -446,19 +487,19 @@ func getPhotoValue[T any](fields map[string]interface{}) *T {
446487
return nil
447488
}
448489

449-
func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource, error) {
450-
it, err := b.getResource(id)
490+
func (b *Bleve) updateEntity(bleveIndex bleve.Index, id string, mutateFunc func(r *Resource)) (*Resource, error) {
491+
it, err := b.getResource(bleveIndex, id)
451492
if err != nil {
452493
return nil, err
453494
}
454495

455496
mutateFunc(it)
456497

457-
return it, b.index.Index(it.ID, it)
498+
return it, bleveIndex.Index(it.ID, it)
458499
}
459500

460-
func (b *Bleve) setDeleted(id string, deleted bool) error {
461-
it, err := b.updateEntity(id, func(r *Resource) {
501+
func (b *Bleve) setDeleted(bleveIndex bleve.Index, id string, deleted bool) error {
502+
it, err := b.updateEntity(bleveIndex, id, func(r *Resource) {
462503
r.Deleted = deleted
463504
})
464505
if err != nil {
@@ -473,13 +514,13 @@ func (b *Bleve) setDeleted(id string, deleted bool) error {
473514
bleveReq := bleve.NewSearchRequest(q)
474515
bleveReq.Size = math.MaxInt
475516
bleveReq.Fields = []string{"*"}
476-
res, err := b.index.Search(bleveReq)
517+
res, err := bleveIndex.Search(bleveReq)
477518
if err != nil {
478519
return err
479520
}
480521

481522
for _, h := range res.Hits {
482-
_, err := b.updateEntity(h.ID, func(r *Resource) {
523+
_, err := b.updateEntity(bleveIndex, h.ID, func(r *Resource) {
483524
r.Deleted = deleted
484525
})
485526
if err != nil {

0 commit comments

Comments
 (0)