Skip to content

Commit

Permalink
wait for node to report address before removing taint (#158)
Browse files Browse the repository at this point in the history
* address/assigner: return assigned address

* cmd/assignAddress: return assignedAddress

* cmd/run: wait for address to be reported before removing taint
  • Loading branch information
huwcbjones authored Nov 5, 2024
1 parent ab70f62 commit d7f419c
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 61 deletions.
80 changes: 71 additions & 9 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func prepareLogger(level string, json bool) *logrus.Entry {
return log
}

func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Interface, assigner address.Assigner, node *types.Node, cfg *config.Config) error {
func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Interface, assigner address.Assigner, node *types.Node, cfg *config.Config) (string, error) {
ctx, cancel := context.WithCancel(c)
defer cancel()

Expand All @@ -101,22 +101,23 @@ func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Inter
"retry-counter": retryCounter,
"retry-attempts": cfg.RetryAttempts,
}).Debug("assigning static public IP address to node")
err := func(ctx context.Context) error {
assignedAddress, err := func(ctx context.Context) (string, error) {
if err := lock.Lock(ctx); err != nil {
return errors.Wrap(err, "failed to acquire lock")
return "", errors.Wrap(err, "failed to acquire lock")
}
log.Debug("lock acquired")
defer func() {
lock.Unlock(ctx) //nolint:errcheck
log.Debug("lock released")
}()
if err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy); err != nil {
return err //nolint:wrapcheck
assignedAddress, err := assigner.Assign(ctx, node.Instance, node.Zone, cfg.Filter, cfg.OrderBy)
if err != nil {
return "", err //nolint:wrapcheck
}
return nil
return assignedAddress, nil
}(c)
if err == nil || errors.Is(err, address.ErrStaticIPAlreadyAssigned) {
return nil
return assignedAddress, nil
}

log.WithError(err).WithFields(logrus.Fields{
Expand All @@ -130,7 +131,64 @@ func assignAddress(c context.Context, log *logrus.Entry, client kubernetes.Inter
continue
case <-ctx.Done():
// If the context is done, return an error indicating that the operation was cancelled
return errors.Wrap(ctx.Err(), "context cancelled while assigning addresses")
return "", errors.Wrap(ctx.Err(), "context cancelled while assigning addresses")
}
}
return "", errors.New("reached maximum number of retries")
}

func waitForAddressToBeReported(c context.Context, log *logrus.Entry, explorer nd.Explorer, node *types.Node, assignedAddress string, cfg *config.Config) error {
ctx, cancel := context.WithCancel(c)
defer cancel()

// ticker for retry interval
ticker := time.NewTicker(cfg.RetryInterval)
defer ticker.Stop()

for retryCounter := 0; retryCounter <= cfg.RetryAttempts; retryCounter++ {
log.WithFields(logrus.Fields{
"node": node.Name,
"instance": node.Instance,
"address": assignedAddress,
"retry-counter": retryCounter,
"retry-attempts": cfg.RetryAttempts,
}).Debug("Waiting for node to report assigned address")

nodeInfo, err := explorer.GetNode(ctx, node.Name)
if err == nil {
for _, ip := range nodeInfo.ExternalIPs {
if ip.String() == assignedAddress {
log.WithFields(logrus.Fields{
"node": node.Name,
"instance": node.Instance,
"address": assignedAddress,
"retry-counter": retryCounter,
"retry-attempts": cfg.RetryAttempts,
}).Info("Node is reporting assigned address")
return nil
}
}
log.WithError(err).WithFields(logrus.Fields{
"node": node.Name,
"instance": node.Instance,
"address": assignedAddress,
}).Warn("Node is not yet reporting the assigned address")
} else {
log.WithError(err).WithFields(logrus.Fields{
"node": node.Name,
"instance": node.Instance,
"address": assignedAddress,
}).Error("failed to check if node is reporting the assigned address")
}

log.Infof("retrying after %v", cfg.RetryInterval)

select {
case <-ticker.C:
continue
case <-ctx.Done():
// If the context is done, return an error indicating that the operation was cancelled
return errors.Wrap(ctx.Err(), "context cancelled while waiting for node to report assigned address")
}
}
return errors.New("reached maximum number of retries")
Expand Down Expand Up @@ -169,12 +227,16 @@ func run(c context.Context, log *logrus.Entry, cfg *config.Config) error {
return errors.Wrap(err, "initializing assigner")
}

err = assignAddress(ctx, log, clientset, assigner, n, cfg)
assignedAddress, err := assignAddress(ctx, log, clientset, assigner, n, cfg)
if err != nil {
return errors.Wrap(err, "assigning static public IP address")
}

if cfg.TaintKey != "" {
if err := waitForAddressToBeReported(ctx, log, explorer, n, assignedAddress, cfg); err != nil {
return errors.Wrap(err, "waiting for node to report assigned address")
}

logger := log.WithField("taint-key", cfg.TaintKey)
tainter := nd.NewTainter(clientset)

Expand Down
Loading

0 comments on commit d7f419c

Please sign in to comment.