From 57ab8045ba1f857a485f515aa5845c27215809c1 Mon Sep 17 00:00:00 2001 From: WesselAtWork Date: Wed, 16 Apr 2025 15:51:59 +0200 Subject: [PATCH] table prefix key schema check and test --- pkg/storage/config/schema_config.go | 13 ++- pkg/storage/config/schema_config_test.go | 16 +++ .../indexshipper/storage/cached_client.go | 19 +++- .../storage/cached_client_test.go | 98 +++++++++++++++++-- 4 files changed, 131 insertions(+), 15 deletions(-) diff --git a/pkg/storage/config/schema_config.go b/pkg/storage/config/schema_config.go index 03d69f1da49fe..9314eaf60c7a1 100644 --- a/pkg/storage/config/schema_config.go +++ b/pkg/storage/config/schema_config.go @@ -39,6 +39,7 @@ var ( errInvalidSchemaVersion = errors.New("invalid schema version") errInvalidTablePeriod = errors.New("the table period must be a multiple of 24h (1h for schema v1)") errInvalidTableName = errors.New("invalid table name") + errInvalidTablePrefixValue = errors.New("table prefix can not contain a path delimiter") errConfigFileNotSet = errors.New("schema config file needs to be set") errConfigChunkPrefixNotSet = errors.New("schema config for chunks is missing the 'prefix' setting") errSchemaIncreasingFromTime = errors.New("from time in schemas must be distinct and in increasing order") @@ -579,15 +580,15 @@ func (cfg IndexPeriodicTableConfig) MarshalYAML() (interface{}, error) { func ValidatePathPrefix(prefix string) error { if prefix == "" { - return errors.New("prefix must be set") + return errors.New("path prefix must be set") } else if strings.Contains(prefix, "\\") { // When using windows filesystem as object store the implementation of ObjectClient in Cortex takes care of conversion of separator. // We just need to always use `/` as a path separator. - return fmt.Errorf("prefix should only have '%s' as a path separator", pathPrefixDelimiter) + return fmt.Errorf("path prefix should only have '%s' as a path separator", pathPrefixDelimiter) } else if strings.HasPrefix(prefix, pathPrefixDelimiter) { - return errors.New("prefix should never start with a path separator i.e '/'") + return errors.New("path prefix should never start with a path separator i.e '/'") } else if !strings.HasSuffix(prefix, pathPrefixDelimiter) { - return errors.New("prefix should end with a path separator i.e '/'") + return errors.New("path prefix should end with a path separator i.e '/'") } return nil @@ -639,6 +640,10 @@ func (cfg PeriodicTableConfig) Validate() error { return errInvalidTablePeriod } + if strings.Index(cfg.Prefix, pathPrefixDelimiter) > -1 { + return errInvalidTablePrefixValue + } + return nil } diff --git a/pkg/storage/config/schema_config_test.go b/pkg/storage/config/schema_config_test.go index 8a8de6077e447..0caa4654e06a6 100644 --- a/pkg/storage/config/schema_config_test.go +++ b/pkg/storage/config/schema_config_test.go @@ -170,6 +170,22 @@ func TestSchemaConfig_Validate(t *testing.T) { }, err: nil, }, + "should fail on index table prefix ending in path delimiter": { + config: &SchemaConfig{ + Configs: []PeriodConfig{ + { + Schema: "v13", + IndexTables: IndexPeriodicTableConfig{ + PathPrefix: "index/", + PeriodicTableConfig: PeriodicTableConfig{ + Prefix: "v13/key_", + }, + }, + }, + }, + }, + err: errInvalidTablePrefixValue, + }, "should pass on index and chunk table period set to zero (no period tables)": { config: &SchemaConfig{ Configs: []PeriodConfig{ diff --git a/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go b/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go index 2aa3cfda87ea1..d8c69b69948fa 100644 --- a/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go +++ b/pkg/storage/stores/shipper/indexshipper/storage/cached_client.go @@ -95,6 +95,7 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s prefix = strings.TrimSuffix(prefix, delimiter) ss := strings.Split(prefix, delimiter) + // should only accept something like: table or table/userid if len(ss) > 2 { return nil, nil, fmt.Errorf("invalid prefix %s", prefix) } @@ -304,19 +305,29 @@ func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient } ss := strings.Split(object.Key, delimiter) - if len(ss) < 2 || len(ss) > 3 { - return fmt.Errorf("invalid key: %s", object.Key) + + // db.gz + if len(ss) < 2 { + return fmt.Errorf("bare key without context: %s", object.Key) } - if len(ss) == 2 { + // table/db.gz + if len(ss) < 3 { t.commonObjects = append(t.commonObjects, object) - } else { + continue + } + + // table/userid/db.gz + if len(ss) < 4 { userID := ss[1] if len(t.userObjects[userID]) == 0 { t.userIDs = append(t.userIDs, client.StorageCommonPrefix(path.Join(t.name, userID))) } t.userObjects[userID] = append(t.userObjects[userID], object) + continue } + + return fmt.Errorf("key too long: %s", object.Key) } t.cacheBuiltAt = time.Now() diff --git a/pkg/storage/stores/shipper/indexshipper/storage/cached_client_test.go b/pkg/storage/stores/shipper/indexshipper/storage/cached_client_test.go index 78c04bd4dc3eb..22cf35f678c4c 100644 --- a/pkg/storage/stores/shipper/indexshipper/storage/cached_client_test.go +++ b/pkg/storage/stores/shipper/indexshipper/storage/cached_client_test.go @@ -111,22 +111,106 @@ func TestCachedObjectClient_List(t *testing.T) { require.Equal(t, objectsFromListCall, objects) }) + t.Run("supports path prefixed clients", func(t *testing.T) { + ctx := context.Background() + + path_prefix := "my/long/path/prefix/" + objectsInStorage := []string{ + path_prefix + "table1/db.gz", + path_prefix + "table2/db1.gz", + path_prefix + "table2/db2.gz", + path_prefix + "table3/user1/db.gz", + path_prefix + "table3/user2/db.gz", + } + objectClient := newMockObjectClient(t, objectsInStorage) + prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix) + cachedObjectClient := newCachedObjectClient(prefixedClient) + + objects, _, err := cachedObjectClient.List(ctx, "table1/", "/", false) + require.Nil(t, err) + require.Equal(t, []string{"table1/db.gz"}, objectKeys(objects)) + + objects, _, err = cachedObjectClient.List(ctx, "table2/", "/", false) + require.Nil(t, err) + require.Equal(t, []string{"table2/db1.gz", "table2/db2.gz"}, objectKeys(objects)) + + objects, _, err = cachedObjectClient.List(ctx, "table3/user1", "/", false) + require.Nil(t, err) + require.Equal(t, []string{"table3/user1/db.gz"}, objectKeys(objects)) + }) + t.Run("supports prefixed clients", func(t *testing.T) { ctx := context.Background() - prefix := "my/amazing/prefix/" + path_prefix := "/" + prefix := "prefix_term_" + objectsInStorage := []string{ + path_prefix + prefix + "table1/db.gz", + path_prefix + prefix + "table2/db1.gz", + path_prefix + prefix + "table2/db2.gz", + path_prefix + prefix + "table3/user1/db.gz", + path_prefix + prefix + "table3/user2/db.gz", + } + objectClient := newMockObjectClient(t, objectsInStorage) + prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix) + cachedObjectClient := newCachedObjectClient(prefixedClient) + + objects, _, err := cachedObjectClient.List(ctx, "prefix_term_table1/", "/", false) + require.Nil(t, err) + require.Equal(t, []string{"prefix_term_table1/db.gz"}, objectKeys(objects)) + + objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table2/", "/", false) + require.Nil(t, err) + require.Equal(t, []string{"prefix_term_table2/db1.gz", "prefix_term_table2/db2.gz"}, objectKeys(objects)) + + objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table3/user1/", "/", false) + require.Nil(t, err) + require.Equal(t, []string{"prefix_term_table3/user1/db.gz"}, objectKeys(objects)) + }) + + t.Run("supports both path prefixed and prefix clients", func(t *testing.T) { + ctx := context.Background() + + path_prefix := "my/long/path/prefix/" + prefix := "prefix_term_" objectsInStorage := []string{ - prefix + "table1/db.gz", - prefix + "table2/db.gz", - prefix + "table2/db2.gz", + path_prefix + prefix + "table1/db.gz", + path_prefix + prefix + "table2/db1.gz", + path_prefix + prefix + "table2/db2.gz", + path_prefix + prefix + "table3/user1/db.gz", + path_prefix + prefix + "table3/user2/db.gz", } objectClient := newMockObjectClient(t, objectsInStorage) - prefixedClient := client.NewPrefixedObjectClient(objectClient, prefix) + prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix) cachedObjectClient := newCachedObjectClient(prefixedClient) - objects, _, err := cachedObjectClient.List(ctx, "table2/", "/", false) + objects, _, err := cachedObjectClient.List(ctx, "prefix_term_table1/", "/", false) require.Nil(t, err) - require.Equal(t, []string{"table2/db.gz", "table2/db2.gz"}, objectKeys(objects)) + require.Equal(t, []string{"prefix_term_table1/db.gz"}, objectKeys(objects)) + + objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table2/", "/", false) + require.Nil(t, err) + require.Equal(t, []string{"prefix_term_table2/db1.gz", "prefix_term_table2/db2.gz"}, objectKeys(objects)) + + objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table3/user1/", "/", false) + require.Nil(t, err) + require.Equal(t, []string{"prefix_term_table3/user1/db.gz"}, objectKeys(objects)) + }) + + t.Run("does not support prefix with delimiter", func(t *testing.T) { + ctx := context.Background() + + path_prefix := "my/long/path/prefix/" + prefix := "prefix/term/with/delimiter_" + objectsInStorage := []string{ + path_prefix + prefix + "table1/db.gz", + } + objectClient := newMockObjectClient(t, objectsInStorage) + prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix) + cachedObjectClient := newCachedObjectClient(prefixedClient) + + _, _, err := cachedObjectClient.List(ctx, "prefix/term/with/delimiter_table1/", "/", false) + require.Error(t, err) }) }