Skip to content

Commit 16efcda

Browse files
authored
feat: support to deny dll according to database property (milvus-io#40764)
- issue: milvus-io#40762 Signed-off-by: SimFG <bang.fu@zilliz.com>
1 parent 1953676 commit 16efcda

File tree

11 files changed

+235
-10
lines changed

11 files changed

+235
-10
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ require (
1818
github.com/gin-gonic/gin v1.9.1
1919
github.com/go-playground/validator/v10 v10.14.0
2020
github.com/gofrs/flock v0.8.1
21-
github.com/golang/protobuf v1.5.4
21+
github.com/golang/protobuf v1.5.4 // indirect
2222
github.com/google/btree v1.1.2
2323
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
2424
github.com/klauspost/compress v1.17.9
2525
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
26-
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250305065753-10afe827b61e
26+
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250319042203-5e03a870569a
2727
github.com/minio/minio-go/v7 v7.0.73
2828
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
2929
github.com/prometheus/client_golang v1.14.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -734,8 +734,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L
734734
github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
735735
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
736736
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
737-
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250305065753-10afe827b61e h1:3wuhvb3a1Oq1NRPJpCpatKxfPR8XCdpZmRAgkF2u4Sg=
738-
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250305065753-10afe827b61e/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
737+
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250319042203-5e03a870569a h1:UR+ueSDgg+Atix9QH35e7EwYp8wKm/Ncv1DcCTcUuXk=
738+
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250319042203-5e03a870569a/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
739739
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
740740
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
741741
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=

internal/proxy/simple_rate_limiter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ func (m *SimpleLimiter) Check(dbID int64, collectionIDToPartIDs map[int64][]int6
6969
if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() {
7070
return nil
7171
}
72+
if n <= 0 {
73+
return nil
74+
}
7275

7376
m.quotaStatesMu.RLock()
7477
defer m.quotaStatesMu.RUnlock()

internal/proxy/simple_rate_limiter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ func TestRateLimiter(t *testing.T) {
398398
err := simpleLimiter.Check(-1, nil, internalpb.RateType_DDLDB, 1)
399399
assert.NoError(t, err)
400400

401-
err = simpleLimiter.Check(-1, nil, internalpb.RateType_DDLDB, 1)
401+
err = simpleLimiter.Check(-1, nil, internalpb.RateType_DDLDB, 0)
402402
assert.NoError(t, err)
403403
})
404404
}

internal/rootcoord/quota_center.go

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ var dqlRateTypes = typeutil.NewSet(
112112
type LimiterRange struct {
113113
RateScope internalpb.RateScope
114114
OpType opType
115+
IncludeRateTypes typeutil.Set[internalpb.RateType]
115116
ExcludeRateTypes typeutil.Set[internalpb.RateType]
116117
}
117118

@@ -235,8 +236,14 @@ func updateLimiter(node *rlinternal.RateLimiterNode, limiter *ratelimitutil.Limi
235236
return
236237
}
237238
limiters := node.GetLimiters()
238-
getRateTypes(limiterRange.RateScope, limiterRange.OpType).
239-
Complement(limiterRange.ExcludeRateTypes).Range(func(rt internalpb.RateType) bool {
239+
rateTypes := getRateTypes(limiterRange.RateScope, limiterRange.OpType)
240+
if limiterRange.IncludeRateTypes.Len() > 0 {
241+
rateTypes = rateTypes.Intersection(limiterRange.IncludeRateTypes)
242+
}
243+
if limiterRange.ExcludeRateTypes.Len() > 0 {
244+
rateTypes = rateTypes.Complement(limiterRange.ExcludeRateTypes)
245+
}
246+
rateTypes.Range(func(rt internalpb.RateType) bool {
240247
originLimiter, ok := limiters.Get(rt)
241248
if !ok {
242249
log.Warn("update limiter failed, limiter not found",
@@ -557,6 +564,54 @@ func (q *QuotaCenter) collectMetrics() error {
557564
return nil
558565
}
559566

567+
func getDbPropertyWithAction(db *model.Database, property string, actionFunc func(bool)) {
568+
if db == nil || property == "" || actionFunc == nil {
569+
return
570+
}
571+
if v := db.GetProperty(property); v != "" {
572+
if dbForceDenyDDLEnabled, err := strconv.ParseBool(v); err == nil {
573+
actionFunc(dbForceDenyDDLEnabled)
574+
} else {
575+
log.Warn("invalid configuration for database force deny DDL",
576+
zap.String("config item", property),
577+
zap.String("config value", v))
578+
}
579+
}
580+
}
581+
582+
func (q *QuotaCenter) calculateDBDDLRates() {
583+
dbs, err := q.meta.ListDatabases(q.ctx, typeutil.MaxTimestamp)
584+
if err != nil {
585+
log.Warn("get databases failed", zap.Error(err))
586+
return
587+
}
588+
for _, db := range dbs {
589+
dbDDLKeysWithRatesType := map[string]typeutil.Set[internalpb.RateType]{
590+
common.DatabaseForceDenyDDLKey: ddlRateTypes,
591+
common.DatabaseForceDenyCollectionDDLKey: typeutil.NewSet(internalpb.RateType_DDLCollection),
592+
common.DatabaseForceDenyPartitionDDLKey: typeutil.NewSet(internalpb.RateType_DDLPartition),
593+
common.DatabaseForceDenyIndexDDLKey: typeutil.NewSet(internalpb.RateType_DDLIndex),
594+
common.DatabaseForceDenyFlushDDLKey: typeutil.NewSet(internalpb.RateType_DDLFlush),
595+
common.DatabaseForceDenyCompactionDDLKey: typeutil.NewSet(internalpb.RateType_DDLCompaction),
596+
}
597+
598+
for ddlKey, rateTypes := range dbDDLKeysWithRatesType {
599+
getDbPropertyWithAction(db, ddlKey, func(enabled bool) {
600+
if enabled {
601+
dbLimiters := q.rateLimiter.GetOrCreateDatabaseLimiters(db.ID,
602+
newParamLimiterFunc(internalpb.RateScope_Database, allOps))
603+
updateLimiter(dbLimiters, GetEarliestLimiter(), &LimiterRange{
604+
RateScope: internalpb.RateScope_Database,
605+
OpType: ddl,
606+
IncludeRateTypes: rateTypes,
607+
})
608+
dbLimiters.GetQuotaStates().Insert(milvuspb.QuotaState_DenyToDDL, commonpb.ErrorCode_ForceDeny)
609+
}
610+
})
611+
}
612+
}
613+
}
614+
560615
// forceDenyWriting sets dml rates to 0 to reject all dml requests.
561616
func (q *QuotaCenter) forceDenyWriting(errorCode commonpb.ErrorCode, cluster bool, dbIDs, collectionIDs []int64, col2partitionIDs map[int64][]int64) error {
562617
log := log.Ctx(context.TODO()).WithRateGroup("quotaCenter.forceDenyWriting", 1.0, 60.0)
@@ -1176,6 +1231,8 @@ func (q *QuotaCenter) calculateRates() error {
11761231
return err
11771232
}
11781233

1234+
q.calculateDBDDLRates()
1235+
11791236
// log.Debug("QuotaCenter calculates rate done", zap.Any("rates", q.currentRates))
11801237
return nil
11811238
}

internal/rootcoord/quota_center_test.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ func TestQuotaCenter(t *testing.T) {
462462
qc := mocks.NewMockQueryCoordClient(t)
463463
meta := mockrootcoord.NewIMetaTable(t)
464464
meta.EXPECT().GetCollectionByIDWithMaxTs(mock.Anything, mock.Anything).Return(nil, merr.ErrCollectionNotFound).Maybe()
465+
meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return([]*model.Database{}, nil).Maybe()
465466
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
466467
quotaCenter.clearMetrics()
467468
err = quotaCenter.calculateRates()
@@ -1798,3 +1799,146 @@ func TestTORequestLimiter(t *testing.T) {
17981799
assert.Equal(t, 1, len(proxyLimit.Codes))
17991800
assert.Equal(t, commonpb.ErrorCode_ForceDeny, proxyLimit.Codes[0])
18001801
}
1802+
1803+
func TestDatabaseForceDenyDDL(t *testing.T) {
1804+
getQuotaCenter := func() (*QuotaCenter, *mockrootcoord.IMetaTable) {
1805+
ctx := context.Background()
1806+
qc := mocks.NewMockQueryCoordClient(t)
1807+
meta := mockrootcoord.NewIMetaTable(t)
1808+
pcm := proxyutil.NewMockProxyClientManager(t)
1809+
dc := mocks.NewMockDataCoordClient(t)
1810+
core, _ := NewCore(ctx, nil)
1811+
core.tsoAllocator = newMockTsoAllocator()
1812+
1813+
quotaCenter := NewQuotaCenter(pcm, qc, dc, core.tsoAllocator, meta)
1814+
return quotaCenter, meta
1815+
}
1816+
1817+
t.Run("fail to list database", func(t *testing.T) {
1818+
quotaCenter, meta := getQuotaCenter()
1819+
meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(nil, errors.New("mock error")).Once()
1820+
quotaCenter.calculateDBDDLRates()
1821+
})
1822+
1823+
t.Run("force deny ddl for database", func(t *testing.T) {
1824+
quotaCenter, meta := getQuotaCenter()
1825+
meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return([]*model.Database{
1826+
{
1827+
ID: 1, Name: "db1", Properties: []*commonpb.KeyValuePair{
1828+
{
1829+
Key: common.DatabaseForceDenyDDLKey,
1830+
Value: "true",
1831+
},
1832+
},
1833+
},
1834+
{
1835+
ID: 2, Name: "db2", Properties: []*commonpb.KeyValuePair{
1836+
{
1837+
Key: "aaa",
1838+
Value: "true",
1839+
},
1840+
},
1841+
},
1842+
{
1843+
ID: 3, Name: "db3", Properties: []*commonpb.KeyValuePair{
1844+
{
1845+
Key: common.DatabaseForceDenyDDLKey,
1846+
Value: "100",
1847+
},
1848+
},
1849+
},
1850+
}, nil).Once()
1851+
quotaCenter.calculateDBDDLRates()
1852+
1853+
limiters := quotaCenter.rateLimiter.GetDatabaseLimiters(1)
1854+
assert.Equal(t, 1, limiters.GetQuotaStates().Len())
1855+
assert.True(t, limiters.GetQuotaStates().Contain(milvuspb.QuotaState_DenyToDDL))
1856+
1857+
{
1858+
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCollection)
1859+
assert.Equal(t, true, ok)
1860+
assert.EqualValues(t, 0.0, limiter.Limit())
1861+
}
1862+
{
1863+
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLPartition)
1864+
assert.Equal(t, true, ok)
1865+
assert.EqualValues(t, 0.0, limiter.Limit())
1866+
}
1867+
{
1868+
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLIndex)
1869+
assert.Equal(t, true, ok)
1870+
assert.EqualValues(t, 0.0, limiter.Limit())
1871+
}
1872+
{
1873+
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCompaction)
1874+
assert.Equal(t, true, ok)
1875+
assert.EqualValues(t, 0.0, limiter.Limit())
1876+
}
1877+
{
1878+
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLFlush)
1879+
assert.Equal(t, true, ok)
1880+
assert.EqualValues(t, 0.0, limiter.Limit())
1881+
}
1882+
})
1883+
1884+
t.Run("force deny detail ddl for database", func(t *testing.T) {
1885+
quotaCenter, meta := getQuotaCenter()
1886+
meta.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return([]*model.Database{
1887+
{
1888+
ID: 1, Name: "foo123", Properties: []*commonpb.KeyValuePair{
1889+
{
1890+
Key: common.DatabaseForceDenyCollectionDDLKey,
1891+
Value: "true",
1892+
},
1893+
{
1894+
Key: common.DatabaseForceDenyPartitionDDLKey,
1895+
Value: "true",
1896+
},
1897+
{
1898+
Key: common.DatabaseForceDenyFlushDDLKey,
1899+
Value: "true",
1900+
},
1901+
{
1902+
Key: common.DatabaseForceDenyCompactionDDLKey,
1903+
Value: "true",
1904+
},
1905+
{
1906+
Key: common.DatabaseForceDenyIndexDDLKey,
1907+
Value: "true",
1908+
},
1909+
},
1910+
},
1911+
}, nil).Once()
1912+
quotaCenter.calculateDBDDLRates()
1913+
1914+
limiters := quotaCenter.rateLimiter.GetDatabaseLimiters(1)
1915+
assert.Equal(t, 1, limiters.GetQuotaStates().Len())
1916+
assert.True(t, limiters.GetQuotaStates().Contain(milvuspb.QuotaState_DenyToDDL))
1917+
1918+
{
1919+
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCollection)
1920+
assert.Equal(t, true, ok)
1921+
assert.EqualValues(t, 0.0, limiter.Limit())
1922+
}
1923+
{
1924+
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLPartition)
1925+
assert.Equal(t, true, ok)
1926+
assert.EqualValues(t, 0.0, limiter.Limit())
1927+
}
1928+
{
1929+
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLIndex)
1930+
assert.Equal(t, true, ok)
1931+
assert.EqualValues(t, 0.0, limiter.Limit())
1932+
}
1933+
{
1934+
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLCompaction)
1935+
assert.Equal(t, true, ok)
1936+
assert.EqualValues(t, 0.0, limiter.Limit())
1937+
}
1938+
{
1939+
limiter, ok := limiters.GetLimiters().Get(internalpb.RateType_DDLFlush)
1940+
assert.Equal(t, true, ok)
1941+
assert.EqualValues(t, 0.0, limiter.Limit())
1942+
}
1943+
})
1944+
}

