Skip to content

Redefine the Kubernetes process registry #189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/process/finders/kubernetes/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ type PodContainer struct {
ContainerStatus v1.ContainerStatus

// the kubernetes resource registry
registry *Registry
registry Registry
}

// AnalyzeContainers means query the containers by pod
func AnalyzeContainers(pod *v1.Pod, registry *Registry) []*PodContainer {
func AnalyzeContainers(pod *v1.Pod, registry Registry) []*PodContainer {
containers := make([]*PodContainer, 0)
// nolint
for _, cs := range pod.Status.ContainerStatuses {
Expand Down
29 changes: 17 additions & 12 deletions pkg/process/finders/kubernetes/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,30 @@ type ProcessFinder struct {

// k8s clients
k8sConfig *rest.Config
registry *Registry
registry Registry
CLI *kubernetes.Clientset

// runtime config
namespaces []string

// for IsPodIP check
podIPChecker *cache.Expiring
podIPMutexes map[int]*sync.Mutex
}

func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, manager base.ProcessManager) error {
return f.InitWithRegistry(ctx, conf, manager, func(clientset *kubernetes.Clientset) Registry {
config := conf.(*Config)
var namespaces []string
// namespace update
if config.Namespaces != "" {
namespaces = strings.Split(config.Namespaces, ",")
} else {
namespaces = []string{v1.NamespaceAll}
}
return NewStaticNamespaceRegistry(clientset, namespaces, config.NodeName)
})
}

func (f *ProcessFinder) InitWithRegistry(ctx context.Context, conf base.FinderBaseConfig, manager base.ProcessManager,
registrySupplier func(*kubernetes.Clientset) Registry) error {
f.clusterName = manager.GetModuleManager().FindModule(core.ModuleName).(core.Operator).ClusterName()
k8sConf, cli, err := f.validateConfig(ctx, conf.(*Config))
if err != nil {
Expand All @@ -99,7 +111,7 @@ func (f *ProcessFinder) Init(ctx context.Context, conf base.FinderBaseConfig, ma

f.ctx, f.cancelCtx = context.WithCancel(ctx)
f.stopChan = make(chan struct{}, 1)
f.registry = NewRegistry(f.CLI, f.namespaces, f.conf.NodeName)
f.registry = registrySupplier(cli)
f.manager = manager
f.podIPChecker = cache.NewExpiring()
f.podIPMutexes = make(map[int]*sync.Mutex)
Expand Down Expand Up @@ -132,13 +144,6 @@ func (f *ProcessFinder) validateConfig(ctx context.Context, conf *Config) (*rest
return nil, nil, fmt.Errorf("could not found the node: %s, %v", conf.NodeName, err)
}

// namespace update
if conf.Namespaces != "" {
f.namespaces = strings.Split(conf.Namespaces, ",")
} else {
f.namespaces = []string{v1.NamespaceAll}
}

// process builders
if err := ProcessBuildersInit(conf.Analyzers); err != nil {
return nil, nil, err
Expand Down
26 changes: 16 additions & 10 deletions pkg/process/finders/kubernetes/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,21 @@ import (

const rsyncPeriod = 5 * time.Minute

type Registry struct {
type Registry interface {
Start(stopChan chan struct{})
BuildPodContainers() map[string]*PodContainer
FindServiceName(namespace, podName string) string
}

type StaticNamespaceRegistry struct {
podInformers []cache.SharedInformer
serviceInformers []cache.SharedInformer

podServiceNameCache map[string]string
}

func NewRegistry(cli *kubernetes.Clientset, namespaces []string, nodeName string) *Registry {
r := &Registry{
func NewStaticNamespaceRegistry(cli *kubernetes.Clientset, namespaces []string, nodeName string) Registry {
r := &StaticNamespaceRegistry{
podInformers: make([]cache.SharedInformer, 0),
serviceInformers: make([]cache.SharedInformer, 0),
podServiceNameCache: make(map[string]string),
Expand All @@ -59,14 +65,14 @@ func NewRegistry(cli *kubernetes.Clientset, namespaces []string, nodeName string
return r
}

func (r *Registry) Start(stopChan chan struct{}) {
func (r *StaticNamespaceRegistry) Start(stopChan chan struct{}) {
for i := range r.podInformers {
go r.podInformers[i].Run(stopChan)
go r.serviceInformers[i].Run(stopChan)
}
}

func (r *Registry) BuildPodContainers() map[string]*PodContainer {
func (r *StaticNamespaceRegistry) BuildPodContainers() map[string]*PodContainer {
// cgroupid -> container
containers := make(map[string]*PodContainer)
for _, in := range r.podInformers {
Expand All @@ -84,11 +90,11 @@ func (r *Registry) BuildPodContainers() map[string]*PodContainer {
return containers
}

func (r *Registry) FindServiceName(namespace, podName string) string {
func (r *StaticNamespaceRegistry) FindServiceName(namespace, podName string) string {
return r.podServiceNameCache[namespace+"_"+podName]
}

func (r *Registry) recomposePodServiceName() {
func (r *StaticNamespaceRegistry) recomposePodServiceName() {
result := make(map[string]string)
for i := range r.podInformers {
for _, podT := range r.podInformers[i].GetStore().List() {
Expand Down Expand Up @@ -134,17 +140,17 @@ func chooseServiceName(a, b string) string {
return b
}

func (r *Registry) OnAdd(d interface{}) {
func (r *StaticNamespaceRegistry) OnAdd(d interface{}) {
r.recomposePodServiceName()
}

func (r *Registry) OnUpdate(d, u interface{}) {
func (r *StaticNamespaceRegistry) OnUpdate(d, u interface{}) {
same := reflect.DeepEqual(d, u)
if !same {
r.recomposePodServiceName()
}
}

func (r *Registry) OnDelete(d interface{}) {
func (r *StaticNamespaceRegistry) OnDelete(d interface{}) {
r.recomposePodServiceName()
}
Loading