Skip to content

Commit a4851fc

Browse files
committed
uplink: enable new concurrent segment upload codepath by default
This change enables new concurrent segment upload codepath by default. It's still possible to disable it with context created with testuplink.DisableConcurrentSegmentUploads. As an addition this change fixes small Jenkins issue. Change-Id: I26229bb0f071edc6433bfdcde404e00be1ea35ab
1 parent 138ef76 commit a4851fc

6 files changed

Lines changed: 176 additions & 33 deletions

File tree

Jenkinsfile

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,12 @@ pipeline {
154154
dir('testsuite'){
155155
sh 'cp go.mod go-temp.mod'
156156
sh 'go vet -modfile go-temp.mod -mod=mod storj.io/storj/...'
157-
sh 'go test -modfile go-temp.mod -mod=mod -parallel 4 -p 6 -vet=off -timeout 20m -json storj.io/storj/... > ../.build/testsuite-storj.json'
157+
sh 'go test -modfile go-temp.mod -mod=mod -tags noembed -parallel 4 -p 6 -vet=off -timeout 20m -json storj.io/storj/... 2>&1 | tee ../.build/testsuite-storj.json | xunit -out ../.build/testsuite-storj.xml'
158158
}
159159
}
160160

161161
post {
162162
always {
163-
dir('testsuite'){
164-
sh 'cat ../.build/testsuite-storj.json | xunit -out ../.build/testsuite-storj.xml'
165-
}
166163
sh script: 'cat .build/testsuite-storj.json | tparse -all -top -slow 100', returnStatus: true
167164
archiveArtifacts artifacts: '.build/testsuite-storj.json'
168165
junit '.build/testsuite-storj.xml'

private/testuplink/uplink.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ type listLimitKey struct{}
2222

2323
type concurrentSegmentUploadsConfigKey struct{}
2424

25+
type disableConcurrentSegmentUploadsKey struct{}
26+
2527
type (
2628
logWriterKey struct{}
2729
logWriterContextKey struct{}
@@ -111,14 +113,25 @@ func WithConcurrentSegmentUploadsConfig(ctx context.Context, config ConcurrentSe
111113
return context.WithValue(ctx, concurrentSegmentUploadsConfigKey{}, config)
112114
}
113115

116+
// DisableConcurrentSegmentUploads creates a context that disables the new
117+
// concurrent segment upload codepath.
118+
func DisableConcurrentSegmentUploads(ctx context.Context) context.Context {
119+
return context.WithValue(ctx, disableConcurrentSegmentUploadsKey{}, struct{}{})
120+
}
121+
114122
// GetConcurrentSegmentUploadsConfig returns the scheduler options to
115-
// use with the new concurrent segment upload codepath, or nil if no scheduler
116-
// options have been set.
123+
// use with the new concurrent segment upload codepath, if no scheduler
124+
// options have been set it will return default configuration. Concurrent
125+
// segment upload code path can be disabled with DisableConcurrentSegmentUploads.
117126
func GetConcurrentSegmentUploadsConfig(ctx context.Context) *ConcurrentSegmentUploadsConfig {
127+
if value := ctx.Value(disableConcurrentSegmentUploadsKey{}); value != nil {
128+
return nil
129+
}
118130
if config, ok := ctx.Value(concurrentSegmentUploadsConfigKey{}).(ConcurrentSegmentUploadsConfig); ok {
119131
return &config
120132
}
121-
return nil
133+
config := DefaultConcurrentSegmentUploadsConfig()
134+
return &config
122135
}
123136

124137
// WithLogWriter creates context with information about upload log file.

testsuite/access_test.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -618,16 +618,11 @@ func TestUploadNotAllowedPath(t *testing.T) {
618618

619619
testData := bytes.NewBuffer(testrand.Bytes(1 * memory.KiB))
620620

621-
upload, err := project.UploadObject(ctx, "testbucket", "first-level-object", nil)
622-
require.NoError(t, err)
623-
624-
_, err = io.Copy(upload, testData)
621+
_, err = project.UploadObject(ctx, "testbucket", "first-level-object", nil)
625622
require.Error(t, err)
623+
require.ErrorIs(t, err, uplink.ErrPermissionDenied)
626624

627-
err = upload.Abort()
628-
require.NoError(t, err)
629-
630-
upload, err = project.UploadObject(ctx, "testbucket", "videos/second-level-object", nil)
625+
upload, err := project.UploadObject(ctx, "testbucket", "videos/second-level-object", nil)
631626
require.NoError(t, err)
632627

633628
_, err = io.Copy(upload, testData)
@@ -876,11 +871,9 @@ func TestImmutableUpload(t *testing.T) {
876871
}
877872

878873
{ // we shouldn't be able upload to a different location
879-
upload, err := project.UploadObject(ctx, "testbucket", "object2", nil)
880-
require.NoError(t, err)
881-
_, err = upload.Write(testrand.Bytes(5 * memory.KiB))
874+
_, err := project.UploadObject(ctx, "testbucket", "object2", nil)
882875
require.Error(t, err)
883-
require.Error(t, upload.Commit())
876+
require.ErrorIs(t, err, uplink.ErrPermissionDenied)
884877
}
885878

886879
// we shouldn't be able to delete

testsuite/object_test.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/stretchr/testify/require"
1717
"go.uber.org/zap"
1818

19-
"storj.io/common/errs2"
2019
"storj.io/common/fpath"
2120
"storj.io/common/memory"
2221
"storj.io/common/testcontext"
@@ -386,9 +385,7 @@ func TestContextCancelUpload(t *testing.T) {
386385
assertObjectEmptyCreated(t, upload.Info(), "test.dat")
387386

388387
uploadcancel()
389-
_, err = upload.Write(randData)
390-
require.Error(t, err)
391-
require.True(t, errs2.IsCanceled(err))
388+
requireWriteEventuallyReturns(t, upload, randData, context.Canceled)
392389

393390
err = upload.Abort()
394391
require.NoError(t, err)

testsuite/uplink/metainfo/uplink_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ import (
1616
"storj.io/uplink/private/testuplink"
1717
)
1818

19-
func TestMultisegmentUploadWithLastInline(t *testing.T) {
20-
// this is special case were uploaded object has 3 segments (2 remote + 1 inline)
19+
func TestMultisegmentUploadWithoutInlineSegment(t *testing.T) {
2120
testplanet.Run(t, testplanet.Config{
2221
SatelliteCount: 1, StorageNodeCount: 4, UplinkCount: 1,
2322
Reconfigure: testplanet.Reconfigure{
@@ -36,7 +35,9 @@ func TestMultisegmentUploadWithLastInline(t *testing.T) {
3635
require.NoError(t, err)
3736
require.Equal(t, expectedData, downloaded)
3837

39-
// verify that object has 3 segments, 2 remote + 1 inline
38+
// in the past object with size equal to multiplication of max segment size was uploaded
39+
// as remote segments + one additional inline segment, after upload code path refactor we
40+
// are uploading now only 2 remote segments without last inline segment
4041
objects, err := planet.Satellites[0].Metabase.DB.TestingAllCommittedObjects(ctx, planet.Uplinks[0].Projects[0].ID, "testbucket")
4142
require.NoError(t, err)
4243
require.Len(t, objects, 1)
@@ -47,9 +48,8 @@ func TestMultisegmentUploadWithLastInline(t *testing.T) {
4748
ObjectKey: objects[0].ObjectKey,
4849
})
4950
require.NoError(t, err)
50-
require.Equal(t, 3, len(segments))
51-
// TODO we should check 2 segments to be remote and last one to be inline
52-
// but main satellite implementation doens't give such info at the moment
51+
require.Len(t, segments, 2)
52+
require.EqualValues(t, 20*memory.KiB, segments[0].PlainSize)
5353
})
5454
}
5555

testsuite/upload_test.go

Lines changed: 147 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@ import (
1313
"time"
1414

1515
"github.com/stretchr/testify/require"
16+
"go.uber.org/zap"
1617

1718
"storj.io/common/memory"
1819
"storj.io/common/testcontext"
1920
"storj.io/common/testrand"
2021
"storj.io/storj/private/testplanet"
22+
"storj.io/storj/satellite"
2123
"storj.io/uplink"
2224
"storj.io/uplink/private/testuplink"
2325
)
@@ -360,10 +362,7 @@ func TestUploadEventuallyFailsWithNoNodes(t *testing.T) {
360362
require.NoError(t, planet.StopPeer(planet.StorageNodes[i]))
361363
}
362364

363-
project, err := planet.Uplinks[0].OpenProject(
364-
testuplink.WithConcurrentSegmentUploadsDefaultConfig(ctx),
365-
planet.Satellites[0],
366-
)
365+
project, err := planet.Uplinks[0].OpenProject(ctx, planet.Satellites[0])
367366
require.NoError(t, err)
368367
defer ctx.Check(project.Close)
369368

@@ -449,3 +448,147 @@ func TestConcurrentUploadToSamePath(t *testing.T) {
449448
require.Equal(t, expectedData, downloaded)
450449
})
451450
}
451+
452+
func TestUploadLimits(t *testing.T) {
453+
testplanet.Run(t, testplanet.Config{
454+
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 2,
455+
Reconfigure: testplanet.Reconfigure{
456+
Satellite: func(log *zap.Logger, index int, config *satellite.Config) {
457+
config.ProjectLimit.CacheCapacity = 0
458+
},
459+
},
460+
}, func(t *testing.T, tpCtx *testcontext.Context, planet *testplanet.Planet) {
461+
data := testrand.Bytes(6 * memory.KiB)
462+
463+
ctx := testuplink.WithMaxSegmentSize(tpCtx, 5*memory.KiB)
464+
465+
t.Run("segment limit", func(t *testing.T) {
466+
upl := planet.Uplinks[0]
467+
accountingDB := planet.Satellites[0].DB.ProjectAccounting()
468+
err := accountingDB.UpdateProjectSegmentLimit(ctx, upl.Projects[0].ID, 0)
469+
require.NoError(t, err)
470+
471+
project, err := upl.OpenProject(ctx, planet.Satellites[0])
472+
require.NoError(t, err)
473+
defer tpCtx.Check(project.Close)
474+
475+
_, err = project.CreateBucket(ctx, "testbucket")
476+
require.NoError(t, err)
477+
478+
// should fail on Write beause we uploaded more than segment
479+
// and request to satellite were made. The Write call may not fail
480+
// immmediately, since writes are buffered and the segment uploads
481+
// are handled concurrently.
482+
upload, err := project.UploadObject(ctx, "testbucket", "test/path/0", nil)
483+
require.NoError(t, err)
484+
requireWriteEventuallyReturns(t, upload, data, uplink.ErrSegmentsLimitExceeded)
485+
require.ErrorIs(t, upload.Commit(), uplink.ErrSegmentsLimitExceeded)
486+
487+
// should fail on Commit as Write input is too small to create single segment
488+
upload, err = project.UploadObject(ctx, "testbucket", "test/path/0", nil)
489+
require.NoError(t, err)
490+
n, err := upload.Write(testrand.Bytes(3 * memory.KiB))
491+
require.NoError(t, err)
492+
require.NotZero(t, n)
493+
require.ErrorIs(t, upload.Commit(), uplink.ErrSegmentsLimitExceeded)
494+
495+
// should fail on direct call to BeginObject
496+
_, err = project.BeginUpload(ctx, "testbucket", "test/path/0", nil)
497+
require.ErrorIs(t, err, uplink.ErrSegmentsLimitExceeded)
498+
499+
// update limit to be able to call BeginUpload without error
500+
err = accountingDB.UpdateProjectSegmentLimit(ctx, upl.Projects[0].ID, 1)
501+
require.NoError(t, err)
502+
503+
uploadInfo, err := project.BeginUpload(ctx, "testbucket", "test/path/0", nil)
504+
require.NoError(t, err)
505+
506+
err = accountingDB.UpdateProjectSegmentLimit(ctx, upl.Projects[0].ID, 0)
507+
require.NoError(t, err)
508+
509+
// should fail on Write beause we uploaded more than segment
510+
// and request to satellite were made. The Write call may not fail
511+
// immmediately, since writes are buffered and the segment uploads
512+
// are handled concurrently.
513+
partUpload, err := project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0)
514+
require.NoError(t, err)
515+
requireWriteEventuallyReturns(t, partUpload, data, uplink.ErrSegmentsLimitExceeded)
516+
require.ErrorIs(t, partUpload.Commit(), uplink.ErrSegmentsLimitExceeded)
517+
518+
// should fail on Commit as Write input is too small to create single segment
519+
partUpload, err = project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0)
520+
require.NoError(t, err)
521+
_, err = partUpload.Write(testrand.Bytes(3 * memory.KiB))
522+
require.NoError(t, err)
523+
require.ErrorIs(t, partUpload.Commit(), uplink.ErrSegmentsLimitExceeded)
524+
})
525+
t.Run("storage limit", func(t *testing.T) {
526+
upl := planet.Uplinks[1]
527+
accountingDB := planet.Satellites[0].DB.ProjectAccounting()
528+
err := accountingDB.UpdateProjectUsageLimit(ctx, upl.Projects[0].ID, 0)
529+
require.NoError(t, err)
530+
531+
project, err := upl.OpenProject(ctx, planet.Satellites[0])
532+
require.NoError(t, err)
533+
defer tpCtx.Check(project.Close)
534+
535+
_, err = project.CreateBucket(ctx, "testbucket")
536+
require.NoError(t, err)
537+
538+
// should fail on Write beause we uploaded more than segment
539+
// and request to satellite were made. The Write call may not fail
540+
// immmediately, since writes are buffered and the segment uploads
541+
// are handled concurrently.
542+
upload, err := project.UploadObject(ctx, "testbucket", "test/path/0", nil)
543+
require.NoError(t, err)
544+
requireWriteEventuallyReturns(t, upload, data, uplink.ErrStorageLimitExceeded)
545+
require.ErrorIs(t, upload.Commit(), uplink.ErrStorageLimitExceeded)
546+
547+
// should fail on Commit as Write input is too small to create single segment
548+
upload, err = project.UploadObject(ctx, "testbucket", "test/path/0", nil)
549+
require.NoError(t, err)
550+
_, err = upload.Write(testrand.Bytes(3 * memory.KiB))
551+
require.NoError(t, err)
552+
require.ErrorIs(t, upload.Commit(), uplink.ErrStorageLimitExceeded)
553+
554+
// should fail on direct call to BeginObject
555+
_, err = project.BeginUpload(ctx, "testbucket", "test/path/0", nil)
556+
require.ErrorIs(t, err, uplink.ErrStorageLimitExceeded)
557+
558+
// update limit to be able to call BeginUpload without error
559+
err = accountingDB.UpdateProjectUsageLimit(ctx, upl.Projects[0].ID, 1)
560+
require.NoError(t, err)
561+
562+
uploadInfo, err := project.BeginUpload(ctx, "testbucket", "test/path/0", nil)
563+
require.NoError(t, err)
564+
565+
err = accountingDB.UpdateProjectUsageLimit(ctx, upl.Projects[0].ID, 0)
566+
require.NoError(t, err)
567+
568+
// should fail on Write beause we uploaded more than segment
569+
// and request to satellite were made. The Write call may not fail
570+
// immmediately, since writes are buffered and the segment uploads
571+
// are handled concurrently.
572+
partUpload, err := project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0)
573+
require.NoError(t, err)
574+
requireWriteEventuallyReturns(t, partUpload, data, uplink.ErrStorageLimitExceeded)
575+
require.ErrorIs(t, partUpload.Commit(), uplink.ErrStorageLimitExceeded)
576+
577+
// should fail on Commit as Write input is too small to create single segment
578+
partUpload, err = project.UploadPart(ctx, "testbucket", "test/path/0", uploadInfo.UploadID, 0)
579+
require.NoError(t, err)
580+
_, err = partUpload.Write(testrand.Bytes(3 * memory.KiB))
581+
require.NoError(t, err)
582+
require.ErrorIs(t, partUpload.Commit(), uplink.ErrStorageLimitExceeded)
583+
})
584+
})
585+
}
586+
587+
func requireWriteEventuallyReturns(tb testing.TB, w io.Writer, data []byte, expectErr error) {
588+
require.Eventually(tb, func() bool {
589+
_, err := w.Write(data)
590+
// only write the data on the first call to write.
591+
data = data[0:]
592+
return errors.Is(err, expectErr)
593+
}, time.Second*5, time.Millisecond*10)
594+
}

0 commit comments

Comments
 (0)