From 452615853787b9a2c213fa93abef1b91ef959a1d Mon Sep 17 00:00:00 2001 From: ishan16696 Date: Mon, 9 Nov 2020 00:29:12 +0530 Subject: [PATCH] Fixed an issue when WALs file have uncommitted data and etcd terminates abruptly, this can lead to unnecessary data restoration. --- pkg/initializer/initializer.go | 6 +- pkg/initializer/validator/datavalidator.go | 103 ++++++++++++++++-- .../validator/datavalidator_test.go | 58 +++++++++- pkg/initializer/validator/types.go | 13 ++- .../validator/validator_suite_test.go | 35 +++++- pkg/snapshot/restorer/restorer.go | 10 +- 6 files changed, 199 insertions(+), 26 deletions(-) diff --git a/pkg/initializer/initializer.go b/pkg/initializer/initializer.go index 459ea6763..bce01ff06 100644 --- a/pkg/initializer/initializer.go +++ b/pkg/initializer/initializer.go @@ -51,6 +51,7 @@ func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int6 metrics.ValidationDurationSeconds.With(prometheus.Labels{metrics.LabelSucceeded: metrics.ValueSucceededFalse}).Observe(time.Now().Sub(start).Seconds()) return fmt.Errorf("failed to initialize since fail below revision check failed") } + metrics.ValidationDurationSeconds.With(prometheus.Labels{metrics.LabelSucceeded: metrics.ValueSucceededTrue}).Observe(time.Now().Sub(start).Seconds()) if dataDirStatus != validator.DataDirectoryValid { @@ -77,8 +78,9 @@ func NewInitializer(options *restorer.RestoreOptions, snapstoreConfig *snapstore }, Validator: &validator.DataValidator{ Config: &validator.Config{ - DataDir: options.Config.RestoreDataDir, - SnapstoreConfig: snapstoreConfig, + DataDir: options.Config.RestoreDataDir, + EmbeddedEtcdQuotaBytes: options.Config.EmbeddedEtcdQuotaBytes, + SnapstoreConfig: snapstoreConfig, }, Logger: logger, ZapLogger: zapLogger, diff --git a/pkg/initializer/validator/datavalidator.go b/pkg/initializer/validator/datavalidator.go index 48d0097e9..b703eb27b 100644 --- a/pkg/initializer/validator/datavalidator.go +++ b/pkg/initializer/validator/datavalidator.go @@ -15,6 +15,7 @@ package validator import ( + "context" "encoding/binary" "errors" "fmt" @@ -22,11 +23,14 @@ import ( "io" "os" "path/filepath" + "time" "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" + "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/sirupsen/logrus" bolt "go.etcd.io/bbolt" + "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver/api/snap" "go.etcd.io/etcd/raft/raftpb" "go.etcd.io/etcd/wal" @@ -114,8 +118,19 @@ func (d *DataValidator) sanityCheck(failBelowRevision int64) (DataDirStatus, err d.Logger.Infof("unable to get current etcd revision from backend db file: %v", err) return DataDirectoryCorrupt, nil } - d.Logger.Info("Checking for revision consistency...") - return d.checkRevisionConsistency(etcdRevision, failBelowRevision) + + d.Logger.Info("Checking for etcd revision consistency...") + etcdRevisionStatus, latestSnapshotRevision, err := d.checkEtcdDataRevisionConsistency(etcdRevision, failBelowRevision) + + // if etcd revision is inconsistent with latest snapshot revision then + // check the etcd revision consistency by starting an embedded etcd since the WALs file can have uncommited data which it was unable to flush to Bolt DB. + if etcdRevisionStatus == RevisionConsistencyError { + d.Logger.Info("Checking for Full revision consistency...") + fullRevisionConsistencyStatus, err := d.checkFullRevisionConsistency(dataDir, latestSnapshotRevision) + return fullRevisionConsistencyStatus, err + } + + return etcdRevisionStatus, err } // checkForDataCorruption will check for corruption of different files used by etcd. @@ -229,18 +244,20 @@ func verifyDB(path string) error { }) } -// checkRevisionConsistency compares the latest revisions on the etcd db file and the latest snapshot to verify that the etcd revision is not lesser than snapshot revision. -// Return DataDirStatus indicating whether it is due to failBelowRevision or latest snapshot revision for snapstore. -func (d *DataValidator) checkRevisionConsistency(etcdRevision, failBelowRevision int64) (DataDirStatus, error) { +// checkEtcdDataRevisionConsistency compares the latest revision of the etcd db file and the latest snapshot revision to verify that the etcd revision is not lesser than snapshot revision. +// Return DataDirStatus indicating whether it is due to failBelowRevision or latest snapshot revision for snapstore and also return the latest snapshot revision. +func (d *DataValidator) checkEtcdDataRevisionConsistency(etcdRevision, failBelowRevision int64) (DataDirStatus, int64, error) { + var latestSnapshotRevision int64 + latestSnapshotRevision = 0 + store, err := snapstore.GetSnapstore(d.Config.SnapstoreConfig) if err != nil { - return DataDirectoryStatusUnknown, fmt.Errorf("unable to fetch snapstore: %v", err) + return DataDirectoryStatusUnknown, latestSnapshotRevision, fmt.Errorf("unable to fetch snapstore: %v", err) } - var latestSnapshotRevision int64 fullSnap, deltaSnaps, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store) if err != nil { - return DataDirectoryStatusUnknown, fmt.Errorf("unable to get snapshots from store: %v", err) + return DataDirectoryStatusUnknown, latestSnapshotRevision, fmt.Errorf("unable to get snapshots from store: %v", err) } if len(deltaSnaps) != 0 { latestSnapshotRevision = deltaSnaps[len(deltaSnaps)-1].LastRevision @@ -250,13 +267,61 @@ func (d *DataValidator) checkRevisionConsistency(etcdRevision, failBelowRevision d.Logger.Infof("No snapshot found.") if etcdRevision < failBelowRevision { d.Logger.Infof("current etcd revision (%d) is less than fail below revision (%d): possible data loss", etcdRevision, failBelowRevision) - return FailBelowRevisionConsistencyError, nil + return FailBelowRevisionConsistencyError, latestSnapshotRevision, nil } - return DataDirectoryValid, nil + return DataDirectoryValid, latestSnapshotRevision, nil } if etcdRevision < latestSnapshotRevision { - d.Logger.Infof("current etcd revision (%d) is less than latest snapshot revision (%d): possible data loss", etcdRevision, latestSnapshotRevision) + d.Logger.Infof("current etcd revision (%d) is less than latest snapshot revision (%d)", etcdRevision, latestSnapshotRevision) + return RevisionConsistencyError, latestSnapshotRevision, nil + } + + return DataDirectoryValid, latestSnapshotRevision, nil +} + +// checkFullRevisionConsistency starts an embedded etcd and then compares the latest revision of etcd db file and the latest snapshot revision to verify that the etcd revision is not lesser than snapshot revision. +// Return DataDirStatus indicating whether WALs file have uncommited data which it was unable to flush to DB or latest DB revision is still less than snapshot revision. +func (d *DataValidator) checkFullRevisionConsistency(dataDir string, latestSnapshotRevision int64) (DataDirStatus, error) { + var latestSyncedEtcdRevision int64 + + d.Logger.Info("Starting embedded etcd server...") + e, err := restorer.StartEmbeddedEtcd(logrus.NewEntry(d.Logger), dataDir, d.Config.EmbeddedEtcdQuotaBytes) + if err != nil { + d.Logger.Infof("unable to start embedded etcd: %v", err) + return DataDirectoryCorrupt, err + } + defer func() { + e.Server.Stop() + e.Close() + }() + client, err := clientv3.NewFromURL(e.Clients[0].Addr().String()) + if err != nil { + d.Logger.Infof("unable to get the embedded etcd client: %v", err) + return DataDirectoryCorrupt, err + } + defer client.Close() + + timer := time.NewTimer(embeddedEtcdPingLimitDuration) + +waitLoop: + for { + select { + case <-timer.C: + break waitLoop + default: + latestSyncedEtcdRevision, _ = getLatestSyncedRevision(client) + if latestSyncedEtcdRevision >= latestSnapshotRevision { + d.Logger.Infof("After starting embeddedEtcd backend DB file revision (%d) is greater than or equal to latest snapshot revision (%d): no data loss", latestSyncedEtcdRevision, latestSnapshotRevision) + break waitLoop + } + time.Sleep(1 * time.Second) + } + } + defer timer.Stop() + + if latestSyncedEtcdRevision < latestSnapshotRevision { + d.Logger.Infof("After starting embeddedEtcd backend DB file revision (%d) is less than latest snapshot revision (%d): possible data loss", latestSyncedEtcdRevision, latestSnapshotRevision) return RevisionConsistencyError, nil } @@ -300,3 +365,19 @@ func getLatestEtcdRevision(path string) (int64, error) { return rev, nil } + +// getLatestSyncedRevision finds out the latest revision on etcd db file when embedded etcd is started to double check the latest revision of etcd db file. +func getLatestSyncedRevision(client *clientv3.Client) (int64, error) { + var latestSyncedRevision int64 + + ctx, cancel := context.WithTimeout(context.TODO(), connectionTimeout) + defer cancel() + resp, err := client.Get(ctx, "", clientv3.WithLastRev()...) + if err != nil { + fmt.Printf("Failed to get the latest etcd revision: %v\n", err) + return latestSyncedRevision, err + } + latestSyncedRevision = resp.Header.Revision + + return latestSyncedRevision, nil +} diff --git a/pkg/initializer/validator/datavalidator_test.go b/pkg/initializer/validator/datavalidator_test.go index 5b15d0922..12c9e6823 100644 --- a/pkg/initializer/validator/datavalidator_test.go +++ b/pkg/initializer/validator/datavalidator_test.go @@ -14,10 +14,12 @@ package validator_test import ( + "context" "fmt" "math" "os" "path" + "time" "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/gardener/etcd-backup-restore/test/utils" @@ -132,11 +134,12 @@ var _ = Describe("Running Datavalidator", func() { Context("with corrupt db file", func() { It("should return DataDirStatus as DataDirectoryCorrupt, and nil error", func() { dbFile := path.Join(restoreDataDir, "member", "snap", "db") - _, err = os.Stat(dbFile) + dbFileInfo, err := os.Stat(dbFile) Expect(err).ShouldNot(HaveOccurred()) tempFile := path.Join(outputDir, "temp", "db") - err = copyFile(dbFile, tempFile) + + err = copyFile(dbFile, tempFile, dbFileInfo.Mode()) Expect(err).ShouldNot(HaveOccurred()) file, err := os.OpenFile( @@ -225,6 +228,56 @@ var _ = Describe("Running Datavalidator", func() { }) }) + Context("with inconsistent revision numbers between etcd and latest snapshot and WALs file have some uncommitted data", func() { + It("should return DataDirStatus as DataDirectoryValid and nil error", func() { + + snapPath := path.Join(restoreDataDir, "member", "snap") + tempPath := path.Join(outputDir, "temp") + + // copy the snap dir to tempPath + err = copyDir(snapPath, tempPath) + Expect(err).ShouldNot(HaveOccurred()) + + defer func() { + err = os.RemoveAll(tempPath) + Expect(err).ShouldNot(HaveOccurred()) + }() + + // start etcd + etcd, err := utils.StartEmbeddedEtcd(testCtx, restoreDataDir, logger) + Expect(err).ShouldNot(HaveOccurred()) + endpoints := []string{etcd.Clients[0].Addr().String()} + + resp := &utils.EtcdDataPopulationResponse{} + // populate the etcd with some more keys + utils.PopulateEtcd(testCtx, logger, endpoints, 0, int(keyTo/2), resp) + Expect(resp.Err).ShouldNot(HaveOccurred()) + + //run the snapshotter + deltaSnapshotPeriod := 5 * time.Second + ctx, cancel := context.WithTimeout(testCtx, time.Duration(15*time.Second)) + err = runSnapshotter(logger, deltaSnapshotPeriod, endpoints, ctx.Done()) + Expect(err).ShouldNot(HaveOccurred()) + + etcd.Close() + cancel() + + //remove the snap dir,so that WALs file can have data which is ahead of DB file. + err = os.RemoveAll(snapPath) + Expect(err).ShouldNot(HaveOccurred()) + + //make a snap dir, copy the content of old snap dir and place inside member dir + err = os.Mkdir(snapPath, 0777) + Expect(err).ShouldNot(HaveOccurred()) + err = copyDir(tempPath, snapPath) + Expect(err).ShouldNot(HaveOccurred()) + + dataDirStatus, err := validator.Validate(Full, 0) + Expect(err).ShouldNot(HaveOccurred()) + Expect(int(dataDirStatus)).Should(Equal(DataDirectoryValid)) + }) + }) + Context("with fail below revision configured to low value and no snapshots taken", func() { It("should return DataDirStatus as DataDirectoryValid, and nil error", func() { validator.Config.SnapstoreConfig.Container = path.Join(snapstoreBackupDir, "tmp") @@ -293,6 +346,7 @@ var _ = Describe("Running Datavalidator", func() { Expect(int(dataDirStatus)).Should(Equal(DataDirectoryStatusUnknown)) }) }) + }) func createCorruptSnap(filePath string) { diff --git a/pkg/initializer/validator/types.go b/pkg/initializer/validator/types.go index eab1ac906..ed181ddf7 100644 --- a/pkg/initializer/validator/types.go +++ b/pkg/initializer/validator/types.go @@ -15,6 +15,8 @@ package validator import ( + "time" + "github.com/gardener/etcd-backup-restore/pkg/snapstore" "github.com/sirupsen/logrus" "go.uber.org/zap" @@ -36,12 +38,14 @@ const ( DataDirectoryStatusUnknown // RevisionConsistencyError indicates current etcd revision is inconsistent with latest snapshot revision. RevisionConsistencyError - //FailBelowRevisionConsistencyError indicates the current etcd revision is inconsistent with failBelowRevision. + // FailBelowRevisionConsistencyError indicates the current etcd revision is inconsistent with failBelowRevision. FailBelowRevisionConsistencyError ) const ( - snapSuffix = ".snap" + snapSuffix = ".snap" + connectionTimeout = time.Duration(10 * time.Second) + embeddedEtcdPingLimitDuration = 60 * time.Second ) // Mode is the Validation mode passed on to the DataValidator @@ -56,8 +60,9 @@ const ( // Config store configuration for DataValidator. type Config struct { - DataDir string - SnapstoreConfig *snapstore.Config + DataDir string + EmbeddedEtcdQuotaBytes int64 + SnapstoreConfig *snapstore.Config } // DataValidator contains implements Validator interface to perform data validation. diff --git a/pkg/initializer/validator/validator_suite_test.go b/pkg/initializer/validator/validator_suite_test.go index 9854056bb..710a047f3 100644 --- a/pkg/initializer/validator/validator_suite_test.go +++ b/pkg/initializer/validator/validator_suite_test.go @@ -111,16 +111,47 @@ func runSnapshotter(logger *logrus.Entry, deltaSnapshotPeriod time.Duration, end } // copyFile copies the contents of the file at sourceFilePath into the file at destinationFilePath. If no file exists at destinationFilePath, a new file is created before copying -func copyFile(sourceFilePath, destinationFilePath string) error { +func copyFile(sourceFilePath, destinationFilePath string, filePermission os.FileMode) error { data, err := ioutil.ReadFile(sourceFilePath) if err != nil { return fmt.Errorf("unable to read source file %s: %v", sourceFilePath, err) } - err = ioutil.WriteFile(destinationFilePath, data, 0700) + err = ioutil.WriteFile(destinationFilePath, data, filePermission) if err != nil { return fmt.Errorf("unable to create destination file %s: %v", destinationFilePath, err) } + return nil +} +// copyDir copies the contents of the Source dir to the destination dir. +func copyDir(sourceDirPath, destinationDirPath string) error { + if len(sourceDirPath) == 0 || len(destinationDirPath) == 0 { + return nil + } + + files, err := ioutil.ReadDir(sourceDirPath) + if err != nil { + return err + } + + for _, file := range files { + sourcePath := path.Join(sourceDirPath, file.Name()) + destPath := path.Join(destinationDirPath, file.Name()) + + fileInfo, err := os.Stat(sourcePath) + if err != nil { + return err + } + + if fileInfo.Mode().IsDir() { + os.Mkdir(destPath, fileInfo.Mode()) + copyDir(sourcePath, destPath) + } else if fileInfo.Mode().IsRegular() { + copyFile(sourcePath, destPath, fileInfo.Mode()) + } else { + return fmt.Errorf("File format not known") + } + } return nil } diff --git a/pkg/snapshot/restorer/restorer.go b/pkg/snapshot/restorer/restorer.go index 893c39005..3df79e6c0 100644 --- a/pkg/snapshot/restorer/restorer.go +++ b/pkg/snapshot/restorer/restorer.go @@ -73,7 +73,7 @@ func (r *Restorer) Restore(ro RestoreOptions) error { return nil } r.logger.Infof("Starting embedded etcd server...") - e, err := startEmbeddedEtcd(r.logger, ro) + e, err := StartEmbeddedEtcd(r.logger, ro.Config.RestoreDataDir, ro.Config.EmbeddedEtcdQuotaBytes) if err != nil { return err } @@ -307,10 +307,10 @@ func makeWALAndSnap(logger *zap.Logger, waldir, snapdir string, cl *membership.R return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}) } -// startEmbeddedEtcd starts the embedded etcd server. -func startEmbeddedEtcd(logger *logrus.Entry, ro RestoreOptions) (*embed.Etcd, error) { +// StartEmbeddedEtcd starts the embedded etcd server. +func StartEmbeddedEtcd(logger *logrus.Entry, dataDir string, embeddedEtcdQuotaBytes int64) (*embed.Etcd, error) { cfg := embed.NewConfig() - cfg.Dir = filepath.Join(ro.Config.RestoreDataDir) + cfg.Dir = filepath.Join(dataDir) DefaultListenPeerURLs := "http://localhost:0" DefaultListenClientURLs := "http://localhost:0" DefaultInitialAdvertisePeerURLs := "http://localhost:0" @@ -324,7 +324,7 @@ func startEmbeddedEtcd(logger *logrus.Entry, ro RestoreOptions) (*embed.Etcd, er cfg.APUrls = []url.URL{*apurl} cfg.ACUrls = []url.URL{*acurl} cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) - cfg.QuotaBackendBytes = ro.Config.EmbeddedEtcdQuotaBytes + cfg.QuotaBackendBytes = embeddedEtcdQuotaBytes cfg.Logger = "zap" e, err := embed.StartEtcd(cfg) if err != nil {