Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
d0e1aef
resourcemanager: add metadata watcher scaffold
okJiang Feb 27, 2026
c766971
resourcemanager: fix metadata watcher review issues
okJiang Mar 6, 2026
75b7550
resourcemanager: fix static lint issues
okJiang Mar 6, 2026
9a4d049
resourcemanager: fix watcher delete edge cases
okJiang Mar 9, 2026
98a1ded
resourcemanager: wire metadata watcher bootstrap
okJiang Mar 9, 2026
4092c2e
resourcemanager: clarify rm watcher scope
okJiang Mar 9, 2026
e7a2d93
resourcemanager: fix watcher review follow-ups
okJiang Mar 10, 2026
e5a7c7f
resourcemanager: fix raw-load validation and config save
okJiang Mar 10, 2026
180747c
resourcemanager: regroup metadata watcher tests
okJiang Mar 10, 2026
6909d07
resourcemanager: fix watcher test static lint
okJiang Mar 10, 2026
3fcc9fc
resourcemanager: init default group in watcher
okJiang Mar 11, 2026
a5f0a5b
resourcemanager: keep watcher default init cache-only
okJiang Mar 11, 2026
a5e47c1
resourcemanager: address metadata watcher review
okJiang Mar 13, 2026
93d4d6e
resourcemanager: simplify controller config update locking
okJiang Mar 13, 2026
1810868
resourcemanager: log malformed watcher paths
okJiang Mar 16, 2026
52b345f
resourcemanager: warn on malformed watcher keys
okJiang Mar 16, 2026
e73d960
resourcemanager: drop watcher log assertion test
okJiang Mar 16, 2026
561b8a0
Merge remote-tracking branch 'upstream/master' into codex/rm-pr4-meta…
okJiang Mar 17, 2026
64be26e
Merge remote-tracking branch 'upstream/master' into codex/rm-pr4-meta…
okJiang Mar 18, 2026
df1e2fc
errs: improve ErrInvalidGroup message to include specific context
okJiang Mar 18, 2026
66813f4
make error gen
okJiang Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ the %s resource group does not exist

["PD:resourcemanager:ErrInvalidGroup"]
error = '''
invalid group settings, please check the group name, priority and the number of resources
invalid group settings, please check %s
'''

