diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go index e47dc19cd..2d085e68b 100644 --- a/cmd/tusd/cli/composer.go +++ b/cmd/tusd/cli/composer.go @@ -3,12 +3,11 @@ package cli import ( "context" "fmt" + "log/slog" "os" "path/filepath" "strings" - "golang.org/x/exp/slog" - "github.com/tus/tusd/v2/internal/s3log" "github.com/tus/tusd/v2/pkg/azurestore" "github.com/tus/tusd/v2/pkg/filelocker" @@ -20,7 +19,6 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/prometheus/client_golang/prometheus" ) diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go index c81fa22c1..0cf43b627 100644 --- a/cmd/tusd/cli/flags.go +++ b/cmd/tusd/cli/flags.go @@ -3,12 +3,12 @@ package cli import ( "flag" "path/filepath" + "slices" "strings" "time" "github.com/tus/tusd/v2/internal/grouped_flags" "github.com/tus/tusd/v2/pkg/hooks" - "golang.org/x/exp/slices" ) var Flags struct { diff --git a/cmd/tusd/cli/log.go b/cmd/tusd/cli/log.go index bf4bbe43c..f7c3361c0 100644 --- a/cmd/tusd/cli/log.go +++ b/cmd/tusd/cli/log.go @@ -2,9 +2,8 @@ package cli import ( "log" + "log/slog" "os" - - "golang.org/x/exp/slog" ) var stdout = log.New(os.Stdout, "", log.LstdFlags|log.Lmicroseconds) diff --git a/go.mod b/go.mod index 0b7637641..beda31759 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.4 - github.com/Shopify/toxiproxy/v2 v2.12.0 github.com/aws/aws-sdk-go-v2 v1.41.1 github.com/aws/aws-sdk-go-v2/config v1.32.7 github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0 @@ -26,12 +25,10 @@ require ( github.com/hashicorp/go-hclog v1.6.3 github.com/hashicorp/go-plugin v1.7.0 github.com/prometheus/client_golang v1.23.2 - github.com/rs/zerolog v1.34.0 github.com/sethgrid/pester v1.2.0 github.com/stretchr/testify v1.11.1 github.com/tus/lockfile v1.2.0 github.com/vimeo/go-util v1.4.1 - golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 golang.org/x/net v0.49.0 golang.org/x/sync v0.19.0 google.golang.org/api v0.264.0 @@ -86,7 +83,6 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.11 // indirect github.com/googleapis/gax-go/v2 v2.16.0 // indirect - github.com/gorilla/mux v1.8.1 // indirect github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect github.com/hashicorp/yamux v0.1.2 // indirect github.com/kylelemons/godebug v1.1.0 // indirect @@ -100,7 +96,6 @@ require ( github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect - github.com/rs/xid v1.6.0 // indirect github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.38.0 // indirect @@ -120,6 +115,5 @@ require ( google.golang.org/genproto v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260122232226-8e98ce8d340d // indirect - gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index a750821a5..ef274de04 100644 --- a/go.sum +++ b/go.sum @@ -48,8 +48,6 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.54.0/go.mod h1:vB2GH9GAYYJTO3mEn8oYwzEdhlayZIdQz6zdzgUIRvA= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0 h1:s0WlVbf9qpvkh1c/uDAPElam0WrL7fHRIidgZJ7UqZI= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0/go.mod h1:Mf6O40IAyB9zR/1J8nGDDPirZQQPbYJni8Yisy7NTMc= -github.com/Shopify/toxiproxy/v2 v2.12.0 h1:d1x++lYZg/zijXPPcv7PH0MvHMzEI5aX/YuUi/Sw+yg= -github.com/Shopify/toxiproxy/v2 v2.12.0/go.mod h1:R9Z38Pw6k2cGZWXHe7tbxjGW9azmY1KbDQJ1kd+h7Tk= github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU= github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 h1:489krEF9xIGkOaaX3CE/Be2uWjiXrkCH6gUX+bZA/BU= @@ -108,7 +106,6 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f h1:Y8xYupdHxryycyPlc9Y+bSQAYZnetRJ70VMVKm5CKI0= github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f/go.mod h1:HlzOvOjVBOfTGSRXRyY0OiCS/3J1akRGQQpRO/7zyF4= -github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -144,7 +141,6 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= -github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d h1:lBXNCxVENCipq4D1Is42JVOP4eQjlB8TQ6H69Yx5J9Q= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= @@ -174,8 +170,6 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.11 h1:vAe81Msw+8tKUxi2Dq github.com/googleapis/enterprise-certificate-proxy v0.3.11/go.mod h1:RFV7MUdlb7AgEq2v7FmMCfeSMCllAzWxFgRdusoGks8= github.com/googleapis/gax-go/v2 v2.16.0 h1:iHbQmKLLZrexmb0OSsNGTeSTS0HO4YvFOG8g5E4Zd0Y= github.com/googleapis/gax-go/v2 v2.16.0/go.mod h1:o1vfQjjNZn4+dPnRdl/4ZD7S9414Y4xA+a/6Icj6l14= -github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= -github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= @@ -215,7 +209,6 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -229,7 +222,6 @@ github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzb github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -246,10 +238,6 @@ github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzM github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= -github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= -github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= -github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= -github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= github.com/sethgrid/pester v1.2.0 h1:adC9RS29rRUef3rIKWPOuP1Jm3/MmB6ke+OhE5giENI= github.com/sethgrid/pester v1.2.0/go.mod h1:hEUINb4RqvDxtoCaU0BNT/HV4ig5kfgOasrf1xcvr0A= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -308,8 +296,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 h1:yqrTHse8TCMW1M1ZCP+VAR/l0kKxwaAIqN/il7x4voA= -golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -357,7 +343,6 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -411,8 +396,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/h2non/gock.v1 v1.1.2 h1:jBbHXgGBK/AoPVfJh5x4r/WxIrElvbLel8TCZkkZJoY= gopkg.in/h2non/gock.v1 v1.1.2/go.mod h1:n7UGz/ckNChHiK05rDoiC4MYSunEC/lyaUm2WWaDva0= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/e2e/e2e_test.go b/internal/e2e/e2e_test.go index 399b5fe5c..3eef875cf 100644 --- a/internal/e2e/e2e_test.go +++ b/internal/e2e/e2e_test.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "io" - "log" "net" "net/http" "net/http/httptest" @@ -21,14 +20,10 @@ import ( "testing" "time" - toxiproxy_server "github.com/Shopify/toxiproxy/v2" - toxiproxy "github.com/Shopify/toxiproxy/v2/client" - "github.com/prometheus/client_golang/prometheus" - "github.com/rs/zerolog" - "golang.org/x/exp/constraints" + "github.com/stretchr/testify/require" ) -var toxiClient *toxiproxy.Client +var toxiClient *localProxyClient var TUSD_BINARY string var TUSD_ENDPOINT_RE = regexp.MustCompile(`You can now upload files to: (https?://([^/]+)/\S*)`) @@ -42,25 +37,11 @@ func TestMain(m *testing.M) { os.Exit(1) } - // Create a new toxiproxy server instance - metrics := toxiproxy_server.NewMetricsContainer(prometheus.NewRegistry()) - logger := zerolog.New(os.Stderr).Level(zerolog.ErrorLevel) - server := toxiproxy_server.NewServer(metrics, logger) - - addr := "localhost:8474" - go func(server *toxiproxy_server.ApiServer, addr string) { - if err := server.Listen(addr); err != nil { - log.Fatalf("failed to start toxiproxy: %s", err) - } - }(server, addr) - - // Create a new toxiproxy client instance - toxiClient = toxiproxy.NewClient(addr) + toxiClient = newLocalProxyClient() // Run actual tests exitVal := m.Run() - server.Shutdown() os.Exit(exitVal) } @@ -78,17 +59,13 @@ func TestSuccessfulUpload(t *testing.T) { length := data.Len() req, err := http.NewRequest("POST", endpoint, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Length", strconv.Itoa(length)) res, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if res.StatusCode != http.StatusCreated { t.Fatal("invalid response code") @@ -97,18 +74,14 @@ func TestSuccessfulUpload(t *testing.T) { uploadUrl := res.Header.Get("Location") req, err = http.NewRequest("PATCH", uploadUrl, data) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Offset", "0") req.Header.Add("Content-Type", "application/offset+octet-stream") res, err = http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if res.StatusCode != http.StatusNoContent { t.Fatalf("invalid response code %d", res.StatusCode) @@ -138,11 +111,11 @@ func TestNetworkReadTimeout(t *testing.T) { // We limit the upstream connection to tusd to 5KB/s. The downstream connection // from tusd is not limited. - proxy.AddToxic("", "bandwidth", "upstream", 1, toxiproxy.Attributes{ + proxy.AddToxic("", "bandwidth", "upstream", 1, map[string]any{ "rate": 5, }) - // Endpoint address point to toxiproxy + // Endpoint address points to the network proxy. endpoint := "http://" + proxy.Listen + "/files/" // We tell tusd to create a 50KB upload, but only upload 10KB of data. @@ -150,23 +123,17 @@ func TestNetworkReadTimeout(t *testing.T) { uploadLength := 50 * 1024 data := make([]byte, payloadLength) _, err := rand.Read(data) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Create upload req, err := http.NewRequest("POST", endpoint, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Length", strconv.Itoa(uploadLength)) res, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if res.StatusCode != http.StatusCreated { t.Fatal("invalid response code") @@ -184,9 +151,7 @@ func TestNetworkReadTimeout(t *testing.T) { // Begin uploading data. The 10KB are transmitted completely after 2s, after which no // more data is received by tusd. The TCP connection stays open. req, err = http.NewRequest("PATCH", uploadUrl, reader) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Offset", "0") @@ -194,9 +159,7 @@ func TestNetworkReadTimeout(t *testing.T) { start := time.Now() res, err = http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer res.Body.Close() duration := time.Since(start) @@ -207,9 +170,7 @@ func TestNetworkReadTimeout(t *testing.T) { } body, err := io.ReadAll(res.Body) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if !strings.Contains(string(body), "ERR_READ_TIMEOUT") { t.Fatalf("invalid response body %s", string(body)) @@ -217,34 +178,24 @@ func TestNetworkReadTimeout(t *testing.T) { // Send HEAD request to fetch offset req, err = http.NewRequest("HEAD", uploadUrl, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") res, err = http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer res.Body.Close() offset, err := strconv.Atoi(res.Header.Get("Upload-Offset")) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Data was allowed to flow for 2s at 5KB/s, so we should have // uploaded approximately 10KB. - if !isApprox(offset, 10_000, 0.1) { - t.Fatalf("invalid offset %d", offset) - } + require.InDelta(t, 10_000, offset, 1_000, "invalid offset %d", offset) // Data was allowed to flow for 2s and tusd is configured to time // out after 5s, so the entire request should have ran for 7s. - if !isApprox(duration, 7*time.Second, 0.2) { - t.Fatalf("invalid request duration %v", duration) - } + require.InDelta(t, 7*time.Second, duration, float64(7*time.Second)*0.2, "invalid request duration %v", duration) } // TestUnexpectedNetworkClose tests that tusd correctly saves the transmitted data @@ -263,37 +214,31 @@ func TestUnexpectedNetworkClose(t *testing.T) { // We limit the upstream connection to tusd to 5KB/s. The downstream connection // from tusd is not limited. The upstream connection will be closed after sending // 10KB. - proxy.AddToxic("", "bandwidth", "upstream", 1, toxiproxy.Attributes{ + proxy.AddToxic("", "bandwidth", "upstream", 1, map[string]any{ "rate": 5, }) - proxy.AddToxic("", "limit_data", "upstream", 1, toxiproxy.Attributes{ + proxy.AddToxic("", "limit_data", "upstream", 1, map[string]any{ "bytes": 10_000, }) - // Endpoint address point to toxiproxy + // Endpoint address points to the network proxy. endpoint := "http://" + proxy.Listen + "/files/" // 50KB of random upload data length := 50 * 1024 data := make([]byte, length) _, err := rand.Read(data) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Create upload req, err := http.NewRequest("POST", endpoint, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Length", strconv.Itoa(length)) res, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if res.StatusCode != http.StatusCreated { t.Fatal("invalid response code") @@ -302,52 +247,41 @@ func TestUnexpectedNetworkClose(t *testing.T) { uploadUrl := res.Header.Get("Location") req, err = http.NewRequest("PATCH", uploadUrl, bytes.NewReader(data)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Offset", "0") req.Header.Add("Content-Type", "application/offset+octet-stream") - // Send the PATCH request. The connection will be closed by the toxiproxy, + // Send the PATCH request. The connection will be closed by the proxy, // so we get an EOF error here. start := time.Now() _, err = http.DefaultClient.Do(req) - if !errors.Is(err, io.EOF) && !strings.Contains(err.Error(), "connection reset") { + require.Error(t, err) + if !isExpectedNetworkCloseError(err) { t.Fatalf("unexpected error %s", err) } duration := time.Since(start) // Send HEAD request to fetch offset req, err = http.NewRequest("HEAD", uploadUrl, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") res, err = http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer res.Body.Close() offset, err := strconv.Atoi(res.Header.Get("Upload-Offset")) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) - // 10KB were allowed before toxiproxy cut the connection. Accounting + // 10KB were allowed before the proxy cut the connection. Accounting // the overhead of HTTP request, tusd should have received about 10KB. - if !isApprox(offset, 10_000, 0.1) { - t.Fatalf("invalid offset %d", offset) - } + require.InDelta(t, 10_000, offset, 1_000, "invalid offset %d", offset) // Data was allowed to flow for 2s. - if !isApprox(duration, 2*time.Second, 0.2) { - t.Fatalf("invalid request duration %v", duration) - } + require.InDelta(t, 2*time.Second, duration, float64(2*time.Second)*0.2, "invalid request duration %v", duration) } // TestUnexpectedNetworkReset tests that tusd correctly saves the transmitted data @@ -360,7 +294,7 @@ func TestUnexpectedNetworkReset(t *testing.T) { endpoint, addr, _ := spawnTusd(ctx, t) - // We don't use toxiproxy here because we have to control the TCP RST + // We don't use the proxy here because we have to control the TCP RST // flag directly. // We create an upload of 10KB, but only provide 5KB before cutting the connection. @@ -368,23 +302,17 @@ func TestUnexpectedNetworkReset(t *testing.T) { payloadLength := 5 * 1024 data := make([]byte, payloadLength) _, err := rand.Read(data) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Create upload req, err := http.NewRequest("POST", endpoint, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Length", strconv.Itoa(uploadLength)) res, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if res.StatusCode != http.StatusCreated { t.Fatal("invalid response code") @@ -392,17 +320,13 @@ func TestUnexpectedNetworkReset(t *testing.T) { uploadUrlStr := res.Header.Get("Location") uploadUrl, err := url.Parse(uploadUrlStr) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Send upload data via a PATCH request. We directly open a TCP socket and write the HTTP // request manually because it allows us to use SetLinger directly and we can send a smaller // body than advertised in the Content-Length header. conn, err := net.Dial("tcp", addr) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // SetLinger(0) causes a RST to be sent instead of a normal FIN handshake. tcpConn := conn.(*net.TCPConn) @@ -437,27 +361,19 @@ Upload-Offset: 0 // Send HEAD request to fetch offset req, err = http.NewRequest("HEAD", uploadUrlStr, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") res, err = http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer res.Body.Close() offset, err := strconv.Atoi(res.Header.Get("Upload-Offset")) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // 5KB were transmitted, all of which should be safed. - if !isApprox(offset, payloadLength, 0.1) { - t.Fatalf("invalid offset %d", offset) - } + require.InDelta(t, payloadLength, offset, float64(payloadLength)*0.1, "invalid offset %d", offset) } // TestLockRelease asserts that an incoming request will cause any ongoing request @@ -477,34 +393,28 @@ func TestLockRelease(t *testing.T) { // We limit the upstream connection to tusd to 5KB/s. The downstream connection // from tusd is not limited. - proxy.AddToxic("", "bandwidth", "upstream", 1, toxiproxy.Attributes{ + proxy.AddToxic("", "bandwidth", "upstream", 1, map[string]any{ "rate": 5, }) - // Endpoint address point to toxiproxy + // Endpoint address points to the network proxy. endpoint := "http://" + proxy.Listen + "/files/" // 50KB of random upload data length := 50 * 1024 data := make([]byte, length) _, err := rand.Read(data) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Create upload postReq, err := http.NewRequest("POST", endpoint, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) postReq.Header.Add("Tus-Resumable", "1.0.0") postReq.Header.Add("Upload-Length", strconv.Itoa(length)) postRes, err := http.DefaultClient.Do(postReq) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if postRes.StatusCode != http.StatusCreated { t.Fatal("invalid response code") @@ -514,9 +424,7 @@ func TestLockRelease(t *testing.T) { // Begin the upload patchReq, err := http.NewRequest("PATCH", uploadUrl, bytes.NewReader(data)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) patchReq.Header.Add("Tus-Resumable", "1.0.0") patchReq.Header.Add("Upload-Offset", "0") @@ -553,9 +461,7 @@ func TestLockRelease(t *testing.T) { start := time.Now() patchRes, err := http.DefaultClient.Do(patchReq) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer patchRes.Body.Close() // Assert the response to see if tusd correctly emitted an interruption message. @@ -564,9 +470,7 @@ func TestLockRelease(t *testing.T) { } body, err := io.ReadAll(patchRes.Body) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if !strings.Contains(string(body), "ERR_UPLOAD_INTERRUPTED") { t.Fatalf("invalid response body %s", string(body)) @@ -575,32 +479,24 @@ func TestLockRelease(t *testing.T) { // Wait for the HEAD response and assert its response headRes := <-headResChan err = <-headErrChan - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if headRes.StatusCode != http.StatusOK { t.Fatalf("invalid response code %d", headRes.StatusCode) } offset, err := strconv.Atoi(headRes.Header.Get("Upload-Offset")) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Data was allowed to flow for 2s at 5KB/s, so we should have // uploaded approximately 10KB. - if !isApprox(offset, 10_000, 0.1) { - t.Fatalf("invalid offset %d", offset) - } + require.InDelta(t, 10_000, offset, 1_000, "invalid offset %d", offset) // The interrupting request is sent after 2s, but with the poll intervals it might // take some more time for the requests to be finished, so the duration should be // 3s +/- 1s. duration := time.Since(start) - if !isApprox(duration, 3*time.Second, 0.3) { - t.Fatalf("invalid request duration %v", duration) - } + require.InDelta(t, 3*time.Second, duration, float64(3*time.Second)*0.3, "invalid request duration %v", duration) } // TestUploadLengthExceeded asserts that uploading appending requests are limited to @@ -619,11 +515,11 @@ func TestUploadLengthExceeded(t *testing.T) { // We limit the upstream connection to tusd to 5KB/s. The downstream connection // from tusd is not limited. - proxy.AddToxic("", "bandwidth", "upstream", 1, toxiproxy.Attributes{ + proxy.AddToxic("", "bandwidth", "upstream", 1, map[string]any{ "rate": 5, }) - // Endpoint address point to toxiproxy + // Endpoint address points to the network proxy. endpoint := "http://" + proxy.Listen + "/files/" // We specify an upload length of 10KB, but supply 50KB of random upload data. @@ -631,23 +527,17 @@ func TestUploadLengthExceeded(t *testing.T) { payloadLength := 50 * 1024 data := make([]byte, payloadLength) _, err := rand.Read(data) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Create upload req, err := http.NewRequest("POST", endpoint, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Length", strconv.Itoa(uploadLength)) res, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if res.StatusCode != http.StatusCreated { t.Fatal("invalid response code") @@ -656,9 +546,7 @@ func TestUploadLengthExceeded(t *testing.T) { uploadUrl := res.Header.Get("Location") req, err = http.NewRequest("PATCH", uploadUrl, bytes.NewReader(data)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Offset", "0") @@ -673,9 +561,7 @@ func TestUploadLengthExceeded(t *testing.T) { start := time.Now() res, err = http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer res.Body.Close() duration := time.Since(start) @@ -684,9 +570,7 @@ func TestUploadLengthExceeded(t *testing.T) { } body, err := io.ReadAll(res.Body) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if !strings.Contains(string(body), "ERR_UPLOAD_SIZE_EXCEEDED") { t.Fatalf("invalid response body %s", string(body)) @@ -694,22 +578,16 @@ func TestUploadLengthExceeded(t *testing.T) { // Send HEAD request to fetch offset req, err = http.NewRequest("HEAD", uploadUrl, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") res, err = http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer res.Body.Close() offset, err := strconv.Atoi(res.Header.Get("Upload-Offset")) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // tusd must only read the amount specified in Upload-Length. if offset != uploadLength { @@ -718,9 +596,7 @@ func TestUploadLengthExceeded(t *testing.T) { // The request should be stopped immediately after 10KB have been transmitted instead of waiting for // the entire request body. With 5KB/s, that is 2s. - if !isApprox(duration, 2*time.Second, 0.2) { - t.Fatalf("invalid request duration %v", duration) - } + require.InDelta(t, 2*time.Second, duration, float64(2*time.Second)*0.2, "invalid request duration %v", duration) } // TestSuccessfulUpload asserts that ongoing upload requests get properly @@ -745,34 +621,28 @@ func TestStopUpload(t *testing.T) { // We limit the upstream connection to tusd to 5KB/s. The downstream connection // from tusd is not limited. - proxy.AddToxic("", "bandwidth", "upstream", 1, toxiproxy.Attributes{ + proxy.AddToxic("", "bandwidth", "upstream", 1, map[string]any{ "rate": 5, }) - // Endpoint address point to toxiproxy + // Endpoint address points to the network proxy. endpoint := "http://" + proxy.Listen + "/files/" // We specify an upload length of 50KB. uploadLength := 50 * 1024 data := make([]byte, uploadLength) _, err := rand.Read(data) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // Create upload req, err := http.NewRequest("POST", endpoint, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Length", strconv.Itoa(uploadLength)) res, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if res.StatusCode != http.StatusCreated { t.Fatal("invalid response code") @@ -781,9 +651,7 @@ func TestStopUpload(t *testing.T) { uploadUrl := res.Header.Get("Location") req, err = http.NewRequest("PATCH", uploadUrl, bytes.NewReader(data)) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") req.Header.Add("Upload-Offset", "0") @@ -791,9 +659,7 @@ func TestStopUpload(t *testing.T) { start := time.Now() res, err = http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer res.Body.Close() duration := time.Since(start) @@ -802,9 +668,7 @@ func TestStopUpload(t *testing.T) { } body, err := io.ReadAll(res.Body) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if !strings.Contains(string(body), "ERR_UPLOAD_STOPPED") { t.Fatalf("invalid response body %s", string(body)) @@ -812,16 +676,12 @@ func TestStopUpload(t *testing.T) { // Send HEAD request to check if upload was terminated req, err = http.NewRequest("HEAD", uploadUrl, nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) req.Header.Add("Tus-Resumable", "1.0.0") res, err = http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer res.Body.Close() if res.StatusCode != http.StatusNotFound { @@ -830,12 +690,12 @@ func TestStopUpload(t *testing.T) { // The first post-receive hook is sent after 3s (due to the progress-hooks-interval flag). // The upload should then be quickly stoppped and terminated. - if !isApprox(duration, 3*time.Second, 0.3) { - t.Fatalf("invalid request duration %v", duration) - } + require.InDelta(t, 3*time.Second, duration, float64(3*time.Second)*0.3, "invalid request duration %v", duration) } func spawnTusd(ctx context.Context, t *testing.T, args ...string) (endpoint string, address string, cmd *exec.Cmd) { + t.Helper() + args = append([]string{"-port=0"}, args...) cmd = exec.CommandContext(ctx, TUSD_BINARY, args...) // Note: Leave stderr alone. It is not a good idea to connect the @@ -845,9 +705,7 @@ func spawnTusd(ctx context.Context, t *testing.T, args ...string) (endpoint stri // cmd.Stderr = os.Stderr stdout, err := cmd.StdoutPipe() - if err != nil { - t.Fatalf("failed to get stdout pipe: %s", err) - } + require.NoError(t, err, "failed to get stdout pipe") // Ensure that stdout is closed, when child process is stopped. cmd.Cancel = func() error { @@ -855,9 +713,7 @@ func spawnTusd(ctx context.Context, t *testing.T, args ...string) (endpoint stri return cmd.Process.Kill() } - if err := cmd.Start(); err != nil { - t.Fatalf("failed to start tusd: %s", err) - } + require.NoError(t, cmd.Start(), "failed to start tusd") scanner := bufio.NewScanner(stdout) @@ -875,16 +731,24 @@ func spawnTusd(ctx context.Context, t *testing.T, args ...string) (endpoint stri return } } - if err := scanner.Err(); err != nil { - t.Fatalf("failed to scan output: %s", err) - } + require.NoError(t, scanner.Err(), "failed to scan output") - panic("unreachable") + require.FailNow(t, "failed to parse tusd endpoint from tusd output") + return } -func isApprox[N constraints.Integer](got N, expected N, tolerance float64) bool { - min := float64(expected) * (1 - tolerance) - max := float64(expected) * (1 + tolerance) +func isExpectedNetworkCloseError(err error) bool { + if err == nil { + return false + } + + if errors.Is(err, io.EOF) { + return true + } + + errMsg := strings.ToLower(err.Error()) - return min <= float64(got) && float64(got) <= max + // Different OS/network stacks surface different text for abrupt socket closes. + return strings.Contains(errMsg, "connection reset") || + strings.Contains(errMsg, "forcibly closed by the remote host") } diff --git a/internal/e2e/e2e_unix_test.go b/internal/e2e/e2e_unix_test.go index 1045b7a1f..a309cee13 100644 --- a/internal/e2e/e2e_unix_test.go +++ b/internal/e2e/e2e_unix_test.go @@ -14,7 +14,7 @@ import ( "testing" "time" - toxiproxy "github.com/Shopify/toxiproxy/v2/client" + "github.com/stretchr/testify/require" ) // TestShutdown asserts that tusd closes all ongoing upload requests and shuts down @@ -34,11 +34,11 @@ func TestShutdown(t *testing.T) { // We limit the upstream connection to tusd to 5KB/s. The downstream connection // from tusd is not limited. - proxy.AddToxic("", "bandwidth", "upstream", 1, toxiproxy.Attributes{ + proxy.AddToxic("", "bandwidth", "upstream", 1, map[string]any{ "rate": 5, }) - // Endpoint address point to toxiproxy + // Endpoint address points to the network proxy. endpoint := "http://" + proxy.Listen + "/files/" // 50KB of random upload data @@ -93,8 +93,6 @@ func TestShutdown(t *testing.T) { // tusd should close the request and exit immediately after the signal. duration := time.Since(start) - if !isApprox(duration, 2*time.Second, 0.1) { - t.Fatalf("invalid request duration %v", duration) - } + require.InDelta(t, 2*time.Second, duration, float64(2*time.Second)*0.1, "invalid request duration %v", duration) } diff --git a/internal/e2e/local_proxy_test.go b/internal/e2e/local_proxy_test.go new file mode 100644 index 000000000..79f6a5f69 --- /dev/null +++ b/internal/e2e/local_proxy_test.go @@ -0,0 +1,321 @@ +package e2e_test + +import ( + "errors" + "fmt" + "io" + "net" + "sync" + "time" +) + +type localProxyClient struct{} + +func newLocalProxyClient() *localProxyClient { + return &localProxyClient{} +} + +func (*localProxyClient) CreateProxy(_ string, listen, upstream string) (*localProxy, error) { + if listen == "" { + listen = "127.0.0.1:0" + } + + ln, err := net.Listen("tcp", listen) + if err != nil { + return nil, err + } + + proxy := &localProxy{ + Listen: ln.Addr().String(), + upstream: upstream, + listener: ln, + closed: make(chan struct{}), + connections: make(map[net.Conn]struct{}), + } + + proxy.wg.Add(1) + go proxy.serve() + + return proxy, nil +} + +type localProxy struct { + Listen string + upstream string + listener net.Listener + + mu sync.RWMutex + config localProxyConfig + + connMu sync.Mutex + connections map[net.Conn]struct{} + + closeOnce sync.Once + closed chan struct{} + wg sync.WaitGroup +} + +type localProxyConfig struct { + upstreamRateBytesPerSecond int64 + upstreamLimitBytes int64 +} + +type localToxic struct{} + +var errUpstreamLimitReached = errors.New("upstream limit reached") + +func (proxy *localProxy) AddToxic(_ string, toxicType, stream string, _ float32, attributes map[string]any) (*localToxic, error) { + if stream != "upstream" { + return nil, fmt.Errorf("unsupported stream %q", stream) + } + + proxy.mu.Lock() + defer proxy.mu.Unlock() + + switch toxicType { + case "bandwidth": + rateKBPerSecond, err := int64Attribute(attributes, "rate") + if err != nil { + return nil, err + } + if rateKBPerSecond <= 0 { + return nil, fmt.Errorf("bandwidth toxic requires rate > 0") + } + proxy.config.upstreamRateBytesPerSecond = rateKBPerSecond * 1024 + + case "limit_data": + limitBytes, err := int64Attribute(attributes, "bytes") + if err != nil { + return nil, err + } + if limitBytes <= 0 { + return nil, fmt.Errorf("limit_data toxic requires bytes > 0") + } + proxy.config.upstreamLimitBytes = limitBytes + + default: + return nil, fmt.Errorf("unsupported toxic type %q", toxicType) + } + + return &localToxic{}, nil +} + +func (proxy *localProxy) Delete() error { + var closeErr error + + proxy.closeOnce.Do(func() { + close(proxy.closed) + closeErr = proxy.listener.Close() + + proxy.connMu.Lock() + for conn := range proxy.connections { + _ = conn.Close() + } + proxy.connMu.Unlock() + + proxy.wg.Wait() + }) + + if errors.Is(closeErr, net.ErrClosed) { + return nil + } + return closeErr +} + +func (proxy *localProxy) serve() { + defer proxy.wg.Done() + + for { + clientConn, err := proxy.listener.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return + } + + select { + case <-proxy.closed: + return + default: + } + + continue + } + + proxy.trackConnection(clientConn) + proxy.wg.Add(1) + go proxy.handleConnection(clientConn) + } +} + +func (proxy *localProxy) handleConnection(clientConn net.Conn) { + defer proxy.wg.Done() + defer proxy.untrackConnection(clientConn) + + upstreamConn, err := net.Dial("tcp", proxy.upstream) + if err != nil { + _ = clientConn.Close() + return + } + + proxy.trackConnection(upstreamConn) + defer proxy.untrackConnection(upstreamConn) + + config := proxy.currentConfig() + errCh := make(chan error, 2) + + go func() { + errCh <- proxy.copyUpstream(upstreamConn, clientConn, config) + }() + + go func() { + _, err := io.Copy(clientConn, upstreamConn) + errCh <- err + }() + + firstErr := <-errCh + + _ = clientConn.Close() + _ = upstreamConn.Close() + <-errCh + + if errors.Is(firstErr, errUpstreamLimitReached) { + return + } +} + +func (proxy *localProxy) copyUpstream(dst net.Conn, src net.Conn, config localProxyConfig) error { + const maxBufferSize = 32 * 1024 + bufferSize := maxBufferSize + if config.upstreamRateBytesPerSecond > 0 { + // Keep chunks small enough to avoid large bursts (about 50ms worth of data). + bufferSize = int(config.upstreamRateBytesPerSecond / 20) + if bufferSize < 1 { + bufferSize = 1 + } + if bufferSize > maxBufferSize { + bufferSize = maxBufferSize + } + } + + buffer := make([]byte, bufferSize) + sent := int64(0) + start := time.Now() + + for { + readLimit := len(buffer) + if config.upstreamLimitBytes > 0 { + remaining := config.upstreamLimitBytes - sent + if remaining <= 0 { + return errUpstreamLimitReached + } + if int64(readLimit) > remaining { + readLimit = int(remaining) + } + } + + readLen, readErr := src.Read(buffer[:readLimit]) + if readLen > 0 { + if config.upstreamRateBytesPerSecond > 0 { + expectedElapsed := time.Duration(sent+int64(readLen)) * time.Second / time.Duration(config.upstreamRateBytesPerSecond) + if sleepFor := time.Until(start.Add(expectedElapsed)); sleepFor > 0 { + time.Sleep(sleepFor) + } + } + + if err := writeAll(dst, buffer[:readLen]); err != nil { + return err + } + sent += int64(readLen) + + if config.upstreamLimitBytes > 0 && sent >= config.upstreamLimitBytes { + return errUpstreamLimitReached + } + } + + if readErr != nil { + if errors.Is(readErr, io.EOF) { + return nil + } + return readErr + } + } +} + +func (proxy *localProxy) currentConfig() localProxyConfig { + proxy.mu.RLock() + defer proxy.mu.RUnlock() + + return proxy.config +} + +func (proxy *localProxy) trackConnection(conn net.Conn) { + proxy.connMu.Lock() + defer proxy.connMu.Unlock() + + proxy.connections[conn] = struct{}{} +} + +func (proxy *localProxy) untrackConnection(conn net.Conn) { + proxy.connMu.Lock() + defer proxy.connMu.Unlock() + + delete(proxy.connections, conn) + _ = conn.Close() +} + +func int64Attribute(attributes map[string]any, key string) (int64, error) { + rawValue, ok := attributes[key] + if !ok { + return 0, fmt.Errorf("missing %q attribute", key) + } + + switch value := rawValue.(type) { + case int: + return int64(value), nil + case int8: + return int64(value), nil + case int16: + return int64(value), nil + case int32: + return int64(value), nil + case int64: + return value, nil + case uint: + return int64(value), nil + case uint8: + return int64(value), nil + case uint16: + return int64(value), nil + case uint32: + return int64(value), nil + case uint64: + return int64(value), nil + case float32: + return floatAttributeToInt64(float64(value), key) + case float64: + return floatAttributeToInt64(value, key) + default: + return 0, fmt.Errorf("attribute %q has unsupported type %T", key, rawValue) + } +} + +func floatAttributeToInt64(value float64, key string) (int64, error) { + intValue := int64(value) + if float64(intValue) != value { + return 0, fmt.Errorf("attribute %q must be an integer", key) + } + + return intValue, nil +} + +func writeAll(dst io.Writer, data []byte) error { + for len(data) > 0 { + written, err := dst.Write(data) + if err != nil { + return err + } + data = data[written:] + } + + return nil +} diff --git a/internal/s3log/s3log.go b/internal/s3log/s3log.go index cd76baaa2..052fd7374 100644 --- a/internal/s3log/s3log.go +++ b/internal/s3log/s3log.go @@ -5,10 +5,9 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "time" - "golang.org/x/exp/slog" - "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/tus/tusd/v2/pkg/s3store" ) diff --git a/internal/s3log/s3log_test.go b/internal/s3log/s3log_test.go index 1011489e0..9b6febb62 100644 --- a/internal/s3log/s3log_test.go +++ b/internal/s3log/s3log_test.go @@ -3,11 +3,10 @@ package s3log import ( "bytes" "context" + "log/slog" "strings" "testing" - "golang.org/x/exp/slog" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/golang/mock/gomock" diff --git a/pkg/azurestore/azurestore_test.go b/pkg/azurestore/azurestore_test.go index 5b64e7290..40c718916 100644 --- a/pkg/azurestore/azurestore_test.go +++ b/pkg/azurestore/azurestore_test.go @@ -283,7 +283,7 @@ func TestWriteChunk(t *testing.T) { data, err := json.Marshal(mockTusdInfo) assert.Nil(err) - var offset int64 = mockSize / 2 + var offset = mockSize / 2 gomock.InOrder( service.EXPECT().NewBlob(ctx, mockID+".info").Return(infoBlob, nil).Times(1), @@ -327,7 +327,7 @@ func TestFinishUpload(t *testing.T) { data, err := json.Marshal(mockTusdInfo) assert.Nil(err) - var offset int64 = mockSize / 2 + var offset = mockSize / 2 gomock.InOrder( service.EXPECT().NewBlob(ctx, mockID+".info").Return(infoBlob, nil).Times(1), diff --git a/pkg/handler/config.go b/pkg/handler/config.go index 4efa43598..7acbebdb9 100644 --- a/pkg/handler/config.go +++ b/pkg/handler/config.go @@ -2,11 +2,10 @@ package handler import ( "errors" + "log/slog" "net/url" "regexp" "time" - - "golang.org/x/exp/slog" ) // Config provides a way to configure the Handler depending on your needs. diff --git a/pkg/handler/context.go b/pkg/handler/context.go index 9c75ad9d8..de316cfb2 100644 --- a/pkg/handler/context.go +++ b/pkg/handler/context.go @@ -3,10 +3,9 @@ package handler import ( "context" "errors" + "log/slog" "net/http" "time" - - "golang.org/x/exp/slog" ) // httpContext is wrapper around context.Context that also carries the diff --git a/pkg/handler/unrouted_handler.go b/pkg/handler/unrouted_handler.go index 8d2e690f7..cca935975 100644 --- a/pkg/handler/unrouted_handler.go +++ b/pkg/handler/unrouted_handler.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "log/slog" "math" "mime" "net/http" @@ -13,8 +14,6 @@ import ( "strconv" "strings" "time" - - "golang.org/x/exp/slog" ) const UploadLengthDeferred = "1" diff --git a/pkg/hooks/file/file.go b/pkg/hooks/file/file.go index d08d39ac8..6d1f3cec6 100644 --- a/pkg/hooks/file/file.go +++ b/pkg/hooks/file/file.go @@ -55,7 +55,7 @@ func (h FileHook) InvokeHook(req hooks.HookRequest) (res hooks.HookResponse, err // Report error if the exit code was non-zero if err, ok := err.(*exec.ExitError); ok { - return res, fmt.Errorf("unexpected return code %d from hook endpoint: %s", err.ProcessState.ExitCode(), string(output)) + return res, fmt.Errorf("unexpected return code %d from hook endpoint: %s", err.ExitCode(), string(output)) } if err != nil { diff --git a/pkg/hooks/hooks.go b/pkg/hooks/hooks.go index 71b3c1598..03ce2358e 100644 --- a/pkg/hooks/hooks.go +++ b/pkg/hooks/hooks.go @@ -19,11 +19,12 @@ package hooks import ( "fmt" + "log/slog" + "slices" - "github.com/prometheus/client_golang/prometheus" "github.com/tus/tusd/v2/pkg/handler" - "golang.org/x/exp/slices" - "golang.org/x/exp/slog" + + "github.com/prometheus/client_golang/prometheus" ) // HookHandler is the main inferface to be implemented by all hook backends. diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index eae84eba7..2fc2a7ff9 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -79,6 +79,7 @@ import ( "net/http" "os" "regexp" + "slices" "strings" "sync" "time" @@ -87,7 +88,6 @@ import ( "github.com/tus/tusd/v2/internal/semaphore" "github.com/tus/tusd/v2/internal/uid" "github.com/tus/tusd/v2/pkg/handler" - "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" "github.com/aws/aws-sdk-go-v2/aws"