Skip to content

Commit

Permalink
new: parallel capable neighbor locking
Browse files Browse the repository at this point in the history
  • Loading branch information
ishland committed Jul 30, 2024
1 parent 99cac95 commit fc3357d
Showing 1 changed file with 52 additions and 29 deletions.
81 changes: 52 additions & 29 deletions src/main/java/com/ishland/flowsched/executor/ExecutorManager.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
package com.ishland.flowsched.executor;

import com.ishland.flowsched.structs.DynamicPriorityQueue;
import com.ishland.flowsched.structs.SimpleObjectPool;
import it.unimi.dsi.fastutil.objects.Object2ReferenceOpenHashMap;
import it.unimi.dsi.fastutil.objects.ReferenceArraySet;
import com.ishland.flowsched.util.Assertions;
import it.unimi.dsi.fastutil.objects.ReferenceArrayList;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

public class ExecutorManager {

private final DynamicPriorityQueue<Task> globalWorkQueue;
private final Object2ReferenceOpenHashMap<LockToken, Set<Task>> lockListeners = new Object2ReferenceOpenHashMap<>();
private final SimpleObjectPool<Set<Task>> lockListenersPool = new SimpleObjectPool<>(
pool -> new ReferenceArraySet<>(32),
Set::clear,
Set::clear,
4096
);
private final ConcurrentMap<LockToken, FreeableTaskList> lockListeners = new ConcurrentHashMap<>();
private final Object schedulingMutex = new Object();
final Object workerMonitor = new Object();
private final WorkerThread[] workerThreads;
Expand Down Expand Up @@ -67,18 +61,28 @@ public ExecutorManager(int workerThreadCount, Consumer<Thread> threadInitializer
* @return {@code true} if the lock is acquired, {@code false} otherwise.
*/
boolean tryLock(Task task) {
synchronized (this.schedulingMutex) {
for (LockToken token : task.lockTokens()) {
final Set<Task> listeners = this.lockListeners.get(token);
if (listeners != null) {
listeners.add(task);
retry:
while (true) {
final FreeableTaskList listenerSet = new FreeableTaskList();
LockToken[] lockTokens = task.lockTokens();
for (int i = 0; i < lockTokens.length; i++) {
LockToken token = lockTokens[i];
final FreeableTaskList present = this.lockListeners.putIfAbsent(token, listenerSet);
if (present != null) {
for (int j = 0; j < i; j++) {
this.lockListeners.remove(lockTokens[j], listenerSet);
}
callListeners(listenerSet); // synchronizes
synchronized (present) {
if (present.freed) {
continue retry;
} else {
present.add(task);
}
}
return false;
}
}
for (LockToken token : task.lockTokens()) {
assert !this.lockListeners.containsKey(token);
this.lockListeners.put(token, this.lockListenersPool.alloc());
}
return true;
}
}
Expand All @@ -88,17 +92,30 @@ boolean tryLock(Task task) {
* @param task the task.
*/
void releaseLocks(Task task) {
synchronized (this.schedulingMutex) {
for (LockToken token : task.lockTokens()) {
final Set<Task> listeners = this.lockListeners.remove(token);
if (listeners != null) {
for (Task listener : listeners) {
this.schedule0(listener);
}
this.lockListenersPool.release(listeners);
FreeableTaskList expectedListeners = null;
for (LockToken token : task.lockTokens()) {
final FreeableTaskList listeners = this.lockListeners.remove(token);
if (listeners != null) {
if (expectedListeners == null) {
expectedListeners = listeners;
} else {
throw new IllegalStateException("Lock token " + token + " is not locked");
Assertions.assertTrue(expectedListeners == listeners, "Inconsistent lock listeners");
}
} else {
throw new IllegalStateException("Lock token " + token + " is not locked");
}
}
if (expectedListeners != null) {
callListeners(expectedListeners); // synchronizes
}
}

private void callListeners(FreeableTaskList listeners) {
synchronized (listeners) {
listeners.freed = true;
if (listeners.isEmpty()) return;
for (Task listener : listeners) {
this.schedule0(listener);
}
}
this.wakeup();
Expand Down Expand Up @@ -181,4 +198,10 @@ public void notifyPriorityChange(Task task) {
this.globalWorkQueue.changePriority(task, task.priority());
}

private static class FreeableTaskList extends ReferenceArrayList<Task> {

private boolean freed = false;

}

}

0 comments on commit fc3357d

Please sign in to comment.