Skip to content

Commit

Permalink
perf: thread status advance
Browse files Browse the repository at this point in the history
  • Loading branch information
ishland committed Aug 1, 2024
1 parent a6d940b commit f00bf93
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ protected void markDirty(K key) {
}

protected void wakeUp() {
if (Thread.currentThread() == this.thread) return;
synchronized (this.notifyMonitor) {
this.notifyMonitor.notify();
}
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -265,20 +266,26 @@ public void release(SimpleObjectPool<TicketSet<K, V, Ctx>> ticketSetPool) {
ticketSetPool.release(this.tickets);
}

public void addDependencyTicket(StatusAdvancingScheduler<K, V, Ctx, ?> scheduler, K key, ItemStatus<K, V, Ctx> status, Runnable callback) {
public void addDependencyTicket(StatusAdvancingScheduler<K, V, Ctx, ?> scheduler, K key, ItemStatus<K, V, Ctx> status, Runnable callback, Executor runAddOn) {
synchronized (this.dependencyInfos) {
final DependencyInfo info = this.dependencyInfos.computeIfAbsent(key, k -> new DependencyInfo(status.getAllStatuses().length));
final int ordinal = status.ordinal();
if (info.refCnt[ordinal] == -1) {
info.refCnt[ordinal] = 0;
info.callbacks[ordinal] = new ObjectArrayList<>();
scheduler.addTicketWithSource(key, ItemTicket.TicketType.DEPENDENCY, this.getKey(), status, () -> {
info.refCnt[ordinal] ++;
runAddOn.execute(() -> scheduler.addTicketWithSource(key, ItemTicket.TicketType.DEPENDENCY, this.getKey(), status, () -> {
final ObjectArrayList<Runnable> list;
synchronized (this.dependencyInfos) {
list = info.callbacks[ordinal];
if (list != null) {
info.callbacks[ordinal] = null;
}
final int old = info.refCnt[status.ordinal()]--;
Assertions.assertTrue(old > 0);
if (old == 1) {
dependencyDirty = true;
}
}
if (list != null) {
for (Runnable runnable : list) {
Expand All @@ -289,7 +296,7 @@ public void addDependencyTicket(StatusAdvancingScheduler<K, V, Ctx, ?> scheduler
}
}
}
});
}));
}
info.refCnt[ordinal] ++;
final ObjectArrayList<Runnable> list = info.callbacks[ordinal];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import io.reactivex.rxjava3.schedulers.Schedulers;
import it.unimi.dsi.fastutil.objects.Object2ReferenceOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet;
import it.unimi.dsi.fastutil.objects.ReferenceArrayList;

import java.lang.invoke.VarHandle;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -152,7 +154,7 @@ public boolean tick() {
if ((holder.getFlags() & ItemHolder.FLAG_BROKEN) != 0) {
continue; // not allowed to upgrade
}
holder.submitOp(CompletableFuture.runAsync(() -> advanceStatus0(holder, nextStatus, key), getExecutor()));
holder.submitOp(CompletableFuture.runAsync(() -> advanceStatus0(holder, nextStatus, key), getBackgroundExecutor()));
} else {
holder.setStatus(nextStatus);
holder.submitOp(CompletableFuture.runAsync(() -> downgradeStatus0(holder, current, nextStatus, key), getExecutor()));
Expand Down Expand Up @@ -216,7 +218,6 @@ private void advanceStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus<K
emitter.onComplete();
}
}))
.subscribeOn(getSchedulerBackedByBackgroundExecutor())
.observeOn(getSchedulerBackedByBackgroundExecutor())
.andThen(Completable.defer(() -> {
Assertions.assertTrue(holder.isBusy());
Expand Down Expand Up @@ -287,7 +288,7 @@ private void rerequestDependencies(ItemHolder<K, V, Ctx, UserData> holder, ItemS
holder.setDependencies(status, null);
holder.setDependencies(status, newDep);
for (KeyStatusPair<K, V, Ctx> pair : toAdd) {
holder.addDependencyTicket(this, pair.key(), pair.status(), NO_OP);
holder.addDependencyTicket(this, pair.key(), pair.status(), NO_OP, Runnable::run);
}
for (KeyStatusPair<K, V, Ctx> pair : toRemove) {
holder.removeDependencyTicket(pair.key(), pair.status());
Expand Down Expand Up @@ -352,12 +353,12 @@ private CancellationSignaller getDependencyFuture0(KeyStatusPair<K, V, Ctx>[] de
AtomicBoolean finished = new AtomicBoolean(false);
final CancellationSignaller signaller = new CancellationSignaller(signaller1 -> {
if (finished.compareAndSet(false, true)) {
releaseDependencies(holder, nextStatus);
this.getExecutor().execute(() -> releaseDependencies(holder, nextStatus));
signaller1.fireComplete(new CancellationException());
}
});
try {
final KeyStatusPair<K, V, Ctx> keyStatusPair = new KeyStatusPair<>(holder.getKey(), nextStatus);
final List<Runnable> delayed = new ReferenceArrayList<>();
for (KeyStatusPair<K, V, Ctx> dependency : dependencies) {
Assertions.assertTrue(!dependency.key().equals(holder.getKey()));
holder.addDependencyTicket(this, dependency.key(), dependency.status(), () -> {
Expand All @@ -369,6 +370,17 @@ private CancellationSignaller getDependencyFuture0(KeyStatusPair<K, V, Ctx>[] de
getExecutor().execute(() -> signaller.fireComplete(null));
}
}
}, delayed::add);
}
if (!delayed.isEmpty()) {
this.getExecutor().execute(() -> {
for (Runnable runnable : delayed) {
try {
runnable.run();
} catch (Throwable t) {
t.printStackTrace();
}
}
});
}
} catch (Throwable t) {
Expand Down

0 comments on commit f00bf93

Please sign in to comment.