Skip to content

Commit 3ad6b3b

Browse files
authored
Merge pull request #6 from fkrueger/master
In response to deric/es-deduplicator, issue #5, kinda
2 parents f05ba82 + 0f8c9ec commit 3ad6b3b

1 file changed

Lines changed: 22 additions & 11 deletions

File tree

dedupe.py

Lines changed: 22 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -90,7 +90,7 @@ def run(args):
9090
indices[idxname] = storesize
9191
else:
9292
if args.verbose:
93-
logme("# WARNING - Could not find settings for index '{0}'".format(idxname))
93+
logme("# WARNING - Couldn't find settings for index '{0}'".format(idxname))
9494
logme("The following indices matched your name pattern {0} :\n{1}\n\n".format(idxlist_uri(args), pp.pformat(indices, 4, -1)))
9595
for idxname in indices:
9696
if (workisdone == False):
@@ -115,17 +115,17 @@ def run(args):
115115
else:
116116
logme("ERROR - Unexpected response {}".format(resp))
117117
workisdone = True
118-
logme("ES query took {}, retrieved {} unique docs".format(timedelta(seconds=(qe - qs)), docs))
118+
logme("ES query took {}, retrieved {} unique docs that have dupes".format(timedelta(seconds=(qe - qs)), docs))
119119

120120
if (docs >= 0):
121121
bs = time.time()
122122
# Set the index's blocks-write flag to "false" (i.e. make the index writable) if it is not already, and restore the original value after we are done.
123123
if (args.noop == False):
124124
skipremoval = False
125125
if (idxname not in idx2settings):
126-
logme("WARNING - could not find settings for index '{0}'".format(idxname))
126+
logme("WARNING - Couldn't find settings for index '{0}'".format(idxname))
127127
else:
128-
if (idx2settings[idxname]['write'] != "false"):
128+
if (('write' in idx2settings[idxname]) and (idx2settings[idxname]['write'] != "false")):
129129
if args.verbose:
130130
logme("# Index '{0}' is not writable in settings, updating blocks-write to false".format(idxname))
131131
if (set_index_writable(args, idxname, "false") == False):
@@ -159,7 +159,7 @@ def run(args):
159159
total += removed
160160
logme(" 2ndChck removed {}, in total {:,}".format(removed, total))
161161
os.remove(args.log_agg)
162-
if (idx2settings[idxname]['_esdedup_changed_writeflag'] == True):
162+
if (('_esdedup_changed_writeflag' in idx2settings[idxname]) and (idx2settings[idxname]['_esdedup_changed_writeflag'] == True)):
163163
if (set_index_writable(args, idxname, idx2settings[idxname]['write']) == False):
164164
logme("WARNING - Index '{0}' writable setting could not be reset to {1}.".format(idxname, idx2settings[idxname]['write']))
165165
else:
@@ -231,6 +231,7 @@ def fetch_indexlist(args):
231231

232232

233233
def fetch(idxname, args):
234+
global es_headers
234235
uri = search_uri(idxname, args)
235236
payload = {"size": 0,
236237
"aggs": {
@@ -250,7 +251,7 @@ def fetch(idxname, args):
250251
if args.verbose:
251252
logme("# idxname {0}: POST {1}".format(idxname, uri))
252253
logme("#\tdata: {0}".format(json))
253-
resp = requests.post(uri, data=json)
254+
resp = requests.post(uri, data=json, headers=es_headers)
254255
if args.debug:
255256
logme("## idxname {0}, resp: {1}".format(idxname, resp.text))
256257
if (resp.status_code == 200):
@@ -344,6 +345,7 @@ def log_done(buf, doc, idxname, type, id):
344345

345346
# returns number of deleted items
346347
def bulk_remove(buf, args):
348+
global es_headers
347349
try:
348350
uri = bulk_uri(args)
349351
if args.verbose:
@@ -352,7 +354,7 @@ def bulk_remove(buf, args):
352354
logme("NOT using delete query: {}".format(buf.getvalue()))
353355
return 0
354356

355-
resp = requests.post(uri, data=buf.getvalue())
357+
resp = requests.post(uri, data=buf.getvalue(), headers=es_headers)
356358
if args.debug:
357359
logme("## resp: {0}".format(resp.text))
358360
if (resp.status_code == 200):
@@ -394,11 +396,19 @@ def fetch_allsettings(args):
394396
if ('errors' in r):
395397
logme(r)
396398
for idxname in r:
397-
if (('settings' in r[idxname]) and ('index' in r[idxname]['settings']) and ('blocks' in r[idxname]['settings']['index'])):
398-
tmpblocks = r[idxname]['settings']['index']['blocks']
399-
tmpblocks['_esdedup_changed_writeflag'] = False # sic, we are using a python Boolean here, instead of json text "bool"
399+
tmpblocks = {}
400+
if (('settings' in r[idxname]) and ('index' in r[idxname]['settings'])):
401+
if ('blocks' in r[idxname]['settings']['index']):
402+
tmpblocks = r[idxname]['settings']['index']['blocks']
403+
tmpblocks['_esdedup_changed_writeflag'] = False # sic, we are using a python Boolean here, instead of json text "bool"
404+
elif ('uuid' in r[idxname]['settings']['index']):
405+
tmpblocks = {}
406+
else:
407+
tmpblocks = None
408+
if (tmpblocks != None):
400409
if (idxname not in tmpidx2settings):
401410
tmpidx2settings[idxname] = copy.copy(tmpblocks)
411+
402412
except requests.exceptions.ConnectionError as e:
403413
logme("ERROR - connection failed, check --host argument and port. Is ES running on {0}?".format(es_uri(args)))
404414
logme(e)
@@ -409,6 +419,7 @@ def fetch_allsettings(args):
409419

410420

411421
def set_index_writable(args, idxname, flag):
422+
global es_headers
412423
rc = False
413424
try:
414425
if (flag == "true"): flag = "true"
@@ -419,7 +430,7 @@ def set_index_writable(args, idxname, flag):
419430
if args.verbose:
420431
logme("# idxname {0}: PUT {1}".format(idxname, uri))
421432
logme("#\tdata: {0}".format(json))
422-
resp = requests.put(uri, data=json)
433+
resp = requests.put(uri, data=json, headers=es_headers)
423434
r = {}
424435
if args.debug:
425436
logme("## idxname {0}, resp: {1}".format(idxname, resp.text))

0 commit comments

Comments
 (0)