Skip to content

Commit

Permalink
Merge pull request #133 from graphql-java/predicate_per_registry_per_…
Browse files Browse the repository at this point in the history
…dataloader

Adds a predicate to DataLoaderRegistry and a per dataloader map of pedicates is also possible
  • Loading branch information
bbakerman authored Oct 8, 2023
2 parents 9da4779 + 3099f1a commit 442edf4
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/DataLoaderRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.function.Function;

/**
* This allows data loaders to be registered together into a single place so
* This allows data loaders to be registered together into a single place, so
* they can be dispatched as one. It also allows you to retrieve data loaders by
* name from a central place
*/
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/annotations/GuardedBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
public @interface GuardedBy {

/**
* The lock that should be held.
* @return The lock that should be held.
*/
String value();
}
10 changes: 10 additions & 0 deletions src/main/java/org/dataloader/registries/DispatchPredicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@
*/
@FunctionalInterface
public interface DispatchPredicate {

/**
* A predicate that always returns true
*/
DispatchPredicate DISPATCH_ALWAYS = (dataLoaderKey, dataLoader) -> true;
/**
* A predicate that always returns false
*/
DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> false;

/**
* This predicate tests whether the data loader should be dispatched or not.
*
Expand Down
164 changes: 143 additions & 21 deletions src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@
import org.dataloader.annotations.ExperimentalApi;

import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.dataloader.impl.Assertions.nonNull;

/**
* This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called
* This {@link DataLoaderRegistry} will use {@link DispatchPredicate}s when {@link #dispatchAll()} is called
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled
* to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
* <p>
* It;s possible to have a {@link DispatchPredicate} per dataloader as well as a default {@link DispatchPredicate} for the
* whole {@link ScheduledDataLoaderRegistry}.
* <p>
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
* no rescheduling will occur and you will need to call dispatch again to restart the process.
* no rescheduling will occur, and you will need to call dispatch again to restart the process.
* <p>
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
* call {@link #rescheduleNow()}.
Expand All @@ -29,17 +33,20 @@
@ExperimentalApi
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {

private final ScheduledExecutorService scheduledExecutorService;
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>();
private final DispatchPredicate dispatchPredicate;
private final ScheduledExecutorService scheduledExecutorService;
private final Duration schedule;
private volatile boolean closed;

private ScheduledDataLoaderRegistry(Builder builder) {
super();
this.dataLoaders.putAll(builder.dataLoaders);
this.scheduledExecutorService = builder.scheduledExecutorService;
this.dispatchPredicate = builder.dispatchPredicate;
this.schedule = builder.schedule;
this.closed = false;
this.dispatchPredicate = builder.dispatchPredicate;
this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates);
}

/**
Expand All @@ -57,6 +64,88 @@ public Duration getScheduleDuration() {
return schedule;
}

/**
* This will combine all the current data loaders in this registry and all the data loaders from the specified registry
* and return a new combined registry
*
* @param registry the registry to combine into this registry
*
* @return a new combined registry
*/
public ScheduledDataLoaderRegistry combine(DataLoaderRegistry registry) {
Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry()
.dispatchPredicate(this.dispatchPredicate);
combinedBuilder.registerAll(this);
combinedBuilder.registerAll(registry);
return combinedBuilder.build();
}


/**
* This will unregister a new dataloader
*
* @param key the key of the data loader to unregister
*
* @return this registry
*/
public ScheduledDataLoaderRegistry unregister(String key) {
DataLoader<?, ?> dataLoader = dataLoaders.remove(key);
if (dataLoader != null) {
dataLoaderPredicates.remove(dataLoader);
}
return this;
}

/**
* @return a map of data loaders to specific dispatch predicates
*/
public Map<DataLoader<?, ?>, DispatchPredicate> getDataLoaderPredicates() {
return new LinkedHashMap<>(dataLoaderPredicates);
}

/**
* There is a default predicate that applies to the whole {@link ScheduledDataLoaderRegistry}
*
* @return the default dispatch predicate
*/
public DispatchPredicate getDispatchPredicate() {
return dispatchPredicate;
}

/**
* This will register a new dataloader and dispatch predicate associated with that data loader
*
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @param dispatchPredicate the dispatch predicate to associate with this data loader
*
* @return this registry
*/
public ScheduledDataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
dataLoaders.put(key, dataLoader);
dataLoaderPredicates.put(dataLoader, dispatchPredicate);
return this;
}

