diff --git a/cmd/config.go b/cmd/config.go
index 1b1c2e8..7001bd0 100644
--- a/cmd/config.go
+++ b/cmd/config.go
@@ -284,10 +284,11 @@ func setupTSDBDiscovery(ctx context.Context, g *run.Group, log *slog.Logger, bkt
 }
 
 type syncerOpts struct {
-	syncerInterval       time.Duration
-	syncerConcurrency    int
-	syncerReadBufferSize units.Base2Bytes
-	syncerLabelFilesDir  string
+	syncerInterval         time.Duration
+	syncerConcurrency      int
+	syncerReadBufferSize   units.Base2Bytes
+	syncerLabelFilesDir    string
+	syncerShardConcurrency int
 
 	filterType                   string
 	filterThanosBackfillEndpoint string
@@ -337,6 +338,7 @@ func setupSyncer(ctx context.Context, g *run.Group, log *slog.Logger, bkt objsto
 		bkt,
 		locate.FilterMetas(metaFilter),
 		locate.BlockConcurrency(opts.syncerConcurrency),
+		locate.ShardConcurrency(opts.syncerShardConcurrency),
 		locate.BlockOptions(
 			locate.ReadBufferSize(opts.syncerReadBufferSize),
 			locate.LabelFilesDir(opts.syncerLabelFilesDir),
diff --git a/cmd/serve.go b/cmd/serve.go
index df178d7..2d9fd58 100644
--- a/cmd/serve.go
+++ b/cmd/serve.go
@@ -86,6 +86,7 @@ func (opts *discoveryOpts) registerServeFlags(cmd *kingpin.CmdClause) {
 func (opts *syncerOpts) registerServeFlags(cmd *kingpin.CmdClause) {
 	cmd.Flag("block.syncer.interval", "interval to sync blocks").Default("1m").DurationVar(&opts.syncerInterval)
 	cmd.Flag("block.syncer.concurrency", "concurrency for loading blocks").Default("1").IntVar(&opts.syncerConcurrency)
+	cmd.Flag("block.syncer.shard-concurrency", "concurrency for loading shards within each block (0 = sequential)").Default("0").IntVar(&opts.syncerShardConcurrency)
 	cmd.Flag("block.syncer.read-buffer-size", "read buffer size for blocks").Default("2MiB").BytesVar(&opts.syncerReadBufferSize)
 	cmd.Flag("block.filter.type", "").Default("all-metas").EnumVar(&opts.filterType, "thanos-backfill", "all-metas")
 	cmd.Flag("block.filter.thanos-backfill.endpoint", "endpoint to ignore for backfill").StringVar(&opts.filterThanosBackfillEndpoint)
diff --git a/locate/syncer.go b/locate/syncer.go
index 47c3583..5cb835c 100644
--- a/locate/syncer.go
+++ b/locate/syncer.go
@@ -30,9 +30,10 @@ import (
 type Syncer struct {
 	bkt objstore.Bucket
 
-	blockOpts   []BlockOption
-	metaFilter  MetaFilter
-	concurrency int
+	blockOpts        []BlockOption
+	metaFilter       MetaFilter
+	concurrency      int
+	shardConcurrency int
 
 	mu     sync.Mutex
 	blocks map[schema.ExternalLabelsHash]map[util.Date]*db.Block
@@ -43,9 +44,10 @@
 type SyncerOption func(*syncerConfig)
 
 type syncerConfig struct {
-	blockOpts   []BlockOption
-	metaFilter  MetaFilter
-	concurrency int
+	blockOpts        []BlockOption
+	metaFilter       MetaFilter
+	concurrency      int
+	shardConcurrency int
 }
 
 func BlockOptions(opts ...BlockOption) SyncerOption {
@@ -65,6 +67,11 @@ func BlockConcurrency(c int) SyncerOption {
 		cfg.concurrency = c
 	}
 }
+func ShardConcurrency(c int) SyncerOption {
+	return func(cfg *syncerConfig) {
+		cfg.shardConcurrency = c
+	}
+}
 
 type BlockOption func(*blockConfig)
 
@@ -87,8 +94,9 @@
 
 func NewSyncer(bkt objstore.Bucket, opts ...SyncerOption) *Syncer {
 	cfg := syncerConfig{
-		metaFilter:  AllMetasMetaFilter,
-		concurrency: 1,
+		metaFilter:       AllMetasMetaFilter,
+		concurrency:      1,
+		shardConcurrency: 0,
 	}
 
 	for _, o := range opts {
@@ -96,11 +104,12 @@ func NewSyncer(bkt objstore.Bucket, opts ...SyncerOption) *Syncer {
 	}
 
 	return &Syncer{
-		bkt:         bkt,
-		blocks:      make(map[schema.ExternalLabelsHash]map[util.Date]*db.Block),
-		blockOpts:   cfg.blockOpts,
-		metaFilter:  cfg.metaFilter,
-		concurrency: cfg.concurrency,
+		bkt:              bkt,
+		blocks:           make(map[schema.ExternalLabelsHash]map[util.Date]*db.Block),
+		blockOpts:        cfg.blockOpts,
+		metaFilter:       cfg.metaFilter,
+		concurrency:      cfg.concurrency,
+		shardConcurrency: cfg.shardConcurrency,
 	}
 }
 
@@ -116,6 +125,7 @@ func (s *Syncer) Sync(ctx context.Context, parquetStreams map[schema.ExternalLab
 		blk       *db.Block
 		extLabels schema.ExternalLabels
 		err       error
+		blockName string
 	}
 
 	type metaHashTuple struct {
@@ -156,9 +166,9 @@ func (s *Syncer) Sync(ctx context.Context, parquetStreams map[schema.ExternalLab
 		go func() {
 			defer wg.Done()
 			for mh := range workerC {
-				blk, err := newBlockForMeta(ctx, s.bkt, mh.extLabels, mh.m, s.blockOpts...)
+				blk, err := newBlockForMeta(ctx, s.bkt, mh.extLabels, mh.m, s.shardConcurrency, s.blockOpts...)
 				if err != nil {
-					blkC <- blockOrError{err: fmt.Errorf("unable to read block %s %q: %w", mh.streamHash.String(), mh.m.Date, err)}
+					blkC <- blockOrError{blk: nil, err: fmt.Errorf("unable to read block %s %q: %w", mh.streamHash.String(), mh.m.Date, err), blockName: mh.streamHash.String()}
 				} else {
 					blkC <- blockOrError{blk: blk, extLabels: mh.extLabels}
 				}
@@ -234,7 +244,7 @@ where shards live (s3, disk, memory) and should also do regular maintenance on i
 i.e. downloading files that it wants on disk, deleting files that are out of retention, etc.
 */
 
-func newBlockForMeta(ctx context.Context, bkt objstore.Bucket, extLabels schema.ExternalLabels, m schema.Meta, opts ...BlockOption) (*db.Block, error) {
+func newBlockForMeta(ctx context.Context, bkt objstore.Bucket, extLabels schema.ExternalLabels, m schema.Meta, shardConcurrency int, opts ...BlockOption) (*db.Block, error) {
 	cfg := blockConfig{
 		readBufferSize: 8 * units.MiB,
 		labelFilesDir:  os.TempDir(),
@@ -243,7 +253,15 @@ func newBlockForMeta(ctx context.Context, bkt objstore.Bucket, extLabels schema.
 		o(&cfg)
 	}
 
-	shards, err := readShards(ctx, bkt, m, extLabels.Hash(), cfg)
+	var shards []*db.Shard
+	var err error
+
+	// Use parallel loading if shardConcurrency > 0, otherwise use sequential (original) loading
+	if shardConcurrency > 0 {
+		shards, err = readShardsParallel(ctx, bkt, m, cfg, extLabels.Hash(), shardConcurrency)
+	} else {
+		shards, err = readShards(ctx, bkt, m, extLabels.Hash(), cfg)
+	}
 	if err != nil {
 		return nil, fmt.Errorf("unable to read shards: %w", err)
 	}
@@ -263,6 +281,54 @@
 		shards = append(shards, shard)
 	}
 	return shards, nil
 }
 
+func readShardsParallel(ctx context.Context, bkt objstore.Bucket, m schema.Meta, cfg blockConfig, extLabelsHash schema.ExternalLabelsHash, shardConcurrency int) ([]*db.Shard, error) {
+	type shardOrError struct {
+		shard *db.Shard
+		index int
+		err   error
+	}
+
+	shardC := make(chan shardOrError, int(m.Shards))
+	var wg sync.WaitGroup
+	sem := make(chan struct{}, shardConcurrency)
+
+	for i := range int(m.Shards) {
+		wg.Add(1)
+		go func(shardIndex int) {
+			defer wg.Done()
+
+			sem <- struct{}{}
+			defer func() { <-sem }()
+
+			shard, err := readShard(ctx, bkt, m, extLabelsHash, shardIndex, cfg)
+			shardC <- shardOrError{shard: shard, index: shardIndex, err: err}
+		}(i)
+	}
+
+	go func() {
+		wg.Wait()
+		close(shardC)
+	}()
+
+	shards := make([]*db.Shard, 0, int(m.Shards))
+	var failedShards []int
+
+	for result := range shardC {
+		if result.err != nil {
+			failedShards = append(failedShards, result.index)
+			continue
+		}
+		shards = append(shards, result.shard)
+	}
+
+	// Return error only if ALL shards failed
+	if len(shards) == 0 {
+		return nil, fmt.Errorf("all shards failed to load, failed_shards=%v", failedShards)
+	}
+
+	return shards, nil
+}
+
 func bucketReaderFromContext(bkt objstore.Bucket, name string) func(context.Context) io.ReaderAt {
 	return func(ctx context.Context) io.ReaderAt {
 		return newBucketReaderAt(ctx, bkt, name)