Skip to content

Commit

Permalink
Merge pull request #7357 from alrossi/fix/9.2/bulk-no-pin-online
Browse files Browse the repository at this point in the history
dcache-bulk:  do not PIN or STAGE files with AL ONLINE
  • Loading branch information
svemeyer authored Oct 4, 2023
2 parents d34029c + 2507eea commit ef878f4
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public enum TargetType {

public static final Set<FileAttribute> MINIMALLY_REQUIRED_ATTRIBUTES
= Collections.unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE,
FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.RETENTION_POLICY));
FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.ACCESS_LATENCY,
FileAttribute.RETENTION_POLICY));

private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.dcache.pinmanager.PinManagerPinMessage;
import org.dcache.services.bulk.BulkServiceException;
Expand Down Expand Up @@ -118,6 +119,12 @@ public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
= new PinManagerPinMessage(attributes, getProtocolInfo(), id,
lifetimeInMillis);
message.setSubject(subject);

Optional<ListenableFuture<Message>> skipOption = skipIfOnline(attributes, message);
if (skipOption.isPresent()) {
return skipOption.get();
}

return pinManager.send(message, Long.MAX_VALUE);
} catch (URISyntaxException | CacheException e) {
return Futures.immediateFailedFuture(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,23 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static diskCacheV111.util.CacheException.INVALID_ARGS;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import diskCacheV111.util.AccessLatency;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.FsPath;
import diskCacheV111.util.NamespaceHandlerAware;
import diskCacheV111.util.PnfsHandler;
import diskCacheV111.util.PnfsId;
import diskCacheV111.vehicles.Message;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import org.dcache.cells.CellStub;
import org.dcache.pinmanager.PinManagerAware;
import org.dcache.pinmanager.PinManagerPinMessage;
import org.dcache.pinmanager.PinManagerUnpinMessage;
import org.dcache.services.bulk.activity.BulkActivity;
import org.dcache.services.bulk.util.BulkRequestTarget;
Expand Down Expand Up @@ -105,6 +110,9 @@ protected void handleCompletion(BulkRequestTarget target, ListenableFuture<Messa
reply = getUninterruptibly(future);
if (reply.getReturnCode() != 0) {
target.setErrorObject(reply.getErrorObject());
} else if (reply instanceof PinManagerPinMessage
&& ((PinManagerPinMessage) reply).getLifetime() == -1L) {
target.setState(SKIPPED);
} else {
target.setState(State.COMPLETED);
}
Expand Down Expand Up @@ -144,4 +152,16 @@ protected void checkPinnable(FileAttributes attributes) throws CacheException {
throw new CacheException(INVALID_ARGS, "Not a regular file.");
}
}

protected Optional<ListenableFuture<Message>> skipIfOnline(FileAttributes attributes,
PinManagerPinMessage message) {
ListenableFuture<Message> future = null;
if (attributes.getAccessLatency() == AccessLatency.ONLINE) {
message.setReply();
message.setLifetime(-1L);
future = Futures.immediateFuture(message);
}

return Optional.ofNullable(future);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.dcache.pinmanager.PinManagerPinMessage;
import org.dcache.services.bulk.BulkServiceException;
Expand Down Expand Up @@ -128,6 +129,12 @@ public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
= new PinManagerPinMessage(attributes, getProtocolInfo(), id,
getLifetimeInMillis(target));
message.setSubject(subject);

Optional<ListenableFuture<Message>> skipOption = skipIfOnline(attributes, message);
if (skipOption.isPresent()) {
return skipOption.get();
}

return pinManager.send(message, Long.MAX_VALUE);
} catch (URISyntaxException | CacheException e) {
return Futures.immediateFailedFuture(e);
Expand Down

0 comments on commit ef878f4

Please sign in to comment.