Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,54 @@ type Config struct {

Use `slog` package for structured logging. No `fmt.Println` in production paths.

### CLI Commands (`backend/cmd/cli`)

Commands are registered in `main.go` and implemented in per-topic files (e.g. `guests.go`).

**Registering a command:**
```go
// main.go
var commands = map[string]command{
"reindex-guests": {
description: "Fetch all guests from the database and reindex them in OpenSearch",
run: runReindexGuests,
},
}
```

**Backfill/reindex pattern** — paginated DB read via `iter.Seq2`, batched writes to the sink:

```go
// repository: paginated generator using cursor pagination
func (r *GuestsRepository) AllGuestDocuments(ctx context.Context) iter.Seq2[*models.GuestDocument, error] {
return func(yield func(*models.GuestDocument, error) bool) {
var cursorName, cursorID string
for {
// fetch page using (cursorName, cursorID) as cursor
// yield each doc; return on last page or if yield returns false
}
}
}

// cli: accumulate into batches, flush when full
batch := make([]*models.GuestDocument, 0, batchSize)
for doc, err := range repo.AllGuestDocuments(ctx) {
if err != nil { return err }
batch = append(batch, doc)
if len(batch) == batchSize {
if err := sink.BulkIndex(ctx, batch); err != nil { return err }
batch = batch[:0]
}
}
if len(batch) > 0 {
if err := sink.BulkIndex(ctx, batch); err != nil { return err }
}
```

- DB is read in pages, sink is written in batches — neither loaded fully into memory
- `iter.Seq2` is used so the caller drives iteration with a plain `for range`
- No semaphore needed unless batches are indexed concurrently

---

## Testing (Frontend)
Expand Down
23 changes: 23 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Contributing

## Local Setup

### OpenSearch

Run OpenSearch locally with the security plugin disabled (no TLS or auth required for local dev):

```bash
docker run -d --name opensearch \
-p 9200:9200 -p 9600:9600 \
-e "discovery.type=single-node" \
-e "DISABLE_SECURITY_PLUGIN=true" \
opensearchproject/opensearch:latest
```

Verify it's running:

```bash
curl http://localhost:9200
```

> In production, OpenSearch uses TLS and credentials injected via Doppler. `OPENSEARCH_INSECURE_SKIP_TLS` should be `false` in prod.
Binary file added backend/cli
Binary file not shown.
67 changes: 67 additions & 0 deletions backend/cmd/cli/guests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"context"
"fmt"

"github.com/generate/selfserve/config"
"github.com/generate/selfserve/internal/models"
"github.com/generate/selfserve/internal/repository"
opensearchstorage "github.com/generate/selfserve/internal/service/storage/opensearch"
storage "github.com/generate/selfserve/internal/service/storage/postgres"
)

// reindexBatchSize is the number of guest documents sent to OpenSearch per
// bulk request.
const reindexBatchSize = 100

// runReindexGuests streams every guest document out of Postgres and rebuilds
// the OpenSearch guests index from them, writing in fixed-size bulk batches.
// It is wired up as the `reindex-guests` CLI command.
func runReindexGuests(ctx context.Context, cfg config.Config, _ []string) error {
	// Connect to Postgres; the repository owns the pool and is closed on exit.
	db, err := storage.NewRepository(cfg.DB)
	if err != nil {
		return fmt.Errorf("failed to connect to db: %w", err)
	}
	defer db.Close()

	// Connect to OpenSearch and make sure the guests index exists before
	// any writes happen.
	search, err := opensearchstorage.NewClient(cfg.OpenSearch)
	if err != nil {
		return fmt.Errorf("failed to connect to opensearch: %w", err)
	}
	if err := opensearchstorage.EnsureGuestsIndex(ctx, search); err != nil {
		return fmt.Errorf("failed to ensure guests index: %w", err)
	}

	source := repository.NewGuestsRepository(db.DB)
	sink := repository.NewOpenSearchGuestsRepository(search)

	var (
		indexed int
		pending = make([]*models.GuestDocument, 0, reindexBatchSize)
	)

	// sendPending writes the accumulated documents to OpenSearch and empties
	// the buffer, keeping its capacity for the next batch.
	sendPending := func() error {
		if err := sink.BulkIndexGuests(ctx, pending); err != nil {
			return err
		}
		indexed += len(pending)
		pending = pending[:0]
		return nil
	}

	// Stream every guest document from the database, flushing whenever a
	// full batch has accumulated. The iterator yields (doc, err) pairs; the
	// first error aborts the run.
	for doc, iterErr := range source.AllGuestDocuments(ctx) {
		if iterErr != nil {
			return fmt.Errorf("failed to fetch guest documents: %w", iterErr)
		}
		pending = append(pending, doc)
		if len(pending) == reindexBatchSize {
			if err := sendPending(); err != nil {
				return fmt.Errorf("failed to bulk index batch: %w", err)
			}
		}
	}

	// Flush the final partial batch, if any.
	if len(pending) > 0 {
		if err := sendPending(); err != nil {
			return fmt.Errorf("failed to bulk index batch: %w", err)
		}
	}

	fmt.Printf("reindex-guests completed: %d indexed\n", indexed)
	return nil
}
4 changes: 4 additions & 0 deletions backend/cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ var commands = map[string]command{
description: "Sync users from Clerk into the database",
run: runSyncUsers,
},
"reindex-guests": {
description: "Fetch all guests from the database and reindex them in OpenSearch",
run: runReindexGuests,
},
}

