Skip to content

Commit

Permalink
DRA: remove AddNode from ClusterSnapshot
Browse files Browse the repository at this point in the history
AddNodeInfo already provides the same functionality, and has to be used
in production code in order to propagate DRA objects correctly.

Uses in production are replaced with Initialize(), which will later
take DRA objects into account. Uses in the test code are replaced with
AddNodeInfo().
  • Loading branch information
towca committed Nov 7, 2024
1 parent b808ee3 commit 87f5557
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,12 @@ func TestFilterOutSchedulable(t *testing.T) {
allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...)

for node, pods := range tc.nodesWithPods {
err := clusterSnapshot.AddNode(node)
assert.NoError(t, err)

for _, pod := range pods {
pod.Spec.NodeName = node.Name
err = clusterSnapshot.AddPod(pod, node.Name)
assert.NoError(t, err)

allExpectedScheduledPods = append(allExpectedScheduledPods, pod)
}
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
assert.NoError(t, err)
}

clusterSnapshot.Fork()
Expand Down
19 changes: 3 additions & 16 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
knownNodes := make(map[string]bool)
snapshot := clustersnapshot.NewBasicClusterSnapshot()
pods, err := a.ctx.AllPodLister().List()
if err != nil {
Expand All @@ -366,22 +365,10 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
scheduledPods := kube_util.ScheduledPods(pods)
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

for _, node := range nodes {
if err := snapshot.AddNode(node); err != nil {
return nil, err
}

knownNodes[node.Name] = true
}

for _, pod := range nonExpendableScheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
return nil, err
}
}
err = snapshot.Initialize(nodes, nonExpendableScheduledPods)
if err != nil {
return nil, err
}

return snapshot, nil
}

