Skip to content

Commit 5e86af7

Browse files
committed
Implement cache for VV in MongoDB client (#1231)
Introduced an LRU cache in the MongoDB client to store version vectors per document, reducing redundant DB queries. The cache is initialized on client creation and used in UpdateAndFindMinSyncedVersionVector.
1 parent d420c7d commit 5e86af7

File tree

2 files changed

+71
-32
lines changed

2 files changed

+71
-32
lines changed

server/backend/database/database.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ var (
5858

5959
// ErrProjectNameAlreadyExists is returned when the project name already exists.
6060
ErrProjectNameAlreadyExists = errors.New("project name already exists")
61+
62+
// ErrVersionVectorNotFound is returned when the version vector could not be found.
63+
ErrVersionVectorNotFound = errors.New("version vector not found")
6164
)
6265

6366
// Database represents database which reads or saves Yorkie data.

server/backend/database/mongo/client.go

Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"strings"
2626
gotime "time"
2727

28+
lru "github.com/hashicorp/golang-lru/v2"
2829
"go.mongodb.org/mongo-driver/bson"
2930
"go.mongodb.org/mongo-driver/bson/primitive"
3031
"go.mongodb.org/mongo-driver/mongo"
@@ -33,6 +34,7 @@ import (
3334

3435
"github.com/yorkie-team/yorkie/api/converter"
3536
"github.com/yorkie-team/yorkie/api/types"
37+
"github.com/yorkie-team/yorkie/pkg/cmap"
3638
"github.com/yorkie-team/yorkie/pkg/document"
3739
"github.com/yorkie-team/yorkie/pkg/document/change"
3840
"github.com/yorkie-team/yorkie/pkg/document/key"
@@ -48,8 +50,9 @@ const (
4850

4951
// Client is a client that connects to Mongo DB and reads or saves Yorkie data.
5052
type Client struct {
51-
config *Config
52-
client *mongo.Client
53+
config *Config
54+
client *mongo.Client
55+
vvCache *lru.Cache[types.DocRefKey, *cmap.Map[types.ID, time.VersionVector]]
5356
}
5457

5558
// Dial creates an instance of Client and dials the given MongoDB.
@@ -79,11 +82,17 @@ func Dial(conf *Config) (*Client, error) {
7982
return nil, err
8083
}
8184

85+
cache, err := lru.New[types.DocRefKey, *cmap.Map[types.ID, time.VersionVector]](1000)
86+
if err != nil {
87+
return nil, fmt.Errorf("initialize VV cache: %w", err)
88+
}
89+
8290
logging.DefaultLogger().Infof("MongoDB connected, URI: %s, DB: %s", conf.ConnectionURI, conf.YorkieDatabase)
8391

8492
return &Client{
85-
config: conf,
86-
client: client,
93+
config: conf,
94+
client: client,
95+
vvCache: cache,
8796
}, nil
8897
}
8998

@@ -93,6 +102,8 @@ func (c *Client) Close() error {
93102
return fmt.Errorf("close mongo client: %w", err)
94103
}
95104

105+
c.vvCache.Purge()
106+
96107
return nil
97108
}
98109

@@ -1314,54 +1325,79 @@ func (c *Client) UpdateAndFindMinSyncedTicket(
13141325
), nil
13151326
}
13161327

