fix #58 - allow to download the shard in parallel #61

base: main
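This PR threads a new `shardConcurrency` setting through `Syncer`: a `ShardConcurrency` option, a parallel `readShardsParallel` loader gated behind `shardConcurrency > 0`, and plumbing through `newBlockForMeta`.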
```diff
@@ -30,9 +30,10 @@ import (
 type Syncer struct {
 	bkt objstore.Bucket
 
-	blockOpts   []BlockOption
-	metaFilter  MetaFilter
-	concurrency int
+	blockOpts        []BlockOption
+	metaFilter       MetaFilter
+	concurrency      int
+	shardConcurrency int
 
 	mu     sync.Mutex
 	blocks map[schema.ExternalLabelsHash]map[util.Date]*db.Block
```
```diff
@@ -43,9 +44,10 @@ type Syncer struct {
 type SyncerOption func(*syncerConfig)
 
 type syncerConfig struct {
-	blockOpts   []BlockOption
-	metaFilter  MetaFilter
-	concurrency int
+	blockOpts        []BlockOption
+	metaFilter       MetaFilter
+	concurrency      int
+	shardConcurrency int
 }
 
 func BlockOptions(opts ...BlockOption) SyncerOption {
```
```diff
@@ -65,6 +67,11 @@ func BlockConcurrency(c int) SyncerOption {
 		cfg.concurrency = c
 	}
 }
+func ShardConcurrency(c int) SyncerOption {
+	return func(cfg *syncerConfig) {
+		cfg.shardConcurrency = c
+	}
+}
 
 type BlockOption func(*blockConfig)
```
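For illustration, opting into parallel shard downloads from the caller's side might look like the sketch below; the `convert` package alias and the `bkt` value are assumptions for illustration, not part of this diff.

```go
// Hypothetical usage; the package alias "convert" and the bucket value
// are assumed for illustration only.
syncer := convert.NewSyncer(
	bkt,
	convert.BlockConcurrency(4), // existing option: sync blocks in parallel
	convert.ShardConcurrency(8), // new option: download up to 8 shards of a block at once
)
```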
```diff
@@ -87,20 +94,22 @@ func LabelFilesDir(d string) BlockOption {
 
 func NewSyncer(bkt objstore.Bucket, opts ...SyncerOption) *Syncer {
 	cfg := syncerConfig{
-		metaFilter:  AllMetasMetaFilter,
-		concurrency: 1,
+		metaFilter:       AllMetasMetaFilter,
+		concurrency:      1,
+		shardConcurrency: 0,
 	}
 
 	for _, o := range opts {
 		o(&cfg)
 	}
 
 	return &Syncer{
-		bkt:         bkt,
-		blocks:      make(map[schema.ExternalLabelsHash]map[util.Date]*db.Block),
-		blockOpts:   cfg.blockOpts,
-		metaFilter:  cfg.metaFilter,
-		concurrency: cfg.concurrency,
+		bkt:              bkt,
+		blocks:           make(map[schema.ExternalLabelsHash]map[util.Date]*db.Block),
+		blockOpts:        cfg.blockOpts,
+		metaFilter:       cfg.metaFilter,
+		concurrency:      cfg.concurrency,
+		shardConcurrency: cfg.shardConcurrency,
 	}
 }
```
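Note that the default `shardConcurrency: 0` keeps `newBlockForMeta` on the sequential `readShards` path (see the branch further down), so existing callers see no behavior change unless they opt in via `ShardConcurrency`.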
```diff
@@ -116,6 +125,7 @@ func (s *Syncer) Sync(ctx context.Context, parquetStreams map[schema.ExternalLab
 		blk       *db.Block
 		extLabels schema.ExternalLabels
 		err       error
+		blockName string
 	}
 
 	type metaHashTuple struct {
```
```diff
@@ -156,9 +166,9 @@ func (s *Syncer) Sync(ctx context.Context, parquetStreams map[schema.ExternalLab
 		go func() {
 			defer wg.Done()
 			for mh := range workerC {
-				blk, err := newBlockForMeta(ctx, s.bkt, mh.extLabels, mh.m, s.blockOpts...)
+				blk, err := newBlockForMeta(ctx, s.bkt, mh.extLabels, mh.m, s.shardConcurrency, s.blockOpts...)
 				if err != nil {
-					blkC <- blockOrError{err: fmt.Errorf("unable to read block %s %q: %w", mh.streamHash.String(), mh.m.Date, err)}
+					blkC <- blockOrError{blk: nil, err: fmt.Errorf("unable to read block %s %q: %w", mh.streamHash.String(), mh.m.Date, err), blockName: mh.streamHash.String()}
 				} else {
 					blkC <- blockOrError{blk: blk, extLabels: mh.extLabels}
 				}
```
```diff
@@ -234,7 +244,7 @@ where shards live (s3, disk, memory) and should also do regular maintenance on i
 i.e. downloading files that it wants on disk, deleting files that are out of retention, etc.
 */
 
-func newBlockForMeta(ctx context.Context, bkt objstore.Bucket, extLabels schema.ExternalLabels, m schema.Meta, opts ...BlockOption) (*db.Block, error) {
+func newBlockForMeta(ctx context.Context, bkt objstore.Bucket, extLabels schema.ExternalLabels, m schema.Meta, shardConcurrency int, opts ...BlockOption) (*db.Block, error) {
 	cfg := blockConfig{
 		readBufferSize: 8 * units.MiB,
 		labelFilesDir:  os.TempDir(),
```
```diff
@@ -243,7 +253,15 @@ func newBlockForMeta(ctx context.Context, bkt objstore.Bucket, extLabels schema.
 		o(&cfg)
 	}
 
-	shards, err := readShards(ctx, bkt, m, extLabels.Hash(), cfg)
+	var shards []*db.Shard
+	var err error
+
+	// Use parallel loading if shardConcurrency > 0, otherwise use sequential (original) loading
+	if shardConcurrency > 0 {
+		shards, err = readShardsParallel(ctx, bkt, m, cfg, extLabels.Hash(), shardConcurrency)
+	} else {
+		shards, err = readShards(ctx, bkt, m, extLabels.Hash(), cfg)
+	}
 	if err != nil {
 		return nil, fmt.Errorf("unable to read shards: %w", err)
 	}
```
```diff
@@ -263,6 +281,54 @@ func readShards(ctx context.Context, bkt objstore.Bucket, m schema.Meta, extLabe
 	return shards, nil
 }
 
+func readShardsParallel(ctx context.Context, bkt objstore.Bucket, m schema.Meta, cfg blockConfig, extLabelsHash schema.ExternalLabelsHash, shardConcurrency int) ([]*db.Shard, error) {
+	type shardOrError struct {
+		shard *db.Shard
+		index int
+		err   error
+	}
+
+	shardC := make(chan shardOrError, int(m.Shards))
+	var wg sync.WaitGroup
+	sem := make(chan struct{}, shardConcurrency)
+
+	for i := range int(m.Shards) {
+		wg.Add(1)
```
> **Member:** You can modernize with …
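The suggestion is cut off in the source. One plausible reading, assuming the reviewer means Go 1.25's `sync.WaitGroup.Go` (which performs the `Add(1)`, goroutine spawn, and deferred `Done` itself), is a sketch like:

```go
// Hypothetical modernization with sync.WaitGroup.Go (Go 1.25+); this is a
// guess at the truncated suggestion, not the reviewer's confirmed intent.
for i := range int(m.Shards) {
	wg.Go(func() {
		sem <- struct{}{}        // acquire a concurrency slot
		defer func() { <-sem }() // release it when done
		// Capturing i is safe: Go 1.22+ gives each iteration its own variable.
		shard, err := readShard(ctx, bkt, m, extLabelsHash, i, cfg)
		shardC <- shardOrError{shard: shard, index: i, err: err}
	})
}
```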
```diff
+		go func(shardIndex int) {
+			defer wg.Done()
+
+			sem <- struct{}{}
```
> **Member:** This is too complicated, I think. Instead we typically spawn …
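The comment is truncated, but the pattern the reviewer appears to describe — spawning exactly `shardConcurrency` worker goroutines that pull shard indices from a channel — removes the semaphore entirely. Below is a sketch under that assumption, reusing `readShard` and the types from this diff; `readShardsPool` is a hypothetical name. Writing results by index also keeps the returned slice in shard order, which the channel-collection version above does not guarantee.

```go
// A hypothetical worker-pool variant: shardConcurrency goroutines consume
// shard indices from a channel, instead of one goroutine per shard gated
// by a semaphore.
func readShardsPool(ctx context.Context, bkt objstore.Bucket, m schema.Meta, cfg blockConfig, extLabelsHash schema.ExternalLabelsHash, shardConcurrency int) ([]*db.Shard, error) {
	indexC := make(chan int)
	shards := make([]*db.Shard, int(m.Shards))
	errs := make([]error, int(m.Shards))

	var wg sync.WaitGroup
	for range shardConcurrency {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indexC {
				// Each slot is written by exactly one worker, so no mutex is needed.
				shards[i], errs[i] = readShard(ctx, bkt, m, extLabelsHash, i, cfg)
			}
		}()
	}

	for i := range int(m.Shards) {
		indexC <- i
	}
	close(indexC)
	wg.Wait()

	// Keep the PR's semantics: fail only when every shard failed.
	var ok []*db.Shard
	var failed []int
	for i, s := range shards {
		if errs[i] != nil {
			failed = append(failed, i)
			continue
		}
		ok = append(ok, s)
	}
	if len(ok) == 0 {
		return nil, fmt.Errorf("all shards failed to load, failed_shards=%v", failed)
	}
	return ok, nil
}
```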
```diff
+			defer func() { <-sem }()
+
+			shard, err := readShard(ctx, bkt, m, extLabelsHash, shardIndex, cfg)
+			shardC <- shardOrError{shard: shard, index: shardIndex, err: err}
+		}(i)
+	}
+
+	go func() {
+		wg.Wait()
+		close(shardC)
+	}()
+
+	shards := make([]*db.Shard, 0, int(m.Shards))
+	var failedShards []int
+
+	for result := range shardC {
+		if result.err != nil {
+			failedShards = append(failedShards, result.index)
+			continue
+		}
+		shards = append(shards, result.shard)
+	}
+
+	// Return error only if ALL shards failed
```
> **Member:** I can see that. This comment has zero value.
>
> **Author:** this was for my readability, will remove it
```diff
+	if len(shards) == 0 {
+		return nil, fmt.Errorf("all shards failed to load, failed_shards=%v", failedShards)
+	}
+
+	return shards, nil
+}
+
 func bucketReaderFromContext(bkt objstore.Bucket, name string) func(context.Context) io.ReaderAt {
 	return func(ctx context.Context) io.ReaderAt {
 		return newBucketReaderAt(ctx, bkt, name)
```
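As a further point of comparison: if fail-fast semantics were acceptable instead of the PR's tolerate-partial-failures behavior, `golang.org/x/sync/errgroup` with `SetLimit` would shrink the function considerably. A sketch under that assumption (`readShardsErrgroup` is a hypothetical name):

```go
// A hypothetical fail-fast variant using golang.org/x/sync/errgroup.
// Semantics differ from the PR: the first error cancels the remaining
// downloads rather than tolerating partial failures.
func readShardsErrgroup(ctx context.Context, bkt objstore.Bucket, m schema.Meta, cfg blockConfig, extLabelsHash schema.ExternalLabelsHash, shardConcurrency int) ([]*db.Shard, error) {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(shardConcurrency) // at most shardConcurrency downloads in flight

	shards := make([]*db.Shard, int(m.Shards))
	for i := range int(m.Shards) {
		g.Go(func() error {
			shard, err := readShard(ctx, bkt, m, extLabelsHash, i, cfg)
			if err != nil {
				return fmt.Errorf("shard %d: %w", i, err)
			}
			shards[i] = shard // index-addressed, so order is preserved
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return shards, nil
}
```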
> **Member** (on the `// Use parallel loading …` comment in `newBlockForMeta`): I hope that everyone working on this repo is able to read a simple if statement (:
>
> **Author:** this was for my readability, will remove it