Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions cmd/youtubeuploader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"flag"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"path/filepath"
"strings"

yt "github.com/porjo/youtubeuploader"
"github.com/porjo/youtubeuploader/internal/limiter"
"github.com/porjo/youtubeuploader/internal/utils"
"google.golang.org/api/googleapi"
)

Expand Down Expand Up @@ -104,12 +104,19 @@ func main() {
RecordingDate: recordingDate,
}

config.Logger = utils.NewLogger(*debug)

config.Logger.Debugf("Youtubeuploader version: %s\n", appVersion)
// setup logging
programLevel := new(slog.LevelVar) // Info by default
so := &slog.HandlerOptions{Level: programLevel}
if *debug {
//so.AddSource = true
programLevel.Set(slog.LevelDebug)
}
logger := slog.New(slog.NewTextHandler(os.Stderr, so))
slog.SetDefault(logger)

slog.Debug("youtubeuploader version", "version", appVersion)
if config.ShowAppVersion {
fmt.Printf("Youtubeuploader version: %s\n", appVersion)
// exit immediatly after showing version
Comment on lines +117 to +119

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now the version command shows nothing, is this intended?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for spotting that! Fixed in master.

os.Exit(0)
}

Expand Down Expand Up @@ -142,7 +149,7 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

transport, err := limiter.NewLimitTransport(config.Logger, http.DefaultTransport, limitRange, filesize, config.RateLimit)
transport, err := limiter.NewLimitTransport(http.DefaultTransport, limitRange, filesize, config.RateLimit)
if err != nil {
log.Fatal(err)
}
Expand Down
9 changes: 3 additions & 6 deletions files.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"strings"
"time"

"github.com/porjo/youtubeuploader/internal/utils"
"google.golang.org/api/youtube/v3"
)

Expand Down Expand Up @@ -62,8 +61,6 @@ type Config struct {
NotifySubscribers bool
SendFileName bool
RecordingDate Date

Logger utils.Logger
}

type MediaType int
Expand Down Expand Up @@ -191,7 +188,7 @@ func LoadVideoMeta(config Config, video *youtube.Video) (*VideoMeta, error) {
return videoMeta, nil
}

