Skip to content

Commit

Permalink
Merge pull request #697 from Netflix/record-change-validator
Browse files Browse the repository at this point in the history
add validator to validate percentage of record change
  • Loading branch information
workeatsleep authored Sep 9, 2024
2 parents 3c9ba6d + 2721e4d commit 4d7eba8
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package com.netflix.hollow.api.producer.validation;

import com.netflix.hollow.api.consumer.data.AbstractHollowDataAccessor;
import com.netflix.hollow.api.producer.HollowProducer;
import com.netflix.hollow.core.read.engine.HollowReadStateEngine;
import com.netflix.hollow.core.read.engine.HollowTypeReadState;

import java.util.function.Supplier;
import java.util.logging.Logger;

import static java.util.Objects.requireNonNull;

/**
* Validate the percentage of record change is within the {@link Threshold},
* this only works for types that have a primary key definition
*/
public class RecordCountPercentChangeValidator implements ValidatorListener {
private static final String NAME = RecordCountVarianceValidator.class.getName();
private final Logger log = Logger.getLogger(RecordCountPercentChangeValidator.class.getName());

private final String typeName;
private final Threshold threshold;
private AbstractHollowDataAccessor accessor;

public RecordCountPercentChangeValidator(String typeName,
Threshold threshold) {
this.typeName = typeName;
this.threshold = threshold;
}

@Override
public String getName() {
return NAME;
}

@Override
public ValidationResult onValidate(HollowProducer.ReadState readState) {
HollowReadStateEngine readStateEngine = requireNonNull(readState.getStateEngine(), "read state is null");
HollowTypeReadState typeState = requireNonNull(readStateEngine.getTypeState(typeName),
"type not loaded or does not exist in dataset; type=" + typeName);
accessor =
new AbstractHollowDataAccessor<Object>(readStateEngine, typeName) {
@Override public Object getRecord(int ordinal) { return null; }
};
ValidationResult validationResult = validateChanges(typeState);
log.info(validationResult.toString());
return validationResult;
}

private ValidationResult validateChanges(HollowTypeReadState typeState) {
if(typeState.getPreviousOrdinals().isEmpty()) {
return ValidationResult.from(this).passed("Ignore the check if previous records are empty.");
}
int addRecordNumber = accessor.getAddedRecords().size();
int removeRecordNumber = accessor.getRemovedRecords().size();
int updatedRecordNumber = accessor.getUpdatedRecords().size();
int previousRecordNumber = typeState.getPreviousOrdinals().cardinality();

float addedPercent = (float) addRecordNumber / previousRecordNumber;
float removedPercent = (float) removeRecordNumber / previousRecordNumber;
float updatedPercent = (float) updatedRecordNumber / previousRecordNumber;


float addedPercentageThreshold = threshold.addedPercentageThreshold.get();
float removedPercentageThreshold = threshold.removedPercentageThreshold.get();
float updatedPercentageThreshold = threshold.updatedPercentageThreshold.get();

ValidationResult.ValidationResultBuilder builder = ValidationResult.from(this);
builder.detail("addedRecordNumber", addRecordNumber);
builder.detail("removedRecordNumber", removeRecordNumber);
builder.detail("updatedRecordNumber", updatedRecordNumber);
builder.detail("previousRecordNumber", previousRecordNumber);
builder.detail("addedPercentageThreshold", addedPercentageThreshold);
builder.detail("removedPercentageThreshold", removedPercentageThreshold);
builder.detail("updatedPercentageThreshold", updatedPercentageThreshold);

boolean pass =
(addedPercentageThreshold < 0 || addedPercent < addedPercentageThreshold) &&
(removedPercentageThreshold < 0 || removedPercent < removedPercentageThreshold) &&
(updatedPercentageThreshold < 0 || updatedPercent < updatedPercentageThreshold);
if (pass) {
return builder.passed();
}
return builder.failed("record count change is more than threshold");
}

/**
* Define the percentage of value change as supplier of float in this class,
* for example 1% should be defined as 0.01.
* Not all three threshold needs to be defined. removedPercentageThreshold and updatedPercentageThreshold
* value range should be [0,1], addedPercentageThreshold should not be less than 0.
*/
public static class Threshold {
private final Supplier<Float> removedPercentageThreshold;
private final Supplier<Float> addedPercentageThreshold;
private final Supplier<Float> updatedPercentageThreshold;

public Threshold(Supplier<Float> removedPercentageThreshold,
Supplier<Float> addedPercentageThreshold,
Supplier<Float> updatedPercentageThreshold) {
this.removedPercentageThreshold = removedPercentageThreshold;
this.addedPercentageThreshold = addedPercentageThreshold;
this.updatedPercentageThreshold = updatedPercentageThreshold;
}


public static ThresholdBuilder builder() {
return new ThresholdBuilder();
}

public static class ThresholdBuilder {
private Supplier<Float> removedPercentageThreshold;
private Supplier<Float> addedPercentageThreshold;
private Supplier<Float> updatedPercentageThreshold;

public ThresholdBuilder withRemovedPercentageThreshold(Supplier<Float> removedPercentageThreshold) {
this.removedPercentageThreshold = removedPercentageThreshold;
return this;
}

public ThresholdBuilder withAddedPercentageThreshold(Supplier<Float> addedPercentageThreshold) {
this.addedPercentageThreshold = addedPercentageThreshold;
return this;
}

public ThresholdBuilder withUpdatedPercentageThreshold(Supplier<Float> updatedPercentageThreshold) {
this.updatedPercentageThreshold = updatedPercentageThreshold;
return this;
}

public Threshold build() {
if (removedPercentageThreshold != null && (removedPercentageThreshold.get() < 0 || removedPercentageThreshold.get() > 1)) {
throw new RuntimeException("removed percentage threshold must be between 0 and 1, value "
+ removedPercentageThreshold.get() + " is invalid.");
}
if (updatedPercentageThreshold != null && (updatedPercentageThreshold.get() < 0 || updatedPercentageThreshold.get() > 1)) {
throw new RuntimeException("updated percentage threshold must be between 0 and 1, value "
+ updatedPercentageThreshold.get() + " is invalid.");
}
if (addedPercentageThreshold != null && addedPercentageThreshold.get() < 0) {
throw new RuntimeException("added percentage threshold must be >= 0, value "
+ addedPercentageThreshold.get() + " is invalid.");
}
if (removedPercentageThreshold == null) {
removedPercentageThreshold = () -> -1f;
}
if (updatedPercentageThreshold == null) {
updatedPercentageThreshold = () -> -1f;
}
if (addedPercentageThreshold == null) {
addedPercentageThreshold = () -> -1f;
}
return new Threshold(removedPercentageThreshold,
addedPercentageThreshold,
updatedPercentageThreshold);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package com.netflix.hollow.api.producer.validation;

import com.netflix.hollow.api.producer.HollowProducer;
import com.netflix.hollow.api.producer.fs.HollowInMemoryBlobStager;
import com.netflix.hollow.core.write.objectmapper.HollowPrimaryKey;
import com.netflix.hollow.test.InMemoryBlobStore;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class RecordCountPercentChangeValidatorTests {
private InMemoryBlobStore blobStore;

@Before
public void setUp() {
blobStore = new InMemoryBlobStore();
}

@Test
public void testPassLessThanThreshold() {
try {
testHelper(RecordCountPercentChangeValidator.Threshold.builder()
.withAddedPercentageThreshold(() -> 0.5f)
.build(), 0, 0, 0);
} catch (Exception e) {
Assert.fail(); //should not reach here
}
}

@Test
public void testPassThresholdNotSet() {
try {
testHelper(RecordCountPercentChangeValidator.Threshold.builder()
.withAddedPercentageThreshold(() -> 0.5f)
.build(), 0, 0, 3);
} catch (Exception e) {
Assert.fail(); //should not reach here
}
}

@Test
public void testAddExceedThreshold() {
try {
testHelper(RecordCountPercentChangeValidator.Threshold.builder()
.withAddedPercentageThreshold(() -> 0.5f)
.build(), 0, 4, 0);
Assert.fail();
} catch (ValidationStatusException expected) {
Assert.assertEquals(1, expected.getValidationStatus().getResults().size());
}
}

@Test
public void testRemoveExceedThreshold() {
try {
testHelper(RecordCountPercentChangeValidator.Threshold.builder()
.withRemovedPercentageThreshold(() -> 0.5f)
.build(), 0, 0, 4);
Assert.fail();
} catch (ValidationStatusException expected) {
Assert.assertEquals(1, expected.getValidationStatus().getResults().size());
}
}

@Test
public void testUpdateExceedThreshold() {
try {
testHelper(RecordCountPercentChangeValidator.Threshold.builder()
.withUpdatedPercentageThreshold(() ->0.5f)
.withAddedPercentageThreshold(() -> 0.5f)
.withAddedPercentageThreshold(() -> 0.5f)
.build(), 4, 1, 1);
Assert.fail();
} catch (ValidationStatusException expected) {
Assert.assertEquals(1, expected.getValidationStatus().getResults().size());
}
}

private void testHelper(RecordCountPercentChangeValidator.Threshold threshold,
int updatedRecordCount,
int addedRecordCount,
int removedRecordCount) {
HollowProducer producer = HollowProducer.withPublisher(blobStore)
.withBlobStager(new HollowInMemoryBlobStager())
.withListener(new RecordCountPercentChangeValidator("TypeWithPrimaryKey", threshold)).build();

List<TypeWithPrimaryKey> previousItems = new ArrayList<>();
previousItems.add(new TypeWithPrimaryKey(0, "a", "aa"));
previousItems.add(new TypeWithPrimaryKey(1, "a", "aa"));
previousItems.add(new TypeWithPrimaryKey(2, "a", "aa"));
previousItems.add(new TypeWithPrimaryKey(3, "a", "aa"));
previousItems.add(new TypeWithPrimaryKey(4, "a", "aa"));

producer.runCycle(new HollowProducer.Populator() {
public void populate(HollowProducer.WriteState newState) throws Exception {
for (TypeWithPrimaryKey val : previousItems) {
newState.add(val);
}
}
});

List<TypeWithPrimaryKey> currentItems = new ArrayList<>();

currentItems.addAll(previousItems);

if (addedRecordCount > 0) {
for (int i = 0; i < addedRecordCount; i++) {
currentItems.add(new TypeWithPrimaryKey(i, String.valueOf(i), String.valueOf(i)));
}
}

if (removedRecordCount > 0) {
for (int i = 0; i < removedRecordCount; i++) {
currentItems.remove(0);
}
}

if (updatedRecordCount > 0) {
for (int i = 0; i < updatedRecordCount; i++) {
TypeWithPrimaryKey item = currentItems.get(0);
TypeWithPrimaryKey newItem = new TypeWithPrimaryKey(item.id, item.name, "bb");
currentItems.remove(0);
currentItems.add(newItem);
}
}

producer.runCycle(new HollowProducer.Populator() {
public void populate(HollowProducer.WriteState newState) throws Exception {
for (TypeWithPrimaryKey val : currentItems) {
newState.add(val);
}
}
});
}

@HollowPrimaryKey(fields = {"id", "name"})
static class TypeWithPrimaryKey {
int id;
String name;
String desc;

TypeWithPrimaryKey(int id, String name, String desc) {
this.id = id;
this.name = name;
this.desc = desc;
}
}

}

0 comments on commit 4d7eba8

Please sign in to comment.