@@ -30,6 +30,10 @@ define(['ably', 'shared_helper', 'chai', 'liveobjects', 'liveobjects_helper'], f
3030 return helper . AblyRealtime ( { ...options , plugins : { LiveObjects : LiveObjectsPlugin } } ) ;
3131 }
3232
33+ function RestWithLiveObjects ( helper , options ) {
34+ return helper . AblyRest ( { ...options , plugins : { LiveObjects : LiveObjectsPlugin } } ) ;
35+ }
36+
3337 function channelOptionsWithObjectModes ( options ) {
3438 return {
3539 ...options ,
@@ -894,82 +898,79 @@ define(['ably', 'shared_helper', 'chai', 'liveobjects', 'liveobjects_helper'], f
894898 } ,
895899
896900 {
901+ allTransportsAndProtocols : true ,
897902 description : 'partial OBJECT_SYNC merges map entries across multiple messages for the same objectId' ,
898903 action : async ( ctx ) => {
899- const { channel , objectsHelper , entryPathObject } = ctx ;
904+ const { helper , client , clientOptions , channelName , entryInstance , restChannel } = ctx ;
900905
901- const mapId = objectsHelper . fakeMapObjectId ( ) ;
906+ helper . recordPrivateApi ( 'read.realtime.options.maxMessageSize' ) ;
907+ const maxMessageSize = client . options . maxMessageSize ;
902908
903- // assign map object to root
904- await objectsHelper . processObjectStateMessageOnChannel ( {
905- channel,
906- syncSerial : 'serial:cursor1' ,
907- state : [
908- objectsHelper . mapObject ( {
909- objectId : 'root' ,
910- siteTimeserials : { aaa : lexicoTimeserial ( 'aaa' , 0 , 0 ) } ,
911- initialEntries : {
912- map : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { objectId : mapId } } ,
913- } ,
914- } ) ,
915- ] ,
916- } ) ;
909+ // create 3 keys on root, each with a string value nearly maxMessageSize in length.
910+ // this ensures the server must split the sync across multiple OBJECT_SYNC messages
911+ // since each key alone nearly fills a single message.
912+ const keyNames = [ 'largeKey1' , 'largeKey2' , 'largeKey3' ] ;
913+ // map set op size = key length + data string length, so this reaches exactly maxMessageSize
914+ const largeValue = 'x' . repeat ( maxMessageSize - keyNames [ 0 ] . length ) ;
917915
918- // send partial sync messages for the same map object, each with different materialised entries.
919- // initialEntries are identical across all partial messages for the same object - a server guarantee.
920- const partialMessages = [
921- {
922- syncSerial : 'serial:cursor2' ,
923- materialisedEntries : {
924- key1 : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { number : 1 } } ,
925- key2 : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { string : 'two' } } ,
926- } ,
927- } ,
928- {
929- syncSerial : 'serial:cursor3' ,
930- materialisedEntries : {
931- key3 : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { number : 3 } } ,
932- key4 : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { boolean : true } } ,
933- } ,
934- } ,
935- {
936- syncSerial : 'serial:' , // end sync sequence
937- materialisedEntries : {
938- key5 : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { string : 'five' } } ,
939- } ,
940- } ,
941- ] ;
942-
943- for ( const partial of partialMessages ) {
944- await objectsHelper . processObjectStateMessageOnChannel ( {
945- channel,
946- syncSerial : partial . syncSerial ,
947- state : [
948- objectsHelper . mapObject ( {
949- objectId : mapId ,
950- siteTimeserials : { aaa : lexicoTimeserial ( 'aaa' , 0 , 0 ) } ,
951- initialEntries : {
952- initialKey : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { string : 'initial' } } ,
953- } ,
954- materialisedEntries : partial . materialisedEntries ,
955- } ) ,
956- ] ,
916+ // set keys via the REST client to guarantee they are persisted on the server before the second client syncs.
917+ // wait for each key update on the realtime client to confirm the server has processed the operation.
918+ const keysUpdatedPromise = Promise . all ( keyNames . map ( ( key ) => waitForMapKeyUpdate ( entryInstance , key ) ) ) ;
919+ for ( const key of keyNames ) {
920+ await restChannel . object . publish ( {
921+ objectId : 'root' ,
922+ mapSet : { key, value : { string : largeValue } } ,
957923 } ) ;
958924 }
925+ await keysUpdatedPromise ;
959926
960- const map = entryPathObject . get ( 'map' ) ;
927+ // use a separate client to sync and verify the values, so we can be confident
928+ // the values came from the partial sync and aren't leftover pre-existing state
929+ const client2 = RealtimeWithLiveObjects ( helper , clientOptions ) ;
961930
962- expect ( map . get ( 'initialKey' ) . value ( ) ) . to . equal (
963- 'initial' ,
964- 'Check keys from the create operation are present' ,
965- ) ;
931+ await helper . monitorConnectionThenCloseAndFinishAsync ( async ( ) => {
932+ await client2 . connection . whenState ( 'connected' ) ;
933+ helper . recordPrivateApi ( 'read.realtime.options.maxMessageSize' ) ;
934+ expect ( client2 . options . maxMessageSize ) . to . equal (
935+ maxMessageSize ,
936+ 'Check second client has the same maxMessageSize' ,
937+ ) ;
938+
939+ // count OBJECT_SYNC messages received during sync
940+ helper . recordPrivateApi ( 'call.connectionManager.activeProtocol.getTransport' ) ;
941+ const transport = client2 . connection . connectionManager . activeProtocol . getTransport ( ) ;
942+ const onProtocolMessageOriginal = transport . onProtocolMessage ;
943+ let objectSyncMessageCount = 0 ;
966944
967- // check that materialised entries from all partial messages were merged
968- expect ( map . get ( 'key1' ) . value ( ) ) . to . equal ( 1 , 'Check key1 from first partial sync' ) ;
969- expect ( map . get ( 'key2' ) . value ( ) ) . to . equal ( 'two' , 'Check key2 from first partial sync' ) ;
970- expect ( map . get ( 'key3' ) . value ( ) ) . to . equal ( 3 , 'Check key3 from second partial sync' ) ;
971- expect ( map . get ( 'key4' ) . value ( ) ) . to . equal ( true , 'Check key4 from second partial sync' ) ;
972- expect ( map . get ( 'key5' ) . value ( ) ) . to . equal ( 'five' , 'Check key5 from third partial sync' ) ;
945+ helper . recordPrivateApi ( 'replace.transport.onProtocolMessage' ) ;
946+ transport . onProtocolMessage = function ( message ) {
947+ if ( message . action === 20 ) {
948+ objectSyncMessageCount ++ ;
949+ }
950+ helper . recordPrivateApi ( 'call.transport.onProtocolMessage' ) ;
951+ onProtocolMessageOriginal . call ( transport , message ) ;
952+ } ;
953+
954+ const channel2 = client2 . channels . get ( channelName , channelOptionsWithObjectModes ( ) ) ;
955+ const syncedPromise = new Promise ( ( resolve ) => channel2 . object . on ( 'synced' , resolve ) ) ;
956+ await channel2 . attach ( ) ;
957+ await syncedPromise ;
958+
959+ // verify the server sent multiple OBJECT_SYNC messages due to the partial sync, one for each large key
960+ expect ( objectSyncMessageCount ) . to . be . at . least (
961+ 3 ,
962+ 'Check that multiple OBJECT_SYNC messages were received (partial sync)' ,
963+ ) ;
964+
965+ // verify all keys were synced correctly on the new client
966+ const entryPathObject2 = await channel2 . object . get ( ) ;
967+ for ( const key of keyNames ) {
968+ expect ( entryPathObject2 . get ( key ) . value ( ) ) . to . equal (
969+ largeValue ,
970+ `Check ${ key } was synced correctly after partial OBJECT_SYNC` ,
971+ ) ;
972+ }
973+ } , client2 ) ;
973974 } ,
974975 } ,
975976
@@ -7738,9 +7739,11 @@ define(['ably', 'shared_helper', 'chai', 'liveobjects', 'liveobjects_helper'], f
77387739 async function ( helper , scenario , clientOptions , channelName ) {
77397740 const objectsHelper = new LiveObjectsHelper ( helper ) ;
77407741 const client = RealtimeWithLiveObjects ( helper , clientOptions ) ;
7742+ const restClient = RestWithLiveObjects ( helper , clientOptions ) ;
77417743
77427744 await helper . monitorConnectionThenCloseAndFinishAsync ( async ( ) => {
77437745 const channel = client . channels . get ( channelName , channelOptionsWithObjectModes ( ) ) ;
7746+ const restChannel = restClient . channels . get ( channelName ) ;
77447747 const realtimeObject = channel . object ;
77457748
77467749 await channel . attach ( ) ;
@@ -7754,6 +7757,7 @@ define(['ably', 'shared_helper', 'chai', 'liveobjects', 'liveobjects_helper'], f
77547757 objectsHelper,
77557758 channelName,
77567759 channel,
7760+ restChannel,
77577761 client,
77587762 helper,
77597763 clientOptions,
0 commit comments