Skip to content

Commit 4a7c62f

Browse files
Add ConfigDB Journal
1 parent 3b31e4b commit 4a7c62f

File tree

6 files changed

+564
-13
lines changed

6 files changed

+564
-13
lines changed

gnmi_server/db_journal.go

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
package gnmi
2+
3+
import (
4+
"compress/gzip"
5+
"errors"
6+
"fmt"
7+
"github.com/Azure/sonic-mgmt-common/translib/db"
8+
"github.com/go-redis/redis/v7"
9+
log "github.com/golang/glog"
10+
sdcfg "github.com/sonic-net/sonic-gnmi/sonic_db_config"
11+
"io"
12+
"os"
13+
"path/filepath"
14+
"strings"
15+
"time"
16+
)
17+
18+
const (
19+
maxFileSize = 2000000 // Bytes
20+
maxBackups = 1
21+
)
22+
23+
type DbJournal struct {
24+
database string
25+
rc *redis.Client
26+
ps *redis.PubSub
27+
notifications <-chan *redis.Message
28+
cache map[string]map[string]string
29+
file *os.File
30+
fileName string
31+
done chan bool
32+
}
33+
34+
var dbNums = map[string]db.DBNum{
35+
"CONFIG_DB": db.ConfigDB,
36+
"STATE_DB": db.StateDB,
37+
}
38+
39+
// NewDbJournal returns a new DbJournal for the specified database.
40+
// database must be in the form of "CONFIG_DB", "APP_STATE_DB", etc.
41+
func NewDbJournal(database string) (*DbJournal, error) {
42+
var err error
43+
journal := &DbJournal{}
44+
journal.database = database
45+
dbNum, ok := dbNums[journal.database]
46+
if !ok {
47+
return nil, errors.New("Invalid database passed into NewDbJournal")
48+
}
49+
ns, _ := sdcfg.GetDbDefaultNamespace()
50+
addr, _ := sdcfg.GetDbTcpAddr(journal.database, ns)
51+
dbId, _ := sdcfg.GetDbId(journal.database, ns)
52+
journal.rc = redis.NewClient(&redis.Options{
53+
Network: "tcp",
54+
Addr: addr,
55+
Password: "",
56+
DB: dbId,
57+
DialTimeout: 0,
58+
})
59+
if err = journal.init(); err != nil {
60+
return nil, err
61+
}
62+
keyspace := fmt.Sprintf("__keyspace@%d__:*", dbNum)
63+
keyevent := fmt.Sprintf("__keyevent@%d__:*", dbNum)
64+
journal.ps = journal.rc.PSubscribe(keyspace, keyevent)
65+
if _, err = journal.ps.Receive(); err != nil {
66+
return nil, err
67+
}
68+
journal.notifications = journal.ps.Channel()
69+
journal.fileName = filepath.Join(HostVarLogPath, strings.ToLower(journal.database)+".txt")
70+
if journal.file, err = os.OpenFile(journal.fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644); err != nil {
71+
return nil, err
72+
}
73+
journal.done = make(chan bool, 1)
74+
go journal.journal()
75+
log.V(2).Infof("Successfully started the DbJournal for %v", journal.database)
76+
return journal, nil
77+
}
78+
79+
// Close closes the redis objects and the journal file.
80+
func (dbj *DbJournal) Close() {
81+
if dbj == nil {
82+
return
83+
}
84+
dbj.done <- true
85+
}
86+
func (dbj *DbJournal) cleanup() {
87+
if dbj == nil {
88+
return
89+
}
90+
if dbj.ps != nil {
91+
dbj.ps.Close()
92+
}
93+
if dbj.rc != nil {
94+
dbj.rc.Close()
95+
dbj.rc = nil
96+
}
97+
if dbj.file != nil {
98+
dbj.file.Close()
99+
}
100+
if dbj.cache != nil {
101+
dbj.cache = map[string]map[string]string{}
102+
}
103+
log.V(2).Infof("DbJournal closed successfully!")
104+
}
105+
106+
// init initializes the journal's cache.
107+
func (dbj *DbJournal) init() error {
108+
if dbj == nil || dbj.rc == nil {
109+
return errors.New("DbJournal: redis client is nil")
110+
}
111+
dbj.cache = map[string]map[string]string{}
112+
keys, kErr := dbj.rc.Keys("*").Result()
113+
if kErr != nil {
114+
return kErr
115+
}
116+
for _, key := range keys {
117+
entry, eErr := dbj.rc.HGetAll(key).Result()
118+
if eErr != nil {
119+
entry = map[string]string{}
120+
}
121+
dbj.cache[key] = entry
122+
}
123+
return nil
124+
}
125+
126+
// journal monitors the database notifications and logs events to the file.
127+
func (dbj *DbJournal) journal() {
128+
if dbj == nil {
129+
return
130+
}
131+
defer dbj.cleanup()
132+
var event []string
133+
for {
134+
select {
135+
case msg := <-dbj.notifications:
136+
event = append(event, msg.Payload)
137+
if len(event) != 2 {
138+
continue
139+
}
140+
op := event[0]
141+
table := event[1]
142+
entry := fmt.Sprintf("%v: %v %v", time.Now().Format("2006-01-02.15:04:05.000000"), op, table)
143+
diff, dErr := dbj.updateCache(event)
144+
if dErr != nil {
145+
log.V(0).Infof("Shutting down %v Journal: %v", dbj.database, dErr)
146+
return
147+
}
148+
event = []string{}
149+
if diff != "" {
150+
entry += " " + diff
151+
}
152+
// If no fields were changed or the operation is a set on a table that contains the DB name, don't log the event.
153+
if (diff == "" && (op == "hset" || op == "hdel")) || (op == "set" && strings.Contains(table, dbj.database)) {
154+
continue
155+
}
156+
if err := dbj.rotateFile(); err != nil {
157+
log.V(0).Infof("Shutting down DbJournal, failed to manage file rotation: %v", err)
158+
return
159+
}
160+
_, writeErr := dbj.file.Write([]byte(entry + "\n"))
161+
if writeErr != nil {
162+
log.V(0).Infof("Failed to write to DbJournal file: %v", writeErr)
163+
}
164+
case <-dbj.done:
165+
return
166+
}
167+
}
168+
}
169+
170+
// updateCache updates the cache with the latest database entry and returns the diff.
171+
func (dbj *DbJournal) updateCache(event []string) (string, error) {
172+
op := event[0]
173+
table := event[1]
174+
if dbj == nil || dbj.cache == nil || dbj.rc == nil {
175+
return "", errors.New("nil members present in DbJournal")
176+
}
177+
oldEntry, ok := dbj.cache[table]
178+
if !ok {
179+
oldEntry = map[string]string{}
180+
}
181+
newEntry, err := dbj.rc.HGetAll(table).Result()
182+
if err != nil {
183+
newEntry = map[string]string{}
184+
}
185+
// Update the cache
186+
dbj.cache[table] = newEntry
187+
if op == "del" {
188+
return "", nil
189+
}
190+
diff := ""
191+
// Find deleted and changed fields
192+
for k, v := range oldEntry {
193+
newVal, ok := newEntry[k]
194+
if !ok {
195+
diff += "-" + k + " "
196+
continue
197+
}
198+
if newVal != v {
199+
diff += k + "=" + newVal + " "
200+
}
201+
}
202+
// Find added fields
203+
for k, v := range newEntry {
204+
if _, ok := oldEntry[k]; !ok {
205+
diff += "+" + k + ":" + v + " "
206+
}
207+
}
208+
return diff, nil
209+
}
210+
211+
// rotateFile makes sure the journal file is opened correctly and rotates it
212+
// if it exceeds the maximum size.
213+
func (dbj *DbJournal) rotateFile() error {
214+
if dbj == nil {
215+
return errors.New("Couldn't rotate file, DbJournal is nil")
216+
}
217+
fileStat, err := os.Stat(dbj.fileName)
218+
if err != nil || dbj.file == nil {
219+
// File does not exist or it is closed, create/open it
220+
if dbj.file, err = os.OpenFile(dbj.fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644); err != nil {
221+
return err
222+
}
223+
return nil
224+
}
225+
if fileStat.Size() >= maxFileSize {
226+
// Close the journal file and open it as read-only to copy it
227+
dbj.file.Close()
228+
if dbj.file, err = os.OpenFile(dbj.fileName, os.O_RDONLY, 0644); err != nil {
229+
return err
230+
}
231+
// Remove a rotated, zipped file if the maxBackups limit is reached
232+
files, err := os.ReadDir(HostVarLogPath)
233+
if err != nil {
234+
return err
235+
}
236+
var count uint
237+
var oldest string
238+
for _, file := range files {
239+
if strings.HasPrefix(file.Name(), strings.ToLower(dbj.database)) && strings.HasSuffix(file.Name(), ".gz") {
240+
count++
241+
if strings.Compare(file.Name(), oldest) == -1 || oldest == "" {
242+
oldest = file.Name()
243+
}
244+
}
245+
}
246+
if count >= maxBackups {
247+
if err := os.Remove(filepath.Join(HostVarLogPath, oldest)); err != nil {
248+
return err
249+
}
250+
}
251+
// Compress the file
252+
zipName := filepath.Join(HostVarLogPath, strings.ToLower(dbj.database)+"_"+time.Now().Format("20060102150405")+".gz")
253+
zipFile, err := os.Create(zipName)
254+
if err != nil {
255+
return err
256+
}
257+
defer zipFile.Close()
258+
zipWriter := gzip.NewWriter(zipFile)
259+
defer zipWriter.Close()
260+
if _, err = io.Copy(zipWriter, dbj.file); err != nil {
261+
return err
262+
}
263+
if err = zipWriter.Flush(); err != nil {
264+
return err
265+
}
266+
// Recreate the journal file
267+
if dbj.file, err = os.Create(dbj.fileName); err != nil {
268+
return err
269+
}
270+
}
271+
return nil
272+
}

0 commit comments

Comments
 (0)