Skip to content

Commit

Permalink
Implement node role registration (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Aug 16, 2023
1 parent 18e3903 commit eda6bea
Show file tree
Hide file tree
Showing 48 changed files with 868 additions and 587 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Release Notes.
- Document the clustering.
- Support multiple roles for banyand server.
- Support for recovery buffer using wal.
- Register the node role to the metadata registry.

### Bugs

Expand Down
96 changes: 96 additions & 0 deletions api/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ package common
import (
"context"
"fmt"
"net"
"strconv"
"strings"

"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/host"
)

type (
Expand Down Expand Up @@ -110,3 +114,95 @@ func NewError(tpl string, args ...any) Error {
func (e Error) Msg() string {
return e.msg
}

// Node contains the node id and address.
type Node struct {
NodeID string
GrpcAddress string
HTTPAddress string
}

var (
// FlagNodeHost is the node id from flag.
FlagNodeHost string
// FlagNodeHostProvider is the node id provider from flag.
FlagNodeHostProvider NodeHostProvider
)

// NodeHostProvider is the provider of node id.
type NodeHostProvider int

// NodeIDProvider constants.
const (
NodeHostProviderHostname NodeHostProvider = iota
NodeHostProviderIP
NodeHostProviderFlag
)

// String returns the string representation of NodeIDProvider.
func (n *NodeHostProvider) String() string {
return [...]string{"Hostname", "IP", "Flag"}[*n]
}

// ParseNodeHostProvider parses the string to NodeIDProvider.
func ParseNodeHostProvider(s string) (NodeHostProvider, error) {
switch strings.ToLower(s) {
case "hostname":
return NodeHostProviderHostname, nil
case "ip":
return NodeHostProviderIP, nil
case "flag":
return NodeHostProviderFlag, nil
default:
return 0, fmt.Errorf("unknown node id provider %s", s)
}
}

// GenerateNode generates a node id.
func GenerateNode(grpcPort, httpPort *uint32) (Node, error) {
port := grpcPort
if port == nil {
port = httpPort
}
if port == nil {
return Node{}, fmt.Errorf("no port found")
}
node := Node{}
var nodeHost string
switch FlagNodeHostProvider {
case NodeHostProviderHostname:
h, err := host.Name()
if err != nil {
return Node{}, err
}
nodeHost = h
case NodeHostProviderIP:
ip, err := host.IP()
if err != nil {
return Node{}, err
}
nodeHost = ip
case NodeHostProviderFlag:
nodeHost = FlagNodeHost
default:
return Node{}, fmt.Errorf("unknown node id provider %d", FlagNodeHostProvider)
}
node.NodeID = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*port), 10))
if grpcPort != nil {
node.GrpcAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*grpcPort), 10))
}
if httpPort != nil {
node.HTTPAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*httpPort), 10))
}
return node, nil
}

// ContextNodeKey is a context key to store the node id.
var ContextNodeKey = contextNodeKey{}

type contextNodeKey struct{}

// ContextNodeRolesKey is a context key to store the node roles.
var ContextNodeRolesKey = contextNodeRolesKey{}

type contextNodeRolesKey struct{}
18 changes: 13 additions & 5 deletions api/proto/banyandb/database/v1/database.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@ import "google/protobuf/timestamp.proto";
option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
option java_package = "org.apache.skywalking.banyandb.database.v1";

enum Role {
ROLE_UNSPECIFIED = 0;
ROLE_META = 1;
ROLE_DATA = 2;
ROLE_QUERY = 3;
ROLE_LIAISON = 4;
}

message Node {
string id = 1;
common.v1.Metadata metadata = 2;
string addr = 3;
google.protobuf.Timestamp updated_at = 4;
string name = 1;
repeated Role roles = 2;
string grpc_address = 3;
string http_address = 4;
google.protobuf.Timestamp created_at = 5;
}

message Shard {
uint64 id = 1;
common.v1.Metadata metadata = 2;
common.v1.Catalog catalog = 3;
Node node = 4;
string node = 4;
uint32 total = 5;
google.protobuf.Timestamp updated_at = 6;
google.protobuf.Timestamp created_at = 7;
Expand Down
50 changes: 0 additions & 50 deletions api/proto/banyandb/database/v1/event.proto

This file was deleted.

31 changes: 9 additions & 22 deletions banyand/internal/cmd/liaison.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ package cmd

import (
"context"
"fmt"
"os"

"github.com/spf13/cobra"

"github.com/apache/skywalking-banyandb/banyand/liaison"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/liaison/http"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/signal"
Expand All @@ -50,18 +49,15 @@ func newLiaisonCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer")
}
grpcServer := grpc.NewServer(ctx, pipeline, metaSvc)
profSvc := observability.NewProfService()
metricSvc := observability.NewMetricService()
httpServer := http.NewService()
httpServer := http.NewServer()

units := []run.Unit{
new(signal.Handler),
pipeline,
tcp,
grpcServer,
httpServer,
profSvc,
}
Expand All @@ -70,33 +66,24 @@ func newLiaisonCmd() *cobra.Command {
}
// Meta the run Group units.
liaisonGroup.Register(units...)
logging := logger.Logging{}
liaisonCmd := &cobra.Command{
Use: "liaison",
Version: version.Build(),
Short: "Run as the liaison server",
PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) {
if err = config.Load("logging", cmd.Flags()); err != nil {
RunE: func(cmd *cobra.Command, args []string) (err error) {
node, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort())
if err != nil {
return err
}
return logger.Init(logging)
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
fmt.Print(logo)
logger.GetLogger().Info().Msg("starting as a liaison server")
// Spawn our go routines and wait for shutdown.
if err := liaisonGroup.Run(); err != nil {
if err := liaisonGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, node)); err != nil {
logger.GetLogger().Error().Err(err).Stack().Str("name", liaisonGroup.Name()).Msg("Exit")
os.Exit(-1)
}
return nil
},
}

liaisonCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging")
liaisonCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging")
liaisonCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module")
liaisonCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging")
liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet)
return liaisonCmd
}
87 changes: 0 additions & 87 deletions banyand/internal/cmd/meta.go

This file was deleted.

Loading

0 comments on commit eda6bea

Please sign in to comment.