// Proposed syntax: a monitor is declared like a class, using the `monitor` keyword.
monitor Monitor(value: Type) {
// An ordinary method declaration; the locking and notification shown in the equivalent
// class form are implied by `monitor` and are not written out here.
def foo() {
statements
}
...
}
It is equivalent to:
// Desugared form of the monitor declaration: a plain class whose methods are synchronized.
class Monitor(value: Type) {
synchronized def foo() {
try {
statements
} finally {
// Runs on every exit path, including exceptions, so that threads blocked in wait()
// get a chance to re-check their conditions.
this.notifyAll;
}
}
...
}
wait condition until deadline
`condition` is a boolean expression, evaluated on each loop iteration. `deadline` is of type `Time` and is also computed on each iteration.
Wait for `condition` to become true or until `deadline` occurs, whichever happens first.
It is equivalent to
// Expansion of `wait condition until deadline`.
// `condition` and `deadline` are both re-evaluated on every iteration. The remaining time is
// computed exactly once per iteration, so the value handed to wait() is the same value that
// was checked to be positive: Object.wait(0) would block until notified (ignoring the
// deadline) and a negative timeout throws IllegalArgumentException.
var expired = false
while (!condition && !expired) {
  val remaining = deadline - now()
  if (remaining > 0) {
    this.wait(remaining)
  } else {
    expired = true
  }
}
wait condition
The statement is unblocked when `condition` becomes true.
wait until deadline
It simply waits until the system time reaches the given `deadline`.
wait condition for timeout
This is equivalent to
// The deadline is fixed once, relative to the moment the statement is entered.
val start = now()
wait condition until start + timeout
The wait statement also has the following short forms:
wait condition
wait for timeout
// Dijkstra's guarded loop: each iteration runs the statements of the first branch whose
// condition holds; the loop ends when no condition holds (see the expansion that follows).
while (condition1) {
statements1
} else if (condition2) {
statements2
...
}
I find `elif` nicer than `else if`, but it is very unusual in the Java/Scala world.
Dijkstra's while loop is equivalent to
// Expansion of Dijkstra's while loop into an ordinary loop.
while (true) {
// The guards are tested in order; only the first one that holds is executed.
if (condition1) {
statements1
} else if (condition2) {
statements2
...
} else {
// No guard holds: this is the only way out of the loop.
break;
}
}
We prefer to use `not`, `or`, `and` over `!`, `||`, `&&`.
/** A monitor-protected integer counter that callers can also block on. */
monitor AtomicInc(var value: Int = 0) {
  /** Returns the current value. */
  def get = value

  /** Adds one to the counter and returns the new value. */
  def incAndGet: Int = {
    value += 1
    value
  }

  /** Overwrites the counter with `v`. */
  def set(v: Int) {
    value = v
  }

  /** Blocks the caller until the counter is strictly greater than `v`. */
  def waitUntilExceeds(v: Int) {
    wait value > v
  }
}
/** A monitor-protected cell supporting compare-and-swap. */
monitor CompareAndSwap[V](var value: V = _) {
  /** Returns the current value. */
  def get = value

  /**
   * Replaces the stored value with `newValue` if it currently equals `prevValue`.
   * Returns true when the replacement took place, false otherwise.
   */
  def cas(prevValue: V, newValue: V): Boolean = {
    val matched = value == prevValue
    if (matched) {
      value = newValue
    }
    matched
  }
}
/** A barrier that opens once `maxArrivals` arrivals have been counted. */
monitor Barrier(maxArrivals: Int) {
  private var nArrivals: Int

  /** Registers the caller's arrival, then blocks until the arrival count reaches `maxArrivals`. */
  def arrive() {
    nArrivals += 1
    wait nArrivals >= maxArrivals
  }
}
/**
 * A reusable barrier: every group of `maxArrivals` consecutive arrivals is released together,
 * after which the barrier serves the next group.
 */
