Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions internal/topo/node/cache/sync_cache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -114,8 +114,8 @@ type SyncCache struct {
maxDiskPage int
maxMemPage int
// cache storage
memCache []*page
diskBufferPage *page
memCache []*page // read pages
diskBufferPage *page // write page, only one in the tail
// status
diskSize int // the count of pages has been saved
CacheLength int // readonly, for metrics only to save calculation
Expand Down Expand Up @@ -232,8 +232,14 @@ func (c *SyncCache) send(ctx api.StreamContext) {
// addCache not thread safe!
func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{}) {
metrics.SyncCacheOpCnter.WithLabelValues(addLbl, c.ruleID, c.opID).Inc()
isNotFull := c.appendMemCache(item)
if !isNotFull {
saveToDisk := c.diskSize > 0 || c.diskBufferPage != nil
if !saveToDisk {
isNotFull := c.appendMemCache(item)
if !isNotFull {
saveToDisk = true
}
}
if saveToDisk {
if c.diskBufferPage == nil {
c.diskBufferPage = newPage(c.cacheConf.BufferPageSize)
}
Expand All @@ -242,7 +248,7 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{
if c.diskSize == c.maxDiskPage {
// disk full, read the oldest page to the hot page
c.loadFromDisk(ctx)
ctx.GetLogger().Debug("disk full, remove the last page")
ctx.GetLogger().Info("disk full, remove the last page")
}
start := time.Now()
defer func() {
Expand Down
Loading