Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions store/sqlstore/lidmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type CachedLIDMap struct {

var _ store.LIDStore = (*CachedLIDMap)(nil)

var putLIDMappingsMassInsertBuilder = dbutil.NewMassInsertBuilder[store.LIDMapping, [0]any](
putLIDMappingQuery, "($%d, $%d)",
)

func NewCachedLIDMap(db *dbutil.Database) *CachedLIDMap {
return &CachedLIDMap{
db: db,
Expand All @@ -45,6 +49,7 @@ func NewCachedLIDMap(db *dbutil.Database) *CachedLIDMap {
}

const (
lidMappingBatchSize = 500
deleteExistingLIDMappingQuery = `DELETE FROM whatsmeow_lid_map WHERE (lid<>$1 AND pn=$2)`
putLIDMappingQuery = `
INSERT INTO whatsmeow_lid_map (lid, pn)
Expand Down Expand Up @@ -237,11 +242,20 @@ func (s *CachedLIDMap) PutManyLIDMappings(ctx context.Context, mappings []store.
}
return s.db.DoTxn(ctx, nil, func(ctx context.Context) error {
for _, mapping := range mappings {
err := s.unlockedPutLIDMapping(ctx, mapping.LID, mapping.PN)
if err != nil {
if _, err := s.db.Exec(ctx, deleteExistingLIDMappingQuery, mapping.LID.User, mapping.PN.User); err != nil {
return err
}
}
for chunk := range slices.Chunk(mappings, lidMappingBatchSize) {
query, params := putLIDMappingsMassInsertBuilder.Build([0]any{}, chunk)
if _, err := s.db.Exec(ctx, query, params...); err != nil {
return err
}
}
for _, mapping := range mappings {
s.pnToLIDCache[mapping.PN.User] = mapping.LID.User
s.lidToPNCache[mapping.LID.User] = mapping.PN.User
}
return nil
})
}
Expand Down
37 changes: 30 additions & 7 deletions store/sqlstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ type addressSessionTuple struct {
Session []byte
}

func (t addressSessionTuple) GetMassInsertValues() [2]any {
return [2]any{t.Address, t.Session}
}

var sessionScanner = dbutil.ConvertRowFn[addressSessionTuple](func(row dbutil.Scannable) (out addressSessionTuple, err error) {
err = row.Scan(&out.Address, &out.Session)
return
Expand Down Expand Up @@ -204,11 +208,20 @@ func (s *SQLStore) GetManySessions(ctx context.Context, addresses []string) (map
return result, nil
}

const sessionsBatchSize = 500

func (s *SQLStore) PutManySessions(ctx context.Context, sessions map[string][]byte) error {
if len(sessions) == 0 {
return nil
}
rows := make([]addressSessionTuple, 0, len(sessions))
for addr, sess := range sessions {
rows = append(rows, addressSessionTuple{addr, sess})
}
return s.db.DoTxn(ctx, nil, func(ctx context.Context) error {
for addr, sess := range sessions {
err := s.PutSession(ctx, addr, sess)
if err != nil {
for chunk := range slices.Chunk(rows, sessionsBatchSize) {
query, params := putSessionsMassInsertBuilder.Build([1]any{s.JID}, chunk)
if _, err := s.db.Exec(ctx, query, params...); err != nil {
return err
}
}
Expand Down Expand Up @@ -631,6 +644,14 @@ var putRedactedPhonesMassInsertBuilder = dbutil.NewMassInsertBuilder[store.Redac
putRedactedPhoneQuery, "($1, $%d, $%d)",
)

var putSessionsMassInsertBuilder = dbutil.NewMassInsertBuilder[addressSessionTuple, [1]any](
putSessionQuery, "($1, $%d, $%d)",
)

var putMsgSecretsMassInsertBuilder = dbutil.NewMassInsertBuilder[store.MessageSecretInsert, [1]any](
putMsgSecret, "($1, $%d, $%d, $%d, $%d)",
)

func (s *SQLStore) PutPushName(ctx context.Context, user types.JID, pushName string) (bool, string, error) {
s.contactCacheLock.Lock()
defer s.contactCacheLock.Unlock()
Expand Down Expand Up @@ -899,14 +920,16 @@ const (
`
)

func (s *SQLStore) PutMessageSecrets(ctx context.Context, inserts []store.MessageSecretInsert) (err error) {
const msgSecretsBatchSize = 500

func (s *SQLStore) PutMessageSecrets(ctx context.Context, inserts []store.MessageSecretInsert) error {
if len(inserts) == 0 {
return nil
}
return s.db.DoTxn(ctx, nil, func(ctx context.Context) error {
for _, insert := range inserts {
_, err = s.db.Exec(ctx, putMsgSecret, s.JID, insert.Chat.ToNonAD(), insert.Sender.ToNonAD(), insert.ID, insert.Secret)
if err != nil {
for chunk := range slices.Chunk(inserts, msgSecretsBatchSize) {
query, params := putMsgSecretsMassInsertBuilder.Build([1]any{s.JID}, chunk)
if _, err := s.db.Exec(ctx, query, params...); err != nil {
return err
}
}
Expand Down
4 changes: 4 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ type MessageSecretInsert struct {
Secret []byte
}

func (m MessageSecretInsert) GetMassInsertValues() [4]any {
return [4]any{m.Chat.ToNonAD(), m.Sender.ToNonAD(), m.ID, m.Secret}
}

type MsgSecretStore interface {
PutMessageSecrets(ctx context.Context, inserts []MessageSecretInsert) error
PutMessageSecret(ctx context.Context, chat, sender types.JID, id types.MessageID, secret []byte) error
Expand Down