Skip to content

Commit 29d260c

Browse files
committed
synchronously index comments for the first time run
1 parent 42d6bcd commit 29d260c

File tree

4 files changed: 38 additions and 34 deletions.

backend/app/cmd/server.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/go-pkgz/lcw/eventbus"
1919
log "github.com/go-pkgz/lgr"
2020
ntf "github.com/go-pkgz/notify"
21-
"github.com/go-pkgz/syncs"
2221
"github.com/golang-jwt/jwt"
2322
"github.com/kyokomi/emoji/v2"
2423
bolt "go.etcd.io/bbolt"
@@ -672,20 +671,13 @@ func (a *serverApp) run(ctx context.Context) error {
672671
log.Printf("[WARN] failed to resubmit comments with staging images, %s", e)
673672
}
674673

674+
// synchronously index comments in several workers for the first time run
675675
numWorkersForIndexing := 8
676-
grp := syncs.NewErrSizedGroup(numWorkersForIndexing)
677-
for _, siteID := range a.Sites {
678-
a.dataService.RunSiteIndexers(ctx, siteID, grp) // index comments for the first time run
676+
err := a.dataService.IndexSites(ctx, a.Sites, numWorkersForIndexing)
677+
if err != nil {
678+
log.Printf("[WARN] failed to index comments, %s", err)
679679
}
680680

681-
// don't lock here, wait in background to log error if any
682-
go func() {
683-
err := grp.Wait()
684-
if err != nil {
685-
log.Printf("[WARN] background task for indexing existing comments failed: %s", err)
686-
}
687-
}()
688-
689681
go a.imageService.Cleanup(ctx) // pictures cleanup for staging images
690682

691683
a.restSrv.Run(a.Address, a.Port)

backend/app/store/search/search_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func TestSearch_IndexStartup(t *testing.T) {
241241

242242
grp := syncs.NewErrSizedGroup(4)
243243
for _, siteID := range sites {
244-
err := IndexSite(context.Background(), siteID, searcher, storeEngine, grp)
244+
err := RunBackgroundSiteIndexer(context.Background(), siteID, searcher, storeEngine, grp)
245245
require.NoError(t, err)
246246
}
247247
err := grp.Wait()

backend/app/store/search/site_index.go

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@ import (
44
"context"
55
"fmt"
66
"log"
7-
"sync/atomic"
87
"time"
98

109
"github.com/go-pkgz/syncs"
1110
"github.com/umputun/remark42/backend/app/store"
1211
"github.com/umputun/remark42/backend/app/store/engine"
1312
)
1413

15-
// IndexSite rebuilds search index for the site
14+
// RunBackgroundSiteIndexer spawns background processes to build search index for the site from scratch
1615
// Run indexing of each topic in parallel in a sized group
17-
func IndexSite(ctx context.Context, siteID string, s *Service, e engine.Interface, grp *syncs.ErrSizedGroup) error {
16+
// Caller should wait for the group to finish
17+
func RunBackgroundSiteIndexer(ctx context.Context, siteID string, s *Service, e engine.Interface, grp *syncs.ErrSizedGroup) error {
1818
siteIdx, isIndexed := s.sitesEngines[siteID]
1919
if !isIndexed {
2020
log.Printf("[INFO] skipping indexing site %q", siteID)
@@ -32,17 +32,14 @@ func IndexSite(ctx context.Context, siteID string, s *Service, e engine.Interfac
3232
return nil
3333
}
3434

35-
log.Printf("[INFO] indexing site %s", siteID)
36-
startTime := time.Now()
37-
3835
req := engine.InfoRequest{Locator: store.Locator{SiteID: siteID}}
3936
topics, err := e.Info(req)
4037

4138
if err != nil {
4239
return fmt.Errorf("failed to get topics for site %q: %w", siteID, err)
4340
}
4441

45-
var indexedCnt uint64
42+
// worker to index single topic
4643
worker := func(ctx context.Context, url string) error {
4744
locator := store.Locator{SiteID: siteID, URL: url}
4845
select {
@@ -58,22 +55,18 @@ func IndexSite(ctx context.Context, siteID string, s *Service, e engine.Interfac
5855
}
5956

6057
indexErr := s.indexBatch(comments)
61-
log.Printf("[INFO] %d documents indexed from site %v", len(comments), locator)
62-
6358
if indexErr != nil {
6459
return fmt.Errorf("failed to index comments for search: %w", indexErr)
6560
}
6661

67-
atomic.AddUint64(&indexedCnt, uint64(len(comments)))
68-
62+
log.Printf("[INFO] %d documents indexed from site %v", len(comments), locator)
6963
return nil
7064
}
7165

7266
for i := len(topics) - 1; i >= 0; i-- {
7367
url := topics[i].URL
7468
grp.Go(func() error { return worker(ctx, url) })
7569
}
76-
77-
log.Printf("[INFO] total %d documents indexed for site %q in %v", indexedCnt, siteID, time.Since(startTime))
70+
// don't wait for the group to finish, caller should do it
7871
return nil
7972
}

backend/app/store/service/service.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -950,17 +950,36 @@ func (s *DataStore) Search(siteID, query, sortBy string, limit, skip int) ([]sto
950950
return comments, searchRes.Total, nil
951951
}
952952

953-
// RunSiteIndexers indexes all comments for siteID in the sized group.
954-
// It's required to call this method at the startup to support "cold start"
955-
// when store contains some data, but search functionality is enabled for the first time.
956-
func (s *DataStore) RunSiteIndexers(ctx context.Context, siteID string, grp *syncs.ErrSizedGroup) {
953+
// IndexSites indexes all comments for all sites.
954+
// It's required to call this method at the startup to support cold start
955+
// (store contains some data, but search functionality is enabled for the first time)
956+
// Note: this method is synchronous and can take time to complete to build the initial index.
957+
func (s *DataStore) IndexSites(ctx context.Context, sites []string, numWorkers int) error {
957958
if s.SearchService == nil {
958-
return
959+
return nil
959960
}
960-
err := search.IndexSite(ctx, siteID, s.SearchService, s.Engine, grp)
961-
if err != nil {
962-
log.Printf("[WARN] error occurred during indexing comments for site %q: %e", siteID, err)
961+
962+
startTime := time.Now()
963+
964+
// run indexing for all sites in the same pool
965+
grp := syncs.NewErrSizedGroup(numWorkers)
966+
cancelCtx, cancelFunc := context.WithCancel(context.Background())
967+
defer cancelFunc()
968+
for _, siteID := range sites {
969+
err := search.RunBackgroundSiteIndexer(cancelCtx, siteID, s.SearchService, s.Engine, grp)
970+
if err != nil {
971+
// cancel all existing background jobs
972+
cancelFunc()
973+
_ = grp.Wait()
974+
return err
975+
}
976+
}
977+
978+
err := grp.Wait()
979+
if err == nil {
980+
log.Printf("[INFO] finish building search index in %v", time.Since(startTime))
963981
}
982+
return err
964983
}
965984

966985
// Close store service

0 commit comments

Comments
 (0)