Skip to content

Commit

Permalink
new: allow cancellations in upgrade and downgrade process
Browse files Browse the repository at this point in the history
  • Loading branch information
ishland committed Nov 9, 2024
1 parent ac9b234 commit 1bd210e
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 15 deletions.
17 changes: 17 additions & 0 deletions src/main/java/com/ishland/flowsched/scheduler/Cancellable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.ishland.flowsched.scheduler;

import java.util.concurrent.atomic.AtomicBoolean;

public class Cancellable {

private final AtomicBoolean cancelled = new AtomicBoolean(false);

public void cancel() {
this.cancelled.set(true);
}

public boolean isCancelled() {
return this.cancelled.get();
}

}
8 changes: 5 additions & 3 deletions src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void consolidateMarkDirty(StatusAdvancingScheduler<K, V, Ctx, ?> schedule
this.busyRefCounter.addListenerOnce(() -> scheduler.markDirty(this.getKey()));
}

public boolean setStatus(ItemStatus<K, V, Ctx> status) {
public boolean setStatus(ItemStatus<K, V, Ctx> status, boolean isCancellation) {
assertOpen();
ItemTicket<K, V, Ctx>[] ticketsToFire = null;
CompletableFuture<Void> futureToFire = null;
Expand Down Expand Up @@ -224,8 +224,10 @@ public boolean setStatus(ItemStatus<K, V, Ctx> status) {
this.status = status;
final CompletableFuture<Void> future = this.futures[status.ordinal()];

Assertions.assertTrue(future != UNLOADED_FUTURE);
Assertions.assertTrue(!future.isDone());
if (!isCancellation) {
Assertions.assertTrue(future != UNLOADED_FUTURE);
Assertions.assertTrue(!future.isDone());
}
futureToFire = future;
ticketsToFire = this.tickets.getTicketsForStatus(status).toArray(ItemTicket[]::new);
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/ishland/flowsched/scheduler/ItemStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ default ItemStatus<K, V, Ctx> getNext() {

int ordinal();

CompletionStage<Void> upgradeToThis(Ctx context);
CompletionStage<Void> upgradeToThis(Ctx context, Cancellable cancellable);

CompletionStage<Void> downgradeFromThis(Ctx context);
CompletionStage<Void> downgradeFromThis(Ctx context, Cancellable cancellable);

/**
* Get the dependencies of the given item at the given status.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public boolean tick() {
}
holder.submitOp(CompletableFuture.runAsync(() -> advanceStatus0(holder, nextStatus, key), getBackgroundExecutor()));
} else {
final boolean success = holder.setStatus(nextStatus);
final boolean success = holder.setStatus(nextStatus, false);
if (!success) {
continue; // target status is modified to be higher
}
Expand All @@ -186,10 +186,12 @@ private void downgradeStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus
final KeyStatusPair<K, V, Ctx>[] dependencies = holder.getDependencies(current);
Assertions.assertTrue(dependencies != null, "No dependencies for downgrade");

Cancellable cancellable = new Cancellable();

final Completable completable = Completable.defer(() -> {
Assertions.assertTrue(holder.isBusy());
final Ctx ctx = makeContext(holder, current, dependencies, false);
final CompletionStage<Void> stage = current.downgradeFromThis(ctx);
final CompletionStage<Void> stage = current.downgradeFromThis(ctx, cancellable);
return Completable.fromCompletionStage(stage);
})
.subscribeOn(getSchedulerBackedByBackgroundExecutor())
Expand All @@ -198,6 +200,16 @@ private void downgradeStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus
try {
Assertions.assertTrue(holder.isBusy());

{
Throwable actual = throwable;
while (actual instanceof CompletionException ex) actual = ex.getCause();
if (cancellable.isCancelled() && actual instanceof CancellationException) {
holder.setStatus(current, true);
holder.consolidateMarkDirty(this);
return;
}
}

final ExceptionHandlingAction action = this.tryHandleTransactionException(holder, nextStatus, false, throwable);
switch (action) {
case PROCEED -> {
Expand All @@ -221,10 +233,10 @@ private void advanceStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus<K
// Advance
final KeyStatusPair<K, V, Ctx>[] dependencies = nextStatus.getDependencies(holder);
final CancellationSignaller dependencyCompletable = getDependencyFuture0(dependencies, holder, nextStatus);
AtomicBoolean isCancelled = new AtomicBoolean(false);
Cancellable cancellable = new Cancellable();

CancellationSignaller signaller = new CancellationSignaller(unused -> {
isCancelled.set(true);
cancellable.cancel();
dependencyCompletable.cancel();
});

Expand All @@ -239,7 +251,7 @@ private void advanceStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus<K
.andThen(Completable.defer(() -> {
Assertions.assertTrue(holder.isBusy());
final Ctx ctx = makeContext(holder, nextStatus, dependencies, false);
final CompletionStage<Void> stage = nextStatus.upgradeToThis(ctx);
final CompletionStage<Void> stage = nextStatus.upgradeToThis(ctx, cancellable);
return Completable.fromCompletionStage(stage).cache();
}))
.observeOn(getSchedulerBackedByBackgroundExecutor())
Expand All @@ -250,7 +262,15 @@ private void advanceStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus<K
{
Throwable actual = throwable;
while (actual instanceof CompletionException ex) actual = ex.getCause();
if (isCancelled.get() && actual instanceof CancellationException) {
if (cancellable.isCancelled() && actual instanceof CancellationException) {
if (holder.getDependencies(nextStatus) != null) {
releaseDependencies(holder, nextStatus);
}
try {
signaller.fireComplete(actual);
} catch (Throwable t) {
t.printStackTrace();
}
holder.consolidateMarkDirty(this);
return;
}
Expand All @@ -261,7 +281,7 @@ private void advanceStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus<K
final ExceptionHandlingAction action = this.tryHandleTransactionException(holder, nextStatus, true, throwable);
switch (action) {
case PROCEED -> {
holder.setStatus(nextStatus);
holder.setStatus(nextStatus, false);
rerequestDependencies(holder, nextStatus);
holder.consolidateMarkDirty(this);
onItemUpgrade(holder, nextStatus);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.ishland.flowsched.scheduler.support;

import com.ishland.flowsched.scheduler.Cancellable;
import com.ishland.flowsched.scheduler.ItemHolder;
import com.ishland.flowsched.scheduler.ItemStatus;
import com.ishland.flowsched.scheduler.KeyStatusPair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

Expand All @@ -30,14 +32,22 @@ public ItemStatus<Long, TestItem, TestContext>[] getAllStatuses() {
}

@Override
public CompletionStage<Void> upgradeToThis(TestContext context) {
public CompletionStage<Void> upgradeToThis(TestContext context, Cancellable cancellable) {
// System.out.println(String.format("Upgrading %d to %s", context.key(), this));
if (new Random().nextBoolean()) {
cancellable.cancel();
return CompletableFuture.failedFuture(new CancellationException());
}
return CompletableFuture.completedFuture(null);
}

@Override
public CompletionStage<Void> downgradeFromThis(TestContext context) {
public CompletionStage<Void> downgradeFromThis(TestContext context, Cancellable cancellable) {
// System.out.println(String.format("Downgrading %d from %s", context.key(), this));
if (new Random().nextBoolean()) {
cancellable.cancel();
return CompletableFuture.failedFuture(new CancellationException());
}
return CompletableFuture.completedFuture(null);
}

Expand Down

0 comments on commit 1bd210e

Please sign in to comment.