Skip to content

Commit

Permalink
fix: unstable DynamicPriorityQueue in edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
ishland committed Aug 25, 2024
1 parent 5e5b12d commit acbec28
Showing 1 changed file with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

/**
* A priority queue with fixed number of priorities and allows changing priorities of elements.
Expand All @@ -11,12 +12,12 @@
*/
public class DynamicPriorityQueue<E> {

private final AtomicIntegerArray taskCount;
private final ConcurrentLinkedQueue<E>[] priorities;
private final ConcurrentHashMap<E, Integer> priorityMap = new ConcurrentHashMap<>();

private final AtomicInteger currentMinPriority = new AtomicInteger(0);

public DynamicPriorityQueue(int priorityCount) {
this.taskCount = new AtomicIntegerArray(priorityCount);
//noinspection unchecked
this.priorities = new ConcurrentLinkedQueue[priorityCount];
for (int i = 0; i < priorityCount; i++) {
Expand All @@ -31,7 +32,7 @@ public void enqueue(E element, int priority) {
throw new IllegalArgumentException("Element already in queue");

this.priorities[priority].add(element);
this.currentMinPriority.getAndUpdate(operand -> Math.min(operand, priority));
this.taskCount.incrementAndGet(priority);
}

// behavior is undefined when changing priority for one item concurrently
Expand All @@ -47,26 +48,25 @@ public boolean changePriority(E element, int priority) {
if (!removedFromQueue) {
return false; // the element is dequeued while we are changing priority
}
this.taskCount.decrementAndGet(currentPriority);
final Integer put = this.priorityMap.put(element, priority);
final boolean changeSuccess = put != null && put == currentPriority;
if (!changeSuccess) {
return false; // something else may have called remove()
}
this.priorities[priority].add(element);
this.currentMinPriority.getAndUpdate(operand -> Math.min(operand, priority));
this.taskCount.incrementAndGet(priority);
return true;
}

public E dequeue() {
int current = this.currentMinPriority.get();
while (current < this.priorities.length) {
E element = priorities[current].poll();
for (int i = 0; i < this.priorities.length; i ++) {
if (this.taskCount.get(i) == 0) continue;
E element = priorities[i].poll();
if (element != null) {
this.taskCount.decrementAndGet(i);
this.priorityMap.remove(element);
return element;
} else {
int finalCurrent = current;
current = this.currentMinPriority.updateAndGet(operand -> operand < this.priorities.length ? Math.min(operand + (priorities[operand].isEmpty() ? 1 : 0), finalCurrent + 1) : operand);
}
}
return null;
Expand All @@ -79,7 +79,8 @@ public boolean contains(E element) {
public void remove(E element) {
final Integer remove = this.priorityMap.remove(element);
if (remove == null) return;
this.priorities[remove].remove(element); // best-effort
boolean removed = this.priorities[remove].remove(element); // best-effort
if (removed) this.taskCount.decrementAndGet(remove);
}

public int size() {
Expand Down

0 comments on commit acbec28

Please sign in to comment.