From fc3357d21114319b7876036656039cfdb21fbccf Mon Sep 17 00:00:00 2001 From: ishland Date: Tue, 30 Jul 2024 23:52:43 +0800 Subject: [PATCH] new: parallel capable neighbor locking --- .../flowsched/executor/ExecutorManager.java | 81 ++++++++++++------- 1 file changed, 52 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/ishland/flowsched/executor/ExecutorManager.java b/src/main/java/com/ishland/flowsched/executor/ExecutorManager.java index 8a4239b..fa8e796 100644 --- a/src/main/java/com/ishland/flowsched/executor/ExecutorManager.java +++ b/src/main/java/com/ishland/flowsched/executor/ExecutorManager.java @@ -1,24 +1,18 @@ package com.ishland.flowsched.executor; import com.ishland.flowsched.structs.DynamicPriorityQueue; -import com.ishland.flowsched.structs.SimpleObjectPool; -import it.unimi.dsi.fastutil.objects.Object2ReferenceOpenHashMap; -import it.unimi.dsi.fastutil.objects.ReferenceArraySet; +import com.ishland.flowsched.util.Assertions; +import it.unimi.dsi.fastutil.objects.ReferenceArrayList; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.function.Consumer; public class ExecutorManager { private final DynamicPriorityQueue globalWorkQueue; - private final Object2ReferenceOpenHashMap> lockListeners = new Object2ReferenceOpenHashMap<>(); - private final SimpleObjectPool> lockListenersPool = new SimpleObjectPool<>( - pool -> new ReferenceArraySet<>(32), - Set::clear, - Set::clear, - 4096 - ); + private final ConcurrentMap lockListeners = new ConcurrentHashMap<>(); private final Object schedulingMutex = new Object(); final Object workerMonitor = new Object(); private final WorkerThread[] workerThreads; @@ -67,18 +61,28 @@ public ExecutorManager(int workerThreadCount, Consumer threadInitializer * @return {@code true} if the lock is acquired, {@code false} otherwise. */ boolean tryLock(Task task) { - synchronized (this.schedulingMutex) { - for (LockToken token : task.lockTokens()) { - final Set listeners = this.lockListeners.get(token); - if (listeners != null) { - listeners.add(task); + retry: + while (true) { + final FreeableTaskList listenerSet = new FreeableTaskList(); + LockToken[] lockTokens = task.lockTokens(); + for (int i = 0; i < lockTokens.length; i++) { + LockToken token = lockTokens[i]; + final FreeableTaskList present = this.lockListeners.putIfAbsent(token, listenerSet); + if (present != null) { + for (int j = 0; j < i; j++) { + this.lockListeners.remove(lockTokens[j], listenerSet); + } + callListeners(listenerSet); // synchronizes + synchronized (present) { + if (present.freed) { + continue retry; + } else { + present.add(task); + } + } return false; } } - for (LockToken token : task.lockTokens()) { - assert !this.lockListeners.containsKey(token); - this.lockListeners.put(token, this.lockListenersPool.alloc()); - } return true; } } @@ -88,17 +92,30 @@ boolean tryLock(Task task) { * @param task the task. */ void releaseLocks(Task task) { - synchronized (this.schedulingMutex) { - for (LockToken token : task.lockTokens()) { - final Set listeners = this.lockListeners.remove(token); - if (listeners != null) { - for (Task listener : listeners) { - this.schedule0(listener); - } - this.lockListenersPool.release(listeners); + FreeableTaskList expectedListeners = null; + for (LockToken token : task.lockTokens()) { + final FreeableTaskList listeners = this.lockListeners.remove(token); + if (listeners != null) { + if (expectedListeners == null) { + expectedListeners = listeners; } else { - throw new IllegalStateException("Lock token " + token + " is not locked"); + Assertions.assertTrue(expectedListeners == listeners, "Inconsistent lock listeners"); } + } else { + throw new IllegalStateException("Lock token " + token + " is not locked"); + } + } + if (expectedListeners != null) { + callListeners(expectedListeners); // synchronizes + } + } + + private void callListeners(FreeableTaskList listeners) { + synchronized (listeners) { + listeners.freed = true; + if (listeners.isEmpty()) return; + for (Task listener : listeners) { + this.schedule0(listener); } } this.wakeup(); @@ -181,4 +198,10 @@ public void notifyPriorityChange(Task task) { this.globalWorkQueue.changePriority(task, task.priority()); } + private static class FreeableTaskList extends ReferenceArrayList { + + private boolean freed = false; + + } + }