-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathReplicationAPI.js
More file actions
164 lines (156 loc) · 6.58 KB
/
ReplicationAPI.js
File metadata and controls
164 lines (156 loc) · 6.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
const config = require('../../lib/Config');
const locations = require('../../conf/locationConfig.json') || {};
const ActionQueueEntry = require('../../lib/models/ActionQueueEntry');
const ReplicationMetrics = require('./ReplicationMetrics');
const { linkHeadersFromCurrentContext } = require('../../lib/tracing/kafkaTraceContext');
let { dataMoverTopic } = config.extensions.replication;
const { coldStorageArchiveTopicPrefix } = config.extensions.lifecycle;
const { LifecycleMetrics } = require('../lifecycle/LifecycleMetrics');
class ReplicationAPI {
    /**
     * Create an action to copy an object's data to a new location.
     *
     * Pass the returned object to
     * {@link ReplicationAPI.sendDataMoverAction()} to queue the action
     * to the data mover service.
     *
     * @param {object} params - params object
     * @param {string} params.bucketName - bucket name
     * @param {string} params.objectKey - object key
     * @param {string} params.accountId - object's bucket's account id
     * @param {string} [params.owner] - object owner, propagated on the
     * action target (presumably the canonical owner id — confirm with callers)
     * @param {string} [params.versionId] - encoded version ID
     * @param {string} [params.eTag] - ETag of object
     * @param {number} [params.attempt] - retry attempt counter, forwarded
     * to cold-storage archive messages as `try`
     * @param {string} [params.lastModified] - object last modification date
     * @param {string} params.toLocation - name of target location
     * @param {string} params.originLabel - label to mark origin of
     * request (for metrics accounting)
     * @param {string} params.fromLocation - source location name (for
     * metrics accounting)
     * @param {string} params.contentLength - content length (for
     * metrics accounting)
     * @param {string} [params.resultsTopic] - name of topic to get
     * result message (no result will be published if not provided)
     * @param {string} params.transitionTime - transition time
     * @returns {ActionQueueEntry} new action entry
     */
    static createCopyLocationAction(params) {
        const action = ActionQueueEntry.create('copyLocation');
        action
            .setAttribute('target', {
                accountId: params.accountId,
                owner: params.owner,
                bucket: params.bucketName,
                key: params.objectKey,
                version: params.versionId,
                eTag: params.eTag,
                attempt: params.attempt,
                lastModified: params.lastModified,
            })
            .setAttribute('toLocation', params.toLocation)
            .setAttribute('metrics', {
                origin: params.originLabel,
                fromLocation: params.fromLocation,
                contentLength: params.contentLength,
                transitionTime: params.transitionTime,
            });
        if (params.resultsTopic) {
            action.setResultsTopic(params.resultsTopic);
        }
        return action;
    }

    /**
     * Send an action to the data mover service.
     *
     * Publishes the action to the data-mover topic, or — when the
     * destination location is configured as cold storage — to the
     * per-location cold-storage archive topic with a reduced,
     * archive-specific message payload.
     *
     * @param {BackbeatProducer} producer - backbeat producer instance
     * @param {ActionQueueEntry} action - The action entry to send to
     * the data mover service
     * @param {Logger.newRequestLogger} log - logger object
     * @param {Function} cb - callback: cb(err)
     * @returns {undefined}
     */
    static sendDataMoverAction(producer, action, log, cb) {
        const { accountId, bucket, key, version, eTag, attempt } = action.getAttribute('target');
        const { origin, fromLocation, contentLength, transitionTime } = action.getAttribute('metrics');
        const kafkaEntry = {
            key: `${bucket}/${key}`,
            message: action.toKafkaMessage(),
        };
        // Fan-out transitions can produce many entries per bucket-processor
        // span. Use link-headers so the data-mover consumer starts a new
        // root trace rather than a child of this one. If no active OTEL
        // span (OTEL disabled), linkHeadersFromCurrentContext returns
        // undefined and we ship the entry without headers.
        const linkHeaders = linkHeadersFromCurrentContext();
        if (linkHeaders) kafkaEntry.headers = linkHeaders;
        let topic = dataMoverTopic;
        const toLocation = action.getAttribute('toLocation');
        const locationConfig = locations[toLocation];
        if (!locationConfig) {
            // Fail fast: without a location config we cannot decide which
            // topic (or payload shape) to use.
            const errorMsg = 'could not get destination location configuration';
            log.error(errorMsg, { method: 'ReplicationAPI.sendDataMoverAction' });
            return cb(new Error(errorMsg));
        }
        if (locationConfig.isCold) {
            // Cold-storage archive consumers expect a flat JSON payload
            // rather than the full serialized action entry.
            topic = `${coldStorageArchiveTopicPrefix}${toLocation}`;
            const { reqId } = action.getContext();
            const message = {
                accountId,
                bucketName: bucket,
                objectKey: key,
                objectVersion: version,
                requestId: reqId,
                // TODO: BB-217 do not use contentLength from metrics
                size: contentLength,
                eTag,
                try: attempt,
                transitionTime,
            };
            kafkaEntry.message = JSON.stringify(message);
        }
        return producer.sendToTopic(topic, [kafkaEntry], (err, reports) => {
            LifecycleMetrics.onKafkaPublish(log,
                locationConfig.isCold ? 'ColdStorageArchiveTopic' : 'DataMoverTopic',
                'bucket', err, 1);
            if (err) {
                log.error('could not send data mover action', {
                    method: 'ReplicationAPI.sendDataMoverAction',
                    error: err,
                    ...action.getLogInfo(),
                });
                return cb(err);
            }
            log.debug('sent action to the data mover', {
                method: 'ReplicationAPI.sendDataMoverAction',
                ...action.getLogInfo(),
            });
            ReplicationMetrics.onReplicationQueued(
                origin, fromLocation, toLocation,
                contentLength, reports[0].partition);
            return cb();
        });
    }

    /**
     * Returns the appropriate topic based on
     * the location temperature
     * @param {string} location - location name
     * @returns {string} topic name, or an empty string when the
     * location has no configuration
     */
    static getDataMoverTopicPerLocation(location) {
        let topic = dataMoverTopic;
        const locationConfig = locations[location];
        if (!locationConfig) {
            return '';
        }
        if (locationConfig.isCold) {
            topic = `${coldStorageArchiveTopicPrefix}${location}`;
        }
        return topic;
    }

    /**
     * Get the current data mover topic name.
     * @returns {string} topic name
     */
    static getDataMoverTopic() {
        return dataMoverTopic;
    }

    /**
     * Override the data mover topic name (module-wide: affects all
     * subsequent sends and getters).
     * @param {string} newTopicName - new topic name
     * @returns {undefined}
     */
    static setDataMoverTopic(newTopicName) {
        dataMoverTopic = newTopicName;
    }
}
// Expose the static-only API class as the module's sole export.
module.exports = ReplicationAPI;