Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement node role registration #314

Merged
merged 7 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Release Notes.
- Document the clustering.
- Support multiple roles for banyand server.
- Support for recovery buffer using wal.
- Register the node role to the metadata registry.

### Bugs

Expand Down
96 changes: 96 additions & 0 deletions api/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ package common
import (
"context"
"fmt"
"net"
"strconv"
"strings"

"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/host"
)

type (
Expand Down Expand Up @@ -110,3 +114,95 @@ func NewError(tpl string, args ...any) Error {
func (e Error) Msg() string {
return e.msg
}

// Node contains the node id and address.
type Node struct {
NodeID string
GrpcAddress string
HTTPAddress string
}

var (
// FlagNodeHost is the node id from flag.
FlagNodeHost string
// FlagNodeHostProvider is the node id provider from flag.
FlagNodeHostProvider NodeHostProvider
)

// NodeHostProvider is the provider of node id.
type NodeHostProvider int

// NodeIDProvider constants.
const (
NodeHostProviderHostname NodeHostProvider = iota
NodeHostProviderIP
NodeHostProviderFlag
)

// String returns the string representation of NodeIDProvider.
func (n *NodeHostProvider) String() string {
return [...]string{"Hostname", "IP", "Flag"}[*n]
}

// ParseNodeHostProvider parses the string to NodeIDProvider.
func ParseNodeHostProvider(s string) (NodeHostProvider, error) {
switch strings.ToLower(s) {
case "hostname":
return NodeHostProviderHostname, nil
case "ip":
return NodeHostProviderIP, nil
case "flag":
return NodeHostProviderFlag, nil
default:
return 0, fmt.Errorf("unknown node id provider %s", s)
}
}

// GenerateNode generates a node id.
func GenerateNode(grpcPort, httpPort *uint32) (Node, error) {
port := grpcPort
if port == nil {
port = httpPort
}
if port == nil {
return Node{}, fmt.Errorf("no port found")
}
node := Node{}
var nodeHost string
switch FlagNodeHostProvider {
case NodeHostProviderHostname:
h, err := host.Name()
if err != nil {
return Node{}, err
}
nodeHost = h
case NodeHostProviderIP:
ip, err := host.IP()
if err != nil {
return Node{}, err
}
nodeHost = ip
case NodeHostProviderFlag:
nodeHost = FlagNodeHost
default:
return Node{}, fmt.Errorf("unknown node id provider %d", FlagNodeHostProvider)
}
node.NodeID = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*port), 10))
if grpcPort != nil {
node.GrpcAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*grpcPort), 10))
}
if httpPort != nil {
node.HTTPAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*httpPort), 10))
}
return node, nil
}

// ContextNodeKey is a context key to store the node id.
var ContextNodeKey = contextNodeKey{}

type contextNodeKey struct{}

// ContextNodeRolesKey is a context key to store the node roles.
var ContextNodeRolesKey = contextNodeRolesKey{}

type contextNodeRolesKey struct{}
18 changes: 13 additions & 5 deletions api/proto/banyandb/database/v1/database.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@ import "google/protobuf/timestamp.proto";
option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
option java_package = "org.apache.skywalking.banyandb.database.v1";

enum Role {
ROLE_UNSPECIFIED = 0;
ROLE_META = 1;
ROLE_DATA = 2;
ROLE_QUERY = 3;
ROLE_LIAISON = 4;
}

message Node {
string id = 1;
common.v1.Metadata metadata = 2;
string addr = 3;
google.protobuf.Timestamp updated_at = 4;
string name = 1;
repeated Role roles = 2;
string grpc_address = 3;
string http_address = 4;
google.protobuf.Timestamp created_at = 5;
}

message Shard {
uint64 id = 1;
common.v1.Metadata metadata = 2;
common.v1.Catalog catalog = 3;
Node node = 4;
string node = 4;
uint32 total = 5;
google.protobuf.Timestamp updated_at = 6;
google.protobuf.Timestamp created_at = 7;
Expand Down
50 changes: 0 additions & 50 deletions api/proto/banyandb/database/v1/event.proto

This file was deleted.

31 changes: 9 additions & 22 deletions banyand/internal/cmd/liaison.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ package cmd

import (
"context"
"fmt"
"os"

"github.com/spf13/cobra"

"github.com/apache/skywalking-banyandb/banyand/liaison"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/liaison/http"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/signal"
Expand All @@ -50,18 +49,15 @@ func newLiaisonCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer")
}
grpcServer := grpc.NewServer(ctx, pipeline, metaSvc)
profSvc := observability.NewProfService()
metricSvc := observability.NewMetricService()
httpServer := http.NewService()
httpServer := http.NewServer()

units := []run.Unit{
new(signal.Handler),
pipeline,
tcp,
grpcServer,
httpServer,
profSvc,
}
Expand All @@ -70,33 +66,24 @@ func newLiaisonCmd() *cobra.Command {
}
// Meta the run Group units.
liaisonGroup.Register(units...)
logging := logger.Logging{}
liaisonCmd := &cobra.Command{
Use: "liaison",
Version: version.Build(),
Short: "Run as the liaison server",
PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) {
if err = config.Load("logging", cmd.Flags()); err != nil {
RunE: func(cmd *cobra.Command, args []string) (err error) {
node, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort())
if err != nil {
return err
}
return logger.Init(logging)
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
fmt.Print(logo)
logger.GetLogger().Info().Msg("starting as a liaison server")
// Spawn our go routines and wait for shutdown.
if err := liaisonGroup.Run(); err != nil {
if err := liaisonGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, node)); err != nil {
logger.GetLogger().Error().Err(err).Stack().Str("name", liaisonGroup.Name()).Msg("Exit")
os.Exit(-1)
}
return nil
},
}

liaisonCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging")
liaisonCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging")
liaisonCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module")
liaisonCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging")
liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet)
return liaisonCmd
}
87 changes: 0 additions & 87 deletions banyand/internal/cmd/meta.go

This file was deleted.

Loading
Loading