Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -48,6 +49,7 @@ import (
"github.com/sirupsen/logrus"
flag "github.com/spf13/pflag"
"golang.org/x/sys/unix"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
Expand Down Expand Up @@ -190,6 +192,8 @@ func main() {
}
}

csiCfg := getCSIPluginConfig()

for _, driverName := range driverNames {
wg.Add(1)
endPointName := replaceCsiEndpoint(driverName, *endpoint)
Expand All @@ -199,19 +203,19 @@ func main() {
case TypePluginNAS:
go func(endPoint string) {
defer wg.Done()
driver := nas.NewDriver(meta, endPoint, serviceType)
driver := nas.NewDriver(meta, endPoint, serviceType, csiCfg)
driver.Run()
}(endPointName)
case TypePluginOSS:
go func(endPoint string) {
defer wg.Done()
driver := oss.NewDriver(endPoint, meta, serviceType)
driver := oss.NewDriver(endPoint, meta, serviceType, csiCfg)
driver.Run()
}(endPointName)
case TypePluginDISK:
go func(endPoint string) {
defer wg.Done()
driver := disk.NewDriver(meta, endPoint, serviceType)
driver := disk.NewDriver(meta, endPoint, serviceType, csiCfg)
driver.Run()
}(endPointName)

Expand Down Expand Up @@ -296,3 +300,30 @@ func healthHandler(w http.ResponseWriter, r *http.Request) {
message := "Liveness probe is OK, time:" + time.String()
_, _ = w.Write([]byte(message))
}

func getCSIPluginConfig() (config utils.Config) {
cfg, err := options.GetRestConfig()
if err != nil {
klog.ErrorS(err, "error building kube config")
return
}

client, err := kubernetes.NewForConfig(cfg)
if err != nil {
if cfg != nil {
klog.Fatal(err, "error building kube client")
} else {
klog.ErrorS(err, "error building kube client")
return
}
}

cm, err := client.CoreV1().ConfigMaps("kube-system").Get(context.Background(), "csi-plugin", metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "failed to get config map csi-plugin")
return
}

config.ConfigMap = cm.Data
return
}
11 changes: 9 additions & 2 deletions pkg/common/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ func ParseEndpoint(ep string) (string, string, error) {
}

func RunCSIServer(driverType, endpoint string, servers Servers) {
config := options.MustGetRestConfig()
clientset := kubernetes.NewForConfigOrDie(config)
config, err := options.GetRestConfig()
if err != nil {
klog.ErrorS(err, "failed to get rest config")
}

var clientset kubernetes.Interface
if config != nil {
clientset = kubernetes.NewForConfigOrDie(config)
}

proto, addr, err := ParseEndpoint(endpoint)
if err != nil {
Expand Down
9 changes: 3 additions & 6 deletions pkg/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,12 @@ func initDriver() {
}

// NewDriver create the identity/node/controller server and disk driver
func NewDriver(m metadata.MetadataProvider, endpoint string, serviceType utils.ServiceType) *DISK {
func NewDriver(m metadata.MetadataProvider, endpoint string, serviceType utils.ServiceType, csiCfg utils.Config) *DISK {
initDriver()
tmpdisk := &DISK{}
tmpdisk.endpoint = endpoint

// Config Global vars
csiCfg := GlobalConfigSet(m)
GlobalConfigSet(m, csiCfg)

if serviceType&utils.Node != 0 {
GlobalConfigVar.NodeID = metadata.MustGet(m, metadata.InstanceID)
Expand Down Expand Up @@ -158,7 +157,7 @@ func (disk *DISK) Run() {
}

// GlobalConfigSet set Global Config
func GlobalConfigSet(m metadata.MetadataProvider) utils.Config {
func GlobalConfigSet(m metadata.MetadataProvider, csiCfg utils.Config) {
configMapName := "csi-plugin"

// Global Configs Set
Expand All @@ -179,7 +178,6 @@ func GlobalConfigSet(m metadata.MetadataProvider) utils.Config {
klog.Fatalf("Error building kubernetes snapclientset: %s", err.Error())
}

csiCfg := utils.Config{}
configMap, err := kubeClient.CoreV1().ConfigMaps("kube-system").Get(context.Background(), configMapName, metav1.GetOptions{})
if err != nil {
klog.Infof("Not found configmap named as csi-plugin under kube-system, with: %v", err)
Expand Down Expand Up @@ -247,7 +245,6 @@ func GlobalConfigSet(m metadata.MetadataProvider) utils.Config {
GlobalConfigVar.DetachBeforeDelete,
GlobalConfigVar.ClusterID,
)
return csiCfg
}

func newBatcher(fromNode bool) (waitstatus.StatusWaiter[ecs.Disk], batcher.Batcher[ecs.Disk]) {
Expand Down
11 changes: 5 additions & 6 deletions pkg/metric/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ type CSICollector struct {
}

// newCSICollector method returns the CSICollector object
func newCSICollector(driverNames []string, serviceType utils.ServiceType) error {
func newCSICollector(driverNames []string, serviceType utils.ServiceType) {
if csiCollectorInstance != nil {
return nil
return
}
collectors := make(map[string]Collector)
if serviceType&utils.Node != 0 {
Expand All @@ -67,16 +67,15 @@ func newCSICollector(driverNames []string, serviceType utils.ServiceType) error
if enabled {
collector, err := reg.Factory()
if err != nil {
return err
klog.ErrorS(err, "Failed to create collector")
} else {
collectors[reg.Name] = collector
}
collectors[reg.Name] = collector
}
}
}
collectors[CsiGrpcExecTimeCollectorName] = &CsiGrpcExecTimeCollector
csiCollectorInstance = &CSICollector{Collectors: collectors}

return nil
}

// Describe implements the prometheus.Collector interface.
Expand Down
5 changes: 1 addition & 4 deletions pkg/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// NewMetricHandler method returns a promHttp object
func NewMetricHandler(driverNames []string, serviceType utils.ServiceType) *Handler {
//csi collector singleton
err := newCSICollector(driverNames, serviceType)
if err != nil {
klog.Errorf("Couldn't create collector: %s", err)
}
newCSICollector(driverNames, serviceType)
return newHandler()
}

Expand Down
8 changes: 3 additions & 5 deletions pkg/mounter/fuse_pod_manager/fuse_pod_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

mounterutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
corev1 "k8s.io/api/core/v1"
apiserrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -101,14 +102,11 @@ type FuseContainerConfig struct {
Extra map[string]string
}

func ExtractFuseContainerConfig(configmap *corev1.ConfigMap, name string) (config FuseContainerConfig) {
func ExtractFuseContainerConfig(csiCfg utils.Config, name string) (config FuseContainerConfig) {
config.Resources.Requests = make(corev1.ResourceList)
config.Resources.Limits = make(corev1.ResourceList)

if configmap == nil {
return
}
content := configmap.Data["fuse-"+name]
content := csiCfg.Get("fuse-"+name, "OSS_FUSE_"+strings.ToUpper(name), "")
for _, line := range strings.Split(content, "\n") {
line = strings.TrimSpace(line)
if line == "" {
Expand Down
7 changes: 4 additions & 3 deletions pkg/mounter/fuse_pod_manager/fuse_pod_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand All @@ -17,8 +18,8 @@ import (
)

func Test_ExtractFuseContainerConfig(t *testing.T) {
configmap := &corev1.ConfigMap{
Data: map[string]string{
csiCfg := utils.Config{
ConfigMap: map[string]string{
"fuse-ossfs": `
image=ossfs:latest
cpu-request=100m
Expand All @@ -32,7 +33,7 @@ func Test_ExtractFuseContainerConfig(t *testing.T) {
`,
},
}
config := ExtractFuseContainerConfig(configmap, "ossfs")
config := ExtractFuseContainerConfig(csiCfg, "ossfs")
expected := FuseContainerConfig{
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
Expand Down
4 changes: 2 additions & 2 deletions pkg/mounter/fuse_pod_manager/oss/ossfs/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ var ossfsDbglevels = map[string]string{
fpm.DebugLevelFatal: "crit",
}

func NewFuseOssfs(configmap *corev1.ConfigMap, m metadata.MetadataProvider) ossfpm.OSSFuseMounterType {
config := fpm.ExtractFuseContainerConfig(configmap, mounterutils.OssFsType)
func NewFuseOssfs(csiCfg utils.Config, m metadata.MetadataProvider) ossfpm.OSSFuseMounterType {
config := fpm.ExtractFuseContainerConfig(csiCfg, mounterutils.OssFsType)

// set default image
ossfpm.SetDefaultImage(mounterutils.OssFsType, m, &config)
Expand Down
9 changes: 5 additions & 4 deletions pkg/mounter/fuse_pod_manager/oss/ossfs/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
fpm "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/fuse_pod_manager"
ossfpm "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/fuse_pod_manager/oss"
mounterutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -102,7 +103,7 @@ func Test_buildAuthSpec_ossfs(t *testing.T) {

func TestPrecheckAuthConfig_ossfs(t *testing.T) {
fakeMeta := metadata.NewMetadata()
fakeOssfs := NewFuseOssfs(nil, fakeMeta)
fakeOssfs := NewFuseOssfs(utils.Config{}, fakeMeta)
tests := []struct {
name string
opts *ossfpm.Options
Expand Down Expand Up @@ -316,7 +317,7 @@ func TestMakeAuthConfig_ossfs(t *testing.T) {
t.Setenv("CLUSTER_ID", "cluster-id")
t.Setenv("ALIBABA_CLOUD_ACCOUNT_ID", "account-id")
fakeMeta := metadata.NewMetadata()
fakeOssfs := NewFuseOssfs(nil, fakeMeta)
fakeOssfs := NewFuseOssfs(utils.Config{}, fakeMeta)
tests := []struct {
name string
options *ossfpm.Options
Expand Down Expand Up @@ -597,7 +598,7 @@ func TestMakeMountOptions_ossfs(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Setenv("REGION_ID", tt.region)
fakeMeta := metadata.NewMetadata()
fakeOssfs := NewFuseOssfs(nil, fakeMeta)
fakeOssfs := NewFuseOssfs(utils.Config{}, fakeMeta)
mountOptions, err := fakeOssfs.MakeMountOptions(tt.opts, fakeMeta)
assert.Equal(t, tt.expectedError, err != nil)
assert.ElementsMatch(t, tt.expected, mountOptions)
Expand Down Expand Up @@ -710,7 +711,7 @@ func TestGetAuthOpttions_ossfs(t *testing.T) {

func TestAddDefaultMountOptions_ossfs(t *testing.T) {
fakeMeta := metadata.NewMetadata()
fakeInter := NewFuseOssfs(nil, fakeMeta)
fakeInter := NewFuseOssfs(utils.Config{}, fakeMeta)
fakeOssfs, ok := fakeInter.(*fuseOssfs)
if !ok {
t.Fatalf("failed to cast to fuseOssfs")
Expand Down
4 changes: 2 additions & 2 deletions pkg/mounter/fuse_pod_manager/oss/ossfs2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ var ossfs2Dbglevels = map[string]string{
fpm.DebugLevelInfo: "info",
}

func NewFuseOssfs(configmap *corev1.ConfigMap, m metadata.MetadataProvider) ossfpm.OSSFuseMounterType {
config := fpm.ExtractFuseContainerConfig(configmap, mounterutils.OssFs2Type)
func NewFuseOssfs(csiCfg utils.Config, m metadata.MetadataProvider) ossfpm.OSSFuseMounterType {
config := fpm.ExtractFuseContainerConfig(csiCfg, mounterutils.OssFs2Type)
// set default image
ossfpm.SetDefaultImage(mounterutils.OssFs2Type, m, &config)
// set default memory request
Expand Down
9 changes: 5 additions & 4 deletions pkg/mounter/fuse_pod_manager/oss/ossfs2/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
fpm "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/fuse_pod_manager"
ossfpm "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/fuse_pod_manager/oss"
mounterutils "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/utils"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand All @@ -19,7 +20,7 @@ import (

func TestPrecheckAuthConfig_ossfs2(t *testing.T) {
fakeMeta := metadata.NewMetadata()
fakeOssfs := NewFuseOssfs(nil, fakeMeta)
fakeOssfs := NewFuseOssfs(utils.Config{}, fakeMeta)
tests := []struct {
name string
opts *ossfpm.Options
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestMakeAuthConfig_ossfs2(t *testing.T) {
t.Setenv("CLUSTER_ID", "cluster-id")
t.Setenv("ALIBABA_CLOUD_ACCOUNT_ID", "account-id")
fakeMeta := metadata.NewMetadata()
fakeOssfs := NewFuseOssfs(nil, fakeMeta)
fakeOssfs := NewFuseOssfs(utils.Config{}, fakeMeta)
tests := []struct {
name string
options *ossfpm.Options
Expand Down Expand Up @@ -371,7 +372,7 @@ func TestMakeMountOptions_ossfs2(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Setenv("REGION_ID", tt.region)
fakeMeta := metadata.NewMetadata()
fakeOssfs := NewFuseOssfs(nil, fakeMeta)
fakeOssfs := NewFuseOssfs(utils.Config{}, fakeMeta)
opts, err := fakeOssfs.MakeMountOptions(tt.opts, fakeMeta)
assert.Equal(t, tt.expectedError, err != nil)
assert.Equal(t, tt.expected, opts)
Expand All @@ -381,7 +382,7 @@ func TestMakeMountOptions_ossfs2(t *testing.T) {

func TestAddDefaultMountOptions_ossfs2(t *testing.T) {
fakeMeta := metadata.NewMetadata()
fakeInter := NewFuseOssfs(nil, fakeMeta)
fakeInter := NewFuseOssfs(utils.Config{}, fakeMeta)
fakeOssfs, ok := fakeInter.(*fuseOssfs)
if !ok {
t.Fatalf("failed to cast to fuseOssfs")
Expand Down
14 changes: 7 additions & 7 deletions pkg/mounter/fuse_pod_manager/oss/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package oss
import (
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter"
corev1 "k8s.io/api/core/v1"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
"k8s.io/client-go/kubernetes"
)

var (
fstypeToFactory = map[string]func(*corev1.ConfigMap, metadata.MetadataProvider) OSSFuseMounterType{}
fstypeToFactory = map[string]func(utils.Config, metadata.MetadataProvider) OSSFuseMounterType{}
fstypeToPath = map[string]string{}
fstypeToInterceptors = map[string][]mounter.MountInterceptor{}
)

// RegisterFuseMounter registers a fuse mounter factory for a given fstype
func RegisterFuseMounter(fstype string, factory func(*corev1.ConfigMap, metadata.MetadataProvider) OSSFuseMounterType) {
func RegisterFuseMounter(fstype string, factory func(utils.Config, metadata.MetadataProvider) OSSFuseMounterType) {
fstypeToFactory[fstype] = factory
}

Expand Down Expand Up @@ -50,12 +50,12 @@ func GetFuseMountInterceptors(fstype string) ([]mounter.MountInterceptor, bool)
}

// GetFuseMounter returns a fuse mounter instance for the given fstype
func GetFuseMounter(fstype string, configmap *corev1.ConfigMap, m metadata.MetadataProvider) (OSSFuseMounterType, error) {
func GetFuseMounter(fstype string, csiCfg utils.Config, m metadata.MetadataProvider) (OSSFuseMounterType, error) {
factory, ok := fstypeToFactory[fstype]
if !ok {
return nil, &UnsupportedFstypeError{Fstype: fstype}
}
return factory(configmap, m), nil
return factory(csiCfg, m), nil
}

// GetAllRegisteredFuseTypes returns all registered fuse types
Expand All @@ -70,10 +70,10 @@ func GetAllRegisteredFuseTypes() []string {
// GetAllOSSFusePodManagers creates a map of all registered OSS fuse pod managers
// configmap can be nil if not available (e.g., in CSI agent mode)
// client can be nil if not needed (e.g., in CSI agent mode)
func GetAllOSSFusePodManagers(configmap *corev1.ConfigMap, m metadata.MetadataProvider, client kubernetes.Interface) map[string]*OSSFusePodManager {
func GetAllOSSFusePodManagers(csiCfg utils.Config, m metadata.MetadataProvider, client kubernetes.Interface) map[string]*OSSFusePodManager {
fusePodManagers := make(map[string]*OSSFusePodManager, len(fstypeToFactory))
for fstype, factory := range fstypeToFactory {
fusePodManagers[fstype] = NewOSSFusePodManager(factory(configmap, m), client)
fusePodManagers[fstype] = NewOSSFusePodManager(factory(csiCfg, m), client)
}
return fusePodManagers
}
Expand Down
Loading