Skip to content

Commit

Permalink
VTGateProxy CI (#414)
Browse files Browse the repository at this point in the history
* Add vtgateproxy tests to CI
* Refactor tests to make them less flappy
---------
Signed-off-by: Riley Laine <rlaine@slack-corp.com>
Signed-off-by: Esme Lamb <dlamb@slack-corp.com>
  • Loading branch information
rjlaine authored and dedelala committed Nov 12, 2024
1 parent ddf936d commit de86aba
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 118 deletions.
119 changes: 43 additions & 76 deletions go/test/endtoend/vtgateproxy/rebalance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package vtgateproxy

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"testing"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
Expand All @@ -47,8 +44,7 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {

const targetAffinity = "use1-az1"
const targetPool = "pool1"
const vtgateCount = 10
const vtgatesInAffinity = 5
const vtgateCount = 5
const vtgateproxyConnections = 4

vtgates, err := startAdditionalVtgates(vtgateCount)
Expand All @@ -61,28 +57,20 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {
var config []map[string]string

for i, vtgate := range vtgates {
affinity := targetAffinity
if i >= vtgatesInAffinity {
affinity = "use1-az2"
}
config = append(config, map[string]string{
"host": fmt.Sprintf("vtgate%v", i),
"address": clusterInstance.Hostname,
"grpc": strconv.Itoa(vtgate.GrpcPort),
"az_id": affinity,
"az_id": targetAffinity,
"type": targetPool,
})
}

vtgateIdx := vtgateproxyConnections
b, err := json.Marshal(config[:vtgateIdx])
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil {
if err := writeConfig(t, vtgateHostsFile, config, nil); err != nil {
t.Fatal(err)
}

// Spin up proxy
vtgateproxyHTTPPort := clusterInstance.GetAndReservePort()
vtgateproxyGrpcPort := clusterInstance.GetAndReservePort()
vtgateproxyMySQLPort := clusterInstance.GetAndReservePort()
Expand Down Expand Up @@ -114,27 +102,32 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {

log.Info("Reading test value while adding vtgates")

const totalQueries = 1000
addVtgateEveryN := totalQueries / len(vtgates)
// Scale up
for i := 1; i <= vtgateCount; i++ {
if err := writeConfig(t, vtgateHostsFile, config[:i], vtgateproxyProcInstance); err != nil {
t.Fatal(err)
}

for i := 0; i < totalQueries; i++ {
if i%(addVtgateEveryN) == 0 && vtgateIdx <= len(vtgates) {
log.Infof("Adding vtgate %v", vtgateIdx-1)
b, err = json.Marshal(config[:vtgateIdx])
// Run queries at each configuration
for j := 0; j < 100; j++ {
result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer")
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil {
t.Fatal(err)
}

if err := vtgateproxyProcInstance.WaitForConfig(config[:vtgateIdx], 5*time.Second); err != nil {
t.Fatal(err)
}

vtgateIdx++
assert.Equal(t, []customerEntry{{1, "email1"}}, result)
}
}

log.Info("Removing first vtgates")

// Pop the first 2 vtgates off to force first_ready to pick a new target
if err := writeConfig(t, vtgateHostsFile, config[2:], vtgateproxyProcInstance); err != nil {
t.Fatal(err)
}

// Run queries in the last configuration
for j := 0; j < 100; j++ {
result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer")
if err != nil {
t.Fatal(err)
Expand All @@ -143,59 +136,33 @@ func testVtgateProxyRebalance(t *testing.T, loadBalancer string) {
assert.Equal(t, []customerEntry{{1, "email1"}}, result)
}

// No queries should be sent to vtgates outside target affinity
const expectMaxQueryCountNonAffinity = 0
var expectVtgatesWithQueries int

switch loadBalancer {
case "round_robin":
// At least 1 query should be sent to every vtgate matching target
// affinity
const expectMinQueryCountAffinity = 1

for i, vtgate := range vtgates {
queryCount, err := getVtgateQueryCount(vtgate)
if err != nil {
t.Fatal(err)
}

affinity := config[i]["az_id"]

log.Infof("vtgate %v (%v) query counts: %+v", i, affinity, queryCount)

if affinity == targetAffinity {
assert.GreaterOrEqual(t, queryCount.Sum(), expectMinQueryCountAffinity, "vtgate %v did not recieve the expected number of queries", i)
} else {
assert.LessOrEqual(t, queryCount.Sum(), expectMaxQueryCountNonAffinity, "vtgate %v recieved more than the expected number of queries", i)
}
}
// Every vtgate should get some queries. We went from 1 vtgates to
// NumConnections+1 vtgates, and then removed the first vtgate.
expectVtgatesWithQueries = len(vtgates)
case "first_ready":
// A single vtgate should become the target, and it should recieve all
// queries
targetVtgate := -1

for i, vtgate := range vtgates {
queryCount, err := getVtgateQueryCount(vtgate)
if err != nil {
t.Fatal(err)
}

affinity := config[i]["az_id"]
// Only 2 vtgates should have queries. The first vtgate should get all
// queries until it is removed, and then a new vtgate should be picked
// to get all subsequent queries.
expectVtgatesWithQueries = 2
}

log.Infof("vtgate %v (%v) query counts: %+v", i, affinity, queryCount)
var vtgatesWithQueries int
for i, vtgate := range vtgates {
queryCount, err := getVtgateQueryCount(vtgate)
if err != nil {
t.Fatal(err)
}

sum := queryCount.Sum()
if sum == 0 {
continue
}
log.Infof("vtgate %v query counts: %+v", i, queryCount)

if targetVtgate != -1 {
t.Logf("only vtgate %v should have received queries; vtgate %v got %v", targetVtgate, i, sum)
t.Fail()
} else if affinity == targetAffinity {
targetVtgate = i
} else {
assert.LessOrEqual(t, queryCount.Sum(), expectMaxQueryCountNonAffinity, "vtgate %v recieved more than the expected number of queries", i)
}
if queryCount.Sum() > 0 {
vtgatesWithQueries++
}
}

assert.Equal(t, expectVtgatesWithQueries, vtgatesWithQueries)
}
96 changes: 54 additions & 42 deletions go/test/endtoend/vtgateproxy/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) {
vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts")
var config []map[string]string

// Spin up vtgates
for i, vtgate := range vtgates {
pool := targetPool
if i == 0 {
Expand All @@ -76,14 +77,12 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) {
"type": pool,
})
}
b, err := json.Marshal(config[:1])
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil {

if err := writeConfig(t, vtgateHostsFile, config[:1], nil); err != nil {
t.Fatal(err)
}

// Spin up proxy
vtgateproxyHTTPPort := clusterInstance.GetAndReservePort()
vtgateproxyGrpcPort := clusterInstance.GetAndReservePort()
vtgateproxyMySQLPort := clusterInstance.GetAndReservePort()
Expand Down Expand Up @@ -113,35 +112,19 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) {
t.Fatal("no vtgates in the pool, ping should have failed")
}

log.Info("Reading test value while scaling vtgates")

// Start with an empty list of vtgates, then scale up, then scale back to
// 0. We should expect to see immediate failure when there are no vtgates,
// then success at each scale, until we hit 0 vtgates again, at which point
// we should fail fast again.
i := 0
scaleUp := true
for {
t.Logf("writing config file with %v vtgates", i)
b, err = json.Marshal(config[:i])
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil {
t.Fatal(err)
}

if err := vtgateproxyProcInstance.WaitForConfig(config[:i], 5*time.Second); err != nil {
testQuery := func(i int) {
if err := writeConfig(t, vtgateHostsFile, config[:i], vtgateproxyProcInstance); err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

result, err := selectHelper[customerEntry](ctx, conn, "select id, email from customer")
// 0 vtgates should fail
// First vtgate is in the wrong pool, so it should also fail
if i <= 1 {

switch i {
case 0:
// If there are 0 vtgates, expect a failure
if err == nil {
t.Fatal("query should have failed with no vtgates")
}
Expand All @@ -150,25 +133,54 @@ func testVtgateProxyScale(t *testing.T, loadBalancer string) {
if loadBalancer == "first_ready" && errors.Is(err, context.DeadlineExceeded) {
t.Fatal("query timed out but it should have failed fast")
}
} else if err != nil {
t.Fatalf("%v vtgates were present, but the query still failed: %v", i, err)
} else {
assert.Equal(t, []customerEntry{{1, "email1"}}, result)
}
case 1:
// If there is 1 vtgate, expect a failure since it's in the wrong pool
if err == nil {
t.Fatal("query should have failed with no vtgates in the pool")
}

if scaleUp {
i++
if i >= len(config) {
scaleUp = false
i -= 2
// In first_ready mode, we expect to fail fast and not time out.
if loadBalancer == "first_ready" && errors.Is(err, context.DeadlineExceeded) {
t.Fatal("query timed out but it should have failed fast")
}
default:
if err != nil {
t.Fatalf("%v vtgates were present, but the query still failed: %v", i, err)
}

continue
assert.Equal(t, []customerEntry{{1, "email1"}}, result)
}
}

i--
if i < 0 {
break
}
log.Info("Reading test value while scaling vtgates")

// Start with an empty list of vtgates, then scale up, then scale back to
// 0. We should expect to see immediate failure when there are no vtgates,
// then success at each scale, until we hit 0 vtgates again, at which point
// we should fail fast again.
for i := 0; i <= len(config); i++ {
testQuery(i)
}

for i := len(config) - 1; i >= 0; i-- {
testQuery(i)
}
}

func writeConfig(t *testing.T, target string, config []map[string]string, proc *VtgateProxyProcess) error {
t.Logf("writing config with %v vtgates", len(config))

b, err := json.Marshal(config)
if err != nil {
return err
}
if err := os.WriteFile(target, b, 0644); err != nil {
return err
}

if proc != nil {
return proc.WaitForConfig(config, 5*time.Second)
}

return nil
}
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,15 @@
"Shard": "vttablet_prscomplex",
"RetryMax": 1,
"Tags": [""]
},
"vtgateproxy": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vtgateproxy"],
"Command": [],
"Manual": false,
"Shard": "vtgate_general_heavy",
"RetryMax": 1,
"Tags": []
}
}
}

0 comments on commit de86aba

Please sign in to comment.