Merge "Identify platform pods based on pod/namespace labels"

This commit is contained in:
Zuul 2024-05-27 17:15:36 +00:00 committed by Gerrit Code Review
commit c5025a6680
2 changed files with 408 additions and 0 deletions

View File

@ -0,0 +1,407 @@
From 5e86e6319f5e6d0f0a56863a141238b9c4721ceb Mon Sep 17 00:00:00 2001
From: Boovan Rajendran <boovan.rajendran@windriver.com>
Date: Mon, 6 May 2024 02:29:04 -0400
Subject: [PATCH] Identify platform pods based on pod or namespace labels
Pods with labeled with 'app.starlingx.io/component=platform'
are identified as 'platform'. These have isolated cpu affinity
cpuset when cpu-manager 'static' policy is configured.
For k8s 1.24.4 to identify the pod as 'platform', keep existing
hardcoded namespace list to support the application that
have not upgraded yet, from old versions, and also to support
the new application, which has pod/namespace labels.
Signed-off-by: Boovan Rajendran <boovan.rajendran@windriver.com>
---
pkg/kubelet/cm/cpumanager/policy_static.go | 81 +++++++++-
.../cm/cpumanager/policy_static_test.go | 147 ++++++++++++++++++
.../cm/cpumanager/topology_hints_test.go | 3 +
3 files changed, 229 insertions(+), 2 deletions(-)
diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go
index 94f18152d5b..286e983ec32 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static.go
@@ -17,11 +17,17 @@ limitations under the License.
package cpumanager
import (
+ "context"
"fmt"
"strconv"
+ k8sclient "k8s.io/client-go/kubernetes"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ restclient "k8s.io/client-go/rest"
v1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
+ "k8s.io/kubernetes/cmd/kubeadm/app/constants"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
@@ -41,6 +47,23 @@ const (
ErrorSMTAlignment = "SMTAlignmentError"
)
+// Declared as variables so that they can easily more
+// overridden during testing
+type getPodNamespace func(string) (*v1.Namespace, error)
+type buildFromConfigFlag func(masterUrl string, kubeconfigPath string) (*restclient.Config, error)
+type isKubeInfraFunc func(pod *v1.Pod) bool
+
+var varGetNamespaceObject getPodNamespace
+var varBuildConfigFromFlags buildFromConfigFlag
+var varIsKubeInfra isKubeInfraFunc
+
+func init() {
+ varIsKubeInfra = isKubeInfra
+ varGetNamespaceObject = getPodNamespaceObject
+ varBuildConfigFromFlags = clientcmd.BuildConfigFromFlags
+}
+
+
// SMTAlignmentError represents an error due to SMT alignment
type SMTAlignmentError struct {
RequestedCPUs int
@@ -286,7 +309,7 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
// Process infra pods before guaranteed pods
- if isKubeInfra(pod) {
+ if varIsKubeInfra(pod) {
// Container belongs in reserved pool.
// We don't want to fall through to the p.guaranteedCPUs() clause below so return either nil or error.
if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
@@ -451,7 +474,7 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int
return 0
}
// Infrastructure pods use reserved CPUs even if they're in the Guaranteed QoS class
- if isKubeInfra(pod) {
+ if varIsKubeInfra(pod) {
return 0
}
// Safe downcast to do for all systems with < 2.1 billion CPUs.
@@ -668,14 +691,68 @@ func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, reu
return hints
}
+func getPodNamespaceObject(podNamespaceName string) (*v1.Namespace, error) {
+
+ kubeConfigPath := constants.GetKubeletKubeConfigPath()
+ cfg, err := varBuildConfigFromFlags("", kubeConfigPath)
+ if err != nil {
+ klog.Error("Failed to build client config from ", kubeConfigPath, err.Error())
+ return nil, err
+ }
+
+ clientset, err := k8sclient.NewForConfig(cfg)
+ if err != nil {
+ klog.Error("Failed to get clientset for KUBECONFIG ", kubeConfigPath, err.Error())
+ return nil, err
+ }
+
+ namespaceObj, err := clientset.CoreV1().Namespaces().Get(context.TODO(), podNamespaceName, metav1.GetOptions{})
+ if err != nil {
+ klog.Error("Error getting namespace object:", err.Error())
+ return nil, err
+ }
+
+ return namespaceObj, nil
+
+}
+
// check if a given pod is in a platform infrastructure namespace
+// or check if a given pod is labelled as platform pod or is in
+// a namespace labelled as a platform namespace
func isKubeInfra(pod *v1.Pod) bool {
+ podName := pod.GetName()
+ podNamespaceName := pod.GetNamespace()
for _, namespace := range infraNamespaces {
if namespace == pod.Namespace {
+ klog.Infof("Pod %s has %s namespace. Treating as platform pod.", podName , podNamespaceName)
return true
}
}
+
+ klog.InfoS("Checking pod ", podName , " for label 'app.starlingx.io/component=platform'.")
+ podLabels := pod.GetLabels()
+ val, ok := podLabels["app.starlingx.io/component"]
+ if (ok && val == "platform") {
+ klog.InfoS("Pod ", podName, " has 'app.starlingx.io/component=platform' label. Treating as platform pod.")
+ return true
+ }
+
+ klog.V(4).InfoS("Pod ", podName, " does not have 'app.starlingx.io/component=platform' label. Checking its namespace information...")
+ namespaceObj, err := varGetNamespaceObject(podNamespaceName)
+ if err != nil {
+ return false
+ }
+
+ namespaceLabels := namespaceObj.GetLabels()
+ val, ok = namespaceLabels["app.starlingx.io/component"]
+ if ok && val == "platform" {
+ klog.InfoS("For pod: ", podName, ", its Namespace ", podNamespaceName, " has 'app.starlingx.io/component=platform' label. Treating as platform pod.")
+ return true
+ }
+
+ klog.InfoS("Neither pod ", podName, " nor its namespace ", podNamespaceName, " has 'app.starlingx.io/component=platform' label. Not treating as platform pod.")
return false
+
}
// get the isolated CPUs (if any) from the devices associated with a specific container
diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go
index d0308556c6d..95ed4122aeb 100644
--- a/pkg/kubelet/cm/cpumanager/policy_static_test.go
+++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -17,10 +17,13 @@ limitations under the License.
package cpumanager
import (
+ "errors"
"fmt"
"reflect"
"testing"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ restclient "k8s.io/client-go/rest"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
@@ -823,6 +826,7 @@ type staticPolicyTestWithResvList struct {
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
+ isKubeInfraPodfunc isKubeInfraFunc
expErr error
expNewErr error
expCPUAlloc bool
@@ -894,6 +898,14 @@ func TestStaticPolicyStartWithResvList(t *testing.T) {
}
}
+func fakeIsKubeInfraTrue(pod *v1.Pod) bool {
+ return true
+}
+
+func fakeIsKubeInfraFalse(pod *v1.Pod) bool {
+ return false
+}
+
func TestStaticPolicyAddWithResvList(t *testing.T) {
infraPod := makePod("fakePod", "fakeContainer2", "200m", "200m")
infraPod.Namespace = "kube-system"
@@ -907,6 +919,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "8000m", "8000m"),
+ isKubeInfraPodfunc: fakeIsKubeInfraFalse,
expErr: fmt.Errorf("not enough cpus available to satisfy request"),
expCPUAlloc: false,
expCSet: cpuset.NewCPUSet(),
@@ -920,6 +933,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7),
pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"),
+ isKubeInfraPodfunc: fakeIsKubeInfraFalse,
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core
@@ -937,6 +951,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
},
stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5),
pod: makePod("fakePod", "fakeContainer3", "2000m", "2000m"),
+ isKubeInfraPodfunc: fakeIsKubeInfraFalse,
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(4, 5),
@@ -954,6 +969,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
},
stDefaultCPUSet: cpuset.NewCPUSet(4, 5),
pod: infraPod,
+ isKubeInfraPodfunc: fakeIsKubeInfraTrue,
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(0, 1),
@@ -971,6 +987,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
},
stDefaultCPUSet: cpuset.NewCPUSet(4, 5),
pod: infraPod,
+ isKubeInfraPodfunc: fakeIsKubeInfraTrue,
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(0),
@@ -987,6 +1004,7 @@ func TestStaticPolicyAddWithResvList(t *testing.T) {
defaultCPUSet: testCase.stDefaultCPUSet,
}
+ varIsKubeInfra = testCase.isKubeInfraPodfunc
container := &testCase.pod.Spec.Containers[0]
err := policy.Allocate(st, testCase.pod, container)
if !reflect.DeepEqual(err, testCase.expErr) {
@@ -1106,3 +1124,132 @@ func TestStaticPolicyOptions(t *testing.T) {
})
}
}
+
+func makePodWithLabels(podLabels map[string]string) *v1.Pod {
+ return &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-pod",
+ Namespace: "test-namespace",
+ Labels: podLabels,
+ },
+ }
+}
+
+func fakeBuildConfigFromFlags(masterUrl string, kubeconfigPath string) (*restclient.Config, error) {
+
+ return &restclient.Config{}, nil
+}
+
+func fakeBuildConfigFromFlagsError(masterUrl string, kubeconfigPath string) (*restclient.Config, error) {
+
+ errString := fmt.Sprintf("%s file not found", kubeconfigPath)
+ return nil, errors.New(errString)
+
+}
+
+func getFakeInfraPodNamespace(_ string) (*v1.Namespace, error) {
+
+ return &v1.Namespace{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-namespace",
+ Labels: map[string]string{
+ "app.starlingx.io/component": "platform",
+ },
+ }}, nil
+}
+
+func getFakeNonInfraPodNamespace(_ string) (*v1.Namespace, error) {
+
+ return &v1.Namespace{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-namespace",
+ Labels: map[string]string{
+ "fake": "label",
+ }}}, nil
+
+}
+
+type kubeInfraPodTestCase struct {
+ description string
+ pod *v1.Pod
+ namespaceFunc getPodNamespace
+ expectedValue bool
+}
+
+func TestKubeInfraPod(t *testing.T) {
+ testCases := []kubeInfraPodTestCase{
+ {
+ description: "Pod with platform label and namespace with platform label",
+ pod: makePodWithLabels(map[string]string{
+ "app.starlingx.io/component": "platform",
+ }),
+ namespaceFunc: getFakeInfraPodNamespace,
+ expectedValue: true,
+ },
+ {
+ description: "Pod with platform label and namespace without platform label",
+ pod: makePodWithLabels(map[string]string{
+ "app.starlingx.io/component": "platform",
+ }),
+ namespaceFunc: getFakeNonInfraPodNamespace,
+ expectedValue: true,
+
+ },
+ {
+ description: "Pod without platform label and namespace with platform label",
+ pod: makePodWithLabels(map[string]string{
+ "test": "label",
+ }),
+ namespaceFunc: getFakeInfraPodNamespace,
+ expectedValue: true,
+ },
+ {
+ description: "Pod without platform label and namespace without platform label",
+ pod: makePodWithLabels(map[string]string{
+ "test": "namespace",
+ }),
+ namespaceFunc: getFakeNonInfraPodNamespace,
+ expectedValue: false,
+ },
+
+ }
+
+ for _, testCase := range testCases {
+ t.Run(testCase.description, func(t *testing.T) {
+
+ varGetNamespaceObject = testCase.namespaceFunc
+ varBuildConfigFromFlags = fakeBuildConfigFromFlags
+ gotValue := isKubeInfra(testCase.pod)
+
+ if gotValue != testCase.expectedValue {
+ t.Errorf("StaticPolicy isKubeInfraPod() error %v. expected value %v actual value %v",
+ testCase.description, testCase.expectedValue, gotValue)
+ } else {
+ fmt.Printf("StaticPolicy isKubeInfraPod() test successful. : %v ", testCase.description)
+ }
+
+ })
+ }
+
+ test := kubeInfraPodTestCase{
+ description: "Failure reading kubeconfig file",
+ pod: makePodWithLabels(map[string]string{
+ "test": "namespace",
+ }),
+ namespaceFunc: getFakeNonInfraPodNamespace,
+ expectedValue: false,
+ }
+
+ varGetNamespaceObject = getPodNamespaceObject
+ varBuildConfigFromFlags = fakeBuildConfigFromFlagsError
+
+ gotValue := isKubeInfra(test.pod)
+
+ if gotValue != test.expectedValue {
+ t.Errorf("StaticPolicy isKubeInfraPod() error %v. expected value %v actual value %v",
+ test.description, test.expectedValue, gotValue)
+ } else {
+ fmt.Printf("StaticPolicy isKubeInfraPod() test successful. : %v ", test.description)
+ }
+
+}
diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
index 9b8abe77488..b5a0ca41ab1 100644
--- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
@@ -142,6 +142,7 @@ func TestPodGuaranteedCPUs(t *testing.T) {
expectedCPU: 6,
},
}
+ varIsKubeInfra = fakeIsKubeInfraFalse
for _, tc := range tcases {
requestedCPU := p.podGuaranteedCPUs(tc.pod)
@@ -184,6 +185,7 @@ func TestGetTopologyHints(t *testing.T) {
sourcesReady: &sourcesReadyStub{},
}
+ varIsKubeInfra = fakeIsKubeInfraFalse
hints := m.GetTopologyHints(&tc.pod, &tc.container)[string(v1.ResourceCPU)]
if len(tc.expectedHints) == 0 && len(hints) == 0 {
continue
@@ -237,6 +239,7 @@ func TestGetPodTopologyHints(t *testing.T) {
sourcesReady: &sourcesReadyStub{},
}
+ varIsKubeInfra = fakeIsKubeInfraFalse
podHints := m.GetPodTopologyHints(&tc.pod)[string(v1.ResourceCPU)]
if len(tc.expectedHints) == 0 && len(podHints) == 0 {
continue
--
2.25.1

View File

@ -14,3 +14,4 @@ kubelet-cpumanager-introduce-concept-of-isolated-CPU.patch
cpumanager-policy-static-test-refactor.patch
kubelet-CFS-quota-throttling-for-non-integer-cpulimit.patch
kubeadm-reduce-UpgradeManifestTimeout.patch
Identify-platform-pods-based-on-pod-or-namespace-labels.patch