-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathservice.go
More file actions
111 lines (96 loc) · 5.33 KB
/
service.go
File metadata and controls
111 lines (96 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package service
import (
"context"
"time"
"github.com/hmchangw/chat/history-service/internal/cassrepo"
"github.com/hmchangw/chat/history-service/internal/models"
"github.com/hmchangw/chat/history-service/internal/mongorepo"
pkgmodel "github.com/hmchangw/chat/pkg/model"
"github.com/hmchangw/chat/pkg/mongoutil"
"github.com/hmchangw/chat/pkg/natsrouter"
"github.com/hmchangw/chat/pkg/roomkeystore"
"github.com/hmchangw/chat/pkg/subject"
)
//go:generate mockgen -destination=mocks/mock_repository.go -package=mocks . MessageReader,MessageWriter,MessageRepository,SubscriptionRepository,RoomRepository,EventPublisher,ThreadRoomRepository,RoomKeyProvider
// MessageReader is the read-only message access contract; HistoryService
// holds it in its msgReader field. Every method takes a context first and
// reports failure through its final error result.
//
// NOTE(review): the per-method notes below are inferred from names and
// signatures only — ordering, bound inclusivity and not-found behavior are
// not visible in this file; confirm against the cassrepo implementation.
type MessageReader interface {
// GetMessagesBefore takes a room ID, a time bound and a page request and
// returns one page of messages; presumably those older than before.
GetMessagesBefore(ctx context.Context, roomID string, before time.Time, pageReq cassrepo.PageRequest) (cassrepo.Page[models.Message], error)
// GetMessagesBetweenDesc takes a room ID and two time bounds (since, before)
// and returns one page of messages; presumably newest-first — TODO confirm.
GetMessagesBetweenDesc(ctx context.Context, roomID string, since, before time.Time, pageReq cassrepo.PageRequest) (cassrepo.Page[models.Message], error)
// GetMessagesAfter takes a room ID, a time bound and a page request and
// returns one page of messages; presumably those newer than after.
GetMessagesAfter(ctx context.Context, roomID string, after time.Time, pageReq cassrepo.PageRequest) (cassrepo.Page[models.Message], error)
// GetAllMessagesAsc pages through a room's messages with no time bound;
// presumably oldest-first — TODO confirm.
GetAllMessagesAsc(ctx context.Context, roomID string, pageReq cassrepo.PageRequest) (cassrepo.Page[models.Message], error)
// GetMessageByID looks a single message up by ID alone (no room ID).
// Whether a missing message yields a nil pointer or an error is not shown here.
GetMessageByID(ctx context.Context, messageID string) (*models.Message, error)
// GetThreadMessages returns one page of messages for the given room and
// thread room ID pair.
GetThreadMessages(ctx context.Context, roomID, threadRoomID string, pageReq cassrepo.PageRequest) (cassrepo.Page[models.Message], error)
// GetMessagesByIDs is the unpaged batch form of GetMessageByID: it takes a
// slice of IDs and returns a plain slice of messages.
GetMessagesByIDs(ctx context.Context, messageIDs []string) ([]models.Message, error)
}
// MessageWriter is the mutation side of message storage; HistoryService
// holds it in its msgWriter field. Both methods receive the target message
// as a *models.Message plus the timestamp to record for the change.
type MessageWriter interface {
// UpdateMessageContent takes the message to edit, the replacement text
// (newMsg) and the edit timestamp (editedAt).
// NOTE(review): presumably persists newMsg as the message body — the
// storage details are not visible in this file; confirm in cassrepo.
UpdateMessageContent(ctx context.Context, msg *models.Message, newMsg string, editedAt time.Time) error
// SoftDeleteMessage performs a Cassandra LWT on messages_by_id and only
// runs the mirror-table and parent-tcount work when the LWT applies.
// Returns the updated_at value now persisted (the deletedAt argument when
// applied; the existing value when a concurrent delete won the race).
SoftDeleteMessage(ctx context.Context, msg *models.Message, deletedAt time.Time) (actualDeletedAt time.Time, applied bool, err error)
}
// MessageRepository composes read and write access; satisfied by *cassrepo.Repository.
// It declares no methods of its own: its method set is exactly the union of
// MessageReader and MessageWriter. New accepts a single MessageRepository and
// uses it for both the reader and the writer role. The compile-time assertion
// at the bottom of this file keeps *cassrepo.Repository in sync with it.
type MessageRepository interface {
MessageReader
MessageWriter
}
// SubscriptionRepository is the subscription lookup HistoryService depends on
// (stored in its subscriptions field).
type SubscriptionRepository interface {
// GetHistorySharedSince is keyed by account and room ID and returns an
// optional timestamp, a bool and an error.
// NOTE(review): presumably the timestamp is the point from which the account
// may see the room's history and the bool reports whether a subscription
// exists — neither is visible in this file; confirm against the implementation.
GetHistorySharedSince(ctx context.Context, account, roomID string) (*time.Time, bool, error)
}
// RoomRepository is the room lookup HistoryService depends on (stored in its
// rooms field). *mongorepo.RoomRepo satisfies it, as asserted at the bottom of
// this file.
type RoomRepository interface {
// GetMinUserLastSeenAt returns an optional timestamp for the given room.
// NOTE(review): by its name this is the earliest last-seen time across the
// room's users, and nil presumably means "no value" — confirm in mongorepo.
GetMinUserLastSeenAt(ctx context.Context, roomID string) (*time.Time, error)
}
// EventPublisher publishes live events to a NATS subject. Implemented by a
// thin wrapper around *otelnats.Conn in main.go.
// HistoryService stores it in its publisher field.
type EventPublisher interface {
// Publish sends the raw payload data to the given subject and returns any
// error from the underlying publisher. The payload is passed as opaque
// bytes; encoding is the caller's responsibility.
Publish(ctx context.Context, subject string, data []byte) error
}
// ThreadRoomRepository lists the thread rooms that belong to a room. All three
// methods use offset-based paging (mongoutil.OffsetPageRequest in,
// mongoutil.OffsetPage of pkgmodel.ThreadRoom out) and accept an optional
// accessSince timestamp.
//
// NOTE(review): accessSince is presumably a lower time bound on what the
// caller may see, with nil meaning "no bound" — not visible in this file;
// confirm against the implementation.
type ThreadRoomRepository interface {
// GetThreadRooms pages through thread rooms for roomID; it is the only
// variant that takes no account.
GetThreadRooms(ctx context.Context, roomID string, accessSince *time.Time, req mongoutil.OffsetPageRequest) (mongoutil.OffsetPage[pkgmodel.ThreadRoom], error)
// GetFollowingThreadRooms is the account-scoped variant; presumably limited
// to thread rooms the account follows — TODO confirm.
GetFollowingThreadRooms(ctx context.Context, roomID, account string, accessSince *time.Time, req mongoutil.OffsetPageRequest) (mongoutil.OffsetPage[pkgmodel.ThreadRoom], error)
// GetUnreadThreadRooms is the account-scoped variant; presumably limited to
// thread rooms with messages the account has not read — TODO confirm.
GetUnreadThreadRooms(ctx context.Context, roomID, account string, accessSince *time.Time, req mongoutil.OffsetPageRequest) (mongoutil.OffsetPage[pkgmodel.ThreadRoom], error)
}
// RoomKeyProvider fetches the current encryption key for a room.
// Defined here (not imported from pkg/roomkeystore directly) to keep the
// dependency contract narrow — only Get is used by history-service.
// HistoryService stores it in its keyProvider field.
type RoomKeyProvider interface {
// Get returns the versioned key pair for roomID.
// NOTE(review): whether a room without a key yields a nil pointer or an
// error is not visible in this file; confirm in pkg/roomkeystore.
Get(ctx context.Context, roomID string) (*roomkeystore.VersionedKeyPair, error)
}
// HistoryService handles message history queries and mutations. Transport-agnostic.
// It holds only its collaborators (all interface-typed) and one flag. Every
// field is unexported, so code outside this package builds it through New.
type HistoryService struct {
// msgReader is the read view of the message store.
msgReader MessageReader
// msgWriter is the write view of the message store; New sets it to the same
// repository value as msgReader.
msgWriter MessageWriter
// subscriptions provides per-account subscription lookups.
subscriptions SubscriptionRepository
// rooms provides room-level lookups.
rooms RoomRepository
// publisher emits events to NATS subjects.
publisher EventPublisher
// threadRooms lists thread rooms with offset paging.
threadRooms ThreadRoomRepository
// keyProvider supplies per-room key pairs.
keyProvider RoomKeyProvider
// encrypt is the flag passed to New.
// NOTE(review): presumably turns on payload encryption using keyProvider —
// its use is not visible in this file; confirm in the handlers.
encrypt bool
}
// New assembles a HistoryService from its collaborators. The single msgs
// repository backs both the read side (msgReader) and the write side
// (msgWriter); every other argument is stored exactly as given. No argument
// is validated here, so nil collaborators are accepted as-is.
func New(msgs MessageRepository, subs SubscriptionRepository, rooms RoomRepository, pub EventPublisher, threadRooms ThreadRoomRepository, keyProvider RoomKeyProvider, encrypt bool) *HistoryService {
	svc := new(HistoryService)

	// One repository, two narrow views.
	svc.msgReader, svc.msgWriter = msgs, msgs

	svc.subscriptions = subs
	svc.rooms = rooms
	svc.threadRooms = threadRooms

	svc.publisher = pub

	svc.keyProvider = keyProvider
	svc.encrypt = encrypt

	return svc
}
// RegisterHandlers wires all NATS endpoints. Panics on subscription failure (fatal at startup).
//
// It registers eight endpoints on r, one natsrouter.Register call per subject
// pattern. Every pattern is built from siteID by the matching helper in
// pkg/subject, and every handler is a method on s defined outside this file.
// The method returns nothing, so callers get no error value to inspect.
func (s *HistoryService) RegisterHandlers(r *natsrouter.Router, siteID string) {
// Read paths: paged history, forward paging, context around a message, and
// single-message lookup.
natsrouter.Register(r, subject.MsgHistoryPattern(siteID), s.LoadHistory)
natsrouter.Register(r, subject.MsgNextPattern(siteID), s.LoadNextMessages)
natsrouter.Register(r, subject.MsgSurroundingPattern(siteID), s.LoadSurroundingMessages)
natsrouter.Register(r, subject.MsgGetPattern(siteID), s.GetMessageByID)
// Mutations: edit and delete.
natsrouter.Register(r, subject.MsgEditPattern(siteID), s.EditMessage)
natsrouter.Register(r, subject.MsgDeletePattern(siteID), s.DeleteMessage)
// Thread reads.
natsrouter.Register(r, subject.MsgThreadPattern(siteID), s.GetThreadMessages)
natsrouter.Register(r, subject.MsgThreadParentPattern(siteID), s.GetThreadParentMessages)
}
// Compile-time interface assertions: the build fails if either concrete
// repository type stops satisfying the interface this package expects of it.
// The typed nil pointers allocate nothing at run time.
var (
	_ MessageRepository = (*cassrepo.Repository)(nil)
	_ RoomRepository    = (*mongorepo.RoomRepo)(nil)
)