["PD:resourcemanager:ErrKeyspaceNotExists"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ var (
ErrKeyspaceNotExistsByName = errors.Normalize("keyspace not found with name: %s", errors.RFCCodeText("PD:resourcemanager:ErrKeyspaceNotExistsByName"))
ErrResourceGroupNotExists = errors.Normalize("the %s resource group does not exist", errors.RFCCodeText("PD:resourcemanager:ErrGroupNotExists"))
ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup"))
ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup"))
ErrInvalidGroup = errors.Normalize("invalid group settings, please check %s", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup"))
)

// Microservice errors
Expand Down
186 changes: 158 additions & 28 deletions pkg/mcs/resourcemanager/server/keyspace_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package server

import (
"context"
"encoding/json"
"math"
"sort"
Expand All @@ -23,11 +24,14 @@ import (
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"

"github.com/pingcap/errors"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/syncutil"
)

Expand Down Expand Up @@ -93,19 +97,87 @@ func newKeyspaceResourceGroupManager(
}
}

func (krgm *keyspaceResourceGroupManager) addResourceGroupFromRaw(name string, rawValue string) error {
// parseResourceGroupFromRaw decodes rawValue as a protobuf-encoded
// rmpb.ResourceGroup and verifies that the name carried in the payload equals
// name, the name the caller associates with the payload.
//
// It returns the decoded group on success. If the payload cannot be
// unmarshaled, or the decoded name differs from name, the problem is logged
// together with the keyspace ID and the raw value, and a nil group is returned
// with the error.
func (krgm *keyspaceResourceGroupManager) parseResourceGroupFromRaw(name, rawValue string) (*rmpb.ResourceGroup, error) {
	decoded := new(rmpb.ResourceGroup)
	unmarshalErr := proto.Unmarshal([]byte(rawValue), decoded)
	if unmarshalErr != nil {
		log.Error("failed to parse the keyspace resource group meta info",
			zap.Uint32("keyspace-id", krgm.keyspaceID),
			zap.String("name", name),
			zap.String("raw-value", rawValue),
			zap.Error(unmarshalErr))
		return nil, unmarshalErr
	}
	if decoded.Name == name {
		return decoded, nil
	}
	// The payload names a different group than the one it was looked up under;
	// reject it rather than caching it under either name.
	mismatchErr := errors.Errorf("resource group key name %s does not match payload name %s", name, decoded.Name)
	log.Error("resource group name mismatch in storage payload",
		zap.Uint32("keyspace-id", krgm.keyspaceID),
		zap.String("raw-value", rawValue),
		zap.Error(mismatchErr))
	return nil, mismatchErr
}

// validateResourceGroupProto checks the basic settings of grouppb.
//
// It returns errs.ErrInvalidGroup, annotated with the offending field, when
// the group name is empty or longer than maxGroupNameLength, or when the
// priority exceeds maxPriority. It returns nil otherwise.
func validateResourceGroupProto(grouppb *rmpb.ResourceGroup) error {
	switch nameLen := len(grouppb.Name); {
	case nameLen == 0, nameLen > maxGroupNameLength:
		return errs.ErrInvalidGroup.FastGenByArgs("the group name")
	case grouppb.GetPriority() > maxPriority:
		return errs.ErrInvalidGroup.FastGenByArgs("the group priority")
	}
	return nil
}

func (krgm *keyspaceResourceGroupManager) addResourceGroupFromRaw(name string, rawValue string) error {
group, err := krgm.parseResourceGroupFromRaw(name, rawValue)
if err != nil {
return err
}
if err := validateResourceGroupProto(group); err != nil {
return err
}
resourceGroup := FromProtoResourceGroup(group)
krgm.Lock()
krgm.groups[group.Name] = resourceGroup
krgm.Unlock()
krgm.syncBurstabilityWithServiceLimit(resourceGroup)
return nil
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

func (krgm *keyspaceResourceGroupManager) upsertResourceGroupFromRaw(name string, rawValue string) error {
group, err := krgm.parseResourceGroupFromRaw(name, rawValue)
if err != nil {
return err
}
if err := validateResourceGroupProto(group); err != nil {
return err
}

krgm.RLock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to wrap the CRUD operations on groups into two functions, such as groupExists and saveGroup? As written, it is hard for the reader to tell when the lock needs to be held.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this inline because the lock boundaries are the actual point here: we only need the manager map under krgm.RLock() to fetch the current pointer, and after that the update/insert paths diverge.

Extracting helpers like groupExists / saveGroup would still need to preserve the same read-lock vs write-lock split, so it would mostly hide the locking shape rather than simplify it.

existing := krgm.groups[group.Name]
krgm.RUnlock()
if existing != nil {
if err := existing.ApplySettings(group); err != nil {
log.Error("failed to apply the keyspace resource group settings from raw value",
zap.Uint32("keyspace-id", krgm.keyspaceID), zap.String("name", name), zap.String("raw-value", rawValue), zap.Error(err))
return err
}
krgm.syncBurstabilityWithServiceLimit(existing)
return nil
}

resourceGroup := FromProtoResourceGroup(group)
krgm.Lock()
krgm.groups[group.Name] = FromProtoResourceGroup(group)
krgm.groups[group.Name] = resourceGroup
krgm.Unlock()
krgm.syncBurstabilityWithServiceLimit(resourceGroup)
return nil
}

func (krgm *keyspaceResourceGroupManager) deleteResourceGroupFromCache(name string) {
krgm.Lock()
delete(krgm.groups, name)
delete(krgm.groupRUTrackers, name)
krgm.Unlock()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

func (krgm *keyspaceResourceGroupManager) setRawStatesIntoResourceGroup(name string, rawValue string) error {
tokens := &GroupStates{}
if err := json.Unmarshal([]byte(rawValue), tokens); err != nil {
Expand All @@ -128,35 +200,60 @@ func (krgm *keyspaceResourceGroupManager) initDefaultResourceGroup() {
if ok {
return
}
defaultGroup := &ResourceGroup{
defaultGroup := newDefaultResourceGroup()
if err := krgm.addResourceGroup(defaultGroup.IntoProtoResourceGroup(krgm.keyspaceID)); err != nil {
log.Warn("init default group failed", zap.Uint32("keyspace-id", krgm.keyspaceID), zap.Error(err))
}
}

// ensureReservedDefaultGroupInCache makes sure krgm.groups has an entry for
// DefaultResourceGroupName, inserting a freshly built default group when it
// is missing. It writes only to the in-memory map.
func (krgm *keyspaceResourceGroupManager) ensureReservedDefaultGroupInCache() {
	// Cheap existence check under the read lock first.
	krgm.RLock()
	_, cached := krgm.groups[DefaultResourceGroupName]
	krgm.RUnlock()
	if cached {
		return
	}

	candidate := newDefaultResourceGroup()

	// Re-check under the write lock: another goroutine may have inserted the
	// default group between the two critical sections.
	krgm.Lock()
	_, alreadyPresent := krgm.groups[DefaultResourceGroupName]
	if !alreadyPresent {
		krgm.groups[DefaultResourceGroupName] = candidate
	}
	krgm.Unlock()

	if alreadyPresent {
		return
	}
	// Only the group that actually entered the cache is synced.
	krgm.syncBurstabilityWithServiceLimit(candidate)
}

func newDefaultResourceGroup() *ResourceGroup {
return &ResourceGroup{
Name: DefaultResourceGroupName,
Mode: rmpb.GroupMode_RUMode,
RUSettings: &RequestUnitSettings{
RU: &GroupTokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: UnlimitedRate,
BurstLimit: UnlimitedBurstLimit,
},
RUSettings: NewRequestUnitSettings(DefaultResourceGroupName, &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: UnlimitedRate,
BurstLimit: UnlimitedBurstLimit,
},
},
Priority: middlePriority,
}
if err := krgm.addResourceGroup(defaultGroup.IntoProtoResourceGroup(krgm.keyspaceID)); err != nil {
log.Warn("init default group failed", zap.Uint32("keyspace-id", krgm.keyspaceID), zap.Error(err))
}),
Priority: middlePriority,
RUConsumption: &rmpb.Consumption{},
}
}

func (krgm *keyspaceResourceGroupManager) restoreDefaultResourceGroupFromReserved() {
defaultGroup := newDefaultResourceGroup()
krgm.Lock()
krgm.groups[DefaultResourceGroupName] = defaultGroup
krgm.Unlock()
Comment thread
okJiang marked this conversation as resolved.
krgm.syncBurstabilityWithServiceLimit(defaultGroup)
}

func (krgm *keyspaceResourceGroupManager) addResourceGroup(grouppb *rmpb.ResourceGroup) error {
if len(grouppb.Name) == 0 || len(grouppb.Name) > maxGroupNameLength {
return errs.ErrInvalidGroup
}
// Check the Priority.
if grouppb.GetPriority() > maxPriority {
return errs.ErrInvalidGroup
if err := validateResourceGroupProto(grouppb); err != nil {
return err
}
group := FromProtoResourceGroup(grouppb)
krgm.Lock()
defer krgm.Unlock()
if krgm.writeRole.AllowsMetadataWrite() {
if err := group.persistSettings(krgm.keyspaceID, krgm.storage); err != nil {
return err
Expand All @@ -167,13 +264,16 @@ func (krgm *keyspaceResourceGroupManager) addResourceGroup(grouppb *rmpb.Resourc
return err
}
}
krgm.Lock()
krgm.groups[group.Name] = group
krgm.Unlock()
krgm.syncBurstabilityWithServiceLimit(group)
return nil
}

func (krgm *keyspaceResourceGroupManager) modifyResourceGroup(group *rmpb.ResourceGroup) error {
if group == nil || group.Name == "" {
return errs.ErrInvalidGroup
return errs.ErrInvalidGroup.FastGenByArgs("the group name")
}
krgm.RLock()
curGroup, ok := krgm.groups[group.Name]
Expand Down Expand Up @@ -204,12 +304,29 @@ func (krgm *keyspaceResourceGroupManager) deleteResourceGroup(name string) error
if !krgm.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil {
return err
if txnStorage, ok := krgm.storage.(interface {
RunInTxn(context.Context, func(txn kv.Txn) error) error
}); ok {
if err := txnStorage.RunInTxn(context.Background(), func(txn kv.Txn) error {
if err := txn.Remove(keypath.KeyspaceResourceGroupSettingPath(krgm.keyspaceID, name)); err != nil {
return err
}
return txn.Remove(keypath.KeyspaceResourceGroupStatePath(krgm.keyspaceID, name))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this delete fails, do we need to revert the deletion of KeyspaceResourceGroupSettingPath?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think reverting KeyspaceResourceGroupSettingPath is the right fallback here.

In the transactional path this cannot happen because both deletes are done in one txn. In the non-transactional fallback, once the settings key delete has succeeded the group is already gone from the authoritative metadata view; trying to recreate that settings key would be another non-transactional write and could repersist stale metadata instead of restoring the original state cleanly.

}); err != nil {
return err
}
} else {
if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil {
return err
}
if err := krgm.storage.DeleteResourceGroupStates(krgm.keyspaceID, name); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an error happens here, should it not be returned?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally don't return that states-delete error in the fallback path.

At that point the settings key has already been deleted, so returning an error would make callers observe a failed delete even though the authoritative group metadata is already gone. The remaining states key is only orphaned cleanup data; logging a warning is useful, but surfacing it as the operation result would be misleading.

log.Warn("failed to delete resource group states after deleting settings",
zap.Uint32("keyspace-id", krgm.keyspaceID),
zap.String("name", name),
zap.Error(err))
}
}
krgm.Lock()
delete(krgm.groups, name)
krgm.Unlock()
krgm.deleteResourceGroupFromCache(name)
return nil
}

Expand Down Expand Up @@ -763,6 +880,19 @@ func (krgm *keyspaceResourceGroupManager) cleanupOverrides() {
}
}

// syncBurstabilityWithServiceLimit overrides the burst limit of group with the
// current service limit. Newly loaded groups can miss the initial
// service-limit replay, so the same baseline burst invalidation is applied
// when they enter the cache.
//
// The call is a no-op when group is nil, when group.getBurstLimit(true) or
// group.getOverrideBurstLimit() is already non-negative, or when no positive
// service limit is set.
func (krgm *keyspaceResourceGroupManager) syncBurstabilityWithServiceLimit(group *ResourceGroup) {
	if group == nil {
		return
	}
	// NOTE(review): a negative burst limit presumably marks the group as
	// burstable; confirm against getBurstLimit.
	if group.getBurstLimit(true) >= 0 {
		return
	}
	if group.getOverrideBurstLimit() >= 0 {
		return
	}
	if limit, configured := krgm.getServiceLimit(); configured && limit > 0 {
		group.overrideBurstLimit(int64(limit))
	}
}

// Since the burstable resource groups won't require tokens from the server anymore,
// we have to override the burst limit of all the resource groups to the service limit.
// This ensures the burstability of the resource groups can be properly invalidated.
Expand Down
Loading
Loading