From 6c0fc2153406b9a8269a54c7b690473548e4a018 Mon Sep 17 00:00:00 2001
From: Matteo Merli
Date: Mon, 20 May 2024 10:56:09 -0700
Subject: [PATCH] Allow reading cluster config directly from the k8s config-map (#484)

---
 cmd/coordinator/cmd.go                        |  99 ++++++++-----
 cmd/coordinator/cmd_test.go                   |  33 +++--
 cmd/coordinator/viper_configmap.go            | 132 ++++++++++++++++++
 coordinator/coordinator.go                    |  25 ++--
 coordinator/impl/coordinator.go               |  72 ++++------
 coordinator/impl/coordinator_e2e_test.go      |  46 +++---
 .../templates/coordinator-deployment.yaml     |   8 +-
 go.mod                                        |   1 -
 go.sum                                        |   2 -
 maelstrom/main.go                             |   4 +-
 tests/security/tls/tls_encryption_test.go     |  16 ++-
 11 files changed, 297 insertions(+), 141 deletions(-)
 create mode 100644 cmd/coordinator/viper_configmap.go

diff --git a/cmd/coordinator/cmd.go b/cmd/coordinator/cmd.go
index 9b60cfbc..ab9e327c 100644
--- a/cmd/coordinator/cmd.go
+++ b/cmd/coordinator/cmd.go
@@ -17,30 +17,30 @@ package coordinator
 
 import (
 	"errors"
 	"io"
-	"time"
+	"log/slog"
+	"strings"
 
 	"github.com/fsnotify/fsnotify"
-	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
-	"github.com/streamnative/oxia/cmd/flag"
 	"github.com/streamnative/oxia/common"
+
+	"github.com/streamnative/oxia/cmd/flag"
 	"github.com/streamnative/oxia/coordinator"
 	"github.com/streamnative/oxia/coordinator/model"
 )
 
 var (
-	conf           = coordinator.NewConfig()
-	configFile     string
-	configChangeCh chan struct{}
+	conf       = coordinator.NewConfig()
+	configFile string
 
 	Cmd = &cobra.Command{
 		Use:     "coordinator",
 		Short:   "Start a coordinator",
 		Long:    `Start a coordinator`,
 		PreRunE: validate,
-		Run:     exec,
+		RunE:    exec,
 	}
 )
@@ -48,16 +48,10 @@ func init() {
 	flag.InternalAddr(Cmd, &conf.InternalServiceAddr)
 	flag.MetricsAddr(Cmd, &conf.MetricsServiceAddr)
 	Cmd.Flags().Var(&conf.MetadataProviderImpl, "metadata", "Metadata provider implementation: file, configmap or memory")
-	Cmd.Flags().StringVar(&conf.K8SMetadataNamespace, "k8s-namespace", conf.K8SMetadataNamespace, "Kubernetes namespace for metadata configmap")
-	Cmd.Flags().StringVar(&conf.K8SMetadataConfigMapName, "k8s-configmap-name", conf.K8SMetadataConfigMapName, "ConfigMap name for metadata configmap")
+	Cmd.Flags().StringVar(&conf.K8SMetadataNamespace, "k8s-namespace", conf.K8SMetadataNamespace, "Kubernetes namespace for oxia config maps")
+	Cmd.Flags().StringVar(&conf.K8SMetadataConfigMapName, "k8s-configmap-name", conf.K8SMetadataConfigMapName, "ConfigMap name for cluster status configmap")
 	Cmd.Flags().StringVar(&conf.FileMetadataPath, "file-clusters-status-path", "data/cluster-status.json", "The path where the cluster status is stored when using 'file' provider")
 	Cmd.Flags().StringVarP(&configFile, "conf", "f", "", "Cluster config file")
-	Cmd.Flags().DurationVar(&conf.ClusterConfigRefreshTime, "conf-file-refresh-time", 1*time.Minute, "How frequently to check for updates for cluster configuration file")
-
-	setConfigPath()
-	viper.OnConfigChange(func(_ fsnotify.Event) {
-		configChangeCh <- struct{}{}
-	})
 }
 
 func validate(*cobra.Command, []string) error {
@@ -69,41 +63,80 @@ func validate(*cobra.Command, []string) error {
 			return errors.New("k8s-configmap-name must be set with metadata=configmap")
 		}
 	}
-	if _, _, err := loadClusterConfig(); err != nil {
-		return err
-	}
 	return nil
 }
 
-func setConfigPath() {
+func configIsRemote() bool {
+	return strings.HasPrefix(configFile, "configmap:")
+}
+
+func setConfigPath(v *viper.Viper) error {
+	v.SetConfigType("yaml")
+
+	if configIsRemote() {
+		err := v.AddRemoteProvider("configmap", "endpoint", configFile)
+		if err != nil {
+			
slog.Error("Failed to add remote provider", slog.Any("error", err)) + return err + } + + return v.WatchRemoteConfigOnChannel() + } + if configFile == "" { - viper.AddConfigPath("/oxia/conf") - viper.AddConfigPath(".") - } else { - viper.SetConfigFile(configFile) + v.AddConfigPath("/oxia/conf") + v.AddConfigPath(".") } + + v.SetConfigFile(configFile) + v.WatchConfig() + return nil } -func loadClusterConfig() (model.ClusterConfig, chan struct{}, error) { - setConfigPath() +func loadClusterConfig(v *viper.Viper) (model.ClusterConfig, error) { cc := model.ClusterConfig{} - if err := viper.ReadInConfig(); err != nil { - return cc, configChangeCh, err + var err error + + if configIsRemote() { + err = v.ReadRemoteConfig() + } else { + err = v.ReadInConfig() + } + + if err != nil { + return cc, err } - if err := viper.Unmarshal(&cc); err != nil { - return cc, configChangeCh, err + if err := v.Unmarshal(&cc); err != nil { + return cc, err } - return cc, configChangeCh, nil + return cc, nil } -func exec(*cobra.Command, []string) { - viper.WatchConfig() - conf.ClusterConfigProvider = loadClusterConfig +func exec(*cobra.Command, []string) error { + v := viper.New() + + conf.ClusterConfigChangeNotifications = make(chan any) + conf.ClusterConfigProvider = func() (model.ClusterConfig, error) { + return loadClusterConfig(v) + } + + v.OnConfigChange(func(e fsnotify.Event) { + conf.ClusterConfigChangeNotifications <- nil + }) + + if err := setConfigPath(v); err != nil { + return err + } + + if _, err := loadClusterConfig(v); err != nil { + return err + } common.RunProcess(func() (io.Closer, error) { return coordinator.New(conf) }) + return nil } diff --git a/cmd/coordinator/cmd_test.go b/cmd/coordinator/cmd_test.go index 16c4b719..1833c9db 100644 --- a/cmd/coordinator/cmd_test.go +++ b/cmd/coordinator/cmd_test.go @@ -19,8 +19,9 @@ import ( "strings" "testing" - "github.com/spf13/cobra" "github.com/spf13/viper" + + "github.com/spf13/cobra" "github.com/stretchr/testify/assert" "gopkg.in/yaml.v3" @@ -62,10 +63,9 @@ func TestCmd(t *testing.T) { isErr bool }{ {[]string{}, coordinator.Config{ - InternalServiceAddr: "localhost:6649", - MetricsServiceAddr: "localhost:8080", - MetadataProviderImpl: coordinator.File, - ClusterConfigRefreshTime: 0, + InternalServiceAddr: "localhost:6649", + MetricsServiceAddr: "localhost:8080", + MetadataProviderImpl: coordinator.File, }, model.ClusterConfig{ Namespaces: []model.NamespaceConfig{{ Name: common.DefaultNamespace, @@ -143,22 +143,27 @@ func TestCmd(t *testing.T) { }, }, false}, {[]string{"-f=invalid.yaml"}, coordinator.Config{ - InternalServiceAddr: "localhost:6649", - MetricsServiceAddr: "localhost:8080", + InternalServiceAddr: "localhost:6649", + MetricsServiceAddr: "localhost:8080", + MetadataProviderImpl: coordinator.File, }, model.ClusterConfig{}, true}, } { t.Run(strings.Join(test.args, "_"), func(t *testing.T) { conf = coordinator.NewConfig() configFile = "" - viper.Reset() Cmd.SetArgs(test.args) - Cmd.Run = func(cmd *cobra.Command, args []string) { - assert.Equal(t, test.expectedConf, conf) + Cmd.RunE = func(cmd *cobra.Command, args []string) error { + v := viper.New() + assert.NoError(t, setConfigPath(v)) - conf.ClusterConfigProvider = loadClusterConfig - clusterConf, _, err := conf.ClusterConfigProvider() - assert.NoError(t, err) + assert.Equal(t, test.expectedConf, conf) + conf.ClusterConfigProvider = func() (model.ClusterConfig, error) { + return loadClusterConfig(v) + } + clusterConf, err := conf.ClusterConfigProvider() + assert.Equal(t, test.isErr, err != 
nil)
 			assert.Equal(t, test.expectedClusterConf, clusterConf)
+			return err
 		}
 
 		err = Cmd.Execute()
 		assert.Equal(t, test.isErr, err != nil)
 	})
@@ -182,7 +187,7 @@ func TestCmd(t *testing.T) {
 			configFile = ""
 			viper.Reset()
 			Cmd.SetArgs(test.args)
-			Cmd.Run = func(cmd *cobra.Command, args []string) {}
+			Cmd.RunE = func(cmd *cobra.Command, args []string) error { return nil }
 			err = Cmd.Execute()
 			assert.Equal(t, test.isErr, err != nil)
 		})
diff --git a/cmd/coordinator/viper_configmap.go b/cmd/coordinator/viper_configmap.go
new file mode 100644
index 00000000..11c9a30b
--- /dev/null
+++ b/cmd/coordinator/viper_configmap.go
@@ -0,0 +1,132 @@
+// Copyright 2023 StreamNative, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coordinator
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"log/slog"
+	"strings"
+
+	"github.com/pkg/errors"
+	"github.com/spf13/viper"
+	v1 "k8s.io/api/core/v1" //nolint:revive
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/watch"
+
+	"github.com/streamnative/oxia/common"
+	"github.com/streamnative/oxia/coordinator/impl"
+)
+
+type cmConfigProvider struct {
+}
+
+const filePath = "config.yaml"
+
+func getNamespaceAndCmName(rp viper.RemoteProvider) (namespace, cmName string, err error) {
+	p := strings.Split(strings.TrimPrefix(rp.Path(), "configmap:"), "/")
+	if len(p) != 2 {
+		return "", "", errors.New("invalid configmap configuration")
+	}
+
+	return p[0], p[1], nil
+}
+
+func (*cmConfigProvider) Get(rp viper.RemoteProvider) (io.Reader, error) {
+	kubernetes := impl.NewK8SClientset(impl.NewK8SClientConfig())
+	namespace, configmap, err := getNamespaceAndCmName(rp)
+	if err != nil {
+		return nil, err
+	}
+	cmValue, err := impl.K8SConfigMaps(kubernetes).Get(namespace, configmap)
+	if err != nil {
+		return nil, err
+	}
+
+	data, ok := cmValue.Data[filePath]
+	if !ok {
+		return nil, errors.Errorf("path not found in config map: %s", rp.Path())
+	}
+	return bytes.NewReader([]byte(data)), nil
+}
+
+func (c *cmConfigProvider) Watch(rp viper.RemoteProvider) (io.Reader, error) {
+	return c.Get(rp)
+}
+
+func (*cmConfigProvider) WatchChannel(rp viper.RemoteProvider) (<-chan *viper.RemoteResponse, chan bool) {
+	kubernetes := impl.NewK8SClientset(impl.NewK8SClientConfig())
+	namespace, configmap, _ := getNamespaceAndCmName(rp)
+
+	// Buffered: the error path below must send before viper starts reading
+	ch := make(chan *viper.RemoteResponse, 1)
+	w, err := kubernetes.CoreV1().ConfigMaps(namespace).Watch(context.Background(), metav1.ListOptions{})
+	if err != nil {
+		slog.Error("Failed to setup watch on config map",
+			slog.String("k8s-namespace", namespace),
+			slog.String("k8s-config-map", configmap),
+			slog.Any("error", err))
+		ch <- &viper.RemoteResponse{Error: err}
+		close(ch)
+		return ch, nil
+	}
+
+	go common.DoWithLabels(context.Background(), map[string]string{
+		"component": "k8s-configmap-watch",
+	}, func() {
+		for res := range w.ResultChan() {
+			cm, ok := res.Object.(*v1.ConfigMap)
+			if !ok {
+				slog.Warn("Got wrong type of object notification",
+					slog.String("k8s-namespace", namespace),
+					slog.String("k8s-config-map", 
configmap),
+					slog.Any("object", res),
+				)
+				continue // cm is nil when the type assertion fails
+			}
+			if cm.Name != configmap {
+				continue
+			}
+
+			slog.Info("Got watch event from K8S",
+				slog.String("k8s-namespace", namespace),
+				slog.String("k8s-config-map", configmap),
+				slog.Any("event-type", res.Type),
+			)
+
+			switch res.Type {
+			case watch.Added, watch.Modified:
+				ch <- &viper.RemoteResponse{
+					Value: []byte(cm.Data[filePath]),
+					Error: nil,
+				}
+
+				// Also notify the oxia coordinator directly
+				conf.ClusterConfigChangeNotifications <- nil
+			default:
+				ch <- &viper.RemoteResponse{
+					Value: nil,
+					Error: errors.Errorf("unexpected event on config map: %v", res.Type),
+				}
+			}
+		}
+	})
+
+	return ch, nil
+}
+
+func init() {
+	viper.RemoteConfig = &cmConfigProvider{}
+	viper.SupportedRemoteProviders = []string{"configmap"}
+}
diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go
index b2e333bf..8ad6d3c3 100644
--- a/coordinator/coordinator.go
+++ b/coordinator/coordinator.go
@@ -19,7 +19,6 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
-	"time"
 
 	"go.uber.org/multierr"
 
@@ -30,17 +29,17 @@ import (
 )
 
 type Config struct {
-	InternalServiceAddr       string
-	InternalSecureServiceAddr string
-	PeerTLS                   *tls.Config
-	ServerTLS                 *tls.Config
-	MetricsServiceAddr        string
-	MetadataProviderImpl      MetadataProviderImpl
-	K8SMetadataNamespace      string
-	K8SMetadataConfigMapName  string
-	FileMetadataPath          string
-	ClusterConfigProvider     func() (model.ClusterConfig, chan struct{}, error)
-	ClusterConfigRefreshTime  time.Duration
+	InternalServiceAddr              string
+	InternalSecureServiceAddr        string
+	PeerTLS                          *tls.Config
+	ServerTLS                        *tls.Config
+	MetricsServiceAddr               string
+	MetadataProviderImpl             MetadataProviderImpl
+	K8SMetadataNamespace             string
+	K8SMetadataConfigMapName         string
+	FileMetadataPath                 string
+	ClusterConfigProvider            func() (model.ClusterConfig, error)
+	ClusterConfigChangeNotifications chan any
 }
 
 type MetadataProviderImpl string
@@ -109,7 +108,7 @@ func New(config Config) (*Coordinator, error) {
 	rpcClient := impl.NewRpcProvider(s.clientPool)
 
 	var err error
-	if s.coordinator, err = impl.NewCoordinator(metadataProvider, config.ClusterConfigProvider, config.ClusterConfigRefreshTime, rpcClient); err != nil {
+	if s.coordinator, err = impl.NewCoordinator(metadataProvider, config.ClusterConfigProvider, config.ClusterConfigChangeNotifications, rpcClient); err != nil {
 		return nil, err
 	}
 
diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go
index be07003a..008cf18e 100644
--- a/coordinator/impl/coordinator.go
+++ b/coordinator/impl/coordinator.go
@@ -62,10 +62,9 @@ type coordinator struct {
 	assignmentsChanged common.ConditionContext
 
 	MetadataProvider
-	clusterConfigProvider func() (model.ClusterConfig, chan struct{}, error)
+	clusterConfigProvider func() (model.ClusterConfig, error)
 	model.ClusterConfig
-	clusterConfigRefreshTime time.Duration
-	clusterConfigChangeCh    chan struct{}
+	clusterConfigChangeCh chan any
 
 	shardControllers map[int64]ShardController
 	nodeControllers  map[string]NodeController
@@ -85,27 +84,23 @@ type coordinator struct {
 }
 
 func NewCoordinator(metadataProvider MetadataProvider,
-	clusterConfigProvider func() (model.ClusterConfig, chan struct{}, error),
-	clusterConfigRefreshTime time.Duration, rpc RpcProvider) (Coordinator, error) {
-	initialClusterConf, clusterConfigChangeCh, err := clusterConfigProvider()
+	clusterConfigProvider func() (model.ClusterConfig, error),
+	clusterConfigNotificationsCh chan any,
+	rpc RpcProvider) (Coordinator, error) {
+	initialClusterConf, err := clusterConfigProvider()
 	if err != nil {
 		return nil, 
err } - if clusterConfigRefreshTime == 0 { - clusterConfigRefreshTime = 1 * time.Minute - } - c := &coordinator{ - MetadataProvider: metadataProvider, - clusterConfigProvider: clusterConfigProvider, - clusterConfigChangeCh: clusterConfigChangeCh, - ClusterConfig: initialClusterConf, - clusterConfigRefreshTime: clusterConfigRefreshTime, - shardControllers: make(map[int64]ShardController), - nodeControllers: make(map[string]NodeController), - drainingNodes: make(map[string]NodeController), - rpc: rpc, + MetadataProvider: metadataProvider, + clusterConfigProvider: clusterConfigProvider, + clusterConfigChangeCh: clusterConfigNotificationsCh, + ClusterConfig: initialClusterConf, + shardControllers: make(map[int64]ShardController), + nodeControllers: make(map[string]NodeController), + drainingNodes: make(map[string]NodeController), + rpc: rpc, log: slog.With( slog.String("component", "coordinator"), ), @@ -405,35 +400,26 @@ func (c *coordinator) ClusterStatus() model.ClusterStatus { } func (c *coordinator) waitForExternalEvents() { - refreshTimer := time.NewTicker(c.clusterConfigRefreshTime) - defer refreshTimer.Stop() - - var handleEvent = func() { - if err := c.handleClusterConfigUpdated(); err != nil { - c.log.Warn( - "Failed to update cluster config", - slog.Any("error", err), - ) - } - - if err := c.rebalanceCluster(); err != nil { - c.log.Warn( - "Failed to rebalance cluster", - slog.Any("error", err), - ) - } - } - for { select { case <-c.ctx.Done(): return - case <-refreshTimer.C: - handleEvent() case <-c.clusterConfigChangeCh: c.log.Info("Received cluster config change event") - handleEvent() + if err := c.handleClusterConfigUpdated(); err != nil { + c.log.Warn( + "Failed to update cluster config", + slog.Any("error", err), + ) + } + + if err := c.rebalanceCluster(); err != nil { + c.log.Warn( + "Failed to rebalance cluster", + slog.Any("error", err), + ) + } } } } @@ -442,13 +428,13 @@ func (c *coordinator) handleClusterConfigUpdated() error { c.Lock() defer c.Unlock() - newClusterConfig, _, err := c.clusterConfigProvider() + newClusterConfig, err := c.clusterConfigProvider() if err != nil { return errors.Wrap(err, "failed to read cluster configuration") } if reflect.DeepEqual(newClusterConfig, c.ClusterConfig) { - c.log.Debug("Cluster config has not changed since last time") + c.log.Info("No cluster config changes detected") return nil } diff --git a/coordinator/impl/coordinator_e2e_test.go b/coordinator/impl/coordinator_e2e_test.go index e6e67a4f..46635dc8 100644 --- a/coordinator/impl/coordinator_e2e_test.go +++ b/coordinator/impl/coordinator_e2e_test.go @@ -70,7 +70,7 @@ func TestCoordinatorE2E(t *testing.T) { } clientPool := common.NewClientPool(nil) - coordinator, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, 0, NewRpcProvider(clientPool)) + coordinator, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, NewRpcProvider(clientPool)) assert.NoError(t, err) @@ -108,7 +108,7 @@ func TestCoordinatorE2E_ShardsRanges(t *testing.T) { } clientPool := common.NewClientPool(nil) - coordinator, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, 0, NewRpcProvider(clientPool)) + coordinator, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, NewRpcProvider(clientPool)) assert.NoError(t, err) cs := 
coordinator.ClusterStatus() @@ -161,7 +161,7 @@ func TestCoordinator_LeaderFailover(t *testing.T) { } clientPool := common.NewClientPool(nil) - coordinator, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, 0, NewRpcProvider(clientPool)) + coordinator, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, NewRpcProvider(clientPool)) assert.NoError(t, err) nsStatus := coordinator.ClusterStatus().Namespaces[common.DefaultNamespace] @@ -265,7 +265,7 @@ func TestCoordinator_MultipleNamespaces(t *testing.T) { } clientPool := common.NewClientPool(nil) - coordinator, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, 0, NewRpcProvider(clientPool)) + coordinator, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, NewRpcProvider(clientPool)) assert.NoError(t, err) nsDefaultStatus := coordinator.ClusterStatus().Namespaces[common.DefaultNamespace] @@ -356,7 +356,7 @@ func TestCoordinator_DeleteNamespace(t *testing.T) { } clientPool := common.NewClientPool(nil) - coordinator, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, 0, NewRpcProvider(clientPool)) + coordinator, err := NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, NewRpcProvider(clientPool)) assert.NoError(t, err) ns1Status := coordinator.ClusterStatus().Namespaces["my-ns-1"] @@ -401,7 +401,7 @@ func TestCoordinator_DeleteNamespace(t *testing.T) { Servers: []model.ServerAddress{sa1, sa2, sa3}, } - coordinator, err = NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return newClusterConfig, nil, nil }, 0, NewRpcProvider(clientPool)) + coordinator, err = NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return newClusterConfig, nil }, nil, NewRpcProvider(clientPool)) assert.NoError(t, err) // Wait for all shards to be deleted @@ -438,11 +438,12 @@ func TestCoordinator_DynamicallAddNamespace(t *testing.T) { } clientPool := common.NewClientPool(nil) - configProvider := func() (model.ClusterConfig, chan struct{}, error) { - return clusterConfig, nil, nil + configChangesCh := make(chan any) + configProvider := func() (model.ClusterConfig, error) { + return clusterConfig, nil } - coordinator, err := NewCoordinator(metadataProvider, configProvider, 1*time.Second, NewRpcProvider(clientPool)) + coordinator, err := NewCoordinator(metadataProvider, configProvider, configChangesCh, NewRpcProvider(clientPool)) assert.NoError(t, err) ns1Status := coordinator.ClusterStatus().Namespaces["my-ns-1"] @@ -468,6 +469,7 @@ func TestCoordinator_DynamicallAddNamespace(t *testing.T) { InitialShardCount: 2, ReplicationFactor: 1, }) + configChangesCh <- nil // Wait for all shards to be ready assert.Eventually(t, func() bool { @@ -525,13 +527,14 @@ func TestCoordinator_RebalanceCluster(t *testing.T) { clientPool := common.NewClientPool(nil) mutex := &sync.Mutex{} - configProvider := func() (model.ClusterConfig, chan struct{}, error) { + configProvider := func() (model.ClusterConfig, error) { mutex.Lock() defer mutex.Unlock() - return clusterConfig, nil, nil + return clusterConfig, nil } - coordinator, err := NewCoordinator(metadataProvider, configProvider, 1*time.Second, NewRpcProvider(clientPool)) + configChangesCh 
:= make(chan any) + coordinator, err := NewCoordinator(metadataProvider, configProvider, configChangesCh, NewRpcProvider(clientPool)) assert.NoError(t, err) ns1Status := coordinator.ClusterStatus().Namespaces["my-ns-1"] @@ -566,7 +569,7 @@ func TestCoordinator_RebalanceCluster(t *testing.T) { clusterConfig.Servers = []model.ServerAddress{sa2, sa3, sa4} mutex.Unlock() - time.Sleep(2 * time.Second) + configChangesCh <- nil // Wait for all shards to be ready assert.Eventually(t, func() bool { @@ -621,11 +624,12 @@ func TestCoordinator_AddRemoveNodes(t *testing.T) { } clientPool := common.NewClientPool(nil) - configProvider := func() (model.ClusterConfig, chan struct{}, error) { - return clusterConfig, nil, nil + configProvider := func() (model.ClusterConfig, error) { + return clusterConfig, nil } - c, err := NewCoordinator(metadataProvider, configProvider, 1*time.Second, NewRpcProvider(clientPool)) + configChangesCh := make(chan any) + c, err := NewCoordinator(metadataProvider, configProvider, configChangesCh, NewRpcProvider(clientPool)) assert.NoError(t, err) assert.Equal(t, 3, len(c.(*coordinator).getNodeControllers())) @@ -635,6 +639,8 @@ func TestCoordinator_AddRemoveNodes(t *testing.T) { // Remove s1 clusterConfig.Servers = clusterConfig.Servers[1:] + configChangesCh <- nil + // Wait for all shards to be ready assert.Eventually(t, func() bool { return len(c.(*coordinator).getNodeControllers()) == 4 @@ -680,11 +686,12 @@ func TestCoordinator_ShrinkCluster(t *testing.T) { } clientPool := common.NewClientPool(nil) - configProvider := func() (model.ClusterConfig, chan struct{}, error) { - return clusterConfig, nil, nil + configProvider := func() (model.ClusterConfig, error) { + return clusterConfig, nil } - c, err := NewCoordinator(metadataProvider, configProvider, 1*time.Second, NewRpcProvider(clientPool)) + configChangesCh := make(chan any) + c, err := NewCoordinator(metadataProvider, configProvider, configChangesCh, NewRpcProvider(clientPool)) assert.NoError(t, err) // Wait for all shards to be ready @@ -707,6 +714,7 @@ func TestCoordinator_ShrinkCluster(t *testing.T) { // Remove s1 clusterConfig.Servers = clusterConfig.Servers[1:] + configChangesCh <- nil assert.Eventually(t, func() bool { return len(c.(*coordinator).getNodeControllers()) == 3 }, 10*time.Second, 10*time.Millisecond) diff --git a/deploy/charts/oxia-cluster/templates/coordinator-deployment.yaml b/deploy/charts/oxia-cluster/templates/coordinator-deployment.yaml index e75bd4e9..86d413b3 100644 --- a/deploy/charts/oxia-cluster/templates/coordinator-deployment.yaml +++ b/deploy/charts/oxia-cluster/templates/coordinator-deployment.yaml @@ -40,6 +40,7 @@ spec: - command: - "oxia" - "coordinator" + - "--conf=configmap:{{ .Release.Namespace }}/{{ .Release.Name }}-coordinator" - "--log-json" - "--metadata=configmap" - "--k8s-namespace={{ .Release.Namespace }}" @@ -59,14 +60,7 @@ spec: limits: cpu: {{ .Values.coordinator.cpu }} memory: {{ .Values.coordinator.memory }} - volumeMounts: - - name: conf - mountPath: /oxia/conf livenessProbe: {{- include "oxia-cluster.probe" .Values.coordinator.ports.internal | nindent 12 }} readinessProbe: {{- include "oxia-cluster.probe" .Values.coordinator.ports.internal | nindent 12 }} - volumes: - - name: conf - configMap: - name: {{ .Release.Name }}-coordinator diff --git a/go.mod b/go.mod index ba54af9a..b8c0f0cc 100644 --- a/go.mod +++ b/go.mod @@ -98,7 +98,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect 
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/oleiade/lane/v2 v2.0.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/go.sum b/go.sum index a0cceb94..03cb8dbc 100644 --- a/go.sum +++ b/go.sum @@ -250,8 +250,6 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/oleiade/lane/v2 v2.0.0 h1:XW/ex/Inr+bPkLd3O240xrFOhUkTd4Wy176+Gv0E3Qw= -github.com/oleiade/lane/v2 v2.0.0/go.mod h1:i5FBPFAYSWCgLh58UkUGCChjcCzef/MI7PlQm2TKCeg= github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY= github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM= github.com/onsi/gomega v1.31.0 h1:54UJxxj6cPInHS3a35wm6BK/F9nHYueZ1NVujHDrnXE= diff --git a/maelstrom/main.go b/maelstrom/main.go index 82104674..a500bb88 100644 --- a/maelstrom/main.go +++ b/maelstrom/main.go @@ -173,8 +173,8 @@ func main() { _, err := impl.NewCoordinator( impl.NewMetadataProviderFile(filepath.Join(dataDir, "cluster-status.json")), - func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, - 0, newRpcProvider(dispatcher)) + func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, + newRpcProvider(dispatcher)) if err != nil { slog.Error( "failed to create coordinator", diff --git a/tests/security/tls/tls_encryption_test.go b/tests/security/tls/tls_encryption_test.go index 08fb7bd9..4580628f 100644 --- a/tests/security/tls/tls_encryption_test.go +++ b/tests/security/tls/tls_encryption_test.go @@ -122,7 +122,7 @@ func TestClusterHandshakeSuccess(t *testing.T) { clientPool := common.NewClientPool(tlsConf) defer clientPool.Close() - coordinator, err := impl.NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, 0, impl.NewRpcProvider(clientPool)) + coordinator, err := impl.NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, impl.NewRpcProvider(clientPool)) assert.NoError(t, err) defer coordinator.Close() } @@ -152,11 +152,12 @@ func TestClientHandshakeFailByNoTlsConfig(t *testing.T) { clientPool := common.NewClientPool(tlsConf) defer clientPool.Close() - coordinator, err := impl.NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, 0, impl.NewRpcProvider(clientPool)) + coordinator, err := impl.NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, impl.NewRpcProvider(clientPool)) assert.NoError(t, err) defer coordinator.Close() - client, _ := oxia.NewSyncClient(sa1.Public) + client, err := oxia.NewSyncClient(sa1.Public, oxia.WithRequestTimeout(1*time.Second)) + assert.Error(t, err) assert.Nil(t, client) } @@ -185,7 +186,7 @@ func TestClientHandshakeByAuthFail(t *testing.T) { clientPool := common.NewClientPool(tlsConf) defer clientPool.Close() - coordinator, err := impl.NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, 0, 
impl.NewRpcProvider(clientPool)) + coordinator, err := impl.NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, impl.NewRpcProvider(clientPool)) assert.NoError(t, err) defer coordinator.Close() @@ -195,7 +196,8 @@ func TestClientHandshakeByAuthFail(t *testing.T) { assert.NoError(t, err) tlsConf, err = tlsOption.MakeClientTLSConf() assert.NoError(t, err) - client, _ := oxia.NewSyncClient(sa1.Public, oxia.WithTLS(tlsConf)) + client, err := oxia.NewSyncClient(sa1.Public, oxia.WithTLS(tlsConf), oxia.WithRequestTimeout(1*time.Second)) + assert.Error(t, err) assert.Nil(t, client) } @@ -224,7 +226,7 @@ func TestClientHandshakeWithInsecure(t *testing.T) { clientPool := common.NewClientPool(tlsConf) defer clientPool.Close() - coordinator, err := impl.NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, 0, impl.NewRpcProvider(clientPool)) + coordinator, err := impl.NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, impl.NewRpcProvider(clientPool)) assert.NoError(t, err) defer coordinator.Close() @@ -265,7 +267,7 @@ func TestClientHandshakeSuccess(t *testing.T) { clientPool := common.NewClientPool(tlsConf) defer clientPool.Close() - coordinator, err := impl.NewCoordinator(metadataProvider, func() (model.ClusterConfig, chan struct{}, error) { return clusterConfig, nil, nil }, 0, impl.NewRpcProvider(clientPool)) + coordinator, err := impl.NewCoordinator(metadataProvider, func() (model.ClusterConfig, error) { return clusterConfig, nil }, nil, impl.NewRpcProvider(clientPool)) assert.NoError(t, err) defer coordinator.Close()
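
Usage sketch (resource names and cluster-config values are illustrative): with
this change the coordinator can load its cluster config straight from a
Kubernetes ConfigMap by passing the new "configmap:<namespace>/<name>" scheme
to --conf, as the Helm chart above now does. The remote provider added in
cmd/coordinator/viper_configmap.go reads the YAML payload from the ConfigMap's
"config.yaml" key and pushes updates through a Kubernetes watch, which is why
the old --conf-file-refresh-time polling flag is removed. Assuming a ConfigMap
like the following (keys mirror the model.ClusterConfig fields exercised in
cmd_test.go):

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: oxia-coordinator
      namespace: oxia
    data:
      config.yaml: |
        namespaces:
          - name: default
            initialShardCount: 3
            replicationFactor: 3
        servers:
          - public: oxia-0:6648
            internal: oxia-0:6649

the coordinator would be started as:

    oxia coordinator \
      --conf=configmap:oxia/oxia-coordinator \
      --metadata=configmap \
      --k8s-namespace=oxia \
      --k8s-configmap-name=oxia-status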