Skip to content

Commit 16cf39b

Browse files
MericFeyzwagmarcel
authored andcommitted
Implement merge behavior to the ngsild bridge and update Scorpio Version
Update operation of ngsild bridge now exclusively uses batch processed merge patch operation. Tests are adapted to reflect behavioral change. New tests are added to verify new behavior. Scorpio version upgraded to v5.0.5. Signed-off-by: Meric Feyzullahoglu <[email protected]>
1 parent 128617c commit 16cf39b

7 files changed

+307
-110
lines changed

KafkaBridge/lib/ngsild.js

+20
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,26 @@ function fiwareApi (conf) {
499499
return rest.postBody({ options, body: data });
500500
};
501501

502+
/**
503+
* Run batch merge operation on the entities
504+
* @param {array[Object]} entities - Array of JSON patches to merge
505+
* @param {array[Object]} headers - additional headers
506+
*/
507+
this.batchMerge = function (entities, { headers }) {
508+
headers = headers || {};
509+
headers['Content-Type'] = 'application/ld+json';
510+
511+
const options = {
512+
hostname: config.ngsildServer.hostname,
513+
protocol: config.ngsildServer.protocol,
514+
port: config.ngsildServer.port,
515+
path: '/ngsi-ld/v1/entityOperations/merge',
516+
headers: headers,
517+
method: 'POST'
518+
};
519+
return rest.postBody({ options, body: entities });
520+
};
521+
502522
/**
503523
* Helpers
504524
*/

KafkaBridge/lib/ngsildUpdates.js

