Skip to content

Commit e77f7cf

Browse files
authored
Merge pull request #18 from m1k1o/hls-proxy
(WIP) Hls proxy
2 parents 2b62995 + 1c8bbdd commit e77f7cf

File tree

8 files changed

+417
-0
lines changed

8 files changed

+417
-0
lines changed

README.md

+6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Outputs:
1919
- [x] Basic MP4 over HTTP (h264+aac) : `http://go-transcode/[profile]/[stream-id]`
2020
- [x] Basic HLS over HTTP (h264+aac) : `http://go-transcode/[profile]/[stream-id]/index.m3u8`
2121
- [x] Demo HTML player (for HLS) : `http://go-transcode/[profile]/[stream-id]/play.html`
22+
- [x] HLS proxy : `http://go-transcode/hlsproxy/[hls-proxy-id]/[original-request]`
2223

2324
Features:
2425
- [ ] Seeking for static files (index)
@@ -56,6 +57,11 @@ streams:
5657
cam: rtmp://localhost/live/cam
5758
ch1_hd: http://192.168.1.34:9981/stream/channelid/85
5859
ch2_hd: http://192.168.1.34:9981/stream/channelid/43
60+
61+
# For proxying HLS streams
62+
hls-proxy:
63+
my_server: http://192.168.1.34:9981
64+
5965
```
6066

6167
## Transcoding profiles

hlsproxy/cache.go

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package hlsproxy
2+
3+
import (
4+
"io"
5+
"time"
6+
7+
"github.com/m1k1o/go-transcode/internal/utils"
8+
)
9+
10+
func (m *ManagerCtx) getFromCache(key string) (*utils.Cache, bool) {
11+
m.cacheMu.RLock()
12+
entry, ok := m.cache[key]
13+
m.cacheMu.RUnlock()
14+
15+
// on cache miss
16+
if !ok {
17+
m.logger.Debug().Str("key", key).Msg("cache miss")
18+
return nil, false
19+
}
20+
21+
// if cache has expired
22+
if time.Now().After(entry.Expires) {
23+
return nil, false
24+
}
25+
26+
// cache hit
27+
m.logger.Debug().Str("key", key).Msg("cache hit")
28+
return entry, true
29+
}
30+
31+
func (m *ManagerCtx) saveToCache(key string, reader io.Reader, duration time.Duration) *utils.Cache {
32+
m.cacheMu.Lock()
33+
cache := utils.NewCache(time.Now().Add(duration))
34+
m.cache[key] = cache
35+
m.cacheMu.Unlock()
36+
37+
// pipe reader to writer.
38+
go func() {
39+
defer cache.Close()
40+
41+
_, err := io.Copy(cache, reader)
42+
if err != nil {
43+
m.logger.Err(err).Msg("error while copying to cache")
44+
}
45+
46+
// close reader, if it needs to be closed
47+
if closer, ok := reader.(io.ReadCloser); ok {
48+
closer.Close()
49+
}
50+
}()
51+
52+
// start periodic cleanup if not running
53+
m.cleanupStart()
54+
55+
return cache
56+
}
57+
58+
func (m *ManagerCtx) clearCache() {
59+
cacheSize := 0
60+
61+
m.cacheMu.Lock()
62+
for key, entry := range m.cache {
63+
// remove expired entries
64+
if time.Now().After(entry.Expires) {
65+
delete(m.cache, key)
66+
m.logger.Debug().Str("key", key).Msg("cache cleanup remove expired")
67+
} else {
68+
cacheSize++
69+
}
70+
}
71+
m.cacheMu.Unlock()
72+
73+
if cacheSize == 0 {
74+
m.cleanupStop()
75+
}
76+
}
77+
78+
func (m *ManagerCtx) cleanupStart() {
79+
m.cleanupMu.Lock()
80+
defer m.cleanupMu.Unlock()
81+
82+
// if already running
83+
if m.cleanup {
84+
return
85+
}
86+
87+
m.shutdown = make(chan struct{})
88+
m.cleanup = true
89+
90+
go func() {
91+
m.logger.Debug().Msg("cleanup started")
92+
93+
ticker := time.NewTicker(cacheCleanupPeriod)
94+
defer ticker.Stop()
95+
96+
for {
97+
select {
98+
case <-m.shutdown:
99+
return
100+
case <-ticker.C:
101+
m.logger.Debug().Msg("performing cleanup")
102+
m.clearCache()
103+
}
104+
}
105+
}()
106+
}
107+
108+
func (m *ManagerCtx) cleanupStop() {
109+
m.cleanupMu.Lock()
110+
defer m.cleanupMu.Unlock()
111+
112+
// if not running
113+
if !m.cleanup {
114+
return
115+
}
116+
117+
m.cleanup = false
118+
close(m.shutdown)
119+
120+
m.logger.Debug().Msg("cleanup stopped")
121+
}

hlsproxy/manager.go

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package hlsproxy
2+
3+
import (
4+
"io"
5+
"net/http"
6+
"regexp"
7+
"strings"
8+
"sync"
9+
"time"
10+
11+
"github.com/m1k1o/go-transcode/internal/utils"
12+
13+
"github.com/rs/zerolog"
14+
"github.com/rs/zerolog/log"
15+
)
16+
17+
// how often should be cache cleanup called
18+
const cacheCleanupPeriod = 4 * time.Second
19+
20+
const segmentExpiration = 60 * time.Second
21+
22+
const playlistExpiration = 1 * time.Second
23+
24+
type ManagerCtx struct {
25+
logger zerolog.Logger
26+
baseUrl string
27+
prefix string
28+
29+
cache map[string]*utils.Cache
30+
cacheMu sync.RWMutex
31+
32+
cleanup bool
33+
cleanupMu sync.RWMutex
34+
shutdown chan struct{}
35+
}
36+
37+
func New(baseUrl string, prefix string) *ManagerCtx {
38+
// ensure it ends with slash
39+
baseUrl = strings.TrimSuffix(baseUrl, "/")
40+
baseUrl += "/"
41+
42+
return &ManagerCtx{
43+
logger: log.With().Str("module", "hlsproxy").Str("submodule", "manager").Logger(),
44+
baseUrl: baseUrl,
45+
prefix: prefix,
46+
cache: map[string]*utils.Cache{},
47+
}
48+
}
49+
50+
func (m *ManagerCtx) Shutdown() {
51+
m.cleanupStop()
52+
}
53+
54+
func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) {
55+
url := m.baseUrl + strings.TrimPrefix(r.URL.String(), m.prefix)
56+
57+
cache, ok := m.getFromCache(url)
58+
if !ok {
59+
resp, err := http.Get(url)
60+
if err != nil {
61+
log.Err(err).Msg("unable to get HTTP")
62+
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
63+
return
64+
}
65+
defer resp.Body.Close()
66+
67+
if resp.StatusCode < 200 && resp.StatusCode >= 300 {
68+
defer resp.Body.Close()
69+
70+
log.Err(err).Int("code", resp.StatusCode).Msg("invalid HTTP response")
71+
http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
72+
return
73+
}
74+
75+
buf, err := io.ReadAll(resp.Body)
76+
if err != nil {
77+
log.Err(err).Msg("unadle to read response body")
78+
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
79+
return
80+
}
81+
82+
var re = regexp.MustCompile(`(?m:^(https?\:\/\/[^\/]+)?\/)`)
83+
text := re.ReplaceAllString(string(buf), m.prefix)
84+
85+
cache = m.saveToCache(url, strings.NewReader(text), playlistExpiration)
86+
}
87+
88+
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
89+
w.WriteHeader(200)
90+
91+
cache.ServeHTTP(w)
92+
}
93+
94+
func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) {
95+
url := m.baseUrl + strings.TrimPrefix(r.URL.String(), m.prefix)
96+
97+
cache, ok := m.getFromCache(url)
98+
if !ok {
99+
resp, err := http.Get(url)
100+
if err != nil {
101+
log.Err(err).Msg("unable to get HTTP")
102+
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
103+
return
104+
}
105+
106+
if resp.StatusCode < 200 && resp.StatusCode >= 300 {
107+
defer resp.Body.Close()
108+
109+
log.Err(err).Int("code", resp.StatusCode).Msg("invalid HTTP response")
110+
http.Error(w, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
111+
return
112+
}
113+
114+
cache = m.saveToCache(url, resp.Body, segmentExpiration)
115+
}
116+
117+
w.Header().Set("Content-Type", "video/MP2T")
118+
w.WriteHeader(200)
119+
120+
cache.ServeHTTP(w)
121+
}

hlsproxy/types.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package hlsproxy
2+
3+
import "net/http"
4+
5+
type Manager interface {
6+
Shutdown()
7+
8+
ServePlaylist(w http.ResponseWriter, r *http.Request)
9+
ServeMedia(w http.ResponseWriter, r *http.Request)
10+
}

internal/api/hlsproxy.go

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package api
2+
3+
import (
4+
"net/http"
5+
"strings"
6+
7+
"github.com/go-chi/chi"
8+
9+
"github.com/m1k1o/go-transcode/hlsproxy"
10+
)
11+
12+
const hlsProxyPerfix = "/hlsproxy/"
13+
14+
var hlsProxyManagers map[string]hlsproxy.Manager = make(map[string]hlsproxy.Manager)
15+
16+
func (a *ApiManagerCtx) HLSProxy(r chi.Router) {
17+
r.Get(hlsProxyPerfix+"{sourceId}/*", func(w http.ResponseWriter, r *http.Request) {
18+
ID := chi.URLParam(r, "sourceId")
19+
20+
// check if stream exists
21+
baseUrl, ok := a.config.HlsProxy[ID]
22+
if !ok {
23+
http.Error(w, "404 hls proxy source not found", http.StatusNotFound)
24+
return
25+
}
26+
27+
manager, ok := hlsProxyManagers[ID]
28+
if !ok {
29+
// create new manager
30+
manager = hlsproxy.New(baseUrl, hlsProxyPerfix+ID+"/")
31+
hlsProxyManagers[ID] = manager
32+
}
33+
34+
// if this is playlist request
35+
if strings.HasSuffix(r.URL.String(), ".m3u8") {
36+
manager.ServePlaylist(w, r)
37+
} else {
38+
manager.ServeMedia(w, r)
39+
}
40+
})
41+
}

internal/api/router.go

+10
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ func (manager *ApiManagerCtx) Shutdown() error {
3535
hls.Stop()
3636
}
3737

38+
// shutdown all hls proxy managers
39+
for _, hls := range hlsProxyManagers {
40+
hls.Shutdown()
41+
}
42+
3843
return nil
3944
}
4045

@@ -44,6 +49,11 @@ func (a *ApiManagerCtx) Mount(r *chi.Mux) {
4449
_, _ = w.Write([]byte("pong"))
4550
})
4651

52+
if len(a.config.HlsProxy) > 0 {
53+
r.Group(a.HLSProxy)
54+
log.Info().Interface("hls-proxy", a.config.HlsProxy).Msg("hls proxy is active")
55+
}
56+
4757
r.Group(a.HLS)
4858
r.Group(a.Http)
4959
}

internal/config/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type Server struct {
5050
BaseDir string `yaml:"basedir,omitempty"`
5151
Streams map[string]string `yaml:"streams"`
5252
Profiles string `yaml:"profiles,omitempty"`
53+
54+
HlsProxy map[string]string
5355
}
5456

5557
func (Server) Init(cmd *cobra.Command) error {
@@ -114,6 +116,8 @@ func (s *Server) Set() {
114116
s.Profiles = fmt.Sprintf("%s/profiles", s.BaseDir)
115117
}
116118
s.Streams = viper.GetStringMapString("streams")
119+
120+
s.HlsProxy = viper.GetStringMapString("hls-proxy")
117121
}
118122

119123
func (s *Server) AbsPath(elem ...string) string {

0 commit comments

Comments
 (0)