monitor AutoBarrier(maxArrivals: Int) {
  private var nArrivals: Int

  /** Blocks until the caller's group of `maxArrivals` arrivals is complete. */
  def arrive() {
    // The release threshold must be derived from the count *before* this arrival is added.
    // Computed after the increment, the arrival that completes a group (count divisible by
    // maxArrivals) would be given the next group's threshold and stay blocked.
    val thisMaxArrivals = nArrivals - nArrivals % maxArrivals + maxArrivals
    nArrivals++
    wait nArrivals >= thisMaxArrivals
  }
}
/** A manual-reset event: once signaled it stays signaled until `reset()` is called. */
monitor Event {
  private var signaled: Boolean

  /** Marks the event as signaled. */
  def signal() {
    signaled = true
  }

  /** Blocks the caller until the event is signaled; the signaled state is left untouched. */
  def wait() {
    wait signaled
  }

  /** Clears the signaled state. */
  def reset() {
    signaled = false
  }
}
/** An auto-reset event: a waiter that gets through clears the signaled state itself. */
monitor AutoEvent {
  private var signaled: Boolean

  /** Marks the event as signaled. */
  def signal() {
    signaled = true
  }

  /** Blocks until the event is signaled, then clears the signaled state before returning. */
  def wait() {
    wait signaled
    signaled = false
  }
}
/**
 * An event based on two counters: `nPending` counts the waiters that have registered so far,
 * `nSignaled` determines how many of them are allowed to proceed.
 */
monitor ConditionEvent {
  private var nSignaled: Int
  private var nPending: Int

  /** Raises the signal count by one. */
  def signal() {
    nSignaled += 1
  }

  /** Sets the signal count to the number of waiters registered so far. */
  def signalAll() {
    nSignaled = nPending
  }

  /** Registers the caller as a waiter and blocks until the signal count exceeds its ticket. */
  def wait() {
    // The ticket is the number of waiters registered before this one.
    val ticket = nPending
    nPending += 1
    wait nSignaled > ticket
  }
}
Note that `ConditionVar` is just a class rather than a monitor, since it delegates the waiting to `ConditionEvent`.
/** A condition variable bound to `mutex`, implemented on top of a ConditionEvent. */
class ConditionVar(mutex: Mutex) {
  private val event = new ConditionEvent

  /**
   * Releases the mutex, blocks on the underlying event, and re-acquires the mutex before
   * returning, also when the wait ends with an exception.
   */
  def await() {
    mutex.unlock()
    try {
      event.wait()
    } finally {
      mutex.lock()
    }
  }

  /** Forwards to `signal` on the underlying event. */
  def notify() {
    event.signal()
  }

  /** Forwards to `signalAll` on the underlying event. */
  def notifyAll() {
    event.signalAll()
  }
}
/** A mutual-exclusion lock built from a single boolean flag. */
monitor Mutex {
  private var locked: Boolean

  /** Returns whether the mutex is currently held. */
  def isLocked = locked

  /** Blocks until the mutex is free, then takes it. */
  def lock() {
    wait not locked
    locked = true
  }

  /** Marks the mutex as free. */
  def unlock() {
    locked = false
  }

  /**
   * Waits at most `timeout` for the mutex to become free.
   * Returns true if the mutex was taken, false if it was still held when the wait ended.
   */
  def tryLock(timeout: Duration = 0): Boolean = {
    wait not locked for timeout
    val acquired = not locked
    if (acquired) {
      locked = true
    }
    acquired
  }

  /** Creates a new condition variable bound to this mutex. */
  def conditionVar(): ConditionVar = new ConditionVar(this)
}
/**
 * A counting semaphore holding `nUnits` available units.
 *
 * `nUnits` is declared as a `private var`: the methods below modify it, and a plain constructor
 * parameter cannot be reassigned (the other monitors declare their mutable parameters with `var`).
 */
