diff --git a/batch_writer.go b/batch_writer.go index 9ae9d13d..5ed22cd8 100644 --- a/batch_writer.go +++ b/batch_writer.go @@ -19,7 +19,7 @@ func init() { } type BatchWriterVerificationFailed struct { - mismatchedPaginationKeys []uint64 + mismatchedPaginationKeys []InlineVerifierMismatches table string } diff --git a/inline_verifier.go b/inline_verifier.go index d221479a..80d56bab 100644 --- a/inline_verifier.go +++ b/inline_verifier.go @@ -232,6 +232,12 @@ func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore { return s.store.Copy() } +type InlineVerifierMismatches struct { + Pk uint64 + SourceChecksum string + TargetChecksum string +} + type InlineVerifier struct { SourceDB *sql.DB TargetDB *sql.DB @@ -304,7 +310,7 @@ func (v *InlineVerifier) Result() (VerificationResultAndStatus, error) { return v.backgroundVerificationResultAndStatus, v.backgroundVerificationErr } -func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]uint64, error) { +func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]InlineVerifierMismatches, error) { table := sourceBatch.TableSchema() paginationKeys := make([]uint64, len(sourceBatch.Values())) @@ -353,8 +359,8 @@ func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, target mismatches := v.compareHashesAndData(sourceFingerprints, targetFingerprints, sourceDecompressedData, targetDecompressedData) if !enforceInlineVerification { - for _, mismatchedPk := range mismatches { - v.reverifyStore.Add(table, mismatchedPk) + for _, mismatch := range mismatches { + v.reverifyStore.Add(table, mismatch.Pk) } if len(mismatches) > 0 { @@ -446,15 +452,19 @@ func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) { messageBuf.WriteString("cutover verification failed for: ") incorrectTables := make([]string, 0) for schemaName, _ := range mismatches { - for tableName, paginationKeys := range mismatches[schemaName] { + for tableName, mismatches := range mismatches[schemaName] { tableName = fmt.Sprintf("%s.%s", schemaName, tableName) incorrectTables = append(incorrectTables, tableName) messageBuf.WriteString(tableName) messageBuf.WriteString(" [paginationKeys: ") - for _, paginationKey := range paginationKeys { - messageBuf.WriteString(strconv.FormatUint(paginationKey, 10)) - messageBuf.WriteString(" ") + for _, mismatch := range mismatches { + messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10)) + messageBuf.WriteString(" (source: ") + messageBuf.WriteString(mismatch.SourceChecksum) + messageBuf.WriteString(", target: ") + messageBuf.WriteString(mismatch.TargetChecksum) + messageBuf.WriteString(") ") } messageBuf.WriteString("] ") } @@ -555,20 +565,28 @@ func (v *InlineVerifier) decompressData(table *TableSchema, column string, compr } } -func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uint64]struct{} { - mismatchSet := map[uint64]struct{}{} +func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uint64]InlineVerifierMismatches { + mismatchSet := map[uint64]InlineVerifierMismatches{} for paginationKey, targetHash := range target { sourceHash, exists := source[paginationKey] if !bytes.Equal(sourceHash, targetHash) || !exists { - mismatchSet[paginationKey] = struct{}{} + mismatchSet[paginationKey] = InlineVerifierMismatches{ + Pk: paginationKey, + SourceChecksum: string(sourceHash), + TargetChecksum: string(targetHash), + } } } for paginationKey, sourceHash := range source { targetHash, exists := target[paginationKey] if !bytes.Equal(sourceHash, targetHash) || !exists { - mismatchSet[paginationKey] = struct{}{} + mismatchSet[paginationKey] = InlineVerifierMismatches{ + Pk: paginationKey, + SourceChecksum: string(sourceHash), + TargetChecksum: string(targetHash), + } } } @@ -613,17 +631,21 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s return mismatchSet } -func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uint64][]byte, sourceData, targetData map[uint64]map[string][]byte) []uint64 { +func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uint64][]byte, sourceData, targetData map[uint64]map[string][]byte) []InlineVerifierMismatches { mismatches := v.compareHashes(sourceHashes, targetHashes) compressedMismatch := v.compareDecompressedData(sourceData, targetData) for paginationKey, _ := range compressedMismatch { - mismatches[paginationKey] = struct{}{} + mismatches[paginationKey] = InlineVerifierMismatches{ + Pk: paginationKey, + SourceChecksum: "compressed-data-mismatch", // TODO: compute the hash of the compressed data and put it here + TargetChecksum: "compressed-data-mismatch", + } } - mismatchList := make([]uint64, 0, len(mismatches)) + mismatchList := make([]InlineVerifierMismatches, 0, len(mismatches)) - for paginationKey, _ := range mismatches { - mismatchList = append(mismatchList, paginationKey) + for _, mismatch := range mismatches { + mismatchList = append(mismatchList, mismatch) } return mismatchList @@ -650,21 +672,21 @@ func (v *InlineVerifier) binlogEventListener(evs []DMLEvent) error { return nil } -func (v *InlineVerifier) readdMismatchedPaginationKeysToBeVerifiedAgain(mismatches map[string]map[string][]uint64) { +func (v *InlineVerifier) readdMismatchedPaginationKeysToBeVerifiedAgain(mismatches map[string]map[string][]InlineVerifierMismatches) { for schemaName, _ := range mismatches { - for tableName, paginationKeys := range mismatches[schemaName] { + for tableName, mismatches := range mismatches[schemaName] { table := v.TableSchemaCache.Get(schemaName, tableName) - for _, paginationKey := range paginationKeys { - v.reverifyStore.Add(table, paginationKey) + for _, mismatch := range mismatches { + v.reverifyStore.Add(table, mismatch.Pk) } } } } // Returns mismatches in the form of db -> table -> paginationKeys -func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][]uint64, error) { +func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][]InlineVerifierMismatches, error) { mismatchFound := false - mismatches := make(map[string]map[string][]uint64) + mismatches := make(map[string]map[string][]InlineVerifierMismatches) allBatches := v.reverifyStore.Batches(v.BatchSize) if len(allBatches) == 0 { @@ -684,11 +706,11 @@ func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][ mismatchFound = true if _, exists := mismatches[batch.SchemaName]; !exists { - mismatches[batch.SchemaName] = make(map[string][]uint64) + mismatches[batch.SchemaName] = make(map[string][]InlineVerifierMismatches) } if _, exists := mismatches[batch.SchemaName][batch.TableName]; !exists { - mismatches[batch.SchemaName][batch.TableName] = make([]uint64, 0) + mismatches[batch.SchemaName][batch.TableName] = make([]InlineVerifierMismatches, 0) } mismatches[batch.SchemaName][batch.TableName] = append(mismatches[batch.SchemaName][batch.TableName], batchMismatches...) @@ -702,7 +724,7 @@ func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][ // Since the mismatches gets re-added to the reverify store, this must return // a union of mismatches of fingerprints and mismatches due to decompressed // data. -func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, error) { +func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]InlineVerifierMismatches, error) { targetSchema := batch.SchemaName if targetSchemaName, exists := v.DatabaseRewrites[targetSchema]; exists { targetSchema = targetSchemaName @@ -715,7 +737,7 @@ func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, e sourceTableSchema := v.TableSchemaCache.Get(batch.SchemaName, batch.TableName) if sourceTableSchema == nil { - return []uint64{}, fmt.Errorf("programming error? %s.%s is not found in TableSchemaCache but is being reverified", batch.SchemaName, batch.TableName) + return []InlineVerifierMismatches{}, fmt.Errorf("programming error? %s.%s is not found in TableSchemaCache but is being reverified", batch.SchemaName, batch.TableName) } wg := &sync.WaitGroup{}