Skip to content

Commit 5e68cd7

Browse files
authored
feat: cli to reindex elastic guests index (#259)
* cli to reindex guests * use generator pattern * batch reindex * update contributing for local dev * claude md patterns
1 parent ad2eb59 commit 5e68cd7

File tree

7 files changed

+288
-0
lines changed

7 files changed

+288
-0
lines changed

CLAUDE.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,54 @@ type Config struct {
325325

326326
Use `slog` package for structured logging. No `fmt.Println` in production paths.
327327

328+
### CLI Commands (`backend/cmd/cli`)
329+
330+
Commands are registered in `main.go` and implemented in per-topic files (e.g. `guests.go`).
331+
332+
**Registering a command:**
333+
```go
334+
// main.go
335+
var commands = map[string]command{
336+
"reindex-guests": {
337+
description: "Fetch all guests from the database and reindex them in OpenSearch",
338+
run: runReindexGuests,
339+
},
340+
}
341+
```
342+
343+
**Backfill/reindex pattern** — paginated DB read via `iter.Seq2`, batched writes to the sink:
344+
345+
```go
346+
// repository: paginated generator using cursor pagination
347+
func (r *GuestsRepository) AllGuestDocuments(ctx context.Context) iter.Seq2[*models.GuestDocument, error] {
348+
return func(yield func(*models.GuestDocument, error) bool) {
349+
var cursorName, cursorID string
350+
for {
351+
// fetch page using (cursorName, cursorID) as cursor
352+
// yield each doc; return on last page or if yield returns false
353+
}
354+
}
355+
}
356+
357+
// cli: accumulate into batches, flush when full
358+
batch := make([]*models.GuestDocument, 0, batchSize)
359+
for doc, err := range repo.AllGuestDocuments(ctx) {
360+
if err != nil { return err }
361+
batch = append(batch, doc)
362+
if len(batch) == batchSize {
363+
if err := sink.BulkIndex(ctx, batch); err != nil { return err }
364+
batch = batch[:0]
365+
}
366+
}
367+
if len(batch) > 0 {
368+
if err := sink.BulkIndex(ctx, batch); err != nil { return err }
369+
}
370+
```
371+
372+
- DB is read in pages, sink is written in batches — neither loaded fully into memory
373+
- `iter.Seq2` is used so the caller drives iteration with a plain `for range`
374+
- No semaphore needed unless batches are indexed concurrently
375+
328376
---
329377

330378
## Testing (Frontend)

CONTRIBUTING.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Contributing
2+
3+
## Local Setup
4+
5+
### OpenSearch
6+
7+
Run OpenSearch locally with the security plugin disabled (no TLS or auth required for local dev):
8+
9+
```bash
10+
docker run -d --name opensearch \
11+
-p 9200:9200 -p 9600:9600 \
12+
-e "discovery.type=single-node" \
13+
-e "DISABLE_SECURITY_PLUGIN=true" \
14+
opensearchproject/opensearch:latest
15+
```
16+
17+
Verify it's running:
18+
19+
```bash
20+
curl http://localhost:9200
21+
```
22+
23+
> In production, OpenSearch uses TLS and credentials injected via Doppler. `OPENSEARCH_INSECURE_SKIP_TLS` should be `false` in prod.

backend/cli

36.7 MB
Binary file not shown.

backend/cmd/cli/guests.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/generate/selfserve/config"
8+
"github.com/generate/selfserve/internal/models"
9+
"github.com/generate/selfserve/internal/repository"
10+
opensearchstorage "github.com/generate/selfserve/internal/service/storage/opensearch"
11+
storage "github.com/generate/selfserve/internal/service/storage/postgres"
12+
)
13+
14+
const reindexBatchSize = 100
15+
16+
func runReindexGuests(ctx context.Context, cfg config.Config, _ []string) error {
17+
pgRepo, err := storage.NewRepository(cfg.DB)
18+
if err != nil {
19+
return fmt.Errorf("failed to connect to db: %w", err)
20+
}
21+
defer pgRepo.Close()
22+
23+
osClient, err := opensearchstorage.NewClient(cfg.OpenSearch)
24+
if err != nil {
25+
return fmt.Errorf("failed to connect to opensearch: %w", err)
26+
}
27+
28+
if err := opensearchstorage.EnsureGuestsIndex(ctx, osClient); err != nil {
29+
return fmt.Errorf("failed to ensure guests index: %w", err)
30+
}
31+
32+
guestsRepo := repository.NewGuestsRepository(pgRepo.DB)
33+
osGuestsRepo := repository.NewOpenSearchGuestsRepository(osClient)
34+
35+
var total int
36+
batch := make([]*models.GuestDocument, 0, reindexBatchSize)
37+
38+
flush := func() error {
39+
if err := osGuestsRepo.BulkIndexGuests(ctx, batch); err != nil {
40+
return err
41+
}
42+
total += len(batch)
43+
batch = batch[:0]
44+
return nil
45+
}
46+
47+
for doc, err := range guestsRepo.AllGuestDocuments(ctx) {
48+
if err != nil {
49+
return fmt.Errorf("failed to fetch guest documents: %w", err)
50+
}
51+
batch = append(batch, doc)
52+
if len(batch) == reindexBatchSize {
53+
if err := flush(); err != nil {
54+
return fmt.Errorf("failed to bulk index batch: %w", err)
55+
}
56+
}
57+
}
58+
59+
if len(batch) > 0 {
60+
if err := flush(); err != nil {
61+
return fmt.Errorf("failed to bulk index batch: %w", err)
62+
}
63+
}
64+
65+
fmt.Printf("reindex-guests completed: %d indexed\n", total)
66+
return nil
67+
}

backend/cmd/cli/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ var commands = map[string]command{
3232
description: "Sync users from Clerk into the database",
3333
run: runSyncUsers,
3434
},
35+
"reindex-guests": {
36+
description: "Fetch all guests from the database and reindex them in OpenSearch",
37+
run: runReindexGuests,
38+
},
3539
}
3640

