diff --git a/api/common/id.go b/api/common/id.go index c09c58d19..074060cdf 100644 --- a/api/common/id.go +++ b/api/common/id.go @@ -19,11 +19,13 @@ package common import ( "context" - "crypto/rand" - "encoding/base64" "fmt" + "net" + "strconv" + "strings" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/host" ) type ( @@ -113,42 +115,92 @@ func (e Error) Msg() string { return e.msg } -// NodeID identities a node in a cluster. -type NodeID string - -// GenerateNodeID generates a node id. -func GenerateNodeID(prefix string, statefulID string) (NodeID, error) { - // If statefulID is empty, return prefix + random suffix. - if statefulID == "" { - suffix, err := generateRandomString(8) - if err != nil { - return NodeID(""), err - } - return NodeID(fmt.Sprintf("%s-%s", prefix, suffix)), nil - } - return NodeID(fmt.Sprintf("%s-%s", prefix, statefulID)), nil +// Node contains the node id and address. +type Node struct { + NodeID string + GrpcAddress string + HTTPAddress string } -func generateRandomString(length int) (string, error) { - randomBytes := make([]byte, length) - _, err := rand.Read(randomBytes) - if err != nil { - return "", err - } +var ( + // FlagNodeHost is the node id from flag. + FlagNodeHost string + // FlagNodeHostProvider is the node id provider from flag. + FlagNodeHostProvider NodeHostProvider +) - // Encode random bytes to base64 URL encoding - randomString := base64.RawURLEncoding.EncodeToString(randomBytes) +// NodeHostProvider is the provider of node id. +type NodeHostProvider int - // Trim any padding characters '=' from the end of the string - randomString = randomString[:length] +// NodeIDProvider constants. +const ( + NodeHostProviderHostname NodeHostProvider = iota + NodeHostProviderIP + NodeHostProviderFlag +) + +// String returns the string representation of NodeIDProvider. +func (n *NodeHostProvider) String() string { + return [...]string{"Hostname", "IP", "Flag"}[*n] +} - return randomString, nil +// ParseNodeHostProvider parses the string to NodeIDProvider. +func ParseNodeHostProvider(s string) (NodeHostProvider, error) { + switch strings.ToLower(s) { + case "hostname": + return NodeHostProviderHostname, nil + case "ip": + return NodeHostProviderIP, nil + case "flag": + return NodeHostProviderFlag, nil + default: + return 0, fmt.Errorf("unknown node id provider %s", s) + } +} + +// GenerateNode generates a node id. +func GenerateNode(grpcPort, httpPort *uint32) (Node, error) { + port := grpcPort + if port == nil { + port = httpPort + } + if port == nil { + return Node{}, fmt.Errorf("no port found") + } + node := Node{} + var nodeHost string + switch FlagNodeHostProvider { + case NodeHostProviderHostname: + h, err := host.Name() + if err != nil { + return Node{}, err + } + nodeHost = h + case NodeHostProviderIP: + ip, err := host.IP() + if err != nil { + return Node{}, err + } + nodeHost = ip + case NodeHostProviderFlag: + nodeHost = FlagNodeHost + default: + return Node{}, fmt.Errorf("unknown node id provider %d", FlagNodeHostProvider) + } + node.NodeID = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*port), 10)) + if grpcPort != nil { + node.GrpcAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*grpcPort), 10)) + } + if httpPort != nil { + node.HTTPAddress = net.JoinHostPort(nodeHost, strconv.FormatUint(uint64(*httpPort), 10)) + } + return node, nil } -// ContextNodeIDKey is a context key to store the node id. -var ContextNodeIDKey = contextNodeIDKey{} +// ContextNodeKey is a context key to store the node id. +var ContextNodeKey = contextNodeKey{} -type contextNodeIDKey struct{} +type contextNodeKey struct{} // ContextNodeRolesKey is a context key to store the node roles. var ContextNodeRolesKey = contextNodeRolesKey{} diff --git a/api/proto/banyandb/database/v1/database.proto b/api/proto/banyandb/database/v1/database.proto index 2e0bb6d91..0e758d829 100644 --- a/api/proto/banyandb/database/v1/database.proto +++ b/api/proto/banyandb/database/v1/database.proto @@ -36,22 +36,9 @@ enum Role { message Node { string name = 1; repeated Role roles = 2; - google.protobuf.Timestamp created_at = 3; -} - -message Endpoint { - string name = 1; - string node = 2; - repeated string addresses = 3; - uint32 port = 4; - Protocol protocol = 5; - google.protobuf.Timestamp created_at = 6; -} - -enum Protocol { - PROTOCOL_UNSPECIFIED = 0; - PROTOCOL_HTTP = 1; - PROTOCOL_GRPC = 2; + string grpc_address = 3; + string http_address = 4; + google.protobuf.Timestamp created_at = 5; } message Shard { diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go index 10ba5c8e0..d8847c7ca 100644 --- a/banyand/internal/cmd/liaison.go +++ b/banyand/internal/cmd/liaison.go @@ -19,18 +19,16 @@ package cmd import ( "context" - "fmt" "os" "github.com/spf13/cobra" "github.com/apache/skywalking-banyandb/api/common" - "github.com/apache/skywalking-banyandb/banyand/liaison" + "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/liaison/http" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/pkg/config" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/signal" @@ -51,18 +49,15 @@ func newLiaisonCmd() *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate metadata service") } - tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc) - if err != nil { - l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer") - } + grpcServer := grpc.NewServer(ctx, pipeline, metaSvc) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() - httpServer := http.NewService() + httpServer := http.NewServer() units := []run.Unit{ new(signal.Handler), pipeline, - tcp, + grpcServer, httpServer, profSvc, } @@ -71,37 +66,24 @@ func newLiaisonCmd() *cobra.Command { } // Meta the run Group units. liaisonGroup.Register(units...) - logging := logger.Logging{} liaisonCmd := &cobra.Command{ Use: "liaison", Version: version.Build(), Short: "Run as the liaison server", - PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { - if err = config.Load("logging", cmd.Flags()); err != nil { - return err - } - return logger.Init(logging) - }, RunE: func(cmd *cobra.Command, args []string) (err error) { - nodeID, err := common.GenerateNodeID("liaison", "") + node, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort()) if err != nil { return err } - fmt.Print(logo) logger.GetLogger().Info().Msg("starting as a liaison server") // Spawn our go routines and wait for shutdown. - if err := liaisonGroup.Run(context.WithValue(context.Background(), common.ContextNodeIDKey, nodeID)); err != nil { + if err := liaisonGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, node)); err != nil { logger.GetLogger().Error().Err(err).Stack().Str("name", liaisonGroup.Name()).Msg("Exit") os.Exit(-1) } return nil }, } - - liaisonCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - liaisonCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - liaisonCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - liaisonCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet) return liaisonCmd } diff --git a/banyand/internal/cmd/meta.go b/banyand/internal/cmd/meta.go deleted file mode 100644 index d8a263e66..000000000 --- a/banyand/internal/cmd/meta.go +++ /dev/null @@ -1,97 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package cmd - -import ( - "context" - "fmt" - "os" - - "github.com/spf13/cobra" - - "github.com/apache/skywalking-banyandb/api/common" - "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/observability" - "github.com/apache/skywalking-banyandb/pkg/config" - "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/run" - "github.com/apache/skywalking-banyandb/pkg/signal" - "github.com/apache/skywalking-banyandb/pkg/version" -) - -var metaGroup = run.NewGroup("meta") - -func newMetaCmd() *cobra.Command { - l := logger.GetLogger("bootstrap") - ctx := context.Background() - metaSvc, err := metadata.NewService(ctx) - if err != nil { - l.Fatal().Err(err).Msg("failed to initiate metadata service") - } - profSvc := observability.NewProfService() - metricSvc := observability.NewMetricService() - - units := []run.Unit{ - new(signal.Handler), - metaSvc, - profSvc, - } - if metricSvc != nil { - units = append(units, metricSvc) - } - // Meta the run Group units. - metaGroup.Register(units...) - logging := logger.Logging{} - var flagNodeID string - metaCmd := &cobra.Command{ - Use: "meta", - Version: version.Build(), - Short: "Run as the meta server", - PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { - if err = config.Load("logging", cmd.Flags()); err != nil { - return err - } - return logger.Init(logging) - }, - RunE: func(cmd *cobra.Command, args []string) (err error) { - if flagNodeID == "" { - return fmt.Errorf("data node id is required") - } - nodeID, err := common.GenerateNodeID("meta", flagNodeID) - if err != nil { - return err - } - fmt.Print(logo) - logger.GetLogger().Info().Msg("starting as a meta server") - // Spawn our go routines and wait for shutdown. - if err := metaGroup.Run(context.WithValue(context.Background(), common.ContextNodeIDKey, nodeID)); err != nil { - logger.GetLogger().Error().Err(err).Stack().Str("name", metaGroup.Name()).Msg("Exit") - os.Exit(-1) - } - return nil - }, - } - - metaCmd.Flags().StringVar(&flagNodeID, "meta-node-id", "", "the meta node id") - metaCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - metaCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - metaCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - metaCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") - metaCmd.Flags().AddFlagSet(metaGroup.RegisterFlags().FlagSet) - return metaCmd -} diff --git a/banyand/internal/cmd/root.go b/banyand/internal/cmd/root.go index da67766da..b1535a2b8 100644 --- a/banyand/internal/cmd/root.go +++ b/banyand/internal/cmd/root.go @@ -19,8 +19,13 @@ package cmd import ( + "fmt" + "github.com/spf13/cobra" + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/config" + "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/version" ) @@ -35,6 +40,7 @@ const logo = ` // NewRoot returns a root command. func NewRoot() *cobra.Command { + logging := logger.Logging{} cmd := &cobra.Command{ DisableAutoGenTag: true, Version: version.Build(), @@ -42,10 +48,44 @@ func NewRoot() *cobra.Command { Long: logo + ` BanyanDB, as an observability database, aims to ingest, analyze and store Metrics, Tracing and Logging data `, + PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { + fmt.Print(logo) + if err = config.Load("logging", cmd.Flags()); err != nil { + return err + } + return logger.Init(logging) + }, } + cmd.PersistentFlags().Var(&nodeIDProviderValue{&common.FlagNodeHostProvider}, + "node-host-provider", "the node host provider, can be hostname, ip or flag, default is hostname") + cmd.PersistentFlags().StringVar(&common.FlagNodeHost, "node-host", "", "the node host of the server only used when node-host-provider is \"flag\"") + cmd.PersistentFlags().StringVar(&logging.Env, "logging-env", "prod", "the logging") + cmd.PersistentFlags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") + cmd.PersistentFlags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") + cmd.PersistentFlags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") cmd.AddCommand(newStandaloneCmd()) - cmd.AddCommand(newMetaCmd()) cmd.AddCommand(newStorageCmd()) cmd.AddCommand(newLiaisonCmd()) return cmd } + +type nodeIDProviderValue struct { + value *common.NodeHostProvider +} + +func (c *nodeIDProviderValue) Set(s string) error { + v, err := common.ParseNodeHostProvider(s) + if err != nil { + return err + } + *c.value = v + return nil +} + +func (c *nodeIDProviderValue) String() string { + return c.value.String() +} + +func (c *nodeIDProviderValue) Type() string { + return "nodeIDProvider" +} diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go index 77cabce5f..f45a68d3d 100644 --- a/banyand/internal/cmd/standalone.go +++ b/banyand/internal/cmd/standalone.go @@ -19,13 +19,12 @@ package cmd import ( "context" - "fmt" "os" "github.com/spf13/cobra" "github.com/apache/skywalking-banyandb/api/common" - "github.com/apache/skywalking-banyandb/banyand/liaison" + "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" "github.com/apache/skywalking-banyandb/banyand/liaison/http" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" @@ -33,7 +32,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/query" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/stream" - "github.com/apache/skywalking-banyandb/pkg/config" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/signal" @@ -65,13 +63,10 @@ func newStandaloneCmd() *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } - tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc) - if err != nil { - l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer") - } + grpcServer := grpc.NewServer(ctx, pipeline, metaSvc) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() - httpServer := http.NewService() + httpServer := http.NewServer() units := []run.Unit{ new(signal.Handler), @@ -80,7 +75,7 @@ func newStandaloneCmd() *cobra.Command { measureSvc, streamSvc, q, - tcp, + grpcServer, httpServer, profSvc, } @@ -89,43 +84,25 @@ func newStandaloneCmd() *cobra.Command { } // Meta the run Group units. standaloneGroup.Register(units...) - logging := logger.Logging{} - var flagNodeID string standaloneCmd := &cobra.Command{ Use: "standalone", Version: version.Build(), Short: "Run as the standalone server", - PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { - if err = config.Load("logging", cmd.Flags()); err != nil { - return err - } - return logger.Init(logging) - }, RunE: func(cmd *cobra.Command, args []string) (err error) { - if flagNodeID == "" { - return fmt.Errorf("data node id is required") - } - nodeID, err := common.GenerateNodeID("standalone", flagNodeID) + nodeID, err := common.GenerateNode(grpcServer.GetPort(), httpServer.GetPort()) if err != nil { return err } - fmt.Print(logo) logger.GetLogger().Info().Msg("starting as a standalone server") // Spawn our go routines and wait for shutdown. - if err := standaloneGroup.Run(context.WithValue(context.Background(), common.ContextNodeIDKey, nodeID)); err != nil { + if err := standaloneGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, nodeID)); err != nil { logger.GetLogger().Error().Err(err).Stack().Str("name", standaloneGroup.Name()).Msg("Exit") os.Exit(-1) } return nil }, } - - standaloneCmd.Flags().StringVar(&flagNodeID, "data-node-id", "single-node", "the data node id of the standalone server") - standaloneCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - standaloneCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - standaloneCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - standaloneCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") standaloneCmd.Flags().AddFlagSet(standaloneGroup.RegisterFlags().FlagSet) return standaloneCmd } diff --git a/banyand/internal/cmd/storage.go b/banyand/internal/cmd/storage.go index b3aa601ec..a8f4e159c 100644 --- a/banyand/internal/cmd/storage.go +++ b/banyand/internal/cmd/storage.go @@ -19,7 +19,6 @@ package cmd import ( "context" - "fmt" "os" "github.com/spf13/cobra" @@ -90,7 +89,6 @@ func newStorageCmd() *cobra.Command { // Meta the run Group units. storageGroup.Register(units...) logging := logger.Logging{} - var flagNodeID string storageCmd := &cobra.Command{ Use: "storage", Version: version.Build(), @@ -116,29 +114,19 @@ func newStorageCmd() *cobra.Command { } }, RunE: func(cmd *cobra.Command, args []string) (err error) { - if flagNodeID == "" && flagStorageMode != storageModeQuery { - return fmt.Errorf("data node id is required") - } - nodeID, err := common.GenerateNodeID(flagStorageMode, flagNodeID) + node, err := common.GenerateNode(nil, nil) if err != nil { return err } - fmt.Print(logo) logger.GetLogger().Info().Msg("starting as a storage server") // Spawn our go routines and wait for shutdown. - if err := storageGroup.Run(context.WithValue(context.Background(), common.ContextNodeIDKey, nodeID)); err != nil { + if err := storageGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, node)); err != nil { logger.GetLogger().Error().Err(err).Stack().Str("name", storageGroup.Name()).Msg("Exit") os.Exit(-1) } return nil }, } - - storageCmd.Flags().StringVar(&flagNodeID, "data-node-id", "", "the data node id") - storageCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") - storageCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") - storageCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") - storageCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") storageCmd.Flags().StringVarP(&flagStorageMode, "mode", "m", storageModeMix, "the storage mode, one of [data, query, mix]") storageCmd.Flags().AddFlagSet(storageGroup.RegisterFlags().FlagSet) return storageCmd diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index 35fb46530..eae908f94 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -22,6 +22,7 @@ import ( "context" "net" "runtime/debug" + "strconv" "time" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" @@ -56,33 +57,58 @@ var ( errAccessLogRootPath = errors.New("access log root path is required") ) +// // Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Server defines the gRPC server. +type Server interface { + run.Unit + GetPort() *uint32 +} + type server struct { pipeline queue.Queue creds credentials.TransportCredentials - *indexRuleRegistryServer - measureSVC *measureService - log *logger.Logger - ser *grpclib.Server + *streamRegistryServer + log *logger.Logger + *indexRuleBindingRegistryServer + ser *grpclib.Server *propertyServer *topNAggregationRegistryServer *groupRegistryServer - stopCh chan struct{} - streamSVC *streamService + stopCh chan struct{} + *indexRuleRegistryServer *measureRegistryServer - *streamRegistryServer - *indexRuleBindingRegistryServer - addr string + streamSVC *streamService + measureSVC *measureService + host string keyFile string certFile string accessLogRootPath string + addr string accessLogRecorders []accessLogRecorder maxRecvMsgSize run.Bytes - tls bool + port uint32 enableIngestionAccessLog bool + tls bool } // NewServer returns a new gRPC server. -func NewServer(_ context.Context, pipeline queue.Queue, schemaRegistry metadata.Repo) run.Unit { +func NewServer(_ context.Context, pipeline queue.Queue, schemaRegistry metadata.Repo) Server { streamSVC := &streamService{ discoveryService: newDiscoveryService(pipeline, schema.KindStream, schemaRegistry), } @@ -148,6 +174,14 @@ func (s *server) Name() string { return "grpc" } +func (s *server) Role() databasev1.Role { + return databasev1.Role_ROLE_LIAISON +} + +func (s *server) GetPort() *uint32 { + return &s.port +} + func (s *server) FlagSet() *run.FlagSet { fs := run.NewFlagSet("grpc") s.maxRecvMsgSize = defaultRecvSize @@ -155,14 +189,16 @@ func (s *server) FlagSet() *run.FlagSet { fs.BoolVar(&s.tls, "tls", false, "connection uses TLS if true, else plain TCP") fs.StringVar(&s.certFile, "cert-file", "", "the TLS cert file") fs.StringVar(&s.keyFile, "key-file", "", "the TLS key file") - fs.StringVar(&s.addr, "addr", ":17912", "the address of banyand listens") + fs.StringVar(&s.host, "grpc-host", "", "the host of banyand listens") + fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens") fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", false, "enable ingestion access log") fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access log root path") return fs } func (s *server) Validate() error { - if s.addr == "" { + s.addr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.port), 10)) + if s.addr == ":" { return errNoAddr } if s.enableIngestionAccessLog && s.accessLogRootPath == "" { diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go index 60a432a70..6be5c79f1 100644 --- a/banyand/liaison/http/server.go +++ b/banyand/liaison/http/server.go @@ -22,7 +22,9 @@ import ( "context" "fmt" "io/fs" + "net" "net/http" + "strconv" "strings" "time" @@ -45,39 +47,48 @@ import ( ) var ( - _ run.Config = (*service)(nil) - _ run.Service = (*service)(nil) + _ run.Config = (*server)(nil) + _ run.Service = (*server)(nil) errServerCert = errors.New("http: invalid server cert file") errServerKey = errors.New("http: invalid server key file") errNoAddr = errors.New("http: no address") ) -// NewService return a http service. -func NewService() run.Unit { - return &service{ +// NewServer return a http service. +func NewServer() Server { + return &server{ stopCh: make(chan struct{}), } } -type service struct { - mux *chi.Mux - stopCh chan struct{} - clientCloser context.CancelFunc +// Server is the http service. +type Server interface { + run.Unit + GetPort() *uint32 +} + +type server struct { + creds credentials.TransportCredentials l *logger.Logger + clientCloser context.CancelFunc + mux *chi.Mux srv *http.Server + stopCh chan struct{} + host string listenAddr string grpcAddr string - creds credentials.TransportCredentials keyFile string certFile string grpcCert string + port uint32 tls bool } -func (p *service) FlagSet() *run.FlagSet { +func (p *server) FlagSet() *run.FlagSet { flagSet := run.NewFlagSet("http") - flagSet.StringVar(&p.listenAddr, "http-addr", ":17913", "listen addr for http") + flagSet.StringVar(&p.host, "http-host", "localhost", "listen host for http") + flagSet.Uint32Var(&p.port, "http-port", 17913, "listen port for http") flagSet.StringVar(&p.grpcAddr, "http-grpc-addr", "localhost:17912", "http server redirect grpc requests to this address") flagSet.StringVar(&p.certFile, "http-cert-file", "", "the TLS cert file of http server") flagSet.StringVar(&p.keyFile, "http-key-file", "", "the TLS key file of http server") @@ -86,8 +97,9 @@ func (p *service) FlagSet() *run.FlagSet { return flagSet } -func (p *service) Validate() error { - if p.listenAddr == "" { +func (p *server) Validate() error { + p.listenAddr = net.JoinHostPort(p.host, strconv.FormatUint(uint64(p.port), 10)) + if p.listenAddr == ":" { return errNoAddr } observability.UpdateAddress("http", p.listenAddr) @@ -110,11 +122,19 @@ func (p *service) Validate() error { return nil } -func (p *service) Name() string { +func (p *server) Name() string { return "liaison-http" } -func (p *service) PreRun(_ context.Context) error { +func (p *server) Role() databasev1.Role { + return databasev1.Role_ROLE_LIAISON +} + +func (p *server) GetPort() *uint32 { + return &p.port +} + +func (p *server) PreRun(_ context.Context) error { p.l = logger.GetLogger(p.Name()) p.mux = chi.NewRouter() @@ -134,7 +154,7 @@ func (p *service) PreRun(_ context.Context) error { return nil } -func (p *service) Serve() run.StopNotify { +func (p *server) Serve() run.StopNotify { var ctx context.Context ctx, p.clientCloser = context.WithCancel(context.Background()) opts := make([]grpc.DialOption, 0, 1) @@ -183,7 +203,7 @@ func (p *service) Serve() run.StopNotify { return p.stopCh } -func (p *service) GracefulStop() { +func (p *server) GracefulStop() { if err := p.srv.Close(); err != nil { p.l.Error().Err(err) } diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go deleted file mode 100644 index a1b657cfb..000000000 --- a/banyand/liaison/liaison.go +++ /dev/null @@ -1,33 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Package liaison implements a transmission layer between a data layer and a client. -package liaison - -import ( - "context" - - "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" - "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/pkg/run" -) - -// NewEndpoint return a new endpoint which is the entry point for the database server. -func NewEndpoint(ctx context.Context, pipeline queue.Queue, schemaRegistry metadata.Repo) (run.Unit, error) { - return grpc.NewServer(ctx, pipeline, schemaRegistry), nil -} diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index 533842327..73768a1fb 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -70,11 +70,11 @@ func (s *clientService) PreRun(ctx context.Context) error { if err != nil { return err } - val := ctx.Value(common.ContextNodeIDKey) + val := ctx.Value(common.ContextNodeKey) if val == nil { return errors.New("node id is empty") } - nodeID := val.(common.NodeID) + node := val.(common.Node) val = ctx.Value(common.ContextNodeRolesKey) if val == nil { return errors.New("node roles is empty") @@ -83,9 +83,11 @@ func (s *clientService) PreRun(ctx context.Context) error { ctxRegister, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() if err = s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{ - Name: string(nodeID), - Roles: nodeRoles, - CreatedAt: timestamppb.Now(), + Name: node.NodeID, + GrpcAddress: node.GrpcAddress, + HttpAddress: node.HTTPAddress, + Roles: nodeRoles, + CreatedAt: timestamppb.Now(), }); err != nil { return err } @@ -140,10 +142,6 @@ func (s *clientService) ShardRegistry() schema.Shard { return s.schemaRegistry } -func (s *clientService) EndpointRegistry() schema.Endpoint { - return s.schemaRegistry -} - func (s *clientService) Name() string { return "metadata" } diff --git a/banyand/metadata/embeddedetcd/server.go b/banyand/metadata/embeddedetcd/server.go index d6ac0cb5d..dfd87b73e 100644 --- a/banyand/metadata/embeddedetcd/server.go +++ b/banyand/metadata/embeddedetcd/server.go @@ -85,6 +85,7 @@ func (e *server) StoppingNotify() <-chan struct{} { func (e *server) Close() error { e.server.Close() + <-e.server.Server.StopNotify() return nil } diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go index 91d5a25b4..9bad1e72d 100644 --- a/banyand/metadata/metadata.go +++ b/banyand/metadata/metadata.go @@ -48,7 +48,6 @@ type Repo interface { PropertyRegistry() schema.Property ShardRegistry() schema.Shard RegisterHandler(schema.Kind, schema.EventHandler) - EndpointRegistry() schema.Endpoint } // Service is the metadata repository. diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go index d6cb4b7ae..8c60f2ff6 100644 --- a/banyand/metadata/metadata_test.go +++ b/banyand/metadata/metadata_test.go @@ -49,7 +49,7 @@ func Test_service_RulesBySubject(t *testing.T) { err = s.FlagSet().Parse([]string{"--metadata-root-path=" + rootDir}) is.NoError(err) is.NoError(s.Validate()) - ctx = context.WithValue(ctx, common.ContextNodeIDKey, common.NodeID("test-node-id")) + ctx = context.WithValue(ctx, common.ContextNodeKey, common.Node{NodeID: "test"}) ctx = context.WithValue(ctx, common.ContextNodeRolesKey, []databasev1.Role{databasev1.Role_ROLE_META}) err = s.PreRun(ctx) is.NoError(err) diff --git a/banyand/metadata/schema/checker.go b/banyand/metadata/schema/checker.go index 80763e57a..cdedc54b4 100644 --- a/banyand/metadata/schema/checker.go +++ b/banyand/metadata/schema/checker.go @@ -95,12 +95,6 @@ var checkerMap = map[Kind]equalityChecker{ protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"), protocmp.Transform()) }, - KindEndpoint: func(a, b proto.Message) bool { - return cmp.Equal(a, b, - protocmp.IgnoreUnknown(), - protocmp.IgnoreFields(&databasev1.Endpoint{}, "created_at"), - protocmp.Transform()) - }, KindMask: func(a, b proto.Message) bool { return false }, diff --git a/banyand/metadata/schema/endpoint.go b/banyand/metadata/schema/endpoint.go deleted file mode 100644 index cf143ecdc..000000000 --- a/banyand/metadata/schema/endpoint.go +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package schema - -import ( - "context" - - "google.golang.org/protobuf/proto" - - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" -) - -var endpointKeyPrefix = "/endpoints/" - -func (e *etcdSchemaRegistry) ListEndpoint(ctx context.Context, nodeRole databasev1.Role) ([]*databasev1.Endpoint, error) { - nn, err := e.ListNode(ctx, nodeRole) - if err != nil { - return nil, err - } - messages, err := e.listWithPrefix(ctx, endpointKeyPrefix, func() proto.Message { - return &databasev1.Node{} - }) - if err != nil { - return nil, err - } - entities := make([]*databasev1.Endpoint, 0, len(messages)) - for _, message := range messages { - endpoint := message.(*databasev1.Endpoint) - for _, n := range nn { - if n.Name == endpoint.Node { - entities = append(entities, endpoint) - break - } - } - } - return entities, nil -} - -func (e *etcdSchemaRegistry) RegisterEndpoint(ctx context.Context, endpoint *databasev1.Endpoint) error { - return e.register(ctx, Metadata{ - TypeMeta: TypeMeta{ - Kind: KindEndpoint, - Name: endpoint.Name, - }, - Spec: endpoint, - }) -} - -func formatEndpointKey(name string) string { - return endpointKeyPrefix + name -} diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index 0dd053f4e..6cbb459c7 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -354,16 +354,6 @@ func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata) er if err != nil { return err } - getResp, err := e.client.Get(ctx, key) - if err != nil { - return err - } - if getResp.Count > 1 { - return errUnexpectedNumberOfEntities - } - if getResp.Count > 0 { - return errGRPCAlreadyExists - } val, err := proto.Marshal(metadata.Spec.(proto.Message)) if err != nil { return err @@ -373,10 +363,18 @@ func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata) er if err != nil { return err } - _, err = e.client.Put(ctx, key, string(val), clientv3.WithLease(lease.ID)) + var ops []clientv3.Cmp + ops = append(ops, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) + txn := e.client.Txn(ctx).If(ops...) + txn = txn.Then(clientv3.OpPut(key, string(val), clientv3.WithLease(lease.ID))) + txn = txn.Else(clientv3.OpGet(key)) + response, err := txn.Commit() if err != nil { return err } + if !response.Succeeded { + return errGRPCAlreadyExists + } // Keep the lease alive // nolint:contextcheck keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID) diff --git a/banyand/metadata/schema/register_test.go b/banyand/metadata/schema/register_test.go new file mode 100644 index 000000000..2fd2ace96 --- /dev/null +++ b/banyand/metadata/schema/register_test.go @@ -0,0 +1,82 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package schema + +import ( + "context" + "fmt" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +var _ = ginkgo.Describe("etcd_register", func() { + var endpoints []string + var goods []gleak.Goroutine + var server embeddedetcd.Server + var r *etcdSchemaRegistry + md := Metadata{ + TypeMeta: TypeMeta{ + Name: "test", + Kind: KindNode, + }, + Spec: &databasev1.Node{}, + } + ginkgo.BeforeEach(func() { + goods = gleak.Goroutines() + ports, err := test.AllocateFreePorts(2) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])} + server, err = embeddedetcd.NewServer( + embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), + embeddedetcd.RootDir(randomTempDir())) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + <-server.ReadyNotify() + schemaRegistry, err := NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints)) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + r = schemaRegistry.(*etcdSchemaRegistry) + }) + ginkgo.AfterEach(func() { + gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred()) + server.Close() + gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + }) + + ginkgo.It("should revoke the leaser", func() { + gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred()) + k, err := md.key() + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.get(context.Background(), k, &databasev1.Node{})).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred()) + schemaRegistry, err := NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints)) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + r = schemaRegistry.(*etcdSchemaRegistry) + gomega.Expect(r.get(context.Background(), k, &databasev1.Node{})).Should(gomega.MatchError(ErrGRPCResourceNotFound)) + }) + + ginkgo.It("should register only once", func() { + gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(r.register(context.Background(), md)).Should(gomega.MatchError(errGRPCAlreadyExists)) + }) +}) diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go index 22ae1a148..fed58409c 100644 --- a/banyand/metadata/schema/schema.go +++ b/banyand/metadata/schema/schema.go @@ -52,10 +52,9 @@ const ( KindProperty KindNode KindShard - KindEndpoint KindMask = KindGroup | KindStream | KindMeasure | KindIndexRuleBinding | KindIndexRule | - KindTopNAggregation | KindProperty | KindNode | KindShard | KindEndpoint + KindTopNAggregation | KindProperty | KindNode | KindShard ) // ListOpt contains options to list resources. @@ -75,7 +74,6 @@ type Registry interface { Property Node Shard - Endpoint RegisterHandler(Kind, EventHandler) } @@ -116,8 +114,6 @@ func (tm TypeMeta) Unmarshal(data []byte) (m proto.Message, err error) { m = &databasev1.Node{} case KindShard: m = &databasev1.Shard{} - case KindEndpoint: - m = &databasev1.Endpoint{} default: return nil, errUnsupportedEntityType } @@ -166,8 +162,6 @@ func (m Metadata) key() (string, error) { Group: m.Group, Name: m.Name, }), nil - case KindEndpoint: - return formatEndpointKey(m.Name), nil default: return "", errUnsupportedEntityType } @@ -255,12 +249,6 @@ type Node interface { RegisterNode(ctx context.Context, node *databasev1.Node) error } -// Endpoint allows CRUD endpoint schemas in a group. -type Endpoint interface { - ListEndpoint(ctx context.Context, nodeRole databasev1.Role) ([]*databasev1.Endpoint, error) - RegisterEndpoint(ctx context.Context, endpoint *databasev1.Endpoint) error -} - // Shard allows CRUD shard schemas in a group. type Shard interface { CreateOrUpdateShard(ctx context.Context, shard *databasev1.Shard) error diff --git a/docs/api-reference.md b/docs/api-reference.md index 01e67e6ae..45fa7b258 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -13,11 +13,9 @@ - [IntervalRule.Unit](#banyandb-common-v1-IntervalRule-Unit) - [banyandb/database/v1/database.proto](#banyandb_database_v1_database-proto) - - [Endpoint](#banyandb-database-v1-Endpoint) - [Node](#banyandb-database-v1-Node) - [Shard](#banyandb-database-v1-Shard) - - [Protocol](#banyandb-database-v1-Protocol) - [Role](#banyandb-database-v1-Role) - [banyandb/model/v1/common.proto](#banyandb_model_v1_common-proto) @@ -329,26 +327,6 @@ Metadata is for multi-tenant, multi-model use - - -### Endpoint - - - -| Field | Type | Label | Description | -| ----- | ---- | ----- | ----------- | -| name | [string](#string) | | | -| node | [string](#string) | | | -| addresses | [string](#string) | repeated | | -| port | [uint32](#uint32) | | | -| protocol | [Protocol](#banyandb-database-v1-Protocol) | | | -| created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | - - - - - - ### Node @@ -359,6 +337,8 @@ Metadata is for multi-tenant, multi-model use | ----- | ---- | ----- | ----------- | | name | [string](#string) | | | | roles | [Role](#banyandb-database-v1-Role) | repeated | | +| grpc_address | [string](#string) | | | +| http_address | [string](#string) | | | | created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | @@ -389,19 +369,6 @@ Metadata is for multi-tenant, multi-model use - - -### Protocol - - -| Name | Number | Description | -| ---- | ------ | ----------- | -| PROTOCOL_UNSPECIFIED | 0 | | -| PROTOCOL_HTTP | 1 | | -| PROTOCOL_GRPC | 2 | | - - - ### Role diff --git a/pkg/host/host.go b/pkg/host/host.go new file mode 100644 index 000000000..126f47685 --- /dev/null +++ b/pkg/host/host.go @@ -0,0 +1,66 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package host provides information about the host. +package host + +import ( + "errors" + "net" + "os" +) + +// Name returns the host name reported by the kernel. +func Name() (name string, err error) { + return os.Hostname() +} + +// IP returns the first non-loopback IP address of the host. +func IP() (ip string, err error) { + ifaces, err := net.Interfaces() + if err != nil { + return "", err + } + for _, iface := range ifaces { + if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 { + continue + } + addrs, err := iface.Addrs() + if err != nil { + return "", err + } + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + if ip == nil || ip.IsLoopback() { + continue + } + ip = ip.To4() + if ip == nil { + // not an ipv4 address + continue + } + return ip.String(), nil + } + } + return "", errors.New("are you connected to the network?") +} diff --git a/pkg/test/setup.go b/pkg/test/setup.go index 9cc47b53c..d270220c6 100644 --- a/pkg/test/setup.go +++ b/pkg/test/setup.go @@ -42,7 +42,7 @@ func SetupModules(flags []string, units ...run.Unit) func() { defer func() { wg.Done() }() - errRun := g.Run(context.WithValue(context.Background(), common.ContextNodeIDKey, common.NodeID("test-node-id"))) + errRun := g.Run(context.WithValue(context.Background(), common.ContextNodeKey, common.Node{NodeID: "test"})) gomega.Expect(errRun).ShouldNot(gomega.HaveOccurred()) }() g.WaitTillReady() diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 57ad28205..3c551382c 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -57,8 +57,10 @@ func CommonWithSchemaLoaders(schemaLoaders []SchemaLoader, flags ...string) (str addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) ff := []string{ - "--addr=" + addr, - "--http-addr=" + httpAddr, + "--grpc-host=" + host, + fmt.Sprintf("--grpc-port=%d", ports[0]), + "--http-host=" + host, + fmt.Sprintf("--http-port=%d", ports[1]), "--http-grpc-addr=" + addr, "--stream-root-path=" + path, "--measure-root-path=" + path, @@ -92,7 +94,7 @@ func modules(schemaLoaders []SchemaLoader, flags []string) func() { q, err := query.NewService(context.TODO(), streamSvc, measureSvc, metaSvc, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc) - httpServer := http.NewService() + httpServer := http.NewServer() units := []run.Unit{ pipeline,