Skip to content

Commit

Permalink
Merge pull request #275 from ishan16696/master
Browse files Browse the repository at this point in the history
Validator - double check latest revision by starting an embedded etcd if DB-based revision check fails
  • Loading branch information
shreyas-s-rao authored Nov 9, 2020
2 parents 009dcbf + 4526158 commit 8d50c71
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 26 deletions.
6 changes: 4 additions & 2 deletions pkg/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int6
metrics.ValidationDurationSeconds.With(prometheus.Labels{metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(time.Now().Sub(start).Seconds())
return fmt.Errorf("failed to initialize since fail below revision check failed")
}

metrics.ValidationDurationSeconds.With(prometheus.Labels{metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(time.Now().Sub(start).Seconds())

if dataDirStatus != validator.DataDirectoryValid {
Expand All @@ -77,8 +78,9 @@ func NewInitializer(options *restorer.RestoreOptions, snapstoreConfig *snapstore
},
Validator: &validator.DataValidator{
Config: &validator.Config{
DataDir: options.Config.RestoreDataDir,
SnapstoreConfig: snapstoreConfig,
DataDir: options.Config.RestoreDataDir,
EmbeddedEtcdQuotaBytes: options.Config.EmbeddedEtcdQuotaBytes,
SnapstoreConfig: snapstoreConfig,
},
Logger: logger,
ZapLogger: zapLogger,
Expand Down
103 changes: 92 additions & 11 deletions pkg/initializer/validator/datavalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
package validator

import (
"context"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
"os"
"path/filepath"
"time"

"github.com/gardener/etcd-backup-restore/pkg/miscellaneous"
"github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer"
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/snap"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/wal"
Expand Down Expand Up @@ -114,8 +118,19 @@ func (d *DataValidator) sanityCheck(failBelowRevision int64) (DataDirStatus, err
d.Logger.Infof("unable to get current etcd revision from backend db file: %v", err)
return DataDirectoryCorrupt, nil
}
d.Logger.Info("Checking for revision consistency...")
return d.checkRevisionConsistency(etcdRevision, failBelowRevision)

d.Logger.Info("Checking for etcd revision consistency...")
etcdRevisionStatus, latestSnapshotRevision, err := d.checkEtcdDataRevisionConsistency(etcdRevision, failBelowRevision)

// If the etcd revision is inconsistent with the latest snapshot revision, then
// double check the etcd revision by starting an embedded etcd, since the WAL files can have uncommitted data which etcd was unable to flush to the Bolt DB.
if etcdRevisionStatus == RevisionConsistencyError {
d.Logger.Info("Checking for Full revision consistency...")
fullRevisionConsistencyStatus, err := d.checkFullRevisionConsistency(dataDir, latestSnapshotRevision)
return fullRevisionConsistencyStatus, err
}

return etcdRevisionStatus, err
}

// checkForDataCorruption will check for corruption of different files used by etcd.
Expand Down Expand Up @@ -229,18 +244,20 @@ func verifyDB(path string) error {
})
}

// checkRevisionConsistency compares the latest revisions on the etcd db file and the latest snapshot to verify that the etcd revision is not lesser than snapshot revision.
// Return DataDirStatus indicating whether it is due to failBelowRevision or latest snapshot revision for snapstore.
func (d *DataValidator) checkRevisionConsistency(etcdRevision, failBelowRevision int64) (DataDirStatus, error) {
// checkEtcdDataRevisionConsistency compares the latest revision of the etcd db file and the latest snapshot revision to verify that the etcd revision is not lesser than snapshot revision.
// Return DataDirStatus indicating whether it is due to failBelowRevision or latest snapshot revision for snapstore and also return the latest snapshot revision.
func (d *DataValidator) checkEtcdDataRevisionConsistency(etcdRevision, failBelowRevision int64) (DataDirStatus, int64, error) {
var latestSnapshotRevision int64
latestSnapshotRevision = 0

store, err := snapstore.GetSnapstore(d.Config.SnapstoreConfig)
if err != nil {
return DataDirectoryStatusUnknown, fmt.Errorf("unable to fetch snapstore: %v", err)
return DataDirectoryStatusUnknown, latestSnapshotRevision, fmt.Errorf("unable to fetch snapstore: %v", err)
}

var latestSnapshotRevision int64
fullSnap, deltaSnaps, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
if err != nil {
return DataDirectoryStatusUnknown, fmt.Errorf("unable to get snapshots from store: %v", err)
return DataDirectoryStatusUnknown, latestSnapshotRevision, fmt.Errorf("unable to get snapshots from store: %v", err)
}
if len(deltaSnaps) != 0 {
latestSnapshotRevision = deltaSnaps[len(deltaSnaps)-1].LastRevision
Expand All @@ -250,13 +267,61 @@ func (d *DataValidator) checkRevisionConsistency(etcdRevision, failBelowRevision
d.Logger.Infof("No snapshot found.")
if etcdRevision < failBelowRevision {
d.Logger.Infof("current etcd revision (%d) is less than fail below revision (%d): possible data loss", etcdRevision, failBelowRevision)
return FailBelowRevisionConsistencyError, nil
return FailBelowRevisionConsistencyError, latestSnapshotRevision, nil
}
return DataDirectoryValid, nil
return DataDirectoryValid, latestSnapshotRevision, nil
}

if etcdRevision < latestSnapshotRevision {
d.Logger.Infof("current etcd revision (%d) is less than latest snapshot revision (%d): possible data loss", etcdRevision, latestSnapshotRevision)
d.Logger.Infof("current etcd revision (%d) is less than latest snapshot revision (%d)", etcdRevision, latestSnapshotRevision)
return RevisionConsistencyError, latestSnapshotRevision, nil
}

return DataDirectoryValid, latestSnapshotRevision, nil
}

// checkFullRevisionConsistency starts an embedded etcd and then compares the latest revision of the etcd db file and the latest snapshot revision to verify that the etcd revision is not less than the snapshot revision.
// Returns a DataDirStatus indicating whether the WAL files had uncommitted data which etcd was unable to flush to the DB, or whether the latest DB revision is still less than the snapshot revision.
func (d *DataValidator) checkFullRevisionConsistency(dataDir string, latestSnapshotRevision int64) (DataDirStatus, error) {
var latestSyncedEtcdRevision int64

d.Logger.Info("Starting embedded etcd server...")
e, err := restorer.StartEmbeddedEtcd(logrus.NewEntry(d.Logger), dataDir, d.Config.EmbeddedEtcdQuotaBytes)
if err != nil {
d.Logger.Infof("unable to start embedded etcd: %v", err)
return DataDirectoryCorrupt, err
}
defer func() {
e.Server.Stop()
e.Close()
}()
client, err := clientv3.NewFromURL(e.Clients[0].Addr().String())
if err != nil {
d.Logger.Infof("unable to get the embedded etcd client: %v", err)
return DataDirectoryCorrupt, err
}
defer client.Close()

timer := time.NewTimer(embeddedEtcdPingLimitDuration)

waitLoop:
for {
select {
case <-timer.C:
break waitLoop
default:
latestSyncedEtcdRevision, _ = getLatestSyncedRevision(client)
if latestSyncedEtcdRevision >= latestSnapshotRevision {
d.Logger.Infof("After starting embeddedEtcd backend DB file revision (%d) is greater than or equal to latest snapshot revision (%d): no data loss", latestSyncedEtcdRevision, latestSnapshotRevision)
break waitLoop
}
time.Sleep(1 * time.Second)
}
}
defer timer.Stop()

if latestSyncedEtcdRevision < latestSnapshotRevision {
d.Logger.Infof("After starting embeddedEtcd backend DB file revision (%d) is less than latest snapshot revision (%d): possible data loss", latestSyncedEtcdRevision, latestSnapshotRevision)
return RevisionConsistencyError, nil
}

Expand Down Expand Up @@ -300,3 +365,19 @@ func getLatestEtcdRevision(path string) (int64, error) {

return rev, nil
}

// getLatestSyncedRevision asks the etcd server behind client for its current
// revision. It is used to double check the revision of the etcd db file once an
// embedded etcd has been started on the data directory.
//
// The request is bounded by connectionTimeout. On success the revision carried
// in the response header is returned. On failure the error is printed to
// standard output and returned together with a zero revision.
func getLatestSyncedRevision(client *clientv3.Client) (int64, error) {
	ctx, cancel := context.WithTimeout(context.TODO(), connectionTimeout)
	defer cancel()

	// Only the response header is of interest here; the key range is irrelevant.
	resp, err := client.Get(ctx, "", clientv3.WithLastRev()...)
	if err != nil {
		fmt.Printf("Failed to get the latest etcd revision: %v\n", err)
		return 0, err
	}

	return resp.Header.Revision, nil
}
58 changes: 56 additions & 2 deletions pkg/initializer/validator/datavalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package validator_test

import (
"context"
"fmt"
"math"
"os"
"path"
"time"

"github.com/gardener/etcd-backup-restore/pkg/snapstore"
"github.com/gardener/etcd-backup-restore/test/utils"
Expand Down Expand Up @@ -132,11 +134,12 @@ var _ = Describe("Running Datavalidator", func() {
Context("with corrupt db file", func() {
It("should return DataDirStatus as DataDirectoryCorrupt, and nil error", func() {
dbFile := path.Join(restoreDataDir, "member", "snap", "db")
_, err = os.Stat(dbFile)
dbFileInfo, err := os.Stat(dbFile)
Expect(err).ShouldNot(HaveOccurred())

tempFile := path.Join(outputDir, "temp", "db")
err = copyFile(dbFile, tempFile)

err = copyFile(dbFile, tempFile, dbFileInfo.Mode())
Expect(err).ShouldNot(HaveOccurred())

file, err := os.OpenFile(
Expand Down Expand Up @@ -225,6 +228,56 @@ var _ = Describe("Running Datavalidator", func() {
})
})

Context("with inconsistent revision numbers between etcd and latest snapshot and WALs file have some uncommitted data", func() {
It("should return DataDirStatus as DataDirectoryValid and nil error", func() {

snapPath := path.Join(restoreDataDir, "member", "snap")
tempPath := path.Join(outputDir, "temp")

// copy the snap dir to tempPath
err = copyDir(snapPath, tempPath)
Expect(err).ShouldNot(HaveOccurred())

defer func() {
err = os.RemoveAll(tempPath)
Expect(err).ShouldNot(HaveOccurred())
}()

// start etcd
etcd, err := utils.StartEmbeddedEtcd(testCtx, restoreDataDir, logger)
Expect(err).ShouldNot(HaveOccurred())
endpoints := []string{etcd.Clients[0].Addr().String()}

resp := &utils.EtcdDataPopulationResponse{}
// populate the etcd with some more keys
utils.PopulateEtcd(testCtx, logger, endpoints, 0, int(keyTo/2), resp)
Expect(resp.Err).ShouldNot(HaveOccurred())

//run the snapshotter
deltaSnapshotPeriod := 5 * time.Second
ctx, cancel := context.WithTimeout(testCtx, time.Duration(15*time.Second))
err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done())
Expect(err).ShouldNot(HaveOccurred())

etcd.Close()
cancel()

// remove the snap dir, so that the WAL files can have data which is ahead of the DB file.
err = os.RemoveAll(snapPath)
Expect(err).ShouldNot(HaveOccurred())

//make a snap dir, copy the content of old snap dir and place inside member dir
err = os.Mkdir(snapPath, 0777)
Expect(err).ShouldNot(HaveOccurred())
err = copyDir(tempPath, snapPath)
Expect(err).ShouldNot(HaveOccurred())

dataDirStatus, err := validator.Validate(Full, 0)
Expect(err).ShouldNot(HaveOccurred())
Expect(int(dataDirStatus)).Should(Equal(DataDirectoryValid))
})
})

Context("with fail below revision configured to low value and no snapshots taken", func() {
It("should return DataDirStatus as DataDirectoryValid, and nil error", func() {
validator.Config.SnapstoreConfig.Container = path.Join(snapstoreBackupDir, "tmp")
Expand Down Expand Up @@ -293,6 +346,7 @@ var _ = Describe("Running Datavalidator", func() {
Expect(int(dataDirStatus)).Should(Equal(DataDirectoryStatusUnknown))
})
})

})

func createCorruptSnap(filePath string) {
Expand Down
13 changes: 9 additions & 4 deletions pkg/initializer/validator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package validator

import (
"time"

"github.com/gardener/etcd-backup-restore/pkg/snapstore"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
Expand All @@ -36,12 +38,14 @@ const (
DataDirectoryStatusUnknown
// RevisionConsistencyError indicates current etcd revision is inconsistent with latest snapshot revision.
RevisionConsistencyError
//FailBelowRevisionConsistencyError indicates the current etcd revision is inconsistent with failBelowRevision.
// FailBelowRevisionConsistencyError indicates the current etcd revision is inconsistent with failBelowRevision.
FailBelowRevisionConsistencyError
)

const (
snapSuffix = ".snap"
snapSuffix = ".snap"
connectionTimeout = time.Duration(10 * time.Second)
embeddedEtcdPingLimitDuration = 60 * time.Second
)

// Mode is the Validation mode passed on to the DataValidator
Expand All @@ -56,8 +60,9 @@ const (

// Config store configuration for DataValidator.
type Config struct {
DataDir string
SnapstoreConfig *snapstore.Config
DataDir string
EmbeddedEtcdQuotaBytes int64
SnapstoreConfig *snapstore.Config
}

// DataValidator contains implements Validator interface to perform data validation.
Expand Down
35 changes: 33 additions & 2 deletions pkg/initializer/validator/validator_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,47 @@ func runSnapshotter(logger *logrus.Entry, deltaSnapshotPeriod time.Duration, end
}

// copyFile copies the contents of the file at sourceFilePath into the file at destinationFilePath. If no file exists at destinationFilePath, a new file is created before copying
func copyFile(sourceFilePath, destinationFilePath string) error {
func copyFile(sourceFilePath, destinationFilePath string, filePermission os.FileMode) error {
data, err := ioutil.ReadFile(sourceFilePath)
if err != nil {
return fmt.Errorf("unable to read source file %s: %v", sourceFilePath, err)
}

err = ioutil.WriteFile(destinationFilePath, data, 0700)
err = ioutil.WriteFile(destinationFilePath, data, filePermission)
if err != nil {
return fmt.Errorf("unable to create destination file %s: %v", destinationFilePath, err)
}
return nil
}

// copyDir recursively copies the contents of the directory at sourceDirPath
// into the directory at destinationDirPath.
//
// If either path is empty, copyDir does nothing and returns nil. Each entry is
// inspected with os.Stat, so symbolic links are followed. Sub-directories are
// created at the destination with the mode of the corresponding source
// directory (an already existing directory is not an error) and then copied
// recursively. Regular files are copied with copyFile using the source file's
// mode. An entry that is neither a directory nor a regular file is reported as
// an error.
//
// The first failure aborts the copy and is returned to the caller, so a nil
// result means every entry was copied.
func copyDir(sourceDirPath, destinationDirPath string) error {
	if len(sourceDirPath) == 0 || len(destinationDirPath) == 0 {
		return nil
	}

	files, err := ioutil.ReadDir(sourceDirPath)
	if err != nil {
		return err
	}

	for _, file := range files {
		sourcePath := path.Join(sourceDirPath, file.Name())
		destPath := path.Join(destinationDirPath, file.Name())

		fileInfo, err := os.Stat(sourcePath)
		if err != nil {
			return err
		}

		switch mode := fileInfo.Mode(); {
		case mode.IsDir():
			// MkdirAll tolerates a directory that already exists, which keeps
			// repeated copies into the same destination working.
			if err := os.MkdirAll(destPath, mode); err != nil {
				return fmt.Errorf("unable to create destination directory %s: %v", destPath, err)
			}
			if err := copyDir(sourcePath, destPath); err != nil {
				return err
			}
		case mode.IsRegular():
			if err := copyFile(sourcePath, destPath, mode); err != nil {
				return err
			}
		default:
			return fmt.Errorf("file format not known: %s", sourcePath)
		}
	}
	return nil
}
10 changes: 5 additions & 5 deletions pkg/snapshot/restorer/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (r *Restorer) Restore(ro RestoreOptions) error {
return nil
}
r.logger.Infof("Starting embedded etcd server...")
e, err := startEmbeddedEtcd(r.logger, ro)
e, err := StartEmbeddedEtcd(r.logger, ro.Config.RestoreDataDir, ro.Config.EmbeddedEtcdQuotaBytes)
if err != nil {
return err
}
Expand Down Expand Up @@ -307,10 +307,10 @@ func makeWALAndSnap(logger *zap.Logger, waldir, snapdir string, cl *membership.R
return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term})
}

// startEmbeddedEtcd starts the embedded etcd server.
func startEmbeddedEtcd(logger *logrus.Entry, ro RestoreOptions) (*embed.Etcd, error) {
// StartEmbeddedEtcd starts the embedded etcd server.
func StartEmbeddedEtcd(logger *logrus.Entry, dataDir string, embeddedEtcdQuotaBytes int64) (*embed.Etcd, error) {
cfg := embed.NewConfig()
cfg.Dir = filepath.Join(ro.Config.RestoreDataDir)
cfg.Dir = filepath.Join(dataDir)
DefaultListenPeerURLs := "http://localhost:0"
DefaultListenClientURLs := "http://localhost:0"
DefaultInitialAdvertisePeerURLs := "http://localhost:0"
Expand All @@ -324,7 +324,7 @@ func startEmbeddedEtcd(logger *logrus.Entry, ro RestoreOptions) (*embed.Etcd, er
cfg.APUrls = []url.URL{*apurl}
cfg.ACUrls = []url.URL{*acurl}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
cfg.QuotaBackendBytes = ro.Config.EmbeddedEtcdQuotaBytes
cfg.QuotaBackendBytes = embeddedEtcdQuotaBytes
cfg.Logger = "zap"
e, err := embed.StartEtcd(cfg)
if err != nil {
Expand Down

0 comments on commit 8d50c71

Please sign in to comment.