Skip to content

Commit f2799b8

Browse files
committed
Implement singleflight
1 parent 9390626 commit f2799b8

2 files changed

Lines changed: 24 additions & 16 deletions

File tree

gateway/middleware.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,6 @@ func (t *BaseMiddleware) SetOrgExpiry(orgid string, expiry int64) {
371371
func (t *BaseMiddleware) OrgSessionExpiry(orgid string) int64 {
372372
t.Logger().Debug("Checking: ", orgid)
373373

374-
// Emergency Mode: Return default immediately
375374
if rpc.IsEmergencyMode() {
376375
return DEFAULT_ORG_SESSION_EXPIRATION
377376
}
@@ -382,22 +381,23 @@ func (t *BaseMiddleware) OrgSessionExpiry(orgid string) int64 {
382381
return cachedVal.(int64)
383382
}
384383

385-
// Background Refresh: Start async refresh in background
384+
// Start async refresh in background
386385
go t.refreshOrgSessionExpiry(orgid)
387386

388-
// Return default immediately
389387
return DEFAULT_ORG_SESSION_EXPIRATION
390388
}
391389

392390
func (t *BaseMiddleware) refreshOrgSessionExpiry(orgid string) {
393-
// This RPC call now happens safely in the background
394-
s, found := t.OrgSession(orgid)
395-
if found && t.Spec.GlobalConfig.EnforceOrgDataAge {
396-
t.SetOrgExpiry(orgid, s.DataExpires)
397-
} else {
391+
orgSessionExpiryCache.Do(orgid, func() (interface{}, error) {
392+
s, found := t.OrgSession(orgid)
393+
if found && t.Spec.GlobalConfig.EnforceOrgDataAge {
394+
t.SetOrgExpiry(orgid, s.DataExpires)
395+
return s.DataExpires, nil
396+
}
398397
// On failure or if not found, cache the default value
399398
t.SetOrgExpiry(orgid, DEFAULT_ORG_SESSION_EXPIRATION)
400-
}
399+
return DEFAULT_ORG_SESSION_EXPIRATION, nil
400+
})
401401
}
402402

403403
func (t *BaseMiddleware) UpdateRequestSession(r *http.Request) bool {

gateway/mw_organisation_activity.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"sync"
77
"time"
88

9+
"golang.org/x/sync/singleflight"
10+
911
"github.com/TykTechnologies/tyk/ctx"
1012
"github.com/TykTechnologies/tyk/request"
1113
"github.com/TykTechnologies/tyk/user"
@@ -18,6 +20,7 @@ type orgChanMapMu struct {
1820

1921
var orgChanMap = orgChanMapMu{channels: map[string](chan bool){}}
2022
var orgActiveMap sync.Map
23+
var orgSessionFetchGroup singleflight.Group
2124

2225
// RateLimitAndQuotaCheck will check the incoming request and key whether it is within it's quota and
2326
// within it's rate limit, it makes use of the SessionLimiter object to do this
@@ -101,13 +104,18 @@ func (k *OrganizationMonitor) ProcessRequest(w http.ResponseWriter, r *http.Requ
101104
}
102105

103106
func (k *OrganizationMonitor) refreshOrgSession(orgID string) {
104-
session, found := k.OrgSession(orgID)
105-
if found && !k.Spec.GlobalConfig.LocalSessionCache.DisableCacheSessionState {
106-
k.Gw.SessionCache.Set(orgID, session.Clone(), int64(time.Hour))
107-
k.Logger().Debug("Background org session fetch completed for: ", orgID)
108-
} else if !found {
109-
k.setOrgHasNoSession(true)
110-
}
107+
orgSessionFetchGroup.Do(orgID, func() (interface{}, error) {
108+
session, found := k.OrgSession(orgID)
109+
if found && !k.Spec.GlobalConfig.LocalSessionCache.DisableCacheSessionState {
110+
k.Gw.SessionCache.Set(orgID, session.Clone(), int64(time.Hour))
111+
k.Logger().Debug("Background org session fetch completed for: ", orgID)
112+
return session, nil
113+
}
114+
if !found {
115+
k.setOrgHasNoSession(true)
116+
}
117+
return nil, nil
118+
})
111119
}
112120

113121
// ProcessRequest will run any checks on the request on the way through the system, return an error to have the chain fail

0 commit comments

Comments
 (0)