+9-13
Original file line numberDiff line numberDiff line change
@@ -128,20 +128,16 @@ module.exports = function NgsildUpdates (conf) {
128128
try {
129129
// update the entity - do not create it
130130
if (op === 'update') {
131-
// NOTE: The batch update API of Scorpio does not yet support noOverwrite options. For the time being
132-
// the batch processing will be done sequentially - until this is fixed in Scorpio
133-
for (const entity of entities) { // olet i = 0; i < entities.length; i ++) {
134-
// basic health check of entity
135-
if (entity.id === undefined || entity.id == null) {
136-
logger.error('Unhealthy entity - ignoring it:' + JSON.stringify(entity));
137-
} else {
138-
logger.debug('Updating: ' + JSON.stringify(entities));
139-
result = await ngsild.updateProperties({ id: entity.id, body: entity, isOverwrite: overwriteOrReplace }, { headers });
140-
if (result.statusCode !== 204 && result.statusCode !== 207) {
141-
logger.error('Entity cannot update entity:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it
142-
}
131+
// Only batch merge is run
132+
if (entities === undefined || entities == null) {
133+
logger.error('Unhealthy entities - ignoring it:' + JSON.stringify(entities));
134+
} else {
135+
logger.debug('Updating: ' + JSON.stringify(entities));
136+
result = await ngsild.batchMerge(entities, { headers });
137+
if (result.statusCode !== 204 && result.statusCode !== 207) {
138+
logger.error('Entity cannot run merge:' + JSON.stringify(result.body) + ' and status code ' + result.statusCode); // throw no error, log it and ignore it, repeating would probably not solve it
143139
}
144-
};
140+
}
145141
} else if (op === 'upsert') {
146142
// in this case, entity will be created if not existing
147143
logger.debug('Upserting: ' + JSON.stringify(entities));

KafkaBridge/test/testLibNgsild.js

+41
Original file line numberDiff line numberDiff line change
@@ -971,3 +971,44 @@ describe('Test updateEntities', function () {
971971
revert();
972972
});
973973
});
974+
describe('Test batchMerge', function () {
975+
it('Should use correct options and headers', async function () {
976+
const Logger = function () {
977+
return logger;
978+
};
979+
const Rest = function () {
980+
return rest;
981+
};
982+
const headers = { Authorization: 'Bearer token' };
983+
const expectedOptions = {
984+
hostname: 'hostname',
985+
protocol: 'http:',
986+
port: 1234,
987+
method: 'POST',
988+
path: '/ngsi-ld/v1/entityOperations/merge',
989+
headers: {
990+
'Content-Type': 'application/ld+json',
991+
Authorization: 'Bearer token'
992+
}
993+
};
994+
const rest = {
995+
postBody: function (obj) {
996+
assert.deepEqual(obj.options, expectedOptions);
997+
assert.deepEqual(obj.body, entities);
998+
return Promise.resolve('merged');
999+
}
1000+
};
1001+
1002+
const entities = [
1003+
{ id: 'id1', type: 'type1', attr1: 'value1' },
1004+
{ id: 'id2', type: 'type2', attr2: 'value2' }
1005+
];
1006+
1007+
const revert = ToTest.__set__('Logger', Logger);
1008+
ToTest.__set__('Rest', Rest);
1009+
const ngsild = new ToTest(config);
1010+
const result = await ngsild.batchMerge(entities, { headers });
1011+
result.should.equal('merged');
1012+
revert();
1013+
});
1014+
});

KafkaBridge/test/testLibNgsildUpdates.js

+46-40
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ const logger = {
3030
const addSyncOnAttribute = function () {};
3131

3232
describe('Test libNgsildUpdates', function () {
33-
it('Should post body with correct path and token for nonOverwrite update', async function () {
34-
let updatePropertiesCalled = false;
33+
it('Should post entities with correct path and token for nonOverwrite update using batchMerge', async function () {
34+
let batchMergeCalled = false;
3535
const config = {
3636
ngsildUpdates: {
3737
clientSecretVariable: 'CLIENT_SECRET',
@@ -66,19 +66,20 @@ describe('Test libNgsildUpdates', function () {
6666
};
6767
const Ngsild = function () {
6868
return {
69-
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
70-
updatePropertiesCalled = true;
71-
id.should.equal('id');
72-
assert.deepEqual(body, { id: 'id', type: 'type' });
73-
isOverwrite.should.equal(false);
69+
batchMerge: function (entities, { headers }) {
70+
batchMergeCalled = true;
71+
assert.deepEqual(entities, body.entities);
7472
assert.deepEqual(headers, expHeaders);
7573
return new Promise(function (resolve) {
7674
resolve({
7775
statusCode: 204
7876
});
7977
});
8078
},
79+
// Stub updateProperties if needed
80+
updateProperties: function () {},
8181
replaceEntities: function () {
82+
8283
}
8384
};
8485
};
@@ -108,11 +109,11 @@ describe('Test libNgsildUpdates', function () {
108109
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
109110
const ngsildUpdates = new ToTest(config);
110111
await ngsildUpdates.ngsildUpdates(body);
111-
updatePropertiesCalled.should.equal(true);
112+
batchMergeCalled.should.equal(true);
112113
revert();
113114
});
114-
it('Should post body and filter out datasetId === "@none"', async function () {
115-
let updatePropertiesCalled = false;
115+
it('Should post entities and filter out datasetId === "@none"', async function () {
116+
let batchMergeCalled = false;
116117
const config = {
117118
ngsildUpdates: {
118119
clientSecretVariable: 'CLIENT_SECRET',
@@ -151,19 +152,24 @@ describe('Test libNgsildUpdates', function () {
151152
};
152153
const Ngsild = function () {
153154
return {
154-
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
155-
updatePropertiesCalled = true;
156-
id.should.equal('id');
157-
assert.deepEqual(body, { id: 'id', type: 'type', attribute: { value: 'value' } });
158-
isOverwrite.should.equal(false);
155+
batchMerge: function (entities, { headers }) {
156+
batchMergeCalled = true;
157+
entities.forEach(entity => {
158+
// Check top-level properties
159+
assert.equal(entity.id, 'id');
160+
assert.equal(entity.type, 'type');
161+
162+
// Check attribute properties
163+
assert.isUndefined(entity.attribute.datasetId, 'datasetId should be filtered out');
164+
assert.property(entity.attribute, 'value');
165+
assert.equal(entity.attribute.value, 'value');
166+
});
159167
assert.deepEqual(headers, expHeaders);
160168
return new Promise(function (resolve) {
161169
resolve({
162170
statusCode: 204
163171
});
164172
});
165-
},
166-
replaceEntities: function () {
167173
}
168174
};
169175
};
@@ -192,11 +198,11 @@ describe('Test libNgsildUpdates', function () {
192198
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
193199
const ngsildUpdates = new ToTest(config);
194200
await ngsildUpdates.ngsildUpdates(body);
195-
updatePropertiesCalled.should.equal(true);
201+
batchMergeCalled.should.equal(true);
196202
revert();
197203
});
198204
it('Should post body and filter out datasetId === "@none" from attribute array', async function () {
199-
let updatePropertiesCalled = false;
205+
let batchMergeCalled = false;
200206
const config = {
201207
ngsildUpdates: {
202208
clientSecretVariable: 'CLIENT_SECRET',
@@ -241,11 +247,12 @@ describe('Test libNgsildUpdates', function () {
241247
};
242248
const Ngsild = function () {
243249
return {
244-
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
245-
updatePropertiesCalled = true;
246-
id.should.equal('id');
247-
assert.deepEqual(body, { id: 'id', type: 'type', attribute: [{ value: 'value' }, { value: 'value2', datasetId: 'http://example.com#source10' }] });
248-
isOverwrite.should.equal(false);
250+
batchMerge: function (entities, { headers }) {
251+
batchMergeCalled = true;
252+
entities.forEach(entity => {
253+
assert.deepEqual(entity.id, 'id');
254+
assert.deepEqual(entity, { id: 'id', type: 'type', attribute: [{ value: 'value' }, { value: 'value2', datasetId: 'http://example.com#source10' }] });
255+
});
249256
assert.deepEqual(headers, expHeaders);
250257
return new Promise(function (resolve) {
251258
resolve({
@@ -282,11 +289,11 @@ describe('Test libNgsildUpdates', function () {
282289
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
283290
const ngsildUpdates = new ToTest(config);
284291
await ngsildUpdates.ngsildUpdates(body);
285-
updatePropertiesCalled.should.equal(true);
292+
batchMergeCalled.should.equal(true);
286293
revert();
287294
});
288-
it('Should post body and not filter out datasetId !== "@none"', async function () {
289-
let updatePropertiesCalled = false;
295+
it('Should post entities and not filter out datasetId !== "@none"', async function () {
296+
let batchMergeCalled = false;
290297
const config = {
291298
ngsildUpdates: {
292299
clientSecretVariable: 'CLIENT_SECRET',
@@ -325,11 +332,9 @@ describe('Test libNgsildUpdates', function () {
325332
};
326333
const Ngsild = function () {
327334
return {
328-
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
329-
updatePropertiesCalled = true;
330-
id.should.equal('id');
331-
assert.deepEqual(body, { id: 'id', type: 'type', attribute: { datasetId: 'https://example.com/source1', value: 'value' } });
332-
isOverwrite.should.equal(false);
335+
batchMerge: function (entities, { headers }) {
336+
batchMergeCalled = true;
337+
assert.deepEqual(entities, body.entities);
333338
assert.deepEqual(headers, expHeaders);
334339
return new Promise(function (resolve) {
335340
resolve({
@@ -366,7 +371,7 @@ describe('Test libNgsildUpdates', function () {
366371
ToTest.__set__('addSyncOnAttribute', addSyncOnAttribute);
367372
const ngsildUpdates = new ToTest(config);
368373
await ngsildUpdates.ngsildUpdates(body);
369-
updatePropertiesCalled.should.equal(true);
374+
batchMergeCalled.should.equal(true);
370375
revert();
371376
});
372377
it('Should post body with correct path and token for nonOverwrite upsert', async function () {
@@ -446,7 +451,7 @@ describe('Test libNgsildUpdates', function () {
446451
revert();
447452
});
448453
it('Should post body with string entity', async function () {
449-
let updatePropertiesCalled = false;
454+
let batchMergeCalled = false;
450455
const config = {
451456
ngsildUpdates: {
452457
clientSecretVariable: 'CLIENT_SECRET',
@@ -481,11 +486,12 @@ describe('Test libNgsildUpdates', function () {
481486
};
482487
const Ngsild = function () {
483488
return {
484-
updateProperties: function ({ id, body, isOverwrite }, { headers }) {
485-
updatePropertiesCalled = true;
486-
id.should.equal('id');
487-
assert.deepEqual(body, { id: 'id', type: 'type' });
488-
isOverwrite.should.equal(false);
489+
batchMerge: function (entities, { headers }) {
490+
batchMergeCalled = true;
491+
entities.forEach(entity => {
492+
assert.deepEqual(entity.id, 'id');
493+
assert.deepEqual(entity, { id: 'id', type: 'type' });
494+
});
489495
assert.deepEqual(headers, expHeaders);
490496
return new Promise(function (resolve) {
491497
resolve({
@@ -524,7 +530,7 @@ describe('Test libNgsildUpdates', function () {
524530
const ngsildUpdates = new ToTest(config);
525531
body.entities = JSON.stringify(body.entities);
526532
await ngsildUpdates.ngsildUpdates(body);
527-
updatePropertiesCalled.should.equal(true);
533+
batchMergeCalled.should.equal(true);
528534
revert();
529535
});
530536
});

0 commit comments

Comments
 (0)