-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathReplicationQueuePopulator.js
More file actions
138 lines (124 loc) · 5.43 KB
/
ReplicationQueuePopulator.js
File metadata and controls
138 lines (124 loc) · 5.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
const { isMasterKey } = require('arsenal').versioning;
const { usersBucket, mpuBucketPrefix } = require('arsenal').constants;
const QueuePopulatorExtension =
require('../../lib/queuePopulator/QueuePopulatorExtension');
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const locationsConfig = require('../../conf/locationConfig.json') || {};
const { traceHeadersFromEntry } = require('../../lib/tracing/kafkaTraceContext');
class ReplicationQueuePopulator extends QueuePopulatorExtension {
constructor(params) {
super(params);
this.repConfig = params.config;
this.metricsHandler = params.metricsHandler;
}
filter(entry) {
if (entry.key === undefined || entry.value === undefined) {
// bucket updates have no key in raft log
return undefined;
}
// users..bucket has a special role for "echo mode"
if (entry.bucket === usersBucket) {
return this._filterBucketOp(entry);
}
// internal buckets (other than users..bucket) are ignored
if (entry.bucket.includes('..')) {
return undefined;
}
return this._filterKeyOp(entry);
}
_filterBucketOp(entry) {
if (entry.type !== 'put' ||
entry.key.startsWith(mpuBucketPrefix)) {
return;
}
// remove logReader to prevent circular stringify
const publishedEntry = Object.assign({}, entry);
delete publishedEntry.logReader;
this.log.trace('publishing bucket replication entry',
{ bucket: entry.bucket });
this.publish(this.repConfig.topic,
entry.bucket, JSON.stringify(publishedEntry));
}
_filterKeyOp(entry) {
if (entry.type !== 'put') {
return;
}
const value = JSON.parse(entry.value);
const queueEntry = new ObjectQueueEntry(entry.bucket,
entry.key, value);
const sanityCheckRes = queueEntry.checkSanity();
if (sanityCheckRes) {
return;
}
// Allow a non-versioned object if being replicated from an NFS bucket.
// Or if the master key is of a non versioned object
if (!this._entryCanBeReplicated(queueEntry)) {
return;
}
if (queueEntry.getReplicationStatus() !== 'PENDING') {
return;
}
const dataStoreName = queueEntry.getDataStoreName();
const isObjectCold = dataStoreName && locationsConfig[dataStoreName]
&& locationsConfig[dataStoreName].isCold;
// We do not replicate cold objects.
if (isObjectCold) {
return;
}
// remove logReader to prevent circular stringify
const repSites = queueEntry.getReplicationInfo().backends;
const content = queueEntry.getReplicationContent();
const bytes = content.includes('DATA') ?
queueEntry.getContentLength() : 0;
// record replication metrics by site
repSites.filter(entry => entry.status === 'PENDING')
.forEach(backend => {
this._incrementMetrics(backend.site, bytes);
});
// TODO: replication specific metrics go here
this.metricsHandler.bytes(
entry.logReader.getMetricLabels(),
bytes
);
this.metricsHandler.objects(
entry.logReader.getMetricLabels()
);
const publishedEntry = Object.assign({}, entry);
delete publishedEntry.logReader;
const traceHeaders = traceHeadersFromEntry(value);
this.log.trace('publishing object replication entry',
{ entry: queueEntry.getLogInfo() });
this.publish(this.repConfig.topic,
`${queueEntry.getBucket()}/${queueEntry.getObjectKey()}`,
JSON.stringify(publishedEntry),
undefined,
traceHeaders);
}
/**
* Filter if the entry is considered a valid master key entry.
* There is a case where a single null entry looks like a master key and
* will not have a duplicate versioned key. They are created when you have a
* non-versioned bucket with objects, and then convert bucket to versioned.
* If no new versioned objects are added for given object(s), they look like
* standalone master keys. The `isNull` case is undefined for these entries.
* Non-versioned objects if being replicated from an NFS bucket are also allowed
* Null versions which are objects created after suspending versioning are allowed,
* these only have a master object that has an internal versionId and a 'isNull' flag.
* @param {ObjectQueueEntry} entry - raw queue entry
* @return {Boolean} true if we should filter entry
*/
_entryCanBeReplicated(entry) {
const isMaster = isMasterKey(entry.getObjectVersionedKey());
const isNFS = entry.getReplicationIsNFS();
// single null entries will have a version id as undefined or null.
// do not filter single null entries
const isNonVersionedMaster = entry.getVersionId() === undefined;
const isNullVersionedMaster = entry.getIsNull();
if (isMaster && !isNFS && !isNonVersionedMaster && !isNullVersionedMaster) {
this.log.trace('skipping master key entry');
return false;
}
return true;
}
}
module.exports = ReplicationQueuePopulator;