Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 78 additions & 5 deletions pkg/controller/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func (c *Controller) handleUpdateNp(key string) error {
}
logRate := parseACLLogRate(np.Annotations)

providers := parsePolicyFor(np)

npName := np.Name
nameArray := []rune(np.Name)
if !unicode.IsLetter(nameArray[0]) {
Expand All @@ -142,7 +144,7 @@ func (c *Controller) handleUpdateNp(key string) error {
}

namedPortMap := c.namedPort.GetNamedPortByNs(np.Namespace)
ports, subnetNames, err := c.fetchSelectedPorts(np.Namespace, &np.Spec.PodSelector)
ports, subnetNames, err := c.fetchSelectedPorts(np.Namespace, &np.Spec.PodSelector, providers)
if err != nil {
klog.Errorf("fetch ports belongs to np %s: %v", key, err)
return err
Expand Down Expand Up @@ -212,7 +214,7 @@ func (c *Controller) handleUpdateNp(key string) error {
} else {
var allow, except []string
for _, npp := range npr.From {
if allow, except, err = c.fetchPolicySelectedAddresses(np.Namespace, protocol, npp); err != nil {
if allow, except, err = c.fetchPolicySelectedAddresses(np.Namespace, protocol, npp, providers); err != nil {
klog.Errorf("failed to fetch policy selected addresses, %v", err)
return err
}
Expand Down Expand Up @@ -359,7 +361,7 @@ func (c *Controller) handleUpdateNp(key string) error {
} else {
var allow, except []string
for _, npp := range npr.To {
if allow, except, err = c.fetchPolicySelectedAddresses(np.Namespace, protocol, npp); err != nil {
if allow, except, err = c.fetchPolicySelectedAddresses(np.Namespace, protocol, npp, providers); err != nil {
klog.Errorf("failed to fetch policy selected addresses, %v", err)
return err
}
Expand Down Expand Up @@ -531,7 +533,53 @@ func (c *Controller) handleDeleteNp(key string) error {
return nil
}

func (c *Controller) fetchSelectedPorts(namespace string, selector *metav1.LabelSelector) ([]string, []string, error) {
func parsePolicyFor(np *netv1.NetworkPolicy) set.Set[string] {
raw := strings.TrimSpace(np.Annotations[util.NetworkPolicyForAnnotation])
if raw == "" {
return nil
}

providers := set.New[string]()
invalidMsg := `ignore invalid network_policy_for annotation %q for netpol %s/%s, expect "ovn" or "<namespace>/<net-attach-def>"`

for token := range strings.SplitSeq(raw, ",") {
t := strings.TrimSpace(token)
if t == "" {
continue
}

if strings.EqualFold(t, "ovn") {
providers.Insert(util.OvnProvider)
continue
}
if strings.Contains(t, "/") {
parts := strings.SplitN(t, "/", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
klog.Warningf(invalidMsg, t, np.Namespace, np.Name)
continue
}
provider := fmt.Sprintf("%s.%s.%s", parts[1], parts[0], util.OvnProvider)
providers.Insert(provider)
continue
}
klog.Warningf(invalidMsg, t, np.Namespace, np.Name)
}

if providers.Len() == 0 {
klog.Warningf("network_policy_for annotation has no valid entries; policy %s/%s selects no pods", np.Namespace, np.Name)
return providers
}
return providers
}

func netpolAppliesToProvider(provider string, providers set.Set[string]) bool {
if providers == nil {
return true
}
return providers.Has(provider)
}

func (c *Controller) fetchSelectedPorts(namespace string, selector *metav1.LabelSelector, providers set.Set[string]) ([]string, []string, error) {
var subnets []string
sel, err := metav1.LabelSelectorAsSelector(selector)
if err != nil {
Expand All @@ -553,17 +601,25 @@ func (c *Controller) fetchSelectedPorts(namespace string, selector *metav1.Label
return nil, nil, fmt.Errorf("failed to get pod networks, %w", err)
}

matchedProvider := false
for _, podNet := range podNets {
if !isOvnSubnet(podNet.Subnet) {
continue
}
if !netpolAppliesToProvider(podNet.ProviderName, providers) {
continue
}
matchedProvider = true

if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" {
ports = append(ports, ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName))
// Pod selected by networkpolicy has its own subnet which is not the default subnet
subnets = append(subnets, podNet.Subnet.Name)
}
}
if providers != nil && !matchedProvider {
klog.V(4).Infof("skip pod %s/%s: no network attachment matches network_policy_for", pod.Namespace, pod.Name)
}
}
subnets = slices.Compact(subnets)
return ports, subnets, nil
Expand All @@ -587,7 +643,7 @@ func hasEgressRule(np *netv1.NetworkPolicy) bool {
return np.Spec.Egress != nil
}

func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, npp netv1.NetworkPolicyPeer) ([]string, []string, error) {
func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, npp netv1.NetworkPolicyPeer, providers set.Set[string]) ([]string, []string, error) {
selectedAddresses := []string{}
exceptAddresses := []string{}

Expand Down Expand Up @@ -643,7 +699,14 @@ func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, np
klog.Errorf("failed to get pod nets %v", err)
return nil, nil, err
}
matchedProvider := false
for _, podNet := range podNets {
provider := podNet.ProviderName
if !netpolAppliesToProvider(provider, providers) {
continue
}
matchedProvider = true

podIPAnnotation := pod.Annotations[fmt.Sprintf(util.IPAddressAnnotationTemplate, podNet.ProviderName)]
podIPs := strings.SplitSeq(podIPAnnotation, ",")
for podIP := range podIPs {
Expand All @@ -654,18 +717,28 @@ func (c *Controller) fetchPolicySelectedAddresses(namespace, protocol string, np
if len(svcs) == 0 {
continue
}
if !shouldIncludeServiceIPs(podNet) {
continue
}

svcIPs, err := svcMatchPods(svcs, pod, protocol)
if err != nil {
return nil, nil, err
}
selectedAddresses = append(selectedAddresses, svcIPs...)
}
if providers != nil && !matchedProvider {
klog.V(4).Infof("skip pod %s/%s: no network attachment matches network_policy_for", pod.Namespace, pod.Name)
}
}
}
return selectedAddresses, exceptAddresses, nil
}

func shouldIncludeServiceIPs(podNet *kubeovnNet) bool {
return podNet != nil && podNet.Subnet != nil && podNet.Subnet.Spec.Vpc == util.DefaultVpc
}

func svcMatchPods(svcs []*corev1.Service, pod *corev1.Pod, protocol string) ([]string, error) {
matchSvcs := []string{}
// find svc ip by pod's info
Expand Down
122 changes: 122 additions & 0 deletions pkg/controller/network_policy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package controller

import (
"testing"

netv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/utils/set"

"github.com/stretchr/testify/require"

"github.com/kubeovn/kube-ovn/pkg/util"
)

func TestParsePolicyFor(t *testing.T) {
t.Parallel()

tests := []struct {
name string
annotation *string
wantProviders set.Set[string]
}{
{
name: "annotation omitted",
annotation: nil,
wantProviders: nil,
},
{
name: "ovn only",
annotation: ptrString("ovn"),
wantProviders: set.New(
util.OvnProvider,
),
},
{
name: "duplicate ovn",
annotation: ptrString("ovn, ovn"),
wantProviders: set.New(
util.OvnProvider,
),
},
{
name: "secondary only",
annotation: ptrString("ns1/net1"),
wantProviders: set.New(
"net1.ns1." + util.OvnProvider,
),
},
{
name: "ovn and secondary",
annotation: ptrString(" ovn , ns1/net1 "),
wantProviders: set.New(
util.OvnProvider,
"net1.ns1."+util.OvnProvider,
),
},
{
name: "ovn and invalid",
annotation: ptrString("ovn, foo"),
wantProviders: set.New(
util.OvnProvider,
),
},
{
name: "invalid all",
annotation: ptrString("all"),
wantProviders: set.New[string](),
},
{
name: "invalid default",
annotation: ptrString("default"),
wantProviders: set.New[string](),
},
{
name: "invalid no entries",
annotation: ptrString(","),
wantProviders: set.New[string](),
},
{
name: "invalid token",
annotation: ptrString("foo"),
wantProviders: set.New[string](),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
np := &netv1.NetworkPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "np",
Namespace: "default",
},
}
if tt.annotation != nil {
np.Annotations = map[string]string{
util.NetworkPolicyForAnnotation: *tt.annotation,
}
}

providers := parsePolicyFor(np)
if tt.wantProviders == nil {
require.Nil(t, providers)
return
}
require.Equal(t, tt.wantProviders, providers)
})
}
}

func TestNetpolAppliesToProvider(t *testing.T) {
t.Parallel()
providers := set.New("ovn", "net1.ns1.ovn")
require.True(t, netpolAppliesToProvider("ovn", providers))
require.False(t, netpolAppliesToProvider("net2.ns2.ovn", providers))
require.True(t, netpolAppliesToProvider("ovn", nil))
require.False(t, netpolAppliesToProvider("ovn", set.New[string]()))
}

func ptrString(s string) *string {
return &s
}
1 change: 1 addition & 0 deletions pkg/util/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ const (
NodeNameLabel = "ovn.kubernetes.io/node-name"
NetworkPolicyLogAnnotation = "ovn.kubernetes.io/enable_log"
NetworkPolicyEnforcementAnnotation = "ovn.kubernetes.io/network_policy_enforcement"
NetworkPolicyForAnnotation = "ovn.kubernetes.io/network_policy_for"
ACLActionsLogAnnotation = "ovn.kubernetes.io/log_acl_actions"
ACLLogMeterAnnotation = "ovn.kubernetes.io/acl_log_meter_rate"

Expand Down
Loading