Skip to content

Commit

Permalink
Merge pull request #7370 from alrossi/feature/9.2/bulk-container-thre…
Browse files Browse the repository at this point in the history
…ading-etc

dcache-bulk: container rewrite to optimize threading
  • Loading branch information
svemeyer authored Oct 4, 2023
2 parents a55628d + b640b5e commit 0797008
Show file tree
Hide file tree
Showing 22 changed files with 1,127 additions and 1,268 deletions.
19 changes: 1 addition & 18 deletions docs/TheBook/src/main/markdown/config-bulk.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,9 @@ a different id. The default lifetime is five minutes (the same as for the NFS do
the [QoS Engine](config-qos-engine.md).
- **LOG_TARGET** : logs metadata for each target at the INFO level.

Each activity is associated with

- a permit count (used in connection with a semaphore for throttling execution);
- two thread queues, one for the execution of the container job,
and the other for the execution of callbacks on activity futures;
- a retry policy (currently the only retry policy is a NOP, i.e., no retry).

The permits are configurable using either the property or the admin shell
command ``request policy``.

Each activity is associated with a retry policy (currently the only retry policy is a NOP, i.e., no retry).
Should other retry policies become available, these can be set via a property.

The number and distribution of thread executors is hard-coded for the activities, but their
respective sizes can be adjusted using the properties:

```
bulk.limits.container-processing-threads=110
bulk.limits.activity-callback-threads=50
```

## Container Design

Version 2 of the bulk service has introduced improvements for better scalability and recovery.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.services.bulk;

import static org.dcache.services.bulk.job.AbstractRequestContainerJob.findAbsolutePath;
import static org.dcache.services.bulk.job.BulkRequestContainerJob.findAbsolutePath;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

import com.google.common.base.Strings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
Expand Down Expand Up @@ -181,7 +182,7 @@ public final class BulkServiceCommands implements CellCommandListener {
/**
* name | class | type
*/
private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s | %10s ";
private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s ";

/**
* name | required | description
Expand Down Expand Up @@ -267,8 +268,7 @@ private static String formatActivity(Entry<String, BulkActivityProvider> entry)
return String.format(FORMAT_ACTIVITY,
entry.getKey(),
provider.getActivityClass(),
provider.getTargetType(),
provider.getMaxPermits());
provider.getTargetType());
}

private static String formatArgument(BulkActivityArgumentDescriptor descriptor) {
Expand Down Expand Up @@ -550,7 +550,7 @@ public String call() throws Exception {
return "There are no mapped activities!";
}

return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE", "PERMITS")
return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE")
+ "\n" + activities;
}
}
Expand Down Expand Up @@ -1371,7 +1371,7 @@ public PagedTargetResult call() throws Exception {
private BulkActivityFactory activityFactory;
private BulkTargetStore targetStore;
private BulkServiceStatistics statistics;
private ExecutorService executor;
private ExecutorService executor = Executors.newSingleThreadExecutor();

private JdbcBulkArchiveDao archiveDao;

Expand All @@ -1390,11 +1390,6 @@ public void setArchiveDao(JdbcBulkArchiveDao archiveDao) {
this.archiveDao = archiveDao;
}

@Required
public void setExecutor(ExecutorService executor) {
this.executor = executor;
}

@Required
public void setRequestManager(BulkRequestManager requestManager) {
this.requestManager = requestManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.security.auth.Subject;
import org.dcache.auth.attributes.Restriction;
import org.dcache.namespace.FileAttribute;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.activity.retry.BulkTargetRetryPolicy;
import org.dcache.services.bulk.activity.retry.NoRetryPolicy;
import org.dcache.services.bulk.util.BatchedResult;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.vehicles.FileAttributes;

Expand All @@ -98,36 +97,24 @@ public enum TargetType {

private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy();

private static final int DEFAULT_PERMITS = 50;

protected final String name;
protected final TargetType targetType;

protected Subject subject;
protected Restriction restriction;
protected Set<FileAttribute> requiredAttributes;
protected int maxPermits;
protected ExecutorService activityExecutor;
protected ExecutorService callbackExecutor;
protected BulkTargetRetryPolicy retryPolicy;
protected Set<BulkActivityArgumentDescriptor> descriptors;

protected BulkActivity(String name, TargetType targetType) {
this.name = name;
this.targetType = targetType;
requiredAttributes = MINIMALLY_REQUIRED_ATTRIBUTES;
maxPermits = DEFAULT_PERMITS;
retryPolicy = DEFAULT_RETRY_POLICY;
}

/**
* Cancels the given target by delegating to the target's own cancel() method.
*
* @param target the target to be cancelled.
*/
public void cancel(BulkRequestTarget target) {
target.cancel();
}

public int getMaxPermits() {
return maxPermits;
}

/**
* @return the name assigned to this activity in its constructor.
*/
public String getName() {
return name;
}
Expand All @@ -144,10 +131,6 @@ public TargetType getTargetType() {
return targetType;
}

public Set<FileAttribute> getRequiredAttributes() {
return requiredAttributes;
}

/**
* @return the subject currently held by this activity.
*/
public Subject getSubject() {
return subject;
}
Expand All @@ -164,39 +147,10 @@ public void setRestriction(Restriction restriction) {
this.restriction = restriction;
}

public ExecutorService getActivityExecutor() {
return activityExecutor;
}

public void setActivityExecutor(ExecutorService activityExecutor) {
this.activityExecutor = activityExecutor;
}

public ExecutorService getCallbackExecutor() {
return callbackExecutor;
}

public void setCallbackExecutor(ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

/**
* Stores the argument descriptors on this activity.
*
* @param descriptors the set of argument descriptors to associate with this activity.
*/
public void setDescriptors(Set<BulkActivityArgumentDescriptor> descriptors) {
this.descriptors = descriptors;
}

public void setMaxPermits(int maxPermits) {
this.maxPermits = maxPermits;
}

/**
* Completion handler method. Calls the internal implementation.
*
* @param result of the targeted activity.
*/
public void handleCompletion(BatchedResult<R> result) {
handleCompletion(result.getTarget(), result.getFuture());
}

/**
* Performs the activity.
*
Expand All @@ -223,5 +177,5 @@ public abstract ListenableFuture<R> perform(String rid, long tid, FsPath path, F
* @param target which has terminated.
* @param future the future returned by the activity call to perform();
*/
protected abstract void handleCompletion(BulkRequestTarget target, ListenableFuture<R> future);
public abstract void handleCompletion(BulkRequestTarget target, Future<R> future);
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import javax.security.auth.Subject;
import org.dcache.auth.Subjects;
import org.dcache.auth.attributes.Restriction;
Expand Down Expand Up @@ -102,9 +101,6 @@ public final class BulkActivityFactory implements CellMessageSender, Environment
new HashMap<>());

private Map<String, BulkTargetRetryPolicy> retryPolicies;
private Map<String, ExecutorService> activityExecutors;
private Map<String, ExecutorService> callbackExecutors;
private Map<String, Integer> maxPermits;
private Map<String, Object> environment;

private CellStub pnfsManager;
Expand Down Expand Up @@ -142,8 +138,6 @@ public BulkActivity createActivity(BulkRequest request, Subject subject,
bulkActivity.setSubject(subject);
bulkActivity.setRestriction(restriction);

bulkActivity.setActivityExecutor(activityExecutors.get(activity));
bulkActivity.setCallbackExecutor(callbackExecutors.get(activity));
BulkTargetRetryPolicy retryPolicy = retryPolicies.get(activity);
if (retryPolicy != null) {
bulkActivity.setRetryPolicy(retryPolicy);
Expand All @@ -163,8 +157,6 @@ public void initialize() {
ServiceLoader<BulkActivityProvider> serviceLoader
= ServiceLoader.load(BulkActivityProvider.class);
for (BulkActivityProvider provider : serviceLoader) {
String activity = provider.getActivity();
provider.setMaxPermits(maxPermits.get(activity));
provider.configure(environment);
providers.put(provider.getActivity(), provider);
}
Expand Down Expand Up @@ -215,26 +207,11 @@ public void setQoSResponseReceiver(QoSResponseReceiver qoSResponseReceiver) {
this.qoSResponseReceiver = qoSResponseReceiver;
}

@Required
public void setMaxPermits(Map<String, Integer> maxPermits) {
this.maxPermits = maxPermits;
}

/**
* Stores the map of retry policies; marked as a required property.
*
* @param retryPolicies retry policies, looked up by a String key.
*/
@Required
public void setRetryPolicies(Map<String, BulkTargetRetryPolicy> retryPolicies) {
this.retryPolicies = retryPolicies;
}

@Required
public void setActivityExecutors(Map<String, ExecutorService> activityExecutors) {
this.activityExecutors = activityExecutors;
}

@Required
public void setCallbackExecutors(Map<String, ExecutorService> callbackExecutors) {
this.callbackExecutors = callbackExecutors;
}

@Override
public void setEnvironment(Map<String, Object> environment) {
this.environment = environment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public abstract class BulkActivityProvider<J extends BulkActivity> {

protected final String activity;
protected final TargetType targetType;
protected int maxPermits;

protected BulkActivityProvider(String activity, TargetType targetType) {
this.activity = activity;
Expand All @@ -88,22 +87,13 @@ public TargetType getTargetType() {
return targetType;
}

public int getMaxPermits() {
return maxPermits;
}

public void setMaxPermits(int maxPermits) {
this.maxPermits = maxPermits;
}

/**
* @return an instance of the specific activity type to be configured by factory.
*
* @throws BulkServiceException
*/
public J createActivity() throws BulkServiceException {
J activity = activityInstance();
activity.setMaxPermits(maxPermits);
activity.setDescriptors(getDescriptors());
return activity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.dcache.namespace.FileType;
import org.dcache.services.bulk.activity.BulkActivity;
import org.dcache.services.bulk.util.BulkRequestTarget;
Expand Down Expand Up @@ -105,8 +106,8 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) {
}

@Override
protected void handleCompletion(BulkRequestTarget target,
ListenableFuture<PnfsDeleteEntryMessage> future) {
public void handleCompletion(BulkRequestTarget target,
Future<PnfsDeleteEntryMessage> future) {
PnfsDeleteEntryMessage reply;
try {
reply = getUninterruptibly(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import com.google.common.util.concurrent.ListenableFuture;
import diskCacheV111.util.FsPath;
import java.util.Map;
import java.util.concurrent.Future;
import org.dcache.services.bulk.activity.BulkActivity;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
Expand Down Expand Up @@ -110,7 +111,7 @@ public ListenableFuture<BulkRequestTarget> perform(String ruid, long tid, FsPath
}

@Override
protected void handleCompletion(BulkRequestTarget target, ListenableFuture future) {
public void handleCompletion(BulkRequestTarget target, Future future) {
target.setState(State.COMPLETED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.dcache.cells.CellStub;
import org.dcache.pinmanager.PinManagerAware;
import org.dcache.pinmanager.PinManagerPinMessage;
Expand Down Expand Up @@ -104,7 +105,7 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) {
}

@Override
protected void handleCompletion(BulkRequestTarget target, ListenableFuture<Message> future) {
public void handleCompletion(BulkRequestTarget target, Future<Message> future) {
Message reply;
try {
reply = getUninterruptibly(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.dcache.cells.CellStub;
import org.dcache.qos.QoSException;
import org.dcache.qos.data.FileQoSRequirements;
Expand Down Expand Up @@ -197,7 +198,7 @@ protected void configure(Map<String, String> arguments) throws BulkServiceExcept

@Override
public void handleCompletion(BulkRequestTarget target,
ListenableFuture<QoSTransitionCompletedMessage> future) {
Future<QoSTransitionCompletedMessage> future) {
QoSTransitionCompletedMessage message;
try {
message = getUninterruptibly(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.BulkRequestStatus;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.BulkStorageException;
import org.dcache.services.bulk.job.AbstractRequestContainerJob;
import org.dcache.services.bulk.job.BulkRequestContainerJob;
import org.dcache.services.bulk.job.RequestContainerJobFactory;
import org.dcache.services.bulk.manager.BulkRequestManager;
import org.dcache.services.bulk.store.BulkRequestStore;
Expand Down Expand Up @@ -210,7 +210,7 @@ public void setTargetStore(BulkTargetStore targetStore) {

@Override
public void submitRequestJob(BulkRequest request) throws BulkServiceException {
AbstractRequestContainerJob job = jobFactory.createRequestJob(request);
BulkRequestContainerJob job = jobFactory.createRequestJob(request);
if (storeJobTarget(job.getTarget())) {
requestManager.submit(job);
}
Expand Down
Loading

0 comments on commit 0797008

Please sign in to comment.