Skip to content

Commit 5ff8459

Browse files
authored
Merge pull request #205 from porjo/ratelimit_fix
Ratelimit fix
2 parents 8d8a62a + fad4ded commit 5ff8459

File tree

6 files changed

+65
-88
lines changed

6 files changed

+65
-88
lines changed

cmd/youtubeuploader/main.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ import (
1919
"flag"
2020
"fmt"
2121
"log"
22+
"log/slog"
2223
"net/http"
2324
"os"
2425
"path/filepath"
2526
"strings"
2627

2728
yt "github.com/porjo/youtubeuploader"
2829
"github.com/porjo/youtubeuploader/internal/limiter"
29-
"github.com/porjo/youtubeuploader/internal/utils"
3030
"google.golang.org/api/googleapi"
3131
)
3232

@@ -104,12 +104,19 @@ func main() {
104104
RecordingDate: recordingDate,
105105
}
106106

107-
config.Logger = utils.NewLogger(*debug)
108-
109-
config.Logger.Debugf("Youtubeuploader version: %s\n", appVersion)
107+
// setup logging
108+
programLevel := new(slog.LevelVar) // Info by default
109+
so := &slog.HandlerOptions{Level: programLevel}
110+
if *debug {
111+
//so.AddSource = true
112+
programLevel.Set(slog.LevelDebug)
113+
}
114+
logger := slog.New(slog.NewTextHandler(os.Stderr, so))
115+
slog.SetDefault(logger)
110116

117+
slog.Debug("youtubeuploader version", "version", appVersion)
111118
if config.ShowAppVersion {
112-
fmt.Printf("Youtubeuploader version: %s\n", appVersion)
119+
// exit immediatly after showing version
113120
os.Exit(0)
114121
}
115122

@@ -142,7 +149,7 @@ func main() {
142149
ctx, cancel := context.WithCancel(context.Background())
143150
defer cancel()
144151

145-
transport, err := limiter.NewLimitTransport(config.Logger, http.DefaultTransport, limitRange, filesize, config.RateLimit)
152+
transport, err := limiter.NewLimitTransport(http.DefaultTransport, limitRange, filesize, config.RateLimit)
146153
if err != nil {
147154
log.Fatal(err)
148155
}

files.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"strings"
2626
"time"
2727

28-
"github.com/porjo/youtubeuploader/internal/utils"
2928
"google.golang.org/api/youtube/v3"
3029
)
3130

@@ -62,8 +61,6 @@ type Config struct {
6261
NotifySubscribers bool
6362
SendFileName bool
6463
RecordingDate Date
65-
66-
Logger utils.Logger
6764
}
6865

6966
type MediaType int
@@ -191,7 +188,7 @@ func LoadVideoMeta(config Config, video *youtube.Video) (*VideoMeta, error) {
191188
return videoMeta, nil
192189
}
193190

