Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion history-service/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func main() {
slog.Info("message bucket configured",
"hours", cfg.MessageBucketHours,
"maxBuckets", cfg.MessageReadMaxBuckets,
"readBucketConcurrency", cfg.MessageReadBucketConcurrency,
"readEscalateAfter", cfg.MessageReadEscalateAfter,
"historyFloorDays", cfg.MessageHistoryFloorDays,
"largeRoomThreshold", cfg.LargeRoomThreshold,
"maxPinnedPerRoom", cfg.MaxPinnedPerRoom,
Expand Down Expand Up @@ -133,7 +135,8 @@ func main() {
cipher = atrest.NewCipher(w, atrest.NewMongoDEKStore(dekColl), cfg.Atrest)
}

cassRepo := cassrepo.NewRepository(cassSession, bucketSizer, cfg.MessageReadMaxBuckets, cipher)
cassRepo := cassrepo.NewRepository(cassSession, bucketSizer, cfg.MessageReadMaxBuckets, cipher,
cassrepo.WithReadConcurrency(cfg.MessageReadBucketConcurrency, cfg.MessageReadEscalateAfter))
db := mongoClient.Database(cfg.Mongo.DB)
subRepo := mongorepo.NewSubscriptionRepo(db)
roomRepo := mongorepo.NewRoomRepo(db)
Expand Down
8 changes: 4 additions & 4 deletions history-service/internal/cassrepo/messages_by_room.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (r *Repository) GetMessagesBefore(ctx context.Context, roomID string, befor
}

res, err := fillPage[models.Message](
ctx, r.bucket, walkDesc, startBucket, floorBucket, r.maxBuckets,
ctx, r.bucket, walkDesc, startBucket, floorBucket, r.walkCfg,
pageReq.PageSize, initialPageState, queryFn, r.scanMessagesUpTo(ctx),
)
if err != nil {
Expand Down Expand Up @@ -142,7 +142,7 @@ func (r *Repository) GetMessagesBetweenDesc(ctx context.Context, roomID string,
}

res, err := fillPage[models.Message](
ctx, r.bucket, walkDesc, startBucket, floorBucket, r.maxBuckets,
ctx, r.bucket, walkDesc, startBucket, floorBucket, r.walkCfg,
pageReq.PageSize, initialPageState, queryFn, r.scanMessagesUpTo(ctx),
)
if err != nil {
Expand Down Expand Up @@ -172,7 +172,7 @@ func (r *Repository) GetMessagesAfter(ctx context.Context, roomID string, after
}

res, err := fillPage[models.Message](
ctx, r.bucket, walkAsc, startBucket, ceilingBucket, r.maxBuckets,
ctx, r.bucket, walkAsc, startBucket, ceilingBucket, r.walkCfg,
pageReq.PageSize, initialPageState, queryFn, r.scanMessagesUpTo(ctx),
)
if err != nil {
Expand All @@ -196,7 +196,7 @@ func (r *Repository) GetAllMessagesAsc(ctx context.Context, roomID string, floor
}

res, err := fillPage[models.Message](
ctx, r.bucket, walkAsc, startBucket, ceilingBucket, r.maxBuckets,
ctx, r.bucket, walkAsc, startBucket, ceilingBucket, r.walkCfg,
pageReq.PageSize, initialPageState, queryFn, r.scanMessagesUpTo(ctx),
)
if err != nil {
Expand Down
45 changes: 34 additions & 11 deletions history-service/internal/cassrepo/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,45 @@ import (
// configuration shared by all queries against bucketed message tables, plus
// an optional at-rest Cipher for encrypted message bodies.
type Repository struct {
session *gocql.Session
bucket msgbucket.Sizer
maxBuckets int
cipher atrest.Cipher // nil when ATREST_ENABLED=false
session *gocql.Session
bucket msgbucket.Sizer
walkCfg walkConfig
cipher atrest.Cipher // nil when ATREST_ENABLED=false
}

// Option customizes a Repository at construction time.
type Option func(*Repository)

// WithReadConcurrency enables the parallel empty-bucket skip on paginated reads:
// after escalateAfter consecutive empty buckets, up to concurrency buckets are
// probed concurrently. Values <=1 / <=0 leave the walk strictly serial (the
// default), so this is opt-in and byte-compatible with the serial behavior.
func WithReadConcurrency(concurrency, escalateAfter int) Option {
return func(r *Repository) {
if concurrency > 1 {
r.walkCfg.concurrency = concurrency
}
if escalateAfter > 0 {
r.walkCfg.escalateAfter = escalateAfter
}
}
}

// NewRepository wires a session, bucket sizer, max-walk depth, and (optional)
// at-rest Cipher. maxBuckets caps how far a paginated read walks through empty
// buckets before returning a non-terminal cursor. cipher may be nil; when nil
// the read path treats encountered enc_payload rows as a configuration error
// and the write path uses legacy plaintext columns.
func NewRepository(session *gocql.Session, bucket msgbucket.Sizer, maxBuckets int, cipher atrest.Cipher) *Repository {
return &Repository{
session: session,
bucket: bucket,
maxBuckets: maxBuckets,
cipher: cipher,
// and the write path uses legacy plaintext columns. Reads default to a serial
// bucket walk; pass WithReadConcurrency to enable parallel empty-bucket skips.
func NewRepository(session *gocql.Session, bucket msgbucket.Sizer, maxBuckets int, cipher atrest.Cipher, opts ...Option) *Repository {
r := &Repository{
session: session,
bucket: bucket,
walkCfg: walkConfig{maxBuckets: maxBuckets, concurrency: 1, escalateAfter: 0},
cipher: cipher,
}
for _, opt := range opts {
opt(r)
}
return r
}
Loading
Loading