@@ -48,6 +48,10 @@ function Agent(backend, stream) {
48
48
// request if the client disconnects ungracefully. This is a
49
49
// map of channel -> id -> request
50
50
this . presenceRequests = Object . create ( null ) ;
51
+ // Keep track of the latest known Doc version, so that we can avoid fetching
52
+ // ops to transform presence if not needed
53
+ this . latestDocVersionStreams = Object . create ( null ) ;
54
+ this . latestDocVersions = Object . create ( null ) ;
51
55
52
56
// We need to track this manually to make sure we don't reply to messages
53
57
// after the stream was closed.
@@ -108,24 +112,21 @@ Agent.prototype._cleanup = function() {
108
112
emitter . destroy ( ) ;
109
113
}
110
114
this . subscribedQueries = Object . create ( null ) ;
115
+
116
+ for ( var collection in this . latestDocVersionStreams ) {
117
+ var streams = this . latestDocVersionStreams [ collection ] ;
118
+ for ( var id in streams ) streams [ id ] . destroy ( ) ;
119
+ }
120
+ this . latestDocVersionStreams = Object . create ( null ) ;
111
121
} ;
112
122
113
123
/**
114
124
* Passes operation data received on stream to the agent stream via
115
125
* _sendOp()
116
126
*/
117
127
Agent . prototype . _subscribeToStream = function ( collection , id , stream ) {
118
- if ( this . closed ) return stream . destroy ( ) ;
119
-
120
- var streams = this . subscribedDocs [ collection ] || ( this . subscribedDocs [ collection ] = Object . create ( null ) ) ;
121
-
122
- // If already subscribed to this document, destroy the previously subscribed stream
123
- var previous = streams [ id ] ;
124
- if ( previous ) previous . destroy ( ) ;
125
- streams [ id ] = stream ;
126
-
127
128
var agent = this ;
128
- stream . on ( 'data' , function ( data ) {
129
+ this . _subscribeMapToStream ( this . subscribedDocs , collection , id , stream , function ( data ) {
129
130
if ( data . error ) {
130
131
// Log then silently ignore errors in a subscription stream, since these
131
132
// may not be the client's fault, and they were not the result of a
@@ -135,13 +136,26 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
135
136
}
136
137
agent . _onOp ( collection , id , data ) ;
137
138
} ) ;
139
+ } ;
140
+
141
+ Agent . prototype . _subscribeMapToStream = function ( map , collection , id , stream , dataHandler ) {
142
+ if ( this . closed ) return stream . destroy ( ) ;
143
+
144
+ var streams = map [ collection ] || ( map [ collection ] = Object . create ( null ) ) ;
145
+
146
+ // If already subscribed to this document, destroy the previously subscribed stream
147
+ var previous = streams [ id ] ;
148
+ if ( previous ) previous . destroy ( ) ;
149
+ streams [ id ] = stream ;
150
+
151
+ stream . on ( 'data' , dataHandler ) ;
138
152
stream . on ( 'end' , function ( ) {
139
153
// The op stream is done sending, so release its reference
140
- var streams = agent . subscribedDocs [ collection ] ;
154
+ var streams = map [ collection ] ;
141
155
if ( ! streams || streams [ id ] !== stream ) return ;
142
156
delete streams [ id ] ;
143
157
if ( util . hasKeys ( streams ) ) return ;
144
- delete agent . subscribedDocs [ collection ] ;
158
+ delete map [ collection ] ;
145
159
} ) ;
146
160
} ;
147
161
@@ -794,25 +808,83 @@ Agent.prototype._broadcastPresence = function(presence, callback) {
794
808
collection : presence . c
795
809
} ;
796
810
var start = Date . now ( ) ;
797
- backend . trigger ( backend . MIDDLEWARE_ACTIONS . receivePresence , this , context , function ( error ) {
811
+
812
+ var subscriptionUpdater = presence . p === null ?
813
+ this . _unsubscribeDocVersion . bind ( this ) :
814
+ this . _subscribeDocVersion . bind ( this ) ;
815
+
816
+ subscriptionUpdater ( presence . c , presence . d , function ( error ) {
798
817
if ( error ) return callback ( error ) ;
799
- var requests = presenceRequests [ presence . ch ] || ( presenceRequests [ presence . ch ] = Object . create ( null ) ) ;
800
- var previousRequest = requests [ presence . id ] ;
801
- if ( ! previousRequest || previousRequest . pv < presence . pv ) {
802
- presenceRequests [ presence . ch ] [ presence . id ] = presence ;
803
- }
804
- backend . transformPresenceToLatestVersion ( agent , presence , function ( error , presence ) {
818
+ backend . trigger ( backend . MIDDLEWARE_ACTIONS . receivePresence , agent , context , function ( error ) {
805
819
if ( error ) return callback ( error ) ;
806
- var channel = agent . _getPresenceChannel ( presence . ch ) ;
807
- agent . backend . pubsub . publish ( [ channel ] , presence , function ( error ) {
808
- if ( error ) return callback ( error ) ;
809
- backend . emit ( 'timing' , 'presence.broadcast' , Date . now ( ) - start , context ) ;
820
+ var requests = presenceRequests [ presence . ch ] || ( presenceRequests [ presence . ch ] = Object . create ( null ) ) ;
821
+ var previousRequest = requests [ presence . id ] ;
822
+ if ( ! previousRequest || previousRequest . pv < presence . pv ) {
823
+ presenceRequests [ presence . ch ] [ presence . id ] = presence ;
824
+ }
825
+
826
+ var transformer = function ( agent , presence , callback ) {
810
827
callback ( null , presence ) ;
828
+ } ;
829
+
830
+ var latestDocVersion = util . dig ( agent . latestDocVersions , presence . c , presence . d ) ;
831
+ var presenceIsUpToDate = presence . v === latestDocVersion ;
832
+ if ( ! presenceIsUpToDate ) {
833
+ // null presence can't be transformed, so skip the database call and just
834
+ // set the version to the latest known Doc version
835
+ if ( presence . p === null ) {
836
+ transformer = function ( agent , presence , callback ) {
837
+ presence . v = latestDocVersion ;
838
+ callback ( null , presence ) ;
839
+ } ;
840
+ } else {
841
+ transformer = backend . transformPresenceToLatestVersion . bind ( backend ) ;
842
+ }
843
+ }
844
+
845
+ transformer ( agent , presence , function ( error , presence ) {
846
+ if ( error ) return callback ( error ) ;
847
+ var channel = agent . _getPresenceChannel ( presence . ch ) ;
848
+ agent . backend . pubsub . publish ( [ channel ] , presence , function ( error ) {
849
+ if ( error ) return callback ( error ) ;
850
+ backend . emit ( 'timing' , 'presence.broadcast' , Date . now ( ) - start , context ) ;
851
+ callback ( null , presence ) ;
852
+ } ) ;
811
853
} ) ;
812
854
} ) ;
813
855
} ) ;
814
856
} ;
815
857
858
+ Agent . prototype . _subscribeDocVersion = function ( collection , id , callback ) {
859
+ if ( ! collection || ! id ) return callback ( ) ;
860
+
861
+ var latestDocVersions = this . latestDocVersions ;
862
+ var isSubscribed = util . dig ( latestDocVersions , collection , id ) !== undefined ;
863
+ if ( isSubscribed ) return callback ( ) ;
864
+
865
+ var agent = this ;
866
+ this . backend . subscribe ( this , collection , id , null , function ( error , stream , snapshot ) {
867
+ if ( error ) return callback ( error ) ;
868
+
869
+ util . digOrCreate ( latestDocVersions , collection , id , function ( ) {
870
+ return snapshot . v ;
871
+ } ) ;
872
+
873
+ agent . _subscribeMapToStream ( agent . latestDocVersionStreams , collection , id , stream , function ( op ) {
874
+ // op.v behind snapshot.v by 1
875
+ latestDocVersions [ collection ] [ id ] = op . v + 1 ;
876
+ } ) ;
877
+
878
+ callback ( ) ;
879
+ } ) ;
880
+ } ;
881
+
882
+ Agent . prototype . _unsubscribeDocVersion = function ( collection , id , callback ) {
883
+ var stream = util . dig ( this . latestDocVersionStreams , collection , id ) ;
884
+ if ( stream ) stream . destroy ( ) ;
885
+ util . nextTick ( callback ) ;
886
+ } ;
887
+
816
888
Agent . prototype . _createPresence = function ( request ) {
817
889
return {
818
890
a : ACTIONS . presence ,
0 commit comments