monitor Semaphore(private var nUnits: Int = 0) {
  /** Returns one unit to the semaphore. */
  def release() {
    nUnits++
  }

  /** Blocks until a unit is available, then takes it. */
  def acquire() {
    wait nUnits > 0
    nUnits--
  }

  /**
   * Waits at most `timeout` for a unit to become available.
   * Returns true if a unit was taken, false if none was available when the wait ended.
   */
  def tryAcquire(timeout: Duration = 0): Boolean = {
    wait nUnits > 0 for timeout
    if (nUnits > 0) {
      nUnits--
      true
    } else false
  }
}
/** A readers-writer lock: many concurrent readers, or a single writer. */
monitor ReadWriteLock {
  private var nReaders: Int
  private var nWriters: Int

  /** Blocks while a writer holds the lock, then registers the caller as a reader. */
  def lockRead() {
    wait nWriters == 0
    nReaders += 1
  }

  /** Unregisters one reader. */
  def unlockRead() {
    nReaders -= 1
  }

  /** Blocks until there are neither readers nor writers, then registers the caller as writer. */
  def lockWrite() {
    wait nWriters + nReaders == 0
    nWriters += 1
  }

  /** Unregisters the writer. */
  def unlockWrite() {
    nWriters -= 1
  }
}
/**
 * A readers-writer lock with writer preference: new readers are held back as soon as a
 * writer has requested the lock, not only while a writer holds it.
 */
monitor ReadWriteLock {
  private var nReaders: Int
  private var nWriters: Int
  private var nWriteReqs: Int

  /** Blocks while a writer holds or has requested the lock, then registers a reader. */
  def lockRead() {
    wait nWriters + nWriteReqs == 0
    nReaders++
  }

  /** Unregisters one reader. */
  def unlockRead() {
    nReaders--
  }

  /** Announces a write request, blocks until the lock is free, then takes it as writer. */
  def lockWrite() {
    nWriteReqs++
    try {
      wait nWriters + nReaders == 0
    } finally {
      // The request must be withdrawn on every exit path: if the wait ends with an
      // exception (e.g. the thread is interrupted), a leftover request would keep
      // lockRead() blocked forever.
      nWriteReqs--
    }
    nWriters++
  }

  /** Unregisters the writer. */
  def unlockWrite() {
    nWriters--
  }
}
/** A queue without a capacity limit whose `dequeue` blocks while the queue is empty. */
monitor UnboundedBlockingQueue[V] {
  private val values = new Queue[V]

  /** Appends `value`; never blocks on capacity. */
  def enqueue(value: V) {
    values.enqueue(value)
  }

  /** Blocks until an element is available, then removes and returns it. */
  def dequeue(): V = {
    wait not values.isEmpty
    val next = values.dequeue
    next
  }
}
/** A queue holding at most `capacity` elements; both ends block when they cannot proceed. */
monitor BlockingQueue[V](capacity: Int) {
  private val values = new Queue[V]

  /** Blocks while the queue holds `capacity` elements or more, then appends `value`. */
  def enqueue(value: V) {
    wait values.size < capacity
    values.enqueue(value)
  }

  /** Blocks until an element is available, then removes and returns it. */
  def dequeue(): V = {
    wait not values.isEmpty
    val next = values.dequeue
    next
  }
}
/**
 * A queue whose elements become available only after a per-element delay.
 * This version spells out the waiting with Dijkstra's while loop and explicit `this.wait` calls.
 */
monitor DelayQueue[V] {
  /** A queued value together with the time at which it becomes available. */
  case class Entry(value: V, when: Time)

  // PriorityQueue serves the highest-priority element first, so the ordering is negated to
  // put the entry with the earliest `when` at the head.
  private val entries = new PriorityQueue[Entry]()(Ordering.by{e => -e.when})

  /** Adds `value`, to become available `timeout` from now (immediately by default). */
  def enqueue(value: V, timeout: Duration = 0) {
    entries.enqueue(Entry(value, now() + timeout))
  }

  /** Blocks until the earliest entry is due, then removes it and returns its value. */
  def dequeue(): V = {
    while (entries.isEmpty) {
      this.wait
    } else if (entries.head.when > now()) {
      // Time passes between the guard above and this line, so the delay is recomputed and
      // re-checked: wait(0) would block until notified and a negative timeout is rejected.
      val remaining = entries.head.when - now()
      if (remaining > 0) {
        this.wait(remaining)
      }
    }
    entries.dequeue.value
  }
}
/**
 * A queue whose elements become available only after a per-element delay.
 * This version expresses the waiting with a single `wait ... until ...` statement.
 */
