Skip to content

Commit 752f1d0

Browse files
Add ConfigDB Journal
Signed-off-by: Niranjani Vivek <niranjaniv@google.com>
1 parent 488ae59 commit 752f1d0

File tree

6 files changed

+606
-13
lines changed

6 files changed

+606
-13
lines changed

gnmi_server/db_journal.go

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
package gnmi
2+
3+
import (
4+
"compress/gzip"
5+
"context"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"os"
10+
"path/filepath"
11+
"strings"
12+
"time"
13+
14+
log "github.com/golang/glog"
15+
"github.com/redis/go-redis/v9"
16+
17+
"github.com/Azure/sonic-mgmt-common/translib/db"
18+
sdcfg "github.com/sonic-net/sonic-gnmi/sonic_db_config"
19+
)
20+
21+
const (
22+
maxFileSize = 2000000 // Bytes
23+
maxBackups = 1
24+
)
25+
26+
type DbJournal struct {
27+
database string
28+
rc *redis.Client
29+
ps *redis.PubSub
30+
notifications <-chan *redis.Message
31+
cache map[string]map[string]string
32+
file *os.File
33+
fileName string
34+
done chan bool
35+
}
36+
37+
var dbNums = map[string]db.DBNum{
38+
"CONFIG_DB": db.ConfigDB,
39+
"STATE_DB": db.StateDB,
40+
}
41+
42+
// NewDbJournal returns a new DbJournal for the specified database.
43+
func NewDbJournal(database string) (*DbJournal, error) {
44+
var err error
45+
journal := &DbJournal{}
46+
journal.database = database
47+
dbNum, ok := dbNums[journal.database]
48+
if !ok {
49+
return nil, errors.New("Invalid database passed into NewDbJournal")
50+
}
51+
52+
ns, _ := sdcfg.GetDbDefaultNamespace()
53+
addr, _ := sdcfg.GetDbTcpAddr(journal.database, ns)
54+
dbId, _ := sdcfg.GetDbId(journal.database, ns)
55+
journal.rc = db.TransactionalRedisClientWithOpts(&redis.Options{
56+
Network: "tcp",
57+
Addr: addr,
58+
Password: "",
59+
DB: dbId,
60+
DialTimeout: 0,
61+
})
62+
63+
if err = journal.init(); err != nil {
64+
return nil, err
65+
}
66+
67+
keyspace := fmt.Sprintf("__keyspace@%d__:*", dbNum)
68+
keyevent := fmt.Sprintf("__keyevent@%d__:*", dbNum)
69+
journal.ps = journal.rc.PSubscribe(context.Background(), keyspace, keyevent)
70+
if _, err = journal.ps.Receive(context.Background()); err != nil {
71+
return nil, err
72+
}
73+
74+
journal.notifications = journal.ps.Channel()
75+
76+
journal.fileName = filepath.Join(HostVarLogPath, strings.ToLower(journal.database)+".txt")
77+
if journal.file, err = os.OpenFile(journal.fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644); err != nil {
78+
return nil, err
79+
}
80+
81+
journal.done = make(chan bool, 1)
82+
83+
go journal.journal()
84+
log.V(2).Infof("Successfully started the DbJournal for %v", journal.database)
85+
return journal, nil
86+
}
87+
88+
// Close closes the redis objects and the journal file.
89+
func (dbj *DbJournal) Close() {
90+
if dbj == nil {
91+
return
92+
}
93+
dbj.done <- true
94+
}
95+
96+
func (dbj *DbJournal) cleanup() {
97+
if dbj == nil {
98+
return
99+
}
100+
if dbj.ps != nil {
101+
dbj.ps.Close()
102+
}
103+
if dbj.rc != nil {
104+
db.CloseRedisClient(dbj.rc)
105+
dbj.rc = nil
106+
}
107+
if dbj.file != nil {
108+
dbj.file.Close()
109+
}
110+
if dbj.cache != nil {
111+
dbj.cache = map[string]map[string]string{}
112+
}
113+
log.V(2).Infof("DbJournal closed successfully!")
114+
}
115+
116+
// init initializes the journal's cache.
117+
func (dbj *DbJournal) init() error {
118+
if dbj == nil || dbj.rc == nil {
119+
return errors.New("DbJournal: redis client is nil")
120+
}
121+
dbj.cache = map[string]map[string]string{}
122+
keys, kErr := dbj.rc.Keys(context.Background(), "*").Result()
123+
if kErr != nil {
124+
return kErr
125+
}
126+
for _, key := range keys {
127+
entry, eErr := dbj.rc.HGetAll(context.Background(), key).Result()
128+
if eErr != nil {
129+
entry = map[string]string{}
130+
}
131+
dbj.cache[key] = entry
132+
}
133+
return nil
134+
}
135+
136+
// journal monitors the database notifications and logs events to the file.
137+
func (dbj *DbJournal) journal() {
138+
if dbj == nil {
139+
return
140+
}
141+
defer dbj.cleanup()
142+
var event []string
143+
for {
144+
select {
145+
case msg := <-dbj.notifications:
146+
event = append(event, msg.Payload)
147+
if len(event) != 2 {
148+
continue
149+
}
150+
op := event[0]
151+
table := event[1]
152+
entry := fmt.Sprintf("%v: %v %v", time.Now().Format("2006-01-02.15:04:05.000000"), op, table)
153+
diff, dErr := dbj.updateCache(event)
154+
if dErr != nil {
155+
log.V(0).Infof("Shutting down %v Journal: %v", dbj.database, dErr)
156+
return
157+
}
158+
event = []string{}
159+
160+
if diff != "" {
161+
entry += " " + diff
162+
}
163+
// If no fields were changed or the operation is a set on a table that contains the DB name, don't log the event.
164+
if (diff == "" && (op == "hset" || op == "hdel")) || (op == "set" && strings.Contains(table, dbj.database)) {
165+
continue
166+
}
167+
168+
if err := dbj.rotateFile(); err != nil {
169+
log.V(0).Infof("Shutting down DbJournal, failed to manage file rotation: %v", err)
170+
return
171+
}
172+
_, writeErr := dbj.file.Write([]byte(entry + "\n"))
173+
if writeErr != nil {
174+
log.V(0).Infof("Failed to write to DbJournal file: %v", writeErr)
175+
}
176+
case <-dbj.done:
177+
return
178+
}
179+
}
180+
}
181+
182+
// updateCache updates the cache with the latest database entry and returns the diff.
183+
func (dbj *DbJournal) updateCache(event []string) (string, error) {
184+
op := event[0]
185+
table := event[1]
186+
if dbj == nil || dbj.cache == nil || dbj.rc == nil {
187+
return "", errors.New("nil members present in DbJournal")
188+
}
189+
oldEntry, ok := dbj.cache[table]
190+
if !ok {
191+
oldEntry = map[string]string{}
192+
}
193+
newEntry, err := dbj.rc.HGetAll(context.Background(), table).Result()
194+
if err != nil {
195+
newEntry = map[string]string{}
196+
}
197+
// Update the cache
198+
dbj.cache[table] = newEntry
199+
200+
if op == "del" {
201+
return "", nil
202+
}
203+
204+
diff := ""
205+
// Find deleted and changed fields
206+
for k, v := range oldEntry {
207+
newVal, ok := newEntry[k]
208+
if !ok {
209+
diff += "-" + k + " "
210+
continue
211+
}
212+
if newVal != v {
213+
diff += k + "=" + newVal + " "
214+
}
215+
}
216+
217+
// Find added fields
218+
for k, v := range newEntry {
219+
if _, ok := oldEntry[k]; !ok {
220+
diff += "+" + k + ":" + v + " "
221+
}
222+
}
223+
224+
return diff, nil
225+
}
226+
227+
// rotateFile makes sure the journal file is opened correctly and rotates it
228+
// if it exceeds the maximum size.
229+
func (dbj *DbJournal) rotateFile() error {
230+
if dbj == nil {
231+
return errors.New("Couldn't rotate file, DbJournal is nil")
232+
}
233+
fileStat, err := os.Stat(dbj.fileName)
234+
if err != nil || dbj.file == nil {
235+
// File does not exist or it is closed, create/open it
236+
if dbj.file, err = os.OpenFile(dbj.fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644); err != nil {
237+
return err
238+
}
239+
return nil
240+
}
241+
242+
if fileStat.Size() >= maxFileSize {
243+
// Close the journal file and open it as read-only to copy it
244+
dbj.file.Close()
245+
if dbj.file, err = os.OpenFile(dbj.fileName, os.O_RDONLY, 0644); err != nil {
246+
return err
247+
}
248+
249+
// Remove a rotated, zipped file if the maxBackups limit is reached
250+
files, err := os.ReadDir(HostVarLogPath)
251+
if err != nil {
252+
return err
253+
}
254+
var count uint
255+
var oldest string
256+
for _, file := range files {
257+
if strings.HasPrefix(file.Name(), strings.ToLower(dbj.database)) && strings.HasSuffix(file.Name(), ".gz") {
258+
count++
259+
if strings.Compare(file.Name(), oldest) == -1 || oldest == "" {
260+
oldest = file.Name()
261+
}
262+
}
263+
}
264+
if count >= maxBackups {
265+
if err := os.Remove(filepath.Join(HostVarLogPath, oldest)); err != nil {
266+
return err
267+
}
268+
}
269+
270+
// Compress the file
271+
zipName := filepath.Join(HostVarLogPath, strings.ToLower(dbj.database)+"_"+time.Now().Format("20060102150405")+".gz")
272+
zipFile, err := os.Create(zipName)
273+
if err != nil {
274+
return err
275+
}
276+
defer zipFile.Close()
277+
zipWriter := gzip.NewWriter(zipFile)
278+
defer zipWriter.Close()
279+
280+
if _, err = io.Copy(zipWriter, dbj.file); err != nil {
281+
return err
282+
}
283+
if err = zipWriter.Flush(); err != nil {
284+
return err
285+
}
286+
287+
// Recreate the journal file
288+
if dbj.file, err = os.Create(dbj.fileName); err != nil {
289+
return err
290+
}
291+
}
292+
return nil
293+
}

0 commit comments

Comments
 (0)