Skip to content

Commit

Permalink
Reconnect to the etcd in the startup phase
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hanahmily committed Sep 19, 2024
1 parent 0e734c4 commit d57d1dc
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Release Notes.
- Fix several "sync.Pool" leak issues by adding a tracker to the pool.
- Fix panic when removing a expired segment.
- Fix panic when reading a disorder block of measure. This block's versions are not sorted in descending order.
- Fix the bug that the etcd client doesn't reconnect when facing the context timeout in the startup phase.

### Documentation

Expand Down
76 changes: 62 additions & 14 deletions banyand/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package metadata

import (
"context"
"os"
"os/signal"
"syscall"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -68,6 +71,7 @@ type clientService struct {
etcdTLSCertFile string
etcdTLSKeyFile string
endpoints []string
registryTimeout time.Duration
forceRegisterNode bool
}

Expand All @@ -84,6 +88,7 @@ func (s *clientService) FlagSet() *run.FlagSet {
fs.StringVar(&s.etcdTLSCAFile, flagEtcdTLSCAFile, "", "Trusted certificate authority")
fs.StringVar(&s.etcdTLSCertFile, flagEtcdTLSCertFile, "", "Etcd client certificate")
fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key for the etcd client certificate.")
fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 2*time.Minute, "The timeout for the node registry")
return fs
}

Expand All @@ -95,17 +100,50 @@ func (s *clientService) Validate() error {
}

func (s *clientService) PreRun(ctx context.Context) error {
var err error
s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
schema.Namespace(s.namespace),
schema.ConfigureServerEndpoints(s.endpoints),
schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword),
schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile),
)
if err != nil {
stopCh := make(chan struct{})
sn := make(chan os.Signal, 1)
l := logger.GetLogger(s.Name())
signal.Notify(sn,
syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
go func() {
select {
case si := <-sn:
logger.GetLogger(s.Name()).Info().Msgf("signal received: %s", si)
close(stopCh)
case <-s.closer.CloseNotify():
close(stopCh)
}
}()

for {
var err error
s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
schema.Namespace(s.namespace),
schema.ConfigureServerEndpoints(s.endpoints),
schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword),
schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile),
)
if errors.Is(err, context.DeadlineExceeded) {
select {
case <-stopCh:
return errors.New("pre-run interrupted")
case <-time.After(s.registryTimeout):
return errors.New("pre-run timeout")
case <-s.closer.CloseNotify():
return errors.New("pre-run interrupted")
default:
l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("the schema registry init timeout, retrying...")
time.Sleep(time.Second)
continue
}
}
if err == nil {
break
}
return err
}

val := ctx.Value(common.ContextNodeKey)
if val == nil {
return errors.New("node id is empty")
Expand All @@ -116,7 +154,6 @@ func (s *clientService) PreRun(ctx context.Context) error {
return errors.New("node roles is empty")
}
nodeRoles := val.([]databasev1.Role)
l := logger.GetLogger(s.Name())
nodeInfo := &databasev1.Node{
Metadata: &commonv1.Metadata{
Name: node.NodeID,
Expand All @@ -126,15 +163,26 @@ func (s *clientService) PreRun(ctx context.Context) error {
Roles: nodeRoles,
CreatedAt: timestamppb.Now(),
}
var cancel context.CancelFunc
for {
ctxRegister, cancel := context.WithTimeout(ctx, time.Second*10)
err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo, s.forceRegisterNode)
ctx, cancel = context.WithTimeout(ctx, time.Second*10)
err := s.schemaRegistry.RegisterNode(ctx, nodeInfo, s.forceRegisterNode)
cancel()
if errors.Is(err, schema.ErrGRPCAlreadyExists) {
return errors.Wrapf(err, "node[%s] already exists in etcd", node.NodeID)
} else if errors.Is(err, context.DeadlineExceeded) {
l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("register node timeout, retrying...")
continue
select {
case <-stopCh:
return errors.New("register node interrupted")
case <-time.After(s.registryTimeout):
return errors.New("register node timeout")
case <-s.closer.CloseNotify():
return errors.New("register node interrupted")
default:
l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("register node timeout, retrying...")
time.Sleep(time.Second)
continue
}
}
if err == nil {
l.Info().Stringer("info", nodeInfo).Msg("register node successfully")
Expand Down
1 change: 1 addition & 0 deletions scripts/push-release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ cp ${PRODUCT_NAME}-*.tgz.asc skywalking/banyandb/"$VERSION"
cp ${PRODUCT_NAME}-*.tgz.sha512 skywalking/banyandb/"$VERSION"

cd skywalking/banyandb && svn add "$VERSION" && svn commit -m "Draft Apache SkyWalking BanyanDB release $VERSION"
cd "$VERSION"

cat << EOF
=========================================================================
Expand Down

0 comments on commit d57d1dc

Please sign in to comment.