3741
func main() {

backend/internal/repository/guests.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package repository
33
import (
44
"context"
55
"errors"
6+
"iter"
67
"time"
78

89
"github.com/generate/selfserve/internal/errs"
@@ -191,6 +192,91 @@ func (r *GuestsRepository) UpdateGuest(ctx context.Context, id string, update *m
191192
return &guest, nil
192193
}
193194

195+
const fetchAllGuestDocumentsPageSize = 100
196+
197+
// AllGuestDocuments returns a paginated iterator over every guest document in the
198+
// database. It yields one *models.GuestDocument at a time, fetching the next page
199+
// only when the previous one is exhausted. Stop iterating early by returning false
200+
// from the yield function; the first non-nil error stops iteration and is yielded
201+
// as the second value.
202+
func (r *GuestsRepository) AllGuestDocuments(ctx context.Context) iter.Seq2[*models.GuestDocument, error] {
203+
return func(yield func(*models.GuestDocument, error) bool) {
204+
var cursorName, cursorID string
205+
206+
for {
207+
rows, err := r.db.Query(ctx, `
208+
SELECT
209+
g.id,
210+
gb.hotel_id,
211+
CONCAT_WS(' ', g.first_name, g.last_name) AS full_name,
212+
g.first_name,
213+
g.last_name,
214+
COALESCE(g.preferences, g.first_name) AS preferred_name,
215+
g.email,
216+
g.phone,
217+
g.preferences,
218+
g.notes,
219+
r.floor,
220+
r.room_number,
221+
gb.group_size,
222+
gb.status,
223+
gb.arrival_date,
224+
gb.departure_date
225+
FROM guest_bookings gb
226+
JOIN guests g ON g.id = gb.guest_id
227+
JOIN rooms r ON r.id = gb.room_id
228+
WHERE (
229+
$1::text = ''
230+
OR (CONCAT_WS(' ', g.first_name, g.last_name), g.id::text) > ($1::text, $2::text)
231+
)
232+
ORDER BY CONCAT_WS(' ', g.first_name, g.last_name) ASC, g.id ASC
233+
LIMIT $3
234+
`, cursorName, cursorID, fetchAllGuestDocumentsPageSize)
235+
if err != nil {
236+
yield(nil, err)
237+
return
238+
}
239+
240+
var page []*models.GuestDocument
241+
for rows.Next() {
242+
var doc models.GuestDocument
243+
if err := rows.Scan(
244+
&doc.ID, &doc.HotelID, &doc.FullName,
245+
&doc.FirstName, &doc.LastName, &doc.PreferredName,
246+
&doc.Email, &doc.Phone, &doc.Preferences, &doc.Notes,
247+
&doc.Floor, &doc.RoomNumber, &doc.GroupSize,
248+
&doc.BookingStatus, &doc.ArrivalDate, &doc.DepartureDate,
249+
); err != nil {
250+
rows.Close()
251+
yield(nil, err)
252+
return
253+
}
254+
page = append(page, &doc)
255+
}
256+
rows.Close()
257+
258+
if err := rows.Err(); err != nil {
259+
yield(nil, err)
260+
return
261+
}
262+
263+
for _, doc := range page {
264+
if !yield(doc, nil) {
265+
return
266+
}
267+
}
268+
269+
if len(page) < fetchAllGuestDocumentsPageSize {
270+
return // last page
271+
}
272+
273+
last := page[len(page)-1]
274+
cursorName = last.FullName
275+
cursorID = last.ID
276+
}
277+
}
278+
}
279+
194280
func (r *GuestsRepository) FindGuestsWithActiveBooking(ctx context.Context, filters *models.GuestFilters) (*models.GuestPage, error) {
195281
floorsFilter := filters.Floors
196282
groupSizesFilter := filters.GroupSize

backend/internal/repository/guests_opensearch.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"encoding/json"
77
"fmt"
8+
"strings"
89

910
"github.com/generate/selfserve/internal/models"
1011
opensearchstorage "github.com/generate/selfserve/internal/service/storage/opensearch"
@@ -42,6 +43,65 @@ func (r *OpenSearchGuestsRepository) IndexGuest(ctx context.Context, doc *models
4243
return nil
4344
}
4445

46+
func (r *OpenSearchGuestsRepository) BulkIndexGuests(ctx context.Context, docs []*models.GuestDocument) error {
47+
if len(docs) == 0 {
48+
return nil
49+
}
50+
51+
var body bytes.Buffer
52+
for _, doc := range docs {
53+
meta := fmt.Sprintf(`{"index":{"_index":%q,"_id":%q}}`, opensearchstorage.GuestsIndex, doc.ID)
54+
body.WriteString(meta)
55+
body.WriteByte('\n')
56+
serialized, err := json.Marshal(doc)
57+
if err != nil {
58+
return fmt.Errorf("marshaling guest %s: %w", doc.ID, err)
59+
}
60+
body.Write(serialized)
61+
body.WriteByte('\n')
62+
}
63+
64+
res, err := opensearchapi.BulkRequest{
65+
Body: &body,
66+
}.Do(ctx, r.client)
67+
if err != nil {
68+
return fmt.Errorf("bulk indexing guests: %w", err)
69+
}
70+
defer res.Body.Close()
71+
72+
if res.IsError() {
73+
return fmt.Errorf("bulk index failed: %s", res.String())
74+
}
75+
76+
var result struct {
77+
Errors bool `json:"errors"`
78+
Items []map[string]struct {
79+
ID string `json:"_id"`
80+
Status int `json:"status"`
81+
Error *struct {
82+
Reason string `json:"reason"`
83+
} `json:"error,omitempty"`
84+
} `json:"items"`
85+
}
86+
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
87+
return fmt.Errorf("decoding bulk response: %w", err)
88+
}
89+
90+
if result.Errors {
91+
var errs []string
92+
for _, item := range result.Items {
93+
for _, op := range item {
94+
if op.Error != nil {
95+
errs = append(errs, fmt.Sprintf("id=%s: %s", op.ID, op.Error.Reason))
96+
}
97+
}
98+
}
99+
return fmt.Errorf("bulk index partial failure: %s", strings.Join(errs, "; "))
100+
}
101+
102+
return nil
103+
}
104+
45105
func (r *OpenSearchGuestsRepository) DeleteGuest(ctx context.Context, id string) error {
46106
deleteResponse, err := opensearchapi.DeleteRequest{
47107
Index: opensearchstorage.GuestsIndex,

0 commit comments

Comments
 (0)