Skip to content

Commit 1c8bbdd

Browse files
committed
automatic cleanup start and stop.
1 parent cab0921 commit 1c8bbdd

File tree

5 files changed

+70
-54
lines changed

5 files changed

+70
-54
lines changed

hlsproxy/cache.go

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ func (m *ManagerCtx) getFromCache(key string) (*utils.Cache, bool) {
2020

2121
// if cache has expired
2222
if time.Now().After(entry.Expires) {
23-
m.removeFromCache(key)
2423
return nil, false
2524
}
2625

@@ -50,25 +49,73 @@ func (m *ManagerCtx) saveToCache(key string, reader io.Reader, duration time.Dur
5049
}
5150
}()
5251

53-
return cache
54-
}
55-
56-
func (m *ManagerCtx) removeFromCache(key string) {
57-
m.cacheMu.Lock()
58-
defer m.cacheMu.Unlock()
52+
// start periodic cleanup if not running
53+
m.cleanupStart()
5954

60-
delete(m.cache, key)
55+
return cache
6156
}
6257

6358
func (m *ManagerCtx) clearCache() {
64-
m.cacheMu.Lock()
65-
defer m.cacheMu.Unlock()
59+
cacheSize := 0
6660

67-
// remove expired entries
61+
m.cacheMu.Lock()
6862
for key, entry := range m.cache {
63+
// remove expired entries
6964
if time.Now().After(entry.Expires) {
7065
delete(m.cache, key)
7166
m.logger.Debug().Str("key", key).Msg("cache cleanup remove expired")
67+
} else {
68+
cacheSize++
7269
}
7370
}
71+
m.cacheMu.Unlock()
72+
73+
if cacheSize == 0 {
74+
m.cleanupStop()
75+
}
76+
}
77+
78+
func (m *ManagerCtx) cleanupStart() {
79+
m.cleanupMu.Lock()
80+
defer m.cleanupMu.Unlock()
81+
82+
// if already running
83+
if m.cleanup {
84+
return
85+
}
86+
87+
m.shutdown = make(chan struct{})
88+
m.cleanup = true
89+
90+
go func() {
91+
m.logger.Debug().Msg("cleanup started")
92+
93+
ticker := time.NewTicker(cacheCleanupPeriod)
94+
defer ticker.Stop()
95+
96+
for {
97+
select {
98+
case <-m.shutdown:
99+
return
100+
case <-ticker.C:
101+
m.logger.Debug().Msg("performing cleanup")
102+
m.clearCache()
103+
}
104+
}
105+
}()
106+
}
107+
108+
func (m *ManagerCtx) cleanupStop() {
109+
m.cleanupMu.Lock()
110+
defer m.cleanupMu.Unlock()
111+
112+
// if not running
113+
if !m.cleanup {
114+
return
115+
}
116+
117+
m.cleanup = false
118+
close(m.shutdown)
119+
120+
m.logger.Debug().Msg("cleanup stopped")
74121
}

hlsproxy/manager.go

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ const playlistExpiration = 1 * time.Second
2323

2424
type ManagerCtx struct {
2525
logger zerolog.Logger
26-
mu sync.Mutex
2726
baseUrl string
2827
prefix string
2928

3029
cache map[string]*utils.Cache
3130
cacheMu sync.RWMutex
3231

33-
shutdown chan struct{}
32+
cleanup bool
33+
cleanupMu sync.RWMutex
34+
shutdown chan struct{}
3435
}
3536

3637
func New(baseUrl string, prefix string) *ManagerCtx {
@@ -39,44 +40,15 @@ func New(baseUrl string, prefix string) *ManagerCtx {
3940
baseUrl += "/"
4041

4142
return &ManagerCtx{
42-
logger: log.With().Str("module", "hlsproxy").Str("submodule", "manager").Logger(),
43-
baseUrl: baseUrl,
44-
prefix: prefix,
45-
cache: map[string]*utils.Cache{},
46-
shutdown: make(chan struct{}),
43+
logger: log.With().Str("module", "hlsproxy").Str("submodule", "manager").Logger(),
44+
baseUrl: baseUrl,
45+
prefix: prefix,
46+
cache: map[string]*utils.Cache{},
4747
}
4848
}
4949

50-
func (m *ManagerCtx) Start() error {
51-
m.mu.Lock()
52-
defer m.mu.Unlock()
53-
54-
m.shutdown = make(chan struct{})
55-
56-
// periodic cleanup
57-
go func() {
58-
ticker := time.NewTicker(cacheCleanupPeriod)
59-
defer ticker.Stop()
60-
61-
for {
62-
select {
63-
case <-m.shutdown:
64-
return
65-
case <-ticker.C:
66-
m.logger.Debug().Msg("performing cleanup")
67-
m.clearCache()
68-
}
69-
}
70-
}()
71-
72-
return nil
73-
}
74-
75-
func (m *ManagerCtx) Stop() {
76-
m.mu.Lock()
77-
defer m.mu.Unlock()
78-
79-
close(m.shutdown)
50+
func (m *ManagerCtx) Shutdown() {
51+
m.cleanupStop()
8052
}
8153

8254
func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) {

hlsproxy/types.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ package hlsproxy
33
import "net/http"
44

55
type Manager interface {
6-
Start() error
7-
Stop()
6+
Shutdown()
87

98
ServePlaylist(w http.ResponseWriter, r *http.Request)
109
ServeMedia(w http.ResponseWriter, r *http.Request)

internal/api/hlsproxy.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ func (a *ApiManagerCtx) HLSProxy(r chi.Router) {
2828
if !ok {
2929
// create new manager
3030
manager = hlsproxy.New(baseUrl, hlsProxyPerfix+ID+"/")
31-
manager.Start()
32-
3331
hlsProxyManagers[ID] = manager
3432
}
3533

internal/api/router.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ func (manager *ApiManagerCtx) Shutdown() error {
3535
hls.Stop()
3636
}
3737

38-
// stop all hls proxy managers
38+
// shutdown all hls proxy managers
3939
for _, hls := range hlsProxyManagers {
40-
hls.Stop()
40+
hls.Shutdown()
4141
}
4242

4343
return nil

0 commit comments

Comments
 (0)