Skip to content

Commit

Permalink
perf: consolidate markDirty calls
Browse files Browse the repository at this point in the history
  • Loading branch information
ishland committed Aug 31, 2024
1 parent 4a46b50 commit 4febfeb
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
public class BusyRefCounter {

private final ReferenceList<Runnable> onComplete = new ReferenceArrayList<>();
private Runnable onCompleteOnce = null;
private int counter = 0;

public synchronized boolean isBusy() {
Expand All @@ -17,13 +18,32 @@ public synchronized boolean isBusy() {

public void addListener(Runnable runnable) {
Objects.requireNonNull(runnable);
boolean runNow = false;
synchronized (this) {
if (!isBusy()) {
runnable.run();
runNow = true;
} else {
onComplete.add(runnable);
}
}
if (runNow) {
runnable.run();
}
}

public void addListenerOnce(Runnable runnable) {
Objects.requireNonNull(runnable);
boolean runNow = false;
synchronized (this) {
if (!isBusy()) {
runNow = true;
} else {
onCompleteOnce = runnable;
}
}
if (runNow) {
runnable.run();
}
}

public synchronized void incrementRefCount() {
Expand All @@ -32,12 +52,15 @@ public synchronized void incrementRefCount() {

public void decrementRefCount() {
Runnable[] onCompleteArray = null;
Runnable onCompleteOnce = null;
synchronized (this) {
Assertions.assertTrue(counter > 0);
if (--counter == 0) {
onCompleteArray = onComplete.toArray(Runnable[]::new);
onComplete.clear();
}
onCompleteOnce = this.onCompleteOnce;
this.onCompleteOnce = null;
}
if (onCompleteArray != null) {
for (Runnable runnable : onCompleteArray) {
Expand All @@ -48,6 +71,13 @@ public void decrementRefCount() {
}
}
}
if (onCompleteOnce != null) {
try {
onCompleteOnce.run();
} catch (Throwable t) {
t.printStackTrace();
}
}
}

}
5 changes: 5 additions & 0 deletions src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ public void submitOpListener(Runnable runnable) {
this.busyRefCounter.addListener(runnable);
}

public void consolidateMarkDirty(StatusAdvancingScheduler<K, V, Ctx, ?> scheduler) {
assertOpen();
this.busyRefCounter.addListenerOnce(() -> scheduler.markDirty(this.getKey()));
}

public boolean setStatus(ItemStatus<K, V, Ctx> status) {
assertOpen();
ItemTicket<K, V, Ctx>[] ticketsToFire = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,16 @@ public boolean tick() {
if (projectedCurrent.ordinal() > nextStatus.ordinal()) {
getExecutor().execute(holder::tryCancelUpgradeAction);
}
holder.submitOpListener(() -> markDirty(key));
holder.consolidateMarkDirty(this);
continue;
}
if (nextStatus == current) {
if (current.equals(getUnloadedStatus())) {
if (holder.isDependencyDirty()) { // schedule dependency cleanup async
holder.submitOp(CompletableFuture.runAsync(() -> {
holder.cleanupDependencies(this);
markDirty(holder.getKey());
}, getBackgroundExecutor()));
holder.consolidateMarkDirty(this);
continue;
}
if (holder.holdsDependency()) {
Expand Down Expand Up @@ -208,7 +208,7 @@ private void downgradeStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus
clearDependencies0(holder, current);
}
}
markDirty(key);
holder.consolidateMarkDirty(this);
onItemDowngrade(holder, nextStatus);
} catch (Throwable t) {
t.printStackTrace();
Expand Down Expand Up @@ -251,7 +251,7 @@ private void advanceStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus<K
Throwable actual = throwable;
while (actual instanceof CompletionException ex) actual = ex.getCause();
if (isCancelled.get() && actual instanceof CancellationException) {
markDirty(key);
holder.consolidateMarkDirty(this);
return;
}
}
Expand All @@ -263,13 +263,13 @@ private void advanceStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus<K
case PROCEED -> {
holder.setStatus(nextStatus);
rerequestDependencies(holder, nextStatus);
markDirty(key);
holder.consolidateMarkDirty(this);
onItemUpgrade(holder, nextStatus);
}
case MARK_BROKEN -> {
holder.setFlag(ItemHolder.FLAG_BROKEN);
clearDependencies0(holder, nextStatus);
markDirty(key);
holder.consolidateMarkDirty(this);
}
}

Expand All @@ -282,7 +282,7 @@ private void advanceStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus<K
try {
holder.setFlag(ItemHolder.FLAG_BROKEN);
clearDependencies0(holder, nextStatus);
markDirty(key);
holder.consolidateMarkDirty(this);
} catch (Throwable t1) {
t.addSuppressed(t1);
}
Expand Down Expand Up @@ -428,7 +428,7 @@ private ItemHolder<K, V, Ctx, UserData> addTicket0(K key, ItemTicket<K, V, Ctx>
}
try {
holder.addTicket(ticket);
markDirty(key);
holder.consolidateMarkDirty(this);
} finally {
holder.busyRefCounter().decrementRefCount();
}
Expand Down Expand Up @@ -457,7 +457,7 @@ public void removeTicket(K key, ItemTicket.TicketType type, Object source, ItemS
throw new IllegalStateException("No such item");
}
holder.removeTicket(new ItemTicket<>(type, source, targetStatus, null));
markDirty(key);
holder.consolidateMarkDirty(this);
}

private ItemStatus<K, V, Ctx> getNextStatus(ItemStatus<K, V, Ctx> current, ItemStatus<K, V, Ctx> target) {
Expand Down Expand Up @@ -501,7 +501,7 @@ private void releaseDependencies(ItemHolder<K, V, Ctx, UserData> holder, ItemSta
}

protected boolean hasPendingUpdates() {
return !this.pendingUpdates.isEmpty() || !this.pendingUpdatesInternal.isEmpty();
return !this.pendingUpdates.isEmpty();
}

protected boolean continueProcessWork() {
Expand Down

0 comments on commit 4febfeb

Please sign in to comment.