From f00bf9384fc2389cd270fcc2762824beae7fffac Mon Sep 17 00:00:00 2001 From: ishland Date: Fri, 2 Aug 2024 00:11:10 +0800 Subject: [PATCH] perf: thread status advance --- .../DaemonizedStatusAdvancingScheduler.java | 1 + .../flowsched/scheduler/ItemHolder.java | 13 ++++++++--- .../scheduler/StatusAdvancingScheduler.java | 22 ++++++++++++++----- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/ishland/flowsched/scheduler/DaemonizedStatusAdvancingScheduler.java b/src/main/java/com/ishland/flowsched/scheduler/DaemonizedStatusAdvancingScheduler.java index b541c25..40edf2a 100644 --- a/src/main/java/com/ishland/flowsched/scheduler/DaemonizedStatusAdvancingScheduler.java +++ b/src/main/java/com/ishland/flowsched/scheduler/DaemonizedStatusAdvancingScheduler.java @@ -119,6 +119,7 @@ protected void markDirty(K key) { } protected void wakeUp() { + if (Thread.currentThread() == this.thread) return; synchronized (this.notifyMonitor) { this.notifyMonitor.notify(); } diff --git a/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java b/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java index 9a90875..2796c9f 100644 --- a/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java +++ b/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java @@ -13,6 +13,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -265,20 +266,26 @@ public void release(SimpleObjectPool> ticketSetPool) { ticketSetPool.release(this.tickets); } - public void addDependencyTicket(StatusAdvancingScheduler scheduler, K key, ItemStatus status, Runnable callback) { + public void addDependencyTicket(StatusAdvancingScheduler scheduler, K key, ItemStatus status, Runnable callback, Executor runAddOn) { synchronized (this.dependencyInfos) { final DependencyInfo info = this.dependencyInfos.computeIfAbsent(key, k -> new DependencyInfo(status.getAllStatuses().length)); final int ordinal = status.ordinal(); if (info.refCnt[ordinal] == -1) { info.refCnt[ordinal] = 0; info.callbacks[ordinal] = new ObjectArrayList<>(); - scheduler.addTicketWithSource(key, ItemTicket.TicketType.DEPENDENCY, this.getKey(), status, () -> { + info.refCnt[ordinal] ++; + runAddOn.execute(() -> scheduler.addTicketWithSource(key, ItemTicket.TicketType.DEPENDENCY, this.getKey(), status, () -> { final ObjectArrayList list; synchronized (this.dependencyInfos) { list = info.callbacks[ordinal]; if (list != null) { info.callbacks[ordinal] = null; } + final int old = info.refCnt[status.ordinal()]--; + Assertions.assertTrue(old > 0); + if (old == 1) { + dependencyDirty = true; + } } if (list != null) { for (Runnable runnable : list) { @@ -289,7 +296,7 @@ public void addDependencyTicket(StatusAdvancingScheduler scheduler } } } - }); + })); } info.refCnt[ordinal] ++; final ObjectArrayList list = info.callbacks[ordinal]; diff --git a/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java b/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java index f83885a..a827f8d 100644 --- a/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java +++ b/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java @@ -7,8 +7,10 @@ import io.reactivex.rxjava3.schedulers.Schedulers; import it.unimi.dsi.fastutil.objects.Object2ReferenceOpenHashMap; import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet; +import it.unimi.dsi.fastutil.objects.ReferenceArrayList; import java.lang.invoke.VarHandle; +import java.util.List; import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -152,7 +154,7 @@ public boolean tick() { if ((holder.getFlags() & ItemHolder.FLAG_BROKEN) != 0) { continue; // not allowed to upgrade } - holder.submitOp(CompletableFuture.runAsync(() -> advanceStatus0(holder, nextStatus, key), getExecutor())); + holder.submitOp(CompletableFuture.runAsync(() -> advanceStatus0(holder, nextStatus, key), getBackgroundExecutor())); } else { holder.setStatus(nextStatus); holder.submitOp(CompletableFuture.runAsync(() -> downgradeStatus0(holder, current, nextStatus, key), getExecutor())); @@ -216,7 +218,6 @@ private void advanceStatus0(ItemHolder holder, ItemStatus { Assertions.assertTrue(holder.isBusy()); @@ -287,7 +288,7 @@ private void rerequestDependencies(ItemHolder holder, ItemS holder.setDependencies(status, null); holder.setDependencies(status, newDep); for (KeyStatusPair pair : toAdd) { - holder.addDependencyTicket(this, pair.key(), pair.status(), NO_OP); + holder.addDependencyTicket(this, pair.key(), pair.status(), NO_OP, Runnable::run); } for (KeyStatusPair pair : toRemove) { holder.removeDependencyTicket(pair.key(), pair.status()); @@ -352,12 +353,12 @@ private CancellationSignaller getDependencyFuture0(KeyStatusPair[] de AtomicBoolean finished = new AtomicBoolean(false); final CancellationSignaller signaller = new CancellationSignaller(signaller1 -> { if (finished.compareAndSet(false, true)) { - releaseDependencies(holder, nextStatus); + this.getExecutor().execute(() -> releaseDependencies(holder, nextStatus)); signaller1.fireComplete(new CancellationException()); } }); try { - final KeyStatusPair keyStatusPair = new KeyStatusPair<>(holder.getKey(), nextStatus); + final List delayed = new ReferenceArrayList<>(); for (KeyStatusPair dependency : dependencies) { Assertions.assertTrue(!dependency.key().equals(holder.getKey())); holder.addDependencyTicket(this, dependency.key(), dependency.status(), () -> { @@ -369,6 +370,17 @@ private CancellationSignaller getDependencyFuture0(KeyStatusPair[] de getExecutor().execute(() -> signaller.fireComplete(null)); } } + }, delayed::add); + } + if (!delayed.isEmpty()) { + this.getExecutor().execute(() -> { + for (Runnable runnable : delayed) { + try { + runnable.run(); + } catch (Throwable t) { + t.printStackTrace(); + } + } }); } } catch (Throwable t) {