func Open(filename string, mediaType MediaType) (io.ReadCloser, int, error) {
func Open(filename string, mediaType MediaType) (io.ReadCloser, int64, error) {
var reader io.ReadCloser
var filesize int64
var err error
Expand All @@ -205,7 +202,7 @@ func Open(filename string, mediaType MediaType) (io.ReadCloser, int, error) {
if lenStr != "" {
filesize, err = strconv.ParseInt(lenStr, 10, 64)
if err != nil {
return reader, int(filesize), err
return reader, filesize, err
}
}

Expand Down Expand Up @@ -260,7 +257,7 @@ func Open(filename string, mediaType MediaType) (io.ReadCloser, int, error) {
filesize = fileInfo.Size()

}
return reader, int(filesize), err
return reader, filesize, err
}

func (d *Date) UnmarshalJSON(b []byte) (err error) {
Expand Down
66 changes: 37 additions & 29 deletions internal/limiter/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,28 @@ import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"net/http/httputil"
"strings"
"sync"
"time"

"github.com/porjo/youtubeuploader/internal/utils"
"golang.org/x/time/rate"
)

const (
kbps2bpsMultiplier = 125 // kbps * 125 = bytes/s
defaultBurstLimit = 16 * 1024
)

type LimitTransport struct {
transport http.RoundTripper
limitRange LimitRange
reader limitChecker
readerInit bool
filesize int
filesize int64
rateLimit int

logger utils.Logger
}

type LimitRange struct {
Expand All @@ -53,12 +56,14 @@ type limitChecker struct {
status Status
rateLimit int
burstLimit int

ctx context.Context
}

type Status struct {
AvgRate int // Bytes per second
Bytes int
TotalBytes int
Bytes int64
TotalBytes int64

Progress string

Expand All @@ -79,14 +84,16 @@ func (lc *limitChecker) Read(p []byte) (int, error) {

if lc.rateLimit > 0 {
if lc.limiter == nil {
lc.burstLimit = len(p)

lc.burstLimit = defaultBurstLimit

slog.Debug("limiter: creating limiter", "burstlimit", lc.burstLimit, "ratelimit", lc.rateLimit, "initial buf len", len(p))

// token bucket
// - starts full and is refilled at the specified rate (tokens per second)
// - can burst (empty bucket) up to bucket size (burst limit)

// kbps * 125 = bytes/s
lc.limiter = rate.NewLimiter(rate.Limit(lc.rateLimit*125), lc.burstLimit)
lc.limiter = rate.NewLimiter(rate.Limit(lc.rateLimit*kbps2bpsMultiplier), lc.burstLimit)
}

if lc.limitRange.start.IsZero() || lc.limitRange.end.IsZero() {
Expand All @@ -107,28 +114,29 @@ func (lc *limitChecker) Read(p []byte) (int, error) {
}
}

read, err := lc.ReadCloser.Read(p)
if err != nil {
return read, err
}

if limit {

tokens := read

// tokens cannot exceed size of bucket (burst limit)
if tokens > lc.burstLimit {
tokens = lc.burstLimit
// tokens cannot exceed burst limit
if len(p) > lc.burstLimit {
slog.Debug("limiter: adjusting read buffer to match burst limit", "buf size", len(p), "burst limit", lc.burstLimit)
p = p[:lc.burstLimit]
}

err = lc.limiter.WaitN(context.Background(), tokens)
tokens := len(p)

err := lc.limiter.WaitN(lc.ctx, tokens)
if err != nil {
return read, err
return 0, err
}

}

lc.status.Bytes += read
read, err := lc.ReadCloser.Read(p)
if err != nil {
return read, err
}

lc.status.Bytes += int64(read)

if lc.status.TotalBytes > 0 {
// bytes read may be greater than filesize due to MIME multipart headers in body. Reset to filesize
Expand Down Expand Up @@ -180,14 +188,13 @@ func ParseLimitBetween(between, inputTimeLayout string) (LimitRange, error) {
return lr, nil
}

func NewLimitTransport(logger utils.Logger, rt http.RoundTripper, lr LimitRange, filesize int, ratelimit int) (*LimitTransport, error) {
func NewLimitTransport(rt http.RoundTripper, lr LimitRange, filesize int64, ratelimit int) (*LimitTransport, error) {

if rt == nil {
return nil, fmt.Errorf("roundtripper can't be nil")
}

lt := &LimitTransport{
logger: logger,
transport: rt,
limitRange: lr,
filesize: filesize,
Expand Down Expand Up @@ -216,6 +223,7 @@ func (t *LimitTransport) RoundTrip(r *http.Request) (*http.Response, error) {

t.reader.Lock()
if !t.readerInit {
t.reader.ctx = r.Context()
t.reader.limitRange = t.limitRange
t.reader.rateLimit = t.rateLimit
t.reader.status.TotalBytes = t.filesize
Expand All @@ -234,19 +242,19 @@ func (t *LimitTransport) RoundTrip(r *http.Request) (*http.Response, error) {
}

if contentType != "" {
t.logger.Debugf("Content-Type header value %q\n", contentType)
slog.Debug("content-Type header", "value", contentType)
}
t.logger.Debugf("Requesting URL %q\n", r.URL)
slog.Debug("requesting URL", "url", r.URL)

resp, err := t.transport.RoundTrip(r)
if err == nil {
t.logger.Debugf("Response status code: %d\n", resp.StatusCode)
slog.Debug("response status", "code", resp.StatusCode)
if resp.Body != nil {
respBytes, err := httputil.DumpResponse(resp, true)
if err != nil {
t.logger.Debugf("Error reading response: %s\n", err)
slog.Debug("error reading response", "err", err)
} else {
t.logger.Debugf("response dump:\n%s", respBytes)
slog.Debug("response dump", "response", respBytes)
}
}
}
Expand Down
31 changes: 0 additions & 31 deletions internal/utils/utils.go

This file was deleted.

3 changes: 2 additions & 1 deletion run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -117,7 +118,7 @@ func Run(ctx context.Context, transport *limiter.LimitTransport, config Config,
call := service.Videos.Insert([]string{"snippet", "status", "recordingDetails"}, upload)
if config.SendFileName && config.Filename != "-" {
filetitle := filepath.Base(config.Filename)
config.Logger.Debugf("Adding file name to request: %q\n", filetitle)
slog.Debug("adding file name to request", "file", filetitle)
call.Header().Set("Slug", filetitle)
}
video, err = call.NotifySubscribers(config.NotifySubscribers).Media(videoReader, option).Do()
Expand Down
25 changes: 10 additions & 15 deletions test/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ import (

yt "github.com/porjo/youtubeuploader"
"github.com/porjo/youtubeuploader/internal/limiter"
"github.com/porjo/youtubeuploader/internal/utils"
"google.golang.org/api/youtube/v3"
)

const (
fileSize int = 1e7 // 10MB
fileSize int64 = 1e7 // 10MB

oAuthResponse = `{
"access_token": "xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
Expand All @@ -55,21 +54,19 @@ var (
transport *mockTransport

recordingDate yt.Date

logger *slog.Logger
)

type mockTransport struct {
url *url.URL
}

type mockReader struct {
read int
fileSize int
read int64
fileSize int64
}

func (m *mockTransport) RoundTrip(r *http.Request) (*http.Response, error) {
logger.Info("roundtrip", "method", r.Method, "URL", r.URL.String())
slog.Info("roundtrip", "method", r.Method, "URL", r.URL.String())
r.URL.Scheme = m.url.Scheme
r.URL.Host = m.url.Host

Expand All @@ -83,18 +80,18 @@ func (m *mockReader) Close() error {
func (m *mockReader) Read(p []byte) (int, error) {

l := len(p)
if m.read+l >= m.fileSize {
if m.read+int64(l) >= m.fileSize {
diff := m.fileSize - m.read
m.read += diff
return diff, io.EOF
return int(diff), io.EOF
}
m.read += l
m.read += int64(l)
return l, nil
}

func TestMain(m *testing.M) {

logger = slog.Default()
logger := slog.Default()

testServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

Expand Down Expand Up @@ -169,8 +166,6 @@ func TestMain(m *testing.M) {
transport = &mockTransport{url: url}

config = yt.Config{}
//config.Logger = utils.NewLogger(true)
config.Logger = utils.NewLogger(false)
config.Filename = "test.mp4"
config.PlaylistIDs = []string{"xxxx", "yyyy"}
recordingDate = yt.Date{}
Expand All @@ -186,12 +181,12 @@ func TestRateLimit(t *testing.T) {

runTimeWant := 2

rateLimit := int(fileSize / 125 / runTimeWant)
rateLimit := int(fileSize / 125 / int64(runTimeWant))

t.Logf("File size %d bytes", fileSize)
t.Logf("Ratelimit %d Kbps", rateLimit)

transport, err := limiter.NewLimitTransport(config.Logger, transport, limiter.LimitRange{}, fileSize, rateLimit)
transport, err := limiter.NewLimitTransport(transport, limiter.LimitRange{}, fileSize, rateLimit)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading