@@ -34,7 +34,7 @@ type Indexer interface {
34
34
Index (rt * runtime.Runtime , rebuild , cleanup bool ) (string , error )
35
35
Stats () Stats
36
36
37
- GetESLastModified (index string ) (time.Time , error )
37
+ GetESLastModified (ctx context. Context , index string ) (time.Time , error )
38
38
GetDBLastModified (ctx context.Context , db * sql.DB ) (time.Time , error )
39
39
}
40
40
@@ -98,9 +98,9 @@ func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration
98
98
type infoResponse map [string ]interface {}
99
99
100
100
// FindIndexes finds all our physical indexes
101
- func (i * baseIndexer ) FindIndexes () []string {
101
+ func (i * baseIndexer ) FindIndexes (ctx context. Context ) []string {
102
102
response := infoResponse {}
103
- _ , err := utils .MakeJSONRequest (http .MethodGet , fmt .Sprintf ("%s/%s" , i .elasticURL , i .name ), nil , & response )
103
+ _ , err := utils .MakeJSONRequest (ctx , http .MethodGet , fmt .Sprintf ("%s/%s" , i .elasticURL , i .name ), nil , & response )
104
104
indexes := make ([]string , 0 )
105
105
106
106
// error could mean a variety of things, but we'll figure that out later
@@ -128,7 +128,7 @@ func (i *baseIndexer) FindIndexes() []string {
128
128
// that index to `contacts`.
129
129
//
130
130
// If the day-specific name already exists, we append a .1 or .2 to the name.
131
- func (i * baseIndexer ) createNewIndex (def * IndexDefinition ) (string , error ) {
131
+ func (i * baseIndexer ) createNewIndex (ctx context. Context , def * IndexDefinition ) (string , error ) {
132
132
// create our day-specific name
133
133
index := fmt .Sprintf ("%s_%s" , i .name , time .Now ().Format ("2006_01_02" ))
134
134
idx := 0
@@ -152,7 +152,7 @@ func (i *baseIndexer) createNewIndex(def *IndexDefinition) (string, error) {
152
152
// create the new index
153
153
settings := jsonx .MustMarshal (def )
154
154
155
- _ , err := utils .MakeJSONRequest (http .MethodPut , fmt .Sprintf ("%s/%s" , i .elasticURL , index ), settings , nil )
155
+ _ , err := utils .MakeJSONRequest (ctx , http .MethodPut , fmt .Sprintf ("%s/%s" , i .elasticURL , index ), settings , nil )
156
156
if err != nil {
157
157
return "" , err
158
158
}
@@ -185,11 +185,11 @@ type removeAliasCommand struct {
185
185
}
186
186
187
187
// maps this indexer's alias to the new physical index, removing existing aliases if they exist
188
- func (i * baseIndexer ) updateAlias (newIndex string ) error {
188
+ func (i * baseIndexer ) updateAlias (ctx context. Context , newIndex string ) error {
189
189
commands := make ([]interface {}, 0 )
190
190
191
191
// find existing physical indexes
192
- existing := i .FindIndexes ()
192
+ existing := i .FindIndexes (ctx )
193
193
for _ , idx := range existing {
194
194
remove := removeAliasCommand {}
195
195
remove .Remove .Alias = i .name
@@ -207,7 +207,7 @@ func (i *baseIndexer) updateAlias(newIndex string) error {
207
207
208
208
aliasJSON := jsonx .MustMarshal (aliasCommand {Actions : commands })
209
209
210
- _ , err := utils .MakeJSONRequest (http .MethodPost , fmt .Sprintf ("%s/_aliases" , i .elasticURL ), aliasJSON , nil )
210
+ _ , err := utils .MakeJSONRequest (ctx , http .MethodPost , fmt .Sprintf ("%s/_aliases" , i .elasticURL ), aliasJSON , nil )
211
211
212
212
i .log ().Info ("updated alias" , "index" , newIndex )
213
213
@@ -222,9 +222,9 @@ type healthResponse struct {
222
222
}
223
223
224
224
// removes all indexes that are older than the currently active index
225
- func (i * baseIndexer ) cleanupIndexes () error {
225
+ func (i * baseIndexer ) cleanupIndexes (ctx context. Context ) error {
226
226
// find our current indexes
227
- currents := i .FindIndexes ()
227
+ currents := i .FindIndexes (ctx )
228
228
229
229
// no current indexes? this a noop
230
230
if len (currents ) == 0 {
@@ -233,7 +233,7 @@ func (i *baseIndexer) cleanupIndexes() error {
233
233
234
234
// find all the current indexes
235
235
healthResponse := healthResponse {}
236
- _ , err := utils .MakeJSONRequest (http .MethodGet , fmt .Sprintf ("%s/%s" , i .elasticURL , "_cluster/health?level=indices" ), nil , & healthResponse )
236
+ _ , err := utils .MakeJSONRequest (ctx , http .MethodGet , fmt .Sprintf ("%s/%s" , i .elasticURL , "_cluster/health?level=indices" ), nil , & healthResponse )
237
237
if err != nil {
238
238
return err
239
239
}
@@ -242,7 +242,7 @@ func (i *baseIndexer) cleanupIndexes() error {
242
242
for key := range healthResponse .Indices {
243
243
if strings .HasPrefix (key , i .name ) && strings .Compare (key , currents [0 ]) < 0 {
244
244
slog .Info ("removing old index" , "index" , key )
245
- _ , err = utils .MakeJSONRequest (http .MethodDelete , fmt .Sprintf ("%s/%s" , i .elasticURL , key ), nil , nil )
245
+ _ , err = utils .MakeJSONRequest (ctx , http .MethodDelete , fmt .Sprintf ("%s/%s" , i .elasticURL , key ), nil , nil )
246
246
if err != nil {
247
247
return err
248
248
}
@@ -268,11 +268,11 @@ type indexResponse struct {
268
268
}
269
269
270
270
// indexes the batch of contacts
271
- func (i * baseIndexer ) indexBatch (index string , batch []byte ) (int , int , int , error ) {
271
+ func (i * baseIndexer ) indexBatch (ctx context. Context , index string , batch []byte ) (int , int , int , error ) {
272
272
response := indexResponse {}
273
273
indexURL := fmt .Sprintf ("%s/%s/_bulk" , i .elasticURL , index )
274
274
275
- _ , err := utils .MakeJSONRequest (http .MethodPut , indexURL , batch , & response )
275
+ _ , err := utils .MakeJSONRequest (ctx , http .MethodPut , indexURL , batch , & response )
276
276
if err != nil {
277
277
return 0 , 0 , 0 , err
278
278
}
@@ -324,12 +324,13 @@ type queryResponse struct {
324
324
}
325
325
326
326
// GetESLastModified queries a concrete index and finds the last modified document, returning its modified time
327
- func (i * baseIndexer ) GetESLastModified (index string ) (time.Time , error ) {
327
+ func (i * baseIndexer ) GetESLastModified (ctx context. Context , index string ) (time.Time , error ) {
328
328
lastModified := time.Time {}
329
329
330
330
// get the newest document on our index
331
331
queryResponse := & queryResponse {}
332
332
_ , err := utils .MakeJSONRequest (
333
+ ctx ,
333
334
http .MethodPost ,
334
335
fmt .Sprintf ("%s/%s/_search" , i .elasticURL , index ),
335
336
[]byte (`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1, "track_total_hits": false}` ),
0 commit comments