diff --git a/CHANGES.md b/CHANGES.md index e537ba38d..20d2bee10 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,7 @@ Release Notes. - Document the clustering. - Support multiple roles for banyand server. - Support for recovery buffer using wal. +- Register the node role to the metadata registry. ### Bugs diff --git a/api/common/id.go b/api/common/id.go index 6cc879ca8..074060cdf 100644 --- a/api/common/id.go +++ b/api/common/id.go @@ -20,8 +20,12 @@ package common import ( "context" "fmt" + "net" + "strconv" + "strings" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/host" ) type ( @@ -110,3 +114,95 @@ func NewError(tpl string, args ...any) Error { func (e Error) Msg() string { return e.msg } + +// Node contains the node id and address. +type Node struct { + NodeID string + GrpcAddress string + HTTPAddress string +} + +var ( + // FlagNodeHost is the node id from flag. + FlagNodeHost string + // FlagNodeHostProvider is the node id provider from flag. + FlagNodeHostProvider NodeHostProvider +) + +// NodeHostProvider is the provider of node id. +type NodeHostProvider int + +// NodeIDProvider constants. +const ( + NodeHostProviderHostname NodeHostProvider = iota + NodeHostProviderIP + NodeHostProviderFlag +) + +// String returns the string representation of NodeIDProvider. +func (n *NodeHostProvider) String() string { + return [...]string{"Hostname", "IP", "Flag"}[*n] +} + +// ParseNodeHostProvider parses the string to NodeIDProvider. +func ParseNodeHostProvider(s string) (NodeHostProvider, error) { + switch strings.ToLower(s) { + case "hostname": + return NodeHostProviderHostname, nil + case "ip": + return NodeHostProviderIP, nil + case "flag": + return NodeHostProviderFlag, nil + default: + return 0, fmt.Errorf("unknown node id provider %s", s) + } +} + +// GenerateNode generates a node id. 
+func GenerateNode(grpcPort, httpPort *uint32) (Node, error) { + port := grpcPort + if port == nil { + port = httpPort + } + if port == nil { + return Node{}, fmt.Errorf("no port found") + } + node := Node{} + var nodeHost string + switch FlagNodeHostProvider { + case NodeHostProviderHostname: + h, err := host.Name() + if err != nil { + return Node{}, err + } + nodeHost = h + case NodeHostProviderIP: + ip, err := host.IP() + if err != nil { + return Node{}, err + } + nodeHost = ip + case NodeHostProviderFlag: + nodeHost = FlagNodeHost + default: + return Node{}, fmt.Errorf("unknown node id provider %d", FlagNodeHostProvider) + } + node.NodeID = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*port), 10)) + if grpcPort != nil { + node.GrpcAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*grpcPort), 10)) + } + if httpPort != nil { + node.HTTPAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*httpPort), 10)) + } + return node, nil +} + +// ContextNodeKey is a context key to store the node id. +var ContextNodeKey = contextNodeKey{} + +type contextNodeKey struct{} + +// ContextNodeRolesKey is a context key to store the node roles. 
+var ContextNodeRolesKey = contextNodeRolesKey{} + +type contextNodeRolesKey struct{} diff --git a/api/proto/banyandb/database/v1/database.proto b/api/proto/banyandb/database/v1/database.proto index c530b60be..0e758d829 100644 --- a/api/proto/banyandb/database/v1/database.proto +++ b/api/proto/banyandb/database/v1/database.proto @@ -25,11 +25,19 @@ import "google/protobuf/timestamp.proto"; option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"; option java_package = "org.apache.skywalking.banyandb.database.v1"; +enum Role { + ROLE_UNSPECIFIED = 0; + ROLE_META = 1; + ROLE_DATA = 2; + ROLE_QUERY = 3; + ROLE_LIAISON = 4; +} + message Node { - string id = 1; - common.v1.Metadata metadata = 2; - string addr = 3; - google.protobuf.Timestamp updated_at = 4; + string name = 1; + repeated Role roles = 2; + string grpc_address = 3; + string http_address = 4; google.protobuf.Timestamp created_at = 5; } @@ -37,7 +45,7 @@ message Shard { uint64 id = 1; common.v1.Metadata metadata = 2; common.v1.Catalog catalog = 3; - Node node = 4; + string node = 4; uint32 total = 5; google.protobuf.Timestamp updated_at = 6; google.protobuf.Timestamp created_at = 7; diff --git a/api/proto/banyandb/database/v1/event.proto b/api/proto/banyandb/database/v1/event.proto deleted file mode 100644 index 19f98f68f..000000000 --- a/api/proto/banyandb/database/v1/event.proto +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -syntax = "proto3"; - -package banyandb.database.v1; - -import "banyandb/common/v1/common.proto"; -import "banyandb/database/v1/database.proto"; -import "google/protobuf/timestamp.proto"; - -option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"; -option java_package = "org.apache.skywalking.banyandb.database.v1"; - -enum Action { - ACTION_UNSPECIFIED = 0; - ACTION_PUT = 1; - ACTION_DELETE = 2; -} - -message ShardEvent { - Shard shard = 1; - Action action = 2; - google.protobuf.Timestamp time = 3; -} - -message EntityEvent { - common.v1.Metadata subject = 1; - message TagLocator { - uint32 family_offset = 1; - uint32 tag_offset = 2; - } - repeated TagLocator entity_locator = 2; - Action action = 3; - google.protobuf.Timestamp time = 4; -} diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go index 97dff1e7e..d8847c7ca 100644 --- a/banyand/internal/cmd/liaison.go +++ b/banyand/internal/cmd/liaison.go @@ -19,17 +19,16 @@ package cmd import ( "context" - "fmt" "os" "github.com/spf13/cobra" - "github.com/apache/skywalking-banyandb/banyand/liaison" + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/liaison/http" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/pkg/config" 
"github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/signal" @@ -50,18 +49,15 @@ func newLiaisonCmd() *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate metadata service") } - tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc) - if err != nil { - l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer") - } + grpcServer := grpc.NewServer(ctx, pipeline, metaSvc) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() - httpServer := http.NewService() + httpServer := http.NewServer() units := []run.Unit{ new(signal.Handler), pipeline, - tcp, + grpcServer, httpServer, profSvc, } @@ -70,33 +66,24 @@ func newLiaisonCmd() *cobra.Command { } // Meta the run Group units. liaisonGroup.Register(units...) - logging := logger.Logging{} liaisonCmd := &cobra.Command{ Use: "liaison", Version: version.Build(), Short: "Run as the liaison server", - PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { - if err = config.Load("logging", cmd.Flags()); err != nil { + RunE: func(cmd *cobra.Command, args []string) (err error) { + node, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort()) + if err != nil { return err } - return logger.Init(logging) - }, - RunE: func(cmd *cobra.Command, args []string) (err error) { - fmt.Print(logo) logger.GetLogger().Info().Msg("starting as a liaison server") // Spawn our go routines and wait for shutdown. 
- if err := liaisonGroup.Run(); err != nil { + if err := liaisonGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, node)); err != nil { logger.GetLogger().Error().Err(err).Stack().Str("name", liaisonGroup.Name()).Msg("Exit") os.Exit(-1) } return nil }, } - - liaisonCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - liaisonCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - liaisonCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - liaisonCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet) return liaisonCmd } diff --git a/banyand/internal/cmd/meta.go b/banyand/internal/cmd/meta.go deleted file mode 100644 index df7530d26..000000000 --- a/banyand/internal/cmd/meta.go +++ /dev/null @@ -1,87 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package cmd - -import ( - "context" - "fmt" - "os" - - "github.com/spf13/cobra" - - "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/observability" - "github.com/apache/skywalking-banyandb/pkg/config" - "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/run" - "github.com/apache/skywalking-banyandb/pkg/signal" - "github.com/apache/skywalking-banyandb/pkg/version" -) - -var metaGroup = run.NewGroup("meta") - -func newMetaCmd() *cobra.Command { - l := logger.GetLogger("bootstrap") - ctx := context.Background() - metaSvc, err := metadata.NewService(ctx) - if err != nil { - l.Fatal().Err(err).Msg("failed to initiate metadata service") - } - profSvc := observability.NewProfService() - metricSvc := observability.NewMetricService() - - units := []run.Unit{ - new(signal.Handler), - metaSvc, - profSvc, - } - if metricSvc != nil { - units = append(units, metricSvc) - } - // Meta the run Group units. - metaGroup.Register(units...) - logging := logger.Logging{} - metaCmd := &cobra.Command{ - Use: "meta", - Version: version.Build(), - Short: "Run as the meta server", - PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { - if err = config.Load("logging", cmd.Flags()); err != nil { - return err - } - return logger.Init(logging) - }, - RunE: func(cmd *cobra.Command, args []string) (err error) { - fmt.Print(logo) - logger.GetLogger().Info().Msg("starting as a meta server") - // Spawn our go routines and wait for shutdown. 
- if err := metaGroup.Run(); err != nil { - logger.GetLogger().Error().Err(err).Stack().Str("name", metaGroup.Name()).Msg("Exit") - os.Exit(-1) - } - return nil - }, - } - - metaCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - metaCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - metaCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - metaCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") - metaCmd.Flags().AddFlagSet(metaGroup.RegisterFlags().FlagSet) - return metaCmd -} diff --git a/banyand/internal/cmd/root.go b/banyand/internal/cmd/root.go index da67766da..b1535a2b8 100644 --- a/banyand/internal/cmd/root.go +++ b/banyand/internal/cmd/root.go @@ -19,8 +19,13 @@ package cmd import ( + "fmt" + "github.com/spf13/cobra" + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/config" + "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/version" ) @@ -35,6 +40,7 @@ const logo = ` // NewRoot returns a root command. 
func NewRoot() *cobra.Command { + logging := logger.Logging{} cmd := &cobra.Command{ DisableAutoGenTag: true, Version: version.Build(), @@ -42,10 +48,44 @@ func NewRoot() *cobra.Command { Long: logo + ` BanyanDB, as an observability database, aims to ingest, analyze and store Metrics, Tracing and Logging data `, + PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { + fmt.Print(logo) + if err = config.Load("logging", cmd.Flags()); err != nil { + return err + } + return logger.Init(logging) + }, } + cmd.PersistentFlags().Var(&nodeIDProviderValue{&common.FlagNodeHostProvider}, + "node-host-provider", "the node host provider, can be hostname, ip or flag, default is hostname") + cmd.PersistentFlags().StringVar(&common.FlagNodeHost, "node-host", "", "the node host of the server only used when node-host-provider is \"flag\"") + cmd.PersistentFlags().StringVar(&logging.Env, "logging-env", "prod", "the logging") + cmd.PersistentFlags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") + cmd.PersistentFlags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") + cmd.PersistentFlags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") cmd.AddCommand(newStandaloneCmd()) - cmd.AddCommand(newMetaCmd()) cmd.AddCommand(newStorageCmd()) cmd.AddCommand(newLiaisonCmd()) return cmd } + +type nodeIDProviderValue struct { + value *common.NodeHostProvider +} + +func (c *nodeIDProviderValue) Set(s string) error { + v, err := common.ParseNodeHostProvider(s) + if err != nil { + return err + } + *c.value = v + return nil +} + +func (c *nodeIDProviderValue) String() string { + return c.value.String() +} + +func (c *nodeIDProviderValue) Type() string { + return "nodeIDProvider" +} diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go index 9d6eccaa9..f45a68d3d 100644 --- a/banyand/internal/cmd/standalone.go +++ b/banyand/internal/cmd/standalone.go @@ 
-19,12 +19,12 @@ package cmd import ( "context" - "fmt" "os" "github.com/spf13/cobra" - "github.com/apache/skywalking-banyandb/banyand/liaison" + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/liaison/http" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" @@ -32,7 +32,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/query" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/stream" - "github.com/apache/skywalking-banyandb/pkg/config" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/signal" @@ -64,13 +63,10 @@ func newStandaloneCmd() *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } - tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc) - if err != nil { - l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer") - } + grpcServer := grpc.NewServer(ctx, pipeline, metaSvc) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() - httpServer := http.NewService() + httpServer := http.NewServer() units := []run.Unit{ new(signal.Handler), @@ -79,7 +75,7 @@ func newStandaloneCmd() *cobra.Command { measureSvc, streamSvc, q, - tcp, + grpcServer, httpServer, profSvc, } @@ -88,33 +84,25 @@ func newStandaloneCmd() *cobra.Command { } // Meta the run Group units. standaloneGroup.Register(units...) 
- logging := logger.Logging{} + standaloneCmd := &cobra.Command{ Use: "standalone", Version: version.Build(), Short: "Run as the standalone server", - PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { - if err = config.Load("logging", cmd.Flags()); err != nil { + RunE: func(cmd *cobra.Command, args []string) (err error) { + nodeID, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort()) + if err != nil { return err } - return logger.Init(logging) - }, - RunE: func(cmd *cobra.Command, args []string) (err error) { - fmt.Print(logo) logger.GetLogger().Info().Msg("starting as a standalone server") // Spawn our go routines and wait for shutdown. - if err := standaloneGroup.Run(); err != nil { + if err := standaloneGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, nodeID)); err != nil { logger.GetLogger().Error().Err(err).Stack().Str("name", standaloneGroup.Name()).Msg("Exit") os.Exit(-1) } return nil }, } - - standaloneCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - standaloneCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - standaloneCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - standaloneCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") standaloneCmd.Flags().AddFlagSet(standaloneGroup.RegisterFlags().FlagSet) return standaloneCmd } diff --git a/banyand/internal/cmd/storage.go b/banyand/internal/cmd/storage.go index 7d297fc09..a8f4e159c 100644 --- a/banyand/internal/cmd/storage.go +++ b/banyand/internal/cmd/storage.go @@ -19,11 +19,11 @@ package cmd import ( "context" - "fmt" "os" "github.com/spf13/cobra" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" @@ -114,21 
+114,19 @@ func newStorageCmd() *cobra.Command { } }, RunE: func(cmd *cobra.Command, args []string) (err error) { - fmt.Print(logo) + node, err := common.GenerateNode(nil, nil) + if err != nil { + return err + } logger.GetLogger().Info().Msg("starting as a storage server") // Spawn our go routines and wait for shutdown. - if err := storageGroup.Run(); err != nil { + if err := storageGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, node)); err != nil { logger.GetLogger().Error().Err(err).Stack().Str("name", storageGroup.Name()).Msg("Exit") os.Exit(-1) } return nil }, } - - storageCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - storageCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - storageCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - storageCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") storageCmd.Flags().StringVarP(&flagStorageMode, "mode", "m", storageModeMix, "the storage mode, one of [data, query, mix]") storageCmd.Flags().AddFlagSet(storageGroup.RegisterFlags().FlagSet) return storageCmd diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index a2a38841a..606d93fb1 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -60,9 +60,9 @@ func newDiscoveryService(pipeline queue.Queue, kind schema.Kind, metadataRepo me } } -func (ds *discoveryService) initialize() error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctx) +func (ds *discoveryService) initialize(ctx context.Context) error { + ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second) + groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctxLocal) cancel() if err != nil { return err @@ -74,8 +74,8 @@ func (ds *discoveryService) initialize() error { 
default: continue } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - shards, innerErr := ds.metadataRepo.ShardRegistry().ListShard(ctx, schema.ListOpt{Group: g.Metadata.Name}) + ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second) + shards, innerErr := ds.metadataRepo.ShardRegistry().ListShard(ctxLocal, schema.ListOpt{Group: g.Metadata.Name}) cancel() if innerErr != nil { return innerErr @@ -93,8 +93,8 @@ func (ds *discoveryService) initialize() error { switch ds.kind { case schema.KindMeasure: - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - mm, innerErr := ds.metadataRepo.MeasureRegistry().ListMeasure(ctx, schema.ListOpt{Group: g.Metadata.Name}) + ctxLocal, cancel = context.WithTimeout(ctx, 5*time.Second) + mm, innerErr := ds.metadataRepo.MeasureRegistry().ListMeasure(ctxLocal, schema.ListOpt{Group: g.Metadata.Name}) cancel() if innerErr != nil { return innerErr @@ -110,8 +110,8 @@ func (ds *discoveryService) initialize() error { }) } case schema.KindStream: - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - ss, innerErr := ds.metadataRepo.StreamRegistry().ListStream(ctx, schema.ListOpt{Group: g.Metadata.Name}) + ctxLocal, cancel = context.WithTimeout(ctx, 5*time.Second) + ss, innerErr := ds.metadataRepo.StreamRegistry().ListStream(ctxLocal, schema.ListOpt{Group: g.Metadata.Name}) cancel() if innerErr != nil { return innerErr diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go index 4b12a1ac2..213f52426 100644 --- a/banyand/liaison/grpc/registry_test.go +++ b/banyand/liaison/grpc/registry_test.go @@ -212,6 +212,6 @@ func (p *preloadStreamService) Name() string { return "preload-stream" } -func (p *preloadStreamService) PreRun() error { - return teststream.PreloadSchema(p.metaSvc.SchemaRegistry()) +func (p *preloadStreamService) PreRun(ctx context.Context) error { + return teststream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) } diff --git 
a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index 2bb906647..eae908f94 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -22,6 +22,7 @@ import ( "context" "net" "runtime/debug" + "strconv" "time" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" @@ -56,33 +57,58 @@ var ( errAccessLogRootPath = errors.New("access log root path is required") ) +// // Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Server defines the gRPC server. 
+type Server interface { + run.Unit + GetPort() *uint32 +} + type server struct { pipeline queue.Queue creds credentials.TransportCredentials - *indexRuleRegistryServer - measureSVC *measureService - log *logger.Logger - ser *grpclib.Server + *streamRegistryServer + log *logger.Logger + *indexRuleBindingRegistryServer + ser *grpclib.Server *propertyServer *topNAggregationRegistryServer *groupRegistryServer - stopCh chan struct{} - streamSVC *streamService + stopCh chan struct{} + *indexRuleRegistryServer *measureRegistryServer - *streamRegistryServer - *indexRuleBindingRegistryServer - addr string + streamSVC *streamService + measureSVC *measureService + host string keyFile string certFile string accessLogRootPath string + addr string accessLogRecorders []accessLogRecorder maxRecvMsgSize run.Bytes - tls bool + port uint32 enableIngestionAccessLog bool + tls bool } // NewServer returns a new gRPC server. -func NewServer(_ context.Context, pipeline queue.Queue, schemaRegistry metadata.Repo) run.Unit { +func NewServer(_ context.Context, pipeline queue.Queue, schemaRegistry metadata.Repo) Server { streamSVC := &streamService{ discoveryService: newDiscoveryService(pipeline, schema.KindStream, schemaRegistry), } @@ -119,7 +145,7 @@ func NewServer(_ context.Context, pipeline queue.Queue, schemaRegistry metadata. 
return s } -func (s *server) PreRun() error { +func (s *server) PreRun(ctx context.Context) error { s.log = logger.GetLogger("liaison-grpc") s.streamSVC.setLogger(s.log) s.measureSVC.setLogger(s.log) @@ -129,7 +155,7 @@ func (s *server) PreRun() error { } for _, c := range components { c.SetLogger(s.log) - if err := c.initialize(); err != nil { + if err := c.initialize(ctx); err != nil { return err } } @@ -148,6 +174,14 @@ func (s *server) Name() string { return "grpc" } +func (s *server) Role() databasev1.Role { + return databasev1.Role_ROLE_LIAISON +} + +func (s *server) GetPort() *uint32 { + return &s.port +} + func (s *server) FlagSet() *run.FlagSet { fs := run.NewFlagSet("grpc") s.maxRecvMsgSize = defaultRecvSize @@ -155,14 +189,16 @@ func (s *server) FlagSet() *run.FlagSet { fs.BoolVar(&s.tls, "tls", false, "connection uses TLS if true, else plain TCP") fs.StringVar(&s.certFile, "cert-file", "", "the TLS cert file") fs.StringVar(&s.keyFile, "key-file", "", "the TLS key file") - fs.StringVar(&s.addr, "addr", ":17912", "the address of banyand listens") + fs.StringVar(&s.host, "grpc-host", "", "the host of banyand listens") + fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens") fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", false, "enable ingestion access log") fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access log root path") return fs } func (s *server) Validate() error { - if s.addr == "" { + s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 10)) + if s.addr == ":" { return errNoAddr } if s.enableIngestionAccessLog && s.accessLogRootPath == "" { diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go index 2cbba9b85..6be5c79f1 100644 --- a/banyand/liaison/http/server.go +++ b/banyand/liaison/http/server.go @@ -22,7 +22,9 @@ import ( "context" "fmt" "io/fs" + "net" "net/http" + "strconv" "strings" "time" @@ -45,39 +47,48 @@ import ( ) var ( - _ run.Config = 
(*service)(nil) - _ run.Service = (*service)(nil) + _ run.Config = (*server)(nil) + _ run.Service = (*server)(nil) errServerCert = errors.New("http: invalid server cert file") errServerKey = errors.New("http: invalid server key file") errNoAddr = errors.New("http: no address") ) -// NewService return a http service. -func NewService() run.Unit { - return &service{ +// NewServer return a http service. +func NewServer() Server { + return &server{ stopCh: make(chan struct{}), } } -type service struct { - mux *chi.Mux - stopCh chan struct{} - clientCloser context.CancelFunc +// Server is the http service. +type Server interface { + run.Unit + GetPort() *uint32 +} + +type server struct { + creds credentials.TransportCredentials l *logger.Logger + clientCloser context.CancelFunc + mux *chi.Mux srv *http.Server + stopCh chan struct{} + host string listenAddr string grpcAddr string - creds credentials.TransportCredentials keyFile string certFile string grpcCert string + port uint32 tls bool } -func (p *service) FlagSet() *run.FlagSet { +func (p *server) FlagSet() *run.FlagSet { flagSet := run.NewFlagSet("http") - flagSet.StringVar(&p.listenAddr, "http-addr", ":17913", "listen addr for http") + flagSet.StringVar(&p.host, "http-host", "localhost", "listen host for http") + flagSet.Uint32Var(&p.port, "http-port", 17913, "listen port for http") flagSet.StringVar(&p.grpcAddr, "http-grpc-addr", "localhost:17912", "http server redirect grpc requests to this address") flagSet.StringVar(&p.certFile, "http-cert-file", "", "the TLS cert file of http server") flagSet.StringVar(&p.keyFile, "http-key-file", "", "the TLS key file of http server") @@ -86,8 +97,9 @@ func (p *service) FlagSet() *run.FlagSet { return flagSet } -func (p *service) Validate() error { - if p.listenAddr == "" { +func (p *server) Validate() error { + p.listenAddr = net.JoinHostPort(p.host, strconv.FormatUint(uint64(p.port), 10)) + if p.listenAddr == ":" { return errNoAddr } observability.UpdateAddress("http", 
p.listenAddr) @@ -110,11 +122,19 @@ func (p *service) Validate() error { return nil } -func (p *service) Name() string { +func (p *server) Name() string { return "liaison-http" } -func (p *service) PreRun() error { +func (p *server) Role() databasev1.Role { + return databasev1.Role_ROLE_LIAISON +} + +func (p *server) GetPort() *uint32 { + return &p.port +} + +func (p *server) PreRun(_ context.Context) error { p.l = logger.GetLogger(p.Name()) p.mux = chi.NewRouter() @@ -134,7 +154,7 @@ func (p *service) PreRun() error { return nil } -func (p *service) Serve() run.StopNotify { +func (p *server) Serve() run.StopNotify { var ctx context.Context ctx, p.clientCloser = context.WithCancel(context.Background()) opts := make([]grpc.DialOption, 0, 1) @@ -183,7 +203,7 @@ func (p *service) Serve() run.StopNotify { return p.stopCh } -func (p *service) GracefulStop() { +func (p *server) GracefulStop() { if err := p.srv.Close(); err != nil { p.l.Error().Err(err) } diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go deleted file mode 100644 index a1b657cfb..000000000 --- a/banyand/liaison/liaison.go +++ /dev/null @@ -1,33 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -// Package liaison implements a transmission layer between a data layer and a client. -package liaison - -import ( - "context" - - "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" - "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/pkg/run" -) - -// NewEndpoint return a new endpoint which is the entry point for the database server. -func NewEndpoint(ctx context.Context, pipeline queue.Queue, schemaRegistry metadata.Repo) (run.Unit, error) { - return grpc.NewServer(ctx, pipeline, schemaRegistry), nil -} diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go index a1c829bbd..25e31fd97 100644 --- a/banyand/measure/measure_suite_test.go +++ b/banyand/measure/measure_suite_test.go @@ -54,8 +54,8 @@ func (p *preloadMeasureService) Name() string { return "preload-measure" } -func (p *preloadMeasureService) PreRun() error { - return testmeasure.PreloadSchema(p.metaSvc.SchemaRegistry()) +func (p *preloadMeasureService) PreRun(ctx context.Context) error { + return testmeasure.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) } type services struct { diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index a86bfcc49..21d44fc0d 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -119,7 +119,7 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) { } case schema.KindTopNAggregation: // createOrUpdate TopN schemas in advance - _, err := createOrUpdateTopNMeasure(sr.metadata.MeasureRegistry(), metadata.Spec.(*databasev1.TopNAggregation)) + _, err := createOrUpdateTopNMeasure(context.Background(), sr.metadata.MeasureRegistry(), metadata.Spec.(*databasev1.TopNAggregation)) if err != nil { sr.l.Error().Err(err).Msg("fail to create/update topN measure") return @@ -134,13 +134,13 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) { } } -func 
createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, error) { - oldTopNSchema, err := measureSchemaRegistry.GetMeasure(context.TODO(), topNSchema.GetMetadata()) +func createOrUpdateTopNMeasure(ctx context.Context, measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, error) { + oldTopNSchema, err := measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetMetadata()) if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) { return nil, err } - sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(context.Background(), topNSchema.GetSourceMeasure()) + sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetSourceMeasure()) if err != nil { return nil, err } @@ -184,7 +184,7 @@ func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema Fields: []*databasev1.FieldSpec{TopNValueFieldSpec}, } if oldTopNSchema == nil { - if innerErr := measureSchemaRegistry.CreateMeasure(context.Background(), newTopNMeasure); innerErr != nil { + if innerErr := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure); innerErr != nil { return nil, innerErr } return newTopNMeasure, nil @@ -198,7 +198,7 @@ func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema return oldTopNSchema, nil } // update - if err = measureSchemaRegistry.UpdateMeasure(context.Background(), newTopNMeasure); err != nil { + if err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure); err != nil { return nil, err } return newTopNMeasure, nil diff --git a/banyand/measure/service.go b/banyand/measure/service.go index 6e46f841f..24689347e 100644 --- a/banyand/measure/service.go +++ b/banyand/measure/service.go @@ -103,11 +103,15 @@ func (s *service) Name() string { return "measure" } -func (s *service) PreRun() error { +func (s *service) Role() databasev1.Role { + return databasev1.Role_ROLE_DATA +} + +func (s *service) 
PreRun(ctx context.Context) error { s.l = logger.GetLogger(s.Name()) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - groups, err := s.metadata.GroupRegistry().ListGroup(ctx) - cancel() + ctxGroup, cancelGroup := context.WithTimeout(ctx, 5*time.Second) + groups, err := s.metadata.GroupRegistry().ListGroup(ctxGroup) + cancelGroup() if err != nil { return err } @@ -123,20 +127,20 @@ func (s *service) PreRun() error { if innerErr != nil { return innerErr } - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + ctxMeasure, cancelMeasure := context.WithTimeout(ctx, 5*time.Second) allMeasureSchemas, innerErr := s.metadata.MeasureRegistry(). - ListMeasure(ctx, schema.ListOpt{Group: gp.GetSchema().GetMetadata().GetName()}) - cancel() + ListMeasure(ctxMeasure, schema.ListOpt{Group: gp.GetSchema().GetMetadata().GetName()}) + cancelMeasure() if innerErr != nil { return innerErr } for _, measureSchema := range allMeasureSchemas { // sanity check before calling StoreResource // since StoreResource may be called inside the event loop - if checkErr := s.sanityCheck(gp, measureSchema); checkErr != nil { + if checkErr := s.sanityCheck(ctx, gp, measureSchema); checkErr != nil { return checkErr } - if _, innerErr := gp.StoreResource(measureSchema); innerErr != nil { + if _, innerErr := gp.StoreResource(ctx, measureSchema); innerErr != nil { return innerErr } } @@ -155,20 +159,20 @@ func (s *service) PreRun() error { return nil } -func (s *service) sanityCheck(group resourceSchema.Group, measureSchema *databasev1.Measure) error { +func (s *service) sanityCheck(ctx context.Context, group resourceSchema.Group, measureSchema *databasev1.Measure) error { var topNAggrs []*databasev1.TopNAggregation - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctxLocal, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - topNAggrs, err := s.metadata.MeasureRegistry().TopNAggregations(ctx, 
measureSchema.GetMetadata()) + topNAggrs, err := s.metadata.MeasureRegistry().TopNAggregations(ctxLocal, measureSchema.GetMetadata()) if err != nil || len(topNAggrs) == 0 { return err } for _, topNAggr := range topNAggrs { - topNMeasure, innerErr := createOrUpdateTopNMeasure(s.metadata.MeasureRegistry(), topNAggr) + topNMeasure, innerErr := createOrUpdateTopNMeasure(ctx, s.metadata.MeasureRegistry(), topNAggr) err = multierr.Append(err, innerErr) if topNMeasure != nil { - _, storeErr := group.StoreResource(topNMeasure) + _, storeErr := group.StoreResource(ctx, topNMeasure) if storeErr != nil { err = multierr.Append(err, storeErr) } diff --git a/banyand/metadata/allocator.go b/banyand/metadata/allocator.go index acf52dbfa..0607aacc8 100644 --- a/banyand/metadata/allocator.go +++ b/banyand/metadata/allocator.go @@ -21,8 +21,6 @@ import ( "context" "time" - "google.golang.org/protobuf/types/known/timestamppb" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" @@ -51,8 +49,6 @@ func (a *allocator) OnAddOrUpdate(metadata schema.Metadata) { if groupSchema.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED { return } - now := time.Now() - nowPb := timestamppb.New(now) shardNum := groupSchema.GetResourceOpts().GetShardNum() syncShard := func(id uint64) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -63,12 +59,7 @@ func (a *allocator) OnAddOrUpdate(metadata schema.Metadata) { Metadata: &commonv1.Metadata{ Name: groupSchema.GetMetadata().GetName(), }, - Node: &databasev1.Node{ - Id: "TODO", - CreatedAt: nowPb, - UpdatedAt: nowPb, - Addr: "TODO", - }, + Node: "TODO", }) } for i := 0; i < int(shardNum); i++ { diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index 0fc967c77..73768a1fb 100644 --- a/banyand/metadata/client.go +++ 
b/banyand/metadata/client.go @@ -23,7 +23,9 @@ import ( "time" "go.uber.org/multierr" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" @@ -62,13 +64,33 @@ func (s *clientService) Validate() error { return nil } -func (s *clientService) PreRun() error { +func (s *clientService) PreRun(ctx context.Context) error { var err error s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(schema.ConfigureServerEndpoints(s.endpoints)) if err != nil { return err } - + val := ctx.Value(common.ContextNodeKey) + if val == nil { + return errors.New("node id is empty") + } + node := val.(common.Node) + val = ctx.Value(common.ContextNodeRolesKey) + if val == nil { + return errors.New("node roles is empty") + } + nodeRoles := val.([]databasev1.Role) + ctxRegister, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + if err = s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{ + Name: node.NodeID, + GrpcAddress: node.GrpcAddress, + HttpAddress: node.HTTPAddress, + Roles: nodeRoles, + CreatedAt: timestamppb.Now(), + }); err != nil { + return err + } s.alc = newAllocator(s.schemaRegistry, logger.GetLogger(s.Name()).Named("allocator")) s.schemaRegistry.RegisterHandler(schema.KindGroup|schema.KindNode, s.alc) return nil @@ -124,6 +146,10 @@ func (s *clientService) Name() string { return "metadata" } +func (s *clientService) Role() databasev1.Role { + return databasev1.Role_ROLE_META +} + func (s *clientService) IndexRules(ctx context.Context, subject *commonv1.Metadata) ([]*databasev1.IndexRule, error) { bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, schema.ListOpt{Group: subject.Group}) if err != nil { diff --git a/banyand/metadata/embeddedetcd/server.go 
b/banyand/metadata/embeddedetcd/server.go index d6ac0cb5d..dfd87b73e 100644 --- a/banyand/metadata/embeddedetcd/server.go +++ b/banyand/metadata/embeddedetcd/server.go @@ -85,6 +85,7 @@ func (e *server) StoppingNotify() <-chan struct{} { func (e *server) Close() error { e.server.Close() + <-e.server.Server.StopNotify() return nil } diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go index 56eac46ce..8c60f2ff6 100644 --- a/banyand/metadata/metadata_test.go +++ b/banyand/metadata/metadata_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -48,14 +49,16 @@ func Test_service_RulesBySubject(t *testing.T) { err = s.FlagSet().Parse([]string{"--metadata-root-path=" + rootDir}) is.NoError(err) is.NoError(s.Validate()) - err = s.PreRun() + ctx = context.WithValue(ctx, common.ContextNodeKey, common.Node{NodeID: "test"}) + ctx = context.WithValue(ctx, common.ContextNodeRolesKey, []databasev1.Role{databasev1.Role_ROLE_META}) + err = s.PreRun(ctx) is.NoError(err) defer func() { s.GracefulStop() deferFn() }() - err = test.PreloadSchema(s.SchemaRegistry()) + err = test.PreloadSchema(ctx, s.SchemaRegistry()) is.NoError(err) tests := []struct { diff --git a/banyand/metadata/schema/checker.go b/banyand/metadata/schema/checker.go index 3d669cbc4..cdedc54b4 100644 --- a/banyand/metadata/schema/checker.go +++ b/banyand/metadata/schema/checker.go @@ -85,8 +85,7 @@ var checkerMap = map[Kind]equalityChecker{ KindNode: func(a, b proto.Message) bool { return cmp.Equal(a, b, protocmp.IgnoreUnknown(), - protocmp.IgnoreFields(&databasev1.Node{}, "updated_at"), - protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"), + 
protocmp.IgnoreFields(&databasev1.Node{}, "created_at"), protocmp.Transform()) }, KindShard: func(a, b proto.Message) bool { diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index 2c147f30d..6cbb459c7 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -345,6 +345,65 @@ func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (boo return false, nil } +func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata) error { + if !e.closer.AddRunning() { + return ErrClosed + } + defer e.closer.Done() + key, err := metadata.key() + if err != nil { + return err + } + val, err := proto.Marshal(metadata.Spec.(proto.Message)) + if err != nil { + return err + } + // Create a lease with a short TTL + lease, err := e.client.Grant(ctx, 5) // 5 seconds + if err != nil { + return err + } + var ops []clientv3.Cmp + ops = append(ops, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) + txn := e.client.Txn(ctx).If(ops...) 
+ txn = txn.Then(clientv3.OpPut(key, string(val), clientv3.WithLease(lease.ID))) + txn = txn.Else(clientv3.OpGet(key)) + response, err := txn.Commit() + if err != nil { + return err + } + if !response.Succeeded { + return errGRPCAlreadyExists + } + // Keep the lease alive + // nolint:contextcheck + keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID) + if err != nil { + return err + } + go func() { + if !e.closer.AddRunning() { + return + } + defer func() { + _, _ = e.client.Lease.Revoke(context.Background(), lease.ID) + e.closer.Done() + }() + for { + select { + case <-e.closer.CloseNotify(): + return + case keepAliveResp := <-keepAliveChan: + if keepAliveResp == nil { + // The channel has been closed + return + } + } + } + }() + return nil +} + func formatKey(entityPrefix string, metadata *commonv1.Metadata) string { return groupsKeyPrefix + metadata.GetGroup() + entityPrefix + metadata.GetName() } diff --git a/banyand/metadata/schema/node.go b/banyand/metadata/schema/node.go index 0c6dcdb16..d6d1fd4f8 100644 --- a/banyand/metadata/schema/node.go +++ b/banyand/metadata/schema/node.go @@ -27,11 +27,11 @@ import ( var nodeKeyPrefix = "/nodes/" -func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role Role) ([]*databasev1.Node, error) { - if role == "" { +func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role databasev1.Role) ([]*databasev1.Node, error) { + if role == databasev1.Role_ROLE_UNSPECIFIED { return nil, BadRequest("group", "group should not be empty") } - messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(string(role), nodeKeyPrefix), func() proto.Message { + messages, err := e.listWithPrefix(ctx, nodeKeyPrefix, func() proto.Message { return &databasev1.Node{} }) if err != nil { @@ -39,15 +39,27 @@ func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role Role) ([]*databa } entities := make([]*databasev1.Node, 0, len(messages)) for _, message := range messages { - entities = append(entities, 
message.(*databasev1.Node)) + node := message.(*databasev1.Node) + for _, r := range node.Roles { + if r == role { + entities = append(entities, node) + break + } + } } return entities, nil } -func formatNodePrefix(role Role) string { - return nodeKeyPrefix + string(role) +func (e *etcdSchemaRegistry) RegisterNode(ctx context.Context, node *databasev1.Node) error { + return e.register(ctx, Metadata{ + TypeMeta: TypeMeta{ + Kind: KindNode, + Name: node.Name, + }, + Spec: node, + }) } -func formatNodeKey(role Role, id string) string { - return formatNodePrefix(role) + "/" + id +func formatNodeKey(name string) string { + return nodeKeyPrefix + name } diff --git a/banyand/metadata/schema/register_test.go b/banyand/metadata/schema/register_test.go new file mode 100644 index 000000000..2fd2ace96 --- /dev/null +++ b/banyand/metadata/schema/register_test.go @@ -0,0 +1,82 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package schema + +import ( + "context" + "fmt" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +var _ = ginkgo.Describe("etcd_register", func() { + var endpoints []string + var goods []gleak.Goroutine + var server embeddedetcd.Server + var r *etcdSchemaRegistry + md := Metadata{ + TypeMeta: TypeMeta{ + Name: "test", + Kind: KindNode, + }, + Spec: &databasev1.Node{}, + } + ginkgo.BeforeEach(func() { + goods = gleak.Goroutines() + ports, err := test.AllocateFreePorts(2) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])} + server, err = embeddedetcd.NewServer( + embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), + embeddedetcd.RootDir(randomTempDir())) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + <-server.ReadyNotify() + schemaRegistry, err := NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints)) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + r = schemaRegistry.(*etcdSchemaRegistry) + }) + ginkgo.AfterEach(func() { + gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred()) + server.Close() + gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + }) + + ginkgo.It("should revoke the leaser", func() { + gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred()) + k, err := md.key() + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.get(context.Background(), k, &databasev1.Node{})).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred()) + schemaRegistry, err := 
NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints)) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + r = schemaRegistry.(*etcdSchemaRegistry) + gomega.Expect(r.get(context.Background(), k, &databasev1.Node{})).Should(gomega.MatchError(ErrGRPCResourceNotFound)) + }) + + ginkgo.It("should register only once", func() { + gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.register(context.Background(), md)).Should(gomega.MatchError(errGRPCAlreadyExists)) + }) +}) diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go index fd97a1d6c..fed58409c 100644 --- a/banyand/metadata/schema/schema.go +++ b/banyand/metadata/schema/schema.go @@ -156,11 +156,7 @@ func (m Metadata) key() (string, error) { Name: m.Name, }), nil case KindNode: - r, err := strToRole(m.Group) - if err != nil { - return "", err - } - return formatNodeKey(r, m.Name), nil + return formatNodeKey(m.Name), nil case KindShard: return formatShardKey(&commonv1.Metadata{ Group: m.Group, @@ -247,32 +243,10 @@ type Property interface { DeleteProperty(ctx context.Context, metadata *propertyv1.Metadata, tags []string) (bool, uint32, error) } -// Role is the role of node. -type Role string - -const ( - // RoleMeta is the role of meta node. - RoleMeta = "meta" - // RoleData is the role of data node. - RoleData = "data" - // RoleQuery is the role of query node. - RoleQuery = "query" - // RoleLiaison is the role of liaison node. - RoleLiaison = "liaison" -) - -func strToRole(role string) (Role, error) { - switch role { - case RoleMeta, RoleData, RoleQuery, RoleLiaison: - return Role(role), nil - default: - return "", errors.New("invalid role") - } -} - // Node allows CRUD node schemas in a group. 
type Node interface { - ListNode(ctx context.Context, role Role) ([]*databasev1.Node, error) + ListNode(ctx context.Context, role databasev1.Role) ([]*databasev1.Node, error) + RegisterNode(ctx context.Context, node *databasev1.Node) error } // Shard allows CRUD shard schemas in a group. diff --git a/banyand/metadata/server.go b/banyand/metadata/server.go index ce004e3c3..a4c59359a 100644 --- a/banyand/metadata/server.go +++ b/banyand/metadata/server.go @@ -22,6 +22,7 @@ import ( "errors" "strings" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -38,6 +39,10 @@ func (s *server) Name() string { return "metadata" } +func (s *server) Role() databasev1.Role { + return databasev1.Role_ROLE_META +} + func (s *server) FlagSet() *run.FlagSet { fs := run.NewFlagSet("metadata") fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path of metadata") @@ -63,14 +68,14 @@ func (s *server) Validate() error { return s.Service.Validate() } -func (s *server) PreRun() error { +func (s *server) PreRun(ctx context.Context) error { var err error s.metaServer, err = embeddedetcd.NewServer(embeddedetcd.RootDir(s.rootDir), embeddedetcd.ConfigureListener(s.listenClientURL, s.listenPeerURL)) if err != nil { return err } <-s.metaServer.ReadyNotify() - return s.Service.PreRun() + return s.Service.PreRun(ctx) } func (s *server) Serve() run.StopNotify { diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 2884f8d2c..cf36474a0 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -25,6 +25,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/measure" @@ -180,7 +181,11 @@ func (q *queryService) Name() string { return moduleName } -func (q *queryService) PreRun() error { +func (q *queryService) Role() databasev1.Role { + return databasev1.Role_ROLE_QUERY +} + +func (q *queryService) PreRun(_ context.Context) error { q.log = logger.GetLogger(moduleName) return multierr.Combine( q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp), diff --git a/banyand/stream/service.go b/banyand/stream/service.go index 07749f5b9..f614b9e96 100644 --- a/banyand/stream/service.go +++ b/banyand/stream/service.go @@ -26,6 +26,7 @@ import ( "github.com/apache/skywalking-banyandb/api/data" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" @@ -93,10 +94,14 @@ func (s *service) Name() string { return "stream" } -func (s *service) PreRun() error { +func (s *service) Role() databasev1.Role { + return databasev1.Role_ROLE_DATA +} + +func (s *service) PreRun(ctx context.Context) error { s.l = logger.GetLogger(s.Name()) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - groups, err := s.metadata.GroupRegistry().ListGroup(ctx) + ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second) + groups, err := s.metadata.GroupRegistry().ListGroup(ctxLocal) cancel() if err != nil { return err @@ -112,14 +117,14 @@ func (s *service) PreRun() error { if err != nil { return err } - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - schemas, err := s.metadata.StreamRegistry().ListStream(ctx, schema.ListOpt{Group: gp.GetSchema().GetMetadata().Name}) + ctxLocal, cancel := context.WithTimeout(ctx, 
5*time.Second) + schemas, err := s.metadata.StreamRegistry().ListStream(ctxLocal, schema.ListOpt{Group: gp.GetSchema().GetMetadata().Name}) cancel() if err != nil { return err } for _, sa := range schemas { - if _, innerErr := gp.StoreResource(sa); innerErr != nil { + if _, innerErr := gp.StoreResource(ctx, sa); innerErr != nil { return innerErr } } diff --git a/banyand/stream/stream_suite_test.go b/banyand/stream/stream_suite_test.go index fa5fa2c3c..898da965f 100644 --- a/banyand/stream/stream_suite_test.go +++ b/banyand/stream/stream_suite_test.go @@ -53,8 +53,8 @@ func (p *preloadStreamService) Name() string { return "preload-stream" } -func (p *preloadStreamService) PreRun() error { - return teststream.PreloadSchema(p.metaSvc.SchemaRegistry()) +func (p *preloadStreamService) PreRun(ctx context.Context) error { + return teststream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) } type services struct { diff --git a/docs/README.md b/docs/README.md index 9b759f879..8185677b0 100644 --- a/docs/README.md +++ b/docs/README.md @@ -6,9 +6,9 @@ Here you can learn all you need to know about BanyanDB. - **Installation**. Instruments about how to download and onboard BanyanDB server, Banyand. - **Clients**. Some native clients to access Banyand. -- **Schema**. Pivotal database native resources. -- **CRUD Operations**. To create, read, update, and delete data points or entities on resources in the schema. - **Observability**. Learn how to effectively monitor, diagnose and optimize Banyand. +- **Concept**. Learn the concepts of Banyand. Includes the architecture, data model, and so on. +- **CRUD Operations**. To create, read, update, and delete data points or entities on resources in the schema. 
You might also find these links interesting: diff --git a/docs/api-reference.md b/docs/api-reference.md index 96931a186..45fa7b258 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -16,12 +16,7 @@ - [Node](#banyandb-database-v1-Node) - [Shard](#banyandb-database-v1-Shard) -- [banyandb/database/v1/event.proto](#banyandb_database_v1_event-proto) - - [EntityEvent](#banyandb-database-v1-EntityEvent) - - [EntityEvent.TagLocator](#banyandb-database-v1-EntityEvent-TagLocator) - - [ShardEvent](#banyandb-database-v1-ShardEvent) - - - [Action](#banyandb-database-v1-Action) + - [Role](#banyandb-database-v1-Role) - [banyandb/model/v1/common.proto](#banyandb_model_v1_common-proto) - [FieldValue](#banyandb-model-v1-FieldValue) @@ -340,10 +335,10 @@ Metadata is for multi-tenant, multi-model use | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| id | [string](#string) | | | -| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | -| addr | [string](#string) | | | -| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | +| name | [string](#string) | | | +| roles | [Role](#banyandb-database-v1-Role) | repeated | | +| grpc_address | [string](#string) | | | +| http_address | [string](#string) | | | | created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | @@ -362,7 +357,7 @@ Metadata is for multi-tenant, multi-model use | id | [uint64](#uint64) | | | | metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | | catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) | | | -| node | [Node](#banyandb-database-v1-Node) | | | +| node | [string](#string) | | | | total | [uint32](#uint32) | | | | updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | | created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | @@ -373,84 +368,19 @@ Metadata is for multi-tenant, multi-model use - - - - - - - - - -
- -## banyandb/database/v1/event.proto - - - - - -### EntityEvent - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| subject | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | -| entity_locator | [EntityEvent.TagLocator](#banyandb-database-v1-EntityEvent-TagLocator) | repeated | | -| action | [Action](#banyandb-database-v1-Action) | | | -| time | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | - - - - - - - - -### EntityEvent.TagLocator - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| family_offset | [uint32](#uint32) | | | -| tag_offset | [uint32](#uint32) | | | - - - - - - - - -### ShardEvent - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| shard | [Shard](#banyandb-database-v1-Shard) | | | -| action | [Action](#banyandb-database-v1-Action) | | | -| time | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | - - - - - - - - + -### Action +### Role | Name | Number | Description | | ---- | ------ | ----------- | -| ACTION_UNSPECIFIED | 0 | | -| ACTION_PUT | 1 | | -| ACTION_DELETE | 2 | | +| ROLE_UNSPECIFIED | 0 | | +| ROLE_META | 1 | | +| ROLE_DATA | 2 | | +| ROLE_QUERY | 3 | | +| ROLE_LIAISON | 4 | | diff --git a/docs/concept/clustering.md b/docs/concept/clustering.md index 733f14e8d..0f8b72464 100644 --- a/docs/concept/clustering.md +++ b/docs/concept/clustering.md @@ -16,7 +16,7 @@ In addition to persistent raw data, Data Nodes also handle TopN aggregation calc ### 1.2 Meta Nodes -Meta Nodes are responsible for maintaining high-level metadata of the cluster, which includes: +Meta Nodes are implemented by etcd. 
They are responsible for maintaining high-level metadata of the cluster, which includes: - All nodes in the cluster - All database schemas @@ -52,6 +52,10 @@ All nodes within a BanyanDB cluster communicate with other nodes according to th - Query Nodes interact with Data Nodes to execute queries and return results to the Liaison Nodes. - Liaison Nodes distribute incoming requests to the appropriate Query Nodes or Data Nodes. +### Nodes Discovery + +All nodes in the cluster are discovered by the Meta Nodes. When a node starts up, it registers itself with the Meta Nodes. The Meta Nodes then share this information with the Liaison Nodes and Query Nodes, which use it to route requests to the appropriate nodes. + ## 3. **Data Organization** Different nodes in BanyanDB are responsible for different parts of the database, while Query and Liaison Nodes manage the routing and processing of queries. diff --git a/docs/installation.md b/docs/installation.md index d29ab2132..9e85cd650 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -129,11 +129,13 @@ The banyand-server would be listening on the `0.0.0.0:17912` if no errors occurr The standalone node is running as a standalone process by ```shell -$ ./banyand-server standalone