Skip to content

Commit

Permalink
Parse the host as the node id
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
  • Loading branch information
hanahmily committed Aug 16, 2023
1 parent b4b59c1 commit 74f85d4
Show file tree
Hide file tree
Showing 23 changed files with 402 additions and 421 deletions.
112 changes: 82 additions & 30 deletions api/common/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package common

import (
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"net"
"strconv"
"strings"

"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/host"
)

type (
Expand Down Expand Up @@ -113,42 +115,92 @@ func (e Error) Msg() string {
return e.msg
}

// NodeID identities a node in a cluster.
type NodeID string

// GenerateNodeID generates a node id.
func GenerateNodeID(prefix string, statefulID string) (NodeID, error) {
// If statefulID is empty, return prefix + random suffix.
if statefulID == "" {
suffix, err := generateRandomString(8)
if err != nil {
return NodeID(""), err
}
return NodeID(fmt.Sprintf("%s-%s", prefix, suffix)), nil
}
return NodeID(fmt.Sprintf("%s-%s", prefix, statefulID)), nil
// Node contains the node id and address.
type Node struct {
NodeID string
GrpcAddress string
HTTPAddress string
}

func generateRandomString(length int) (string, error) {
randomBytes := make([]byte, length)
_, err := rand.Read(randomBytes)
if err != nil {
return "", err
}
var (
// FlagNodeHost is the node id from flag.
FlagNodeHost string
// FlagNodeHostProvider is the node id provider from flag.
FlagNodeHostProvider NodeHostProvider
)

// Encode random bytes to base64 URL encoding
randomString := base64.RawURLEncoding.EncodeToString(randomBytes)
// NodeHostProvider is the provider of node id.
type NodeHostProvider int

// Trim any padding characters '=' from the end of the string
randomString = randomString[:length]
// NodeIDProvider constants.
const (
NodeHostProviderHostname NodeHostProvider = iota
NodeHostProviderIP
NodeHostProviderFlag
)

// String returns the string representation of NodeIDProvider.
func (n *NodeHostProvider) String() string {
return [...]string{"Hostname", "IP", "Flag"}[*n]
}

return randomString, nil
// ParseNodeHostProvider parses the string to NodeIDProvider.
func ParseNodeHostProvider(s string) (NodeHostProvider, error) {
switch strings.ToLower(s) {
case "hostname":
return NodeHostProviderHostname, nil
case "ip":
return NodeHostProviderIP, nil
case "flag":
return NodeHostProviderFlag, nil
default:
return 0, fmt.Errorf("unknown node id provider %s", s)
}
}

// GenerateNode generates a node id.
func GenerateNode(grpcPort, httpPort *uint32) (Node, error) {
port := grpcPort
if port == nil {
port = httpPort
}
if port == nil {
return Node{}, fmt.Errorf("no port found")
}
node := Node{}
var nodeHost string
switch FlagNodeHostProvider {
case NodeHostProviderHostname:
h, err := host.Name()
if err != nil {
return Node{}, err
}
nodeHost = h
case NodeHostProviderIP:
ip, err := host.IP()
if err != nil {
return Node{}, err
}
nodeHost = ip
case NodeHostProviderFlag:
nodeHost = FlagNodeHost
default:
return Node{}, fmt.Errorf("unknown node id provider %d", FlagNodeHostProvider)
}
node.NodeID = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*port), 10))
if grpcPort != nil {
node.GrpcAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*grpcPort), 10))
}
if httpPort != nil {
node.HTTPAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*httpPort), 10))
}
return node, nil
}

// ContextNodeIDKey is a context key to store the node id.
var ContextNodeIDKey = contextNodeIDKey{}
// ContextNodeKey is a context key to store the node id.
var ContextNodeKey = contextNodeKey{}

type contextNodeIDKey struct{}
type contextNodeKey struct{}

// ContextNodeRolesKey is a context key to store the node roles.
var ContextNodeRolesKey = contextNodeRolesKey{}
Expand Down
19 changes: 3 additions & 16 deletions api/proto/banyandb/database/v1/database.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,9 @@ enum Role {
message Node {
string name = 1;
repeated Role roles = 2;
google.protobuf.Timestamp created_at = 3;
}

message Endpoint {
string name = 1;
string node = 2;
repeated string addresses = 3;
uint32 port = 4;
Protocol protocol = 5;
google.protobuf.Timestamp created_at = 6;
}

enum Protocol {
PROTOCOL_UNSPECIFIED = 0;
PROTOCOL_HTTP = 1;
PROTOCOL_GRPC = 2;
string grpc_address = 3;
string http_address = 4;
google.protobuf.Timestamp created_at = 5;
}

message Shard {
Expand Down
30 changes: 6 additions & 24 deletions banyand/internal/cmd/liaison.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@ package cmd

import (
"context"
"fmt"
"os"

"github.com/spf13/cobra"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/liaison"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/liaison/http"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/signal"
Expand All @@ -51,18 +49,15 @@ func newLiaisonCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer")
}
grpcServer := grpc.NewServer(ctx, pipeline, metaSvc)
profSvc := observability.NewProfService()
metricSvc := observability.NewMetricService()
httpServer := http.NewService()
httpServer := http.NewServer()

units := []run.Unit{
new(signal.Handler),
pipeline,
tcp,
grpcServer,
httpServer,
profSvc,
}
Expand All @@ -71,37 +66,24 @@ func newLiaisonCmd() *cobra.Command {
}
// Meta the run Group units.
liaisonGroup.Register(units...)
logging := logger.Logging{}
liaisonCmd := &cobra.Command{
Use: "liaison",
Version: version.Build(),
Short: "Run as the liaison server",
PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) {
if err = config.Load("logging", cmd.Flags()); err != nil {
return err
}
return logger.Init(logging)
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
nodeID, err := common.GenerateNodeID("liaison", "")
node, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort())
if err != nil {
return err
}
fmt.Print(logo)
logger.GetLogger().Info().Msg("starting as a liaison server")
// Spawn our go routines and wait for shutdown.
if err := liaisonGroup.Run(context.WithValue(context.Background(), common.ContextNodeIDKey, nodeID)); err != nil {
if err := liaisonGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, node)); err != nil {
logger.GetLogger().Error().Err(err).Stack().Str("name", liaisonGroup.Name()).Msg("Exit")
os.Exit(-1)
}
return nil
},
}

liaisonCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging")
liaisonCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging")
liaisonCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module")
liaisonCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging")
liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet)
return liaisonCmd
}
97 changes: 0 additions & 97 deletions banyand/internal/cmd/meta.go

This file was deleted.

Loading

0 comments on commit 74f85d4

Please sign in to comment.