@@ -72,10 +72,8 @@ public void onConnected(SessionConnectedEvent event) {
7272 public void onSubscribe (SessionSubscribeEvent event ) {
7373 // First let's extract all the necessary info about new watching from the subscribe request
7474 StompHeaderAccessor headers = StompHeaderAccessor .wrap (event .getMessage ());
75- List <String > rawHeaderValue = headers .getNativeHeader ("isPlain" );
76- assert (rawHeaderValue != null ) && (rawHeaderValue .size () == 1 )
77- : "'isPlain' header of SUBSCRIBE command is absent or malformed" ;
78- Boolean isPlain = Boolean .valueOf (rawHeaderValue .get (0 ));
75+ boolean isPlain = getBooleanNativeHeader (headers , "isPlain" );
76+ boolean isTailNeeded = getBooleanNativeHeader (headers , "isTailNeeded" );
7977 String sessionId = headers .getSessionId ();
8078 String destination = headers .getDestination ();
8179 assert (destination != null ) && destination .startsWith (WEBSOCKET_TOPIC_PREFIX )
@@ -98,7 +96,7 @@ public void onSubscribe(SessionSubscribeEvent event) {
9896 // 1. Ensure that all RMI registration channels are created
9997 ensureRegistrationChannelsCreated (logConfig );
10098 // 2. Register the tracking on specified nodes
101- switchTracking (logConfig , true );
99+ startTrackingOnServer (logConfig , isTailNeeded );
102100 // 3. Remember the tracking in the registry
103101 registry .addEntry (logConfig , sessionId );
104102 log .info ("New tracking for log '{}' has started within session id={}." , logConfig .getUid (), sessionId );
@@ -108,6 +106,9 @@ public void onSubscribe(SessionSubscribeEvent event) {
108106 ServerFailure failure = new ServerFailure (e .getMessage (), now ());
109107 messagingTemplate .convertAndSend (WEBSOCKET_TOPIC_PREFIX + logConfig .getUid (),
110108 failure , singletonMap (MESSAGE_TYPE_HEADER , MessageType .FAILURE ));
109+ // TODO Научиться по аналогии с этим отправлять сообщение об успешной настройке подписки, для чего превратить
110+ // serverFailure в более общий тип сообщения. При получении этого типа убирать на клиенте прелоадер,
111+ // выставленный перед отправкой запроса на подписку.
111112 }
112113
113114 } else { // i.e. if there are watching sessions already in registry
@@ -141,11 +142,11 @@ private void stopTrackingIfNeeded(Message<byte[]> message, boolean isUnsubscribi
141142 // check if there is no such session(s)
142143 if (fellows == null ) {
143144 if (isUnsubscribing ) { // in case of unsubscribing it is incorrect situation
144- throw new IllegalStateException ("No registered session(s) found for sessionId=" + sessionId );
145+ log . warn ("No registered session(s) found for sessionId={}" , sessionId );
145146 } else { // but in case of disconnecting it is quite right
146147 log .info ("No registered session found for sessionId={}." , sessionId );
147- return ;
148148 }
149+ return ;
149150 }
150151 // check if there any other session(s) left (may require synchronization on registry object)
151152 if (fellows .size () > 1 ) {
@@ -157,13 +158,30 @@ private void stopTrackingIfNeeded(Message<byte[]> message, boolean isUnsubscribi
157158 // in case it was the latest session watching that log we should stop the tracking
158159 LogConfigEntry watchingLog = registry .findLogConfigEntryBy (sessionId );
159160 log .debug ("No sessions left watching log '{}'. Will deactivate the tracking..." , watchingLog .getUid ());
160- switchTracking (watchingLog , false );
161+ stopTrackingOnServer (watchingLog );
161162 // now that the log is not tracked anymore we need to remove it from the registry
162163 registry .removeEntry (watchingLog );
163164 log .info ("Current node has unregistered itself from tracking log '{}' as there is no watching sessions anymore." ,
164165 watchingLog .getUid ());
165166 }
166167
168+ private boolean getBooleanNativeHeader (StompHeaderAccessor headers , String name ) {
169+ List <String > rawHeaderValue = headers .getNativeHeader (name );
170+ assert (rawHeaderValue != null ) && (rawHeaderValue .size () == 1 )
171+ : format ("'%s' header of SUBSCRIBE command is absent or malformed" , name );
172+ return Boolean .valueOf (rawHeaderValue .get (0 ));
173+ }
174+
175+ /**
176+ * When watching request comes for a plain log AnaLog does not tries to find corresponding log config entry.
177+ * Instead it just creates new ('artificial') entry and then works with it only. This approach allows AnaLog to
178+ * watch arbitrary plain logs independently of its configuration. Particularly, it means that a user can set any
179+ * path into AnaLog's address line and start to watch it the same way as if it was pre-configured as a choice
180+ * variant in AnaLog configuration.
181+ *
182+ * @param path full path of log file to watch for
183+ * @return newly created log config entry for the specified path
184+ */
167185 @ NotNull
168186 private LogConfigEntry createPlainLogConfigEntry (String path ) {
169187 LogConfigEntry artificialEntry = new LogConfigEntry ();
@@ -206,13 +224,22 @@ private void ensureRegistrationChannelsCreated(LogConfigEntry matchingEntry) {
206224 });
207225 }
208226
209- private void switchTracking (LogConfigEntry logConfigEntry , boolean isOn ) {
227+ private void stopTrackingOnServer (LogConfigEntry logConfigEntry ) {
228+ switchTrackingOnServer (logConfigEntry , false , false );
229+ }
230+
231+ private void startTrackingOnServer (LogConfigEntry logConfigEntry , boolean isTailNeeded ) {
232+ switchTrackingOnServer (logConfigEntry , true , isTailNeeded );
233+ }
234+
235+ private void switchTrackingOnServer (LogConfigEntry logConfigEntry , boolean isOn , boolean isTailNeeded ) {
236+ assert !(isTailNeeded && !isOn ) : "isTailNeeded flag should not be raised when switching the tracking off" ;
210237 ClusterNode myselfNode = clusterProperties .getMyselfNode ();
211238 String fullPath = buildFullPath (logConfigEntry );
212239 String nodeName = nvls (logConfigEntry .getNode (), myselfNode .getName ());
213240
214241 // send registration request for the main entry
215- TrackingRequest primaryRequest = new TrackingRequest (fullPath , logConfigEntry .getTimestamp (), nodeName , logConfigEntry .getUid ());
242+ TrackingRequest primaryRequest = new TrackingRequest (fullPath , logConfigEntry .getTimestamp (), nodeName , logConfigEntry .getUid (), isTailNeeded );
216243 log .debug ("Switching {} the registration by PRIMARY request: {}" , isOn ? "ON" :"OFF" , primaryRequest );
217244 remoteGateway .switchRegistration (primaryRequest , isOn );
218245
@@ -224,7 +251,8 @@ private void switchTracking(LogConfigEntry logConfigEntry, boolean isOn) {
224251 normalizePath (included .getPath ()),
225252 included .getTimestamp (),
226253 nvls (included .getNode (), myselfNode .getName ()),
227- logConfigEntry .getUid ());
254+ logConfigEntry .getUid (),
255+ isTailNeeded );
228256 log .debug ("Switching {} the registration by INCLUDED request: {}" , isOn ? "ON" :"OFF" , includedRequest );
229257 remoteGateway .switchRegistration (includedRequest , isOn );
230258
0 commit comments