func main() {
Expand Down
86 changes: 86 additions & 0 deletions backend/internal/repository/guests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package repository
import (
"context"
"errors"
"iter"
"time"

"github.com/generate/selfserve/internal/errs"
Expand Down Expand Up @@ -191,6 +192,91 @@ func (r *GuestsRepository) UpdateGuest(ctx context.Context, id string, update *m
return &guest, nil
}

// fetchAllGuestDocumentsPageSize is the number of rows fetched per DB round
// trip while iterating all guest documents.
const fetchAllGuestDocumentsPageSize = 100

// AllGuestDocuments returns a paginated iterator over every guest document in the
// database. It yields one *models.GuestDocument at a time, fetching the next page
// only when the previous one is exhausted. Stop iterating early by returning false
// from the yield function; the first non-nil error stops iteration and is yielded
// as the second value.
//
// Pagination uses a keyset cursor on (full_name, guest id) rather than
// OFFSET, so each page is a fresh query and no server-side cursor is held
// open between pages.
//
// NOTE(review): the query joins guest_bookings, so a guest with multiple
// bookings produces multiple rows sharing the same g.id. The cursor tuple
// (full_name, g.id) is then not unique per row, and a page boundary that
// falls inside one guest's rows would skip that guest's remaining bookings
// on the next page. Confirm whether guests are expected to have at most one
// row here, or switch the cursor to a per-row unique key (e.g. gb.id).
func (r *GuestsRepository) AllGuestDocuments(ctx context.Context) iter.Seq2[*models.GuestDocument, error] {
	return func(yield func(*models.GuestDocument, error) bool) {
		// Empty cursor values mean "start from the beginning"; the SQL
		// checks $1 = '' to skip the cursor predicate on the first page.
		var cursorName, cursorID string

		for {
			// NOTE(review): preferred_name falls back from g.preferences to
			// g.first_name — confirm this is intended and not a typo for a
			// g.preferred_name column.
			rows, err := r.db.Query(ctx, `
SELECT
g.id,
gb.hotel_id,
CONCAT_WS(' ', g.first_name, g.last_name) AS full_name,
g.first_name,
g.last_name,
COALESCE(g.preferences, g.first_name) AS preferred_name,
g.email,
g.phone,
g.preferences,
g.notes,
r.floor,
r.room_number,
gb.group_size,
gb.status,
gb.arrival_date,
gb.departure_date
FROM guest_bookings gb
JOIN guests g ON g.id = gb.guest_id
JOIN rooms r ON r.id = gb.room_id
WHERE (
$1::text = ''
OR (CONCAT_WS(' ', g.first_name, g.last_name), g.id::text) > ($1::text, $2::text)
)
ORDER BY CONCAT_WS(' ', g.first_name, g.last_name) ASC, g.id ASC
LIMIT $3
`, cursorName, cursorID, fetchAllGuestDocumentsPageSize)
			if err != nil {
				yield(nil, err)
				return
			}

			// Drain the whole page into memory before yielding so the row
			// set is closed before caller code runs (and before the next
			// query is issued).
			var page []*models.GuestDocument
			for rows.Next() {
				var doc models.GuestDocument
				if err := rows.Scan(
					&doc.ID, &doc.HotelID, &doc.FullName,
					&doc.FirstName, &doc.LastName, &doc.PreferredName,
					&doc.Email, &doc.Phone, &doc.Preferences, &doc.Notes,
					&doc.Floor, &doc.RoomNumber, &doc.GroupSize,
					&doc.BookingStatus, &doc.ArrivalDate, &doc.DepartureDate,
				); err != nil {
					rows.Close()
					yield(nil, err)
					return
				}
				page = append(page, &doc)
			}
			rows.Close()

			// Surface any iteration error (e.g. connection dropped mid-page).
			if err := rows.Err(); err != nil {
				yield(nil, err)
				return
			}

			// Hand the page to the caller one document at a time; a false
			// return from yield means the caller stopped early.
			for _, doc := range page {
				if !yield(doc, nil) {
					return
				}
			}

			// A short page means the table is exhausted.
			if len(page) < fetchAllGuestDocumentsPageSize {
				return // last page
			}

			// Advance the cursor to the last row of this page.
			last := page[len(page)-1]
			cursorName = last.FullName
			cursorID = last.ID
		}
	}
}

func (r *GuestsRepository) FindGuestsWithActiveBooking(ctx context.Context, filters *models.GuestFilters) (*models.GuestPage, error) {
floorsFilter := filters.Floors
groupSizesFilter := filters.GroupSize
Expand Down
60 changes: 60 additions & 0 deletions backend/internal/repository/guests_opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/generate/selfserve/internal/models"
opensearchstorage "github.com/generate/selfserve/internal/service/storage/opensearch"
Expand Down Expand Up @@ -42,6 +43,65 @@ func (r *OpenSearchGuestsRepository) IndexGuest(ctx context.Context, doc *models
return nil
}

// BulkIndexGuests indexes the given guest documents into the guests index in
// a single OpenSearch _bulk request. A nil or empty slice is a no-op. It
// returns an error if the request cannot be sent, the response cannot be
// decoded, or any individual document was rejected — per-document failures
// are collected into one combined error.
func (r *OpenSearchGuestsRepository) BulkIndexGuests(ctx context.Context, docs []*models.GuestDocument) error {
	if len(docs) == 0 {
		return nil
	}

	// Build the NDJSON bulk body: for each document, an action metadata line
	// followed by the document source, each newline-terminated.
	var body bytes.Buffer
	for _, doc := range docs {
		// Marshal the action line with encoding/json rather than fmt's %q:
		// Go's %q escaping is not valid JSON for all inputs (it emits
		// escapes such as \x7f for some control characters), so an ID
		// containing such bytes would corrupt the bulk payload.
		meta, err := json.Marshal(map[string]map[string]string{
			"index": {"_index": opensearchstorage.GuestsIndex, "_id": doc.ID},
		})
		if err != nil {
			return fmt.Errorf("marshaling bulk action for guest %s: %w", doc.ID, err)
		}
		body.Write(meta)
		body.WriteByte('\n')

		serialized, err := json.Marshal(doc)
		if err != nil {
			return fmt.Errorf("marshaling guest %s: %w", doc.ID, err)
		}
		body.Write(serialized)
		body.WriteByte('\n')
	}

	res, err := opensearchapi.BulkRequest{
		Body: &body,
	}.Do(ctx, r.client)
	if err != nil {
		return fmt.Errorf("bulk indexing guests: %w", err)
	}
	defer res.Body.Close()

	// A top-level HTTP error means the whole request was rejected.
	if res.IsError() {
		return fmt.Errorf("bulk index failed: %s", res.String())
	}

	// The bulk endpoint returns 200 even when individual operations fail,
	// so the response body must be inspected item by item.
	var result struct {
		Errors bool `json:"errors"`
		Items  []map[string]struct {
			ID     string `json:"_id"`
			Status int    `json:"status"`
			Error  *struct {
				Reason string `json:"reason"`
			} `json:"error,omitempty"`
		} `json:"items"`
	}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		return fmt.Errorf("decoding bulk response: %w", err)
	}

	if result.Errors {
		// Each item is a one-key map (operation name -> result); collect
		// every failed operation's reason into a single error.
		var errs []string
		for _, item := range result.Items {
			for _, op := range item {
				if op.Error != nil {
					errs = append(errs, fmt.Sprintf("id=%s: %s", op.ID, op.Error.Reason))
				}
			}
		}
		return fmt.Errorf("bulk index partial failure: %s", strings.Join(errs, "; "))
	}

	return nil
}

func (r *OpenSearchGuestsRepository) DeleteGuest(ctx context.Context, id string) error {
deleteResponse, err := opensearchapi.DeleteRequest{
Index: opensearchstorage.GuestsIndex,
Expand Down
Loading