Skip to content

Commit e603e08

Browse files
committed
refactor(canal): remove event buffer
1 parent 60a5b41 commit e603e08

File tree

10 files changed

+7
-78
lines changed

10 files changed

+7
-78
lines changed

go.mod

-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ require (
3535
github.com/trim21/errgo v0.0.3
3636
github.com/trim21/go-phpserialize v0.1.0
3737
github.com/trim21/htest v0.0.4
38-
github.com/trim21/pkg v0.0.4
3938
go.uber.org/fx v1.23.0
4039
go.uber.org/zap v1.27.0
4140
golang.org/x/sync v0.12.0

go.sum

-2
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,6 @@ github.com/trim21/go-phpserialize v0.1.0 h1:4wLvVYkP6Ya07M+2YweFdx4WOrydZzVyxKRD
239239
github.com/trim21/go-phpserialize v0.1.0/go.mod h1:99ibWRqdNj3DTX+dbHvo+L47TsoEXwArpvBPRLrvz20=
240240
github.com/trim21/htest v0.0.4 h1:dDIzKNdIClgtB158DlO+Xf0sfwNycmx3kfo/FJuY+eE=
241241
github.com/trim21/htest v0.0.4/go.mod h1:W+zaYAGCBqx38eMrMGvXrALnbcXR6OBtZiRiHahgo+E=
242-
github.com/trim21/pkg v0.0.4 h1:0nYODKdqNUzmUaPFvqSiR420u2uXQgIYyVyiNfH7olc=
243-
github.com/trim21/pkg v0.0.4/go.mod h1:edl6xdqBOJrhMuIGvcY2lg5L9cqp/hVuwHRM/kdzbMg=
244242
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
245243
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
246244
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=

internal/search/character/client.go

-11
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/meilisearch/meilisearch-go"
1010
"github.com/trim21/errgo"
11-
"github.com/trim21/pkg/queue"
1211
"go.uber.org/zap"
1312

1413
"github.com/bangumi/server/config"
@@ -54,22 +53,12 @@ type client struct {
5453
meili meilisearch.ServiceManager
5554
log *zap.Logger
5655
q *query.Query
57-
58-
queue *queue.Batched[searcher.Document]
59-
}
60-
61-
func (c *client) Close() {
62-
if c.queue != nil {
63-
c.queue.Close()
64-
}
6556
}
6657

6758
func (c *client) canalInit(cfg config.AppConfig) error {
6859
if err := searcher.ValidateConfigs(cfg); err != nil {
6960
return errgo.Wrap(err, "validate search config")
7061
}
71-
c.queue = searcher.NewBatchQueue(cfg, c.log, c.index)
72-
searcher.RegisterQueueMetrics(idx, c.queue)
7362
shouldCreateIndex, err := searcher.NeedFirstRun(c.meili, idx)
7463
if err != nil {
7564
return err

internal/search/character/event.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ func (c *client) OnUpdate(ctx context.Context, id model.CharacterID) error {
4545

4646
extracted := extract(&s)
4747

48-
c.queue.Push(extracted)
48+
_, err = c.index.UpdateDocumentsWithContext(ctx, extracted, "id")
4949

50-
return nil
50+
return err
5151
}
5252

5353
func (c *client) OnDelete(ctx context.Context, id model.CharacterID) error {

internal/search/person/client.go

-11
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/meilisearch/meilisearch-go"
1010
"github.com/trim21/errgo"
11-
"github.com/trim21/pkg/queue"
1211
"go.uber.org/zap"
1312

1413
"github.com/bangumi/server/config"
@@ -54,22 +53,12 @@ type client struct {
5453
meili meilisearch.ServiceManager
5554
log *zap.Logger
5655
q *query.Query
57-
58-
queue *queue.Batched[searcher.Document]
59-
}
60-
61-
func (c *client) Close() {
62-
if c.queue != nil {
63-
c.queue.Close()
64-
}
6556
}
6657

6758
func (c *client) canalInit(cfg config.AppConfig) error {
6859
if err := searcher.ValidateConfigs(cfg); err != nil {
6960
return errgo.Wrap(err, "validate search config")
7061
}
71-
c.queue = searcher.NewBatchQueue(cfg, c.log, c.index)
72-
searcher.RegisterQueueMetrics(idx, c.queue)
7362
shouldCreateIndex, err := searcher.NeedFirstRun(c.meili, idx)
7463
if err != nil {
7564
return err

internal/search/person/event.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ func (c *client) OnUpdate(ctx context.Context, id model.PersonID) error {
4545

4646
extracted := extract(&s)
4747

48-
c.queue.Push(extracted)
48+
_, err = c.index.UpdateDocumentsWithContext(ctx, extracted, "id")
4949

50-
return nil
50+
return err
5151
}
5252

5353
func (c *client) OnDelete(ctx context.Context, id model.PersonID) error {

internal/search/search.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,4 @@ func (s *Search) EventDelete(ctx context.Context, id uint32, target SearchTarget
143143
return searcher.OnDelete(ctx, id)
144144
}
145145

146-
func (s *Search) Close() {
147-
for _, searcher := range s.searchers {
148-
searcher.Close()
149-
}
150-
}
146+
func (s *Search) Close() {}

internal/search/searcher/client.go

-30
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ import (
1212
"github.com/avast/retry-go/v4"
1313
"github.com/labstack/echo/v4"
1414
"github.com/meilisearch/meilisearch-go"
15-
"github.com/prometheus/client_golang/prometheus"
1615
"github.com/samber/lo"
1716
"github.com/trim21/errgo"
18-
"github.com/trim21/pkg/queue"
1917
"go.uber.org/zap"
2018

2119
"github.com/bangumi/server/config"
@@ -25,8 +23,6 @@ import (
2523
type Searcher interface {
2624
Handle(c echo.Context) error
2725

28-
Close()
29-
3026
OnAdded(ctx context.Context, id uint32) error
3127
OnUpdate(ctx context.Context, id uint32) error
3228
OnDelete(ctx context.Context, id uint32) error
@@ -140,32 +136,6 @@ func NewDedupeFunc() func([]Document) []Document {
140136
}
141137
}
142138

143-
func NewBatchQueue(cfg config.AppConfig, log *zap.Logger, index meilisearch.IndexManager) *queue.Batched[Document] {
144-
return queue.NewBatchedDedupe(
145-
NewSendBatch(log, index),
146-
cfg.Search.SearchBatchSize,
147-
cfg.Search.SearchBatchInterval,
148-
NewDedupeFunc(),
149-
)
150-
}
151-
152-
func RegisterQueueMetrics(idx string, queue *queue.Batched[Document]) {
153-
prometheus.DefaultRegisterer.MustRegister(
154-
prometheus.NewGaugeFunc(
155-
prometheus.GaugeOpts{
156-
Namespace: "chii",
157-
Name: "meilisearch_queue_batch",
158-
Help: "meilisearch update queue batch size",
159-
ConstLabels: prometheus.Labels{
160-
"index": idx,
161-
},
162-
},
163-
func() float64 {
164-
return float64(queue.Len())
165-
},
166-
))
167-
}
168-
169139
func GetAttributes(rt reflect.Type, tag string) *[]string {
170140
var s []string
171141
for i := 0; i < rt.NumField(); i++ {

internal/search/subject/client.go

-12
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/meilisearch/meilisearch-go"
1010
"github.com/trim21/errgo"
11-
"github.com/trim21/pkg/queue"
1211
"go.uber.org/zap"
1312

1413
"github.com/bangumi/server/config"
@@ -54,23 +53,12 @@ type client struct {
5453
meili meilisearch.ServiceManager
5554
log *zap.Logger
5655
q *query.Query
57-
58-
queue *queue.Batched[searcher.Document]
59-
}
60-
61-
func (c *client) Close() {
62-
if c.queue != nil {
63-
c.queue.Close()
64-
}
6556
}
6657

6758
func (c *client) canalInit(cfg config.AppConfig) error {
6859
if err := searcher.ValidateConfigs(cfg); err != nil {
6960
return errgo.Wrap(err, "validate search config")
7061
}
71-
c.queue = searcher.NewBatchQueue(cfg, c.log, c.index)
72-
searcher.RegisterQueueMetrics(idx, c.queue)
73-
7462
shouldCreateIndex, err := searcher.NeedFirstRun(c.meili, idx)
7563
if err != nil {
7664
return err

internal/search/subject/event.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ func (c *client) OnUpdate(ctx context.Context, id model.SubjectID) error {
4646

4747
extracted := extract(&s)
4848

49-
c.queue.Push(extracted)
49+
_, err = c.index.UpdateDocumentsWithContext(ctx, extracted, "id")
5050

51-
return nil
51+
return err
5252
}
5353

5454
func (c *client) OnDelete(ctx context.Context, id model.SubjectID) error {

0 commit comments

Comments
 (0)