Skip to content

Commit fea50ac

Browse files
committed
Simplify
1 parent a3a6e68 commit fea50ac

2 files changed

Lines changed: 118 additions & 320 deletions

File tree

gateway/middleware.go

Lines changed: 39 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -39,24 +39,14 @@ import (
3939

4040
const (
4141
DEFAULT_ORG_SESSION_EXPIRATION = int64(604800)
42-
orgSessionSoftExpiry = 10 * time.Minute
43-
orgSessionHardExpiry = 1 * time.Hour
44-
orgSessionMaxStale = 24 * time.Hour
4542
orgSessionFetchTimeout = 2 * time.Second
4643
)
4744

4845
var (
4946
GlobalRate = ratecounter.NewRateCounter(1 * time.Second)
5047
orgSessionExpiryCache singleflight.Group
51-
orgRefreshInProgress sync.Map
5248
)
5349

54-
// orgCacheEntry holds org session with timestamp for stale-while-revalidate
55-
type orgCacheEntry struct {
56-
session user.SessionState
57-
cachedAt int64
58-
}
59-
6050
type TykMiddleware interface {
6151
Base() *BaseMiddleware
6252
GetSpec() *APISpec
@@ -404,114 +394,31 @@ func (t *BaseMiddleware) fetchOrgSessionWithTimeout(orgID string) (user.SessionS
404394
}
405395
}
406396

407-
// refreshOrgSession refreshes org session in the background
408-
func (t *BaseMiddleware) refreshOrgSession(orgID string) {
409-
defer orgRefreshInProgress.Delete(orgID)
410-
411-
t.Logger().Debug("Background refresh started for org session")
412-
413-
session, found := t.fetchOrgSessionWithTimeout(orgID)
414-
if !found {
415-
t.Logger().Warning("Background refresh timed out after 2s")
416-
return
417-
}
418-
419-
t.cacheOrgSession(orgID, session)
420-
t.Logger().Debug("Background refresh completed successfully")
421-
}
422-
423-
// cacheOrgSession stores org session with timestamp for stale-while-revalidate
424-
func (t *BaseMiddleware) cacheOrgSession(orgID string, session user.SessionState) {
425-
entry := orgCacheEntry{
426-
session: session.Clone(),
427-
cachedAt: time.Now().UnixNano(),
428-
}
429-
430-
cacheKey := "org:" + orgID
431-
t.Gw.SessionCache.Set(cacheKey, entry, int64(orgSessionMaxStale))
432-
t.Logger().Debug("Cached org session")
433-
}
434-
435-
// OrgSession implements stale-while-revalidate pattern for org sessions:
436-
// - Soft expiry (10 min): Return stale data, trigger background refresh
437-
// - Hard expiry (1 hour): Try to fetch fresh data, fall back to serving stale data
438-
// - Max stale (24 hours): Gateway cache TTL prevents indefinite memory growth
439397
func (t *BaseMiddleware) OrgSession(orgID string) (user.SessionState, bool) {
440398
if rpc.IsEmergencyMode() {
441399
return user.SessionState{}, false
442400
}
443401

444402
cacheKey := "org:" + orgID
445403

446-
// Check Gateway cache
447-
if cached, found := t.Gw.SessionCache.Get(cacheKey); found {
448-
entry, ok := cached.(orgCacheEntry)
449-
if !ok {
450-
t.Logger().Error("Invalid org cache entry type")
451-
t.Gw.SessionCache.Delete(cacheKey)
452-
// Fall through to fetch fresh
453-
} else {
454-
now := time.Now()
455-
age := now.Sub(time.Unix(0, entry.cachedAt))
456-
457-
if age < orgSessionSoftExpiry {
458-
// Fresh, return immediately
459-
t.Logger().Debug("Using fresh org session from cache")
460-
return entry.session.Clone(), true
461-
}
462-
463-
if age < orgSessionHardExpiry {
464-
// Stale but within hard expiry, trigger background refresh
465-
t.Logger().Debug("Using stale org session, triggering background refresh")
466-
467-
if _, inProgress := orgRefreshInProgress.LoadOrStore(orgID, true); !inProgress {
468-
go func() {
469-
defer func() {
470-
if r := recover(); r != nil {
471-
t.Logger().Errorf("Panic recovered during org session refresh for org %s: %v", orgID, r)
472-
}
473-
}()
474-
t.refreshOrgSession(orgID)
475-
}()
476-
}
477-
478-
return entry.session.Clone(), true
479-
}
480-
481-
// Beyond hard expiry, check if refresh is already in progress
482-
if _, inProgress := orgRefreshInProgress.Load(orgID); inProgress {
483-
// Background refresh already running, serve stale data to avoid duplicate fetch
484-
t.Logger().Debug("Background refresh in progress, serving stale data beyond hard expiry")
485-
return entry.session.Clone(), true
486-
}
487-
488-
// No refresh in progress, try to fetch fresh
489-
t.Logger().Debug("Org session beyond hard expiry, attempting fresh fetch")
490-
session, found := t.fetchOrgSessionWithTimeout(orgID)
491-
492-
if !found {
493-
// Fetch failed, serve stale data to maintain org limits
494-
t.Logger().Warning("Org session fetch failed, serving stale data beyond hard expiry to maintain limits")
495-
return entry.session.Clone(), true
496-
}
497-
498-
// Fresh fetch succeeded, update cache and return
499-
t.cacheOrgSession(orgID, session)
500-
return session, true
404+
// Check Gateway cache first
405+
if !t.Spec.GlobalConfig.LocalSessionCache.DisableCacheSessionState {
406+
if cached, found := t.Gw.SessionCache.Get(cacheKey); found {
407+
t.Logger().Debug("Using cached org session")
408+
return cached.(user.SessionState).Clone(), true
501409
}
502410
}
503411

504-
// No cache, fetch fresh with timeout
505-
t.Logger().Debug("No cached org session, attempting fresh fetch with timeout")
412+
// Not in cache, fetch with timeout to prevent blocking
413+
t.Logger().Debug("Fetching org session with timeout")
506414
session, found := t.fetchOrgSessionWithTimeout(orgID)
507415

508-
if !found {
509-
t.Logger().Warning("Org session fetch timed out after 2s")
510-
return user.SessionState{}, false
416+
// Cache the result if found
417+
if found && !t.Spec.GlobalConfig.LocalSessionCache.DisableCacheSessionState {
418+
t.Gw.SessionCache.Set(cacheKey, session.Clone(), cache.DefaultExpiration)
511419
}
512420

513-
t.cacheOrgSession(orgID, session)
514-
return session, true
421+
return session, found
515422
}
516423

517424
func (t *BaseMiddleware) SetOrgExpiry(orgid string, expiry int64) {
@@ -521,27 +428,40 @@ func (t *BaseMiddleware) SetOrgExpiry(orgid string, expiry int64) {
521428
func (t *BaseMiddleware) OrgSessionExpiry(orgid string) int64 {
522429
t.Logger().Debug("Checking: ", orgid)
523430

524-
// Cache failed attempt
525-
id, err, _ := orgSessionExpiryCache.Do(orgid, func() (interface{}, error) {
526-
cachedVal, found := t.Gw.ExpiryCache.Get(orgid)
527-
if found {
528-
return cachedVal, nil
431+
// Check cache first
432+
cachedVal, found := t.Gw.ExpiryCache.Get(orgid)
433+
if found {
434+
val, ok := cachedVal.(int64)
435+
if !ok {
436+
t.Logger().Error("Type assertion failed")
437+
return DEFAULT_ORG_SESSION_EXPIRATION
529438
}
439+
return val
440+
}
530441

531-
s, found := t.OrgSession(orgid)
532-
if found && t.Spec.GlobalConfig.EnforceOrgDataAge {
533-
return s.DataExpires, nil
442+
// Cache miss, start async refresh in background, return default immediately
443+
go t.refreshOrgSessionExpiry(orgid)
444+
445+
t.Logger().Debug("no cached entry found, returning 7 days (async refresh started)")
446+
return DEFAULT_ORG_SESSION_EXPIRATION
447+
}
448+
449+
// refreshOrgSessionExpiry fetches org session expiry in the background
450+
func (t *BaseMiddleware) refreshOrgSessionExpiry(orgid string) {
451+
defer func() {
452+
if r := recover(); r != nil {
453+
t.Logger().Errorf("Panic recovered during org session expiry refresh for org %s: %v", orgid, r)
534454
}
535-
return 0, errors.New("missing session")
536-
})
455+
}()
537456

538-
if err != nil {
539-
t.Logger().Debug("no cached entry found, returning 7 days")
457+
s, found := t.OrgSession(orgid) // RPC call happens in background
458+
if found && t.Spec.GlobalConfig.EnforceOrgDataAge {
459+
t.Logger().Debug("Background refresh: setting data expiry for org: ", orgid)
460+
t.SetOrgExpiry(orgid, s.DataExpires)
461+
} else {
462+
t.Logger().Debug("Background refresh: org session not found, setting default expiry")
540463
t.SetOrgExpiry(orgid, DEFAULT_ORG_SESSION_EXPIRATION)
541-
return DEFAULT_ORG_SESSION_EXPIRATION
542464
}
543-
544-
return id.(int64)
545465
}
546466

547467
func (t *BaseMiddleware) UpdateRequestSession(r *http.Request) bool {

0 commit comments

Comments
 (0)