diff --git a/docker-compose.yml b/docker-compose.yml index f8146f372c..b45b16aea8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,17 @@ services: volumes: - .:/go/src/github.com/m3db/m3 - /usr/bin/buildkite-agent:/usr/bin/buildkite-agent + # Support running docker within docker. That is, buildkite jobs themselves run in a container; that container + # needs to be able to spin up functioning docker containers. + - /var/run/docker.sock:/var/run/docker.sock + extra_hosts: + # Allow routing from the buildkite container to the host machine, as host.docker.internal. This allows us to do + # the following: + # - Spin up an etcd container with ports published to the host machine + # - Connect to the etcd container from the buildkite test process using host.docker.internal + # See + # https://medium.com/@TimvanBaarsen/how-to-connect-to-the-docker-host-from-inside-a-docker-container-112b4c71bc66 + - "host.docker.internal:host-gateway" environment: - CI - BUILDKITE diff --git a/go.mod b/go.mod index c40d0e3a82..485f992594 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,6 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/v3 v3.6.0-alpha.0 go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 - go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0 go.opentelemetry.io/collector v0.45.0 go.opentelemetry.io/otel v1.4.1 go.opentelemetry.io/otel/bridge/opentracing v1.4.1 @@ -121,7 +120,6 @@ require ( github.com/go-playground/locales v0.13.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/btree v1.0.1 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/websocket v1.4.2 // indirect diff --git a/go.sum b/go.sum index 770fa1c583..22fa99d1b7 100644 --- a/go.sum +++ b/go.sum @@ -1592,8 +1592,6 @@ go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0 
h1:BQ6CnNP4pIpy5rusFlTBxAacDgPXhuiHFwoTsB go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0/go.mod h1:/kZdrBXlc5fUgYXfIEQ0B5sb7ejXPKbtF4jWzF1exiQ= go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 h1:BQUVqBqNFZZyrRbfydrRLzq9hYvCcRj97SsX1YwD7CA= go.etcd.io/etcd/server/v3 v3.6.0-alpha.0/go.mod h1:3QM2rLq3B3hSXmVEvgVt3vEEbG/AumSs0Is7EgrlKzU= -go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0 h1:3qrZ3p/E7CxdV1kKtAU75hHOcUoXcSTwC7ELKWyzMJo= -go.etcd.io/etcd/tests/v3 v3.6.0-alpha.0/go.mod h1:hFQkP/cTsZIXXvUv+BsGHZ3TK+76XZMi5GToYA94iac= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= diff --git a/src/aggregator/integration/custom_aggregations_test.go b/src/aggregator/integration/custom_aggregations_test.go index d9c58cbe41..6bae441f46 100644 --- a/src/aggregator/integration/custom_aggregations_test.go +++ b/src/aggregator/integration/custom_aggregations_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -68,7 +68,6 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) { if testing.Short() { t.SkipNow() } - aggTypesOpts := aggregation.NewTypesOptions(). SetCounterTypeStringTransformFn(aggregation.SuffixTransform). SetTimerTypeStringTransformFn(aggregation.SuffixTransform). @@ -179,7 +178,7 @@ func testCustomAggregations(t *testing.T, metadataFns [4]metadataFn) { // must be the longer than the lowest resolution across all policies. 
finalTime := end.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/election.go b/src/aggregator/integration/election.go index 1f24e02263..221996347d 100644 --- a/src/aggregator/integration/election.go +++ b/src/aggregator/integration/election.go @@ -26,9 +26,9 @@ import ( "github.com/m3db/m3/src/cluster/services" "github.com/m3db/m3/src/cluster/services/leader" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" ) var ( @@ -40,27 +40,38 @@ var ( ) type testCluster struct { - t *testing.T - cluster *integration.Cluster + t *testing.T + cluster *integration.Cluster + leaderService services.LeaderService } func newTestCluster(t *testing.T) *testCluster { integration.BeforeTestExternal(t) - return &testCluster{ + cluster := &testCluster{ t: t, cluster: integration.NewCluster(t, &integration.ClusterConfig{ Size: testClusterSize, + // UseBridge: true, }), } + return cluster } func (tc *testCluster) LeaderService() services.LeaderService { + if tc.leaderService != nil { + return tc.leaderService + } + svc, err := leader.NewService(tc.etcdClient(), tc.options()) require.NoError(tc.t, err) - return svc + tc.leaderService = svc + return tc.leaderService } func (tc *testCluster) Close() { + if tc.leaderService != nil { + require.NoError(tc.t, tc.leaderService.Close()) + } tc.cluster.Terminate(tc.t) } diff --git a/src/aggregator/integration/metadata_change_test.go b/src/aggregator/integration/metadata_change_test.go index fa93ce7baf..9822579101 100644 --- a/src/aggregator/integration/metadata_change_test.go +++ b/src/aggregator/integration/metadata_change_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. 
// @@ -138,7 +138,7 @@ func testMetadataChange(t *testing.T, oldMetadataFn, newMetadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := end.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/multi_client_one_type_test.go b/src/aggregator/integration/multi_client_one_type_test.go index d05185a9ab..2af620205f 100644 --- a/src/aggregator/integration/multi_client_one_type_test.go +++ b/src/aggregator/integration/multi_client_one_type_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -126,7 +126,7 @@ func testMultiClientOneType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) for i := 0; i < numClients; i++ { require.NoError(t, clients[i].close()) diff --git a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go index ec83c8f362..fb1cdbd407 100644 --- a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go +++ b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/multi_server_resend_test.go b/src/aggregator/integration/multi_server_resend_test.go index 6b7c2fb71e..7833e8d991 100644 --- a/src/aggregator/integration/multi_server_resend_test.go +++ b/src/aggregator/integration/multi_server_resend_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. 
// @@ -142,6 +141,7 @@ func TestMultiServerResendAggregatedValues(t *testing.T) { // Election cluster setup. electionCluster := newTestCluster(t) + defer electionCluster.Close() // Sharding function maps all metrics to shard 0 except for the rollup metric, // which gets mapped to the last shard. diff --git a/src/aggregator/integration/one_client_multi_type_forwarded_test.go b/src/aggregator/integration/one_client_multi_type_forwarded_test.go index 9ead187784..039eeae391 100644 --- a/src/aggregator/integration/one_client_multi_type_forwarded_test.go +++ b/src/aggregator/integration/one_client_multi_type_forwarded_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -121,7 +121,7 @@ func TestOneClientMultiTypeForwardedMetrics(t *testing.T) { // Move time forward and wait for flushing to happen. finalTime := stop.Add(2 * time.Second) clock.SetNow(finalTime) - time.Sleep(2 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_multi_type_timed_test.go b/src/aggregator/integration/one_client_multi_type_timed_test.go index 8449f5858b..80f8d6ec3f 100644 --- a/src/aggregator/integration/one_client_multi_type_timed_test.go +++ b/src/aggregator/integration/one_client_multi_type_timed_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -119,7 +119,7 @@ func TestOneClientMultiTypeTimedMetrics(t *testing.T) { // Move time forward and wait for flushing to happen. 
finalTime := stop.Add(time.Minute + 2*time.Second) clock.SetNow(finalTime) - time.Sleep(2 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_multi_type_untimed_test.go b/src/aggregator/integration/one_client_multi_type_untimed_test.go index 59f6800b66..0adc3a5c2d 100644 --- a/src/aggregator/integration/one_client_multi_type_untimed_test.go +++ b/src/aggregator/integration/one_client_multi_type_untimed_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -26,9 +26,17 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cluster/placement" + "github.com/stretchr/testify/require" +) - "github.com/m3db/m3/src/cluster/placement" +const ( + // waitForDataToFlush is the amount of time we will wait in these tests between finishing writing data to + // the aggregator, and attempting to assert that data went through. + // The aggregator generally, and these tests specifically are quite sensitive to time. + // The tests probably need a bit of a rethink to wait on (or poll for) an actual condition instead of sleeping. + waitForDataToFlush = 10 * time.Second ) func TestOneClientMultiTypeUntimedMetricsWithStagedMetadatas(t *testing.T) { @@ -114,7 +122,7 @@ func testOneClientMultiType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. 
finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/one_client_passthru_test.go b/src/aggregator/integration/one_client_passthru_test.go index 75a6f49e25..489730a123 100644 --- a/src/aggregator/integration/one_client_passthru_test.go +++ b/src/aggregator/integration/one_client_passthru_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2020 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/placement_change_test.go b/src/aggregator/integration/placement_change_test.go index 99683d7f3d..9f798b0ee1 100644 --- a/src/aggregator/integration/placement_change_test.go +++ b/src/aggregator/integration/placement_change_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // @@ -227,9 +226,9 @@ func TestPlacementChange(t *testing.T) { } clock.SetNow(start2) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) setPlacement(t, placementKey, clusterClient, finalPlacement) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) for _, data := range datasets[1] { clock.SetNow(data.timestamp) @@ -245,7 +244,7 @@ func TestPlacementChange(t *testing.T) { // Move time forward and wait for flushing to happen. clock.SetNow(finalTime) - time.Sleep(6 * time.Second) + time.Sleep(waitForDataToFlush) // Remove all the topic consumers before closing clients and servers. This allows to close the // connections between servers while they still are running. 
Otherwise, during server shutdown, diff --git a/src/aggregator/integration/resend_stress_test.go b/src/aggregator/integration/resend_stress_test.go index c1510ac7ee..691edad117 100644 --- a/src/aggregator/integration/resend_stress_test.go +++ b/src/aggregator/integration/resend_stress_test.go @@ -1,5 +1,4 @@ //go:build integration -// +build integration // Copyright (c) 2018 Uber Technologies, Inc. // diff --git a/src/aggregator/integration/same_id_multi_type_test.go b/src/aggregator/integration/same_id_multi_type_test.go index 09974adb2d..fe6b70d999 100644 --- a/src/aggregator/integration/same_id_multi_type_test.go +++ b/src/aggregator/integration/same_id_multi_type_test.go @@ -1,4 +1,4 @@ -// +build integration +//go:build integration // Copyright (c) 2016 Uber Technologies, Inc. // @@ -138,7 +138,7 @@ func testSameIDMultiType(t *testing.T, metadataFn metadataFn) { // must be the longer than the lowest resolution across all policies. finalTime := stop.Add(6 * time.Second) clock.SetNow(finalTime) - time.Sleep(4 * time.Second) + time.Sleep(waitForDataToFlush) require.NoError(t, client.close()) diff --git a/src/aggregator/integration/setup.go b/src/aggregator/integration/setup.go index b0723392dc..3ed6726d6e 100644 --- a/src/aggregator/integration/setup.go +++ b/src/aggregator/integration/setup.go @@ -88,6 +88,7 @@ type testServerSetup struct { // Signals. 
doneCh chan struct{} closedCh chan struct{} + stopped bool } func newTestServerSetup(t *testing.T, opts testServerOptions) *testServerSetup { @@ -448,6 +449,10 @@ func (ts *testServerSetup) sortedResults() []aggregated.MetricWithStoragePolicy } func (ts *testServerSetup) stopServer() error { + if ts.stopped { + return nil + } + ts.stopped = true if err := ts.aggregator.Close(); err != nil { return err } @@ -460,6 +465,9 @@ func (ts *testServerSetup) stopServer() error { func (ts *testServerSetup) close() { ts.electionCluster.Close() + if err := ts.stopServer(); err != nil { + panic(err.Error()) + } } func (tss testServerSetups) newClient(t *testing.T) *client { diff --git a/src/cluster/client/etcd/client.go b/src/cluster/client/etcd/client.go index af4a71828a..6dd5f0a338 100644 --- a/src/cluster/client/etcd/client.go +++ b/src/cluster/client/etcd/client.go @@ -339,8 +339,14 @@ func newConfigFromCluster(rnd randInt63N, cluster Cluster) (clientv3.Config, err if err != nil { return clientv3.Config{}, err } + + // Support disabling autosync if a user very explicitly requests it (via negative duration). 
+ autoSyncInterval := cluster.AutoSyncInterval() + if autoSyncInterval < 0 { + autoSyncInterval = 0 + } cfg := clientv3.Config{ - AutoSyncInterval: cluster.AutoSyncInterval(), + AutoSyncInterval: autoSyncInterval, DialTimeout: cluster.DialTimeout(), DialOptions: cluster.DialOptions(), Endpoints: cluster.Endpoints(), diff --git a/src/cluster/client/etcd/client_test.go b/src/cluster/client/etcd/client_test.go index 3d1d0b60db..343842eb4e 100644 --- a/src/cluster/client/etcd/client_test.go +++ b/src/cluster/client/etcd/client_test.go @@ -25,18 +25,23 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cluster/kv" + "github.com/m3db/m3/src/cluster/services" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" + "github.com/m3db/m3/src/x/retry" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "google.golang.org/grpc" - - "github.com/m3db/m3/src/cluster/kv" - "github.com/m3db/m3/src/cluster/services" ) func TestETCDClientGen(t *testing.T) { - cs, err := NewConfigServiceClient(testOptions()) + cs, err := NewConfigServiceClient( + testOptions(). + // These are error cases; don't retry for no reason. + SetRetryOptions(retry.NewOptions().SetMaxRetries(0)), + ) require.NoError(t, err) c := cs.(*csclient) @@ -414,6 +419,15 @@ func Test_newConfigFromCluster(t *testing.T) { ) }) + t.Run("negative autosync on M3 disables autosync for etcd", func(t *testing.T) { + inputCfg := newFullConfig() + inputCfg.AutoSyncInterval = -1 + etcdCfg, err := newConfigFromCluster(testRnd, inputCfg.NewCluster()) + require.NoError(t, err) + + assert.Equal(t, time.Duration(0), etcdCfg.AutoSyncInterval) + }) + // Separate test just because the assert.Equal won't work for functions. 
t.Run("passes through dial options", func(t *testing.T) { clusterCfg := newFullConfig() diff --git a/src/cluster/client/etcd/config.go b/src/cluster/client/etcd/config.go index 877f2051e5..520afa385e 100644 --- a/src/cluster/client/etcd/config.go +++ b/src/cluster/client/etcd/config.go @@ -35,12 +35,22 @@ import ( // ClusterConfig is the config for a zoned etcd cluster. type ClusterConfig struct { - Zone string `yaml:"zone"` - Endpoints []string `yaml:"endpoints"` - KeepAlive *KeepAliveConfig `yaml:"keepAlive"` - TLS *TLSConfig `yaml:"tls"` - AutoSyncInterval time.Duration `yaml:"autoSyncInterval"` - DialTimeout time.Duration `yaml:"dialTimeout"` + Zone string `yaml:"zone"` + Endpoints []string `yaml:"endpoints"` + KeepAlive *KeepAliveConfig `yaml:"keepAlive"` + TLS *TLSConfig `yaml:"tls"` + // AutoSyncInterval configures the etcd client's AutoSyncInterval + // (go.etcd.io/etcd/client/v3@v3.6.0-alpha.0/config.go:32). + // By default, it is 1m. + // + // Advanced: + // + // One important difference from the etcd config: we have autosync *on* by default (unlike etcd), meaning that + // the zero value here doesn't indicate autosync off. + // Instead, users should pass in a negative value to indicate "disable autosync" + // Only do this if you truly have a good reason for it! Most production use cases want autosync on. + AutoSyncInterval time.Duration `yaml:"autoSyncInterval"` + DialTimeout time.Duration `yaml:"dialTimeout"` DialOptions []grpc.DialOption `yaml:"-"` // nonserializable } @@ -59,7 +69,10 @@ func (c ClusterConfig) NewCluster() Cluster { SetKeepAliveOptions(keepAliveOpts). SetTLSOptions(c.TLS.newOptions()) - if c.AutoSyncInterval > 0 { + // Autosync should *always* be on, unless the user very explicitly requests it to be off. They can do this via a + // negative value (in which case we can assume they know what they're doing). + // Therefore, only update if it's nonzero, on the assumption that zero is just the empty value. 
+ if c.AutoSyncInterval != 0 { cluster = cluster.SetAutoSyncInterval(c.AutoSyncInterval) } diff --git a/src/cluster/client/etcd/config_test.go b/src/cluster/client/etcd/config_test.go index 1bc8959117..278b41bb1f 100644 --- a/src/cluster/client/etcd/config_test.go +++ b/src/cluster/client/etcd/config_test.go @@ -181,3 +181,10 @@ func TestDefaultConfig(t *testing.T) { require.Equal(t, defaultDialTimeout, cluster.DialTimeout()) require.Equal(t, defaultAutoSyncInterval, cluster.AutoSyncInterval()) } + +func TestConfig_negativeAutosync(t *testing.T) { + cluster := ClusterConfig{ + AutoSyncInterval: -5, + }.NewCluster() + require.Equal(t, time.Duration(-5), cluster.AutoSyncInterval()) +} diff --git a/src/cluster/client/etcd/types.go b/src/cluster/client/etcd/types.go index 9caf012e70..5b60832801 100644 --- a/src/cluster/client/etcd/types.go +++ b/src/cluster/client/etcd/types.go @@ -159,6 +159,11 @@ type Cluster interface { SetTLSOptions(TLSOptions) Cluster AutoSyncInterval() time.Duration + + // SetAutoSyncInterval sets the etcd client to autosync cluster endpoints periodically. This defaults to + // 1 minute (defaultAutoSyncInterval). If negative or zero, it will disable autosync. This differs slightly + // from the underlying etcd configuration it is setting, which only supports 0 for disabling. We do this because + // there's otherwise no good way to specify "disable" in our configs (which default to SetAutoSyncInterval(1m)).
SetAutoSyncInterval(value time.Duration) Cluster DialTimeout() time.Duration diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index d65758e8ff..ddc8df306c 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -22,16 +22,15 @@ package watchmanager import ( "fmt" - "runtime" "sync/atomic" "testing" "time" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" "github.com/m3db/m3/src/x/clock" @@ -164,114 +163,117 @@ func TestWatchRecreate(t *testing.T) { <-doneCh } -func TestWatchNoLeader(t *testing.T) { - t.Skip("flaky, started to fail very consistently on CI") - const ( - watchInitAndRetryDelay = 200 * time.Millisecond - watchCheckInterval = 50 * time.Millisecond - ) - - integration.BeforeTestExternal(t) - ecluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) - defer ecluster.Terminate(t) - - var ( - ec = ecluster.Client(0) - tickDuration = 10 * time.Millisecond - electionTimeout = time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration - doneCh = make(chan struct{}, 1) - eventLog = []*clientv3.Event{} - updateCalled int32 - shouldStop int32 - ) - - opts := NewOptions(). - SetClient(ec). - SetUpdateFn( - func(_ string, e []*clientv3.Event) error { - atomic.AddInt32(&updateCalled, 1) - if len(e) > 0 { - eventLog = append(eventLog, e...) - } - return nil - }, - ). - SetTickAndStopFn( - func(string) bool { - if atomic.LoadInt32(&shouldStop) == 0 { - return false - } - - close(doneCh) - - return true - }, - ). - SetWatchChanInitTimeout(watchInitAndRetryDelay). - SetWatchChanResetInterval(watchInitAndRetryDelay). 
- SetWatchChanCheckInterval(watchCheckInterval) - - integration.WaitClientV3(t, ec) - - wh, err := NewWatchManager(opts) - require.NoError(t, err) - - go wh.Watch("foo") - - runtime.Gosched() - time.Sleep(10 * time.Millisecond) - - // there should be a valid watch now, trigger a notification - _, err = ec.Put(context.Background(), "foo", "bar") - require.NoError(t, err) - - leaderIdx := ecluster.WaitLeader(t) - require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - - // simulate quorum loss - ecluster.Members[1].Stop(t) - ecluster.Members[2].Stop(t) - - // wait for election timeout, then member[0] will not have a leader. - time.Sleep(electionTimeout) - - require.NoError(t, ecluster.Members[1].Restart(t)) - require.NoError(t, ecluster.Members[2].Restart(t)) - - // wait for leader + election delay just in case - time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration) - - leaderIdx = ecluster.WaitLeader(t) - require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") - integration.WaitClientV3(t, ec) // wait for client to be ready again - - _, err = ec.Put(context.Background(), "foo", "baz") - require.NoError(t, err) - - // give some time for watch to be updated - require.True(t, clock.WaitUntil(func() bool { - return atomic.LoadInt32(&updateCalled) >= 2 - }, 10*time.Second)) - - updates := atomic.LoadInt32(&updateCalled) - if updates < 2 { - require.Fail(t, - "insufficient update calls", - "expected at least 2 update attempts, got %d during a partition", - updates) - } - - atomic.AddInt32(&shouldStop, 1) - <-doneCh - - require.Len(t, eventLog, 2) - require.NotNil(t, eventLog[0]) - require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) - require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) - require.NotNil(t, eventLog[1]) - require.Equal(t, eventLog[1].Kv.Key, []byte("foo")) - require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) -} +// TODO: this test has been skipped for a 
while, and now doesn't work with the docker based etcd integration package. +// Revive it if it's useful, and make it no longer flake. +//nolint:gocritic +//func TestWatchNoLeader(t *testing.T) { +// t.Skip("flaky, started to fail very consistently on CI") +// const ( +// watchInitAndRetryDelay = 200 * time.Millisecond +// watchCheckInterval = 50 * time.Millisecond +// ) +// +// integration.BeforeTestExternal(t) +// ecluster := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) +// defer ecluster.Terminate(t) +// +// var ( +// ec = ecluster.Client(0) +// tickDuration = 10 * time.Millisecond +// electionTimeout = time.Duration(3*ecluster.Address[0].ElectionTicks) * tickDuration +// doneCh = make(chan struct{}, 1) +// eventLog = []*clientv3.Event{} +// updateCalled int32 +// shouldStop int32 +// ) +// +// opts := NewOptions(). +// SetClient(ec). +// SetUpdateFn( +// func(_ string, e []*clientv3.Event) error { +// atomic.AddInt32(&updateCalled, 1) +// if len(e) > 0 { +// eventLog = append(eventLog, e...) +// } +// return nil +// }, +// ). +// SetTickAndStopFn( +// func(string) bool { +// if atomic.LoadInt32(&shouldStop) == 0 { +// return false +// } +// +// close(doneCh) +// +// return true +// }, +// ). +// SetWatchChanInitTimeout(watchInitAndRetryDelay). +// SetWatchChanResetInterval(watchInitAndRetryDelay). 
+// SetWatchChanCheckInterval(watchCheckInterval) +// +// integration.WaitClientV3(t, ec) +// +// wh, err := NewWatchManager(opts) +// require.NoError(t, err) +// +// go wh.Watch("foo") +// +// runtime.Gosched() +// time.Sleep(10 * time.Millisecond) +// +// // there should be a valid watch now, trigger a notification +// _, err = ec.Put(context.Background(), "foo", "bar") +// require.NoError(t, err) +// +// leaderIdx := ecluster.WaitLeader(t) +// require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Address), "got invalid leader") +// +// // simulate quorum loss +// ecluster.Address[1].Stop(t) +// ecluster.Address[2].Stop(t) +// +// // wait for election timeout, then member[0] will not have a leader. +// time.Sleep(electionTimeout) +// +// require.NoError(t, ecluster.Address[1].Restart(t)) +// require.NoError(t, ecluster.Address[2].Restart(t)) +// +// // wait for leader + election delay just in case +// time.Sleep(time.Duration(3*ecluster.Address[0].ElectionTicks) * tickDuration) +// +// leaderIdx = ecluster.WaitLeader(t) +// require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Address), "got invalid leader") +// integration.WaitClientV3(t, ec) // wait for client to be ready again +// +// _, err = ec.Put(context.Background(), "foo", "baz") +// require.NoError(t, err) +// +// // give some time for watch to be updated +// require.True(t, clock.WaitUntil(func() bool { +// return atomic.LoadInt32(&updateCalled) >= 2 +// }, 10*time.Second)) +// +// updates := atomic.LoadInt32(&updateCalled) +// if updates < 2 { +// require.Fail(t, +// "insufficient update calls", +// "expected at least 2 update attempts, got %d during a partition", +// updates) +// } +// +// atomic.AddInt32(&shouldStop, 1) +// <-doneCh +// +// require.Len(t, eventLog, 2) +// require.NotNil(t, eventLog[0]) +// require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) +// require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) +// require.NotNil(t, eventLog[1]) +// require.Equal(t, eventLog[1].Kv.Key, 
[]byte("foo")) +// require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) +//} func TestWatchCompactedRevision(t *testing.T) { wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) diff --git a/src/cluster/integration/etcd/etcd.go b/src/cluster/integration/etcd/etcd.go index 2c93bf1200..b836e36967 100644 --- a/src/cluster/integration/etcd/etcd.go +++ b/src/cluster/integration/etcd/etcd.go @@ -21,26 +21,24 @@ package etcd import ( + "context" "encoding/json" "fmt" "io/ioutil" "net/http" - "net/url" - "os" "strings" - "time" "github.com/m3db/m3/src/cluster/client" etcdclient "github.com/m3db/m3/src/cluster/client/etcd" "github.com/m3db/m3/src/cluster/services" - xclock "github.com/m3db/m3/src/x/clock" - "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" + "github.com/m3db/m3/src/x/instrument" - "go.etcd.io/etcd/server/v3/embed" + "github.com/ory/dockertest/v3" ) type embeddedKV struct { - etcd *embed.Etcd + etcd *dockerexternal.EtcdNode opts Options dir string } @@ -51,12 +49,12 @@ func New(opts Options) (EmbeddedKV, error) { if err != nil { return nil, err } - cfg := embed.NewConfig() - cfg.Dir = dir - cfg.Logger = "zap" - setRandomPorts(cfg) - e, err := embed.StartEtcd(cfg) + pool, err := dockertest.NewPool("") + if err != nil { + return nil, fmt.Errorf("constructing dockertest.Pool for EmbeddedKV: %w", err) + } + e, err := dockerexternal.NewEtcd(pool, instrument.NewOptions()) if err != nil { return nil, fmt.Errorf("unable to start etcd, err: %v", err) } @@ -67,56 +65,16 @@ func New(opts Options) (EmbeddedKV, error) { }, nil } -func setRandomPorts(cfg *embed.Config) { - randomPortURL, err := url.Parse("http://localhost:0") - if err != nil { - panic(err.Error()) - } - - cfg.LPUrls = []url.URL{*randomPortURL} - cfg.APUrls = []url.URL{*randomPortURL} - cfg.LCUrls = []url.URL{*randomPortURL} - cfg.ACUrls = []url.URL{*randomPortURL} - - cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) -} - func (e 
*embeddedKV) Close() error { - var multi errors.MultiError - - // see if there's any errors - select { - case err := <-e.etcd.Err(): - multi = multi.Add(err) - default: - } - - // shutdown and release - e.etcd.Server.Stop() - e.etcd.Close() - - multi = multi.Add(os.RemoveAll(e.dir)) - return multi.FinalError() + return e.etcd.Close(context.TODO()) } func (e *embeddedKV) Start() error { timeout := e.opts.InitTimeout() - select { - case <-e.etcd.Server.ReadyNotify(): - break - case <-time.After(timeout): - return fmt.Errorf("etcd server took too long to start") - } - - // ensure v3 api endpoints are available, https://github.com/coreos/etcd/pull/7075 - apiVersionEndpoint := fmt.Sprintf("http://%s/version", e.etcd.Clients[0].Addr().String()) - fn := func() bool { return version3Available(apiVersionEndpoint) } - ok := xclock.WaitUntil(fn, timeout) - if !ok { - return fmt.Errorf("api version 3 not available") - } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() - return nil + return e.etcd.Setup(ctx) } type versionResponse struct { @@ -144,11 +102,7 @@ func version3Available(endpoint string) bool { } func (e *embeddedKV) Endpoints() []string { - addresses := make([]string, 0, len(e.etcd.Clients)) - for _, c := range e.etcd.Clients { - addresses = append(addresses, c.Addr().String()) - } - return addresses + return []string{e.etcd.Address()} } func (e *embeddedKV) ConfigServiceClient(fns ...ClientOptFn) (client.Client, error) { @@ -156,7 +110,9 @@ func (e *embeddedKV) ConfigServiceClient(fns ...ClientOptFn) (client.Client, err SetInstrumentOptions(e.opts.InstrumentOptions()). SetServicesOptions(services.NewOptions().SetInitTimeout(e.opts.InitTimeout())). SetClusters([]etcdclient.Cluster{ - etcdclient.NewCluster().SetZone(e.opts.Zone()).SetEndpoints(e.Endpoints()), + etcdclient.NewCluster().SetZone(e.opts.Zone()). + SetEndpoints(e.Endpoints()). + SetAutoSyncInterval(-1), }). SetService(e.opts.ServiceID()). SetEnv(e.opts.Environment()). 
diff --git a/src/cluster/integration/etcd/options.go b/src/cluster/integration/etcd/options.go index 2c986bebfb..4382996a3a 100644 --- a/src/cluster/integration/etcd/options.go +++ b/src/cluster/integration/etcd/options.go @@ -27,7 +27,7 @@ import ( ) const ( - defaulTimeout = 5 * time.Second + defaultTimeout = 30 * time.Second defaultDir = "etcd.dir" defaultServiceID = "integration.service" defaultEnv = "integration.env" @@ -48,7 +48,7 @@ func NewOptions() Options { return &opts{ iopts: instrument.NewOptions(), workingDir: defaultDir, - initTimeout: defaulTimeout, + initTimeout: defaultTimeout, serviceID: defaultServiceID, env: defaultEnv, zone: defaultZone, diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go index a4f4773934..64c7e781d2 100644 --- a/src/cluster/kv/etcd/store_test.go +++ b/src/cluster/kv/etcd/store_test.go @@ -36,10 +36,10 @@ import ( "github.com/m3db/m3/src/x/retry" "github.com/golang/protobuf/proto" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" ) @@ -513,7 +513,7 @@ func TestWatchNonBlocking(t *testing.T) { ecluster, opts, closeFn := testCluster(t) defer closeFn() - ec := ecluster.Client(0) + ec := ecluster.RandClient() opts = opts.SetWatchChanResetInterval(200 * time.Millisecond).SetWatchChanInitTimeout(500 * time.Millisecond) diff --git a/src/cluster/services/heartbeat/etcd/store_test.go b/src/cluster/services/heartbeat/etcd/store_test.go index 4950becc1e..b38025259b 100644 --- a/src/cluster/services/heartbeat/etcd/store_test.go +++ b/src/cluster/services/heartbeat/etcd/store_test.go @@ -28,9 +28,9 @@ import ( "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/services" + integration 
"github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" ) func TestKeys(t *testing.T) { diff --git a/src/cluster/services/leader/client_test.go b/src/cluster/services/leader/client_test.go index 766d9e59fd..0fb7b75d4c 100644 --- a/src/cluster/services/leader/client_test.go +++ b/src/cluster/services/leader/client_test.go @@ -30,10 +30,10 @@ import ( "github.com/m3db/m3/src/cluster/services/leader/campaign" "github.com/m3db/m3/src/cluster/services/leader/election" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/framework/integration" "golang.org/x/net/context" ) @@ -127,11 +127,15 @@ func TestNewClient(t *testing.T) { assert.NotNil(t, svc) } +// TODO: this test most likely wasn't testing what we thought it was. 
While using etcd/testing/framework/integration, +// (i.e. go.etcd.io/etcd/tests/v3/framework/integration), the client gets closed. func TestNewClient_BadCluster(t *testing.T) { + t.Skip("This test only works with the etcd/testing/framework/integration package, " + + "and doesn't provide much signal on correctness of our code.") tc := newTestCluster(t) cl := tc.etcdClient() tc.close() - + require.NoError(t, cl.Close()) _, err := newClient(cl, tc.options(), "") assert.Error(t, err) } diff --git a/src/cluster/services/leader/election/client_test.go b/src/cluster/services/leader/election/client_test.go index af9c0b1759..6cc2db348d 100644 --- a/src/cluster/services/leader/election/client_test.go +++ b/src/cluster/services/leader/election/client_test.go @@ -25,11 +25,11 @@ import ( "testing" "time" + integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" - "go.etcd.io/etcd/tests/v3/framework/integration" ) type testCluster struct { diff --git a/src/cmd/services/m3dbnode/main/main_test.go b/src/cmd/services/m3dbnode/main/main_test.go index f488662251..f46e6c39aa 100644 --- a/src/cmd/services/m3dbnode/main/main_test.go +++ b/src/cmd/services/m3dbnode/main/main_test.go @@ -1,4 +1,6 @@ +//go:build big // +build big + // // Copyright (c) 2017 Uber Technologies, Inc. // @@ -51,7 +53,7 @@ import ( // TestConfig tests booting a server using file based configuration.
func TestConfig(t *testing.T) { // Embedded kv - embeddedKV, err := etcd.New(etcd.NewOptions()) + embeddedKV, err := etcd.New(etcd.NewOptions().SetInitTimeout(30 * time.Second)) require.NoError(t, err) defer func() { require.NoError(t, embeddedKV.Close()) @@ -631,6 +633,7 @@ db: etcdClusters: - zone: {{.ServiceZone}} endpoints: {{.EtcdEndpoints}} + autoSyncInterval: -1 ` embeddedKVConfigPortion = ` diff --git a/src/integration/resources/docker/dockerexternal/etcd.go b/src/integration/resources/docker/dockerexternal/etcd.go new file mode 100644 index 0000000000..71383df7cc --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcd.go @@ -0,0 +1,279 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package dockerexternal + +import ( + "context" + "errors" + "fmt" + "math/rand" + "net" + "strconv" + "time" + + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration/bridge" + xdockertest "github.com/m3db/m3/src/x/dockertest" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/retry" + + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +var ( + etcdImage = xdockertest.Image{ + Name: "bitnami/etcd", + Tag: "3.5.4", + } +) + +// NewEtcd constructs a single etcd node, running in a docker container. +func NewEtcd( + pool *dockertest.Pool, + instrumentOpts instrument.Options, + options ...EtcdClusterOption, +) (*EtcdNode, error) { + logger := instrumentOpts.Logger() + if logger == nil { + logger = zap.NewNop() + instrumentOpts = instrumentOpts.SetLogger(logger) + } + + var opts etcdClusterOptions + for _, o := range options { + o.apply(&opts) + } + + return &EtcdNode{ + pool: pool, + instrumentOpts: instrumentOpts, + logger: logger, + opts: opts, + // Solely for mocking in tests--unfortunately we don't want to take in the etcd client as a dependency here + // (we don't know the endpoints, and therefore need to construct it ourselves). + // Thus, we do two hops (mock newClient returning mock memberClient) + newClient: func(config clientv3.Config) (memberClient, error) { + return clientv3.New(config) + }, + }, nil +} + +// EtcdNode is a single etcd node, running via a docker container. +//nolint:maligned +type EtcdNode struct { + instrumentOpts instrument.Options + logger *zap.Logger + pool *dockertest.Pool + opts etcdClusterOptions + + // namePrefix is used to name the cluster. 
Exists solely for unittests in this package; otherwise a const + namePrefix string + newClient func(config clientv3.Config) (memberClient, error) + + // initialized by Setup + address string + resource *xdockertest.Resource + etcdCli *clientv3.Client + bridge *bridge.Bridge + + stopped bool +} + +// Setup starts the docker container. +func (c *EtcdNode) Setup(ctx context.Context) (closeErr error) { + if c.resource != nil { + return errors.New("etcd cluster already started") + } + + // nolint:gosec + id := rand.New(rand.NewSource(time.Now().UnixNano())).Int() + + namePrefix := "m3-test-etcd-" + if c.namePrefix != "" { + // support overriding for tests + namePrefix = c.namePrefix + } + + // Roughly, runs: + // docker run --rm --env ALLOW_NONE_AUTHENTICATION=yes -it --name Etcd bitnami/etcd + // Port 2379 on the container is bound to a free port on the host + resource, err := xdockertest.NewDockerResource(c.pool, xdockertest.ResourceOptions{ + OverrideDefaults: false, + // TODO: what even is this? + Source: "etcd", + ContainerName: fmt.Sprintf("%s%d", namePrefix, id), + Image: etcdImage, + Env: []string{"ALLOW_NONE_AUTHENTICATION=yes"}, + InstrumentOpts: c.instrumentOpts, + PortMappings: map[docker.Port][]docker.PortBinding{ + "2379/tcp": {{ + HostIP: "0.0.0.0", + HostPort: strconv.Itoa(c.opts.port), + }}, + }, + NoNetworkOverlay: true, + }) + + if err != nil { + return fmt.Errorf("starting etcd container: %w", err) + } + + container := resource.Resource().Container + c.logger.Info("etcd container started", + zap.String("containerID", container.ID), + zap.Any("ports", container.NetworkSettings.Ports), + // Uncomment if you need gory details about the container printed; equivalent of `docker inspect + // zap.Any("container", container), + ) + // Extract the port on which we are listening. + // This is coming from the equivalent of docker inspect + portBinds := container.NetworkSettings.Ports["2379/tcp"] + + // If running in a docker container e.g. 
on buildkite, route to etcd using the published port on the *host* machine. + // See also http://github.com/m3db/m3/blob/master/docker-compose.yml#L16-L16 + ipAddr := "127.0.0.1" + _, err = net.ResolveIPAddr("ip4", "host.docker.internal") + if err == nil { + c.logger.Info("Running tests within a docker container (e.g. for buildkite. " + + "Using host.docker.internal to talk to etcd") + ipAddr = "host.docker.internal" + } + + c.resource = resource + c.address = fmt.Sprintf("%s:%s", ipAddr, portBinds[0].HostPort) + + etcdCli, err := clientv3.New( + clientv3.Config{ + Endpoints: []string{c.address}, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + Logger: c.logger, + }, + ) + if err != nil { + return fmt.Errorf("constructing etcd client: %w", err) + } + + defer func() { + if err := etcdCli.Close(); err != nil { + var merr xerrors.MultiError + closeErr = merr. + Add(closeErr). + Add(fmt.Errorf("closing etcd client: %w", err)). + FinalError() + } + }() + + return c.waitForHealth(ctx, etcdCli) +} + +func (c *EtcdNode) containerHostPort() string { + portBinds := c.resource.Resource().Container.NetworkSettings.Ports["2379/tcp"] + + return fmt.Sprintf("127.0.0.1:%s", portBinds[0].HostPort) +} + +func (c *EtcdNode) waitForHealth(ctx context.Context, memberCli memberClient) error { + retrier := retry.NewRetrier(retry.NewOptions(). + SetForever(true). 
+ SetMaxBackoff(5 * time.Second), + ) + + var timeout time.Duration + deadline, ok := ctx.Deadline() + if ok { + timeout = deadline.Sub(time.Now()) + } + c.logger.Info( + "Waiting for etcd to report healthy (via member list)", + zap.String("timeout", timeout.String()), + ) + err := retrier.AttemptContext(ctx, func() error { + _, err := memberCli.MemberList(ctx) + if err != nil { + c.logger.Info( + "Failed connecting to etcd while waiting for container to come up", + zap.Error(err), + zap.String("endpoints", c.address), + ) + } + return err + }) + if err == nil { + c.logger.Info("etcd is healthy") + return nil + } + return fmt.Errorf("waiting for etcd to become healthy: %w", err) +} + +// Close stops the etcd node, and removes it. +func (c *EtcdNode) Close(ctx context.Context) error { + var err xerrors.MultiError + err = err. + Add(c.resource.Close()) + return err.FinalError() +} + +// Address is the host:port of the etcd node for use by etcd clients. +func (c *EtcdNode) Address() string { + return c.address +} + +// Stop stops the etcd container, but does not purge it. A stopped container can be restarted with Restart. +func (c *EtcdNode) Stop(ctx context.Context) error { + if c.stopped { + return errors.New("etcd node is already stopped") + } + if err := c.pool.Client.StopContainerWithContext(c.resource.Resource().Container.ID, 0, ctx); err != nil { + return err + } + c.stopped = true + return nil +} + +// Restart restarts the etcd container. If it isn't currently stopped, the etcd container will be stopped and then +// started; else it will just be started.
+func (c *EtcdNode) Restart(ctx context.Context) error { + if !c.stopped { + c.logger.Info("Stopping etcd node") + + if err := c.Stop(ctx); err != nil { + return fmt.Errorf("stopping etcd node for Restart: %w", err) + } + } + err := c.pool.Client.StartContainerWithContext(c.resource.Resource().Container.ID, nil, ctx) + if err != nil { + return fmt.Errorf("starting etcd node for Restart: %w", err) + } + c.stopped = false + return nil +} + +var _ memberClient = (*clientv3.Client)(nil) + +// memberClient exposes just one method of *clientv3.Client, for purposes of tests. +type memberClient interface { + MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) +} diff --git a/src/integration/resources/docker/dockerexternal/etcd_options.go b/src/integration/resources/docker/dockerexternal/etcd_options.go new file mode 100644 index 0000000000..e4a9addee5 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcd_options.go @@ -0,0 +1,57 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dockerexternal + +// EtcdClusterOption configures an etcd cluster. +type EtcdClusterOption interface { + apply(opts *etcdClusterOptions) +} + +type etcdClusterOptions struct { + useBridge bool + port int +} + +// EtcdClusterUseBridge configures an EtcdNode to insert a networking "bridge" between the etcd container and the +// calling processes. The bridge intercepts network traffic, and forwards it, unless told not to via e.g. Blackhole(). +// See the bridge package. +// As noted in that package, this implementation is lifted directly from the etcd/integration package; all credit goes +// to etcd authors for the approach. +func EtcdClusterUseBridge(shouldUseBridge bool) EtcdClusterOption { + return useBridge(shouldUseBridge) +} + +type useBridge bool + +func (u useBridge) apply(opts *etcdClusterOptions) { + opts.useBridge = bool(u) +} + +// EtcdClusterPort sets a specific port for etcd to listen on. Default is to listen on :0 (any free port). +func EtcdClusterPort(port int) EtcdClusterOption { + return withPort(port) +} + +type withPort int + +func (w withPort) apply(opts *etcdClusterOptions) { + opts.port = int(w) +} diff --git a/src/integration/resources/docker/dockerexternal/etcd_test.go b/src/integration/resources/docker/dockerexternal/etcd_test.go new file mode 100644 index 0000000000..a267afb75c --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcd_test.go @@ -0,0 +1,184 @@ +// Copyright (c) 2022 Uber Technologies, Inc.
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package dockerexternal + +import ( + "context" + "math/rand" + "strconv" + "strings" + "testing" + "time" + + "github.com/m3db/m3/src/x/instrument" + + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +const ( + testKey = "etcd-test" +) + +var ( + testLogger, _ = zap.NewDevelopment() +) + +type etcdTestDeps struct { + Pool *dockertest.Pool + InstrumentOpts instrument.Options + Etcd *EtcdNode +} + +func setupEtcdTest(t *testing.T) etcdTestDeps { + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + iopts := instrument.NewOptions().SetLogger(testLogger) + c, err := NewEtcd(pool, iopts) + require.NoError(t, err) + + return etcdTestDeps{ + Pool: pool, + Etcd: c, + InstrumentOpts: iopts, + } +} + +func TestCluster(t *testing.T) { + t.Run("starts a functioning cluster", func(t *testing.T) { + ctx, cancel := newTestContext() + defer cancel() + + deps := setupEtcdTest(t) + require.NoError(t, deps.Etcd.Setup(ctx)) + + t.Cleanup(func() { + require.NoError(t, deps.Etcd.Close(ctx)) + }) + + cli, err := clientv3.New( + clientv3.Config{ + Endpoints: []string{deps.Etcd.Address()}, + }, + ) + require.NoError(t, err) + + //nolint:gosec + testVal := strconv.Itoa(rand.Intn(10000)) + _, err = cli.Put(ctx, testKey, testVal) + require.NoError(t, err) + + actualVal, err := cli.Get(ctx, testKey) + require.NoError(t, err) + + assert.Equal(t, testVal, string(actualVal.Kvs[0].Value)) + }) + + t.Run("can run multiple at once", func(t *testing.T) { + ctx, cancel := newTestContext() + defer cancel() + + deps := setupEtcdTest(t) + + require.NoError(t, deps.Etcd.Setup(ctx)) + defer func() { + require.NoError(t, deps.Etcd.Close(ctx)) + }() + + c2, err := NewEtcd(deps.Pool, deps.InstrumentOpts) + require.NoError(t, err) + require.NoError(t, c2.Setup(ctx)) + defer func() { + require.NoError(t, c2.Close(ctx)) + }() + }) + 
+ t.Run("cleans up containers on shutdown", func(t *testing.T) { + ctx, cancel := newTestContext() + defer cancel() + + deps := setupEtcdTest(t) + testPrefix := "cleanup-test-" + deps.Etcd.namePrefix = testPrefix + + findContainers := func(namePrefix string, pool *dockertest.Pool) ([]docker.APIContainers, error) { + containers, err := deps.Pool.Client.ListContainers(docker.ListContainersOptions{}) + if err != nil { + return nil, err + } + + var rtn []docker.APIContainers + for _, ct := range containers { + for _, name := range ct.Names { + // Docker response prefixes the container name with / regardless of what you give it as input. + if strings.HasPrefix(name, "/"+namePrefix) { + rtn = append(rtn, ct) + break + } + } + } + return rtn, nil + } + + require.NoError(t, deps.Etcd.Setup(ctx)) + + cts, err := findContainers(testPrefix, deps.Pool) + require.NoError(t, err) + assert.Len(t, cts, 1) + + require.NoError(t, deps.Etcd.Close(ctx)) + cts, err = findContainers(testPrefix, deps.Pool) + require.NoError(t, err) + assert.Len(t, cts, 0) + }) +} + +func TestCluster_waitForHealth(t *testing.T) { + t.Run("errors when context is canceled", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + deps := setupEtcdTest(t) + etcdCli := fakeMemberClient{err: assert.AnError} + cancel() + require.EqualError( + t, + deps.Etcd.waitForHealth(ctx, etcdCli), + "waiting for etcd to become healthy: context canceled while retrying: context canceled", + ) + }) +} + +type fakeMemberClient struct { + err error +} + +func (f fakeMemberClient) MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) { + return nil, f.err +} + +func newTestContext() (context.Context, func()) { + return context.WithTimeout(context.Background(), 10*time.Second) +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/README.md b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/README.md new file mode 100644 index 
0000000000..860656226a --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/README.md @@ -0,0 +1,11 @@ +# Fork Notice + +This code is taken very directly from the etcd codebase (as is allowed by the Apache License). +The original license is included in all files. + +The file was copied from: https://github.com/etcd-io/etcd/blob/cdd2b737f04812a919a5735380fdaa1f932346d0/tests/framework/integration/bridge.go + +As of this notice only slight modifications have been made--primarily to satisfy linters +(lint ignores, public method comments). + +Thank you to the etcd maintainers for use of this code! \ No newline at end of file diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/bridge.go b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/bridge.go new file mode 100644 index 0000000000..f8ff1525e4 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/bridge/bridge.go @@ -0,0 +1,256 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bridge + +import ( + "io" + "net" + "sync" +) + +// Dialer makes TCP connections. +type Dialer interface { + Dial() (net.Conn, error) +} + +// Bridge proxies connections between listener and dialer, making it possible +// to disconnect grpc network connections without closing the logical grpc connection. +type Bridge struct { + dialer Dialer + l net.Listener + conns map[*bridgeConn]struct{} + + stopc chan struct{} + pausec chan struct{} + blackholec chan struct{} + wg sync.WaitGroup + + mu sync.Mutex +} + +// New constructs a bridge listening to the given listener and connecting using the given dialer. +func New(dialer Dialer, listener net.Listener) (*Bridge, error) { + b := &Bridge{ + // Bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number + dialer: dialer, + l: listener, + conns: make(map[*bridgeConn]struct{}), + stopc: make(chan struct{}), + pausec: make(chan struct{}), + blackholec: make(chan struct{}), + } + close(b.pausec) + b.wg.Add(1) + go b.serveListen() + return b, nil +} + +// Close stops the bridge. 
+func (b *Bridge) Close() { + //nolint:errcheck + b.l.Close() + b.mu.Lock() + select { + case <-b.stopc: + default: + close(b.stopc) + } + b.mu.Unlock() + b.wg.Wait() +} + +// DropConnections drops connections to the bridge. +func (b *Bridge) DropConnections() { + b.mu.Lock() + defer b.mu.Unlock() + for bc := range b.conns { + bc.Close() + } + b.conns = make(map[*bridgeConn]struct{}) +} + +// PauseConnections pauses all connections. +func (b *Bridge) PauseConnections() { + b.mu.Lock() + b.pausec = make(chan struct{}) + b.mu.Unlock() +} + +// UnpauseConnections unpauses all connections. +func (b *Bridge) UnpauseConnections() { + b.mu.Lock() + select { + case <-b.pausec: + default: + close(b.pausec) + } + b.mu.Unlock() +} + +func (b *Bridge) serveListen() { + defer func() { + //nolint:errcheck + b.l.Close() + b.mu.Lock() + for bc := range b.conns { + bc.Close() + } + b.mu.Unlock() + b.wg.Done() + }() + + for { + inc, ierr := b.l.Accept() + if ierr != nil { + return + } + b.mu.Lock() + pausec := b.pausec + b.mu.Unlock() + select { + case <-b.stopc: + //nolint:errcheck + inc.Close() + return + case <-pausec: + } + + outc, oerr := b.dialer.Dial() + if oerr != nil { + //nolint:errcheck + inc.Close() + return + } + + bc := &bridgeConn{inc, outc, make(chan struct{})} + b.wg.Add(1) + b.mu.Lock() + b.conns[bc] = struct{}{} + go b.serveConn(bc) + b.mu.Unlock() + } +} + +func (b *Bridge) serveConn(bc *bridgeConn) { + defer func() { + close(bc.donec) + bc.Close() + b.mu.Lock() + delete(b.conns, bc) + b.mu.Unlock() + b.wg.Done() + }() + + var wg sync.WaitGroup + wg.Add(2) + go func() { + //nolint:errcheck + b.ioCopy(bc.out, bc.in) + bc.close() + wg.Done() + }() + go func() { + //nolint:errcheck + b.ioCopy(bc.in, bc.out) + bc.close() + wg.Done() + }() + wg.Wait() +} + +type bridgeConn struct { + in net.Conn + out net.Conn + donec chan struct{} +} + +func (bc *bridgeConn) Close() { + bc.close() + <-bc.donec +} + +func (bc *bridgeConn) close() { + //nolint:errcheck + bc.in.Close() 
+ //nolint:errcheck + bc.out.Close() +} + +// Blackhole makes the bridge discard traffic on its connections instead of forwarding it. +func (b *Bridge) Blackhole() { + b.mu.Lock() + close(b.blackholec) + b.mu.Unlock() +} + +// Unblackhole drops the bridge's current connections and stops blackholing traffic. +func (b *Bridge) Unblackhole() { + b.mu.Lock() + for bc := range b.conns { + bc.Close() + } + b.conns = make(map[*bridgeConn]struct{}) + b.blackholec = make(chan struct{}) + b.mu.Unlock() +} + +// ref. https://github.com/golang/go/blob/master/src/io/io.go copyBuffer +func (b *Bridge) ioCopy(dst io.Writer, src io.Reader) (err error) { + buf := make([]byte, 32*1024) + for { + select { + case <-b.blackholec: + //nolint:errcheck + io.Copy(io.Discard, src) + return nil + default: + } + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if ew != nil { + return ew + } + if nr != nw { + return io.ErrShortWrite + } + } + if er != nil { + err = er + break + } + } + return err +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/cluster.go b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster.go new file mode 100644 index 0000000000..a83af92fe1 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster.go @@ -0,0 +1,287 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software.
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package etcdintegration is a mostly drop-in replacement for the etcd integration +// (github.com/etcd-io/etcd/tests/v3/framework/integration) package. +// Instead of starting etcd within this Go process, it starts etcd using a docker container. +package etcdintegration + +import ( + "context" + "fmt" + "math/rand" + "net" + "time" + + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal" + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration/bridge" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/retry" + + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "google.golang.org/grpc" +) + +const ( + startTimeout = 30 * time.Second + stopTimeout = 30 * time.Second + + clientHealthTimeout = 30 * time.Second +) + +// ClusterConfig configures an etcd integration test cluster. +type ClusterConfig struct { + // Size is the number of nodes in the cluster. Provided as a parameter to be API compatible with the etcd package, + // but currently only one node is supported. + Size int + + // UseBridge enables a networking bridge on etcd members, accessible via Node.Bridge(). This allows manipulation + // of connections to particular members. + UseBridge bool +} + +// Cluster is an etcd cluster. Currently, the implementation is such that only one node clusters are allowed. 
+type Cluster struct { + // Members are the etcd nodes that make up the cluster. + Members []*Node + + terminated bool +} + +// NewCluster starts an etcd cluster using docker. +func NewCluster(t testingT, cfg *ClusterConfig) *Cluster { + if cfg.Size > 1 { + t.Errorf("NewCluster currently only supports single node clusters") + t.FailNow() + return nil + } + + logger := zaptest.NewLogger(t) + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + r, err := dockerexternal.NewEtcd(pool, instrument.NewOptions(), dockerexternal.EtcdClusterUseBridge(cfg.UseBridge)) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), startTimeout) + defer cancel() + + cluster := &Cluster{ + Members: []*Node{newNode(r, logger, cfg)}, + } + + require.NoError(t, cluster.start(ctx)) + + // Paranoia: try to ensure that we cleanup the containers, even if our callers mess up. + t.Cleanup(func() { + if !cluster.terminated { + cluster.Terminate(t) + } + }) + return cluster +} + +// start is private because NewCluster is intended to always start the cluster. +func (c *Cluster) start(ctx context.Context) error { + var merr xerrors.MultiError + for _, m := range c.Members { + merr = merr.Add(m.start(ctx)) + } + if err := merr.FinalError(); err != nil { + return fmt.Errorf("failed starting etcd cluster: %w", err) + } + return nil +} + +// RandClient returns a client from any member in the cluster. +func (c *Cluster) RandClient() *clientv3.Client { + //nolint:gosec + return c.Members[rand.Intn(len(c.Members))].Client +} + +// Terminate stops all nodes in the cluster. +func (c *Cluster) Terminate(t testingT) { + ctx, cancel := context.WithTimeout(context.Background(), stopTimeout) + defer cancel() + + c.terminated = true + + var err xerrors.MultiError + for _, node := range c.Members { + err = err.Add(node.close(ctx)) + } + require.NoError(t, err.FinalError()) +} + +// Node is a single etcd server process, running in a docker container. 
+type Node struct { + Client *clientv3.Client + + resource dockerEtcd + cfg *ClusterConfig + logger *zap.Logger + bridge *bridge.Bridge +} + +func newNode(r dockerEtcd, logger *zap.Logger, cfg *ClusterConfig) *Node { + return &Node{ + resource: r, + logger: logger, + cfg: cfg, + } +} + +// Stop stops the etcd container, but doesn't remove it. +func (n *Node) Stop(t testingT) { + ctx, cancel := context.WithTimeout(context.Background(), stopTimeout) + defer cancel() + require.NoError(t, n.resource.Stop(ctx)) + + if n.bridge != nil { + n.bridge.Close() + } +} + +// Bridge can be used to manipulate connections to this etcd node. It +// is a man-in-the-middle listener which mostly transparently forwards connections, unless told to drop them via e.g. +// the Blackhole method. +// Bridge will only be active if cfg.UseBridge is true; calling this method otherwise will panic. +func (n *Node) Bridge() *bridge.Bridge { + if !n.cfg.UseBridge { + panic("EtcdNode wasn't configured to use a Bridge; pass EtcdClusterUseBridge(true) to enable.") + } + return n.bridge +} + +// Restart starts a stopped etcd container, stopping it first if it's not already. +func (n *Node) Restart(t testingT) error { + ctx, cancel := context.WithTimeout(context.Background(), startTimeout) + defer cancel() + require.NoError(t, n.resource.Restart(ctx)) + return nil +} + +// start starts the etcd node. It is private because it isn't part of the etcd/integration package API, and +// should only be called by Cluster.start. 
+func (n *Node) start(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, startTimeout) + defer cancel() + + if err := n.resource.Setup(ctx); err != nil { + return err + } + + address := n.resource.Address() + if n.cfg.UseBridge { + addr, err := n.setupBridge() + if err != nil { + return fmt.Errorf("setting up connection bridge for etcd node: %w", err) + } + address = addr + } + + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{"http://" + address}, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + DialTimeout: 5 * time.Second, + Logger: n.logger, + }) + + if err != nil { + return fmt.Errorf("constructing etcd client for member: %w", err) + } + + n.logger.Info("Connecting to docker etcd using host machine port", + zap.String("endpoint", address), + ) + + n.Client = etcdCli + return nil +} + +// setupBridge puts a man-in-the-middle listener in between the etcd docker process and the client. See Bridge() for +// details. +// Returns the new address of the bridge, which clients should connect to. +func (n *Node) setupBridge() (string, error) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "", fmt.Errorf("setting up listener for bridge: %w", err) + } + + n.logger.Info("etcd bridge is listening", zap.String("addr", listener.Addr().String())) + + // dialer = make connections to the etcd container + // listener = the bridge's inbounds + n.bridge, err = bridge.New(dialer{hostport: n.resource.Address()}, listener) + if err != nil { + return "", err + } + + return listener.Addr().String(), nil +} + +func (n *Node) close(ctx context.Context) error { + var err xerrors.MultiError + err = err.Add(n.Client.Close()) + return err.Add(n.resource.Close(ctx)).FinalError() +} + +type dialer struct { + hostport string +} + +func (d dialer) Dial() (net.Conn, error) { + return net.Dial("tcp", d.hostport) +} + +// testingT wraps *testing.T. Allows us to not directly depend on *testing package. 
+type testingT interface { + zaptest.TestingT + require.TestingT + + Cleanup(func()) +} + +// BeforeTestExternal -- solely here to match etcd API's. +func BeforeTestExternal(t testingT) {} + +// WaitClientV3 waits for an etcd client to be healthy. +func WaitClientV3(t testingT, kv clientv3.KV) { + ctx, cancel := context.WithTimeout(context.Background(), clientHealthTimeout) + defer cancel() + + err := retry.NewRetrier(retry.NewOptions().SetForever(true)).AttemptContext( + ctx, + func() error { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + _, err := kv.Get(ctx, "/") + return err + }, + ) + + require.NoError(t, err) +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/cluster_test.go b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster_test.go new file mode 100644 index 0000000000..ed84e57362 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/cluster_test.go @@ -0,0 +1,80 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package etcdintegration + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + testKey = "test-key" +) + +func TestCluster(t *testing.T) { + c := NewCluster(t, &ClusterConfig{Size: 1}) + + defer c.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + testVal := newTestValue(t) + _, err := c.RandClient().Put(ctx, testKey, testVal) + require.NoError(t, err) + + resp, err := c.RandClient().Get(ctx, testKey) + require.NoError(t, err) + assert.Equal(t, testVal, string(resp.Kvs[0].Value)) +} + +func TestCluster_withBridge(t *testing.T) { + c := NewCluster(t, &ClusterConfig{Size: 1, UseBridge: true}) + + defer c.Terminate(t) + + c.Members[0].Bridge().DropConnections() + c.Members[0].Bridge().Blackhole() + + ctx := context.Background() + + blackholedCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + _, err := c.RandClient().MemberList(blackholedCtx) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + + c.Members[0].Bridge().Unblackhole() + + availCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + _, err = c.RandClient().MemberList(availCtx) + require.NoError(t, err) +} + +func newTestValue(t *testing.T) string { + return fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix()) +} diff --git a/src/integration/resources/docker/dockerexternal/etcdintegration/types.go b/src/integration/resources/docker/dockerexternal/etcdintegration/types.go new file mode 100644 index 0000000000..7e9dc35ca9 --- /dev/null +++ b/src/integration/resources/docker/dockerexternal/etcdintegration/types.go @@ -0,0 +1,34 @@ +// 
Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package etcdintegration + +import "context" + +// dockerEtcd are the operations we need out of a docker based etcd process. It's implemented by at least +// resources/dockerexternal.EtcdNode +type dockerEtcd interface { + Setup(ctx context.Context) error + Close(ctx context.Context) error + + Stop(ctx context.Context) error + Restart(ctx context.Context) error + Address() string +} diff --git a/src/x/dockertest/common.go b/src/x/dockertest/common.go index 01558b0d98..0cb4938651 100644 --- a/src/x/dockertest/common.go +++ b/src/x/dockertest/common.go @@ -58,6 +58,8 @@ type ResourceOptions struct { Image Image PortList []int PortMappings map[dc.Port][]dc.PortBinding + + // NoNetworkOverlay if set, disables use of the default integration testing network we create (networkName). 
NoNetworkOverlay bool // Env is the environment for the docker container; it corresponds 1:1 with dockertest.RunOptions. @@ -111,8 +113,7 @@ func (o ResourceOptions) WithDefaults( func newOptions(name string) *dockertest.RunOptions { return &dockertest.RunOptions{ - Name: name, - NetworkID: networkName, + Name: name, } } diff --git a/src/x/dockertest/docker_resource.go b/src/x/dockertest/docker_resource.go index c47ca1f69e..927539bcea 100644 --- a/src/x/dockertest/docker_resource.go +++ b/src/x/dockertest/docker_resource.go @@ -83,6 +83,9 @@ func NewDockerResource( } opts := newOptions(containerName) + if !resourceOpts.NoNetworkOverlay { + opts.NetworkID = networkName + } opts, err := exposePorts(opts, portList, resourceOpts.PortMappings) if err != nil { return nil, err