@@ -51,11 +51,12 @@ type Client struct {
5151 config * Config
5252 client * mongo.Client
5353
54- cacheManager * cache.Manager
55- clientCache * cache.LRUWithStats [types.ClientRefKey , * database.ClientInfo ]
56- docCache * cache.LRUWithStats [types.DocRefKey , * database.DocInfo ]
57- changeCache * cache.LRUWithStats [types.DocRefKey , * ChangeStore ]
58- vectorCache * cache.LRUWithStats [types.DocRefKey , * cmap.Map [types.ID , time.VersionVector ]]
54+ cacheManager * cache.Manager
55+ clientCache * cache.LRUWithStats [types.ClientRefKey , * database.ClientInfo ]
56+ docCache * cache.LRUWithStats [types.DocRefKey , * database.DocInfo ]
57+ changeCache * cache.LRUWithStats [types.DocRefKey , * ChangeStore ]
58+ presenceCache * cache.LRUWithStats [types.DocRefKey , * ChangeStore ]
59+ vectorCache * cache.LRUWithStats [types.DocRefKey , * cmap.Map [types.ID , time.VersionVector ]]
5960}
6061
6162// Dial creates an instance of Client and dials the given MongoDB.
@@ -119,6 +120,12 @@ func Dial(conf *Config) (*Client, error) {
119120 }
120121 cacheManager .RegisterCache (changeCache )
121122
123+ presenceCache , err := cache .NewLRUWithStats [types.DocRefKey , * ChangeStore ](conf .ChangeCacheSize , "presences" )
124+ if err != nil {
125+ return nil , fmt .Errorf ("initialize presence cache: %w" , err )
126+ }
127+ cacheManager .RegisterCache (presenceCache )
128+
122129 vectorCache , err := cache .NewLRUWithStats [types.DocRefKey , * cmap.Map [types.ID , time.VersionVector ]](
123130 conf .VectorCacheSize , "vectors" ,
124131 )
@@ -135,10 +142,11 @@ func Dial(conf *Config) (*Client, error) {
135142
136143 cacheManager : cacheManager ,
137144
138- clientCache : clientCache ,
139- docCache : docCache ,
140- changeCache : changeCache ,
141- vectorCache : vectorCache ,
145+ clientCache : clientCache ,
146+ docCache : docCache ,
147+ changeCache : changeCache ,
148+ presenceCache : presenceCache ,
149+ vectorCache : vectorCache ,
142150 }
143151
144152 if conf .CacheStatsEnabled {
@@ -159,6 +167,7 @@ func (c *Client) Close() error {
159167 c .clientCache .Purge ()
160168 c .docCache .Purge ()
161169 c .changeCache .Purge ()
170+ c .presenceCache .Purge ()
162171 c .vectorCache .Purge ()
163172
164173 return nil
@@ -1020,6 +1029,13 @@ func (c *Client) UpdateClientInfoAfterPushPull(
10201029 return err
10211030 }
10221031
1032+ // NOTE(hackerwins): Clear presence changes of the given client on the
1033+ // document if the client is no longer attached to the document.
1034+ docKey := types.DocRefKey {ProjectID : info .ProjectID , DocID : docInfo .ID }
1035+ if pCache , ok := c .presenceCache .Get (docKey ); ok && ! attached {
1036+ pCache .RemoveChangesByActor (info .ID )
1037+ }
1038+
10231039 var updater bson.M
10241040 if attached {
10251041 updater = bson.M {
@@ -1467,25 +1483,31 @@ func (c *Client) CreateChangeInfos(
14671483 return docInfo , checkpoint , nil
14681484 }
14691485
1470- // 02. Optimized batch processing
14711486 initialServerSeq := docInfo .ServerSeq
14721487 now := gotime .Now ()
14731488
1474- // Pre-allocate models slice to avoid dynamic allocations
1475- models := make ([]mongo.WriteModel , 0 , len (changes ))
1489+ // 02. Store the changes.
1490+ // - presence-only changes are stored in presenceCache.
1491+ // - regular changes are stored in DB.
14761492 hasOperations := false
1477-
1493+ var prChanges []* database.ChangeInfo
1494+ var opChanges []mongo.WriteModel
14781495 for _ , cn := range changes {
14791496 serverSeq := docInfo .IncreaseServerSeq ()
14801497 checkpoint = checkpoint .NextServerSeq (serverSeq )
14811498 cn .ServerSeq = serverSeq
14821499 checkpoint = checkpoint .SyncClientSeq (cn .ClientSeq )
14831500
1501+ if cn .PresenceOnly () {
1502+ prChanges = append (prChanges , cn )
1503+ continue
1504+ }
1505+
14841506 if len (cn .Operations ) > 0 {
14851507 hasOperations = true
14861508 }
14871509
1488- models = append (models , mongo .NewUpdateOneModel ().SetFilter (bson.M {
1510+ opChanges = append (opChanges , mongo .NewUpdateOneModel ().SetFilter (bson.M {
14891511 "project_id" : refKey .ProjectID ,
14901512 "doc_id" : refKey .DocID ,
14911513 "server_seq" : cn .ServerSeq ,
@@ -1500,10 +1522,22 @@ func (c *Client) CreateChangeInfos(
15001522 }}).SetUpsert (true ))
15011523 }
15021524
1503- if len (changes ) > 0 {
1525+ if len (prChanges ) > 0 {
1526+ var store * ChangeStore
1527+ if cached , ok := c .presenceCache .Get (refKey ); ok {
1528+ store = cached
1529+ } else {
1530+ store = NewChangeStore ()
1531+ c .presenceCache .Add (refKey , store )
1532+ }
1533+
1534+ store .ReplaceOrInsert (prChanges )
1535+ }
1536+
1537+ if len (opChanges ) > 0 {
15041538 if _ , err := c .collection (ColChanges ).BulkWrite (
15051539 ctx ,
1506- models ,
1540+ opChanges ,
15071541 options .BulkWrite ().SetOrdered (false ),
15081542 ); err != nil {
15091543 return nil , change .InitialCheckpoint , fmt .Errorf ("create changes of %s: %w" , refKey , err )
@@ -1679,17 +1713,25 @@ func (c *Client) FindChangeInfosBetweenServerSeqs(
16791713 return nil , nil
16801714 }
16811715
1682- // Get or create a change range store for this document
1683- var store * ChangeStore
1716+ // 01. Create a temporary change store to hold the changes.
1717+ store := NewChangeStore ()
1718+
1719+ // 02. Fill the store with presence only changes.
1720+ if prStore , ok := c .presenceCache .Get (docRefKey ); ok {
1721+ store .ReplaceOrInsert (prStore .ChangesInRange (from , to ))
1722+ }
1723+
1724+ // 03. Fill the store with regular changes.
1725+ var opStore * ChangeStore
16841726 if cached , ok := c .changeCache .Get (docRefKey ); ok {
1685- store = cached
1727+ opStore = cached
16861728 } else {
1687- store = NewChangeStore ()
1688- c .changeCache .Add (docRefKey , store )
1729+ opStore = NewChangeStore ()
1730+ c .changeCache .Add (docRefKey , opStore )
16891731 }
16901732
16911733 // Calculate missing ranges and fetch them in a single operation
1692- if err := store .EnsureChanges (from , to , func (from , to int64 ) ([]* database.ChangeInfo , error ) {
1734+ if err := opStore .EnsureChanges (from , to , func (from , to int64 ) ([]* database.ChangeInfo , error ) {
16931735 // NOTE(hackerwins): Paginate fetching to avoid loading too many ChangeInfos at once.
16941736 const chunkSize int64 = 1000
16951737 var infos []* database.ChangeInfo
@@ -1724,6 +1766,8 @@ func (c *Client) FindChangeInfosBetweenServerSeqs(
17241766 return nil , err
17251767 }
17261768
1769+ store .ReplaceOrInsert (opStore .ChangesInRange (from , to ))
1770+
17271771 return store .ChangesInRange (from , to ), nil
17281772}
17291773
@@ -2211,6 +2255,9 @@ func (c *Client) purgeDocumentInternals(
22112255 counts := make (map [string ]int64 )
22122256
22132257 c .changeCache .Remove (types.DocRefKey {ProjectID : projectID , DocID : docID })
2258+ c .presenceCache .Remove (types.DocRefKey {ProjectID : projectID , DocID : docID })
2259+ c .vectorCache .Remove (types.DocRefKey {ProjectID : projectID , DocID : docID })
2260+
22142261 res , err := c .collection (ColChanges ).DeleteMany (ctx , bson.M {
22152262 "project_id" : projectID ,
22162263 "doc_id" : docID ,
0 commit comments