/**
* Returns true if the dataloader has a predicate which returned true, OR the overall
* registry predicate returned true.
*
* @param dataLoaderKey the key in the dataloader map
* @param dataLoader the dataloader
*
* @return true if it should dispatch
*/
private boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader);
if (dispatchPredicate != null) {
if (dispatchPredicate.test(dataLoaderKey, dataLoader)) {
return true;
}
}
return this.dispatchPredicate.test(dataLoaderKey, dataLoader);
}

@Override
public void dispatchAll() {
dispatchAllWithCount();
Expand All @@ -68,7 +157,7 @@ public int dispatchAllWithCount() {
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
DataLoader<?, ?> dataLoader = entry.getValue();
String key = entry.getKey();
if (dispatchPredicate.test(key, dataLoader)) {
if (shouldDispatch(key, dataLoader)) {
sum += dataLoader.dispatchWithCounts().getKeysCount();
} else {
reschedule(key, dataLoader);
Expand All @@ -77,24 +166,28 @@ public int dispatchAllWithCount() {
return sum;
}


/**
* This will immediately dispatch the {@link DataLoader}s in the registry
* without testing the predicate
* without testing the predicates
*/
public void dispatchAllImmediately() {
super.dispatchAll();
dispatchAllWithCountImmediately();
}

/**
* This will immediately dispatch the {@link DataLoader}s in the registry
* without testing the predicate
* without testing the predicates
*
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
*/
public int dispatchAllWithCountImmediately() {
return super.dispatchAllWithCount();
return dataLoaders.values().stream()
.mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount())
.sum();
}


/**
* This will schedule a task to check the predicate and dispatch if true right now. It will not do
* a pre check of the preodicate like {@link #dispatchAll()} would
Expand All @@ -111,16 +204,16 @@ private void reschedule(String key, DataLoader<?, ?> dataLoader) {
}

private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
if (dispatchPredicate.test(key, dataLoader)) {
if (shouldDispatch(key, dataLoader)) {
dataLoader.dispatch();
} else {
reschedule(key, dataLoader);
}
}

/**
* By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
* and a schedule duration of 10 milli seconds.
* By default, this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
* and a schedule duration of 10 milliseconds.
*
* @return A builder of {@link ScheduledDataLoaderRegistry}s
*/
Expand All @@ -130,10 +223,11 @@ public static Builder newScheduledRegistry() {

public static class Builder {

private final Map<String, DataLoader<?, ?>> dataLoaders = new LinkedHashMap<>();
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new LinkedHashMap<>();
private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS;
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private DispatchPredicate dispatchPredicate = (key, dl) -> true;
private Duration schedule = Duration.ofMillis(10);
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();

public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
this.scheduledExecutorService = nonNull(executorService);
Expand All @@ -145,11 +239,6 @@ public Builder schedule(Duration schedule) {
return this;
}

public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
this.dispatchPredicate = nonNull(dispatchPredicate);
return this;
}

/**
* This will register a new dataloader
*
Expand All @@ -163,8 +252,24 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
return this;
}


/**
* This will combine together the data loaders in this builder with the ones
* This will register a new dataloader with a specific {@link DispatchPredicate}
*
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @param dispatchPredicate the dispatch predicate
*
* @return this builder for a fluent pattern
*/
public Builder register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
register(key, dataLoader);
dataLoaderPredicates.put(dataLoader, dispatchPredicate);
return this;
}

/**
* This will combine the data loaders in this builder with the ones
* from a previous {@link DataLoaderRegistry}
*
* @param otherRegistry the previous {@link DataLoaderRegistry}
Expand All @@ -173,6 +278,23 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
*/
public Builder registerAll(DataLoaderRegistry otherRegistry) {
dataLoaders.putAll(otherRegistry.getDataLoadersMap());
if (otherRegistry instanceof ScheduledDataLoaderRegistry) {
ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry;
dataLoaderPredicates.putAll(other.dataLoaderPredicates);
}
return this;
}

/**
* This sets a default predicate on the {@link DataLoaderRegistry} that will control
* whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched.
*
* @param dispatchPredicate the predicate
*
* @return this builder for a fluent pattern
*/
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
this.dispatchPredicate = dispatchPredicate;
return this;
}

Expand Down
11 changes: 11 additions & 0 deletions src/test/java/org/dataloader/fixtures/TestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
import org.dataloader.MappedBatchLoaderWithContext;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -94,4 +97,12 @@ public static void snooze(int millis) {
public static <T> List<T> sort(Collection<? extends T> collection) {
return collection.stream().sorted().collect(toList());
}

public static <T> Set<T> asSet(T... elements) {
return new LinkedHashSet<>(Arrays.asList(elements));
}

public static <T> Set<T> asSet(Collection<T> elements) {
return new LinkedHashSet<>(elements);
}
}
Loading

0 comments on commit 442edf4

Please sign in to comment.