Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
d0e1aef
resourcemanager: add metadata watcher scaffold
okJiang Feb 27, 2026
c766971
resourcemanager: fix metadata watcher review issues
okJiang Mar 6, 2026
75b7550
resourcemanager: fix static lint issues
okJiang Mar 6, 2026
9a4d049
resourcemanager: fix watcher delete edge cases
okJiang Mar 9, 2026
98a1ded
resourcemanager: wire metadata watcher bootstrap
okJiang Mar 9, 2026
4092c2e
resourcemanager: clarify rm watcher scope
okJiang Mar 9, 2026
e7a2d93
resourcemanager: fix watcher review follow-ups
okJiang Mar 10, 2026
e5a7c7f
resourcemanager: fix raw-load validation and config save
okJiang Mar 10, 2026
180747c
resourcemanager: regroup metadata watcher tests
okJiang Mar 10, 2026
6909d07
resourcemanager: fix watcher test static lint
okJiang Mar 10, 2026
3fcc9fc
resourcemanager: init default group in watcher
okJiang Mar 11, 2026
a5f0a5b
resourcemanager: keep watcher default init cache-only
okJiang Mar 11, 2026
a5e47c1
resourcemanager: address metadata watcher review
okJiang Mar 13, 2026
93d4d6e
resourcemanager: simplify controller config update locking
okJiang Mar 13, 2026
1810868
resourcemanager: log malformed watcher paths
okJiang Mar 16, 2026
52b345f
resourcemanager: warn on malformed watcher keys
okJiang Mar 16, 2026
e73d960
resourcemanager: drop watcher log assertion test
okJiang Mar 16, 2026
561b8a0
Merge remote-tracking branch 'upstream/master' into codex/rm-pr4-meta…
okJiang Mar 17, 2026
64be26e
Merge remote-tracking branch 'upstream/master' into codex/rm-pr4-meta…
okJiang Mar 18, 2026
df1e2fc
errs: improve ErrInvalidGroup message to include specific context
okJiang Mar 18, 2026
66813f4
make error gen
okJiang Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 67 additions & 14 deletions pkg/mcs/resourcemanager/server/keyspace_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"

