Skip to content

feat: add shard creation to precreator service #26360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master-1.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ func (s *Server) appendPrecreatorService(c precreator.Config) error {
}
srv := precreator.NewService(c)
srv.MetaClient = s.MetaClient
srv.Store = s.TSDBStore
s.Services = append(s.Services, srv)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions internal/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type MetaClientMock struct {

OpenFn func() error

PrecreateShardGroupsFn func(from, to time.Time) error
PrecreateShardGroupsFn func(from, to time.Time) ([]meta.ShardGroupFullInfo, error)
PruneShardGroupsFn func() error

RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
Expand Down Expand Up @@ -173,7 +173,7 @@ func (c *MetaClientMock) Open() error { return c.OpenFn() }
func (c *MetaClientMock) Data() meta.Data { return c.DataFn() }
func (c *MetaClientMock) SetData(d *meta.Data) error { return c.SetDataFn(d) }

func (c *MetaClientMock) PrecreateShardGroups(from, to time.Time) error {
func (c *MetaClientMock) PrecreateShardGroups(from, to time.Time) ([]meta.ShardGroupFullInfo, error) {
return c.PrecreateShardGroupsFn(from, to)
}
func (c *MetaClientMock) PruneShardGroups() error { return c.PruneShardGroupsFn() }
16 changes: 12 additions & 4 deletions services/meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,16 +759,24 @@ func (c *Client) DeleteShardGroup(database, policy string, id uint64) error {
return nil
}

type ShardGroupFullInfo struct {
ShardGroup *ShardGroupInfo
Database string
RetentionPolicy string
}

// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but
// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation
// avoids taking the hit at write-time.
func (c *Client) PrecreateShardGroups(from, to time.Time) error {
func (c *Client) PrecreateShardGroups(from, to time.Time) ([]ShardGroupFullInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
var changed bool

// return nil if no shard groups are created
var newShardGroups []ShardGroupFullInfo = nil
for _, di := range data.Databases {
for _, rp := range di.RetentionPolicies {
if len(rp.ShardGroups) == 0 {
Expand Down Expand Up @@ -797,6 +805,7 @@ func (c *Client) PrecreateShardGroups(from, to time.Time) error {
zap.Uint64("group_id", g.ID), zap.Error(err))
continue
}
newShardGroups = append(newShardGroups, ShardGroupFullInfo{ShardGroup: newGroup, Database: di.Name, RetentionPolicy: rp.Name})
changed = true
c.logger.Info("New shard group successfully precreated",
logger.ShardGroup(newGroup.ID),
Expand All @@ -808,11 +817,10 @@ func (c *Client) PrecreateShardGroups(from, to time.Time) error {

if changed {
if err := c.commit(data); err != nil {
return err
return nil, err
}
}

return nil
return newShardGroups, nil
}

// ShardOwner returns the owning shard group info for a specific shard.
Expand Down
13 changes: 9 additions & 4 deletions services/meta/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,10 +956,11 @@ func TestMetaClient_Shards(t *testing.T) {
// Test pre-creating shard groups.
dur := sg.EndTime.Sub(sg.StartTime) + time.Nanosecond
tmax := tmin.Add(dur)
if err := c.PrecreateShardGroups(tmin, tmax); err != nil {
if newShardGroups, err := c.PrecreateShardGroups(tmin, tmax); err != nil {
t.Fatal(err)
} else if len(newShardGroups) != 1 {
t.Fatalf("wrong number of shard groups: %d", len(newShardGroups))
}

// Test finding shard groups by time range.
groups, err := c.ShardGroupsByTimeRange("db0", "autogen", tmin, tmax)
if err != nil {
Expand Down Expand Up @@ -1029,13 +1030,17 @@ func TestMetaClient_CreateShardGroupIdempotent(t *testing.T) {
// Test pre-creating shard groups.
dur := sg.EndTime.Sub(sg.StartTime) + time.Nanosecond
tmax := tmin.Add(dur)
if err := c.PrecreateShardGroups(tmin, tmax); err != nil {
if newShardGroups, err := c.PrecreateShardGroups(tmin, tmax); err != nil {
t.Fatal(err)
} else if len(newShardGroups) != 1 {
t.Fatalf("wrong number of shard groups: %d", len(newShardGroups))
}
i = c.Data().Index
t.Log("index: ", i)
if err := c.PrecreateShardGroups(tmin, tmax); err != nil {
if newShardGroups, err := c.PrecreateShardGroups(tmin, tmax); err != nil {
t.Fatal(err)
} else if len(newShardGroups) != 0 {
t.Fatalf("wrong number of shard groups: %d", len(newShardGroups))
}
t.Log("index: ", i)
if got, exp := c.Data().Index, i; got != exp {
Expand Down
48 changes: 45 additions & 3 deletions services/precreator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
package precreator // import "github.com/influxdata/influxdb/services/precreator"

import (
"errors"
"fmt"
"sync"
"time"

"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/services/meta"
"go.uber.org/zap"
)

Expand All @@ -19,8 +22,12 @@ type Service struct {
done chan struct{}
wg sync.WaitGroup

Store interface {
CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
}

MetaClient interface {
PrecreateShardGroups(now, cutoff time.Time) error
PrecreateShardGroups(now, cutoff time.Time) ([]meta.ShardGroupFullInfo, error)
}
}

Expand Down Expand Up @@ -76,7 +83,7 @@ func (s *Service) runPrecreation() {
select {
case <-time.After(s.checkInterval):
if err := s.precreate(time.Now().UTC()); err != nil {
s.Logger.Info("Failed to precreate shards", zap.Error(err))
s.Logger.Warn("Failed to precreate shards", zap.Error(err))
}
case <-s.done:
s.Logger.Info("Terminating precreation service")
Expand All @@ -88,5 +95,40 @@ func (s *Service) runPrecreation() {
// precreate performs actual resource precreation.
func (s *Service) precreate(now time.Time) error {
cutoff := now.Add(s.advancePeriod).UTC()
return s.MetaClient.PrecreateShardGroups(now, cutoff)
if newShardGroups, err := s.MetaClient.PrecreateShardGroups(now, cutoff); err != nil {
return err
} else {
errs := make([]error, 0, len(newShardGroups))
for _, sgfi := range newShardGroups {
if len(sgfi.ShardGroup.Shards) <= 0 {
err := fmt.Errorf("shard group %d covering %s to %s for database %s and retention policy %s has no shards",
sgfi.ShardGroup.ID,
sgfi.ShardGroup.StartTime,
sgfi.ShardGroup.EndTime,
sgfi.Database,
sgfi.RetentionPolicy)
errs = append(errs, err)
} else if err := s.Store.CreateShard(sgfi.Database, sgfi.RetentionPolicy, sgfi.ShardGroup.Shards[0].ID, true); err != nil {
// TODO(DSB): is Shards[0] always the right shard to create? Yes for OSS, no for Enterprise
decoratedErr := fmt.Errorf("failed to create shard %d for shard group %d covering %s to %s for database %s and retention policy %s: %w",
sgfi.ShardGroup.Shards[0].ID,
sgfi.ShardGroup.ID,
sgfi.ShardGroup.StartTime,
sgfi.ShardGroup.EndTime,
sgfi.Database,
sgfi.RetentionPolicy,
err)
errs = append(errs, decoratedErr)
} else {
s.Logger.Debug("Created shard",
zap.String("database", sgfi.Database),
zap.String("retention_policy", sgfi.RetentionPolicy),
zap.Uint64("shard_group_id", sgfi.ShardGroup.ID),
zap.Uint64("shard_id", sgfi.ShardGroup.Shards[0].ID),
zap.Time("start_time", sgfi.ShardGroup.StartTime),
zap.Time("end_time", sgfi.ShardGroup.EndTime))
}
}
return errors.Join(errs...)
}
}
5 changes: 3 additions & 2 deletions services/precreator/service_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package precreator_test

import (
"github.com/influxdata/influxdb/services/meta"
"os"
"testing"
"time"
Expand All @@ -16,12 +17,12 @@ func TestShardPrecreation(t *testing.T) {
precreate := false

var mc internal.MetaClientMock
mc.PrecreateShardGroupsFn = func(now, cutoff time.Time) error {
mc.PrecreateShardGroupsFn = func(now, cutoff time.Time) ([]meta.ShardGroupFullInfo, error) {
if !precreate {
close(done)
precreate = true
}
return nil
return nil, nil
}

s := NewTestService()
Expand Down