194-
func Open(filename string, mediaType MediaType) (io.ReadCloser, int, error) {
191+
func Open(filename string, mediaType MediaType) (io.ReadCloser, int64, error) {
195192
var reader io.ReadCloser
196193
var filesize int64
197194
var err error
@@ -205,7 +202,7 @@ func Open(filename string, mediaType MediaType) (io.ReadCloser, int, error) {
205202
if lenStr != "" {
206203
filesize, err = strconv.ParseInt(lenStr, 10, 64)
207204
if err != nil {
208-
return reader, int(filesize), err
205+
return reader, filesize, err
209206
}
210207
}
211208

@@ -260,7 +257,7 @@ func Open(filename string, mediaType MediaType) (io.ReadCloser, int, error) {
260257
filesize = fileInfo.Size()
261258

262259
}
263-
return reader, int(filesize), err
260+
return reader, filesize, err
264261
}
265262

266263
func (d *Date) UnmarshalJSON(b []byte) (err error) {

internal/limiter/limiter.go

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,28 @@ import (
1818
"context"
1919
"fmt"
2020
"io"
21+
"log/slog"
2122
"net/http"
2223
"net/http/httputil"
2324
"strings"
2425
"sync"
2526
"time"
2627

27-
"github.com/porjo/youtubeuploader/internal/utils"
2828
"golang.org/x/time/rate"
2929
)
3030

31+
const (
32+
kbps2bpsMultiplier = 125 // kbps * 125 = bytes/s
33+
defaultBurstLimit = 16 * 1024
34+
)
35+
3136
type LimitTransport struct {
3237
transport http.RoundTripper
3338
limitRange LimitRange
3439
reader limitChecker
3540
readerInit bool
36-
filesize int
41+
filesize int64
3742
rateLimit int
38-
39-
logger utils.Logger
4043
}
4144

4245
type LimitRange struct {
@@ -53,12 +56,14 @@ type limitChecker struct {
5356
status Status
5457
rateLimit int
5558
burstLimit int
59+
60+
ctx context.Context
5661
}
5762

5863
type Status struct {
5964
AvgRate int // Bytes per second
60-
Bytes int
61-
TotalBytes int
65+
Bytes int64
66+
TotalBytes int64
6267

6368
Progress string
6469

@@ -79,14 +84,16 @@ func (lc *limitChecker) Read(p []byte) (int, error) {
7984

8085
if lc.rateLimit > 0 {
8186
if lc.limiter == nil {
82-
lc.burstLimit = len(p)
87+
88+
lc.burstLimit = defaultBurstLimit
89+
90+
slog.Debug("limiter: creating limiter", "burstlimit", lc.burstLimit, "ratelimit", lc.rateLimit, "initial buf len", len(p))
8391

8492
// token bucket
8593
// - starts full and is refilled at the specified rate (tokens per second)
8694
// - can burst (empty bucket) up to bucket size (burst limit)
8795

88-
// kbps * 125 = bytes/s
89-
lc.limiter = rate.NewLimiter(rate.Limit(lc.rateLimit*125), lc.burstLimit)
96+
lc.limiter = rate.NewLimiter(rate.Limit(lc.rateLimit*kbps2bpsMultiplier), lc.burstLimit)
9097
}
9198

9299
if lc.limitRange.start.IsZero() || lc.limitRange.end.IsZero() {
@@ -107,28 +114,29 @@ func (lc *limitChecker) Read(p []byte) (int, error) {
107114
}
108115
}
109116

110-
read, err := lc.ReadCloser.Read(p)
111-
if err != nil {
112-
return read, err
113-
}
114-
115117
if limit {
116118

117-
tokens := read
118-
119-
// tokens cannot exceed size of bucket (burst limit)
120-
if tokens > lc.burstLimit {
121-
tokens = lc.burstLimit
119+
// tokens cannot exceed burst limit
120+
if len(p) > lc.burstLimit {
121+
slog.Debug("limiter: adjusting read buffer to match burst limit", "buf size", len(p), "burst limit", lc.burstLimit)
122+
p = p[:lc.burstLimit]
122123
}
123124

124-
err = lc.limiter.WaitN(context.Background(), tokens)
125+
tokens := len(p)
126+
127+
err := lc.limiter.WaitN(lc.ctx, tokens)
125128
if err != nil {
126-
return read, err
129+
return 0, err
127130
}
128131

129132
}
130133

131-
lc.status.Bytes += read
134+
read, err := lc.ReadCloser.Read(p)
135+
if err != nil {
136+
return read, err
137+
}
138+
139+
lc.status.Bytes += int64(read)
132140

133141
if lc.status.TotalBytes > 0 {
134142
// bytes read may be greater than filesize due to MIME multipart headers in body. Reset to filesize
@@ -180,14 +188,13 @@ func ParseLimitBetween(between, inputTimeLayout string) (LimitRange, error) {
180188
return lr, nil
181189
}
182190

183-
func NewLimitTransport(logger utils.Logger, rt http.RoundTripper, lr LimitRange, filesize int, ratelimit int) (*LimitTransport, error) {
191+
func NewLimitTransport(rt http.RoundTripper, lr LimitRange, filesize int64, ratelimit int) (*LimitTransport, error) {
184192

185193
if rt == nil {
186194
return nil, fmt.Errorf("roundtripper can't be nil")
187195
}
188196

189197
lt := &LimitTransport{
190-
logger: logger,
191198
transport: rt,
192199
limitRange: lr,
193200
filesize: filesize,
@@ -216,6 +223,7 @@ func (t *LimitTransport) RoundTrip(r *http.Request) (*http.Response, error) {
216223

217224
t.reader.Lock()
218225
if !t.readerInit {
226+
t.reader.ctx = r.Context()
219227
t.reader.limitRange = t.limitRange
220228
t.reader.rateLimit = t.rateLimit
221229
t.reader.status.TotalBytes = t.filesize
@@ -234,19 +242,19 @@ func (t *LimitTransport) RoundTrip(r *http.Request) (*http.Response, error) {
234242
}
235243

236244
if contentType != "" {
237-
t.logger.Debugf("Content-Type header value %q\n", contentType)
245+
slog.Debug("content-Type header", "value", contentType)
238246
}
239-
t.logger.Debugf("Requesting URL %q\n", r.URL)
247+
slog.Debug("requesting URL", "url", r.URL)
240248

241249
resp, err := t.transport.RoundTrip(r)
242250
if err == nil {
243-
t.logger.Debugf("Response status code: %d\n", resp.StatusCode)
251+
slog.Debug("response status", "code", resp.StatusCode)
244252
if resp.Body != nil {
245253
respBytes, err := httputil.DumpResponse(resp, true)
246254
if err != nil {
247-
t.logger.Debugf("Error reading response: %s\n", err)
255+
slog.Debug("error reading response", "err", err)
248256
} else {
249-
t.logger.Debugf("response dump:\n%s", respBytes)
257+
slog.Debug("response dump", "response", respBytes)
250258
}
251259
}
252260
}

internal/utils/utils.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

run.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"encoding/json"
2020
"fmt"
2121
"io"
22+
"log/slog"
2223
"net/http"
2324
"os"
2425
"path/filepath"
@@ -117,7 +118,7 @@ func Run(ctx context.Context, transport *limiter.LimitTransport, config Config,
117118
call := service.Videos.Insert([]string{"snippet", "status", "recordingDetails"}, upload)
118119
if config.SendFileName && config.Filename != "-" {
119120
filetitle := filepath.Base(config.Filename)
120-
config.Logger.Debugf("Adding file name to request: %q\n", filetitle)
121+
slog.Debug("adding file name to request", "file", filetitle)
121122
call.Header().Set("Slug", filetitle)
122123
}
123124
video, err = call.NotifySubscribers(config.NotifySubscribers).Media(videoReader, option).Do()

test/upload_test.go

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,11 @@ import (
3333

3434
yt "github.com/porjo/youtubeuploader"
3535
"github.com/porjo/youtubeuploader/internal/limiter"
36-
"github.com/porjo/youtubeuploader/internal/utils"
3736
"google.golang.org/api/youtube/v3"
3837
)
3938

4039
const (
41-
fileSize int = 1e7 // 10MB
40+
fileSize int64 = 1e7 // 10MB
4241

4342
oAuthResponse = `{
4443
"access_token": "xxxxxxxxxxxxxxxxxxxxxxxxxxxx",
@@ -55,21 +54,19 @@ var (
5554
transport *mockTransport
5655

5756
recordingDate yt.Date
58-
59-
logger *slog.Logger
6057
)
6158

6259
type mockTransport struct {
6360
url *url.URL
6461
}
6562

6663
type mockReader struct {
67-
read int
68-
fileSize int
64+
read int64
65+
fileSize int64
6966
}
7067

7168
func (m *mockTransport) RoundTrip(r *http.Request) (*http.Response, error) {
72-
logger.Info("roundtrip", "method", r.Method, "URL", r.URL.String())
69+
slog.Info("roundtrip", "method", r.Method, "URL", r.URL.String())
7370
r.URL.Scheme = m.url.Scheme
7471
r.URL.Host = m.url.Host
7572

@@ -83,18 +80,18 @@ func (m *mockReader) Close() error {
8380
func (m *mockReader) Read(p []byte) (int, error) {
8481

8582
l := len(p)
86-
if m.read+l >= m.fileSize {
83+
if m.read+int64(l) >= m.fileSize {
8784
diff := m.fileSize - m.read
8885
m.read += diff
89-
return diff, io.EOF
86+
return int(diff), io.EOF
9087
}
91-
m.read += l
88+
m.read += int64(l)
9289
return l, nil
9390
}
9491

9592
func TestMain(m *testing.M) {
9693

97-
logger = slog.Default()
94+
logger := slog.Default()
9895

9996
testServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
10097

@@ -169,8 +166,6 @@ func TestMain(m *testing.M) {
169166
transport = &mockTransport{url: url}
170167

171168
config = yt.Config{}
172-
//config.Logger = utils.NewLogger(true)
173-
config.Logger = utils.NewLogger(false)
174169
config.Filename = "test.mp4"
175170
config.PlaylistIDs = []string{"xxxx", "yyyy"}
176171
recordingDate = yt.Date{}
@@ -186,12 +181,12 @@ func TestRateLimit(t *testing.T) {
186181

187182
runTimeWant := 2
188183

189-
rateLimit := int(fileSize / 125 / runTimeWant)
184+
rateLimit := int(fileSize / 125 / int64(runTimeWant))
190185

191186
t.Logf("File size %d bytes", fileSize)
192187
t.Logf("Ratelimit %d Kbps", rateLimit)
193188

194-
transport, err := limiter.NewLimitTransport(config.Logger, transport, limiter.LimitRange{}, fileSize, rateLimit)
189+
transport, err := limiter.NewLimitTransport(transport, limiter.LimitRange{}, fileSize, rateLimit)
195190
if err != nil {
196191
t.Fatal(err)
197192
}

0 commit comments

Comments
 (0)