diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go index e92f020cb2..cc6a4aeb12 100644 --- a/pulsaradmin/pkg/admin/namespace.go +++ b/pulsaradmin/pkg/admin/namespace.go @@ -295,6 +295,15 @@ type Namespaces interface { // RemoveSubscriptionExpirationTime removes subscription expiration time from a namespace, // defaulting to broker settings RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error + + // GetOffloadPolicies returns the offload configuration for a namespace + GetOffloadPolicies(namespace utils.NameSpaceName) (*utils.OffloadPolicies, error) + + // SetOffloadPolicies sets the offload configuration on a namespace + SetOffloadPolicies(namespace utils.NameSpaceName, policy *utils.OffloadPolicies) error + + // DeleteOffloadPolicies removes the offload configuration from a namespace + DeleteOffloadPolicies(namespace utils.NameSpaceName) error } type namespaces struct { @@ -940,3 +949,32 @@ func (n *namespaces) RemoveSubscriptionExpirationTime(namespace utils.NameSpaceN endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime") return n.pulsar.Client.Delete(endpoint) } + +func (n *namespaces) SetOffloadPolicies(namespace utils.NameSpaceName, policy *utils.OffloadPolicies) error { + nsName, err := utils.GetNamespaceName(namespace.String()) + if err != nil { + return err + } + endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "offloadPolicies") + return n.pulsar.Client.Post(endpoint, policy) +} + +func (n *namespaces) GetOffloadPolicies(namespace utils.NameSpaceName) (*utils.OffloadPolicies, error) { + var policy utils.OffloadPolicies + nsName, err := utils.GetNamespaceName(namespace.String()) + if err != nil { + return nil, err + } + endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "offloadPolicies") + err = n.pulsar.Client.Get(endpoint, &policy) + return &policy, err +} + +func (n *namespaces) DeleteOffloadPolicies(namespace utils.NameSpaceName) error { + nsName, err := utils.GetNamespaceName(namespace.String()) + if err != nil { + return err + } + endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "removeOffloadPolicies") + return n.pulsar.Client.Delete(endpoint) +} diff --git a/pulsaradmin/pkg/admin/namespace_test.go b/pulsaradmin/pkg/admin/namespace_test.go index 8fa687ff41..cc342192cc 100644 --- a/pulsaradmin/pkg/admin/namespace_test.go +++ b/pulsaradmin/pkg/admin/namespace_test.go @@ -20,11 +20,12 @@ package admin import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func ptr(n int) *int { @@ -341,3 +342,82 @@ func TestNamespaces_GetOffloadThresholdInSeconds(t *testing.T) { expected := int64(60) assert.Equal(t, expected, offloadThresholdInSeconds) } + +func TestNamespaces_SetOffloadPolicies(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace, _ := utils.GetNamespaceName("public/default") + + tests := []struct { + name string + errReason string + policy *utils.OffloadPolicies + }{ + { + name: "Set invalid empty offload policy", + errReason: "The driver is not supported, support value: S3,aws-s3," + + "google-cloud-storage,filesystem,azureblob,aliyun-oss", + policy: &utils.OffloadPolicies{}, + }, + { + name: "Set invalid S3 offload policy", + errReason: "The bucket must be specified for namespace offload.", + policy: &utils.OffloadPolicies{ + ManagedLedgerOffloadDriver: "S3", + }, + }, + { + name: "Set valid filesystem offload policy", + errReason: "", + policy: &utils.OffloadPolicies{ + ManagedLedgerOffloadDriver: "filesystem", + OffloadersDirectory: "/tmp", + ManagedLedgerOffloadedReadPriority: "BOOKKEEPER_FIRST", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := admin.Namespaces().SetOffloadPolicies(*namespace, tt.policy) + if tt.errReason == "" { + assert.Equal(t, nil, err) + } + if err != nil { + restError := err.(rest.Error) + assert.Equal(t, tt.errReason, restError.Reason) + } + }) + } +} + +func TestNamespaces_GetAndDeleteOffloadPolicies(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace, _ := utils.GetNamespaceName("public/default") + + // set simple filesystem offload policy and get it + err = admin.Namespaces().SetOffloadPolicies(*namespace, &utils.OffloadPolicies{ + ManagedLedgerOffloadDriver: "filesystem", + OffloadersDirectory: "/var/tmp", + ManagedLedgerOffloadedReadPriority: "TIERED_STORAGE_FIRST", + }) + assert.Equal(t, nil, err) + offload, err := admin.Namespaces().GetOffloadPolicies(*namespace) + assert.Equal(t, nil, err) + assert.Equal(t, "filesystem", offload.ManagedLedgerOffloadDriver) + assert.Equal(t, "/var/tmp", offload.OffloadersDirectory) + assert.Equal(t, "TIERED_STORAGE_FIRST", offload.ManagedLedgerOffloadedReadPriority) + + // delete previously set filesystem offload policy + err = admin.Namespaces().DeleteOffloadPolicies(*namespace) + assert.Equal(t, nil, err) + offload, err = admin.Namespaces().GetOffloadPolicies(*namespace) + assert.Equal(t, nil, err) + assert.Equal(t, "", offload.ManagedLedgerOffloadDriver) +} diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go index 82cfc87ead..c718180d3c 100644 --- a/pulsaradmin/pkg/admin/topic.go +++ b/pulsaradmin/pkg/admin/topic.go @@ -383,6 +383,30 @@ type Topics interface { // @param data // list of replication cluster id SetReplicationClusters(topic utils.TopicName, data []string) error + + // GetOffloadPolicies returns the offload configuration for a topic + // + // @param topic + // topicName struct + // @param applied + // when set to true, function will try to find policy applied to this topic + // in namespace level, if no policy set in topic level + GetOffloadPolicies(topic utils.TopicName, applied bool) (*utils.OffloadPolicies, error) + + // SetOffloadPolicies sets the offload policy for a topic + // + // @param topic + // topicName struct + // @param policy + // Pointer to the OffloadPolicies struct with fields set according to the used + // tiered storage configuration + SetOffloadPolicies(topic utils.TopicName, policy *utils.OffloadPolicies) error + + // DeleteOffloadPolicies removes the offload configuration on a topic + // + // @param topic + // topicName struct + DeleteOffloadPolicies(topic utils.TopicName) error } type topics struct { @@ -917,3 +941,22 @@ func (t *topics) GetReplicationClusters(topic utils.TopicName) ([]string, error) err := t.pulsar.Client.Get(endpoint, &data) return data, err } + +func (t *topics) GetOffloadPolicies(topic utils.TopicName, applied bool) (*utils.OffloadPolicies, error) { + var policy utils.OffloadPolicies + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies") + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &policy, map[string]string{ + "applied": strconv.FormatBool(applied), + }, true) + return &policy, err +} + +func (t *topics) SetOffloadPolicies(topic utils.TopicName, policy *utils.OffloadPolicies) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies") + return t.pulsar.Client.Post(endpoint, policy) +} + +func (t *topics) DeleteOffloadPolicies(topic utils.TopicName) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies") + return t.pulsar.Client.Delete(endpoint) +} diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go index 734cce1c51..5dfe6dec70 100644 --- a/pulsaradmin/pkg/admin/topic_test.go +++ b/pulsaradmin/pkg/admin/topic_test.go @@ -24,9 +24,11 @@ import ( "testing" "time" - "github.com/apache/pulsar-client-go/pulsar" "github.com/stretchr/testify/assert" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" ) @@ -523,3 +525,91 @@ func TestRetention(t *testing.T) { 100*time.Millisecond, ) } + +func TestSetOffloadPolicies(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().Create(*topicName, 4) + assert.NoError(t, err) + + tests := []struct { + name string + errReason string + policy *utils.OffloadPolicies + }{ + { + name: "Set invalid empty offload policy", + errReason: "The driver is not supported, support value: S3,aws-s3," + + "google-cloud-storage,filesystem,azureblob,aliyun-oss", + policy: &utils.OffloadPolicies{}, + }, + { + name: "Set invalid S3 offload policy", + errReason: "The bucket must be specified for namespace offload.", + policy: &utils.OffloadPolicies{ + ManagedLedgerOffloadDriver: "S3", + }, + }, + { + name: "Set valid filesystem offload policy", + errReason: "", + policy: &utils.OffloadPolicies{ + ManagedLedgerOffloadDriver: "filesystem", + OffloadersDirectory: "/tmp", + ManagedLedgerOffloadedReadPriority: "BOOKKEEPER_FIRST", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := admin.Topics().SetOffloadPolicies(*topicName, tt.policy) + if tt.errReason == "" { + assert.Equal(t, nil, err) + } + if err != nil { + restError := err.(rest.Error) + assert.Equal(t, tt.errReason, restError.Reason) + } + }) + } +} + +func TestGetAndDeleteOffloadPolicies(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().Create(*topicName, 4) + assert.NoError(t, err) + + // set simple filesystem offload policy and get it + err = admin.Topics().SetOffloadPolicies(*topicName, &utils.OffloadPolicies{ + ManagedLedgerOffloadDriver: "filesystem", + OffloadersDirectory: "/var/tmp", + ManagedLedgerOffloadedReadPriority: "TIERED_STORAGE_FIRST", + }) + assert.Equal(t, nil, err) + offload, err := admin.Topics().GetOffloadPolicies(*topicName, false) + assert.Equal(t, nil, err) + assert.Equal(t, "filesystem", offload.ManagedLedgerOffloadDriver) + assert.Equal(t, "/var/tmp", offload.OffloadersDirectory) + assert.Equal(t, "TIERED_STORAGE_FIRST", offload.ManagedLedgerOffloadedReadPriority) + + // delete previously set filesystem offload policy + err = admin.Topics().DeleteOffloadPolicies(*topicName) + assert.Equal(t, nil, err) + offload, err = admin.Topics().GetOffloadPolicies(*topicName, false) + assert.Equal(t, nil, err) + assert.Equal(t, "", offload.ManagedLedgerOffloadDriver) + +} diff --git a/pulsaradmin/pkg/utils/offload_policies.go b/pulsaradmin/pkg/utils/offload_policies.go new file mode 100644 index 0000000000..ef0f7190e1 --- /dev/null +++ b/pulsaradmin/pkg/utils/offload_policies.go @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package utils + +type OffloadPolicies struct { + FileSystemDriver bool `json:"fileSystemDriver"` + FileSystemProfilePath string `json:"fileSystemProfilePath"` + FileSystemURI string `json:"fileSystemURI"` + GcsDriver bool `json:"gcsDriver"` + GcsManagedLedgerOffloadBucket string `json:"gcsManagedLedgerOffloadBucket"` + GcsManagedLedgerOffloadMaxBlockSizeInBytes int `json:"gcsManagedLedgerOffloadMaxBlockSizeInBytes"` + GcsManagedLedgerOffloadReadBufferSizeInBytes int `json:"gcsManagedLedgerOffloadReadBufferSizeInBytes"` + GcsManagedLedgerOffloadRegion string `json:"gcsManagedLedgerOffloadRegion"` + GcsManagedLedgerOffloadServiceAccountKeyFile string `json:"gcsManagedLedgerOffloadServiceAccountKeyFile"` + ManagedLedgerExtraConfigurations map[string]string `json:"managedLedgerExtraConfigurations"` + ManagedLedgerOffloadBucket string `json:"managedLedgerOffloadBucket"` + ManagedLedgerOffloadDeletionLagInMillis int `json:"managedLedgerOffloadDeletionLagInMillis"` + ManagedLedgerOffloadDriver string `json:"managedLedgerOffloadDriver"` + ManagedLedgerOffloadMaxBlockSizeInBytes int `json:"managedLedgerOffloadMaxBlockSizeInBytes"` + ManagedLedgerOffloadMaxThreads int `json:"managedLedgerOffloadMaxThreads"` + ManagedLedgerOffloadPrefetchRounds int `json:"managedLedgerOffloadPrefetchRounds"` + ManagedLedgerOffloadReadBufferSizeInBytes int `json:"managedLedgerOffloadReadBufferSizeInBytes"` + ManagedLedgerOffloadRegion string `json:"managedLedgerOffloadRegion"` + ManagedLedgerOffloadServiceEndpoint string `json:"managedLedgerOffloadServiceEndpoint"` + ManagedLedgerOffloadThresholdInBytes int `json:"managedLedgerOffloadThresholdInBytes"` + ManagedLedgerOffloadThresholdInSeconds int `json:"managedLedgerOffloadThresholdInSeconds"` + ManagedLedgerOffloadedReadPriority string `json:"managedLedgerOffloadedReadPriority"` + OffloadersDirectory string `json:"offloadersDirectory"` + S3Driver bool `json:"s3Driver"` + S3ManagedLedgerOffloadBucket string `json:"s3ManagedLedgerOffloadBucket"` + S3ManagedLedgerOffloadCredentialID string `json:"s3ManagedLedgerOffloadCredentialId"` + S3ManagedLedgerOffloadCredentialSecret string `json:"s3ManagedLedgerOffloadCredentialSecret"` + S3ManagedLedgerOffloadMaxBlockSizeInBytes int `json:"s3ManagedLedgerOffloadMaxBlockSizeInBytes"` + S3ManagedLedgerOffloadReadBufferSizeInBytes int `json:"s3ManagedLedgerOffloadReadBufferSizeInBytes"` + S3ManagedLedgerOffloadRegion string `json:"s3ManagedLedgerOffloadRegion"` + S3ManagedLedgerOffloadRole string `json:"s3ManagedLedgerOffloadRole"` + S3ManagedLedgerOffloadRoleSessionName string `json:"s3ManagedLedgerOffloadRoleSessionName"` + S3ManagedLedgerOffloadServiceEndpoint string `json:"s3ManagedLedgerOffloadServiceEndpoint"` +}