Expand Down
18 changes: 2 additions & 16 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,22 +244,8 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {

func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError {
a.ClusterSnapshot.Clear()

knownNodes := make(map[string]bool)
for _, node := range nodes {
if err := a.ClusterSnapshot.AddNode(node); err != nil {
klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
knownNodes[node.Name] = true
}
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}
if err := a.ClusterSnapshot.Initialize(nodes, scheduledPods); err != nil {
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
return nil
}
Expand Down
6 changes: 4 additions & 2 deletions cluster-autoscaler/estimator/binpacking_estimator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ func TestBinpackingEstimate(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
// Add one node in different zone to trigger topology spread constraints
clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
assert.NoError(t, err)

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(t, err)
Expand Down Expand Up @@ -268,7 +269,8 @@ func BenchmarkBinpackingEstimate(b *testing.B) {

for i := 0; i < b.N; i++ {
clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
assert.NoError(b, err)

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(b, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
Expand Down Expand Up @@ -112,10 +113,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
clusterSnapshot := clustersnapshot.NewDeltaClusterSnapshot()
clusterSnapshot.AddNode(node)
for _, pod := range tc.scheduledPods {
clusterSnapshot.AddPod(pod, node.Name)
}
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
assert.NoError(t, err)
ctx := context.AutoscalingContext{
AutoscalingKubeClients: context.AutoscalingKubeClients{
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
Expand Down
9 changes: 2 additions & 7 deletions cluster-autoscaler/simulator/clustersnapshot/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,26 +238,21 @@ func (snapshot *BasicClusterSnapshot) Initialize(nodes []*apiv1.Node, scheduledP

knownNodes := make(map[string]bool)
for _, node := range nodes {
if err := snapshot.AddNode(node); err != nil {
if err := snapshot.getInternalData().addNode(node); err != nil {
return err
}
knownNodes[node.Name] = true
}
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
if err := snapshot.getInternalData().addPod(pod, pod.Spec.NodeName); err != nil {
return err
}
}
}
return nil
}

// AddNode adds node to the snapshot.
func (snapshot *BasicClusterSnapshot) AddNode(node *apiv1.Node) error {
return snapshot.getInternalData().addNode(node)
}

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error {
return snapshot.getInternalData().removeNode(nodeName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ type ClusterSnapshot interface {
// Initialize clears the snapshot and initializes it with real objects from the cluster - Nodes,
// scheduled pods.
Initialize(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error
// AddNode adds node to the snapshot.
AddNode(node *apiv1.Node) error

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
RemoveNode(nodeName string) error
// AddPod adds pod to the snapshot and schedules it to given node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"

apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -67,21 +68,21 @@ func assignPodsToNodes(pods []*apiv1.Pod, nodes []*apiv1.Node) {
}
}

func BenchmarkAddNode(b *testing.B) {
func BenchmarkAddNodeInfo(b *testing.B) {
testCases := []int{1, 10, 100, 1000, 5000, 15000, 100000}

for snapshotName, snapshotFactory := range snapshots {
for _, tc := range testCases {
nodes := createTestNodes(tc)
clusterSnapshot := snapshotFactory()
b.ResetTimer()
b.Run(fmt.Sprintf("%s: AddNode() %d", snapshotName, tc), func(b *testing.B) {
b.Run(fmt.Sprintf("%s: AddNodeInfo() %d", snapshotName, tc), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
clusterSnapshot.Clear()
b.StartTimer()
for _, node := range nodes {
err := clusterSnapshot.AddNode(node)
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
if err != nil {
assert.NoError(b, err)
}
Expand Down Expand Up @@ -171,12 +172,12 @@ func BenchmarkForkAddRevert(b *testing.B) {
b.Run(fmt.Sprintf("%s: ForkAddRevert (%d nodes, %d pods)", snapshotName, ntc, ptc), func(b *testing.B) {
for i := 0; i < b.N; i++ {
clusterSnapshot.Fork()
err = clusterSnapshot.AddNode(tmpNode1)
err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode1))
if err != nil {
assert.NoError(b, err)
}
clusterSnapshot.Fork()
err = clusterSnapshot.AddNode(tmpNode2)
err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode2))
if err != nil {
assert.NoError(b, err)
}
Expand Down Expand Up @@ -216,7 +217,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
}
snapshot.Fork()
for _, node := range nodes[tc.nodeCount:] {
if err := snapshot.AddNode(node); err != nil {
if err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil {
assert.NoError(b, err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ func validTestCases(t *testing.T) []modificationTestCase {

testCases := []modificationTestCase{
{
name: "add node",
name: "add empty nodeInfo",
op: func(snapshot ClusterSnapshot) {
err := snapshot.AddNode(node)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)
},
modifiedState: snapshotState{
Expand Down Expand Up @@ -133,7 +133,7 @@ func validTestCases(t *testing.T) []modificationTestCase {
err := snapshot.RemoveNode(node.Name)
assert.NoError(t, err)

err = snapshot.AddNode(node)
err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)
},
modifiedState: snapshotState{
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestForking(t *testing.T) {
tc.op(snapshot)
snapshot.Fork()

snapshot.AddNode(node)
snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))

snapshot.Revert()
snapshot.Revert()
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestForking(t *testing.T) {
snapshot.Fork()
tc.op(snapshot)
snapshot.Fork()
snapshot.AddNode(node)
snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
snapshot.Revert()
err := snapshot.Commit()
assert.NoError(t, err)
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestClear(t *testing.T) {
snapshot.Fork()

for _, node := range extraNodes {
err := snapshot.AddNode(node)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)
}

Expand Down Expand Up @@ -379,7 +379,7 @@ func TestNode404(t *testing.T) {
snapshot := snapshotFactory()

node := BuildTestNode("node", 10, 100)
err := snapshot.AddNode(node)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)

snapshot.Fork()
Expand All @@ -405,7 +405,7 @@ func TestNode404(t *testing.T) {
snapshot := snapshotFactory()

node := BuildTestNode("node", 10, 100)
err := snapshot.AddNode(node)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)

err = snapshot.RemoveNode("node")
Expand All @@ -428,9 +428,6 @@ func TestNodeAlreadyExists(t *testing.T) {
name string
op func(ClusterSnapshot) error
}{
{"add node", func(snapshot ClusterSnapshot) error {
return snapshot.AddNode(node)
}},
{"add nodeInfo", func(snapshot ClusterSnapshot) error {
return snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod))
}},
Expand All @@ -442,7 +439,7 @@ func TestNodeAlreadyExists(t *testing.T) {
func(t *testing.T) {
snapshot := snapshotFactory()

err := snapshot.AddNode(node)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)

// Node already in base.
Expand All @@ -454,7 +451,7 @@ func TestNodeAlreadyExists(t *testing.T) {
func(t *testing.T) {
snapshot := snapshotFactory()

err := snapshot.AddNode(node)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)

snapshot.Fork()
Expand All @@ -471,7 +468,7 @@ func TestNodeAlreadyExists(t *testing.T) {

snapshot.Fork()

err := snapshot.AddNode(node)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)

// Node already in fork.
Expand All @@ -484,7 +481,7 @@ func TestNodeAlreadyExists(t *testing.T) {

snapshot.Fork()

err := snapshot.AddNode(node)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)

err = snapshot.Commit()
Expand Down
9 changes: 2 additions & 7 deletions cluster-autoscaler/simulator/clustersnapshot/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,26 +427,21 @@ func (snapshot *DeltaClusterSnapshot) Initialize(nodes []*apiv1.Node, scheduledP

knownNodes := make(map[string]bool)
for _, node := range nodes {
if err := snapshot.AddNode(node); err != nil {
if err := snapshot.data.addNode(node); err != nil {
return err
}
knownNodes[node.Name] = true
}
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil {
return err
}
}
}
return nil
}

// AddNode adds node to the snapshot.
func (snapshot *DeltaClusterSnapshot) AddNode(node *apiv1.Node) error {
return snapshot.data.addNode(node)
}

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error {
return snapshot.data.removeNode(nodeName)
Expand Down
3 changes: 2 additions & 1 deletion cluster-autoscaler/simulator/clustersnapshot/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// InitializeClusterSnapshotOrDie clears cluster snapshot and then initializes it with given set of nodes and pods.
Expand All @@ -35,7 +36,7 @@ func InitializeClusterSnapshotOrDie(
snapshot.Clear()

for _, node := range nodes {
err = snapshot.AddNode(node)
err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err, "error while adding node %s", node.Name)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ func TestFitsAnyNode(t *testing.T) {
}

clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
err = clusterSnapshot.AddNode(n1000)
err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000))
assert.NoError(t, err)
err = clusterSnapshot.AddNode(n2000)
err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000))
assert.NoError(t, err)

for _, tc := range testCases {
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestDebugInfo(t *testing.T) {

clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()

err := clusterSnapshot.AddNode(node1)
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
assert.NoError(t, err)

// with default predicate checker
Expand Down

0 comments on commit 87f5557

Please sign in to comment.