monitor DelayQueue[V] {
  /** A queued value together with the time at which it becomes available. */
  case class Entry(value: V, when: Time)

  // The ordering is negated so that the entry with the smallest `when` has the highest priority.
  private val entries = new PriorityQueue[Entry]()(Ordering.by{e => -e.when})

  /** Adds `value`, to become available `timeout` from now (immediately by default). */
  def enqueue(value: V, timeout: Duration = 0) {
    val due = now() + timeout
    entries.enqueue(Entry(value, due))
  }

  /** Blocks until the head entry is due, then removes it and returns its value. */
  def dequeue(): V = {
    // Condition: there is a head entry and it is due.
    // Deadline: the head entry's time, or INFINITY while the queue is empty.
    wait not entries.isEmpty and entries.head.when <= now()
      until if (entries.isEmpty) INFINITY else entries.head.when
    entries.dequeue.value
  }
}
/**
 * Runs tasks on `exec`, optionally after a delay. Delayed tasks are parked in a DelayQueue
 * and handed to `exec` by a dedicated worker thread.
 */
class DelayExecutor(exec: Executor) {
  private val runnables = new DelayQueue[Runnable]

  // Loops forever: takes the next task from the queue and passes it on to the executor.
  private val worker = new Thread {
    override def run() {
      while (true) {
        val due = runnables.dequeue
        exec.execute(due)
      }
    }
  }
  worker.start

  /** Executes `r` directly when `timeout` is zero; otherwise queues it with that delay. */
  def execute(r: Runnable, timeout: Duration = 0) {
    if (timeout != 0) {
      runnables.enqueue(r, timeout)
    } else {
      exec.execute(r)
    }
  }
}
/**
 * A map whose entries can be removed either immediately or after a delay.
 * Delayed removals are queued in a DelayQueue and carried out by a worker thread.
 */
monitor ExpireMap[K, V] {
  /** A stored value plus the handle of its pending delayed removal (null when none). */
  class MEntry(var value: V = _, var it: DelayQueue[Entry].iterator=_)

  private val keys = new DelayQueue[K]
  private val table = new HashMap[K, MEntry]

  // Loops forever: takes the next key from the delay queue and removes it from the map.
  private val worker = new Thread {
    override def run() {
      while (true) {
        remove(keys.dequeue)
      }
    }
  }
  worker.start

  /** Returns whether `key` is present. */
  def contains(key: K): Boolean = table.contains(key)

  /** Returns the value stored under `key`, or the value of a fresh MEntry when absent. */
  def get(key: K): V = table.getOrElse(key, new MEntry).value

  /** Stores `value` under `key`, creating the entry if needed. */
  def put(key: K, value: V) {
    table.getOrElseUpdate(key, new MEntry).value = value
  }

  /**
   * Removes `key` immediately, or schedules its removal after `timeout` when `timeout` is
   * positive. A removal already pending for the key is cancelled first. Unknown keys are ignored.
   */
  def remove(key: K, timeout: Duration = 0) {
    table.get(key).foreach { e =>
      if (e.it != null) {
        e.it.remove
      }
      if (timeout > 0) {
        // NOTE(review): this relies on enqueue returning a removable handle; the
        // DelayQueue.enqueue shown in this document declares no result - confirm.
        e.it = keys.enqueue(key, timeout)
      } else {
        table.remove(key)
      }
    }
  }
}