Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions history-service/internal/mongorepo/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ import (
"go.mongodb.org/mongo-driver/v2/mongo"

"github.com/hmchangw/chat/pkg/model"
"github.com/hmchangw/chat/pkg/mongoutil"
)

const roomsCollection = "rooms"

type RoomRepo struct {
rooms *Collection[model.Room]
rooms *mongoutil.Collection[model.Room]
}

func NewRoomRepo(db *mongo.Database) *RoomRepo {
return &RoomRepo{
rooms: NewCollection[model.Room](db.Collection(roomsCollection)),
rooms: mongoutil.NewCollection[model.Room](db.Collection(roomsCollection)),
}
}

Expand All @@ -28,7 +29,7 @@ func NewRoomRepo(db *mongo.Database) *RoomRepo {
func (r *RoomRepo) GetMinUserLastSeenAt(ctx context.Context, roomID string) (*time.Time, error) {
room, err := r.rooms.FindOne(ctx,
bson.M{"_id": roomID},
WithProjection(bson.M{"minUserLastSeenAt": 1, "_id": 0}),
mongoutil.WithProjection(bson.M{"minUserLastSeenAt": 1, "_id": 0}),
)
if err != nil {
return nil, fmt.Errorf("get room %s minUserLastSeenAt: %w", roomID, err)
Expand Down
1 change: 1 addition & 0 deletions history-service/internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/hmchangw/chat/history-service/internal/cassrepo"
"github.com/hmchangw/chat/history-service/internal/models"
"github.com/hmchangw/chat/history-service/internal/mongorepo"
pkgmodel "github.com/hmchangw/chat/pkg/model"
"github.com/hmchangw/chat/pkg/mongoutil"
"github.com/hmchangw/chat/pkg/natsrouter"
Expand Down
44 changes: 33 additions & 11 deletions pkg/minioutil/minio_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func TestIntegration_Connect_Smoke(t *testing.T) {
func TestIntegration_NewBucket_Success(t *testing.T) {
client, bucketName := testutil.MinIO(t, "minioutil")

type doc struct{ Name string `json:"name"` }
type doc struct {
Name string `json:"name"`
}
b, err := NewBucket[doc](context.Background(), client, bucketName)
require.NoError(t, err)
require.NotNil(t, b)
Expand All @@ -51,7 +53,9 @@ func TestIntegration_NewBucket_Success(t *testing.T) {
func TestIntegration_NewBucket_MissingBucket(t *testing.T) {
client, _ := testutil.MinIO(t, "minioutil")

type doc struct{ Name string `json:"name"` }
type doc struct {
Name string `json:"name"`
}
_, err := NewBucket[doc](context.Background(), client, "definitely-does-not-exist")
require.Error(t, err)
assert.Contains(t, err.Error(), "definitely-does-not-exist")
Expand Down Expand Up @@ -93,7 +97,9 @@ func TestIntegration_Put_Overwrites(t *testing.T) {
client, bucketName := testutil.MinIO(t, "minioutil")
ctx := context.Background()

type doc struct{ V int `json:"v"` }
type doc struct {
V int `json:"v"`
}
b, err := NewBucket[doc](ctx, client, bucketName)
require.NoError(t, err)

Expand Down Expand Up @@ -138,7 +144,9 @@ func TestIntegration_Get_MissingKey(t *testing.T) {
client, bucketName := testutil.MinIO(t, "minioutil")
ctx := context.Background()

type doc struct{ V int `json:"v"` }
type doc struct {
V int `json:"v"`
}
b, err := NewBucket[doc](ctx, client, bucketName)
require.NoError(t, err)

Expand All @@ -155,7 +163,9 @@ func TestIntegration_Get_MalformedJSON(t *testing.T) {
client, bucketName := testutil.MinIO(t, "minioutil")
ctx := context.Background()

type doc struct{ V int `json:"v"` }
type doc struct {
V int `json:"v"`
}
b, err := NewBucket[doc](ctx, client, bucketName)
require.NoError(t, err)

Expand All @@ -174,7 +184,9 @@ func TestIntegration_List_Prefix(t *testing.T) {
client, bucketName := testutil.MinIO(t, "minioutil")
ctx := context.Background()

type doc struct{ V int `json:"v"` }
type doc struct {
V int `json:"v"`
}
b, err := NewBucket[doc](ctx, client, bucketName)
require.NoError(t, err)

Expand All @@ -200,7 +212,9 @@ func TestIntegration_List_ZeroMaxKeysReturnsAll(t *testing.T) {
client, bucketName := testutil.MinIO(t, "minioutil")
ctx := context.Background()

type doc struct{ V int `json:"v"` }
type doc struct {
V int `json:"v"`
}
b, err := NewBucket[doc](ctx, client, bucketName)
require.NoError(t, err)

Expand All @@ -221,7 +235,9 @@ func TestIntegration_List_MaxKeysCap(t *testing.T) {
client, bucketName := testutil.MinIO(t, "minioutil")
ctx := context.Background()

type doc struct{ V int `json:"v"` }
type doc struct {
V int `json:"v"`
}
b, err := NewBucket[doc](ctx, client, bucketName)
require.NoError(t, err)

Expand Down Expand Up @@ -254,7 +270,9 @@ func TestIntegration_List_EmptyResult(t *testing.T) {
client, bucketName := testutil.MinIO(t, "minioutil")
ctx := context.Background()

type doc struct{ V int `json:"v"` }
type doc struct {
V int `json:"v"`
}
b, err := NewBucket[doc](ctx, client, bucketName)
require.NoError(t, err)

Expand All @@ -270,7 +288,9 @@ func TestIntegration_Delete_RemovesExisting(t *testing.T) {
client, bucketName := testutil.MinIO(t, "minioutil")
ctx := context.Background()

type doc struct{ V int `json:"v"` }
type doc struct {
V int `json:"v"`
}
b, err := NewBucket[doc](ctx, client, bucketName)
require.NoError(t, err)

Expand All @@ -289,7 +309,9 @@ func TestIntegration_Delete_Idempotent(t *testing.T) {
client, bucketName := testutil.MinIO(t, "minioutil")
ctx := context.Background()

type doc struct{ V int `json:"v"` }
type doc struct {
V int `json:"v"`
}
b, err := NewBucket[doc](ctx, client, bucketName)
require.NoError(t, err)

Expand Down
32 changes: 23 additions & 9 deletions pkg/searchengine/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,30 @@ import (
"github.com/elastic/go-elasticsearch/v8"
)

// New creates a SearchEngine for the given backend ("elasticsearch" or "opensearch").
// It verifies connectivity via Ping before returning. When tlsSkipVerify is
// true, server certificate verification is disabled — intended for
// self-signed/internal ES clusters; MUST stay false in production.
func New(ctx context.Context, backend, url string, tlsSkipVerify bool) (SearchEngine, error) {
// Config bundles every connection-time knob for the search backend.
type Config struct {
Backend string
URL string
Username string
Password string
// TLSSkipVerify disables server cert verification — opt-in via env for
// self-signed/internal clusters; MUST stay false in production.
TLSSkipVerify bool
}

// New creates a SearchEngine for the configured backend
// ("elasticsearch" or "opensearch") and verifies connectivity via Ping
// before returning.
Comment thread
Joey0538 marked this conversation as resolved.
func New(ctx context.Context, cfg Config) (SearchEngine, error) {
var transport Transporter
switch backend {
switch cfg.Backend {
case "elasticsearch":
esCfg := elasticsearch.Config{Addresses: []string{url}}
if tlsSkipVerify {
esCfg := elasticsearch.Config{
Addresses: []string{cfg.URL},
Username: cfg.Username,
Password: cfg.Password,
}
if cfg.TLSSkipVerify {
dt, ok := http.DefaultTransport.(*http.Transport)
if !ok {
return nil, fmt.Errorf("create elasticsearch client: http.DefaultTransport is not *http.Transport")
Expand All @@ -37,7 +51,7 @@ func New(ctx context.Context, backend, url string, tlsSkipVerify bool) (SearchEn
}
transport = client
default:
return nil, fmt.Errorf("unsupported search backend: %s", backend)
return nil, fmt.Errorf("unsupported search backend: %s", cfg.Backend)
}

adapter := newAdapter(transport)
Expand Down
4 changes: 2 additions & 2 deletions search-service/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func setupCCSFixture(t *testing.T) *ccsFixture {
waitForRemoteConnected(t, localURL, "remote1", 120*time.Second)
t.Logf("CCS fixture: remote1 connected")

localEngine, err := searchengine.New(ctx, "elasticsearch", localURL, false)
localEngine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: localURL})
require.NoError(t, err, "build searchengine for local")
remoteEngine, err := searchengine.New(ctx, "elasticsearch", remoteURL, false)
remoteEngine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: remoteURL})
require.NoError(t, err, "build searchengine for remote")

t.Logf("CCS fixture: starting valkey")
Expand Down
10 changes: 9 additions & 1 deletion search-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
type ESConfig struct {
URL string `env:"URL,required"`
Backend string `env:"BACKEND" envDefault:"elasticsearch"`
Username string `env:"USERNAME" envDefault:""`
Password string `env:"PASSWORD" envDefault:""`
TLSSkipVerify bool `env:"TLS_SKIP_VERIFY" envDefault:"false"`
}

Expand Down Expand Up @@ -81,7 +83,13 @@ func main() {
os.Exit(1)
}

engine, err := searchengine.New(ctx, cfg.ES.Backend, cfg.ES.URL, cfg.ES.TLSSkipVerify)
engine, err := searchengine.New(ctx, searchengine.Config{
Backend: cfg.ES.Backend,
URL: cfg.ES.URL,
Username: cfg.ES.Username,
Password: cfg.ES.Password,
TLSSkipVerify: cfg.ES.TLSSkipVerify,
})
if err != nil {
slog.Error("search engine connect failed", "error", err)
os.Exit(1)
Expand Down
10 changes: 5 additions & 5 deletions search-sync-worker/inbox_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestSpotlightSync_Integration(t *testing.T) {
indexName := "spotlight-site-spot-v1-chat"

// --- ES template + index ---
engine, err := searchengine.New(ctx, "elasticsearch", esURL, false)
engine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: esURL})
require.NoError(t, err)
waitForClusterGreen(t, esURL, 120*time.Second)

Expand Down Expand Up @@ -245,7 +245,7 @@ func TestSpotlightSync_BulkInvite(t *testing.T) {
siteID := "site-spot-bulk"
indexName := "spotlight-site-spot-bulk-v1-chat"

engine, err := searchengine.New(ctx, "elasticsearch", esURL, false)
engine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: esURL})
require.NoError(t, err)
waitForClusterGreen(t, esURL, 120*time.Second)

Expand Down Expand Up @@ -318,7 +318,7 @@ func TestUserRoomSync_Integration(t *testing.T) {
siteID := "site-ur"
indexName := "user-room-site-ur"

engine, err := searchengine.New(ctx, "elasticsearch", esURL, false)
engine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: esURL})
require.NoError(t, err)
waitForClusterGreen(t, esURL, 120*time.Second)

Expand Down Expand Up @@ -440,7 +440,7 @@ func TestUserRoomSync_BulkInvite(t *testing.T) {
siteID := "site-ur-bulk"
indexName := "user-room-site-ur-bulk"

engine, err := searchengine.New(ctx, "elasticsearch", esURL, false)
engine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: esURL})
require.NoError(t, err)
waitForClusterGreen(t, esURL, 120*time.Second)

Expand Down Expand Up @@ -555,7 +555,7 @@ func TestUserRoomSync_LWWGuard(t *testing.T) {
siteID := "site-lww"
indexName := "user-room-site-lww"

engine, err := searchengine.New(ctx, "elasticsearch", esURL, false)
engine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: esURL})
require.NoError(t, err)
waitForClusterGreen(t, esURL, 120*time.Second)

Expand Down
4 changes: 2 additions & 2 deletions search-sync-worker/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func TestSearchSyncIntegration(t *testing.T) {

// --- Setup search engine + template ---
prefix := "msgs-inttest-v1"
engine, err := searchengine.New(ctx, "elasticsearch", esURL, false)
engine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: esURL})
require.NoError(t, err, "create search engine")

// Wait for cluster to be green before creating indices.
Expand Down Expand Up @@ -496,7 +496,7 @@ func TestCustomAnalyzer(t *testing.T) {
ctx := context.Background()

prefix := "analyzer-test-v1"
engine, err := searchengine.New(ctx, "elasticsearch", esURL, false)
engine, err := searchengine.New(ctx, searchengine.Config{Backend: "elasticsearch", URL: esURL})
require.NoError(t, err)

waitForClusterGreen(t, esURL, 120*time.Second)
Expand Down
10 changes: 9 additions & 1 deletion search-sync-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type config struct {
SiteID string `env:"SITE_ID,required"`
SearchURL string `env:"SEARCH_URL,required"`
SearchBackend string `env:"SEARCH_BACKEND" envDefault:"elasticsearch"`
SearchUsername string `env:"SEARCH_USERNAME" envDefault:""`
SearchPassword string `env:"SEARCH_PASSWORD" envDefault:""`
SearchTLSSkipVerify bool `env:"SEARCH_TLS_SKIP_VERIFY" envDefault:"false"`
MsgIndexPrefix string `env:"MSG_INDEX_PREFIX,required"`
SpotlightIndex string `env:"SPOTLIGHT_INDEX" envDefault:""`
Expand Down Expand Up @@ -119,7 +121,13 @@ func main() {
os.Exit(1)
}

engine, err := searchengine.New(ctx, cfg.SearchBackend, cfg.SearchURL, cfg.SearchTLSSkipVerify)
engine, err := searchengine.New(ctx, searchengine.Config{
Backend: cfg.SearchBackend,
URL: cfg.SearchURL,
Username: cfg.SearchUsername,
Password: cfg.SearchPassword,
TLSSkipVerify: cfg.SearchTLSSkipVerify,
})
if err != nil {
slog.Error("search engine connect failed", "error", err)
os.Exit(1)
Expand Down
Loading