@@ -90,37 +90,11 @@ type Client struct {
90
90
importLogEncoder encoder
91
91
logLock sync.Mutex
92
92
93
- // TODO shardNodes needs to be invalidated/updated when cluster topology changes.
94
93
shardNodes shardNodes
95
94
tick * time.Ticker
95
+ done chan struct {}
96
96
}
97
97
98
- // func (c *Client) translateCol(index, key string) (uint64, bool) {
99
- // c.tlock.RLock()
100
- // v, b := c.translator.GetCol(index, key)
101
- // c.tlock.RUnlock()
102
- // return v, b
103
- // }
104
-
105
- // func (c *Client) translateRow(index, field, key string) (uint64, bool) {
106
- // c.tlock.RLock()
107
- // v, b := c.translator.GetRow(index, field, key)
108
- // c.tlock.RUnlock()
109
- // return v, b
110
- // }
111
-
112
- // func (c *Client) addTranslateCol(index, key string, value uint64) {
113
- // c.tlock.Lock()
114
- // c.translator.AddCol(index, key, value)
115
- // c.tlock.Unlock()
116
- // }
117
-
118
- // func (c *Client) addTranslateRow(index, field, key string, value uint64) {
119
- // c.tlock.Lock()
120
- // c.translator.AddRow(index, field, key, value)
121
- // c.tlock.Unlock()
122
- // }
123
-
124
98
func (c * Client ) getURIsForShard (index string , shard uint64 ) ([]* URI , error ) {
125
99
uris , ok := c .shardNodes .Get (index , shard )
126
100
if ok {
@@ -139,13 +113,22 @@ func (c *Client) getURIsForShard(index string, shard uint64) ([]*URI, error) {
139
113
}
140
114
141
115
func (c * Client ) runChangeDetection () {
142
- c .tick = time .NewTicker (time .Minute )
143
-
144
- for range c .tick .C {
145
- c .detectClusterChanges ()
116
+ for {
117
+ select {
118
+ case <- c .tick .C :
119
+ c .detectClusterChanges ()
120
+ case <- c .done :
121
+ return
122
+ }
146
123
}
147
124
}
148
125
126
+ func (c * Client ) Close () error {
127
+ c .tick .Stop ()
128
+ close (c .done )
129
+ return nil
130
+ }
131
+
149
132
// detectClusterChanges chooses a random index and shard from the
150
133
// shardNodes cache and deletes it. It then looks it up from Pilosa to
151
134
// see if it still matches, and if not it drops the whole cache.
@@ -232,6 +215,8 @@ func newClientWithOptions(options *ClientOptions) *Client {
232
215
coordinatorLock : & sync.RWMutex {},
233
216
234
217
shardNodes : newShardNodes (),
218
+ tick : time .NewTicker (time .Minute ),
219
+ done : make (chan struct {}, 0 ),
235
220
}
236
221
if options .importLogWriter != nil {
237
222
c .importLogEncoder = newImportLogEncoder (options .importLogWriter )
@@ -245,6 +230,7 @@ func newClientWithOptions(options *ClientOptions) *Client {
245
230
c .minRetrySleepTime = 1 * time .Second
246
231
c .maxRetrySleepTime = 2 * time .Minute
247
232
c .importManager = newRecordImportManager (c )
233
+ go c .runChangeDetection ()
248
234
return c
249
235
250
236
}
0 commit comments