
Commit

Log checksum?
shuhaowu committed Nov 3, 2022
1 parent 05fb06b commit bf386e2
Showing 2 changed files with 49 additions and 27 deletions.
batch_writer.go (2 changes: 1 addition & 1 deletion)

@@ -19,7 +19,7 @@ func init() {
 }

 type BatchWriterVerificationFailed struct {
-    mismatchedPaginationKeys []uint64
+    mismatchedPaginationKeys []InlineVerifierMismatches
     table                    string
 }

inline_verifier.go (74 changes: 48 additions & 26 deletions)

@@ -232,6 +232,12 @@ func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
     return s.store.Copy()
 }

+type InlineVerifierMismatches struct {
+    Pk             uint64
+    SourceChecksum string
+    TargetChecksum string
+}
+
 type InlineVerifier struct {
     SourceDB *sql.DB
     TargetDB *sql.DB
@@ -304,7 +310,7 @@ func (v *InlineVerifier) Result() (VerificationResultAndStatus, error) {
     return v.backgroundVerificationResultAndStatus, v.backgroundVerificationErr
 }

-func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]uint64, error) {
+func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]InlineVerifierMismatches, error) {
     table := sourceBatch.TableSchema()

     paginationKeys := make([]uint64, len(sourceBatch.Values()))
@@ -353,8 +359,8 @@ func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, target
     mismatches := v.compareHashesAndData(sourceFingerprints, targetFingerprints, sourceDecompressedData, targetDecompressedData)

     if !enforceInlineVerification {
-        for _, mismatchedPk := range mismatches {
-            v.reverifyStore.Add(table, mismatchedPk)
+        for _, mismatch := range mismatches {
+            v.reverifyStore.Add(table, mismatch.Pk)
         }

         if len(mismatches) > 0 {
@@ -446,15 +452,19 @@ func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
         messageBuf.WriteString("cutover verification failed for: ")
         incorrectTables := make([]string, 0)
         for schemaName, _ := range mismatches {
-            for tableName, paginationKeys := range mismatches[schemaName] {
+            for tableName, mismatches := range mismatches[schemaName] {
                 tableName = fmt.Sprintf("%s.%s", schemaName, tableName)
                 incorrectTables = append(incorrectTables, tableName)

                 messageBuf.WriteString(tableName)
                 messageBuf.WriteString(" [paginationKeys: ")
-                for _, paginationKey := range paginationKeys {
-                    messageBuf.WriteString(strconv.FormatUint(paginationKey, 10))
-                    messageBuf.WriteString(" ")
+                for _, mismatch := range mismatches {
+                    messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
+                    messageBuf.WriteString(" (source: ")
+                    messageBuf.WriteString(mismatch.SourceChecksum)
+                    messageBuf.WriteString(", target: ")
+                    messageBuf.WriteString(mismatch.TargetChecksum)
+                    messageBuf.WriteString(") ")
                 }
                 messageBuf.WriteString("] ")
             }
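
For illustration, the following standalone sketch mirrors the message-building loop in the hunk above, using hypothetical schema, table, and checksum values, to show the shape of the new cutover failure message:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

type InlineVerifierMismatches struct {
    Pk             uint64
    SourceChecksum string
    TargetChecksum string
}

func main() {
    // Hypothetical mismatches, keyed db -> table -> mismatched rows, matching
    // the shape verifyAllEventsInStore builds up elsewhere in this commit.
    mismatches := map[string]map[string][]InlineVerifierMismatches{
        "blog": {
            "users": {
                {Pk: 42, SourceChecksum: "d41d8cd98f00b204e9800998ecf8427e", TargetChecksum: "9e107d9d372bb6826bd81d3542a419d6"},
            },
        },
    }

    var messageBuf strings.Builder
    messageBuf.WriteString("cutover verification failed for: ")
    for schemaName := range mismatches {
        for tableName, tableMismatches := range mismatches[schemaName] {
            messageBuf.WriteString(fmt.Sprintf("%s.%s", schemaName, tableName))
            messageBuf.WriteString(" [paginationKeys: ")
            for _, mismatch := range tableMismatches {
                messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
                messageBuf.WriteString(" (source: ")
                messageBuf.WriteString(mismatch.SourceChecksum)
                messageBuf.WriteString(", target: ")
                messageBuf.WriteString(mismatch.TargetChecksum)
                messageBuf.WriteString(") ")
            }
            messageBuf.WriteString("] ")
        }
    }
    fmt.Println(messageBuf.String())
    // Prints roughly:
    // cutover verification failed for: blog.users [paginationKeys: 42 (source: d41d8cd98f00b204e9800998ecf8427e, target: 9e107d9d372bb6826bd81d3542a419d6) ]
}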
@@ -555,20 +565,28 @@ func (v *InlineVerifier) decompressData(table *TableSchema, column string, compr
     }
 }

-func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uint64]struct{} {
-    mismatchSet := map[uint64]struct{}{}
+func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uint64]InlineVerifierMismatches {
+    mismatchSet := map[uint64]InlineVerifierMismatches{}

     for paginationKey, targetHash := range target {
         sourceHash, exists := source[paginationKey]
         if !bytes.Equal(sourceHash, targetHash) || !exists {
-            mismatchSet[paginationKey] = struct{}{}
+            mismatchSet[paginationKey] = InlineVerifierMismatches{
+                Pk:             paginationKey,
+                SourceChecksum: string(sourceHash),
+                TargetChecksum: string(targetHash),
+            }
         }
     }

     for paginationKey, sourceHash := range source {
         targetHash, exists := target[paginationKey]
         if !bytes.Equal(sourceHash, targetHash) || !exists {
-            mismatchSet[paginationKey] = struct{}{}
+            mismatchSet[paginationKey] = InlineVerifierMismatches{
+                Pk:             paginationKey,
+                SourceChecksum: string(sourceHash),
+                TargetChecksum: string(targetHash),
+            }
         }
     }

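
As a usage sketch, the snippet below copies the two-pass comparison from the hunk above and runs it on hypothetical fingerprint maps; it shows that rows whose hashes differ and rows present on only one side are both reported, with an empty checksum recorded for the side a row is missing from:

package main

import (
    "bytes"
    "fmt"
)

type InlineVerifierMismatches struct {
    Pk             uint64
    SourceChecksum string
    TargetChecksum string
}

// compareHashes copies the two-pass comparison from the hunk above: looping
// over both maps catches rows that differ as well as rows missing on one side.
func compareHashes(source, target map[uint64][]byte) map[uint64]InlineVerifierMismatches {
    mismatchSet := map[uint64]InlineVerifierMismatches{}

    for paginationKey, targetHash := range target {
        sourceHash, exists := source[paginationKey]
        if !bytes.Equal(sourceHash, targetHash) || !exists {
            mismatchSet[paginationKey] = InlineVerifierMismatches{
                Pk:             paginationKey,
                SourceChecksum: string(sourceHash),
                TargetChecksum: string(targetHash),
            }
        }
    }

    for paginationKey, sourceHash := range source {
        targetHash, exists := target[paginationKey]
        if !bytes.Equal(sourceHash, targetHash) || !exists {
            mismatchSet[paginationKey] = InlineVerifierMismatches{
                Pk:             paginationKey,
                SourceChecksum: string(sourceHash),
                TargetChecksum: string(targetHash),
            }
        }
    }

    return mismatchSet
}

func main() {
    source := map[uint64][]byte{1: []byte("aaa"), 2: []byte("bbb")}                   // hypothetical fingerprints
    target := map[uint64][]byte{1: []byte("aaa"), 2: []byte("zzz"), 3: []byte("ccc")} // pk 2 differs, pk 3 exists only on the target
    for pk, m := range compareHashes(source, target) {
        fmt.Printf("pk %d: source=%q target=%q\n", pk, m.SourceChecksum, m.TargetChecksum)
    }
}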
@@ -613,17 +631,21 @@ func (v *InlineVerifier) compareDecompressedData(source, target map[uint64]map[s
     return mismatchSet
 }

-func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uint64][]byte, sourceData, targetData map[uint64]map[string][]byte) []uint64 {
+func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uint64][]byte, sourceData, targetData map[uint64]map[string][]byte) []InlineVerifierMismatches {
     mismatches := v.compareHashes(sourceHashes, targetHashes)
     compressedMismatch := v.compareDecompressedData(sourceData, targetData)
     for paginationKey, _ := range compressedMismatch {
-        mismatches[paginationKey] = struct{}{}
+        mismatches[paginationKey] = InlineVerifierMismatches{
+            Pk:             paginationKey,
+            SourceChecksum: "compressed-data-mismatch", // TODO: compute the hash of the compressed data and put it here
+            TargetChecksum: "compressed-data-mismatch",
+        }
     }

-    mismatchList := make([]uint64, 0, len(mismatches))
+    mismatchList := make([]InlineVerifierMismatches, 0, len(mismatches))

-    for paginationKey, _ := range mismatches {
-        mismatchList = append(mismatchList, paginationKey)
+    for _, mismatch := range mismatches {
+        mismatchList = append(mismatchList, mismatch)
     }

     return mismatchList
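
A condensed sketch of the merge step above, again with hypothetical inputs: mismatches found by the fingerprint comparison keep the checksums recorded by compareHashes, rows flagged only by the decompressed-data comparison get the placeholder value noted in the TODO, and the map is then flattened into the slice the callers consume:

package main

import "fmt"

type InlineVerifierMismatches struct {
    Pk             uint64
    SourceChecksum string
    TargetChecksum string
}

func main() {
    // Hypothetical: pk 1 failed the fingerprint comparison, pk 7 failed the
    // decompressed-data comparison (no per-row checksum is computed there yet).
    mismatches := map[uint64]InlineVerifierMismatches{
        1: {Pk: 1, SourceChecksum: "aaa", TargetChecksum: "bbb"},
    }
    compressedMismatch := map[uint64]struct{}{7: {}}

    for paginationKey := range compressedMismatch {
        mismatches[paginationKey] = InlineVerifierMismatches{
            Pk:             paginationKey,
            SourceChecksum: "compressed-data-mismatch",
            TargetChecksum: "compressed-data-mismatch",
        }
    }

    mismatchList := make([]InlineVerifierMismatches, 0, len(mismatches))
    for _, mismatch := range mismatches {
        mismatchList = append(mismatchList, mismatch)
    }
    fmt.Println(mismatchList)
}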
@@ -650,21 +672,21 @@ func (v *InlineVerifier) binlogEventListener(evs []DMLEvent) error {
     return nil
 }

-func (v *InlineVerifier) readdMismatchedPaginationKeysToBeVerifiedAgain(mismatches map[string]map[string][]uint64) {
+func (v *InlineVerifier) readdMismatchedPaginationKeysToBeVerifiedAgain(mismatches map[string]map[string][]InlineVerifierMismatches) {
     for schemaName, _ := range mismatches {
-        for tableName, paginationKeys := range mismatches[schemaName] {
+        for tableName, mismatches := range mismatches[schemaName] {
             table := v.TableSchemaCache.Get(schemaName, tableName)
-            for _, paginationKey := range paginationKeys {
-                v.reverifyStore.Add(table, paginationKey)
+            for _, mismatch := range mismatches {
+                v.reverifyStore.Add(table, mismatch.Pk)
             }
         }
     }
 }

 // Returns mismatches in the form of db -> table -> paginationKeys
-func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][]uint64, error) {
+func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][]InlineVerifierMismatches, error) {
     mismatchFound := false
-    mismatches := make(map[string]map[string][]uint64)
+    mismatches := make(map[string]map[string][]InlineVerifierMismatches)
     allBatches := v.reverifyStore.Batches(v.BatchSize)

     if len(allBatches) == 0 {
@@ -684,11 +706,11 @@ func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][
         mismatchFound = true

         if _, exists := mismatches[batch.SchemaName]; !exists {
-            mismatches[batch.SchemaName] = make(map[string][]uint64)
+            mismatches[batch.SchemaName] = make(map[string][]InlineVerifierMismatches)
         }

         if _, exists := mismatches[batch.SchemaName][batch.TableName]; !exists {
-            mismatches[batch.SchemaName][batch.TableName] = make([]uint64, 0)
+            mismatches[batch.SchemaName][batch.TableName] = make([]InlineVerifierMismatches, 0)
         }

         mismatches[batch.SchemaName][batch.TableName] = append(mismatches[batch.SchemaName][batch.TableName], batchMismatches...)
@@ -702,7 +724,7 @@ func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][
 // Since the mismatches gets re-added to the reverify store, this must return
 // a union of mismatches of fingerprints and mismatches due to decompressed
 // data.
-func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, error) {
+func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]InlineVerifierMismatches, error) {
     targetSchema := batch.SchemaName
     if targetSchemaName, exists := v.DatabaseRewrites[targetSchema]; exists {
         targetSchema = targetSchemaName
@@ -715,7 +737,7 @@ func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, e

     sourceTableSchema := v.TableSchemaCache.Get(batch.SchemaName, batch.TableName)
     if sourceTableSchema == nil {
-        return []uint64{}, fmt.Errorf("programming error? %s.%s is not found in TableSchemaCache but is being reverified", batch.SchemaName, batch.TableName)
+        return []InlineVerifierMismatches{}, fmt.Errorf("programming error? %s.%s is not found in TableSchemaCache but is being reverified", batch.SchemaName, batch.TableName)
     }

     wg := &sync.WaitGroup{}
