Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/k8s-rdma-shared-dp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ func main() {

// add version flag
versionOpt := false
var configFilePath string
useCdi := false
var configFilePath, kubeletRootDir string
flag.BoolVar(&versionOpt, "version", false, "Show application version")
flag.BoolVar(&versionOpt, "v", false, "Show application version")
flag.StringVar(
&configFilePath, "config-file", resources.DefaultConfigFilePath, "path to device plugin config file")
useCdi := false
flag.StringVar(&kubeletRootDir, "kubelet-root-dir", "/var/lib/kubelet", "root directory of kubelet")
flag.BoolVar(&useCdi, "use-cdi", false,
"Use Container Device Interface to expose devices in containers")
flag.Parse()
Expand All @@ -80,7 +81,7 @@ func main() {

log.Println("Starting K8s RDMA Shared Device Plugin version=", version)

rm := resources.NewResourceManager(configFilePath, useCdi)
rm := resources.NewResourceManager(configFilePath, kubeletRootDir, useCdi)

log.Println("resource manager reading configs")
if err := rm.ReadConfig(); err != nil {
Expand Down
21 changes: 8 additions & 13 deletions pkg/resources/resources_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"fmt"
"log"
"os"
"path"
"regexp"
"strconv"
"time"
Expand Down Expand Up @@ -71,15 +72,15 @@ const (
)

var (
activeSockDir = "/var/lib/kubelet/plugins_registry"
deprecatedSockDir = "/var/lib/kubelet/device-plugins"
kubeletPluginRegistry = "plugins_registry"
deprecatedPluginRegistry = "device-plugins"
)

// resourceManager for plugin
type resourceManager struct {
configFile string
defaultResourcePrefix string
socketSuffix string
kubeletRootDir string
watchMode bool
configList []*types.UserConfig
resourceServers []types.ResourceServer
Expand All @@ -90,18 +91,12 @@ type resourceManager struct {
useCdi bool
}

func NewResourceManager(configFile string, useCdi bool) types.ResourceManager {
watcherMode := detectPluginWatchMode(activeSockDir)
if watcherMode {
fmt.Println("Using Kubelet Plugin Registry Mode")
} else {
fmt.Println("Using Deprecated Devie Plugin Registry Path")
}
func NewResourceManager(configFile, kubeletRootDir string, useCdi bool) types.ResourceManager {
return &resourceManager{
configFile: configFile,
watchMode: detectPluginWatchMode(path.Join(kubeletRootDir, kubeletPluginRegistry)),
kubeletRootDir: kubeletRootDir,
defaultResourcePrefix: rdmaHcaResourcePrefix,
socketSuffix: socketSuffix,
watchMode: watcherMode,
netlinkManager: &netlinkManager{},
rds: NewRdmaDeviceSpec(requiredRdmaDevices),
useCdi: useCdi,
Expand Down Expand Up @@ -244,7 +239,7 @@ func (rm *resourceManager) InitServers() error {
return err
}
}
rs, err := newResourceServer(config, filteredDevices, rm.watchMode, rm.socketSuffix, rm.useCdi)
rs, err := newResourceServer(config, filteredDevices, rm.kubeletRootDir, rm.watchMode, rm.useCdi)
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/resources/resources_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,19 @@ func (l *FakeLink) Type() string {
var _ = Describe("ResourcesManger", func() {
Context("NewResourceManager", func() {
const activeSockDirBackUP = "/var/lib/kubelet/plugins_registry"
const kubeletRootDir = "/var/lib/kubelet"

It("Resource Manager with watcher mode", func() {
fs := utils.FakeFilesystem{
Dirs: []string{activeSockDir[1:]},
}
defer fs.Use()()
defer fs.Use()
activeSockDir = path.Join(fs.RootDir, activeSockDirBackUP[1:])
defer func() {
activeSockDir = activeSockDirBackUP
}()

obj := NewResourceManager(DefaultConfigFilePath, false)
obj := NewResourceManager(DefaultConfigFilePath, kubeletRootDir, false)
rm := obj.(*resourceManager)
Expect(rm.watchMode).To(Equal(true))
})
Expand All @@ -92,7 +93,7 @@ var _ = Describe("ResourcesManger", func() {
activeSockDir = activeSockDirBackUP
}()

obj := NewResourceManager(DefaultConfigFilePath, false)
obj := NewResourceManager(DefaultConfigFilePath, kubeletRootDir, false)
rm := obj.(*resourceManager)
Expect(rm.watchMode).To(Equal(false))
})
Expand Down
76 changes: 42 additions & 34 deletions pkg/resources/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ import (
"log"
"net"
"os"
"path"
"path/filepath"
"strconv"
"sync"

"time"

"golang.org/x/net/context"
grpc "google.golang.org/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
Expand All @@ -66,15 +68,17 @@ type resourcesServerPort struct {
}

type resourceServer struct {
resourceName string
watchMode bool
socketName string
socketPath string
stopWatcher chan bool
updateResource chan bool
health chan *pluginapi.Device
rsConnector types.ResourceServerPort
rdmaHcaMax int
activeSockDir string
deprecatedSockDir string
resourceName string
watchMode bool
socketName string
socketPath string
stopWatcher chan bool
updateResource chan bool
health chan *pluginapi.Device
rsConnector types.ResourceServerPort
rdmaHcaMax int
// Mutex protects devs and deviceSpec
mutex sync.RWMutex
devs []*pluginapi.Device
Expand Down Expand Up @@ -135,11 +139,17 @@ func (rsc *resourcesServerPort) GetClientConn(unixSocketPath string) (*grpc.Clie
}

// newResourceServer returns an initialized server
func newResourceServer(config *types.UserConfig, devices []types.PciNetDevice, watcherMode bool,
socketSuffix string, useCdi bool) (types.ResourceServer, error) {
func newResourceServer(config *types.UserConfig, devices []types.PciNetDevice, kubeletRootDir string, watcherMode, useCdi bool) (types.ResourceServer, error) {
var activeSockDir, deprecatedSockDir string
var devs []*pluginapi.Device

sockDir := activeSockDir
if watcherMode {
activeSockDir = path.Join(kubeletRootDir, kubeletPluginRegistry)
fmt.Printf("Using watch mode with socket directory: %s\n", activeSockDir)
} else {
deprecatedSockDir = path.Join(kubeletRootDir, deprecatedPluginRegistry)
fmt.Printf("Using deprecated mode with socket directory: %s\n", deprecatedSockDir)
}

if config.RdmaHcaMax < 0 {
return nil, fmt.Errorf("error: Invalid value for rdmaHcaMax < 0: %d", config.RdmaHcaMax)
Expand All @@ -163,28 +173,26 @@ func newResourceServer(config *types.UserConfig, devices []types.PciNetDevice, w
log.Printf("Warning: no Rdma Devices were found for resource %s\n", config.ResourceName)
}

if !watcherMode {
sockDir = deprecatedSockDir
}

socketName := fmt.Sprintf("%s.%s", config.ResourceName, socketSuffix)

return &resourceServer{
resourceName: fmt.Sprintf("%s/%s", config.ResourcePrefix, config.ResourceName),
socketName: socketName,
socketPath: filepath.Join(sockDir, socketName),
watchMode: watcherMode,
devs: devs,
deviceSpec: deviceSpec,
stopWatcher: make(chan bool),
updateResource: make(chan bool, 1),
health: make(chan *pluginapi.Device),
rsConnector: &resourcesServerPort{},
rdmaHcaMax: config.RdmaHcaMax,
pciDevices: devices,
useCdi: useCdi,
cdi: cdi.New(),
cdiResourceName: config.ResourceName,
activeSockDir: activeSockDir,
deprecatedSockDir: deprecatedSockDir,
resourceName: fmt.Sprintf("%s/%s", config.ResourcePrefix, config.ResourceName),
socketName: socketName,
socketPath: filepath.Join(activeSockDir, socketName),
watchMode: watcherMode,
devs: devs,
deviceSpec: deviceSpec,
stopWatcher: make(chan bool),
updateResource: make(chan bool, 1),
health: make(chan *pluginapi.Device),
rsConnector: &resourcesServerPort{},
rdmaHcaMax: config.RdmaHcaMax,
pciDevices: devices,
useCdi: useCdi,
cdi: cdi.New(),
cdiResourceName: config.ResourceName,
}, nil
}

Expand Down Expand Up @@ -290,7 +298,7 @@ func (rs *resourceServer) Watch() {

// Register registers the device plugin for the given resourceName with Kubelet.
func (rs *resourceServer) register() error {
kubeletEndpoint := filepath.Join(deprecatedSockDir, kubeEndPoint)
kubeletEndpoint := filepath.Join(rs.deprecatedSockDir, kubeEndPoint)
conn, err := rs.rsConnector.GetClientConn(kubeletEndpoint)
if err != nil {
return err
Expand Down Expand Up @@ -437,7 +445,7 @@ func (rs *resourceServer) GetInfo(_ context.Context, _ *registerapi.InfoRequest)
pluginInfoResponse := &registerapi.PluginInfo{
Type: registerapi.DevicePlugin,
Name: rs.resourceName,
Endpoint: filepath.Join(activeSockDir, rs.socketName),
Endpoint: filepath.Join(rs.activeSockDir, rs.socketName),
SupportedVersions: []string{"v1alpha1", "v1beta1"},
}
return pluginInfoResponse, nil
Expand Down
31 changes: 18 additions & 13 deletions pkg/resources/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ const (
fakeNetDevicePath = "sys/class/net/ib0/"
)

var (
activeSockDir = "/var/lib/kubelet/plugins-registry"
deprecatedSockDir = "/var/lib/kubelet/device-plugins"
)

type devPluginListAndWatchServerMock struct {
grpc.ServerStream
devices []*pluginapi.Device
Expand Down Expand Up @@ -92,7 +97,7 @@ var _ = Describe("resourceServer tests", func() {
}
defer fs.Use()()
conf := &types.UserConfig{ResourceName: "test_server", ResourcePrefix: "rdma", RdmaHcaMax: 100}
obj, err := newResourceServer(conf, fakeDeviceList, true, "socket", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", false, false)
Expect(err).ToNot(HaveOccurred())
rs := obj.(*resourceServer)
Expect(rs.resourceName).To(Equal("rdma/test_server"))
Expand All @@ -107,7 +112,7 @@ var _ = Describe("resourceServer tests", func() {
}
defer fs.Use()()
conf := &types.UserConfig{ResourceName: "test_server", ResourcePrefix: "rdma", RdmaHcaMax: 0}
obj, err := newResourceServer(conf, fakeDeviceList, true, "socket", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", false, false)
Expect(err).ToNot(HaveOccurred())
rs := obj.(*resourceServer)
Expect(rs.resourceName).To(Equal("rdma/test_server"))
Expand All @@ -126,7 +131,7 @@ var _ = Describe("resourceServer tests", func() {
fakePciDevice.On("GetRdmaSpec").Return([]*pluginapi.DeviceSpec{})
fakePciDevice.On("GetPciAddr").Return("0000:02:00.0")
deviceList := []types.PciNetDevice{fakePciDevice}
obj, err := newResourceServer(conf, deviceList, true, "socket", false)
obj, err := newResourceServer(conf, deviceList, "/var/lib/kubelet", true, false)
Expect(err).ToNot(HaveOccurred())
rs := obj.(*resourceServer)
Expect(rs.resourceName).To(Equal("rdma/test_server"))
Expand All @@ -141,7 +146,7 @@ var _ = Describe("resourceServer tests", func() {
}
defer fs.Use()()
conf := &types.UserConfig{ResourceName: "test_server", ResourcePrefix: "rdma", RdmaHcaMax: 100}
obj, err := newResourceServer(conf, fakeDeviceList, false, "socket", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", false, false)
Expect(err).ToNot(HaveOccurred())
rs := obj.(*resourceServer)
Expect(rs.resourceName).To(Equal("rdma/test_server"))
Expand All @@ -156,7 +161,7 @@ var _ = Describe("resourceServer tests", func() {
}
defer fs.Use()()
conf := &types.UserConfig{ResourceName: "test_server", ResourcePrefix: "rdma", RdmaHcaMax: 0}
obj, err := newResourceServer(conf, fakeDeviceList, false, "socket", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", false, false)
Expect(err).ToNot(HaveOccurred())
rs := obj.(*resourceServer)
Expect(rs.resourceName).To(Equal("rdma/test_server"))
Expand All @@ -166,7 +171,7 @@ var _ = Describe("resourceServer tests", func() {
})
It("server with plugin with invalid max number of resources", func() {
conf := &types.UserConfig{ResourceName: "test_server", ResourcePrefix: "rdma", RdmaHcaMax: -100}
obj, err := newResourceServer(conf, fakeDeviceList, true, "socket", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", true, false)
Expect(err).To(HaveOccurred())
Expect(obj).To(BeNil())
})
Expand Down Expand Up @@ -399,7 +404,7 @@ var _ = Describe("resourceServer tests", func() {
}
defer fs.Use()()
conf := &types.UserConfig{RdmaHcaMax: 100, ResourcePrefix: "rdma", ResourceName: "fake"}
obj, err := newResourceServer(conf, fakeDeviceList, true, "fake", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", true, false)
Expect(err).ToNot(HaveOccurred())

rs := obj.(*resourceServer)
Expand Down Expand Up @@ -430,7 +435,7 @@ var _ = Describe("resourceServer tests", func() {
}
defer fs.Use()()
conf := &types.UserConfig{RdmaHcaMax: 1, ResourcePrefix: "rdma", ResourceName: "fake"}
obj, err := newResourceServer(conf, fakeDeviceList, true, "fake", true)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", true, true)
Expect(err).ToNot(HaveOccurred())

cdi := &cdiMocks.CDI{}
Expand Down Expand Up @@ -572,7 +577,7 @@ var _ = Describe("resourceServer tests", func() {
}()

conf := &types.UserConfig{ResourceName: "fake_test", ResourcePrefix: "rdma", RdmaHcaMax: 100}
obj, err := newResourceServer(conf, fakeDeviceList, true, "socket", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", true, false)
Expect(err).ToNot(HaveOccurred())
rs := obj.(*resourceServer)

Expand Down Expand Up @@ -636,7 +641,7 @@ var _ = Describe("resourceServer tests", func() {
deprecatedSockDir = fs.RootDir

conf := &types.UserConfig{ResourceName: "fakename", ResourcePrefix: "rdma", RdmaHcaMax: 100}
obj, err := newResourceServer(conf, fakeDeviceList, false, "socket", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", false, false)
Expect(err).ToNot(HaveOccurred())
rs := obj.(*resourceServer)

Expand Down Expand Up @@ -665,7 +670,7 @@ var _ = Describe("resourceServer tests", func() {
activeSockDir = fs.RootDir

conf := &types.UserConfig{ResourceName: "fakename", ResourcePrefix: "rdma", RdmaHcaMax: 100}
obj, err := newResourceServer(conf, fakeDeviceList, true, "socket", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", true, false)
Expect(err).ToNot(HaveOccurred())
rs := obj.(*resourceServer)

Expand Down Expand Up @@ -695,7 +700,7 @@ var _ = Describe("resourceServer tests", func() {
deprecatedSockDir = fs.RootDir

conf := &types.UserConfig{ResourceName: "fakename", ResourcePrefix: "rdma", RdmaHcaMax: 100}
obj, err := newResourceServer(conf, fakeDeviceList, false, "socket", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", false, false)
Expect(err).ToNot(HaveOccurred())
rs := obj.(*resourceServer)

Expand Down Expand Up @@ -724,7 +729,7 @@ var _ = Describe("resourceServer tests", func() {
DescribeTable("allocating",
func(req *pluginapi.AllocateRequest, expectedRespLength int, shouldFail bool) {
conf := &types.UserConfig{ResourceName: "fakename", ResourcePrefix: "rdma", RdmaHcaMax: 100}
obj, err := newResourceServer(conf, fakeDeviceList, true, "socket", false)
obj, err := newResourceServer(conf, fakeDeviceList, "/var/lib/kubelet", true, false)
Expect(err).ToNot(HaveOccurred())
rs := obj.(*resourceServer)

Expand Down