From eda6bea41bb8111f0ba61b3030c5afcdb32c4d69 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Wed, 16 Aug 2023 17:39:59 +0300 Subject: [PATCH] Implement node role registration (#314) --- CHANGES.md | 1 + api/common/id.go | 96 +++++++++++++++++++ api/proto/banyandb/database/v1/database.proto | 18 +++- api/proto/banyandb/database/v1/event.proto | 50 ---------- banyand/internal/cmd/liaison.go | 31 ++---- banyand/internal/cmd/meta.go | 87 ----------------- banyand/internal/cmd/root.go | 42 +++++++- banyand/internal/cmd/standalone.go | 32 ++----- banyand/internal/cmd/storage.go | 14 ++- banyand/liaison/grpc/discovery.go | 18 ++-- banyand/liaison/grpc/registry_test.go | 4 +- banyand/liaison/grpc/server.go | 66 ++++++++++--- banyand/liaison/http/server.go | 56 +++++++---- banyand/liaison/liaison.go | 33 ------- banyand/measure/measure_suite_test.go | 4 +- banyand/measure/metadata.go | 12 +-- banyand/measure/service.go | 32 ++++--- banyand/metadata/allocator.go | 11 +-- banyand/metadata/client.go | 30 +++++- banyand/metadata/embeddedetcd/server.go | 1 + banyand/metadata/metadata_test.go | 7 +- banyand/metadata/schema/checker.go | 3 +- banyand/metadata/schema/etcd.go | 59 ++++++++++++ banyand/metadata/schema/node.go | 28 ++++-- banyand/metadata/schema/register_test.go | 82 ++++++++++++++++ banyand/metadata/schema/schema.go | 32 +------ banyand/metadata/server.go | 9 +- banyand/query/processor.go | 7 +- banyand/stream/service.go | 17 ++-- banyand/stream/stream_suite_test.go | 4 +- docs/README.md | 4 +- docs/api-reference.md | 96 +++---------------- docs/concept/clustering.md | 6 +- docs/installation.md | 19 +++- docs/installation/binaries.md | 68 +++++++++++++ docs/installation/cluster.md | 45 +++++++++ docs/installation/standalone.md | 21 ++++ docs/menu.yml | 8 +- pkg/host/host.go | 66 +++++++++++++ pkg/run/run.go | 85 +++++++++++----- pkg/schema/metadata.go | 24 ++--- pkg/signal/handler.go | 3 +- pkg/test/matcher.go | 82 ---------------- pkg/test/measure/etcd.go | 12 +-- pkg/test/setup.go | 4 +- pkg/test/setup/setup.go | 14 +-- pkg/test/stream/etcd.go | 10 +- test/stress/cases/istio/repo.go | 2 +- 48 files changed, 868 insertions(+), 587 deletions(-) delete mode 100644 api/proto/banyandb/database/v1/event.proto delete mode 100644 banyand/internal/cmd/meta.go delete mode 100644 banyand/liaison/liaison.go create mode 100644 banyand/metadata/schema/register_test.go create mode 100644 docs/installation/binaries.md create mode 100644 docs/installation/cluster.md create mode 100644 docs/installation/standalone.md create mode 100644 pkg/host/host.go delete mode 100644 pkg/test/matcher.go diff --git a/CHANGES.md b/CHANGES.md index e537ba38d..20d2bee10 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,7 @@ Release Notes. - Document the clustering. - Support multiple roles for banyand server. - Support for recovery buffer using wal. +- Register the node role to the metadata registry. ### Bugs diff --git a/api/common/id.go b/api/common/id.go index 6cc879ca8..074060cdf 100644 --- a/api/common/id.go +++ b/api/common/id.go @@ -20,8 +20,12 @@ package common import ( "context" "fmt" + "net" + "strconv" + "strings" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/host" ) type ( @@ -110,3 +114,95 @@ func NewError(tpl string, args ...any) Error { func (e Error) Msg() string { return e.msg } + +// Node contains the node id and address. +type Node struct { + NodeID string + GrpcAddress string + HTTPAddress string +} + +var ( + // FlagNodeHost is the node id from flag. + FlagNodeHost string + // FlagNodeHostProvider is the node id provider from flag. + FlagNodeHostProvider NodeHostProvider +) + +// NodeHostProvider is the provider of node id. +type NodeHostProvider int + +// NodeIDProvider constants. +const ( + NodeHostProviderHostname NodeHostProvider = iota + NodeHostProviderIP + NodeHostProviderFlag +) + +// String returns the string representation of NodeIDProvider. +func (n *NodeHostProvider) String() string { + return [...]string{"Hostname", "IP", "Flag"}[*n] +} + +// ParseNodeHostProvider parses the string to NodeIDProvider. +func ParseNodeHostProvider(s string) (NodeHostProvider, error) { + switch strings.ToLower(s) { + case "hostname": + return NodeHostProviderHostname, nil + case "ip": + return NodeHostProviderIP, nil + case "flag": + return NodeHostProviderFlag, nil + default: + return 0, fmt.Errorf("unknown node id provider %s", s) + } +} + +// GenerateNode generates a node id. +func GenerateNode(grpcPort, httpPort *uint32) (Node, error) { + port := grpcPort + if port == nil { + port = httpPort + } + if port == nil { + return Node{}, fmt.Errorf("no port found") + } + node := Node{} + var nodeHost string + switch FlagNodeHostProvider { + case NodeHostProviderHostname: + h, err := host.Name() + if err != nil { + return Node{}, err + } + nodeHost = h + case NodeHostProviderIP: + ip, err := host.IP() + if err != nil { + return Node{}, err + } + nodeHost = ip + case NodeHostProviderFlag: + nodeHost = FlagNodeHost + default: + return Node{}, fmt.Errorf("unknown node id provider %d", FlagNodeHostProvider) + } + node.NodeID = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*port), 10)) + if grpcPort != nil { + node.GrpcAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*grpcPort), 10)) + } + if httpPort != nil { + node.HTTPAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*httpPort), 10)) + } + return node, nil +} + +// ContextNodeKey is a context key to store the node id. +var ContextNodeKey = contextNodeKey{} + +type contextNodeKey struct{} + +// ContextNodeRolesKey is a context key to store the node roles. +var ContextNodeRolesKey = contextNodeRolesKey{} + +type contextNodeRolesKey struct{} diff --git a/api/proto/banyandb/database/v1/database.proto b/api/proto/banyandb/database/v1/database.proto index c530b60be..0e758d829 100644 --- a/api/proto/banyandb/database/v1/database.proto +++ b/api/proto/banyandb/database/v1/database.proto @@ -25,11 +25,19 @@ import "google/protobuf/timestamp.proto"; option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"; option java_package = "org.apache.skywalking.banyandb.database.v1"; +enum Role { + ROLE_UNSPECIFIED = 0; + ROLE_META = 1; + ROLE_DATA = 2; + ROLE_QUERY = 3; + ROLE_LIAISON = 4; +} + message Node { - string id = 1; - common.v1.Metadata metadata = 2; - string addr = 3; - google.protobuf.Timestamp updated_at = 4; + string name = 1; + repeated Role roles = 2; + string grpc_address = 3; + string http_address = 4; google.protobuf.Timestamp created_at = 5; } @@ -37,7 +45,7 @@ message Shard { uint64 id = 1; common.v1.Metadata metadata = 2; common.v1.Catalog catalog = 3; - Node node = 4; + string node = 4; uint32 total = 5; google.protobuf.Timestamp updated_at = 6; google.protobuf.Timestamp created_at = 7; diff --git a/api/proto/banyandb/database/v1/event.proto b/api/proto/banyandb/database/v1/event.proto deleted file mode 100644 index 19f98f68f..000000000 --- a/api/proto/banyandb/database/v1/event.proto +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -syntax = "proto3"; - -package banyandb.database.v1; - -import "banyandb/common/v1/common.proto"; -import "banyandb/database/v1/database.proto"; -import "google/protobuf/timestamp.proto"; - -option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"; -option java_package = "org.apache.skywalking.banyandb.database.v1"; - -enum Action { - ACTION_UNSPECIFIED = 0; - ACTION_PUT = 1; - ACTION_DELETE = 2; -} - -message ShardEvent { - Shard shard = 1; - Action action = 2; - google.protobuf.Timestamp time = 3; -} - -message EntityEvent { - common.v1.Metadata subject = 1; - message TagLocator { - uint32 family_offset = 1; - uint32 tag_offset = 2; - } - repeated TagLocator entity_locator = 2; - Action action = 3; - google.protobuf.Timestamp time = 4; -} diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go index 97dff1e7e..d8847c7ca 100644 --- a/banyand/internal/cmd/liaison.go +++ b/banyand/internal/cmd/liaison.go @@ -19,17 +19,16 @@ package cmd import ( "context" - "fmt" "os" "github.com/spf13/cobra" - "github.com/apache/skywalking-banyandb/banyand/liaison" + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/liaison/http" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/pkg/config" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/signal" @@ -50,18 +49,15 @@ func newLiaisonCmd() *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate metadata service") } - tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc) - if err != nil { - l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer") - } + grpcServer := grpc.NewServer(ctx, pipeline, metaSvc) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() - httpServer := http.NewService() + httpServer := http.NewServer() units := []run.Unit{ new(signal.Handler), pipeline, - tcp, + grpcServer, httpServer, profSvc, } @@ -70,33 +66,24 @@ func newLiaisonCmd() *cobra.Command { } // Meta the run Group units. liaisonGroup.Register(units...) - logging := logger.Logging{} liaisonCmd := &cobra.Command{ Use: "liaison", Version: version.Build(), Short: "Run as the liaison server", - PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { - if err = config.Load("logging", cmd.Flags()); err != nil { + RunE: func(cmd *cobra.Command, args []string) (err error) { + node, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort()) + if err != nil { return err } - return logger.Init(logging) - }, - RunE: func(cmd *cobra.Command, args []string) (err error) { - fmt.Print(logo) logger.GetLogger().Info().Msg("starting as a liaison server") // Spawn our go routines and wait for shutdown. - if err := liaisonGroup.Run(); err != nil { + if err := liaisonGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, node)); err != nil { logger.GetLogger().Error().Err(err).Stack().Str("name", liaisonGroup.Name()).Msg("Exit") os.Exit(-1) } return nil }, } - - liaisonCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - liaisonCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - liaisonCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - liaisonCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet) return liaisonCmd } diff --git a/banyand/internal/cmd/meta.go b/banyand/internal/cmd/meta.go deleted file mode 100644 index df7530d26..000000000 --- a/banyand/internal/cmd/meta.go +++ /dev/null @@ -1,87 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package cmd - -import ( - "context" - "fmt" - "os" - - "github.com/spf13/cobra" - - "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/observability" - "github.com/apache/skywalking-banyandb/pkg/config" - "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/run" - "github.com/apache/skywalking-banyandb/pkg/signal" - "github.com/apache/skywalking-banyandb/pkg/version" -) - -var metaGroup = run.NewGroup("meta") - -func newMetaCmd() *cobra.Command { - l := logger.GetLogger("bootstrap") - ctx := context.Background() - metaSvc, err := metadata.NewService(ctx) - if err != nil { - l.Fatal().Err(err).Msg("failed to initiate metadata service") - } - profSvc := observability.NewProfService() - metricSvc := observability.NewMetricService() - - units := []run.Unit{ - new(signal.Handler), - metaSvc, - profSvc, - } - if metricSvc != nil { - units = append(units, metricSvc) - } - // Meta the run Group units. - metaGroup.Register(units...) - logging := logger.Logging{} - metaCmd := &cobra.Command{ - Use: "meta", - Version: version.Build(), - Short: "Run as the meta server", - PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { - if err = config.Load("logging", cmd.Flags()); err != nil { - return err - } - return logger.Init(logging) - }, - RunE: func(cmd *cobra.Command, args []string) (err error) { - fmt.Print(logo) - logger.GetLogger().Info().Msg("starting as a meta server") - // Spawn our go routines and wait for shutdown. - if err := metaGroup.Run(); err != nil { - logger.GetLogger().Error().Err(err).Stack().Str("name", metaGroup.Name()).Msg("Exit") - os.Exit(-1) - } - return nil - }, - } - - metaCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - metaCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - metaCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - metaCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") - metaCmd.Flags().AddFlagSet(metaGroup.RegisterFlags().FlagSet) - return metaCmd -} diff --git a/banyand/internal/cmd/root.go b/banyand/internal/cmd/root.go index da67766da..b1535a2b8 100644 --- a/banyand/internal/cmd/root.go +++ b/banyand/internal/cmd/root.go @@ -19,8 +19,13 @@ package cmd import ( + "fmt" + "github.com/spf13/cobra" + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/config" + "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/version" ) @@ -35,6 +40,7 @@ const logo = ` // NewRoot returns a root command. func NewRoot() *cobra.Command { + logging := logger.Logging{} cmd := &cobra.Command{ DisableAutoGenTag: true, Version: version.Build(), @@ -42,10 +48,44 @@ func NewRoot() *cobra.Command { Long: logo + ` BanyanDB, as an observability database, aims to ingest, analyze and store Metrics, Tracing and Logging data `, + PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { + fmt.Print(logo) + if err = config.Load("logging", cmd.Flags()); err != nil { + return err + } + return logger.Init(logging) + }, } + cmd.PersistentFlags().Var(&nodeIDProviderValue{&common.FlagNodeHostProvider}, + "node-host-provider", "the node host provider, can be hostname, ip or flag, default is hostname") + cmd.PersistentFlags().StringVar(&common.FlagNodeHost, "node-host", "", "the node host of the server only used when node-host-provider is \"flag\"") + cmd.PersistentFlags().StringVar(&logging.Env, "logging-env", "prod", "the logging") + cmd.PersistentFlags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") + cmd.PersistentFlags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") + cmd.PersistentFlags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") cmd.AddCommand(newStandaloneCmd()) - cmd.AddCommand(newMetaCmd()) cmd.AddCommand(newStorageCmd()) cmd.AddCommand(newLiaisonCmd()) return cmd } + +type nodeIDProviderValue struct { + value *common.NodeHostProvider +} + +func (c *nodeIDProviderValue) Set(s string) error { + v, err := common.ParseNodeHostProvider(s) + if err != nil { + return err + } + *c.value = v + return nil +} + +func (c *nodeIDProviderValue) String() string { + return c.value.String() +} + +func (c *nodeIDProviderValue) Type() string { + return "nodeIDProvider" +} diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go index 9d6eccaa9..f45a68d3d 100644 --- a/banyand/internal/cmd/standalone.go +++ b/banyand/internal/cmd/standalone.go @@ -19,12 +19,12 @@ package cmd import ( "context" - "fmt" "os" "github.com/spf13/cobra" - "github.com/apache/skywalking-banyandb/banyand/liaison" + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/liaison/http" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" @@ -32,7 +32,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/query" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/stream" - "github.com/apache/skywalking-banyandb/pkg/config" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/signal" @@ -64,13 +63,10 @@ func newStandaloneCmd() *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } - tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc) - if err != nil { - l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer") - } + grpcServer := grpc.NewServer(ctx, pipeline, metaSvc) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() - httpServer := http.NewService() + httpServer := http.NewServer() units := []run.Unit{ new(signal.Handler), @@ -79,7 +75,7 @@ func newStandaloneCmd() *cobra.Command { measureSvc, streamSvc, q, - tcp, + grpcServer, httpServer, profSvc, } @@ -88,33 +84,25 @@ func newStandaloneCmd() *cobra.Command { } // Meta the run Group units. standaloneGroup.Register(units...) - logging := logger.Logging{} + standaloneCmd := &cobra.Command{ Use: "standalone", Version: version.Build(), Short: "Run as the standalone server", - PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { - if err = config.Load("logging", cmd.Flags()); err != nil { + RunE: func(cmd *cobra.Command, args []string) (err error) { + nodeID, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort()) + if err != nil { return err } - return logger.Init(logging) - }, - RunE: func(cmd *cobra.Command, args []string) (err error) { - fmt.Print(logo) logger.GetLogger().Info().Msg("starting as a standalone server") // Spawn our go routines and wait for shutdown. - if err := standaloneGroup.Run(); err != nil { + if err := standaloneGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, nodeID)); err != nil { logger.GetLogger().Error().Err(err).Stack().Str("name", standaloneGroup.Name()).Msg("Exit") os.Exit(-1) } return nil }, } - - standaloneCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - standaloneCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - standaloneCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - standaloneCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") standaloneCmd.Flags().AddFlagSet(standaloneGroup.RegisterFlags().FlagSet) return standaloneCmd } diff --git a/banyand/internal/cmd/storage.go b/banyand/internal/cmd/storage.go index 7d297fc09..a8f4e159c 100644 --- a/banyand/internal/cmd/storage.go +++ b/banyand/internal/cmd/storage.go @@ -19,11 +19,11 @@ package cmd import ( "context" - "fmt" "os" "github.com/spf13/cobra" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" @@ -114,21 +114,19 @@ func newStorageCmd() *cobra.Command { } }, RunE: func(cmd *cobra.Command, args []string) (err error) { - fmt.Print(logo) + node, err := common.GenerateNode(nil, nil) + if err != nil { + return err + } logger.GetLogger().Info().Msg("starting as a storage server") // Spawn our go routines and wait for shutdown. - if err := storageGroup.Run(); err != nil { + if err := storageGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, node)); err != nil { logger.GetLogger().Error().Err(err).Stack().Str("name", storageGroup.Name()).Msg("Exit") os.Exit(-1) } return nil }, } - - storageCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - storageCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - storageCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - storageCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") storageCmd.Flags().StringVarP(&flagStorageMode, "mode", "m", storageModeMix, "the storage mode, one of [data, query, mix]") storageCmd.Flags().AddFlagSet(storageGroup.RegisterFlags().FlagSet) return storageCmd diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index a2a38841a..606d93fb1 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -60,9 +60,9 @@ func newDiscoveryService(pipeline queue.Queue, kind schema.Kind, metadataRepo me } } -func (ds *discoveryService) initialize() error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctx) +func (ds *discoveryService) initialize(ctx context.Context) error { + ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second) + groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctxLocal) cancel() if err != nil { return err @@ -74,8 +74,8 @@ func (ds *discoveryService) initialize() error { default: continue } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - shards, innerErr := ds.metadataRepo.ShardRegistry().ListShard(ctx, schema.ListOpt{Group: g.Metadata.Name}) + ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second) + shards, innerErr := ds.metadataRepo.ShardRegistry().ListShard(ctxLocal, schema.ListOpt{Group: g.Metadata.Name}) cancel() if innerErr != nil { return innerErr @@ -93,8 +93,8 @@ func (ds *discoveryService) initialize() error { switch ds.kind { case schema.KindMeasure: - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - mm, innerErr := ds.metadataRepo.MeasureRegistry().ListMeasure(ctx, schema.ListOpt{Group: g.Metadata.Name}) + ctxLocal, cancel = context.WithTimeout(ctx, 5*time.Second) + mm, innerErr := ds.metadataRepo.MeasureRegistry().ListMeasure(ctxLocal, schema.ListOpt{Group: g.Metadata.Name}) cancel() if innerErr != nil { return innerErr @@ -110,8 +110,8 @@ func (ds *discoveryService) initialize() error { }) } case schema.KindStream: - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - ss, innerErr := ds.metadataRepo.StreamRegistry().ListStream(ctx, schema.ListOpt{Group: g.Metadata.Name}) + ctxLocal, cancel = context.WithTimeout(ctx, 5*time.Second) + ss, innerErr := ds.metadataRepo.StreamRegistry().ListStream(ctxLocal, schema.ListOpt{Group: g.Metadata.Name}) cancel() if innerErr != nil { return innerErr diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go index 4b12a1ac2..213f52426 100644 --- a/banyand/liaison/grpc/registry_test.go +++ b/banyand/liaison/grpc/registry_test.go @@ -212,6 +212,6 @@ func (p *preloadStreamService) Name() string { return "preload-stream" } -func (p *preloadStreamService) PreRun() error { - return teststream.PreloadSchema(p.metaSvc.SchemaRegistry()) +func (p *preloadStreamService) PreRun(ctx context.Context) error { + return teststream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) } diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index 2bb906647..eae908f94 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -22,6 +22,7 @@ import ( "context" "net" "runtime/debug" + "strconv" "time" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" @@ -56,33 +57,58 @@ var ( errAccessLogRootPath = errors.New("access log root path is required") ) +// // Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Server defines the gRPC server. +type Server interface { + run.Unit + GetPort() *uint32 +} + type server struct { pipeline queue.Queue creds credentials.TransportCredentials - *indexRuleRegistryServer - measureSVC *measureService - log *logger.Logger - ser *grpclib.Server + *streamRegistryServer + log *logger.Logger + *indexRuleBindingRegistryServer + ser *grpclib.Server *propertyServer *topNAggregationRegistryServer *groupRegistryServer - stopCh chan struct{} - streamSVC *streamService + stopCh chan struct{} + *indexRuleRegistryServer *measureRegistryServer - *streamRegistryServer - *indexRuleBindingRegistryServer - addr string + streamSVC *streamService + measureSVC *measureService + host string keyFile string certFile string accessLogRootPath string + addr string accessLogRecorders []accessLogRecorder maxRecvMsgSize run.Bytes - tls bool + port uint32 enableIngestionAccessLog bool + tls bool } // NewServer returns a new gRPC server. -func NewServer(_ context.Context, pipeline queue.Queue, schemaRegistry metadata.Repo) run.Unit { +func NewServer(_ context.Context, pipeline queue.Queue, schemaRegistry metadata.Repo) Server { streamSVC := &streamService{ discoveryService: newDiscoveryService(pipeline, schema.KindStream, schemaRegistry), } @@ -119,7 +145,7 @@ func NewServer(_ context.Context, pipeline queue.Queue, schemaRegistry metadata. return s } -func (s *server) PreRun() error { +func (s *server) PreRun(ctx context.Context) error { s.log = logger.GetLogger("liaison-grpc") s.streamSVC.setLogger(s.log) s.measureSVC.setLogger(s.log) @@ -129,7 +155,7 @@ func (s *server) PreRun() error { } for _, c := range components { c.SetLogger(s.log) - if err := c.initialize(); err != nil { + if err := c.initialize(ctx); err != nil { return err } } @@ -148,6 +174,14 @@ func (s *server) Name() string { return "grpc" } +func (s *server) Role() databasev1.Role { + return databasev1.Role_ROLE_LIAISON +} + +func (s *server) GetPort() *uint32 { + return &s.port +} + func (s *server) FlagSet() *run.FlagSet { fs := run.NewFlagSet("grpc") s.maxRecvMsgSize = defaultRecvSize @@ -155,14 +189,16 @@ func (s *server) FlagSet() *run.FlagSet { fs.BoolVar(&s.tls, "tls", false, "connection uses TLS if true, else plain TCP") fs.StringVar(&s.certFile, "cert-file", "", "the TLS cert file") fs.StringVar(&s.keyFile, "key-file", "", "the TLS key file") - fs.StringVar(&s.addr, "addr", ":17912", "the address of banyand listens") + fs.StringVar(&s.host, "grpc-host", "", "the host of banyand listens") + fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens") fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", false, "enable ingestion access log") fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access log root path") return fs } func (s *server) Validate() error { - if s.addr == "" { + s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 10)) + if s.addr == ":" { return errNoAddr } if s.enableIngestionAccessLog && s.accessLogRootPath == "" { diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go index 2cbba9b85..6be5c79f1 100644 --- a/banyand/liaison/http/server.go +++ b/banyand/liaison/http/server.go @@ -22,7 +22,9 @@ import ( "context" "fmt" "io/fs" + "net" "net/http" + "strconv" "strings" "time" @@ -45,39 +47,48 @@ import ( ) var ( - _ run.Config = (*service)(nil) - _ run.Service = (*service)(nil) + _ run.Config = (*server)(nil) + _ run.Service = (*server)(nil) errServerCert = errors.New("http: invalid server cert file") errServerKey = errors.New("http: invalid server key file") errNoAddr = errors.New("http: no address") ) -// NewService return a http service. -func NewService() run.Unit { - return &service{ +// NewServer return a http service. +func NewServer() Server { + return &server{ stopCh: make(chan struct{}), } } -type service struct { - mux *chi.Mux - stopCh chan struct{} - clientCloser context.CancelFunc +// Server is the http service. +type Server interface { + run.Unit + GetPort() *uint32 +} + +type server struct { + creds credentials.TransportCredentials l *logger.Logger + clientCloser context.CancelFunc + mux *chi.Mux srv *http.Server + stopCh chan struct{} + host string listenAddr string grpcAddr string - creds credentials.TransportCredentials keyFile string certFile string grpcCert string + port uint32 tls bool } -func (p *service) FlagSet() *run.FlagSet { +func (p *server) FlagSet() *run.FlagSet { flagSet := run.NewFlagSet("http") - flagSet.StringVar(&p.listenAddr, "http-addr", ":17913", "listen addr for http") + flagSet.StringVar(&p.host, "http-host", "localhost", "listen host for http") + flagSet.Uint32Var(&p.port, "http-port", 17913, "listen port for http") flagSet.StringVar(&p.grpcAddr, "http-grpc-addr", "localhost:17912", "http server redirect grpc requests to this address") flagSet.StringVar(&p.certFile, "http-cert-file", "", "the TLS cert file of http server") flagSet.StringVar(&p.keyFile, "http-key-file", "", "the TLS key file of http server") @@ -86,8 +97,9 @@ func (p *service) FlagSet() *run.FlagSet { return flagSet } -func (p *service) Validate() error { - if p.listenAddr == "" { +func (p *server) Validate() error { + p.listenAddr = net.JoinHostPort(p.host, strconv.FormatUint(uint64(p.port), 10)) + if p.listenAddr == ":" { return errNoAddr } observability.UpdateAddress("http", p.listenAddr) @@ -110,11 +122,19 @@ func (p *service) Validate() error { return nil } -func (p *service) Name() string { +func (p *server) Name() string { return "liaison-http" } -func (p *service) PreRun() error { +func (p *server) Role() databasev1.Role { + return databasev1.Role_ROLE_LIAISON +} + +func (p *server) GetPort() *uint32 { + return &p.port +} + +func (p *server) PreRun(_ context.Context) error { p.l = logger.GetLogger(p.Name()) p.mux = chi.NewRouter() @@ -134,7 +154,7 @@ func (p *service) PreRun() error { return nil } -func (p *service) Serve() run.StopNotify { +func (p *server) Serve() run.StopNotify { var ctx context.Context ctx, p.clientCloser = context.WithCancel(context.Background()) opts := make([]grpc.DialOption, 0, 1) @@ -183,7 +203,7 @@ func (p *service) Serve() run.StopNotify { return p.stopCh } -func (p *service) GracefulStop() { +func (p *server) GracefulStop() { if err := p.srv.Close(); err != nil { p.l.Error().Err(err) } diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go deleted file mode 100644 index a1b657cfb..000000000 --- a/banyand/liaison/liaison.go +++ /dev/null @@ -1,33 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Package liaison implements a transmission layer between a data layer and a client. -package liaison - -import ( - "context" - - "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" - "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/pkg/run" -) - -// NewEndpoint return a new endpoint which is the entry point for the database server. -func NewEndpoint(ctx context.Context, pipeline queue.Queue, schemaRegistry metadata.Repo) (run.Unit, error) { - return grpc.NewServer(ctx, pipeline, schemaRegistry), nil -} diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go index a1c829bbd..25e31fd97 100644 --- a/banyand/measure/measure_suite_test.go +++ b/banyand/measure/measure_suite_test.go @@ -54,8 +54,8 @@ func (p *preloadMeasureService) Name() string { return "preload-measure" } -func (p *preloadMeasureService) PreRun() error { - return testmeasure.PreloadSchema(p.metaSvc.SchemaRegistry()) +func (p *preloadMeasureService) PreRun(ctx context.Context) error { + return testmeasure.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) } type services struct { diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index a86bfcc49..21d44fc0d 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -119,7 +119,7 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) { } case schema.KindTopNAggregation: // createOrUpdate TopN schemas in advance - _, err := createOrUpdateTopNMeasure(sr.metadata.MeasureRegistry(), metadata.Spec.(*databasev1.TopNAggregation)) + _, err := createOrUpdateTopNMeasure(context.Background(), sr.metadata.MeasureRegistry(), metadata.Spec.(*databasev1.TopNAggregation)) if err != nil { sr.l.Error().Err(err).Msg("fail to create/update topN measure") return @@ -134,13 +134,13 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) { } } -func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, error) { - oldTopNSchema, err := measureSchemaRegistry.GetMeasure(context.TODO(), topNSchema.GetMetadata()) +func createOrUpdateTopNMeasure(ctx context.Context, measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, error) { + oldTopNSchema, err := measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetMetadata()) if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) { return nil, err } - sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(context.Background(), topNSchema.GetSourceMeasure()) + sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetSourceMeasure()) if err != nil { return nil, err } @@ -184,7 +184,7 @@ func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema Fields: []*databasev1.FieldSpec{TopNValueFieldSpec}, } if oldTopNSchema == nil { - if innerErr := measureSchemaRegistry.CreateMeasure(context.Background(), newTopNMeasure); innerErr != nil { + if innerErr := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure); innerErr != nil { return nil, innerErr } return newTopNMeasure, nil @@ -198,7 +198,7 @@ func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema return oldTopNSchema, nil } // update - if err = measureSchemaRegistry.UpdateMeasure(context.Background(), newTopNMeasure); err != nil { + if err = measureSchemaRegistry.UpdateMeasure(ctx, newTopNMeasure); err != nil { return nil, err } return newTopNMeasure, nil diff --git a/banyand/measure/service.go b/banyand/measure/service.go index 6e46f841f..24689347e 100644 --- a/banyand/measure/service.go +++ b/banyand/measure/service.go @@ -103,11 +103,15 @@ func (s *service) Name() string { return "measure" } -func (s *service) PreRun() error { +func (s *service) Role() databasev1.Role { + return databasev1.Role_ROLE_DATA +} + +func (s *service) PreRun(ctx context.Context) error { s.l = logger.GetLogger(s.Name()) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - groups, err := s.metadata.GroupRegistry().ListGroup(ctx) - cancel() + ctxGroup, cancelGroup := context.WithTimeout(ctx, 5*time.Second) + groups, err := s.metadata.GroupRegistry().ListGroup(ctxGroup) + cancelGroup() if err != nil { return err } @@ -123,20 +127,20 @@ func (s *service) PreRun() error { if innerErr != nil { return innerErr } - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + ctxMeasure, cancelMeasure := context.WithTimeout(ctx, 5*time.Second) allMeasureSchemas, innerErr := s.metadata.MeasureRegistry(). - ListMeasure(ctx, schema.ListOpt{Group: gp.GetSchema().GetMetadata().GetName()}) - cancel() + ListMeasure(ctxMeasure, schema.ListOpt{Group: gp.GetSchema().GetMetadata().GetName()}) + cancelMeasure() if innerErr != nil { return innerErr } for _, measureSchema := range allMeasureSchemas { // sanity check before calling StoreResource // since StoreResource may be called inside the event loop - if checkErr := s.sanityCheck(gp, measureSchema); checkErr != nil { + if checkErr := s.sanityCheck(ctx, gp, measureSchema); checkErr != nil { return checkErr } - if _, innerErr := gp.StoreResource(measureSchema); innerErr != nil { + if _, innerErr := gp.StoreResource(ctx, measureSchema); innerErr != nil { return innerErr } } @@ -155,20 +159,20 @@ func (s *service) PreRun() error { return nil } -func (s *service) sanityCheck(group resourceSchema.Group, measureSchema *databasev1.Measure) error { +func (s *service) sanityCheck(ctx context.Context, group resourceSchema.Group, measureSchema *databasev1.Measure) error { var topNAggrs []*databasev1.TopNAggregation - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctxLocal, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - topNAggrs, err := s.metadata.MeasureRegistry().TopNAggregations(ctx, measureSchema.GetMetadata()) + topNAggrs, err := s.metadata.MeasureRegistry().TopNAggregations(ctxLocal, measureSchema.GetMetadata()) if err != nil || len(topNAggrs) == 0 { return err } for _, topNAggr := range topNAggrs { - topNMeasure, innerErr := createOrUpdateTopNMeasure(s.metadata.MeasureRegistry(), topNAggr) + topNMeasure, innerErr := createOrUpdateTopNMeasure(ctx, s.metadata.MeasureRegistry(), topNAggr) err = multierr.Append(err, innerErr) if topNMeasure != nil { - _, storeErr := group.StoreResource(topNMeasure) + _, storeErr := group.StoreResource(ctx, topNMeasure) if storeErr != nil { err = multierr.Append(err, storeErr) } diff --git a/banyand/metadata/allocator.go b/banyand/metadata/allocator.go index acf52dbfa..0607aacc8 100644 --- a/banyand/metadata/allocator.go +++ b/banyand/metadata/allocator.go @@ -21,8 +21,6 @@ import ( "context" "time" - "google.golang.org/protobuf/types/known/timestamppb" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" @@ -51,8 +49,6 @@ func (a *allocator) OnAddOrUpdate(metadata schema.Metadata) { if groupSchema.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED { return } - now := time.Now() - nowPb := timestamppb.New(now) shardNum := groupSchema.GetResourceOpts().GetShardNum() syncShard := func(id uint64) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -63,12 +59,7 @@ func (a *allocator) OnAddOrUpdate(metadata schema.Metadata) { Metadata: &commonv1.Metadata{ Name: groupSchema.GetMetadata().GetName(), }, - Node: &databasev1.Node{ - Id: "TODO", - CreatedAt: nowPb, - UpdatedAt: nowPb, - Addr: "TODO", - }, + Node: "TODO", }) } for i := 0; i < int(shardNum); i++ { diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index 0fc967c77..73768a1fb 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -23,7 +23,9 @@ import ( "time" "go.uber.org/multierr" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" @@ -62,13 +64,33 @@ func (s *clientService) Validate() error { return nil } -func (s *clientService) PreRun() error { +func (s *clientService) PreRun(ctx context.Context) error { var err error s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(schema.ConfigureServerEndpoints(s.endpoints)) if err != nil { return err } - + val := ctx.Value(common.ContextNodeKey) + if val == nil { + return errors.New("node id is empty") + } + node := val.(common.Node) + val = ctx.Value(common.ContextNodeRolesKey) + if val == nil { + return errors.New("node roles is empty") + } + nodeRoles := val.([]databasev1.Role) + ctxRegister, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + if err = s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{ + Name: node.NodeID, + GrpcAddress: node.GrpcAddress, + HttpAddress: node.HTTPAddress, + Roles: nodeRoles, + CreatedAt: timestamppb.Now(), + }); err != nil { + return err + } s.alc = newAllocator(s.schemaRegistry, logger.GetLogger(s.Name()).Named("allocator")) s.schemaRegistry.RegisterHandler(schema.KindGroup|schema.KindNode, s.alc) return nil @@ -124,6 +146,10 @@ func (s *clientService) Name() string { return "metadata" } +func (s *clientService) Role() databasev1.Role { + return databasev1.Role_ROLE_META +} + func (s *clientService) IndexRules(ctx context.Context, subject *commonv1.Metadata) ([]*databasev1.IndexRule, error) { bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, schema.ListOpt{Group: subject.Group}) if err != nil { diff --git a/banyand/metadata/embeddedetcd/server.go b/banyand/metadata/embeddedetcd/server.go index d6ac0cb5d..dfd87b73e 100644 --- a/banyand/metadata/embeddedetcd/server.go +++ b/banyand/metadata/embeddedetcd/server.go @@ -85,6 +85,7 @@ func (e *server) StoppingNotify() <-chan struct{} { func (e *server) Close() error { e.server.Close() + <-e.server.Server.StopNotify() return nil } diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go index 56eac46ce..8c60f2ff6 100644 --- a/banyand/metadata/metadata_test.go +++ b/banyand/metadata/metadata_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -48,14 +49,16 @@ func Test_service_RulesBySubject(t *testing.T) { err = s.FlagSet().Parse([]string{"--metadata-root-path=" + rootDir}) is.NoError(err) is.NoError(s.Validate()) - err = s.PreRun() + ctx = context.WithValue(ctx, common.ContextNodeKey, common.Node{NodeID: "test"}) + ctx = context.WithValue(ctx, common.ContextNodeRolesKey, []databasev1.Role{databasev1.Role_ROLE_META}) + err = s.PreRun(ctx) is.NoError(err) defer func() { s.GracefulStop() deferFn() }() - err = test.PreloadSchema(s.SchemaRegistry()) + err = test.PreloadSchema(ctx, s.SchemaRegistry()) is.NoError(err) tests := []struct { diff --git a/banyand/metadata/schema/checker.go b/banyand/metadata/schema/checker.go index 3d669cbc4..cdedc54b4 100644 --- a/banyand/metadata/schema/checker.go +++ b/banyand/metadata/schema/checker.go @@ -85,8 +85,7 @@ var checkerMap = map[Kind]equalityChecker{ KindNode: func(a, b proto.Message) bool { return cmp.Equal(a, b, protocmp.IgnoreUnknown(), - protocmp.IgnoreFields(&databasev1.Node{}, "updated_at"), - protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"), + protocmp.IgnoreFields(&databasev1.Node{}, "created_at"), protocmp.Transform()) }, KindShard: func(a, b proto.Message) bool { diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index 2c147f30d..6cbb459c7 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -345,6 +345,65 @@ func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (boo return false, nil } +func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata) error { + if !e.closer.AddRunning() { + return ErrClosed + } + defer e.closer.Done() + key, err := metadata.key() + if err != nil { + return err + } + val, err := proto.Marshal(metadata.Spec.(proto.Message)) + if err != nil { + return err + } + // Create a lease with a short TTL + lease, err := e.client.Grant(ctx, 5) // 5 seconds + if err != nil { + return err + } + var ops []clientv3.Cmp + ops = append(ops, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) + txn := e.client.Txn(ctx).If(ops...) + txn = txn.Then(clientv3.OpPut(key, string(val), clientv3.WithLease(lease.ID))) + txn = txn.Else(clientv3.OpGet(key)) + response, err := txn.Commit() + if err != nil { + return err + } + if !response.Succeeded { + return errGRPCAlreadyExists + } + // Keep the lease alive + // nolint:contextcheck + keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID) + if err != nil { + return err + } + go func() { + if !e.closer.AddRunning() { + return + } + defer func() { + _, _ = e.client.Lease.Revoke(context.Background(), lease.ID) + e.closer.Done() + }() + for { + select { + case <-e.closer.CloseNotify(): + return + case keepAliveResp := <-keepAliveChan: + if keepAliveResp == nil { + // The channel has been closed + return + } + } + } + }() + return nil +} + func formatKey(entityPrefix string, metadata *commonv1.Metadata) string { return groupsKeyPrefix + metadata.GetGroup() + entityPrefix + metadata.GetName() } diff --git a/banyand/metadata/schema/node.go b/banyand/metadata/schema/node.go index 0c6dcdb16..d6d1fd4f8 100644 --- a/banyand/metadata/schema/node.go +++ b/banyand/metadata/schema/node.go @@ -27,11 +27,11 @@ import ( var nodeKeyPrefix = "/nodes/" -func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role Role) ([]*databasev1.Node, error) { - if role == "" { +func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role databasev1.Role) ([]*databasev1.Node, error) { + if role == databasev1.Role_ROLE_UNSPECIFIED { return nil, BadRequest("group", "group should not be empty") } - messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(string(role), nodeKeyPrefix), func() proto.Message { + messages, err := e.listWithPrefix(ctx, nodeKeyPrefix, func() proto.Message { return &databasev1.Node{} }) if err != nil { @@ -39,15 +39,27 @@ func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role Role) ([]*databa } entities := make([]*databasev1.Node, 0, len(messages)) for _, message := range messages { - entities = append(entities, message.(*databasev1.Node)) + node := message.(*databasev1.Node) + for _, r := range node.Roles { + if r == role { + entities = append(entities, node) + break + } + } } return entities, nil } -func formatNodePrefix(role Role) string { - return nodeKeyPrefix + string(role) +func (e *etcdSchemaRegistry) RegisterNode(ctx context.Context, node *databasev1.Node) error { + return e.register(ctx, Metadata{ + TypeMeta: TypeMeta{ + Kind: KindNode, + Name: node.Name, + }, + Spec: node, + }) } -func formatNodeKey(role Role, id string) string { - return formatNodePrefix(role) + "/" + id +func formatNodeKey(name string) string { + return nodeKeyPrefix + name } diff --git a/banyand/metadata/schema/register_test.go b/banyand/metadata/schema/register_test.go new file mode 100644 index 000000000..2fd2ace96 --- /dev/null +++ b/banyand/metadata/schema/register_test.go @@ -0,0 +1,82 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package schema + +import ( + "context" + "fmt" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +var _ = ginkgo.Describe("etcd_register", func() { + var endpoints []string + var goods []gleak.Goroutine + var server embeddedetcd.Server + var r *etcdSchemaRegistry + md := Metadata{ + TypeMeta: TypeMeta{ + Name: "test", + Kind: KindNode, + }, + Spec: &databasev1.Node{}, + } + ginkgo.BeforeEach(func() { + goods = gleak.Goroutines() + ports, err := test.AllocateFreePorts(2) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])} + server, err = embeddedetcd.NewServer( + embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), + embeddedetcd.RootDir(randomTempDir())) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + <-server.ReadyNotify() + schemaRegistry, err := NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints)) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + r = schemaRegistry.(*etcdSchemaRegistry) + }) + ginkgo.AfterEach(func() { + gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred()) + server.Close() + gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + }) + + ginkgo.It("should revoke the leaser", func() { + gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred()) + k, err := md.key() + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.get(context.Background(), k, &databasev1.Node{})).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred()) + schemaRegistry, err := NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints)) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + r = schemaRegistry.(*etcdSchemaRegistry) + gomega.Expect(r.get(context.Background(), k, &databasev1.Node{})).Should(gomega.MatchError(ErrGRPCResourceNotFound)) + }) + + ginkgo.It("should register only once", func() { + gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.register(context.Background(), md)).Should(gomega.MatchError(errGRPCAlreadyExists)) + }) +}) diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go index fd97a1d6c..fed58409c 100644 --- a/banyand/metadata/schema/schema.go +++ b/banyand/metadata/schema/schema.go @@ -156,11 +156,7 @@ func (m Metadata) key() (string, error) { Name: m.Name, }), nil case KindNode: - r, err := strToRole(m.Group) - if err != nil { - return "", err - } - return formatNodeKey(r, m.Name), nil + return formatNodeKey(m.Name), nil case KindShard: return formatShardKey(&commonv1.Metadata{ Group: m.Group, @@ -247,32 +243,10 @@ type Property interface { DeleteProperty(ctx context.Context, metadata *propertyv1.Metadata, tags []string) (bool, uint32, error) } -// Role is the role of node. -type Role string - -const ( - // RoleMeta is the role of meta node. - RoleMeta = "meta" - // RoleData is the role of data node. - RoleData = "data" - // RoleQuery is the role of query node. - RoleQuery = "query" - // RoleLiaison is the role of liaison node. - RoleLiaison = "liaison" -) - -func strToRole(role string) (Role, error) { - switch role { - case RoleMeta, RoleData, RoleQuery, RoleLiaison: - return Role(role), nil - default: - return "", errors.New("invalid role") - } -} - // Node allows CRUD node schemas in a group. type Node interface { - ListNode(ctx context.Context, role Role) ([]*databasev1.Node, error) + ListNode(ctx context.Context, role databasev1.Role) ([]*databasev1.Node, error) + RegisterNode(ctx context.Context, node *databasev1.Node) error } // Shard allows CRUD shard schemas in a group. diff --git a/banyand/metadata/server.go b/banyand/metadata/server.go index ce004e3c3..a4c59359a 100644 --- a/banyand/metadata/server.go +++ b/banyand/metadata/server.go @@ -22,6 +22,7 @@ import ( "errors" "strings" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -38,6 +39,10 @@ func (s *server) Name() string { return "metadata" } +func (s *server) Role() databasev1.Role { + return databasev1.Role_ROLE_META +} + func (s *server) FlagSet() *run.FlagSet { fs := run.NewFlagSet("metadata") fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path of metadata") @@ -63,14 +68,14 @@ func (s *server) Validate() error { return s.Service.Validate() } -func (s *server) PreRun() error { +func (s *server) PreRun(ctx context.Context) error { var err error s.metaServer, err = embeddedetcd.NewServer(embeddedetcd.RootDir(s.rootDir), embeddedetcd.ConfigureListener(s.listenClientURL, s.listenPeerURL)) if err != nil { return err } <-s.metaServer.ReadyNotify() - return s.Service.PreRun() + return s.Service.PreRun(ctx) } func (s *server) Serve() run.StopNotify { diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 2884f8d2c..cf36474a0 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -25,6 +25,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/measure" @@ -180,7 +181,11 @@ func (q *queryService) Name() string { return moduleName } -func (q *queryService) PreRun() error { +func (q *queryService) Role() databasev1.Role { + return databasev1.Role_ROLE_QUERY +} + +func (q *queryService) PreRun(_ context.Context) error { q.log = logger.GetLogger(moduleName) return multierr.Combine( q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp), diff --git a/banyand/stream/service.go b/banyand/stream/service.go index 07749f5b9..f614b9e96 100644 --- a/banyand/stream/service.go +++ b/banyand/stream/service.go @@ -26,6 +26,7 @@ import ( "github.com/apache/skywalking-banyandb/api/data" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" @@ -93,10 +94,14 @@ func (s *service) Name() string { return "stream" } -func (s *service) PreRun() error { +func (s *service) Role() databasev1.Role { + return databasev1.Role_ROLE_DATA +} + +func (s *service) PreRun(ctx context.Context) error { s.l = logger.GetLogger(s.Name()) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - groups, err := s.metadata.GroupRegistry().ListGroup(ctx) + ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second) + groups, err := s.metadata.GroupRegistry().ListGroup(ctxLocal) cancel() if err != nil { return err @@ -112,14 +117,14 @@ func (s *service) PreRun() error { if err != nil { return err } - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - schemas, err := s.metadata.StreamRegistry().ListStream(ctx, schema.ListOpt{Group: gp.GetSchema().GetMetadata().Name}) + ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second) + schemas, err := s.metadata.StreamRegistry().ListStream(ctxLocal, schema.ListOpt{Group: gp.GetSchema().GetMetadata().Name}) cancel() if err != nil { return err } for _, sa := range schemas { - if _, innerErr := gp.StoreResource(sa); innerErr != nil { + if _, innerErr := gp.StoreResource(ctx, sa); innerErr != nil { return innerErr } } diff --git a/banyand/stream/stream_suite_test.go b/banyand/stream/stream_suite_test.go index fa5fa2c3c..898da965f 100644 --- a/banyand/stream/stream_suite_test.go +++ b/banyand/stream/stream_suite_test.go @@ -53,8 +53,8 @@ func (p *preloadStreamService) Name() string { return "preload-stream" } -func (p *preloadStreamService) PreRun() error { - return teststream.PreloadSchema(p.metaSvc.SchemaRegistry()) +func (p *preloadStreamService) PreRun(ctx context.Context) error { + return teststream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) } type services struct { diff --git a/docs/README.md b/docs/README.md index 9b759f879..8185677b0 100644 --- a/docs/README.md +++ b/docs/README.md @@ -6,9 +6,9 @@ Here you can learn all you need to know about BanyanDB. - **Installation**. Instruments about how to download and onboard BanyanDB server, Banyand. - **Clients**. Some native clients to access Banyand. -- **Schema**. Pivotal database native resources. -- **CRUD Operations**. To create, read, update, and delete data points or entities on resources in the schema. - **Observability**. Learn how to effectively monitor, diagnose and optimize Banyand. +- **Concept**. Learn the concepts of Banyand. Includes the architecture, data model, and so on. +- **CRUD Operations**. To create, read, update, and delete data points or entities on resources in the schema. You might also find these links interesting: diff --git a/docs/api-reference.md b/docs/api-reference.md index 96931a186..45fa7b258 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -16,12 +16,7 @@ - [Node](#banyandb-database-v1-Node) - [Shard](#banyandb-database-v1-Shard) -- [banyandb/database/v1/event.proto](#banyandb_database_v1_event-proto) - - [EntityEvent](#banyandb-database-v1-EntityEvent) - - [EntityEvent.TagLocator](#banyandb-database-v1-EntityEvent-TagLocator) - - [ShardEvent](#banyandb-database-v1-ShardEvent) - - - [Action](#banyandb-database-v1-Action) + - [Role](#banyandb-database-v1-Role) - [banyandb/model/v1/common.proto](#banyandb_model_v1_common-proto) - [FieldValue](#banyandb-model-v1-FieldValue) @@ -340,10 +335,10 @@ Metadata is for multi-tenant, multi-model use | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| id | [string](#string) | | | -| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | -| addr | [string](#string) | | | -| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | +| name | [string](#string) | | | +| roles | [Role](#banyandb-database-v1-Role) | repeated | | +| grpc_address | [string](#string) | | | +| http_address | [string](#string) | | | | created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | @@ -362,7 +357,7 @@ Metadata is for multi-tenant, multi-model use | id | [uint64](#uint64) | | | | metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | | catalog | [banyandb.common.v1.Catalog](#banyandb-common-v1-Catalog) | | | -| node | [Node](#banyandb-database-v1-Node) | | | +| node | [string](#string) | | | | total | [uint32](#uint32) | | | | updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | | created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | @@ -373,84 +368,19 @@ Metadata is for multi-tenant, multi-model use - - - - - - - - - -

Top

- -## banyandb/database/v1/event.proto - - - - - -### EntityEvent - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| subject | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) | | | -| entity_locator | [EntityEvent.TagLocator](#banyandb-database-v1-EntityEvent-TagLocator) | repeated | | -| action | [Action](#banyandb-database-v1-Action) | | | -| time | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | - - - - - - - - -### EntityEvent.TagLocator - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| family_offset | [uint32](#uint32) | | | -| tag_offset | [uint32](#uint32) | | | - - - - - - - - -### ShardEvent - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| shard | [Shard](#banyandb-database-v1-Shard) | | | -| action | [Action](#banyandb-database-v1-Action) | | | -| time | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | - - - - - - - - + -### Action +### Role | Name | Number | Description | | ---- | ------ | ----------- | -| ACTION_UNSPECIFIED | 0 | | -| ACTION_PUT | 1 | | -| ACTION_DELETE | 2 | | +| ROLE_UNSPECIFIED | 0 | | +| ROLE_META | 1 | | +| ROLE_DATA | 2 | | +| ROLE_QUERY | 3 | | +| ROLE_LIAISON | 4 | | diff --git a/docs/concept/clustering.md b/docs/concept/clustering.md index 733f14e8d..0f8b72464 100644 --- a/docs/concept/clustering.md +++ b/docs/concept/clustering.md @@ -16,7 +16,7 @@ In addition to persistent raw data, Data Nodes also handle TopN aggregation calc ### 1.2 Meta Nodes -Meta Nodes are responsible for maintaining high-level metadata of the cluster, which includes: +Meta Nodes is implemented by etcd. They are responsible for maintaining high-level metadata of the cluster, which includes: - All nodes in the cluster - All database schemas @@ -52,6 +52,10 @@ All nodes within a BanyanDB cluster communicate with other nodes according to th - Query Nodes interact with Data Nodes to execute queries and return results to the Liaison Nodes. - Liaison Nodes distribute incoming requests to the appropriate Query Nodes or Data Nodes. +### Nodes Discovery + +All nodes in the cluster are discovered by the Meta Nodes. When a node starts up, it registers itself with the Meta Nodes. The Meta Nodes then share this information with the Liaison Nodes and Query Nodes, which use it to route requests to the appropriate nodes. + ## 3. **Data Organization** Different nodes in BanyanDB are responsible for different parts of the database, while Query and Liaison Nodes manage the routing and processing of queries. diff --git a/docs/installation.md b/docs/installation.md index d29ab2132..9e85cd650 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -129,11 +129,13 @@ The banyand-server would be listening on the `0.0.0.0:17912` if no errors occurr The standalone node is running as a standalone process by ```shell -$ ./banyand-server standalone -id n1 -$ ./banyand-server standalone -id n2 -$ ./banyand-server standalone -id n3 +$ ./banyand-server standalone --data-node-id n1 +$ ./banyand-server standalone --data-node-id n2 +$ ./banyand-server standalone --data-node-id n3 ``` +`data-node-id` is the unique identifier of the standalone node. + The standalone node would be listening on the `` if no errors occurred. ### Setup Role-Based Nodes @@ -150,16 +152,23 @@ The meta node would be listening on the `` if no errors occurred. Data nodes, query nodes and liaison nodes are running as independent processes by ```shell -$ ./banyand-server storage --mode data +$ ./banyand-server storage --mode data --data-node-id n1 +$ ./banyand-server storage --mode data --data-node-id n2 +$ ./banyand-server storage --mode data --data-node-id n3 +$ ./banyand-server storage --mode query $ ./banyand-server storage --mode query $ ./banyand-server liaison ``` +`data-node-id` is the unique identifier of the data node. + The data node, query node and liaison node would be listening on the `` if no errors occurred. If you want to use a `mix` mode instead of separate query and data nodes, you can run the banyand-server as processes by ```shell -$ ./banyand-server storage +$ ./banyand-server storage --data-node-id n1 +$ ./banyand-server storage --data-node-id n2 +$ ./banyand-server storage --data-node-id n3 $ ./banyand-server liaison ``` diff --git a/docs/installation/binaries.md b/docs/installation/binaries.md new file mode 100644 index 000000000..697eecc93 --- /dev/null +++ b/docs/installation/binaries.md @@ -0,0 +1,68 @@ +# Get Binaries + +This page shows how to get binaries of Banyand. + +## Prebuilt Released binaries + +Get binaries from the [download](https://skywalking.apache.org/downloads/). + +## Build From Source + +### Requirements + +Users who want to build a binary from sources have to set up: + +* Go 1.20 +* Node 18.16 +* Git >= 2.30 +* Linux, macOS or Windows+WSL2 +* GNU make + +### Windows + +BanyanDB is built on Linux and macOS that introduced several platform-specific characters to the building system. Therefore, we highly recommend you use [WSL2+Ubuntu](https://ubuntu.com/wsl) to execute tasks of the Makefile. + +### Build Binaries + +To issue the below command to get basic binaries of banyand and bydbctl. + +```shell +$ make generate +... +$ make build +... +--- banyand: all --- +make[1]: Entering directory '/banyand' +... +chmod +x build/bin/banyand-server +Done building banyand server +make[1]: Leaving directory '/banyand' +... +--- bydbctl: all --- +make[1]: Entering directory '/bydbctl' +... +chmod +x build/bin/bydbctl +Done building bydbctl +make[1]: Leaving directory '/bydbctl' +``` + +The build system provides a series of binary options as well. + +* `make -C banyand banyand-server` generates a basic `banyand-server`. +* `make -C banyand release` builds out a static binary for releasing. +* `make -C banyand debug` gives a binary for debugging without the complier's optimizations. +* `make -C banyand debug-static` is a static binary for debugging. +* `make -C bydbctl release` cross-builds several binaries for multi-platforms. + +Then users get binaries as below + +``` shell +$ ls banyand/build/bin +banyand-server +banyand-server-debug +banyand-server-debug-static +banyand-server-static + +$ ls banyand/build/bin +bydbctl +``` diff --git a/docs/installation/cluster.md b/docs/installation/cluster.md new file mode 100644 index 000000000..d419c06e6 --- /dev/null +++ b/docs/installation/cluster.md @@ -0,0 +1,45 @@ +# Cluster Installation + +## Setup Meta Nodes + +Meta nodes are a etcd cluster which is required for the metadata module to provide the metadata service and nodes discovery service for the whole cluster. + +The etcd cluster can be setup by the [etcd installation guide](https://etcd.io/docs/v3.5/install/) + +## Role-base Banyand Cluster + +There is an example: The etcd cluster is spread across three nodes with the addresses `10.0.0.1:2379`, `10.0.0.2:2379`, and `10.0.0.3:2379`. + +Data nodes, query nodes and liaison nodes are running as independent processes by + +```shell +$ ./banyand-server storage --mode data --etcd-endpoints=http://10.0.0.1:2379,http://10.0.0.2:2379,http://10.0.0.3:2379 +$ ./banyand-server storage --mode data --etcd-endpoints=http://10.0.0.1:2379,http://10.0.0.2:2379,http://10.0.0.3:2379 +$ ./banyand-server storage --mode data --etcd-endpoints=http://10.0.0.1:2379,http://10.0.0.2:2379,http://10.0.0.3:2379 +$ ./banyand-server storage --mode query --etcd-endpoints=http://10.0.0.1:2379,http://10.0.0.2:2379,http://10.0.0.3:2379 +$ ./banyand-server storage --mode query --etcd-endpoints=http://10.0.0.1:2379,http://10.0.0.2:2379,http://10.0.0.3:2379 +$ ./banyand-server liaison --etcd-endpoints=http://10.0.0.1:2379,http://10.0.0.2:2379,http://10.0.0.3:2379 +``` + +The data node, query node and liaison node would be listening on the `` if no errors occurred. + +## Mix-mode Data Nodes + +If you want to use a `mix` mode instead of separate query and data nodes, you can run the banyand-server as processes by + +```shell +$ ./banyand-server storage --etcd-endpoints=http://10.0.0.1:2379,http://10.0.0.2:2379,http://10.0.0.3:2379 +$ ./banyand-server storage --etcd-endpoints=http://10.0.0.1:2379,http://10.0.0.2:2379,http://10.0.0.3:2379 +$ ./banyand-server storage --etcd-endpoints=http://10.0.0.1:2379,http://10.0.0.2:2379,http://10.0.0.3:2379 +$ ./banyand-server liaison --etcd-endpoints=http://10.0.0.1:2379,http://10.0.0.2:2379,http://10.0.0.3:2379 +``` + +## Node Discovery + +The node discovery is based on the etcd cluster. The etcd cluster is required for the metadata module to provide the metadata service and nodes discovery service for the whole cluster. + +The host is registered to the etcd cluster by the `banyand-server` automatically based on `node-host-provider` : + +- `node-host-provider=hostname` : Default. The OS's hostname is registered as the host part in the address. +- `node-host-provider=ip` : The OS's the first non-loopback active IP address(IPv4) is registered as the host part in the address. +- `node-host-provider=flag` : `node-host` is registered as the host part in the address. diff --git a/docs/installation/standalone.md b/docs/installation/standalone.md new file mode 100644 index 000000000..d42a716e3 --- /dev/null +++ b/docs/installation/standalone.md @@ -0,0 +1,21 @@ +# Standalone Mode + +The standalone mode is the simplest way to run Banyand. It is suitable for the development and testing environment. The standalone mode is running as a standalone process by + +```shell +$ ./banyand-server standalone +██████╗ █████╗ ███╗ ██╗██╗ ██╗ █████╗ ███╗ ██╗██████╗ ██████╗ +██╔══██╗██╔══██╗████╗ ██║╚██╗ ██╔╝██╔══██╗████╗ ██║██╔══██╗██╔══██╗ +██████╔╝███████║██╔██╗ ██║ ╚████╔╝ ███████║██╔██╗ ██║██║ ██║██████╔╝ +██╔══██╗██╔══██║██║╚██╗██║ ╚██╔╝ ██╔══██║██║╚██╗██║██║ ██║██╔══██╗ +██████╔╝██║ ██║██║ ╚████║ ██║ ██║ ██║██║ ╚████║██████╔╝██████╔╝ +╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═════╝ ╚═════╝ +***starting as a standalone server**** +... +... +***Listening to**** addr::17912 module:LIAISON-GRPC +``` + +The banyand-server would be listening on the `0.0.0.0:17912` to access gRPC requests. if no errors occurred. + +At the same time, the banyand-server would be listening on the `0.0.0.0:17913` to access HTTP requests. if no errors occurred. The HTTP server is used for CLI and Web UI. diff --git a/docs/menu.yml b/docs/menu.yml index fc1993575..61aaef704 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -19,7 +19,13 @@ catalog: - name: "Welcome" path: "/readme" - name: "Installation" - path: "/installation" + catalog: + - name: "Get Binaries" + path: "/installation/binaries" + - name: "Standalone Mode" + path: "/installation/standalone" + - name: "Cluster Mode" + path: "/installation/cluster" - name: "Clients" path: "/clients" - name: "Observability" diff --git a/pkg/host/host.go b/pkg/host/host.go new file mode 100644 index 000000000..126f47685 --- /dev/null +++ b/pkg/host/host.go @@ -0,0 +1,66 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package host provides information about the host. +package host + +import ( + "errors" + "net" + "os" +) + +// Name returns the host name reported by the kernel. +func Name() (name string, err error) { + return os.Hostname() +} + +// IP returns the first non-loopback IP address of the host. +func IP() (ip string, err error) { + ifaces, err := net.Interfaces() + if err != nil { + return "", err + } + for _, iface := range ifaces { + if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 { + continue + } + addrs, err := iface.Addrs() + if err != nil { + return "", err + } + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + if ip == nil || ip.IsLoopback() { + continue + } + ip = ip.To4() + if ip == nil { + // not an ipv4 address + continue + } + return ip.String(), nil + } + } + return "", errors.New("are you connected to the network?") +} diff --git a/pkg/run/run.go b/pkg/run/run.go index 4630ff07e..8e9a14c28 100644 --- a/pkg/run/run.go +++ b/pkg/run/run.go @@ -19,9 +19,11 @@ package run import ( + "context" "fmt" "os" "path" + "sort" "sync" "github.com/oklog/run" @@ -29,6 +31,8 @@ import ( "github.com/spf13/pflag" "go.uber.org/multierr" + "github.com/apache/skywalking-banyandb/api/common" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/pkg/config" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -55,6 +59,12 @@ type Unit interface { Name() string } +// Role is an interface that should be implemented by Group Unit objects that +// need to be able to return their Role. +type Role interface { + Role() databasev1.Role +} + // Config interface should be implemented by Group Unit objects that manage // their own configuration through the use of flags. // If a Unit's Validate returns an error it will stop the Group immediately. @@ -73,26 +83,7 @@ type Config interface { type PreRunner interface { // Unit for Group registration and identification Unit - PreRun() error -} - -// NewPreRunner takes a name and a standalone pre runner compatible function -// and turns them into a Group compatible PreRunner, ready for registration. -func NewPreRunner(name string, fn func() error) PreRunner { - return preRunner{name: name, fn: fn} -} - -type preRunner struct { - fn func() error - name string -} - -func (p preRunner) Name() string { - return p.name -} - -func (p preRunner) PreRun() error { - return p.fn() + PreRun(context.Context) error } // StopNotify sends the stopped event to the running system. @@ -128,6 +119,7 @@ type Group struct { c []Config p []PreRunner s []Service + rr []Role showRunGroup bool configured bool } @@ -172,6 +164,10 @@ func (g *Group) Register(units ...Unit) []bool { g.s = append(g.s, s) hasRegistered[idx] = true } + if r, ok := units[idx].(Role); ok { + g.rr = append(g.rr, r) + hasRegistered[idx] = true + } } return hasRegistered } @@ -208,6 +204,14 @@ func (g *Group) Deregister(units ...Unit) []bool { } } } + if r, ok := units[idx].(Role); ok { + for i := range g.rr { + if g.rr[i] == r { + g.rr[i] = nil + hasDeregistered[idx] = true + } + } + } } return hasDeregistered } @@ -325,7 +329,7 @@ func (g *Group) RunConfig() (interrupted bool, err error) { // - Validate() Validate Config Units. Exit on first error. // // PreRunner phase (serially, in order of Unit registration) -// - PreRun() Execute PreRunner Units. Exit on first error. +// - PreRun(ctx context.Context) Execute PreRunner Units. Exit on first error. // // Service phase (concurrently) // - Serve() Execute all Service Units in separate Go routines. @@ -334,9 +338,11 @@ func (g *Group) RunConfig() (interrupted bool, err error) { // // Run will return with the originating error on: // - first Config.Validate() returning an error -// - first PreRunner.PreRun() returning an error +// - first PreRunner.PreRun(ctx context.Context) returning an error // - first Service.Serve() returning (error or nil) -func (g *Group) Run() (err error) { +// +//nolint:contextcheck +func (g *Group) Run(ctx context.Context) (err error) { // run config registration and flag parsing stages if interrupted, errRun := g.RunConfig(); interrupted || errRun != nil { return errRun @@ -346,7 +352,38 @@ func (g *Group) Run() (err error) { g.log.Fatal().Err(err).Stack().Msg("unexpected exit") } }() + rr := make([]databasev1.Role, 0, len(g.rr)) + for idx := range g.rr { + if g.rr[idx] == nil || g.rr[idx].Role() == databasev1.Role_ROLE_UNSPECIFIED { + continue + } + rr = append(rr, g.rr[idx].Role()) + } + // Sort and deduplicate roles + sort.Slice(rr, func(i, j int) bool { + return rr[i] < rr[j] + }) + deduplicateRoles := func(rr []databasev1.Role) []databasev1.Role { + if len(rr) < 2 { + return rr + } + // deduplicate roles + rrDedup := make([]databasev1.Role, 0, len(rr)) + for i := 0; i < len(rr)-1; i++ { + if rr[i] != rr[i+1] { + rrDedup = append(rrDedup, rr[i]) + } + } + rrDedup = append(rrDedup, rr[len(rr)-1]) + return rrDedup + } + rr = deduplicateRoles(rr) + + g.log.Info().Interface("roles", rr).Msg("the node will run as") + if ctx == nil { + ctx = context.Background() + } // execute pre run stage and exit on error for idx := range g.p { // a PreRunner might have been deregistered during Run @@ -354,7 +391,7 @@ func (g *Group) Run() (err error) { continue } g.log.Debug().Uint32("ran", uint32(idx+1)).Uint32("total", uint32(len(g.p))).Str("name", g.p[idx].Name()).Msg("pre-run") - if err := g.p[idx].PreRun(); err != nil { + if err := g.p[idx].PreRun(context.WithValue(ctx, common.ContextNodeRolesKey, rr)); err != nil { return err } } diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go index 6d20cd308..0b4cc94ad 100644 --- a/pkg/schema/metadata.go +++ b/pkg/schema/metadata.go @@ -60,7 +60,7 @@ const ( // Group is the root node, allowing get resources from its sub nodes. type Group interface { GetSchema() *commonv1.Group - StoreResource(resourceSchema ResourceSchema) (Resource, error) + StoreResource(ctx context.Context, resourceSchema ResourceSchema) (Resource, error) LoadResource(name string) (Resource, bool) } @@ -296,7 +296,7 @@ func (sr *schemaRepo) storeResource(metadata *commonv1.Metadata) (Resource, erro if err != nil { return nil, errors.WithMessage(err, "fails to get the resource") } - return group.StoreResource(stm) + return group.StoreResource(context.Background(), stm) } func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) error { @@ -370,22 +370,24 @@ func (g *group) setDB(db tsdb.Database) { g.db.Store(db) } -func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) { +func (g *group) StoreResource(ctx context.Context, resourceSchema ResourceSchema) (Resource, error) { g.mapMutex.Lock() defer g.mapMutex.Unlock() key := resourceSchema.GetMetadata().GetName() preResource := g.schemaMap[key] + var localCtx context.Context + var cancel context.CancelFunc if preResource != nil && resourceSchema.GetMetadata().GetModRevision() <= preResource.GetMetadata().GetModRevision() { // we only need to check the max modifications revision observed for index rules - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - idxRules, errIndexRules := g.metadata.IndexRules(ctx, resourceSchema.GetMetadata()) + localCtx, cancel = context.WithTimeout(ctx, 5*time.Second) + idxRules, errIndexRules := g.metadata.IndexRules(localCtx, resourceSchema.GetMetadata()) cancel() if errIndexRules != nil { return nil, errIndexRules } - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - topNAggrs, errTopN := g.metadata.MeasureRegistry().TopNAggregations(ctx, resourceSchema.GetMetadata()) + localCtx, cancel = context.WithTimeout(ctx, 5*time.Second) + topNAggrs, errTopN := g.metadata.MeasureRegistry().TopNAggregations(localCtx, resourceSchema.GetMetadata()) cancel() if errTopN != nil { return nil, errTopN @@ -397,18 +399,18 @@ func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) { } } } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + localCtx, cancel = context.WithTimeout(ctx, 5*time.Second) defer cancel() - idxRules, err := g.metadata.IndexRules(ctx, resourceSchema.GetMetadata()) + idxRules, err := g.metadata.IndexRules(localCtx, resourceSchema.GetMetadata()) if err != nil { return nil, err } var topNAggrs []*databasev1.TopNAggregation if _, ok := resourceSchema.(*databasev1.Measure); ok { - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + localCtx, cancel = context.WithTimeout(ctx, 5*time.Second) var innerErr error - topNAggrs, innerErr = g.metadata.MeasureRegistry().TopNAggregations(ctx, resourceSchema.GetMetadata()) + topNAggrs, innerErr = g.metadata.MeasureRegistry().TopNAggregations(localCtx, resourceSchema.GetMetadata()) cancel() if innerErr != nil { return nil, innerErr diff --git a/pkg/signal/handler.go b/pkg/signal/handler.go index 2fe89a22d..1470485e2 100644 --- a/pkg/signal/handler.go +++ b/pkg/signal/handler.go @@ -19,6 +19,7 @@ package signal import ( + "context" "errors" "fmt" "os" @@ -46,7 +47,7 @@ func (h *Handler) Name() string { } // PreRun implements run.PreRunner to initialize the handler. -func (h *Handler) PreRun() error { +func (h *Handler) PreRun(_ context.Context) error { h.cancel = make(chan struct{}) h.signal = make(chan os.Signal, 1) signal.Notify(h.signal, diff --git a/pkg/test/matcher.go b/pkg/test/matcher.go deleted file mode 100644 index b94fbb102..000000000 --- a/pkg/test/matcher.go +++ /dev/null @@ -1,82 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package test - -import ( - "fmt" - - "go.uber.org/mock/gomock" - - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - "github.com/apache/skywalking-banyandb/pkg/bus" -) - -var ( - _ gomock.Matcher = (*shardEventMatcher)(nil) - _ gomock.Matcher = (*entityEventMatcher)(nil) -) - -type shardEventMatcher struct { - action databasev1.Action -} - -// NewShardEventMatcher return a Matcher to comparing shard event's action. -func NewShardEventMatcher(action databasev1.Action) gomock.Matcher { - return &shardEventMatcher{ - action: action, - } -} - -func (s *shardEventMatcher) Matches(x interface{}) bool { - if m, messageOk := x.(bus.Message); messageOk { - if evt, dataOk := m.Data().(*databasev1.ShardEvent); dataOk { - return evt.Action == s.action - } - } - - return false -} - -func (s *shardEventMatcher) String() string { - return fmt.Sprintf("shard-event-matcher(%s)", databasev1.Action_name[int32(s.action)]) -} - -type entityEventMatcher struct { - action databasev1.Action -} - -// NewEntityEventMatcher return a Matcher for comparing entity event's action. -func NewEntityEventMatcher(action databasev1.Action) gomock.Matcher { - return &entityEventMatcher{ - action: action, - } -} - -func (s *entityEventMatcher) Matches(x interface{}) bool { - if m, messageOk := x.(bus.Message); messageOk { - if evt, dataOk := m.Data().(*databasev1.EntityEvent); dataOk { - return evt.Action == s.action - } - } - - return false -} - -func (s *entityEventMatcher) String() string { - return fmt.Sprintf("entity-event-matcher(%s)", databasev1.Action_name[int32(s.action)]) -} diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go index 8abe58b5c..18e48846d 100644 --- a/pkg/test/measure/etcd.go +++ b/pkg/test/measure/etcd.go @@ -44,29 +44,29 @@ const ( var store embed.FS // PreloadSchema loads schemas from files in the booting process. -func PreloadSchema(e schema.Registry) error { +func PreloadSchema(ctx context.Context, e schema.Registry) error { if err := loadSchema(groupDir, &commonv1.Group{}, func(group *commonv1.Group) error { - return e.CreateGroup(context.TODO(), group) + return e.CreateGroup(ctx, group) }); err != nil { return errors.WithStack(err) } if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure *databasev1.Measure) error { - return e.CreateMeasure(context.TODO(), measure) + return e.CreateMeasure(ctx, measure) }); err != nil { return errors.WithStack(err) } if err := loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error { - return e.CreateIndexRule(context.TODO(), indexRule) + return e.CreateIndexRule(ctx, indexRule) }); err != nil { return errors.WithStack(err) } if err := loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding *databasev1.IndexRuleBinding) error { - return e.CreateIndexRuleBinding(context.TODO(), indexRuleBinding) + return e.CreateIndexRuleBinding(ctx, indexRuleBinding) }); err != nil { return errors.WithStack(err) } if err := loadSchema(topNAggregationDir, &databasev1.TopNAggregation{}, func(topN *databasev1.TopNAggregation) error { - return e.CreateTopNAggregation(context.TODO(), topN) + return e.CreateTopNAggregation(ctx, topN) }); err != nil { return errors.WithStack(err) } diff --git a/pkg/test/setup.go b/pkg/test/setup.go index a684b6467..d270220c6 100644 --- a/pkg/test/setup.go +++ b/pkg/test/setup.go @@ -18,10 +18,12 @@ package test import ( + "context" "sync" "github.com/onsi/gomega" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -40,7 +42,7 @@ func SetupModules(flags []string, units ...run.Unit) func() { defer func() { wg.Done() }() - errRun := g.Run() + errRun := g.Run(context.WithValue(context.Background(), common.ContextNodeKey, common.Node{NodeID: "test"})) gomega.Expect(errRun).ShouldNot(gomega.HaveOccurred()) }() g.WaitTillReady() diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 180709733..3c551382c 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -57,8 +57,10 @@ func CommonWithSchemaLoaders(schemaLoaders []SchemaLoader, flags ...string) (str addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) ff := []string{ - "--addr=" + addr, - "--http-addr=" + httpAddr, + "--grpc-host=" + host, + fmt.Sprintf("--grpc-port=%d", ports[0]), + "--http-host=" + host, + fmt.Sprintf("--http-port=%d", ports[1]), "--http-grpc-addr=" + addr, "--stream-root-path=" + path, "--measure-root-path=" + path, @@ -92,7 +94,7 @@ func modules(schemaLoaders []SchemaLoader, flags []string) func() { q, err := query.NewService(context.TODO(), streamSvc, measureSvc, metaSvc, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc) - httpServer := http.NewService() + httpServer := http.NewServer() units := []run.Unit{ pipeline, @@ -129,11 +131,11 @@ func (p *preloadService) Name() string { return "preload-" + p.name } -func (p *preloadService) PreRun() error { +func (p *preloadService) PreRun(ctx context.Context) error { if p.name == "stream" { - return test_stream.PreloadSchema(p.metaSvc.SchemaRegistry()) + return test_stream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) } - return test_measure.PreloadSchema(p.metaSvc.SchemaRegistry()) + return test_measure.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) } func (p *preloadService) SetMeta(meta metadata.Service) { diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go index 36ad9482f..85c25d6fe 100644 --- a/pkg/test/stream/etcd.go +++ b/pkg/test/stream/etcd.go @@ -43,12 +43,12 @@ var ( ) // PreloadSchema loads schemas from files in the booting process. -func PreloadSchema(e schema.Registry) error { +func PreloadSchema(ctx context.Context, e schema.Registry) error { g := &commonv1.Group{} if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil { return err } - if innerErr := e.CreateGroup(context.TODO(), g); innerErr != nil { + if innerErr := e.CreateGroup(ctx, g); innerErr != nil { return innerErr } @@ -56,7 +56,7 @@ func PreloadSchema(e schema.Registry) error { if unmarshalErr := protojson.Unmarshal([]byte(streamJSON), s); unmarshalErr != nil { return unmarshalErr } - if innerErr := e.CreateStream(context.TODO(), s); innerErr != nil { + if innerErr := e.CreateStream(ctx, s); innerErr != nil { return innerErr } @@ -64,7 +64,7 @@ func PreloadSchema(e schema.Registry) error { if unmarshalErr := protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); unmarshalErr != nil { return unmarshalErr } - if innerErr := e.CreateIndexRuleBinding(context.TODO(), indexRuleBinding); innerErr != nil { + if innerErr := e.CreateIndexRuleBinding(ctx, indexRuleBinding); innerErr != nil { return innerErr } @@ -82,7 +82,7 @@ func PreloadSchema(e schema.Registry) error { if err != nil { return err } - if innerErr := e.CreateIndexRule(context.TODO(), &idxRule); innerErr != nil { + if innerErr := e.CreateIndexRule(ctx, &idxRule); innerErr != nil { return innerErr } } diff --git a/test/stress/cases/istio/repo.go b/test/stress/cases/istio/repo.go index 1b8006be5..6f5fe011b 100644 --- a/test/stress/cases/istio/repo.go +++ b/test/stress/cases/istio/repo.go @@ -139,7 +139,7 @@ func (p *preloadService) Name() string { return "preload-" + p.name } -func (p *preloadService) PreRun() error { +func (p *preloadService) PreRun(_ context.Context) error { e := p.metaSvc.SchemaRegistry() if err := loadSchema(groupDir, &commonv1.Group{}, func(group *commonv1.Group) error { return e.CreateGroup(context.TODO(), group)