diff --git a/src/main/java/com/ishland/flowsched/structs/DynamicPriorityQueue.java b/src/main/java/com/ishland/flowsched/structs/DynamicPriorityQueue.java index bd3ea64..86edec2 100644 --- a/src/main/java/com/ishland/flowsched/structs/DynamicPriorityQueue.java +++ b/src/main/java/com/ishland/flowsched/structs/DynamicPriorityQueue.java @@ -3,6 +3,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; /** * A priority queue with fixed number of priorities and allows changing priorities of elements. @@ -11,12 +12,12 @@ */ public class DynamicPriorityQueue { + private final AtomicIntegerArray taskCount; private final ConcurrentLinkedQueue[] priorities; private final ConcurrentHashMap priorityMap = new ConcurrentHashMap<>(); - private final AtomicInteger currentMinPriority = new AtomicInteger(0); - public DynamicPriorityQueue(int priorityCount) { + this.taskCount = new AtomicIntegerArray(priorityCount); //noinspection unchecked this.priorities = new ConcurrentLinkedQueue[priorityCount]; for (int i = 0; i < priorityCount; i++) { @@ -31,7 +32,7 @@ public void enqueue(E element, int priority) { throw new IllegalArgumentException("Element already in queue"); this.priorities[priority].add(element); - this.currentMinPriority.getAndUpdate(operand -> Math.min(operand, priority)); + this.taskCount.incrementAndGet(priority); } // behavior is undefined when changing priority for one item concurrently @@ -47,26 +48,25 @@ public boolean changePriority(E element, int priority) { if (!removedFromQueue) { return false; // the element is dequeued while we are changing priority } + this.taskCount.decrementAndGet(currentPriority); final Integer put = this.priorityMap.put(element, priority); final boolean changeSuccess = put != null && put == currentPriority; if (!changeSuccess) { return false; // something else may have called remove() } this.priorities[priority].add(element); - this.currentMinPriority.getAndUpdate(operand -> Math.min(operand, priority)); + this.taskCount.incrementAndGet(priority); return true; } public E dequeue() { - int current = this.currentMinPriority.get(); - while (current < this.priorities.length) { - E element = priorities[current].poll(); + for (int i = 0; i < this.priorities.length; i ++) { + if (this.taskCount.get(i) == 0) continue; + E element = priorities[i].poll(); if (element != null) { + this.taskCount.decrementAndGet(i); this.priorityMap.remove(element); return element; - } else { - int finalCurrent = current; - current = this.currentMinPriority.updateAndGet(operand -> operand < this.priorities.length ? Math.min(operand + (priorities[operand].isEmpty() ? 1 : 0), finalCurrent + 1) : operand); } } return null; @@ -79,7 +79,8 @@ public boolean contains(E element) { public void remove(E element) { final Integer remove = this.priorityMap.remove(element); if (remove == null) return; - this.priorities[remove].remove(element); // best-effort + boolean removed = this.priorities[remove].remove(element); // best-effort + if (removed) this.taskCount.decrementAndGet(remove); } public int size() {