Expand Down Expand Up @@ -106,6 +107,49 @@ func (krgm *keyspaceResourceGroupManager) addResourceGroupFromRaw(name string, r
return nil
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// upsertResourceGroupFromRaw decodes rawValue as a protobuf-encoded
// rmpb.ResourceGroup and reconciles it with the in-memory group cache.
//
// name is the resource group name taken from the key that rawValue was stored
// under; the decoded payload must carry the same name, otherwise the payload
// is rejected. When a group with that name is already cached, the decoded
// settings are handed to the existing object's ApplySettings and the map entry
// is left untouched. When no such group is cached, a new entry built with
// FromProtoResourceGroup is inserted.
//
// It returns the unmarshal error, the name-mismatch error, or the error
// reported by ApplySettings; each of these is logged before being returned.
// It returns nil once the cache has been updated.
func (krgm *keyspaceResourceGroupManager) upsertResourceGroupFromRaw(name string, rawValue string) error {
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal([]byte(rawValue), group); err != nil {
log.Error("failed to parse the keyspace resource group meta info",
zap.Uint32("keyspace-id", krgm.keyspaceID), zap.String("name", name), zap.String("raw-value", rawValue), zap.Error(err))
return err
}
// Refuse payloads whose embedded name disagrees with the key name, so the
// cache is never populated under a name the caller did not ask for.
if group.Name != name {
err := errors.Errorf("resource group key name %s does not match payload name %s", name, group.Name)
log.Error("resource group name mismatch in watcher payload",
zap.Uint32("keyspace-id", krgm.keyspaceID),
zap.String("name", name),
zap.String("payload-name", group.Name),
zap.String("raw-value", rawValue),
zap.Error(err))
return err
}

// Only the map lookup runs under the read lock; the lock is released before
// either the update path or the insert path below.
// NOTE(review): the lookup and the later insert are separate critical
// sections, so this presumably relies on callers not racing on the same
// name — confirm.
krgm.RLock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to wrap the curl for groups into two functions, such as groupExists and saveGroup? It makes the reader confused when to use the lock?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this inline because the lock boundaries are the actual point here: we only need the manager map under krgm.RLock() to fetch the current pointer, and after that the update/insert paths diverge.

Extracting helpers like groupExists / saveGroup would still need to preserve the same read-lock vs write-lock split, so it would mostly hide the locking shape rather than simplify it.

existing := krgm.groups[group.Name]
krgm.RUnlock()
// Update path: the cached object is reused and only receives the new
// settings; krgm.groups itself is not written.
if existing != nil {
if err := existing.ApplySettings(group); err != nil {
log.Error("failed to apply the keyspace resource group settings from raw value",
zap.Uint32("keyspace-id", krgm.keyspaceID), zap.String("name", name), zap.String("raw-value", rawValue), zap.Error(err))
return err
}
return nil
}

// Insert path: nothing was cached under this name, so store a new group
// converted from the payload while holding the write lock.
krgm.Lock()
krgm.groups[group.Name] = FromProtoResourceGroup(group)
krgm.Unlock()
return nil
}

func (krgm *keyspaceResourceGroupManager) deleteResourceGroupFromCache(name string) {
krgm.Lock()
delete(krgm.groups, name)
delete(krgm.groupRUTrackers, name)
krgm.Unlock()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

func (krgm *keyspaceResourceGroupManager) setRawStatesIntoResourceGroup(name string, rawValue string) error {
tokens := &GroupStates{}
if err := json.Unmarshal([]byte(rawValue), tokens); err != nil {
Expand All @@ -128,22 +172,30 @@ func (krgm *keyspaceResourceGroupManager) initDefaultResourceGroup() {
if ok {
return
}
defaultGroup := &ResourceGroup{
defaultGroup := newDefaultResourceGroup()
if err := krgm.addResourceGroup(defaultGroup.IntoProtoResourceGroup(krgm.keyspaceID)); err != nil {
log.Warn("init default group failed", zap.Uint32("keyspace-id", krgm.keyspaceID), zap.Error(err))
}
}

func newDefaultResourceGroup() *ResourceGroup {
return &ResourceGroup{
Name: DefaultResourceGroupName,
Mode: rmpb.GroupMode_RUMode,
RUSettings: &RequestUnitSettings{
RU: &GroupTokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: UnlimitedRate,
BurstLimit: UnlimitedBurstLimit,
},
RUSettings: NewRequestUnitSettings(DefaultResourceGroupName, &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: UnlimitedRate,
BurstLimit: UnlimitedBurstLimit,
},
},
}),
Priority: middlePriority,
}
if err := krgm.addResourceGroup(defaultGroup.IntoProtoResourceGroup(krgm.keyspaceID)); err != nil {
log.Warn("init default group failed", zap.Uint32("keyspace-id", krgm.keyspaceID), zap.Error(err))
}
}

func (krgm *keyspaceResourceGroupManager) restoreDefaultResourceGroupFromReserved() {
krgm.Lock()
krgm.groups[DefaultResourceGroupName] = newDefaultResourceGroup()
krgm.Unlock()
Comment thread
okJiang marked this conversation as resolved.
}

func (krgm *keyspaceResourceGroupManager) addResourceGroup(grouppb *rmpb.ResourceGroup) error {
Expand Down Expand Up @@ -204,12 +256,13 @@ func (krgm *keyspaceResourceGroupManager) deleteResourceGroup(name string) error
if !krgm.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
if err := krgm.storage.DeleteResourceGroupStates(krgm.keyspaceID, name); err != nil {
return err
}
if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil {
return err
}
krgm.Lock()
delete(krgm.groups, name)
krgm.Unlock()
krgm.deleteResourceGroupFromCache(name)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
return nil
}

Expand Down
158 changes: 158 additions & 0 deletions pkg/mcs/resourcemanager/server/keyspace_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func TestDeleteResourceGroup(t *testing.T) {
}
err := krgm.addResourceGroup(group)
re.NoError(err)
krgm.groupRUTrackers[group.GetName()] = newGroupRUTracker()

// Verify the group exists.
re.NotNil(krgm.getResourceGroup(group.GetName(), false))
Expand All @@ -189,6 +190,8 @@ func TestDeleteResourceGroup(t *testing.T) {

// Verify the group was deleted.
re.Nil(krgm.getResourceGroup(group.GetName(), false))
_, ok := krgm.groupRUTrackers[group.GetName()]
re.False(ok)

// Try to delete the default group.
krgm.initDefaultResourceGroup()
Expand All @@ -199,6 +202,55 @@ func TestDeleteResourceGroup(t *testing.T) {
re.NotNil(krgm.getResourceGroup(DefaultResourceGroupName, false))
}

// TestDeleteResourceGroupClearsPersistedStatesInMetadataOnlyRole checks that
// deleteResourceGroup also removes the persisted running states of the group
// when the manager is created with ResourceGroupWriteRolePDMetaOnly.
func TestDeleteResourceGroupClearsPersistedStatesInMetadataOnlyRole(t *testing.T) {
	re := require.New(t)

	const (
		keyspaceID = 1
		groupName  = "test_group"
	)

	store := storage.NewStorageWithMemoryBackend()
	manager := newKeyspaceResourceGroupManager(keyspaceID, store, ResourceGroupWriteRolePDMetaOnly)

	re.NoError(manager.addResourceGroup(&rmpb.ResourceGroup{
		Name:     groupName,
		Mode:     rmpb.GroupMode_RUMode,
		Priority: 5,
		RUSettings: &rmpb.GroupRequestUnitSettings{
			RU: &rmpb.TokenBucket{
				Settings: &rmpb.TokenLimitSettings{
					FillRate:   100,
					BurstLimit: 200,
				},
			},
		},
	}))

	// persistedStates reports how many state records the storage currently
	// holds for the group under test.
	persistedStates := func() int {
		matches := 0
		re.NoError(store.LoadResourceGroupStates(func(id uint32, name, _ string) {
			if id == keyspaceID && name == groupName {
				matches++
			}
		}))
		return matches
	}

	// Persist a state record directly so there is something to clean up.
	lastUpdate := time.Now()
	re.NoError(store.SaveResourceGroupStates(keyspaceID, groupName, &GroupStates{
		RU: &GroupTokenBucketState{
			Tokens:     321,
			LastUpdate: &lastUpdate,
		},
		RUConsumption: &rmpb.Consumption{RRU: 11, WRU: 22},
	}))
	re.Equal(1, persistedStates())

	// Deleting the group must leave no state record behind.
	re.NoError(manager.deleteResourceGroup(groupName))
	re.Zero(persistedStates())
}

func TestGetResourceGroup(t *testing.T) {
re := require.New(t)

Expand Down Expand Up @@ -306,6 +358,112 @@ func TestAddResourceGroupFromRaw(t *testing.T) {
re.Error(err)
}

// TestUpsertResourceGroupFromRaw covers the four outcomes of
// upsertResourceGroupFromRaw: updating a cached group without losing its
// runtime state, inserting an unknown group, rejecting undecodable data, and
// rejecting a payload whose name differs from the key name.
func TestUpsertResourceGroupFromRaw(t *testing.T) {
	re := require.New(t)

	// encode serializes a group into the raw string form the function consumes.
	encode := func(pb *rmpb.ResourceGroup) string {
		raw, err := proto.Marshal(pb)
		re.NoError(err)
		return string(raw)
	}

	manager := newKeyspaceResourceGroupManager(1, storage.NewStorageWithMemoryBackend())
	original := &rmpb.ResourceGroup{
		Name:     "test_group",
		Mode:     rmpb.GroupMode_RUMode,
		Priority: 5,
		RUSettings: &rmpb.GroupRequestUnitSettings{
			RU: &rmpb.TokenBucket{
				Settings: &rmpb.TokenLimitSettings{
					FillRate:   100,
					BurstLimit: 200,
				},
			},
		},
	}
	re.NoError(manager.addResourceGroup(original))

	// Give the cached group some runtime state that an update must keep.
	cached := manager.getMutableResourceGroup(original.Name)
	re.NotNil(cached)
	cached.RUSettings.RU.Tokens = 123.45
	cached.RUConsumption = &rmpb.Consumption{RRU: 12, WRU: 34}

	// Update path: settings change, tokens and consumption survive.
	rawUpdated := encode(&rmpb.ResourceGroup{
		Name:     original.Name,
		Mode:     rmpb.GroupMode_RUMode,
		Priority: 9,
		RUSettings: &rmpb.GroupRequestUnitSettings{
			RU: &rmpb.TokenBucket{
				Settings: &rmpb.TokenLimitSettings{
					FillRate:   500,
					BurstLimit: 600,
				},
			},
		},
	})
	re.NoError(manager.upsertResourceGroupFromRaw(original.Name, rawUpdated))

	afterUpdate := manager.getMutableResourceGroup(original.Name)
	re.NotNil(afterUpdate)
	re.Equal(uint32(9), afterUpdate.Priority)
	re.Equal(float64(500), afterUpdate.getFillRate())
	re.Equal(int64(600), afterUpdate.getBurstLimit())
	re.InDelta(123.45, afterUpdate.RUSettings.RU.Tokens, 0.001)
	re.Equal(float64(12), afterUpdate.RUConsumption.RRU)
	re.Equal(float64(34), afterUpdate.RUConsumption.WRU)

	// Insert path: an unknown name ends up in the cache.
	inserted := &rmpb.ResourceGroup{
		Name:     "new_group",
		Mode:     rmpb.GroupMode_RUMode,
		Priority: 3,
		RUSettings: &rmpb.GroupRequestUnitSettings{
			RU: &rmpb.TokenBucket{
				Settings: &rmpb.TokenLimitSettings{
					FillRate:   111,
					BurstLimit: 222,
				},
			},
		},
	}
	rawInserted := encode(inserted)
	re.NoError(manager.upsertResourceGroupFromRaw(inserted.GetName(), rawInserted))
	re.NotNil(manager.getMutableResourceGroup(inserted.GetName()))

	// Undecodable payloads are reported as errors.
	re.Error(manager.upsertResourceGroupFromRaw(original.GetName(), "invalid_data"))

	// A payload named differently from its key is rejected and caches nothing
	// under either name.
	rawMismatched := encode(&rmpb.ResourceGroup{
		Name:     "payload_name",
		Mode:     rmpb.GroupMode_RUMode,
		Priority: 1,
	})
	re.Error(manager.upsertResourceGroupFromRaw("key_name", rawMismatched))
	re.Nil(manager.getMutableResourceGroup("key_name"))
	re.Nil(manager.getMutableResourceGroup("payload_name"))
}

// TestDeleteResourceGroupFromCache checks that deleteResourceGroupFromCache
// removes both the cached group and its RU tracker entry.
func TestDeleteResourceGroupFromCache(t *testing.T) {
	re := require.New(t)

	const groupName = "test_group"

	manager := newKeyspaceResourceGroupManager(1, storage.NewStorageWithMemoryBackend())
	re.NoError(manager.addResourceGroup(&rmpb.ResourceGroup{
		Name: groupName,
		Mode: rmpb.GroupMode_RUMode,
		RUSettings: &rmpb.GroupRequestUnitSettings{
			RU: &rmpb.TokenBucket{
				Settings: &rmpb.TokenLimitSettings{},
			},
		},
	}))
	// Register a tracker by hand so the test can observe its removal.
	manager.groupRUTrackers[groupName] = newGroupRUTracker()

	manager.deleteResourceGroupFromCache(groupName)

	re.Nil(manager.getMutableResourceGroup(groupName))
	_, tracked := manager.groupRUTrackers[groupName]
	re.False(tracked)
}

func TestSetRawStatesIntoResourceGroup(t *testing.T) {
re := require.New(t)

Expand Down
Loading
Loading