@@ -896,80 +896,63 @@ define(['ably', 'shared_helper', 'chai', 'liveobjects', 'liveobjects_helper'], f
896896 {
897897 description : 'partial OBJECT_SYNC merges map entries across multiple messages for the same objectId' ,
898898 action : async ( ctx ) => {
899- const { channel, objectsHelper , entryPathObject } = ctx ;
899+ const { helper , channel, client , entryInstance , realtimeObject } = ctx ;
900900
901- const mapId = objectsHelper . fakeMapObjectId ( ) ;
901+ helper . recordPrivateApi ( 'read.realtime.options.maxMessageSize' ) ;
902+ const maxMessageSize = client . options . maxMessageSize ;
902903
903- // assign map object to root
904- await objectsHelper . processObjectStateMessageOnChannel ( {
905- channel,
906- syncSerial : 'serial:cursor1' ,
907- state : [
908- objectsHelper . mapObject ( {
909- objectId : 'root' ,
910- siteTimeserials : { aaa : lexicoTimeserial ( 'aaa' , 0 , 0 ) } ,
911- initialEntries : {
912- map : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { objectId : mapId } } ,
913- } ,
914- } ) ,
915- ] ,
916- } ) ;
904+ // create 3 keys on root, each with a string value nearly maxMessageSize in length.
905+ // this ensures the server must split the sync across multiple OBJECT_SYNC messages
906+ // since each key alone nearly fills a single message.
907+ const keyNames = [ 'largeKey1' , 'largeKey2' , 'largeKey3' ] ;
908+ // map set op size = key length + data string length, so this reaches exactly maxMessageSize
909+ const largeValue = 'x' . repeat ( maxMessageSize - keyNames [ 0 ] . length ) ;
917910
918- // send partial sync messages for the same map object, each with different materialised entries.
919- // initialEntries are identical across all partial messages for the same object - a server guarantee.
920- const partialMessages = [
921- {
922- syncSerial : 'serial:cursor2' ,
923- materialisedEntries : {
924- key1 : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { number : 1 } } ,
925- key2 : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { string : 'two' } } ,
926- } ,
927- } ,
928- {
929- syncSerial : 'serial:cursor3' ,
930- materialisedEntries : {
931- key3 : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { number : 3 } } ,
932- key4 : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { boolean : true } } ,
933- } ,
934- } ,
935- {
936- syncSerial : 'serial:' , // end sync sequence
937- materialisedEntries : {
938- key5 : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { string : 'five' } } ,
939- } ,
940- } ,
941- ] ;
911+ for ( const key of keyNames ) {
912+ await entryInstance . set ( key , largeValue ) ;
913+ }
942914
943- for ( const partial of partialMessages ) {
944- await objectsHelper . processObjectStateMessageOnChannel ( {
945- channel,
946- syncSerial : partial . syncSerial ,
947- state : [
948- objectsHelper . mapObject ( {
949- objectId : mapId ,
950- siteTimeserials : { aaa : lexicoTimeserial ( 'aaa' , 0 , 0 ) } ,
951- initialEntries : {
952- initialKey : { timeserial : lexicoTimeserial ( 'aaa' , 0 , 0 ) , data : { string : 'initial' } } ,
953- } ,
954- materialisedEntries : partial . materialisedEntries ,
955- } ) ,
956- ] ,
957- } ) ;
915+ // verify keys were set before detach
916+ for ( const key of keyNames ) {
917+ expect ( entryInstance . get ( key ) . value ( ) ) . to . equal ( largeValue , `Check ${ key } was set before detach` ) ;
958918 }
959919
960- const map = entryPathObject . get ( 'map' ) ;
920+ await channel . detach ( ) ;
921+
922+ // intercept transport to count OBJECT_SYNC messages received after reattach
923+ let objectSyncMessageCount = 0 ;
924+ helper . recordPrivateApi ( 'call.connectionManager.activeProtocol.getTransport' ) ;
925+ const transport = client . connection . connectionManager . activeProtocol . getTransport ( ) ;
926+ const onProtocolMessageOriginal = transport . onProtocolMessage ;
927+
928+ helper . recordPrivateApi ( 'replace.transport.onProtocolMessage' ) ;
929+ transport . onProtocolMessage = function ( message ) {
930+ if ( message . action === 20 ) {
931+ objectSyncMessageCount ++ ;
932+ }
933+ helper . recordPrivateApi ( 'call.transport.onProtocolMessage' ) ;
934+ onProtocolMessageOriginal . call ( transport , message ) ;
935+ } ;
961936
962- expect ( map . get ( 'initialKey' ) . value ( ) ) . to . equal (
963- 'initial' ,
964- 'Check keys from the create operation are present' ,
937+ // reattach and wait for the new sync sequence to complete
938+ const syncedPromise = new Promise ( ( resolve ) => realtimeObject . on ( 'synced' , resolve ) ) ;
939+ await channel . attach ( ) ;
940+ await syncedPromise ;
941+
942+ // verify the server sent multiple OBJECT_SYNC messages due to the partial sync, one for each large key
943+ expect ( objectSyncMessageCount ) . to . be . at . least (
944+ 3 ,
945+ 'Check that multiple OBJECT_SYNC messages were received (partial sync)' ,
965946 ) ;
966947
967- // check that materialised entries from all partial messages were merged
968- expect ( map . get ( 'key1' ) . value ( ) ) . to . equal ( 1 , 'Check key1 from first partial sync' ) ;
969- expect ( map . get ( 'key2' ) . value ( ) ) . to . equal ( 'two' , 'Check key2 from first partial sync' ) ;
970- expect ( map . get ( 'key3' ) . value ( ) ) . to . equal ( 3 , 'Check key3 from second partial sync' ) ;
971- expect ( map . get ( 'key4' ) . value ( ) ) . to . equal ( true , 'Check key4 from second partial sync' ) ;
972- expect ( map . get ( 'key5' ) . value ( ) ) . to . equal ( 'five' , 'Check key5 from third partial sync' ) ;
948+ // verify all keys were synced correctly
949+ const newEntryPathObject = await channel . object . get ( ) ;
950+ for ( const key of keyNames ) {
951+ expect ( newEntryPathObject . get ( key ) . value ( ) ) . to . equal (
952+ largeValue ,
953+ `Check ${ key } was synced correctly after partial OBJECT_SYNC` ,
954+ ) ;
955+ }
973956 } ,
974957 } ,
975958
0 commit comments