Garbage collect NEGs in to-be-deleted state. #2750

Open · wants to merge 15 commits into master
6 changes: 3 additions & 3 deletions pkg/backends/ig_linker_test.go
@@ -40,7 +40,7 @@ import (

const (
defaultTestZone = "zone-a"
defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default"
defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default"
)

func newTestIGLinker(fakeGCE *gce.Cloud, fakeInstancePool instancegroups.Manager) *instanceGroupLinker {
@@ -59,7 +59,7 @@ func TestLink(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())

nodeInformer := zonegetter.FakeNodeInformer()
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false)
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)
zonegetter.AddFakeNodes(fakeZoneGetter, defaultTestZone, "test-instance")

fakeNodePool := instancegroups.NewManager(&instancegroups.ManagerConfig{
@@ -101,7 +101,7 @@ func TestLinkWithCreationModeError(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())

nodeInformer := zonegetter.FakeNodeInformer()
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false)
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)
zonegetter.AddFakeNodes(fakeZoneGetter, defaultTestZone, "test-instance")

fakeNodePool := instancegroups.NewManager(&instancegroups.ManagerConfig{
2 changes: 1 addition & 1 deletion pkg/backends/integration_test.go
@@ -53,7 +53,7 @@ func newTestJig(fakeGCE *gce.Cloud) *Jig {
fakeIGs := instancegroups.NewEmptyFakeInstanceGroups()

nodeInformer := zonegetter.FakeNodeInformer()
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false)
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)
zonegetter.AddFakeNodes(fakeZoneGetter, defaultTestZone, "test-instance")

fakeInstancePool := instancegroups.NewManager(&instancegroups.ManagerConfig{
2 changes: 1 addition & 1 deletion pkg/backends/regional_ig_linker_test.go
@@ -53,7 +53,7 @@ func newTestRegionalIgLinker(fakeGCE *gce.Cloud, backendPool *Backends, l4Namer
fakeIGs := instancegroups.NewEmptyFakeInstanceGroups()

nodeInformer := zonegetter.FakeNodeInformer()
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false)
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)
zonegetter.AddFakeNodes(fakeZoneGetter, usCentral1AZone, "test-instance1")
zonegetter.AddFakeNodes(fakeZoneGetter, "us-central1-c", "test-instance2")

6 changes: 1 addition & 5 deletions pkg/context/context.go
@@ -248,7 +248,7 @@ func NewControllerContext(
logger,
)
// The subnet specified in gce.conf is considered as the default subnet.
context.ZoneGetter = zonegetter.NewZoneGetter(context.NodeInformer, context.Cloud.SubnetworkURL())
context.ZoneGetter = zonegetter.NewZoneGetter(context.NodeInformer, context.NodeTopologyInformer, context.Cloud.SubnetworkURL())
context.InstancePool = instancegroups.NewManager(&instancegroups.ManagerConfig{
Cloud: context.Cloud,
Namer: context.ClusterNamer,
@@ -328,10 +328,6 @@ func (ctx *ControllerContext) HasSynced() bool {
funcs = append(funcs, ctx.FirewallInformer.HasSynced)
}

if ctx.NodeTopologyInformer != nil {
funcs = append(funcs, ctx.NodeTopologyInformer.HasSynced)
}

for _, f := range funcs {
if !f() {
return false
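A side note on the context.go hunk above: ControllerContext.HasSynced simply ANDs the HasSynced funcs of the informers it tracks, and this PR drops the node topology informer from that list, handing it to the ZoneGetter via NewZoneGetter instead. Below is a tiny, hypothetical Go sketch of that aggregation pattern; the names are invented for illustration and are not the actual ingress-gce code.

```go
package main

import "fmt"

// allSynced mirrors the HasSynced aggregation pattern: the context collects one
// HasSynced func per informer it owns and reports synced only when all of them
// return true. After this PR the node topology informer is no longer appended
// to this list; it is passed to the ZoneGetter instead (see the NewZoneGetter
// call in the hunk above).
func allSynced(funcs ...func() bool) bool {
	for _, f := range funcs {
		if !f() {
			return false
		}
	}
	return true
}

func main() {
	nodeInformerSynced := func() bool { return true }
	firewallInformerSynced := func() bool { return false }
	fmt.Println(allSynced(nodeInformerSynced, firewallInformerSynced)) // false until every informer has synced
}
```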
8 changes: 5 additions & 3 deletions pkg/controller/controller_test.go
@@ -58,7 +58,7 @@ import (
"k8s.io/ingress-gce/pkg/utils/zonegetter"
)

const defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default"
const defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default"

var (
nodePortCounter = 30000
@@ -71,9 +71,11 @@ func newLoadBalancerController() *LoadBalancerController {
kubeClient := fake.NewSimpleClientset()
backendConfigClient := backendconfigclient.NewSimpleClientset()
svcNegClient := svcnegclient.NewSimpleClientset()
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
vals := gce.DefaultTestClusterValues()
vals.SubnetworkURL = defaultTestSubnetURL
fakeGCE := gce.NewFakeGCECloud(vals)
nodeInformer := zonegetter.FakeNodeInformer()
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false)
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)
zonegetter.AddFakeNodes(fakeZoneGetter, fakeZone, "test-node")

(fakeGCE.Compute().(*cloud.MockGCE)).MockGlobalForwardingRules.InsertHook = loadbalancers.InsertGlobalForwardingRuleHook
2 changes: 1 addition & 1 deletion pkg/instancegroups/controller_test.go
@@ -98,7 +98,7 @@ func TestSync(t *testing.T) {
config.HasSynced = func() bool {
return true
}
config.ZoneGetter = zonegetter.NewFakeZoneGetter(informer, defaultTestSubnetURL, false)
config.ZoneGetter = zonegetter.NewFakeZoneGetter(informer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)

controller := NewController(config, logr.Logger{})

11 changes: 6 additions & 5 deletions pkg/instancegroups/manager_test.go
@@ -18,13 +18,14 @@ package instancegroups

import (
"fmt"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-gce/pkg/utils"
"net/http"
"strings"
"testing"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-gce/pkg/utils"

"google.golang.org/api/googleapi"
"k8s.io/klog/v2"

@@ -42,14 +43,14 @@ const (
testZoneC = "dark-moon1-c"
basePath = "/basepath/projects/project-id/"

defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default"
defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default"
)

var defaultNamer = namer.NewNamer("uid1", "fw1", klog.TODO())

func newNodePool(f Provider, maxIGSize int) Manager {
nodeInformer := zonegetter.FakeNodeInformer()
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false)
fakeZoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)

pool := NewManager(&ManagerConfig{
Cloud: f,
2 changes: 2 additions & 0 deletions pkg/l4lb/l4controller_test.go
@@ -50,6 +50,7 @@ import (
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/zonegetter"
)

const (
@@ -852,6 +853,7 @@ func newServiceController(t *testing.T, fakeGCE *gce.Cloud) *L4Controller {
NumL4Workers: 5,
}
ctx := context.NewControllerContext(kubeClient, nil, nil, nil, svcNegClient, nil, nil, nil, kubeClient /*kube client to be used for events*/, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig, klog.TODO())
ctx.ZoneGetter = zonegetter.NewFakeZoneGetter(ctx.NodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)
// Add some nodes so that NEG linker kicks in during ILB creation.
nodes, err := test.CreateAndInsertNodes(ctx.Cloud, []string{"instance-1"}, vals.ZoneName)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/l4lb/l4netlbcontroller_test.go
@@ -75,7 +75,7 @@ const (
shortSessionAffinityIdleTimeout = int32(20) // 20 sec could be used for regular Session Affinity
longSessionAffinityIdleTimeout = int32(2 * 60) // 2 min or 120 sec for Strong Session Affinity

defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default"
defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default"
)

var (
7 changes: 4 additions & 3 deletions pkg/neg/controller.go
@@ -49,7 +49,7 @@ import (
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/endpointslices"
namer2 "k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/ingress-gce/pkg/utils/zonegetter"
"k8s.io/klog/v2"
@@ -68,7 +68,7 @@ type Controller struct {
gcPeriod time.Duration
recorder record.EventRecorder
namer negtypes.NetworkEndpointGroupNamer
l4Namer namer2.L4ResourcesNamer
l4Namer namer.L4ResourcesNamer
zoneGetter *zonegetter.ZoneGetter
networkResolver network.Resolver

@@ -135,7 +135,7 @@ func NewController(
gkeNetworkParamSetInformer cache.SharedIndexInformer,
nodeTopologyInformer cache.SharedIndexInformer,
hasSynced func() bool,
l4Namer namer2.L4ResourcesNamer,
l4Namer namer.L4ResourcesNamer,
defaultBackendService utils.ServicePort,
cloud negtypes.NetworkEndpointGroupCloud,
zoneGetter *zonegetter.ZoneGetter,
@@ -182,6 +182,7 @@
syncerMetrics := syncMetrics.NewNegMetricsCollector(flags.F.NegMetricsExportInterval, logger)
manager := newSyncerManager(
namer,
l4Namer,
recorder,
cloud,
zoneGetter,
7 changes: 4 additions & 3 deletions pkg/neg/controller_test.go
@@ -124,7 +124,7 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test
}
nodeInformer := zonegetter.FakeNodeInformer()
zonegetter.PopulateFakeNodeInformer(nodeInformer, false)
zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false)
zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)

return NewController(
kubeClient,
@@ -1756,7 +1756,7 @@ func validateServiceAnnotationWithPortInfoMap(t *testing.T, svc *apiv1.Service,

nodeInformer := zonegetter.FakeNodeInformer()
zonegetter.PopulateFakeNodeInformer(nodeInformer, false)
zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false)
zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)
zones, _ := zoneGetter.ListZones(negtypes.NodeFilterForEndpointCalculatorMode(portInfoMap.EndpointsCalculatorMode()), klog.TODO())
if !sets.NewString(expectZones...).Equal(sets.NewString(zones...)) {
t.Errorf("Unexpected zones listed by the predicate function, got %v, want %v", zones, expectZones)
@@ -1837,7 +1837,7 @@ func validateServiceStateAnnotationExceptNames(t *testing.T, svc *apiv1.Service,
}
nodeInformer := zonegetter.FakeNodeInformer()
zonegetter.PopulateFakeNodeInformer(nodeInformer, false)
zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, defaultTestSubnetURL, false)
zoneGetter := zonegetter.NewFakeZoneGetter(nodeInformer, zonegetter.FakeNodeTopologyInformer(), defaultTestSubnetURL, false)
// This routine is called from tests verifying L7 NEGs.
zones, _ := zoneGetter.ListZones(negtypes.NodeFilterForEndpointCalculatorMode(negtypes.L7Mode), klog.TODO())

@@ -2115,6 +2115,7 @@ func newTestNode(name string, unschedulable bool) *apiv1.Node {
Name: name,
},
Spec: apiv1.NodeSpec{
PodCIDR: "10.100.1.0/24",
Unschedulable: unschedulable,
},
}
66 changes: 55 additions & 11 deletions pkg/neg/manager.go
@@ -46,6 +46,7 @@ import (
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/ingress-gce/pkg/utils/zonegetter"
"k8s.io/klog/v2"
@@ -64,6 +65,7 @@ func (k serviceKey) Key() string {
// syncerManager contains all the active syncer goroutines and manage their lifecycle.
type syncerManager struct {
namer negtypes.NetworkEndpointGroupNamer
l4Namer namer.L4ResourcesNamer
recorder record.EventRecorder
cloud negtypes.NetworkEndpointGroupCloud
zoneGetter *zonegetter.ZoneGetter
@@ -118,6 +120,7 @@ type syncerManager struct {
}

func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
l4Namer namer.L4ResourcesNamer,
recorder record.EventRecorder,
cloud negtypes.NetworkEndpointGroupCloud,
zoneGetter *zonegetter.ZoneGetter,
@@ -140,6 +143,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,

return &syncerManager{
namer: namer,
l4Namer: l4Namer,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
@@ -248,6 +252,8 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
manager.lpConfig,
manager.enableDualStackNEG,
portInfo.NetworkInfo,
manager.namer,
manager.l4Namer,
)
manager.syncerMap[syncerKey] = syncer
}
@@ -538,16 +544,21 @@ func (manager *syncerManager) garbageCollectNEG() error {
return nil
}

type negDeletionCandidate struct {
svcNegCR *negv1beta1.ServiceNetworkEndpointGroup
isPartialDelete bool
}

// garbageCollectNEGWithCRD uses the NEG CRs and the svcPortMap to determine which NEGs
// need to be garbage collected. Neg CRs that do not have a configuration in the svcPortMap will be deleted
// along with all corresponding NEGs in the CR's list of NetworkEndpointGroups. If NEG deletion fails in
// the cloud, the corresponding Neg CR will not be deleted
func (manager *syncerManager) garbageCollectNEGWithCRD() error {
deletionCandidates := map[string]*negv1beta1.ServiceNetworkEndpointGroup{}
deletionCandidates := make(map[string]negDeletionCandidate)
negCRs := manager.svcNegLister.List()
for _, obj := range negCRs {
neg := obj.(*negv1beta1.ServiceNetworkEndpointGroup)
deletionCandidates[neg.Name] = neg
deletionCandidates[neg.Name] = negDeletionCandidate{svcNegCR: neg, isPartialDelete: false}
}

func() {
@@ -562,6 +573,13 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
// neither the neg nor the CR will be deleted. In the situation where a neg config is not in the svcPortMap,
// but the CR does not have a deletion timestamp, both CR and neg will be deleted.
if _, ok := deletionCandidates[portInfo.NegName]; ok {
// When the service still exists, we won't do GC unless it contains
// to-be-deleted NEGs.
svcNegCR := deletionCandidates[portInfo.NegName].svcNegCR
if flags.F.EnableMultiSubnetClusterPhase1 && containsTBDNeg(svcNegCR) {
deletionCandidates[portInfo.NegName] = negDeletionCandidate{svcNegCR: svcNegCR, isPartialDelete: true}
continue
}
delete(deletionCandidates, portInfo.NegName)
}
}
@@ -585,7 +603,7 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
errList = append(errList, fmt.Errorf("failed to get zones during garbage collection: %w", err))
}

deletionCandidatesChan := make(chan *negv1beta1.ServiceNetworkEndpointGroup, len(deletionCandidates))
deletionCandidatesChan := make(chan negDeletionCandidate, len(deletionCandidates))
for _, dc := range deletionCandidates {
deletionCandidatesChan <- dc
}
@@ -595,8 +613,8 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
wg.Add(len(deletionCandidates))
for i := 0; i < manager.numGCWorkers; i++ {
go func() {
for svcNegCR := range deletionCandidatesChan {
errs := manager.processNEGDeletionCandidate(svcNegCR, zones)
for negToDelete := range deletionCandidatesChan {
errs := manager.processNEGDeletionCandidate(negToDelete, zones)

errListMutex.Lock()
errList = append(errList, errs...)
@@ -611,11 +629,23 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
return utilerrors.NewAggregate(errList)
}

// containsTBDNeg reports whether the given NEG CR contains NEG refs in
// the to-be-deleted state.
func containsTBDNeg(svcNegCR *negv1beta1.ServiceNetworkEndpointGroup) bool {
contains := false
for _, negRef := range svcNegCR.Status.NetworkEndpointGroups {
contains = contains || negRef.State == negv1beta1.ToBeDeletedState
}
return contains
}

// processNEGDeletionCandidate attempts to delete `svcNegCR` and all NEGs
// associated with it. If `svcNegCR` does not have ample information
// about the zones associated with this NEG, it will attempt to delete the NEG
// from all zones specified through the `zones` slice.
func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.ServiceNetworkEndpointGroup, zones []string) []error {
func (manager *syncerManager) processNEGDeletionCandidate(negToDelete negDeletionCandidate, zones []string) []error {
svcNegCR := negToDelete.svcNegCR

manager.logger.V(2).Info("Count of NEGs referenced by SvcNegCR", "svcneg", klog.KObj(svcNegCR), "count", len(svcNegCR.Status.NetworkEndpointGroups))
var errList []error
shouldDeleteNegCR := true
@@ -625,6 +655,11 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S
deletedNegs := make(map[negtypes.NegInfo]struct{})

for _, negRef := range svcNegCR.Status.NetworkEndpointGroups {
if flags.F.EnableMultiSubnetClusterPhase1 {
if negToDelete.isPartialDelete && negRef.State != negv1beta1.ToBeDeletedState {
continue
}
}
resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
if err != nil {
errList = append(errList, fmt.Errorf("failed to parse selflink for neg cr %s/%s: %s", svcNegCR.Namespace, svcNegCR.Name, err))
@@ -638,14 +673,23 @@ func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.S
shouldDeleteNegCR = shouldDeleteNegCR && negDeleted
}

// If there are no NEG refs in the NEG CR, our best attempt during a full service
// deletion is to get all existing subnets and delete the NEGs in those subnets.
// We skip this in the case of partial deletion: we cannot reconstruct the name of
// the NEG that needs to be deleted, since that subnet is no longer available in
// the Node Topology CR.
if deleteByZone {
manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(svcNegCR), "svcNegCR", svcNegCR)
for _, zone := range zones {
negDeleted := manager.deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR, &errList)
if negDeleted {
deletedNegs[negtypes.NegInfo{Name: svcNegCR.Name, Zone: zone}] = struct{}{}
// When EnableMultiSubnetClusterPhase1=false, we should always process deletion.
// Otherwise, we should only process full deletion.
if !flags.F.EnableMultiSubnetClusterPhase1 || !negToDelete.isPartialDelete {
for _, zone := range zones {
negDeleted := manager.deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR, &errList)
if negDeleted {
deletedNegs[negtypes.NegInfo{Name: svcNegCR.Name, Zone: zone}] = struct{}{}
}
shouldDeleteNegCR = shouldDeleteNegCR && negDeleted
}
shouldDeleteNegCR = shouldDeleteNegCR && negDeleted
}
}
// Since no more NEG deletion will be happening at this point, and NEG
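To make the garbage-collection change at the heart of this PR easier to follow, here is a minimal, self-contained Go sketch of the candidate classification that garbageCollectNEGWithCRD now performs. All types and helpers below are simplified stand-ins invented for illustration (the real controller uses negv1beta1.ServiceNetworkEndpointGroup, negDeletionCandidate, and the EnableMultiSubnetClusterPhase1 flag); the sketch only mirrors the selection logic visible in the diff, not the actual deletion path against the cloud.

```go
package main

import "fmt"

// Simplified stand-ins for the svcneg API types used by the controller.
type negState string

const toBeDeleted negState = "to-be-deleted"

type negRef struct {
	SelfLink string
	State    negState
}

type svcNegCR struct {
	Name string
	Refs []negRef
}

// candidate mirrors the PR's negDeletionCandidate: a CR plus a flag saying
// whether only its to-be-deleted NEGs should be removed (partial delete).
type candidate struct {
	cr              svcNegCR
	isPartialDelete bool
}

// containsTBD mirrors containsTBDNeg: true if any NEG ref is in the
// to-be-deleted state.
func containsTBD(cr svcNegCR) bool {
	for _, r := range cr.Refs {
		if r.State == toBeDeleted {
			return true
		}
	}
	return false
}

// classify mirrors the selection step in garbageCollectNEGWithCRD: every CR
// starts as a full-deletion candidate; CRs whose service port config still
// exists are dropped, unless (with multi-subnet phase 1 enabled) they carry
// to-be-deleted NEG refs, in which case they stay as partial-deletion candidates.
func classify(crs []svcNegCR, stillConfigured []string, multiSubnetPhase1 bool) map[string]candidate {
	out := make(map[string]candidate)
	for _, cr := range crs {
		out[cr.Name] = candidate{cr: cr}
	}
	for _, name := range stillConfigured {
		c, ok := out[name]
		if !ok {
			continue
		}
		if multiSubnetPhase1 && containsTBD(c.cr) {
			out[name] = candidate{cr: c.cr, isPartialDelete: true}
			continue
		}
		delete(out, name)
	}
	return out
}

func main() {
	crs := []svcNegCR{
		{Name: "neg-a", Refs: []negRef{{SelfLink: ".../zones/zone-a/networkEndpointGroups/neg-a", State: "active"}}},
		{Name: "neg-b", Refs: []negRef{
			{SelfLink: ".../zones/zone-a/networkEndpointGroups/neg-b", State: "active"},
			{SelfLink: ".../zones/zone-b/networkEndpointGroups/neg-b", State: toBeDeleted}, // its subnet left the cluster
		}},
		{Name: "neg-c", Refs: nil}, // owning service was deleted
	}
	// neg-a and neg-b still have a port config in svcPortMap; neg-c does not.
	for name, c := range classify(crs, []string{"neg-a", "neg-b"}, true) {
		fmt.Printf("%s: partialDelete=%v\n", name, c.isPartialDelete)
	}
	// Expected: neg-b remains as a partial-deletion candidate (only its
	// to-be-deleted NEG is removed), neg-c is a full-deletion candidate,
	// and neg-a is not a candidate at all.
}
```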