From 4febfeb06d9156f3cecb39134ca4bf7757bac57b Mon Sep 17 00:00:00 2001 From: ishland Date: Sun, 1 Sep 2024 00:05:03 +0800 Subject: [PATCH] perf: consolidate markDirty calls --- .../flowsched/scheduler/BusyRefCounter.java | 32 ++++++++++++++++++- .../flowsched/scheduler/ItemHolder.java | 5 +++ .../scheduler/StatusAdvancingScheduler.java | 20 ++++++------ 3 files changed, 46 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/ishland/flowsched/scheduler/BusyRefCounter.java b/src/main/java/com/ishland/flowsched/scheduler/BusyRefCounter.java index ccdec4f..2e07bc4 100644 --- a/src/main/java/com/ishland/flowsched/scheduler/BusyRefCounter.java +++ b/src/main/java/com/ishland/flowsched/scheduler/BusyRefCounter.java @@ -9,6 +9,7 @@ public class BusyRefCounter { private final ReferenceList onComplete = new ReferenceArrayList<>(); + private Runnable onCompleteOnce = null; private int counter = 0; public synchronized boolean isBusy() { @@ -17,13 +18,32 @@ public synchronized boolean isBusy() { public void addListener(Runnable runnable) { Objects.requireNonNull(runnable); + boolean runNow = false; synchronized (this) { if (!isBusy()) { - runnable.run(); + runNow = true; } else { onComplete.add(runnable); } } + if (runNow) { + runnable.run(); + } + } + + public void addListenerOnce(Runnable runnable) { + Objects.requireNonNull(runnable); + boolean runNow = false; + synchronized (this) { + if (!isBusy()) { + runNow = true; + } else { + onCompleteOnce = runnable; + } + } + if (runNow) { + runnable.run(); + } } public synchronized void incrementRefCount() { @@ -32,12 +52,15 @@ public synchronized void incrementRefCount() { public void decrementRefCount() { Runnable[] onCompleteArray = null; + Runnable onCompleteOnce = null; synchronized (this) { Assertions.assertTrue(counter > 0); if (--counter == 0) { onCompleteArray = onComplete.toArray(Runnable[]::new); onComplete.clear(); } + onCompleteOnce = this.onCompleteOnce; + this.onCompleteOnce = null; } if (onCompleteArray != null) { for (Runnable runnable : onCompleteArray) { @@ -48,6 +71,13 @@ public void decrementRefCount() { } } } + if (onCompleteOnce != null) { + try { + onCompleteOnce.run(); + } catch (Throwable t) { + t.printStackTrace(); + } + } } } diff --git a/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java b/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java index 6f65e82..cd9b873 100644 --- a/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java +++ b/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java @@ -181,6 +181,11 @@ public void submitOpListener(Runnable runnable) { this.busyRefCounter.addListener(runnable); } + public void consolidateMarkDirty(StatusAdvancingScheduler scheduler) { + assertOpen(); + this.busyRefCounter.addListenerOnce(() -> scheduler.markDirty(this.getKey())); + } + public boolean setStatus(ItemStatus status) { assertOpen(); ItemTicket[] ticketsToFire = null; diff --git a/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java b/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java index 7be6db3..bb9f5da 100644 --- a/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java +++ b/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java @@ -131,7 +131,7 @@ public boolean tick() { if (projectedCurrent.ordinal() > nextStatus.ordinal()) { getExecutor().execute(holder::tryCancelUpgradeAction); } - holder.submitOpListener(() -> markDirty(key)); + holder.consolidateMarkDirty(this); continue; } if (nextStatus == current) { @@ -139,8 +139,8 @@ public boolean tick() { if (holder.isDependencyDirty()) { // schedule dependency cleanup async holder.submitOp(CompletableFuture.runAsync(() -> { holder.cleanupDependencies(this); - markDirty(holder.getKey()); }, getBackgroundExecutor())); + holder.consolidateMarkDirty(this); continue; } if (holder.holdsDependency()) { @@ -208,7 +208,7 @@ private void downgradeStatus0(ItemHolder holder, ItemStatus clearDependencies0(holder, current); } } - markDirty(key); + holder.consolidateMarkDirty(this); onItemDowngrade(holder, nextStatus); } catch (Throwable t) { t.printStackTrace(); @@ -251,7 +251,7 @@ private void advanceStatus0(ItemHolder holder, ItemStatus holder, ItemStatus { holder.setStatus(nextStatus); rerequestDependencies(holder, nextStatus); - markDirty(key); + holder.consolidateMarkDirty(this); onItemUpgrade(holder, nextStatus); } case MARK_BROKEN -> { holder.setFlag(ItemHolder.FLAG_BROKEN); clearDependencies0(holder, nextStatus); - markDirty(key); + holder.consolidateMarkDirty(this); } } @@ -282,7 +282,7 @@ private void advanceStatus0(ItemHolder holder, ItemStatus addTicket0(K key, ItemTicket } try { holder.addTicket(ticket); - markDirty(key); + holder.consolidateMarkDirty(this); } finally { holder.busyRefCounter().decrementRefCount(); } @@ -457,7 +457,7 @@ public void removeTicket(K key, ItemTicket.TicketType type, Object source, ItemS throw new IllegalStateException("No such item"); } holder.removeTicket(new ItemTicket<>(type, source, targetStatus, null)); - markDirty(key); + holder.consolidateMarkDirty(this); } private ItemStatus getNextStatus(ItemStatus current, ItemStatus target) { @@ -501,7 +501,7 @@ private void releaseDependencies(ItemHolder holder, ItemSta } protected boolean hasPendingUpdates() { - return !this.pendingUpdates.isEmpty() || !this.pendingUpdatesInternal.isEmpty(); + return !this.pendingUpdates.isEmpty(); } protected boolean continueProcessWork() {