diff --git a/radosgw_agent/worker.py b/radosgw_agent/worker.py index 0087da6..98c366e 100644 --- a/radosgw_agent/worker.py +++ b/radosgw_agent/worker.py @@ -110,6 +110,7 @@ def _meta_entry_from_json(entry): 'marker', 'timestamp', 'op', + 'state', 'versioned', 'ver', 'name', @@ -141,6 +142,7 @@ def _bi_entry_from_json(entry): entry['op_id'], entry['timestamp'], entry.get('op', ''), + entry['state'], entry.get('versioned', False), entry_ver, entry['object'], @@ -149,6 +151,19 @@ def _bi_entry_from_json(entry): ) +def filter_pending_entry(entry): + """ + When put a object to radosgw, there are two bilog to generate. + One is "pending" state, the other is "complete" state. + This should be ignored when the entry is "pending" state, + otherwise the same object will be copied twice. + """ + if entry.op == 'write' and entry.state == 'pending': + return + + return entry + + def filter_versioned_objects(entry): """ On incremental sync operations, the log may indicate that 'olh' entries, @@ -356,6 +371,9 @@ def inc_sync_bucket_instance(self, instance, marker, timestamp, retries): # regardless if entries are versioned, make sure we filter them entries = [i for i in ifilter(filter_versioned_objects, entries)] + + # if entries are "pending" state, we ignore them. + entries = [i for i in ifilter(filter_pending_entry, entries)] objects = set([entry for entry in entries]) bucket = self.get_bucket(instance)