diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java
index 9b19c29..0bc54cb 100644
--- a/src/main/java/org/dataloader/DataLoaderRegistry.java
+++ b/src/main/java/org/dataloader/DataLoaderRegistry.java
@@ -14,7 +14,7 @@
import java.util.function.Function;
/**
- * This allows data loaders to be registered together into a single place so
+ * This allows data loaders to be registered together into a single place, so
* they can be dispatched as one. It also allows you to retrieve data loaders by
* name from a central place
*/
diff --git a/src/main/java/org/dataloader/annotations/GuardedBy.java b/src/main/java/org/dataloader/annotations/GuardedBy.java
index c26b2ef..85c5765 100644
--- a/src/main/java/org/dataloader/annotations/GuardedBy.java
+++ b/src/main/java/org/dataloader/annotations/GuardedBy.java
@@ -15,7 +15,7 @@
public @interface GuardedBy {
/**
- * The lock that should be held.
+ * @return The lock that should be held.
*/
String value();
}
diff --git a/src/main/java/org/dataloader/registries/DispatchPredicate.java b/src/main/java/org/dataloader/registries/DispatchPredicate.java
index d5bd31b..247a51a 100644
--- a/src/main/java/org/dataloader/registries/DispatchPredicate.java
+++ b/src/main/java/org/dataloader/registries/DispatchPredicate.java
@@ -10,6 +10,16 @@
*/
@FunctionalInterface
public interface DispatchPredicate {
+
+ /**
+ * A predicate that always returns true
+ */
+ DispatchPredicate DISPATCH_ALWAYS = (dataLoaderKey, dataLoader) -> true;
+ /**
+ * A predicate that always returns false
+ */
+ DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> false;
+
/**
* This predicate tests whether the data loader should be dispatched or not.
*
diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java
index 4be317e..5b1af76 100644
--- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java
+++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java
@@ -5,8 +5,9 @@
import org.dataloader.annotations.ExperimentalApi;
import java.time.Duration;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -14,12 +15,15 @@
import static org.dataloader.impl.Assertions.nonNull;
/**
- * This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called
+ * This {@link DataLoaderRegistry} will use {@link DispatchPredicate}s when {@link #dispatchAll()} is called
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled
* to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
*
+ * It;s possible to have a {@link DispatchPredicate} per dataloader as well as a default {@link DispatchPredicate} for the
+ * whole {@link ScheduledDataLoaderRegistry}.
+ *
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
- * no rescheduling will occur and you will need to call dispatch again to restart the process.
+ * no rescheduling will occur, and you will need to call dispatch again to restart the process.
*
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
* call {@link #rescheduleNow()}.
@@ -29,17 +33,20 @@
@ExperimentalApi
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {
- private final ScheduledExecutorService scheduledExecutorService;
+ private final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>();
private final DispatchPredicate dispatchPredicate;
+ private final ScheduledExecutorService scheduledExecutorService;
private final Duration schedule;
private volatile boolean closed;
private ScheduledDataLoaderRegistry(Builder builder) {
+ super();
this.dataLoaders.putAll(builder.dataLoaders);
this.scheduledExecutorService = builder.scheduledExecutorService;
- this.dispatchPredicate = builder.dispatchPredicate;
this.schedule = builder.schedule;
this.closed = false;
+ this.dispatchPredicate = builder.dispatchPredicate;
+ this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates);
}
/**
@@ -57,6 +64,88 @@ public Duration getScheduleDuration() {
return schedule;
}
+ /**
+ * This will combine all the current data loaders in this registry and all the data loaders from the specified registry
+ * and return a new combined registry
+ *
+ * @param registry the registry to combine into this registry
+ *
+ * @return a new combined registry
+ */
+ public ScheduledDataLoaderRegistry combine(DataLoaderRegistry registry) {
+ Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry()
+ .dispatchPredicate(this.dispatchPredicate);
+ combinedBuilder.registerAll(this);
+ combinedBuilder.registerAll(registry);
+ return combinedBuilder.build();
+ }
+
+
+ /**
+ * This will unregister a new dataloader
+ *
+ * @param key the key of the data loader to unregister
+ *
+ * @return this registry
+ */
+ public ScheduledDataLoaderRegistry unregister(String key) {
+ DataLoader, ?> dataLoader = dataLoaders.remove(key);
+ if (dataLoader != null) {
+ dataLoaderPredicates.remove(dataLoader);
+ }
+ return this;
+ }
+
+ /**
+ * @return a map of data loaders to specific dispatch predicates
+ */
+ public Map, DispatchPredicate> getDataLoaderPredicates() {
+ return new LinkedHashMap<>(dataLoaderPredicates);
+ }
+
+ /**
+ * There is a default predicate that applies to the whole {@link ScheduledDataLoaderRegistry}
+ *
+ * @return the default dispatch predicate
+ */
+ public DispatchPredicate getDispatchPredicate() {
+ return dispatchPredicate;
+ }
+
+ /**
+ * This will register a new dataloader and dispatch predicate associated with that data loader
+ *
+ * @param key the key to put the data loader under
+ * @param dataLoader the data loader to register
+ * @param dispatchPredicate the dispatch predicate to associate with this data loader
+ *
+ * @return this registry
+ */
+ public ScheduledDataLoaderRegistry register(String key, DataLoader, ?> dataLoader, DispatchPredicate dispatchPredicate) {
+ dataLoaders.put(key, dataLoader);
+ dataLoaderPredicates.put(dataLoader, dispatchPredicate);
+ return this;
+ }
+
+ /**
+ * Returns true if the dataloader has a predicate which returned true, OR the overall
+ * registry predicate returned true.
+ *
+ * @param dataLoaderKey the key in the dataloader map
+ * @param dataLoader the dataloader
+ *
+ * @return true if it should dispatch
+ */
+ private boolean shouldDispatch(String dataLoaderKey, DataLoader, ?> dataLoader) {
+ DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader);
+ if (dispatchPredicate != null) {
+ if (dispatchPredicate.test(dataLoaderKey, dataLoader)) {
+ return true;
+ }
+ }
+ return this.dispatchPredicate.test(dataLoaderKey, dataLoader);
+ }
+
@Override
public void dispatchAll() {
dispatchAllWithCount();
@@ -68,7 +157,7 @@ public int dispatchAllWithCount() {
for (Map.Entry> entry : dataLoaders.entrySet()) {
DataLoader, ?> dataLoader = entry.getValue();
String key = entry.getKey();
- if (dispatchPredicate.test(key, dataLoader)) {
+ if (shouldDispatch(key, dataLoader)) {
sum += dataLoader.dispatchWithCounts().getKeysCount();
} else {
reschedule(key, dataLoader);
@@ -77,24 +166,28 @@ public int dispatchAllWithCount() {
return sum;
}
+
/**
* This will immediately dispatch the {@link DataLoader}s in the registry
- * without testing the predicate
+ * without testing the predicates
*/
public void dispatchAllImmediately() {
- super.dispatchAll();
+ dispatchAllWithCountImmediately();
}
/**
* This will immediately dispatch the {@link DataLoader}s in the registry
- * without testing the predicate
+ * without testing the predicates
*
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
*/
public int dispatchAllWithCountImmediately() {
- return super.dispatchAllWithCount();
+ return dataLoaders.values().stream()
+ .mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount())
+ .sum();
}
+
/**
* This will schedule a task to check the predicate and dispatch if true right now. It will not do
* a pre check of the preodicate like {@link #dispatchAll()} would
@@ -111,7 +204,7 @@ private void reschedule(String key, DataLoader, ?> dataLoader) {
}
private void dispatchOrReschedule(String key, DataLoader, ?> dataLoader) {
- if (dispatchPredicate.test(key, dataLoader)) {
+ if (shouldDispatch(key, dataLoader)) {
dataLoader.dispatch();
} else {
reschedule(key, dataLoader);
@@ -119,8 +212,8 @@ private void dispatchOrReschedule(String key, DataLoader, ?> dataLoader) {
}
/**
- * By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
- * and a schedule duration of 10 milli seconds.
+ * By default, this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
+ * and a schedule duration of 10 milliseconds.
*
* @return A builder of {@link ScheduledDataLoaderRegistry}s
*/
@@ -130,10 +223,11 @@ public static Builder newScheduledRegistry() {
public static class Builder {
+ private final Map> dataLoaders = new LinkedHashMap<>();
+ private final Map, DispatchPredicate> dataLoaderPredicates = new LinkedHashMap<>();
+ private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS;
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
- private DispatchPredicate dispatchPredicate = (key, dl) -> true;
private Duration schedule = Duration.ofMillis(10);
- private final Map> dataLoaders = new HashMap<>();
public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
this.scheduledExecutorService = nonNull(executorService);
@@ -145,11 +239,6 @@ public Builder schedule(Duration schedule) {
return this;
}
- public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
- this.dispatchPredicate = nonNull(dispatchPredicate);
- return this;
- }
-
/**
* This will register a new dataloader
*
@@ -163,8 +252,24 @@ public Builder register(String key, DataLoader, ?> dataLoader) {
return this;
}
+
/**
- * This will combine together the data loaders in this builder with the ones
+ * This will register a new dataloader with a specific {@link DispatchPredicate}
+ *
+ * @param key the key to put the data loader under
+ * @param dataLoader the data loader to register
+ * @param dispatchPredicate the dispatch predicate
+ *
+ * @return this builder for a fluent pattern
+ */
+ public Builder register(String key, DataLoader, ?> dataLoader, DispatchPredicate dispatchPredicate) {
+ register(key, dataLoader);
+ dataLoaderPredicates.put(dataLoader, dispatchPredicate);
+ return this;
+ }
+
+ /**
+ * This will combine the data loaders in this builder with the ones
* from a previous {@link DataLoaderRegistry}
*
* @param otherRegistry the previous {@link DataLoaderRegistry}
@@ -173,6 +278,23 @@ public Builder register(String key, DataLoader, ?> dataLoader) {
*/
public Builder registerAll(DataLoaderRegistry otherRegistry) {
dataLoaders.putAll(otherRegistry.getDataLoadersMap());
+ if (otherRegistry instanceof ScheduledDataLoaderRegistry) {
+ ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry;
+ dataLoaderPredicates.putAll(other.dataLoaderPredicates);
+ }
+ return this;
+ }
+
+ /**
+ * This sets a default predicate on the {@link DataLoaderRegistry} that will control
+ * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched.
+ *
+ * @param dispatchPredicate the predicate
+ *
+ * @return this builder for a fluent pattern
+ */
+ public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
+ this.dispatchPredicate = dispatchPredicate;
return this;
}
diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java
index 0206bd9..1e7c02f 100644
--- a/src/test/java/org/dataloader/fixtures/TestKit.java
+++ b/src/test/java/org/dataloader/fixtures/TestKit.java
@@ -9,9 +9,12 @@
import org.dataloader.MappedBatchLoaderWithContext;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.LinkedHashSet;
import java.util.HashMap;
import java.util.List;
+import java.util.Set;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -94,4 +97,12 @@ public static void snooze(int millis) {
public static List sort(Collection extends T> collection) {
return collection.stream().sorted().collect(toList());
}
+
+ public static Set asSet(T... elements) {
+ return new LinkedHashSet<>(Arrays.asList(elements));
+ }
+
+ public static Set asSet(Collection elements) {
+ return new LinkedHashSet<>(elements);
+ }
}
diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java
new file mode 100644
index 0000000..43da82f
--- /dev/null
+++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java
@@ -0,0 +1,242 @@
+package org.dataloader.registries;
+
+import org.dataloader.BatchLoader;
+import org.dataloader.DataLoader;
+import org.dataloader.DataLoaderRegistry;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Arrays.asList;
+import static org.awaitility.Awaitility.await;
+import static org.dataloader.DataLoaderFactory.newDataLoader;
+import static org.dataloader.fixtures.TestKit.asSet;
+import static org.dataloader.registries.DispatchPredicate.DISPATCH_NEVER;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+public class ScheduledDataLoaderRegistryPredicateTest {
+ final BatchLoader