@@ -216,7 +216,20 @@ class PollingExchanger() {
216216}
217217
218218object PollingExchanger extends Logging {
219- val pollingExchangerQueue = new LinkedBlockingQueue [PollingExchanger ]()
219+ val pollingExchangerQueueMap = new java.util.concurrent.ConcurrentHashMap [String , LinkedBlockingQueue [PollingExchanger ]]()
220+
221+ def getPollingExchangerQueue (partyId : String ): LinkedBlockingQueue [PollingExchanger ] = {
222+ if (! pollingExchangerQueueMap.containsKey(partyId)) {
223+ this .synchronized {
224+ if (! pollingExchangerQueueMap.containsKey(partyId)) {
225+ val pollingExchangerQueue = new LinkedBlockingQueue [PollingExchanger ]()
226+ pollingExchangerQueueMap.put(partyId, pollingExchangerQueue)
227+ logDebug(s " created pollingExchangerQueue for party= ${partyId}" )
228+ }
229+ }
230+ }
231+ pollingExchangerQueueMap.get(partyId)
232+ }
220233
221234 def offer (data : Proxy .PollingFrame , q : SynchronousQueue [Proxy .PollingFrame ], logPrefix : String , rsHeader : ErRollSiteHeader = null , metadataString : String = null ): Boolean = {
222235 var done = false
@@ -336,17 +349,18 @@ class DispatchPollingReqSO(eggSiteServicerPollingRespSO: ServerCallStreamObserve
336349 pollingExchanger = new PollingExchanger ()
337350 var done = false
338351 var i = 0
352+ val partyId = req.getMetadata.getDst.getPartyId
339353 val exchangerDataOpTimeout = System .currentTimeMillis() + RollSiteConfKeys .EGGROLL_ROLLSITE_POLLING_EXCHANGER_DATA_OP_TIMEOUT_SEC .get().toLong * 1000
340354 while (! done && System .currentTimeMillis() < exchangerDataOpTimeout) {
341- done = PollingExchanger .pollingExchangerQueue .offer(pollingExchanger,
355+ done = PollingExchanger .getPollingExchangerQueue(partyId) .offer(pollingExchanger,
342356 RollSiteConfKeys .EGGROLL_ROLLSITE_POLLING_Q_OFFER_INTERVAL_SEC .get().toLong, TimeUnit .SECONDS )
343- logTrace(s " DispatchPollingReqSO.ensureInited calling, getting from pollingExchangerQueue. i= ${i}" )
357+ logTrace(s " DispatchPollingReqSO.ensureInited calling, getting from pollingExchangerQueue. partyId= ${partyId} , i= ${i}" )
344358 i += 1
345359 }
346360
347361 if (! done) {
348- onError(new TimeoutException (" timeout when offering pollingExchanger to queue" ))
349- PollingExchanger .pollingExchangerQueue .remove(pollingExchanger)
362+ onError(new TimeoutException (s " timeout when offering pollingExchanger to queue, partyId= ${partyId} " ))
363+ PollingExchanger .getPollingExchangerQueue(partyId) .remove(pollingExchanger)
350364 return
351365 }
352366 // synchronise point for incoming push / unary_call request
@@ -369,8 +383,8 @@ class DispatchPollingReqSO(eggSiteServicerPollingRespSO: ServerCallStreamObserve
369383 case PollingMethods .MOCK =>
370384 delegateSO = new MockPollingReqSO (eggSiteServicerPollingRespSO)
371385 case null =>
372- PollingExchanger .pollingExchangerQueue .remove(pollingExchanger)
373- throw new CancellationException (" timeout in waiting polling method" )
386+ PollingExchanger .getPollingExchangerQueue(partyId) .remove(pollingExchanger)
387+ throw new CancellationException (s " timeout in waiting polling method, partyId= ${partyId} " )
374388 case _ =>
375389 val e = new NotImplementedError (s " method ${method} not supported " )
376390 logError(e)
0 commit comments