1317-
// UpdateAndFindMinSyncedVersionVector returns min synced version vector
1328+
// UpdateAndFindMinSyncedVersionVector returns min synced version vector.
13181329
func (c *Client) UpdateAndFindMinSyncedVersionVector(
13191330
ctx context.Context,
13201331
clientInfo *database.ClientInfo,
13211332
docRefKey types.DocRefKey,
1322-
versionVector time.VersionVector,
1333+
vector time.VersionVector,
13231334
) (time.VersionVector, error) {
1324-
// TODO(JOOHOJANG): We have to consider removing detached client's lamport
1325-
// from min version vector.
1326-
var versionVectorInfos []database.VersionVectorInfo
1327-
1328-
// 01. Find all version vectors of the given document from DB.
1329-
cursor, err := c.collection(ColVersionVectors).Find(ctx, bson.M{
1330-
"project_id": docRefKey.ProjectID,
1331-
"doc_id": docRefKey.DocID,
1332-
})
1333-
if err != nil {
1334-
return nil, fmt.Errorf("find all version vectors: %w", err)
1335+
// 01. Update synced version vector of the given client and document.
1336+
if err := c.UpdateVersionVector(ctx, clientInfo, docRefKey, vector); err != nil {
1337+
return nil, err
13351338
}
13361339

1337-
if err := cursor.All(ctx, &versionVectorInfos); err != nil {
1338-
return nil, fmt.Errorf("decode version vectors: %w", err)
1340+
// 02. Update current client's version vector. If the client is detached, remove it.
1341+
// This is only for the current client and does not affect the version vector of other clients.
1342+
if vvMap, ok := c.vvCache.Get(docRefKey); ok {
1343+
attached, err := clientInfo.IsAttached(docRefKey.DocID)
1344+
if err != nil {
1345+
return nil, err
1346+
}
1347+
1348+
if attached {
1349+
vvMap.Upsert(clientInfo.ID, func(value time.VersionVector, exists bool) time.VersionVector {
1350+
return vector
1351+
})
1352+
} else {
1353+
// NOTE(hackerwins): Considering removing the detached client's lamport
1354+
// from the other clients' version vectors. For now, we just ignore it.
1355+
vvMap.Delete(clientInfo.ID, func(value time.VersionVector, exists bool) bool {
1356+
return exists
1357+
})
1358+
}
13391359
}
13401360

1341-
// 02. Compute min version vector.
1342-
minVersionVector := versionVector.DeepCopy()
1343-
for i, vvi := range versionVectorInfos {
1344-
if vvi.ClientID == clientInfo.ID {
1345-
continue
1361+
// 03. Calculate the minimum version vector of the given document.
1362+
if !c.vvCache.Contains(docRefKey) {
1363+
var infos []database.VersionVectorInfo
1364+
cursor, err := c.collection(ColVersionVectors).Find(ctx, bson.M{
1365+
"project_id": docRefKey.ProjectID,
1366+
"doc_id": docRefKey.DocID,
1367+
})
1368+
if err != nil {
1369+
return nil, fmt.Errorf("find all version vectors: %w", err)
1370+
}
1371+
if err := cursor.All(ctx, &infos); err != nil {
1372+
return nil, fmt.Errorf("decode version vectors: %w", err)
1373+
}
1374+
1375+
infoMap := cmap.New[types.ID, time.VersionVector]()
1376+
for i := range infos {
1377+
infoMap.Set(infos[i].ClientID, infos[i].VersionVector)
13461378
}
1347-
minVersionVector.Min(&versionVectorInfos[i].VersionVector)
1379+
1380+
c.vvCache.Add(docRefKey, infoMap)
1381+
}
1382+
vvMap, ok := c.vvCache.Get(docRefKey)
1383+
if !ok {
1384+
return nil, fmt.Errorf("version vectors from cache: %w", database.ErrVersionVectorNotFound)
13481385
}
13491386

1350-
// 03. Update current client's version vector. If the client is detached, remove it.
1351-
// This is only for the current client and does not affect the version vector of other clients.
1352-
if err = c.UpdateVersionVector(ctx, clientInfo, docRefKey, versionVector); err != nil {
1353-
return nil, err
1387+
minVector := vector.DeepCopy()
1388+
for _, vv := range vvMap.Values() {
1389+
minVector.Min(&vv)
13541390
}
13551391

1356-
return minVersionVector, nil
1392+
return minVector, nil
13571393
}
13581394

13591395
// UpdateVersionVector updates the given version vector of the given client
13601396
func (c *Client) UpdateVersionVector(
13611397
ctx context.Context,
13621398
clientInfo *database.ClientInfo,
13631399
docRefKey types.DocRefKey,
1364-
versionVector time.VersionVector,
1400+
vector time.VersionVector,
13651401
) error {
13661402
isAttached, err := clientInfo.IsAttached(docRefKey.DocID)
13671403
if err != nil {
@@ -1385,7 +1421,7 @@ func (c *Client) UpdateVersionVector(
13851421
"client_id": clientInfo.ID,
13861422
}, bson.M{
13871423
"$set": bson.M{
1388-
"version_vector": versionVector,
1424+
"version_vector": vector,
13891425
},
13901426
}, options.Update().SetUpsert(true))
13911427
if err != nil {

0 commit comments

Comments
 (0)