internal/util/ratelimitutil/rate_limiter_tree.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ func (rln *RateLimiterNode) GetQuotaExceededError(rt internalpb.RateType) error
9797
if errCode, ok := rln.quotaStates.Get(milvuspb.QuotaState_DenyToRead); ok {
9898
return merr.WrapErrServiceQuotaExceeded(ratelimitutil.GetQuotaErrorString(errCode))
9999
}
100+
case internalpb.RateType_DDLCollection, internalpb.RateType_DDLPartition,
101+
internalpb.RateType_DDLIndex, internalpb.RateType_DDLCompaction, internalpb.RateType_DDLFlush:
102+
if errCode, ok := rln.quotaStates.Get(milvuspb.QuotaState_DenyToDDL); ok {
103+
return merr.WrapErrServiceQuotaExceeded(ratelimitutil.GetQuotaErrorString(errCode))
104+
}
100105
}
101106
return merr.WrapErrServiceQuotaExceeded(fmt.Sprintf("rate type: %s", rt.String()))
102107
}

internal/util/ratelimitutil/rate_limiter_tree_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,15 @@ func TestRateLimiterNodeGetQuotaExceededError(t *testing.T) {
143143
assert.True(t, strings.Contains(err.Error(), "disabled"))
144144
})
145145

146+
t.Run("ddl", func(t *testing.T) {
147+
limitNode := NewRateLimiterNode(internalpb.RateScope_Database)
148+
limitNode.quotaStates.Insert(milvuspb.QuotaState_DenyToDDL, commonpb.ErrorCode_ForceDeny)
149+
err := limitNode.GetQuotaExceededError(internalpb.RateType_DDLCollection)
150+
assert.True(t, errors.Is(err, merr.ErrServiceQuotaExceeded))
151+
// reference: ratelimitutil.GetQuotaErrorString(errCode)
152+
assert.True(t, strings.Contains(err.Error(), "disabled"))
153+
})
154+
146155
t.Run("unknown", func(t *testing.T) {
147156
limitNode := NewRateLimiterNode(internalpb.RateScope_Cluster)
148157
err := limitNode.GetQuotaExceededError(internalpb.RateType_DDLCompaction)

pkg/common/common.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,13 @@ const (
187187
DatabaseForceDenyWritingKey = "database.force.deny.writing"
188188
DatabaseForceDenyReadingKey = "database.force.deny.reading"
189189

190+
DatabaseForceDenyDDLKey = "database.force.deny.ddl" // all ddl
191+
DatabaseForceDenyCollectionDDLKey = "database.force.deny.collectionDDL"
192+
DatabaseForceDenyPartitionDDLKey = "database.force.deny.partitionDDL"
193+
DatabaseForceDenyIndexDDLKey = "database.force.deny.index"
194+
DatabaseForceDenyFlushDDLKey = "database.force.deny.flush"
195+
DatabaseForceDenyCompactionDDLKey = "database.force.deny.compaction"
196+
190197
// collection level load properties
191198
CollectionReplicaNumber = "collection.replica.number"
192199
CollectionResourceGroups = "collection.resource_groups"

pkg/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ require (
2020
github.com/jolestar/go-commons-pool/v2 v2.1.2
2121
github.com/json-iterator/go v1.1.12
2222
github.com/klauspost/compress v1.17.9
23-
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250305065753-10afe827b61e
23+
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250319042203-5e03a870569a
2424
github.com/minio/minio-go/v7 v7.0.73
2525
github.com/nats-io/nats-server/v2 v2.10.12
2626
github.com/nats-io/nats.go v1.34.1

0 commit comments

Comments
 (0)