From a29351aae1050283979f3fa0939205e9bff8779d Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Fri, 29 Sep 2023 18:08:34 -0500 Subject: [PATCH] dcache-bulk: only set request status to QUEUED when permissions and targets are all inserted Motivation: Sometimes when bulk requests are interrupted (service goes down) the insert of the initial request may not have completed. When reloaded or retried, a failure having to do with missing Subject or missing targets may occur. Modification: Add a new state, INCOMPLETE. Do not set the state of the request to QUEUED until permissions and targets have been added to their respective tables. On reset or reload, query again only for QUEUED. Leave the incomplete requests as INCOMPLETE, but add the INCOMPLETE state to the archiver logic so that they are eventually cleared. Result: No runtime execution errors for corrupted requests. Target: master Request: 9.2 Patch: https://rb.dcache.org/r/14113/ Requires-notes: yes Acked-by: Tigran --- .../bulk/store/jdbc/request/JdbcBulkRequestArchiver.java | 3 ++- .../bulk/store/jdbc/request/JdbcBulkRequestCriterion.java | 7 +++++++ .../bulk/store/jdbc/request/JdbcBulkRequestDao.java | 2 +- .../bulk/store/jdbc/request/JdbcBulkRequestStore.java | 4 ++++ .../java/org/dcache/services/bulk/BulkRequestStatus.java | 1 + 5 files changed, 15 insertions(+), 2 deletions(-) diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java index 14744921c1d..58bc3911f4a 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestArchiver.java @@ -61,6 +61,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import static org.dcache.services.bulk.BulkRequestStatus.CANCELLED; import static org.dcache.services.bulk.BulkRequestStatus.COMPLETED; +import static org.dcache.services.bulk.BulkRequestStatus.INCOMPLETE; import dmg.cells.nucleus.CellInfoProvider; import java.io.PrintWriter; @@ -137,7 +138,7 @@ public void run() { long threshhold = System.currentTimeMillis() - archiverWindowUnit.toMillis(archiverWindow); List expiredUids = requestDao.getUids( - requestDao.where().modifiedBefore(threshhold).status(COMPLETED, CANCELLED), + requestDao.where().modifiedBefore(threshhold).status(INCOMPLETE, COMPLETED, CANCELLED), Integer.MAX_VALUE); /* diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestCriterion.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestCriterion.java index bb3324d8f6d..6d029efc7a7 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestCriterion.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestCriterion.java @@ -92,6 +92,13 @@ public JdbcBulkRequestCriterion id(Long id) { return this; } + public JdbcBulkRequestCriterion unique(Long id) { + if (id != null) { + addClause("bulk_request.id = ?", id); + } + return this; + } + public JdbcBulkRequestCriterion pnfsId(String pnfsid) { if (pnfsid != null) { addClause("request_target.pnfsid = ?", pnfsid); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestDao.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestDao.java index 5224733968f..25b64dec381 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestDao.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestDao.java @@ -300,7 +300,7 @@ public JdbcBulkRequestUpdate updateFrom(BulkRequest request, String user) .clearOnSuccess(request.isClearOnSuccess()).clearOnFailure(request.isClearOnFailure()) .depth(request.getExpandDirectories()) .targetPrefix(request.getTargetPrefix()).urlPrefix(request.getUrlPrefix()).user(user) - .status(BulkRequestStatus.QUEUED).arrivedAt(System.currentTimeMillis()); + .status(BulkRequestStatus.INCOMPLETE).arrivedAt(System.currentTimeMillis()); } public JdbcBulkRequestCriterion where() { diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java index f36708d5633..064a06361e6 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/store/jdbc/request/JdbcBulkRequestStore.java @@ -610,6 +610,8 @@ public void store(Subject subject, Restriction restriction, BulkRequest request) try { /* * Insertion order: request, permissions, must be maintained. + * Initial insertion status is INCOMPLETE. This is immediately changed to QUEUED + * after insertion is complete. */ requestDao.insert( requestDao.updateFrom(request, BulkRequestStore.uidGidKey(subject))) @@ -623,6 +625,8 @@ public void store(Subject subject, Restriction restriction, BulkRequest request) requestDao.insertArguments(request); requestTargetDao.insertInitialTargets(request); + + requestDao.update(requestDao.where().unique(request.getId()), requestDao.set().status(QUEUED)); } catch (BulkStorageException e) { throw new BulkStorageException("store failed for " + request.getUid(), e); } diff --git a/modules/dcache-vehicles/src/main/java/org/dcache/services/bulk/BulkRequestStatus.java b/modules/dcache-vehicles/src/main/java/org/dcache/services/bulk/BulkRequestStatus.java index 783aeff2beb..dcd8d6f78c4 100644 --- a/modules/dcache-vehicles/src/main/java/org/dcache/services/bulk/BulkRequestStatus.java +++ b/modules/dcache-vehicles/src/main/java/org/dcache/services/bulk/BulkRequestStatus.java @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.services.bulk; public enum BulkRequestStatus { + INCOMPLETE("Request has been created but insertion is not yet complete."), QUEUED("Request has been submitted to the service and awaits processing."), STARTED("Request has been set to active and has begun processing."), COMPLETED("All targets of the request have reached terminal state."),