diff --git a/docs/TheBook/src/main/markdown/cookbook-ha-with-replicable-services.md b/docs/TheBook/src/main/markdown/cookbook-ha-with-replicable-services.md index 16348d0e7d5..7ee51858e05 100644 --- a/docs/TheBook/src/main/markdown/cookbook-ha-with-replicable-services.md +++ b/docs/TheBook/src/main/markdown/cookbook-ha-with-replicable-services.md @@ -256,3 +256,19 @@ information. Requests will be load balanced over available instances of `topo`. This service collects nightly statistics about available pools. One can have multiple instances of `statistics` and each instance will collect the same information. + +### `bulk` + +This service processes bulk requests for pinning/unpinning, staging/releasing, +file deletion, and QoS updates. It does not directly affect transfers, but it +is arguably more critical than the other services in this group because it +supports the `REST` API. + +Bulk is fully replicable. All instances **must** share the same database +instance, and their configurations should be kept synchronized so that every +instance is configured identically. + +Request queries are load-balanced over the physical instances, but only the +leader instance handles request submission, cancellation and clearing, and +only the leader actually processes requests (i.e., runs the request container +servicing each request). \ No newline at end of file diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java index 1b6a42fe579..55a0f03a112 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java @@ -63,11 +63,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath; import com.google.common.base.Strings; +import com.google.common.base.Throwables; import diskCacheV111.util.CacheException; import diskCacheV111.util.FsPath; import diskCacheV111.util.PnfsHandler; import diskCacheV111.util.TimeoutCacheException; import diskCacheV111.vehicles.Message; +import dmg.cells.nucleus.CellAddressCore; import dmg.cells.nucleus.CellLifeCycleAware; import dmg.cells.nucleus.CellMessageReceiver; import dmg.cells.nucleus.Reply; @@ -81,10 +83,12 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.security.auth.Subject; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.dcache.auth.Subjects; import org.dcache.auth.attributes.Restriction; import org.dcache.auth.attributes.Restrictions; import org.dcache.cells.CellStub; +import org.dcache.cells.HAServiceLeadershipManager; import org.dcache.cells.MessageReply; import org.dcache.namespace.FileAttribute; import org.dcache.services.bulk.BulkRequest.Depth; @@ -101,7 +105,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING /** * Bulk service façade. Handles incoming messages. Handles restart reloading.
*/ -public final class BulkService implements CellLifeCycleAware, CellMessageReceiver { +public final class BulkService implements CellLifeCycleAware, CellMessageReceiver, + LeaderLatchListener { private static final Logger LOGGER = LoggerFactory.getLogger(BulkService.class); private static final String TARGET_COUNT_ERROR_FORMAT = "The number of targets %s exceeds " @@ -120,14 +125,18 @@ public final class BulkService implements CellLifeCycleAware, CellMessageReceive private BulkTargetStore targetStore; private BulkSubmissionHandler submissionHandler; private BulkServiceStatistics statistics; + private HAServiceLeadershipManager leadershipManager; private ExecutorService incomingExecutorService; private CellStub namespace; + private CellStub endpoint; private Depth allowedDepth; private int maxRequestsPerUser; private int maxFlatTargets; private int maxShallowTargets; private int maxRecursiveTargets; + private boolean initialized; + @Override public void afterStart() { /* @@ -141,31 +150,51 @@ public void afterStart() { * is necessary for processing request targets. */ waitForNamespace(); + } + @Override + public synchronized void isLeader() { + LOGGER.info("isLeader called"); incomingExecutorService.execute(() -> initialize()); } + @Override + public synchronized void notLeader() { + LOGGER.info("notLeader called"); + try { + incomingExecutorService.execute(() -> parkService()); + } catch (Exception e) { + LOGGER.error("notLeader: {}, cause: {}.", + e.getMessage(), String.valueOf(Throwables.getRootCause(e))); + } + } + public Reply messageArrived(BulkRequestMessage message) { LOGGER.trace("received BulkRequestMessage {}", message); MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { - BulkRequest request = message.getRequest(); - Subject subject = message.getSubject(); - Restriction restriction = message.getRestriction(); - checkQuota(subject); - String uid = UUID.randomUUID().toString(); - request.setUid(uid); - checkRestrictions(restriction, uid); - checkActivity(request); - checkDepthConstraints(request); - requestStore.store(subject, restriction, request); - statistics.incrementRequestsReceived(request.getActivity()); - requestManager.signal(); - message.setRequestUrl(request.getUrlPrefix() + "/" + request.getUid()); - reply.reply(message); + if (!leadershipManager.hasLeadership()) { + relayToLeader(message, reply); + } else { + BulkRequest request = message.getRequest(); + Subject subject = message.getSubject(); + Restriction restriction = message.getRestriction(); + checkQuota(subject); + String uid = UUID.randomUUID().toString(); + request.setUid(uid); + checkRestrictions(restriction, uid); + checkActivity(request); + checkDepthConstraints(request); + requestStore.store(subject, restriction, request); + statistics.incrementRequestsReceived(request.getActivity()); + requestManager.signal(); + message.setRequestUrl(request.getUrlPrefix() + "/" + request.getUid()); + reply.reply(message); + } } catch (BulkServiceException e) { - LOGGER.error("messageArrived(BulkRequestMessage) {}: {}", message, e.toString()); + LOGGER.error("messageArrived(BulkRequestMessage) {}: {}", message, + e.toString()); reply.fail(message, e); } catch (Exception e) { reply.fail(message, e); @@ -178,7 +207,6 @@ public Reply messageArrived(BulkRequestMessage message) { public Reply messageArrived(BulkRequestListMessage message) { LOGGER.trace("received BulkRequestListMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try 
{ @@ -203,7 +231,6 @@ public Reply messageArrived(BulkRequestListMessage message) { public Reply messageArrived(BulkRequestStatusMessage message) { LOGGER.trace("received BulkRequestStatusMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { Subject subject = message.getSubject(); @@ -234,26 +261,29 @@ public Reply messageArrived(BulkRequestStatusMessage message) { public Reply messageArrived(BulkRequestCancelMessage message) { LOGGER.trace("received BulkRequestCancelMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { - Subject subject = message.getSubject(); - String uuid = message.getRequestUuid(); - /* - * First check to see if the request corresponds to a stored one. - */ - requestStore.getKey(uuid); - checkRestrictions(message.getRestriction(), uuid); - matchActivity(message.getActivity(), uuid); - List targetPaths = message.getTargetPaths(); - if (targetPaths == null || targetPaths.isEmpty()) { - submissionHandler.cancelRequest(subject, uuid); + if (!leadershipManager.hasLeadership()) { + relayToLeader(message, reply); } else { - validateTargets(uuid, subject, targetPaths); - submissionHandler.cancelTargets(subject, uuid, targetPaths); + Subject subject = message.getSubject(); + String uuid = message.getRequestUuid(); + /* + * First check to see if the request corresponds to a stored one. + */ + requestStore.getKey(uuid); + checkRestrictions(message.getRestriction(), uuid); + matchActivity(message.getActivity(), uuid); + List targetPaths = message.getTargetPaths(); + if (targetPaths == null || targetPaths.isEmpty()) { + submissionHandler.cancelRequest(subject, uuid); + } else { + validateTargets(uuid, subject, targetPaths); + submissionHandler.cancelTargets(subject, uuid, targetPaths); + } + reply.reply(message); } - reply.reply(message); } catch (BulkServiceException e) { LOGGER.error("messageArrived(BulkRequestCancelMessage) {}: {}", message, e.toString()); @@ -267,22 +297,29 @@ public Reply messageArrived(BulkRequestCancelMessage message) { return reply; } public Reply messageArrived(BulkRequestClearMessage message) { LOGGER.trace("received BulkRequestClearMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { - String uuid = message.getRequestUuid(); - Subject subject = message.getSubject(); - /* - * First check to see if the request corresponds to a stored one.
+ */ + requestStore.getKey(uuid); + checkRestrictions(message.getRestriction(), uuid); + matchActivity(message.getActivity(), uuid); + submissionHandler.clearRequest(subject, uuid, message.isCancelIfRunning()); + reply.reply(message); + } } catch (BulkServiceException e) { LOGGER.error("messageArrived(BulkRequestClearMessage) {}: {}", message, e.toString()); @@ -298,7 +335,6 @@ public Reply messageArrived(BulkRequestClearMessage message) { public Reply messageArrived(BulkArchivedRequestInfoMessage message) { LOGGER.trace("received BulkArchivedRequestInfoMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { @@ -322,7 +358,6 @@ public Reply messageArrived(BulkArchivedRequestInfoMessage message) { public Reply messageArrived(BulkArchivedSummaryInfoMessage message) { LOGGER.trace("received BulkArchivedSummaryInfoMessage {}", message); - MessageReply reply = new MessageReply<>(); incomingExecutorService.execute(() -> { try { @@ -368,6 +403,16 @@ public synchronized void setAllowedDepth(Depth allowedDepth) { this.allowedDepth = allowedDepth; } + @Required + public void setEndpointStub(CellStub endpoint) { + this.endpoint = endpoint; + } + + @Required + public void setLeadershipManager(HAServiceLeadershipManager leadershipManager) { + this.leadershipManager = leadershipManager; + } + @Required public synchronized void setMaxFlatTargets(int maxFlatTargets) { this.maxFlatTargets = maxFlatTargets; @@ -435,6 +480,22 @@ private void checkActivity(BulkRequest request) throws BulkServiceException { } } + private void relayToLeader(M message, MessageReply reply) + throws Exception { + CellAddressCore cellAddressCore = leadershipManager.getLeaderAddress(); + endpoint.setDestination(cellAddressCore.toString()); + Message response = endpoint.sendAndWait(message); + if (response.getReturnCode() != 0) { + Object error = response.getErrorObject(); + if (error instanceof Exception) { + throw (Exception) error; + } + throw new Exception(String.valueOf(error)); + } else { + reply.reply(message); + } + } + private synchronized void checkDepthConstraints(BulkRequest request) throws BulkPermissionDeniedException { switch (request.getExpandDirectories()) { @@ -515,7 +576,12 @@ private synchronized void checkTargetCount(BulkRequest request) } } - private void initialize() { + private synchronized void initialize() { + if (initialized) { + LOGGER.info("Service already initialized."); + return; + } + /* * See store specifics for how reload is handled, but the minimal contract is * that all incomplete requests be reset to the QUEUED state. 
@@ -542,6 +608,25 @@ private void initialize() { requestManager.signal(); LOGGER.info("Service startup completed."); + + initialized = true; + } + + private synchronized void parkService() { + if (!initialized) { + LOGGER.info("Service already parked."); + return; + } + + LOGGER.info("Stopping the job manager."); + try { + requestManager.shutdown(); + } catch (Exception e) { + LOGGER.error("parkService error: {}, {}.", + e.getMessage(), String.valueOf(Throwables.getRootCause(e))); + } + + initialized = false; } private void matchActivity(String activity, String uuid) diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java index e37840934bb..27f3e34711f 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java @@ -115,6 +115,8 @@ public final class BulkActivityFactory implements CellMessageSender, Environment private QoSResponseReceiver qoSResponseReceiver; private CellEndpoint endpoint; + private boolean initialized; + /** * Generates an instance of the plugin-specific activity to be used by the request jobs. * @@ -157,13 +159,16 @@ public Map getProviders() { } public void initialize() { - ServiceLoader serviceLoader - = ServiceLoader.load(BulkActivityProvider.class); - for (BulkActivityProvider provider : serviceLoader) { - String activity = provider.getActivity(); - provider.setMaxPermits(maxPermits.get(activity)); - provider.configure(environment); - providers.put(provider.getActivity(), provider); + if (!initialized) { + ServiceLoader serviceLoader + = ServiceLoader.load(BulkActivityProvider.class); + for (BulkActivityProvider provider : serviceLoader) { + String activity = provider.getActivity(); + provider.setMaxPermits(maxPermits.get(activity)); + provider.configure(environment); + providers.put(provider.getActivity(), provider); + } + initialized = true; } } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java index d693a0ac3c1..6bdf4fa40c8 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java @@ -89,6 +89,11 @@ public interface BulkRequestManager extends SignalAware { */ void cancelTargets(String id, List targetPaths); + /** + * Should wipe out any in-memory request state. + */ + void shutdown() throws Exception; + /** * Implementation-specific. 
*/ diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java index 9672e568f82..168b979d7da 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java @@ -72,6 +72,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -129,6 +130,7 @@ class ConcurrentRequestProcessor implements Runnable { @Override public void run() { try { + signals.set(0); while (!Thread.interrupted()) { doRun(); await(); @@ -337,13 +339,18 @@ private ListMultimap userRequests() { */ private ConcurrentRequestProcessor processor; + /** + * For clearing state (=> HA). + */ + private Future processorFuture; + @Override public void initialize() throws Exception { requestJobs = new LinkedHashMap<>(); cancelledJobs = new HashSet<>(); schedulerProvider.initialize(); processor = new ConcurrentRequestProcessor(schedulerProvider.getRequestScheduler()); - processorExecutorService.execute(processor); + processorFuture = processorExecutorService.submit(processor); } @Override @@ -385,6 +392,16 @@ public void cancelTargets(String id, List targetPaths) { } } + @Override + public void shutdown() throws Exception { + if (processorFuture != null) { + processorFuture.cancel(true); + } + requestJobs = null; + cancelledJobs = null; + requestStore.clearCache(); + } + @Override public int countSignals() { return processor.signals.get(); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java index 39eb96b532a..7130d466364 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/BulkRequestStore.java @@ -253,6 +253,11 @@ Optional getRequestStatus(String uid) */ boolean isRequestSubject(Subject subject, String uid) throws BulkStorageException; + /** + * Clear all entries from memory. (May be a NOP). + */ + void clearCache() throws BulkStorageException; + /** * Load the store into memory. (May be a NOP). 
*/ diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java index 58bc3911f4a..fd9039fdd49 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java @@ -70,6 +70,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.dcache.services.bulk.BulkRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +81,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * completed. This involves storing a serialized request info object and deleting the entry from * the main tables. */ -public class JdbcBulkRequestArchiver implements Runnable, CellInfoProvider { +public class JdbcBulkRequestArchiver implements Runnable, CellInfoProvider, LeaderLatchListener { private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBulkRequestArchiver.class); @@ -95,6 +96,7 @@ public class JdbcBulkRequestArchiver implements Runnable, CellInfoProvider { private TimeUnit archiverWindowUnit; private ScheduledFuture future; private long lastRunCompleted; + private boolean leader; @Override public void getInfo(PrintWriter pw) { @@ -122,14 +124,34 @@ public synchronized void reset() { } public synchronized void runNow() { + if (!leader) { + return; + } + if (future != null) { future.cancel(true); } + archiverScheduler.submit(this); + future = archiverScheduler.scheduleAtFixedRate(this, archiverPeriod, archiverPeriod, archiverPeriodUnit); } + @Override + public synchronized void isLeader() { + leader = true; + reset(); + } + + @Override + public synchronized void notLeader() { + leader = false; + if (future != null) { + future.cancel(true); + } + } + @Override public void run() { /* diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java index 064a06361e6..ae9b3c7bbc9 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java @@ -114,7 +114,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.BulkRequestTargetInfo; import org.dcache.services.bulk.BulkStorageException; import org.dcache.services.bulk.store.BulkRequestStore; -import org.dcache.services.bulk.store.jdbc.JdbcBulkDaoUtils; import org.dcache.services.bulk.store.jdbc.rtarget.JdbcBulkTargetStore; import org.dcache.services.bulk.store.jdbc.rtarget.JdbcRequestTargetDao; import org.dcache.services.bulk.util.BulkRequestFilter; @@ -160,7 +159,6 @@ public Optional load(String uid) throws Exception { private JdbcBulkRequestPermissionsDao requestPermissionsDao; private JdbcBulkTargetStore targetStore; private JdbcRequestTargetDao requestTargetDao; - private JdbcBulkDaoUtils utils; private long expiry; private TimeUnit expiryUnit; private long capacity; @@ -473,6 +471,11 @@ 
public boolean isRequestSubject(Subject subject, String uid) .equals(BulkRequestStore.uidGidKey(requestSubject.get())); } + @Override + public void clearCache() throws BulkStorageException { + requestCache.invalidateAll(); + } + /** * With the RDBMS implementation of the store, the following applies on restart: *

@@ -552,11 +555,6 @@ public void setArchiveDao(JdbcBulkArchiveDao archiveDao) { this.archiveDao = archiveDao; } - @Required - public void setBulkUtils(JdbcBulkDaoUtils utils) { - this.utils = utils; - } - @Required public void setCapacity(long capacity) { this.capacity = capacity; diff --git a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml index 904dfa01a0e..08076c1e0d7 100644 --- a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml +++ b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml @@ -38,6 +38,12 @@ + + Endpoint communication stub + + + + Maintains runtime information about all pools @@ -125,6 +131,7 @@ + @@ -187,7 +194,6 @@ - @@ -317,6 +323,8 @@ + + @@ -351,7 +359,7 @@ + destroy-method="shutdown"> @@ -364,4 +372,23 @@ + + + Coordinates which bulk service handles the requests + + + + + + + + Propagates leadership change notifications to managed listeners + + + + + + + diff --git a/modules/dcache/src/main/java/org/dcache/cells/HAServiceLeadershipManager.java b/modules/dcache/src/main/java/org/dcache/cells/HAServiceLeadershipManager.java index a51f7d2cd9a..26eae15938e 100644 --- a/modules/dcache/src/main/java/org/dcache/cells/HAServiceLeadershipManager.java +++ b/modules/dcache/src/main/java/org/dcache/cells/HAServiceLeadershipManager.java @@ -80,6 +80,10 @@ public void setLeadershipListener(LeaderLatchListener leadershipListener) { this.leadershipListener = leadershipListener; } + public CellAddressCore getLeaderAddress() throws Exception { + return new CellAddressCore(zkLeaderLatch.getLeader().getId()); + } + public void shutdown() { if (zkLeaderLatch != null) { CloseableUtils.closeQuietly(zkLeaderLatch); diff --git a/skel/share/defaults/bulk.properties b/skel/share/defaults/bulk.properties index b6269940500..e1f09cd01b8 100644 --- a/skel/share/defaults/bulk.properties +++ b/skel/share/defaults/bulk.properties @@ -23,7 +23,18 @@ bulk.cell.subscribe=${bulk.pool-monitor.topic},${bulk.qos-transition-topic} # # This property indicates if this service is replicable. # -(immutable)bulk.cell.replicable = false +# All bulk service instances must run against the same database instance. +# HA uses leader semantics to guarantee that only one of those instances +# actually processes requests, i.e., stores, runs, cancels or clears them. +# The other instances are free to query for request information or status. +# +# It is the administrator's responsibility to ensure that all +# bulk service instances: +# +# o have a consistent dCache 'bulk' configuration, +# o share the same database. +# +(immutable)bulk.cell.replicable = true # ---- Global setting for directory expansion. # @@ -154,6 +165,11 @@ bulk.service.qos=${dcache.service.qos} bulk.service.qos.timeout=1 (one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)bulk.service.qos.timeout.unit=MINUTES +# ---- How long to wait for a response from the HA leader.
+# +bulk.service.ha-leader.timeout=1 +(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)bulk.service.ha-leader.timeout.unit=MINUTES + # Topic on which to expect pool monitor updates # bulk.pool-monitor.topic=${dcache.pool-monitor.topic} diff --git a/skel/share/services/bulk.batch b/skel/share/services/bulk.batch index 33c7af13597..2acaa4ba60b 100644 --- a/skel/share/services/bulk.batch +++ b/skel/share/services/bulk.batch @@ -38,6 +38,8 @@ check -strong bulk.service.poolmanager.timeout.unit check -strong bulk.service.qos check -strong bulk.service.qos.timeout check -strong bulk.service.qos.timeout.unit +check -strong bulk.service.ha-leader.timeout +check -strong bulk.service.ha-leader.timeout.unit check -strong bulk.pool-monitor.topic check -strong bulk.qos-transition-topic check -strong bulk.db.connections.max
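
The leadership machinery used throughout this change is Apache Curator's leader-latch recipe. The sketch below is a minimal illustration, separate from the patch itself, of how the `isLeader()`/`notLeader()` callbacks are driven; the ZooKeeper path and the `register` helper are illustrative, and in dCache the latch is created and started by `HAServiceLeadershipManager` rather than by the service.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

public class LeaderLatchSketch {

    /**
     * Registers one latch per service instance under a shared ZooKeeper path.
     * Curator elects exactly one participant as leader and calls
     * isLeader()/notLeader() on the listener as leadership changes hands.
     * The id supplied here is what getLeader().getId() later returns, which
     * is how a non-leader can learn the current leader's cell address.
     */
    public static LeaderLatch register(CuratorFramework client, String cellAddress,
          LeaderLatchListener listener) throws Exception {
        LeaderLatch latch = new LeaderLatch(client, "/example/bulk/leader", cellAddress);
        latch.addListener(listener);
        latch.start();
        return latch;
    }
}
```

The new `HAServiceLeadershipManager#getLeaderAddress()` relies on the same mechanism: it resolves the leader's participant id into a `CellAddressCore`, which `relayToLeader` then uses as the message destination.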
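The `isLeader()`/`notLeader()` handlers added to `BulkService` follow an idempotent start/park pattern guarded by the `initialized` flag, so that redundant leadership transitions are harmless. A condensed sketch of that pattern, with illustrative names and comments standing in for the real initialization and shutdown work:

```java
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

/**
 * Condensed sketch of the start/park lifecycle adopted on leadership changes.
 * Only the leader loads and processes requests; a demoted instance drops its
 * in-memory state, leaving the shared database as the single source of truth
 * for whichever instance is elected next.
 */
public class LeaderAwareService implements LeaderLatchListener {

    private boolean active; // mirrors BulkService's 'initialized' flag

    @Override
    public synchronized void isLeader() {
        if (active) {
            return; // already running; a repeated promotion is a no-op
        }
        // reload incomplete requests from the shared store and start the
        // request manager (cf. BulkService#initialize)
        active = true;
    }

    @Override
    public synchronized void notLeader() {
        if (!active) {
            return; // already parked
        }
        // stop the request manager and clear in-memory caches
        // (cf. BulkService#parkService and ConcurrentRequestManager#shutdown)
        active = false;
    }
}
```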
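`JdbcBulkRequestArchiver` is gated the same way: its periodic archiving sweep runs only while the instance holds leadership, so two replicas never archive concurrently. A stand-alone sketch of that scheduling pattern follows; the class name, period and executor wiring are illustrative, not the archiver's actual configuration:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

public class LeaderGatedSweep implements Runnable, LeaderLatchListener {

    private final ScheduledExecutorService scheduler =
          Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> future;

    @Override
    public synchronized void isLeader() {
        if (future != null) {
            future.cancel(true); // restart cleanly if promoted again
        }
        // only the leader schedules the periodic sweep
        future = scheduler.scheduleAtFixedRate(this, 1L, 1L, TimeUnit.HOURS);
    }

    @Override
    public synchronized void notLeader() {
        // a demoted instance stops sweeping; the new leader takes over
        if (future != null) {
            future.cancel(true);
            future = null;
        }
    }

    @Override
    public void run() {
        // archive completed requests from the shared database
    }
}
```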