From d3e2603b6203dbc577a774d2587d6fa01118269d Mon Sep 17 00:00:00 2001 From: Jacob Su Date: Thu, 17 Oct 2024 21:34:35 +0800 Subject: [PATCH 1/4] support redis and srs host config. --- platform/main.go | 4 ++++ platform/service.go | 14 ++++++++------ platform/utils.go | 2 +- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/platform/main.go b/platform/main.go index 9bd97915..cffc6706 100644 --- a/platform/main.go +++ b/platform/main.go @@ -20,6 +20,7 @@ import ( "github.com/ossrs/go-oryx-lib/errors" "github.com/ossrs/go-oryx-lib/logger" + // Use v8 because we use Go 1.16+, while v9 requires Go 1.18+ "github.com/go-redis/redis/v8" "github.com/google/uuid" @@ -113,6 +114,9 @@ func doMain(ctx context.Context) error { setEnvDefault("REDIS_PORT", "6379") setEnvDefault("MGMT_LISTEN", "2022") + // SRS HOST + setEnvDefault("SRS_HOST", "127.0.0.1") + // For HTTPS. setEnvDefault("HTTPS_LISTEN", "2443") setEnvDefault("AUTO_SELF_SIGNED_CERTIFICATE", "on") diff --git a/platform/service.go b/platform/service.go index 96b1e6f5..8b4302a2 100644 --- a/platform/service.go +++ b/platform/service.go @@ -8,7 +8,6 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/joho/godotenv" "io" "io/ioutil" "net/http" @@ -19,9 +18,12 @@ import ( "sync" "time" + "github.com/joho/godotenv" + "github.com/ossrs/go-oryx-lib/errors" ohttp "github.com/ossrs/go-oryx-lib/http" "github.com/ossrs/go-oryx-lib/logger" + // Use v8 because we use Go 1.16+, while v9 requires Go 1.18+ "github.com/go-redis/redis/v8" ) @@ -288,22 +290,22 @@ func handleHTTPService(ctx context.Context, handler *http.ServeMux) error { handleMgmtStreamsKickoff(ctx, handler) handleMgmtUI(ctx, handler) - proxy2023, err := httpCreateProxy("http://127.0.0.1:2023") + proxy2023, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":2023") if err != nil { return err } - proxy1985, err := httpCreateProxy("http://127.0.0.1:1985") + proxy1985, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":1985") if err != nil { return err } - proxyWhxp, err := httpCreateProxy("http://127.0.0.1:1985") + proxyWhxp, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":1985") if err != nil { return err } - proxy8080, err := httpCreateProxy("http://127.0.0.1:8080") + proxy8080, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":8080") if err != nil { return err } @@ -1690,7 +1692,7 @@ func handleMgmtStreamsKickoff(ctx context.Context, handler *http.ServeMux) { // Whether client exists in SRS server. var code int - clientURL := fmt.Sprintf("http://127.0.0.1:1985/api/v1/clients/%v", streamObject.Client) + clientURL := fmt.Sprintf("http://%v:1985/api/v1/clients/%v", os.Getenv("SRS_HOST"), streamObject.Client) if r0, body, err := requestClient(ctx, clientURL, http.MethodGet); err != nil { return errors.Wrapf(err, "http query client %v", clientURL) } else if r0 != 0 && r0 != ErrorRtmpClientNotFound { diff --git a/platform/utils.go b/platform/utils.go index c8283a7f..cc065f1f 100644 --- a/platform/utils.go +++ b/platform/utils.go @@ -758,7 +758,7 @@ func srsGenerateConfig(ctx context.Context) error { // Reload SRS to apply the new config. if true { - api := "http://127.0.0.1:1985/api/v1/raw?rpc=reload" + api := "http://" + os.Getenv("SRS_HOST") + ":1985/api/v1/raw?rpc=reload" res, err := http.DefaultClient.Get(api) if err != nil { return errors.Wrapf(err, "reload srs %v", api) From 235b93372bfe08f059b605876218917fc3a2fabe Mon Sep 17 00:00:00 2001 From: Jacob Su Date: Sun, 20 Oct 2024 19:22:23 +0800 Subject: [PATCH 2/4] support hls ts download from http stream. --- platform/dvr-local-disk.go | 1 + platform/main.go | 2 ++ platform/service.go | 10 +++++----- platform/srs-hooks.go | 38 +++++++++++++++++++++++++++++++++++--- 4 files changed, 43 insertions(+), 8 deletions(-) diff --git a/platform/dvr-local-disk.go b/platform/dvr-local-disk.go index 3b51ef61..9c3a4791 100644 --- a/platform/dvr-local-disk.go +++ b/platform/dvr-local-disk.go @@ -19,6 +19,7 @@ import ( "github.com/ossrs/go-oryx-lib/errors" ohttp "github.com/ossrs/go-oryx-lib/http" "github.com/ossrs/go-oryx-lib/logger" + // Use v8 because we use Go 1.16+, while v9 requires Go 1.18+ "github.com/go-redis/redis/v8" "github.com/google/uuid" diff --git a/platform/main.go b/platform/main.go index cffc6706..0b527953 100644 --- a/platform/main.go +++ b/platform/main.go @@ -116,6 +116,8 @@ func doMain(ctx context.Context) error { // SRS HOST setEnvDefault("SRS_HOST", "127.0.0.1") + setEnvDefault("SRS_PROXY_HOST", "127.0.0.1") + setEnvDefault("SRS_PROXY_HTTP_PORT", "8080") // For HTTPS. setEnvDefault("HTTPS_LISTEN", "2443") diff --git a/platform/service.go b/platform/service.go index 8b4302a2..38f72efe 100644 --- a/platform/service.go +++ b/platform/service.go @@ -290,22 +290,22 @@ func handleHTTPService(ctx context.Context, handler *http.ServeMux) error { handleMgmtStreamsKickoff(ctx, handler) handleMgmtUI(ctx, handler) - proxy2023, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":2023") + proxy2023, err := httpCreateProxy("http://" + os.Getenv("SRS_PROXY_HOST") + ":2023") if err != nil { return err } - proxy1985, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":1985") + proxy1985, err := httpCreateProxy("http://" + os.Getenv("SRS_PROXY_HOST") + ":1985") if err != nil { return err } - proxyWhxp, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":1985") + proxyWhxp, err := httpCreateProxy("http://" + os.Getenv("SRS_PROXY_HOST") + ":1985") if err != nil { return err } - proxy8080, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":8080") + proxy8080, err := httpCreateProxy("http://" + os.Getenv("SRS_PROXY_HOST") + ":8080") if err != nil { return err } @@ -1692,7 +1692,7 @@ func handleMgmtStreamsKickoff(ctx context.Context, handler *http.ServeMux) { // Whether client exists in SRS server. var code int - clientURL := fmt.Sprintf("http://%v:1985/api/v1/clients/%v", os.Getenv("SRS_HOST"), streamObject.Client) + clientURL := fmt.Sprintf("http://%v:1985/api/v1/clients/%v", os.Getenv("SRS_PROXY_HOST"), streamObject.Client) if r0, body, err := requestClient(ctx, clientURL, http.MethodGet); err != nil { return errors.Wrapf(err, "http query client %v", clientURL) } else if r0 != 0 && r0 != ErrorRtmpClientNotFound { diff --git a/platform/srs-hooks.go b/platform/srs-hooks.go index 6de847cb..526197c0 100644 --- a/platform/srs-hooks.go +++ b/platform/srs-hooks.go @@ -1,19 +1,19 @@ -// // Copyright (c) 2022-2024 Winlin // // SPDX-License-Identifier: MIT -// package main import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "math/rand" "net/http" "net/url" "os" + "path/filepath" "strconv" "strings" "time" @@ -21,6 +21,7 @@ import ( "github.com/ossrs/go-oryx-lib/errors" ohttp "github.com/ossrs/go-oryx-lib/http" "github.com/ossrs/go-oryx-lib/logger" + // Use v8 because we use Go 1.16+, while v9 requires Go 1.18+ "github.com/go-redis/redis/v8" cam "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cam/v20190116" @@ -728,8 +729,39 @@ func handleOnHls(ctx context.Context, handler *http.ServeMux) error { if msg.Action != SrsActionOnHls { return errors.Errorf("invalid action=%v", msg.Action) } + if _, err := os.Stat(msg.File); err != nil { - return errors.Wrapf(err, "invalid ts file %v", msg.File) + logger.Tf(ctx, "invalid ts file %v", msg.File) + + if err := os.MkdirAll(filepath.Dir(msg.File), 0755); err != nil { + return errors.Wrapf(err, "failed to create ts file directory %v", filepath.Dir(msg.File)) + } + + if tsFile, err := os.Create(msg.File); err != nil { + return errors.Wrapf(err, "failed to create ts file %v", msg.File) + } else { + tsUrl := "http://" + os.Getenv("SRS_PROXY_HOST") + ":" + os.Getenv("SRS_PROXY_HTTP_PORT") + "/" + msg.URL + logger.Tf(ctx, "download ts from %v", tsUrl) + client := http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + r.URL.Opaque = r.URL.Path + return nil + }, + } + + resp, err := client.Get(tsUrl) + if err != nil { + return errors.Wrapf(err, "http error to get url %v", tsUrl) + } + defer resp.Body.Close() + + size, err := io.Copy(tsFile, resp.Body) + if err != nil { + return errors.Wrapf(err, "copy http resp to file %v", tsFile) + } + defer tsFile.Close() + logger.Tf(ctx, "Download ts file %s with size %d", tsUrl, size) + } } logger.Tf(ctx, "on_hls ok, %v", string(b)) From 4326d82f99c7a2844d7837978944e0b0156956d3 Mon Sep 17 00:00:00 2001 From: Jacob Su Date: Tue, 22 Oct 2024 00:08:15 +0800 Subject: [PATCH 3/4] support srs http api config by env. --- platform/main.go | 3 +-- platform/service.go | 11 ++++++----- platform/srs-hooks.go | 2 +- platform/utils.go | 6 ++++-- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/platform/main.go b/platform/main.go index 0b527953..9a03f862 100644 --- a/platform/main.go +++ b/platform/main.go @@ -116,8 +116,7 @@ func doMain(ctx context.Context) error { // SRS HOST setEnvDefault("SRS_HOST", "127.0.0.1") - setEnvDefault("SRS_PROXY_HOST", "127.0.0.1") - setEnvDefault("SRS_PROXY_HTTP_PORT", "8080") + setEnvDefault("SRS_HTTP_STREAM_PORT", "8080") // For HTTPS. setEnvDefault("HTTPS_LISTEN", "2443") diff --git a/platform/service.go b/platform/service.go index 38f72efe..30c044f4 100644 --- a/platform/service.go +++ b/platform/service.go @@ -11,6 +11,7 @@ import ( "io" "io/ioutil" "net/http" + "os" "path" "runtime" "strconv" @@ -290,22 +291,22 @@ func handleHTTPService(ctx context.Context, handler *http.ServeMux) error { handleMgmtStreamsKickoff(ctx, handler) handleMgmtUI(ctx, handler) - proxy2023, err := httpCreateProxy("http://" + os.Getenv("SRS_PROXY_HOST") + ":2023") + proxy2023, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":2023") if err != nil { return err } - proxy1985, err := httpCreateProxy("http://" + os.Getenv("SRS_PROXY_HOST") + ":1985") + proxy1985, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":1985") if err != nil { return err } - proxyWhxp, err := httpCreateProxy("http://" + os.Getenv("SRS_PROXY_HOST") + ":1985") + proxyWhxp, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":1985") if err != nil { return err } - proxy8080, err := httpCreateProxy("http://" + os.Getenv("SRS_PROXY_HOST") + ":8080") + proxy8080, err := httpCreateProxy("http://" + os.Getenv("SRS_HOST") + ":8080") if err != nil { return err } @@ -1692,7 +1693,7 @@ func handleMgmtStreamsKickoff(ctx context.Context, handler *http.ServeMux) { // Whether client exists in SRS server. var code int - clientURL := fmt.Sprintf("http://%v:1985/api/v1/clients/%v", os.Getenv("SRS_PROXY_HOST"), streamObject.Client) + clientURL := fmt.Sprintf("http://%v:1985/api/v1/clients/%v", os.Getenv("SRS_HOST"), streamObject.Client) if r0, body, err := requestClient(ctx, clientURL, http.MethodGet); err != nil { return errors.Wrapf(err, "http query client %v", clientURL) } else if r0 != 0 && r0 != ErrorRtmpClientNotFound { diff --git a/platform/srs-hooks.go b/platform/srs-hooks.go index 526197c0..7001b648 100644 --- a/platform/srs-hooks.go +++ b/platform/srs-hooks.go @@ -740,7 +740,7 @@ func handleOnHls(ctx context.Context, handler *http.ServeMux) error { if tsFile, err := os.Create(msg.File); err != nil { return errors.Wrapf(err, "failed to create ts file %v", msg.File) } else { - tsUrl := "http://" + os.Getenv("SRS_PROXY_HOST") + ":" + os.Getenv("SRS_PROXY_HTTP_PORT") + "/" + msg.URL + tsUrl := "http://" + os.Getenv("SRS_HOST") + ":" + os.Getenv("SRS_HTTP_STREAM_PORT") + "/" + msg.URL logger.Tf(ctx, "download ts from %v", tsUrl) client := http.Client{ CheckRedirect: func(req *http.Request, via []*http.Request) error { diff --git a/platform/utils.go b/platform/utils.go index cc065f1f..bda9c354 100644 --- a/platform/utils.go +++ b/platform/utils.go @@ -761,7 +761,8 @@ func srsGenerateConfig(ctx context.Context) error { api := "http://" + os.Getenv("SRS_HOST") + ":1985/api/v1/raw?rpc=reload" res, err := http.DefaultClient.Get(api) if err != nil { - return errors.Wrapf(err, "reload srs %v", api) + logger.Tf(ctx, "srs api reload error %v", err) + return nil } defer res.Body.Close() @@ -771,7 +772,8 @@ func srsGenerateConfig(ctx context.Context) error { } if res.StatusCode != http.StatusOK { - return errors.Errorf("reload srs %v, code=%v, body=%v", api, res.StatusCode, string(b)) + logger.Tf(ctx, "reload srs %v, code=%v, body=%v", api, res.StatusCode, string(b)) + return nil } logger.Tf(ctx, "reload submit srs ok") } From 734639af50bf2527a6cabb1258cc4addb1f1bf98 Mon Sep 17 00:00:00 2001 From: Jacob Su Date: Tue, 29 Oct 2024 11:05:19 +0800 Subject: [PATCH 4/4] fix code scanning warning: uncontrolled data used in path expression. --- platform/dvr-local-disk.go | 20 ++++------ platform/dvr-tencent-cos.go | 22 +++++------ platform/dvr-tencent-vod.go | 22 +++++------ platform/ocr.go | 51 +++++------------------- platform/srs-hooks.go | 78 ++++++++++++++++++++++--------------- platform/transcript.go | 52 +++++-------------------- 6 files changed, 90 insertions(+), 155 deletions(-) diff --git a/platform/dvr-local-disk.go b/platform/dvr-local-disk.go index 9c3a4791..cb1907e1 100644 --- a/platform/dvr-local-disk.go +++ b/platform/dvr-local-disk.go @@ -4,6 +4,7 @@ package main import ( + "bytes" "context" "encoding/json" "fmt" @@ -535,21 +536,16 @@ func (v *RecordWorker) Handle(ctx context.Context, handler *http.ServeMux) error return nil } -func (v *RecordWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage) error { +func (v *RecordWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage, data []byte) error { // Copy the ts file to temporary cache dir. tsid := uuid.NewString() tsfile := path.Join("record", fmt.Sprintf("%v.ts", tsid)) - // Always use execFile when params contains user inputs, see https://auth0.com/blog/preventing-command-injection-attacks-in-node-js-apps/ - // Note that should never use fs.copyFileSync(file, tsfile, fs.constants.COPYFILE_FICLONE_FORCE) which fails in macOS. - if err := exec.CommandContext(ctx, "cp", "-f", msg.File, tsfile).Run(); err != nil { - return errors.Wrapf(err, "copy file %v to %v", msg.File, tsfile) - } - - // Get the file size. - stats, err := os.Stat(msg.File) - if err != nil { - return errors.Wrapf(err, "stat file %v", msg.File) + if file, err := os.Create(tsfile); err != nil { + return errors.Wrapf(err, "create file %v error", tsfile) + } else { + defer file.Close() + io.Copy(file, bytes.NewReader(data)) } // Create a local ts file object. @@ -558,7 +554,7 @@ func (v *RecordWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage) URL: msg.URL, SeqNo: msg.SeqNo, Duration: msg.Duration, - Size: uint64(stats.Size()), + Size: uint64(len(data)), File: tsfile, } diff --git a/platform/dvr-tencent-cos.go b/platform/dvr-tencent-cos.go index 210960c6..d4adad4d 100644 --- a/platform/dvr-tencent-cos.go +++ b/platform/dvr-tencent-cos.go @@ -4,13 +4,14 @@ package main import ( + "bytes" "context" "encoding/json" "fmt" + "io" "net/http" "net/url" "os" - "os/exec" "path" "strings" "sync" @@ -231,7 +232,7 @@ func (v *DvrWorker) Handle(ctx context.Context, handler *http.ServeMux) error { return nil } -func (v *DvrWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage) error { +func (v *DvrWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage, data []byte) error { // Ignore for Tencent Cloud credentials not ready. if !v.ready() { return nil @@ -241,16 +242,11 @@ func (v *DvrWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage) er tsid := uuid.NewString() tsfile := path.Join("dvr", fmt.Sprintf("%v.ts", tsid)) - // Always use execFile when params contains user inputs, see https://auth0.com/blog/preventing-command-injection-attacks-in-node-js-apps/ - // Note that should never use fs.copyFileSync(file, tsfile, fs.constants.COPYFILE_FICLONE_FORCE) which fails in macOS. - if err := exec.CommandContext(ctx, "cp", "-f", msg.File, tsfile).Run(); err != nil { - return errors.Wrapf(err, "copy file %v to %v", msg.File, tsfile) - } - - // Get the file size. - stats, err := os.Stat(msg.File) - if err != nil { - return errors.Wrapf(err, "stat file %v", msg.File) + if file, err := os.Create(tsfile); err != nil { + return errors.Wrapf(err, "create file %v error", tsfile) + } else { + defer file.Close() + io.Copy(file, bytes.NewReader(data)) } // Create a local ts file object. @@ -259,7 +255,7 @@ func (v *DvrWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage) er URL: msg.URL, SeqNo: msg.SeqNo, Duration: msg.Duration, - Size: uint64(stats.Size()), + Size: uint64(len(data)), File: tsfile, } diff --git a/platform/dvr-tencent-vod.go b/platform/dvr-tencent-vod.go index 6c261a1f..f26d6567 100644 --- a/platform/dvr-tencent-vod.go +++ b/platform/dvr-tencent-vod.go @@ -4,13 +4,14 @@ package main import ( + "bytes" "context" "encoding/json" "fmt" + "io" "net/http" "net/url" "os" - "os/exec" "path" "strconv" "strings" @@ -321,7 +322,7 @@ func (v *VodWorker) Handle(ctx context.Context, handler *http.ServeMux) error { return nil } -func (v *VodWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage) error { +func (v *VodWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage, data []byte) error { // Ignore for Tencent Cloud credentials not ready. if !v.ready() { return nil @@ -331,16 +332,11 @@ func (v *VodWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage) er tsid := uuid.NewString() tsfile := path.Join("vod", fmt.Sprintf("%v.ts", tsid)) - // Always use execFile when params contains user inputs, see https://auth0.com/blog/preventing-command-injection-attacks-in-node-js-apps/ - // Note that should never use fs.copyFileSync(file, tsfile, fs.constants.COPYFILE_FICLONE_FORCE) which fails in macOS. - if err := exec.CommandContext(ctx, "cp", "-f", msg.File, tsfile).Run(); err != nil { - return errors.Wrapf(err, "copy file %v to %v", msg.File, tsfile) - } - - // Get the file size. - stats, err := os.Stat(msg.File) - if err != nil { - return errors.Wrapf(err, "stat file %v", msg.File) + if file, err := os.Create(tsfile); err != nil { + return errors.Wrapf(err, "create file %v error", tsfile) + } else { + defer file.Close() + io.Copy(file, bytes.NewReader(data)) } // Create a local ts file object. @@ -349,7 +345,7 @@ func (v *VodWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage) er URL: msg.URL, SeqNo: msg.SeqNo, Duration: msg.Duration, - Size: uint64(stats.Size()), + Size: uint64(len(data)), File: tsfile, } diff --git a/platform/ocr.go b/platform/ocr.go index 5c739198..1c61f535 100644 --- a/platform/ocr.go +++ b/platform/ocr.go @@ -4,6 +4,7 @@ package main import ( + "bytes" "context" "encoding/base64" "encoding/json" @@ -21,6 +22,7 @@ import ( "github.com/ossrs/go-oryx-lib/errors" ohttp "github.com/ossrs/go-oryx-lib/http" "github.com/ossrs/go-oryx-lib/logger" + // Use v8 because we use Go 1.16+, while v9 requires Go 1.18+ "github.com/go-redis/redis/v8" "github.com/google/uuid" @@ -39,17 +41,12 @@ type OCRWorker struct { // The global OCR task, only support one OCR task. task *OCRTask - // Use async goroutine to process on_hls messages. - msgs chan *SrsOnHlsMessage - // Got message from SRS, a new TS segment file is generated. tsfiles chan *SrsOnHlsObject } func NewOCRWorker() *OCRWorker { v := &OCRWorker{ - // Message on_hls. - msgs: make(chan *SrsOnHlsMessage, 1024), // TS files. tsfiles: make(chan *SrsOnHlsObject, 1024), } @@ -547,16 +544,7 @@ func (v *OCRWorker) Enabled() bool { return v.task.enabled() } -func (v *OCRWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage) error { - select { - case <-ctx.Done(): - case v.msgs <- msg: - } - - return nil -} - -func (v *OCRWorker) OnHlsTsMessageImpl(ctx context.Context, msg *SrsOnHlsMessage) error { +func (v *OCRWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage, data []byte) error { // Ignore if not natch the task config. if !v.task.match(msg) { return nil @@ -566,16 +554,11 @@ func (v *OCRWorker) OnHlsTsMessageImpl(ctx context.Context, msg *SrsOnHlsMessage tsid := fmt.Sprintf("%v-org-%v", msg.SeqNo, uuid.NewString()) tsfile := path.Join("ocr", fmt.Sprintf("%v.ts", tsid)) - // Always use execFile when params contains user inputs, see https://auth0.com/blog/preventing-command-injection-attacks-in-node-js-apps/ - // Note that should never use fs.copyFileSync(file, tsfile, fs.constants.COPYFILE_FICLONE_FORCE) which fails in macOS. - if err := exec.CommandContext(ctx, "cp", "-f", msg.File, tsfile).Run(); err != nil { - return errors.Wrapf(err, "copy file %v to %v", msg.File, tsfile) - } - - // Get the file size. - stats, err := os.Stat(msg.File) - if err != nil { - return errors.Wrapf(err, "stat file %v", msg.File) + if file, err := os.Create(tsfile); err != nil { + return errors.Wrapf(err, "create file %v error", tsfile) + } else { + defer file.Close() + io.Copy(file, bytes.NewReader(data)) } // Create a local ts file object. @@ -584,7 +567,7 @@ func (v *OCRWorker) OnHlsTsMessageImpl(ctx context.Context, msg *SrsOnHlsMessage URL: msg.URL, SeqNo: msg.SeqNo, Duration: msg.Duration, - Size: uint64(stats.Size()), + Size: uint64(len(data)), File: tsfile, } @@ -659,22 +642,6 @@ func (v *OCRWorker) Start(ctx context.Context) error { } }() - // Consume all on_hls messages. - wg.Add(1) - go func() { - defer wg.Done() - - for ctx.Err() == nil { - select { - case <-ctx.Done(): - case msg := <-v.msgs: - if err := v.OnHlsTsMessageImpl(ctx, msg); err != nil { - logger.Wf(ctx, "ocr: handle on hls message %v err %+v", msg.String(), err) - } - } - } - }() - // Consume all ts files by task. wg.Add(1) go func() { diff --git a/platform/srs-hooks.go b/platform/srs-hooks.go index 7001b648..e3e25676 100644 --- a/platform/srs-hooks.go +++ b/platform/srs-hooks.go @@ -730,38 +730,52 @@ func handleOnHls(ctx context.Context, handler *http.ServeMux) error { return errors.Errorf("invalid action=%v", msg.Action) } - if _, err := os.Stat(msg.File); err != nil { - logger.Tf(ctx, "invalid ts file %v", msg.File) + // below stupied code is used to resolve the code scanning error: + // Uncontrolled data used in path expression + allowedDir, err := os.Getwd() + if err != nil { + return errors.Wrapf(err, "can not get current working directory") + } - if err := os.MkdirAll(filepath.Dir(msg.File), 0755); err != nil { - return errors.Wrapf(err, "failed to create ts file directory %v", filepath.Dir(msg.File)) - } + safePath := filepath.Join(allowedDir, filepath.Clean(msg.File)) + logger.Tf(ctx, "safePath is %v", safePath) + absPath, err := filepath.Abs(safePath) + if err != nil { + return errors.Wrapf(err, "can not get absolute path from %v", safePath) + } - if tsFile, err := os.Create(msg.File); err != nil { - return errors.Wrapf(err, "failed to create ts file %v", msg.File) - } else { - tsUrl := "http://" + os.Getenv("SRS_HOST") + ":" + os.Getenv("SRS_HTTP_STREAM_PORT") + "/" + msg.URL - logger.Tf(ctx, "download ts from %v", tsUrl) - client := http.Client{ - CheckRedirect: func(req *http.Request, via []*http.Request) error { - r.URL.Opaque = r.URL.Path - return nil - }, - } + if !strings.HasPrefix(absPath, allowedDir) { + return errors.Errorf("Access denied, %v is outside allowed directory", absPath) + } - resp, err := client.Get(tsUrl) - if err != nil { - return errors.Wrapf(err, "http error to get url %v", tsUrl) - } - defer resp.Body.Close() + var data []byte + if _, err := os.Stat(absPath); err != nil { + logger.Tf(ctx, "invalid ts file %v", absPath) + tsUrl := "http://" + os.Getenv("SRS_HOST") + ":" + os.Getenv("SRS_HTTP_STREAM_PORT") + "/" + msg.URL + logger.Tf(ctx, "download ts from %v", tsUrl) + client := http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + r.URL.Opaque = r.URL.Path + return nil + }, + } - size, err := io.Copy(tsFile, resp.Body) - if err != nil { - return errors.Wrapf(err, "copy http resp to file %v", tsFile) - } - defer tsFile.Close() - logger.Tf(ctx, "Download ts file %s with size %d", tsUrl, size) + res, err := client.Get(tsUrl) + if err != nil { + return errors.Wrapf(err, "http error to get url %v", tsUrl) + } + defer res.Body.Close() + + if b, err := io.ReadAll(res.Body); err != nil { + return errors.Wrapf(err, "read http response error") + } else { + data = b } + logger.Tf(ctx, "Download ts file %s with size %d", tsUrl, len(data)) + } else if b, err := os.ReadFile(absPath); err != nil { + return errors.Wrapf(err, "read %v error", absPath) + } else { + data = b } logger.Tf(ctx, "on_hls ok, %v", string(b)) @@ -769,7 +783,7 @@ func handleOnHls(ctx context.Context, handler *http.ServeMux) error { if recordAll, err := rdb.HGet(ctx, SRS_RECORD_PATTERNS, "all").Result(); err != nil && err != redis.Nil { return errors.Wrapf(err, "hget %v all", SRS_RECORD_PATTERNS) } else if recordAll == "true" { - if err = recordWorker.OnHlsTsMessage(ctx, &msg); err != nil { + if err = recordWorker.OnHlsTsMessage(ctx, &msg, data); err != nil { return errors.Wrapf(err, "feed %v", msg.String()) } logger.Tf(ctx, "record %v", msg.String()) @@ -779,7 +793,7 @@ func handleOnHls(ctx context.Context, handler *http.ServeMux) error { if dvrAll, err := rdb.HGet(ctx, SRS_DVR_PATTERNS, "all").Result(); err != nil && err != redis.Nil { return errors.Wrapf(err, "hget %v all", SRS_DVR_PATTERNS) } else if dvrAll == "true" { - if err = dvrWorker.OnHlsTsMessage(ctx, &msg); err != nil { + if err = dvrWorker.OnHlsTsMessage(ctx, &msg, data); err != nil { return errors.Wrapf(err, "feed %v", msg.String()) } logger.Tf(ctx, "dvr %v", msg.String()) @@ -789,7 +803,7 @@ func handleOnHls(ctx context.Context, handler *http.ServeMux) error { if vodAll, err := rdb.HGet(ctx, SRS_VOD_PATTERNS, "all").Result(); err != nil && err != redis.Nil { return errors.Wrapf(err, "hget %v all", SRS_VOD_PATTERNS) } else if vodAll == "true" { - if err = vodWorker.OnHlsTsMessage(ctx, &msg); err != nil { + if err = vodWorker.OnHlsTsMessage(ctx, &msg, data); err != nil { return errors.Wrapf(err, "feed %v", msg.String()) } logger.Tf(ctx, "vod %v", msg.String()) @@ -797,7 +811,7 @@ func handleOnHls(ctx context.Context, handler *http.ServeMux) error { // Handle TS file by Transcript task if enabled. if transcriptWorker.Enabled() { - if err = transcriptWorker.OnHlsTsMessage(ctx, &msg); err != nil { + if err = transcriptWorker.OnHlsTsMessage(ctx, &msg, data); err != nil { return errors.Wrapf(err, "feed %v", msg.String()) } logger.Tf(ctx, "transcript %v", msg.String()) @@ -805,7 +819,7 @@ func handleOnHls(ctx context.Context, handler *http.ServeMux) error { // Handle TS file by OCR task if enabled. if ocrWorker.Enabled() { - if err = ocrWorker.OnHlsTsMessage(ctx, &msg); err != nil { + if err = ocrWorker.OnHlsTsMessage(ctx, &msg, data); err != nil { return errors.Wrapf(err, "feed %v", msg.String()) } logger.Tf(ctx, "ocr %v", msg.String()) diff --git a/platform/transcript.go b/platform/transcript.go index 51340915..17b5b2a4 100644 --- a/platform/transcript.go +++ b/platform/transcript.go @@ -4,6 +4,7 @@ package main import ( + "bytes" "context" "encoding/json" "fmt" @@ -21,6 +22,7 @@ import ( "github.com/ossrs/go-oryx-lib/errors" ohttp "github.com/ossrs/go-oryx-lib/http" "github.com/ossrs/go-oryx-lib/logger" + // Use v8 because we use Go 1.16+, while v9 requires Go 1.18+ "github.com/go-redis/redis/v8" "github.com/google/uuid" @@ -38,18 +40,12 @@ type TranscriptWorker struct { // The global transcript task, only support one transcript task. task *TranscriptTask - - // Use async goroutine to process on_hls messages. - msgs chan *SrsOnHlsMessage - // Got message from SRS, a new TS segment file is generated. tsfiles chan *SrsOnHlsObject } func NewTranscriptWorker() *TranscriptWorker { v := &TranscriptWorker{ - // Message on_hls. - msgs: make(chan *SrsOnHlsMessage, 1024), // TS files. tsfiles: make(chan *SrsOnHlsObject, 1024), } @@ -942,16 +938,7 @@ func (v *TranscriptWorker) Enabled() bool { return v.task.enabled() } -func (v *TranscriptWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage) error { - select { - case <-ctx.Done(): - case v.msgs <- msg: - } - - return nil -} - -func (v *TranscriptWorker) OnHlsTsMessageImpl(ctx context.Context, msg *SrsOnHlsMessage) error { +func (v *TranscriptWorker) OnHlsTsMessage(ctx context.Context, msg *SrsOnHlsMessage, data []byte) error { // Ignore if not natch the task config. if !v.task.match(msg) { return nil @@ -961,16 +948,11 @@ func (v *TranscriptWorker) OnHlsTsMessageImpl(ctx context.Context, msg *SrsOnHls tsid := fmt.Sprintf("%v-org-%v", msg.SeqNo, uuid.NewString()) tsfile := path.Join("transcript", fmt.Sprintf("%v.ts", tsid)) - // Always use execFile when params contains user inputs, see https://auth0.com/blog/preventing-command-injection-attacks-in-node-js-apps/ - // Note that should never use fs.copyFileSync(file, tsfile, fs.constants.COPYFILE_FICLONE_FORCE) which fails in macOS. - if err := exec.CommandContext(ctx, "cp", "-f", msg.File, tsfile).Run(); err != nil { - return errors.Wrapf(err, "copy file %v to %v", msg.File, tsfile) - } - - // Get the file size. - stats, err := os.Stat(msg.File) - if err != nil { - return errors.Wrapf(err, "stat file %v", msg.File) + if file, err := os.Create(tsfile); err != nil { + return errors.Wrapf(err, "create file %v error", tsfile) + } else { + defer file.Close() + io.Copy(file, bytes.NewReader(data)) } // Create a local ts file object. @@ -979,7 +961,7 @@ func (v *TranscriptWorker) OnHlsTsMessageImpl(ctx context.Context, msg *SrsOnHls URL: msg.URL, SeqNo: msg.SeqNo, Duration: msg.Duration, - Size: uint64(stats.Size()), + Size: uint64(len(data)), File: tsfile, } @@ -1054,22 +1036,6 @@ func (v *TranscriptWorker) Start(ctx context.Context) error { } }() - // Consume all on_hls messages. - wg.Add(1) - go func() { - defer wg.Done() - - for ctx.Err() == nil { - select { - case <-ctx.Done(): - case msg := <-v.msgs: - if err := v.OnHlsTsMessageImpl(ctx, msg); err != nil { - logger.Wf(ctx, "transcript: handle on hls message %v err %+v", msg.String(), err) - } - } - } - }() - // Consume all ts files by task. wg.Add(1) go func() {