|
| 1 | +// redis cache backend implementation for persistent storage |
| 2 | + |
1 | 3 | package main |
2 | 4 |
|
3 | 5 | import ( |
4 | | - "bytes" |
| 6 | + "encoding/json" |
5 | 7 | "fmt" |
6 | | - "os" |
7 | | - "strconv" |
8 | 8 |
|
9 | 9 | log "github.com/Sirupsen/logrus" |
10 | | - "github.com/garyburd/redigo/redis" |
| 10 | + "github.com/boltdb/bolt" |
11 | 11 | ) |
12 | 12 |
|
13 | | -const prefix = "genproxy:" |
| 13 | +const requestsBucketName = "rqbucket" |
14 | 14 |
|
15 | 15 | type Cache struct { |
16 | | - pool *redis.Pool |
17 | | - prefix string |
| 16 | + db *bolt.DB |
| 17 | + requestsBucket []byte |
18 | 18 | } |
19 | 19 |
|
20 | | -// set records a key in cache (redis) |
21 | | -func (c *Cache) set(key string, value interface{}) error { |
22 | | - |
23 | | - if c.prefix == "" { |
24 | | - c.prefix = prefix |
25 | | - } |
26 | | - |
27 | | - client := c.pool.Get() |
28 | | - defer client.Close() |
29 | | - |
30 | | - _, err := client.Do("SET", fmt.Sprintf(c.prefix+key), value) |
31 | | - |
| 20 | +func getDB(name string) *bolt.DB { |
| 21 | + db, err := bolt.Open(name, 0600, nil) |
32 | 22 | if err != nil { |
33 | | - // getting payload for debugging |
34 | | - var payload string |
35 | | - switch v := value.(type) { |
36 | | - case bytes.Buffer: // return as is |
37 | | - payload = v.String() // Here v is of type bytes.Buffer |
38 | | - } |
39 | | - log.WithFields(log.Fields{ |
40 | | - "error": err.Error(), |
41 | | - "key": fmt.Sprintf(c.prefix + key), |
42 | | - "payload": payload, |
43 | | - }).Error("Failed to SET key...") |
44 | | - } else { |
45 | | - log.WithFields(log.Fields{ |
46 | | - "key": fmt.Sprintf(c.prefix + key), |
47 | | - }).Info("Key/value SET successfuly!") |
48 | | - } |
49 | | - |
50 | | - return err |
51 | | -} |
52 | | - |
53 | | -// getAllKeys returns all keys for specified (or default) prefix |
54 | | -func (c *Cache) getAllKeys() ([]string, error) { |
55 | | - |
56 | | - client := c.pool.Get() |
57 | | - defer client.Close() |
58 | | - |
59 | | - values, err := redis.Strings(client.Do("KEYS", fmt.Sprintf(c.prefix+"*"))) |
60 | | - |
61 | | - return values, err |
62 | | -} |
63 | | - |
64 | | -// getAllValues returns values for specified keys |
65 | | -func (c *Cache) getAllValues(keys []string) ([]string, error) { |
66 | | - |
67 | | - log.WithFields(log.Fields{ |
68 | | - "keys": keys, |
69 | | - }).Debug("Getting all supplied values") |
70 | | - |
71 | | - client := c.pool.Get() |
72 | | - defer client.Close() |
73 | | - |
74 | | - // preparing keys |
75 | | - var args []interface{} |
76 | | - for _, k := range keys { |
77 | | - args = append(args, k) |
| 23 | + log.Fatal(err) |
78 | 24 | } |
79 | 25 |
|
80 | | - jsonStr, err := redis.Strings(client.Do("MGET", args...)) |
81 | | - |
82 | | - log.WithFields(log.Fields{ |
83 | | - "keys": keys, |
84 | | - }).Debug("Returning supplied values") |
85 | | - |
86 | | - return jsonStr, err |
87 | | - |
| 26 | + return db |
88 | 27 | } |
89 | 28 |
|
90 | | -// get returns key from cache |
91 | | -func (c *Cache) get(key string) (interface{}, error) { |
92 | | - |
93 | | - client := c.pool.Get() |
94 | | - defer client.Close() |
95 | | - |
96 | | - value, err := client.Do("GET", fmt.Sprintf(c.prefix+key)) |
97 | | - |
98 | | - if err != nil { |
99 | | - log.WithFields(log.Fields{ |
100 | | - "error": err.Error(), |
101 | | - "key": fmt.Sprintf(c.prefix + key), |
102 | | - }).Error("Failed to GET key...") |
103 | | - } else { |
104 | | - log.WithFields(log.Fields{ |
105 | | - "key": fmt.Sprintf(c.prefix + key), |
106 | | - }).Debug("Key found!") |
107 | | - } |
| 29 | +func (c *Cache) Set(key, value []byte) error { |
| 30 | + err := c.db.Update(func(tx *bolt.Tx) error { |
| 31 | + bucket, err := tx.CreateBucketIfNotExists(c.requestsBucket) |
| 32 | + if err != nil { |
| 33 | + return err |
| 34 | + } |
| 35 | + err = bucket.Put(key, value) |
| 36 | + if err != nil { |
| 37 | + return err |
| 38 | + } |
| 39 | + return nil |
| 40 | + }) |
108 | 41 |
|
109 | | - return value, err |
| 42 | + return err |
110 | 43 | } |
111 | 44 |
|
112 | | -// delete removes specified entry from cache |
113 | | -func (c *Cache) delete(key string) error { |
114 | | - if c.prefix == "" { |
115 | | - c.prefix = prefix |
116 | | - } |
117 | | - |
118 | | - client := c.pool.Get() |
119 | | - defer client.Close() |
120 | | - |
121 | | - _, err := client.Do("DEL", key) |
122 | | - |
123 | | - return err |
| 45 | +func (c *Cache) Get(key []byte) (value []byte, err error) { |
| 46 | + err = c.db.View(func(tx *bolt.Tx) error { |
| 47 | + bucket := tx.Bucket(c.requestsBucket) |
| 48 | + if bucket == nil { |
| 49 | + return fmt.Errorf("Bucket %q not found!", c.requestsBucket) |
| 50 | + } |
| 51 | + value = bucket.Get(key) |
| 52 | + return nil |
| 53 | + }) |
124 | 54 |
|
| 55 | + return |
125 | 56 | } |
126 | 57 |
|
127 | | -// getRedisPool returns thread safe Redis connection pool |
128 | | -func getRedisPool() *redis.Pool { |
129 | | - |
130 | | - // getting redis connection |
131 | | - maxConnections := 10 |
132 | | - mc := os.Getenv("MaxConnections") |
133 | | - if mc != "" { |
134 | | - maxCons, err := strconv.Atoi(mc) |
135 | | - if err != nil { |
136 | | - maxConnections = 10 |
137 | | - } else { |
138 | | - maxConnections = maxCons |
| 58 | +func (c *Cache) GetAllRequests() (payloads []Payload, err error) { |
| 59 | + err = c.db.View(func(tx *bolt.Tx) error { |
| 60 | + b := tx.Bucket(c.requestsBucket) |
| 61 | + if b == nil { |
| 62 | + // bucket doesn't exist |
| 63 | + return nil |
139 | 64 | } |
140 | | - } |
| 65 | + c := b.Cursor() |
141 | 66 |
|
142 | | - // getting redis client for state storing |
143 | | - redisPool := redis.NewPool(func() (redis.Conn, error) { |
144 | | - c, err := redis.Dial("tcp", AppConfig.redisAddress) |
| 67 | + for k, v := c.First(); k != nil; k, v = c.Next() { |
| 68 | + var pl Payload |
| 69 | + err = json.Unmarshal(v, &pl) |
145 | 70 |
|
146 | | - if err != nil { |
147 | | - log.WithFields(log.Fields{ |
148 | | - "Error": err.Error(), |
149 | | - }).Warn("Failed to create Redis connection pool!") |
150 | | - return nil, err |
151 | | - } |
152 | | - if AppConfig.redisPassword != "" { |
153 | | - if _, err := c.Do("AUTH", AppConfig.redisPassword); err != nil { |
| 71 | + if err != nil { |
154 | 72 | log.WithFields(log.Fields{ |
155 | | - "Error": err.Error(), |
156 | | - "PasswordUsed": AppConfig.redisPassword, |
157 | | - }).Warn("Failed to authenticate to Redis!") |
158 | | - c.Close() |
159 | | - return nil, err |
| 73 | + "error": err.Error(), |
| 74 | + "json": v, |
| 75 | + }).Warning("Failed to deserialize json") |
160 | 76 | } else { |
161 | | - log.Debug("Authenticated to Redis successfully! ") |
| 77 | + payloads = append(payloads, pl) |
162 | 78 | } |
163 | 79 | } |
| 80 | + return nil |
| 81 | + }) |
| 82 | + return |
| 83 | +} |
164 | 84 |
|
165 | | - return c, err |
166 | | - }, maxConnections) |
167 | | - |
168 | | - return redisPool |
| 85 | +func (c *Cache) DeleteBucket(name []byte) (err error) { |
| 86 | + err = c.db.Update(func(tx *bolt.Tx) error { |
| 87 | + err = tx.DeleteBucket(name) |
| 88 | + if err != nil { |
| 89 | + log.WithFields(log.Fields{ |
| 90 | + "error": err.Error(), |
| 91 | + "name": string(name), |
| 92 | + }).Warning("Failed to delete bucket") |
| 93 | + return err |
| 94 | + } else { |
| 95 | + return nil |
| 96 | + } |
| 97 | + }) |
| 98 | + return |
169 | 99 | } |
0 commit comments