Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 70 additions & 66 deletions test/realtime/liveobjects.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ define(['ably', 'shared_helper', 'chai', 'liveobjects', 'liveobjects_helper'], f
return helper.AblyRealtime({ ...options, plugins: { LiveObjects: LiveObjectsPlugin } });
}

function RestWithLiveObjects(helper, options) {
return helper.AblyRest({ ...options, plugins: { LiveObjects: LiveObjectsPlugin } });
}

function channelOptionsWithObjectModes(options) {
return {
...options,
Expand Down Expand Up @@ -894,82 +898,79 @@ define(['ably', 'shared_helper', 'chai', 'liveobjects', 'liveobjects_helper'], f
},

{
allTransportsAndProtocols: true,
description: 'partial OBJECT_SYNC merges map entries across multiple messages for the same objectId',
action: async (ctx) => {
const { channel, objectsHelper, entryPathObject } = ctx;
const { helper, client, clientOptions, channelName, entryInstance, restChannel } = ctx;

const mapId = objectsHelper.fakeMapObjectId();
helper.recordPrivateApi('read.realtime.options.maxMessageSize');
const maxMessageSize = client.options.maxMessageSize;

// assign map object to root
await objectsHelper.processObjectStateMessageOnChannel({
channel,
syncSerial: 'serial:cursor1',
state: [
objectsHelper.mapObject({
objectId: 'root',
siteTimeserials: { aaa: lexicoTimeserial('aaa', 0, 0) },
initialEntries: {
map: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { objectId: mapId } },
},
}),
],
});
// create 3 keys on root, each with a string value nearly maxMessageSize in length.
// this ensures the server must split the sync across multiple OBJECT_SYNC messages
// since each key alone nearly fills a single message.
const keyNames = ['largeKey1', 'largeKey2', 'largeKey3'];
// map set op size = key length + data string length, so this reaches exactly maxMessageSize
const largeValue = 'x'.repeat(maxMessageSize - keyNames[0].length);

// send partial sync messages for the same map object, each with different materialised entries.
// initialEntries are identical across all partial messages for the same object - a server guarantee.
const partialMessages = [
{
syncSerial: 'serial:cursor2',
materialisedEntries: {
key1: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { number: 1 } },
key2: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { string: 'two' } },
},
},
{
syncSerial: 'serial:cursor3',
materialisedEntries: {
key3: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { number: 3 } },
key4: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { boolean: true } },
},
},
{
syncSerial: 'serial:', // end sync sequence
materialisedEntries: {
key5: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { string: 'five' } },
},
},
];

for (const partial of partialMessages) {
await objectsHelper.processObjectStateMessageOnChannel({
channel,
syncSerial: partial.syncSerial,
state: [
objectsHelper.mapObject({
objectId: mapId,
siteTimeserials: { aaa: lexicoTimeserial('aaa', 0, 0) },
initialEntries: {
initialKey: { timeserial: lexicoTimeserial('aaa', 0, 0), data: { string: 'initial' } },
},
materialisedEntries: partial.materialisedEntries,
}),
],
// set keys via the REST client to guarantee they are persisted on the server before the second client syncs.
// wait for each key update on the realtime client to confirm the server has processed the operation.
const keysUpdatedPromise = Promise.all(keyNames.map((key) => waitForMapKeyUpdate(entryInstance, key)));
for (const key of keyNames) {
await restChannel.object.publish({
objectId: 'root',
mapSet: { key, value: { string: largeValue } },
});
}
await keysUpdatedPromise;

const map = entryPathObject.get('map');
// use a separate client to sync and verify the values, so we can be confident
// the values came from the partial sync and aren't leftover pre-existing state
const client2 = RealtimeWithLiveObjects(helper, clientOptions);

expect(map.get('initialKey').value()).to.equal(
'initial',
'Check keys from the create operation are present',
);
await helper.monitorConnectionThenCloseAndFinishAsync(async () => {
await client2.connection.whenState('connected');
helper.recordPrivateApi('read.realtime.options.maxMessageSize');
expect(client2.options.maxMessageSize).to.equal(
maxMessageSize,
'Check second client has the same maxMessageSize',
);

// count OBJECT_SYNC messages received during sync
helper.recordPrivateApi('call.connectionManager.activeProtocol.getTransport');
const transport = client2.connection.connectionManager.activeProtocol.getTransport();
const onProtocolMessageOriginal = transport.onProtocolMessage;
let objectSyncMessageCount = 0;

// check that materialised entries from all partial messages were merged
expect(map.get('key1').value()).to.equal(1, 'Check key1 from first partial sync');
expect(map.get('key2').value()).to.equal('two', 'Check key2 from first partial sync');
expect(map.get('key3').value()).to.equal(3, 'Check key3 from second partial sync');
expect(map.get('key4').value()).to.equal(true, 'Check key4 from second partial sync');
expect(map.get('key5').value()).to.equal('five', 'Check key5 from third partial sync');
helper.recordPrivateApi('replace.transport.onProtocolMessage');
transport.onProtocolMessage = function (message) {
if (message.action === 20) {
objectSyncMessageCount++;
}
helper.recordPrivateApi('call.transport.onProtocolMessage');
onProtocolMessageOriginal.call(transport, message);
};

const channel2 = client2.channels.get(channelName, channelOptionsWithObjectModes());
const syncedPromise = new Promise((resolve) => channel2.object.on('synced', resolve));
await channel2.attach();
await syncedPromise;

// verify the server sent multiple OBJECT_SYNC messages due to the partial sync, one for each large key
expect(objectSyncMessageCount).to.be.at.least(
3,
'Check that multiple OBJECT_SYNC messages were received (partial sync)',
);

// verify all keys were synced correctly on the new client
const entryPathObject2 = await channel2.object.get();
for (const key of keyNames) {
expect(entryPathObject2.get(key).value()).to.equal(
largeValue,
`Check ${key} was synced correctly after partial OBJECT_SYNC`,
);
}
}, client2);
},
},

Expand Down Expand Up @@ -7738,9 +7739,11 @@ define(['ably', 'shared_helper', 'chai', 'liveobjects', 'liveobjects_helper'], f
async function (helper, scenario, clientOptions, channelName) {
const objectsHelper = new LiveObjectsHelper(helper);
const client = RealtimeWithLiveObjects(helper, clientOptions);
const restClient = RestWithLiveObjects(helper, clientOptions);

await helper.monitorConnectionThenCloseAndFinishAsync(async () => {
const channel = client.channels.get(channelName, channelOptionsWithObjectModes());
const restChannel = restClient.channels.get(channelName);
const realtimeObject = channel.object;

await channel.attach();
Expand All @@ -7754,6 +7757,7 @@ define(['ably', 'shared_helper', 'chai', 'liveobjects', 'liveobjects_helper'], f
objectsHelper,
channelName,
channel,
restChannel,
client,
helper,
clientOptions,
Expand Down
Loading