Skip to content

Commit

Permalink
fix(model-server): transaction isolation in listeners
Browse files Browse the repository at this point in the history
When listening to an entry in the store, a listener should only be notified after a transaction is
committed and the notification should happen outside a transaction.
A listener should not get notified about changes that where made during a failed transaction.
  • Loading branch information
slisson committed Dec 15, 2023
1 parent 94d28f8 commit c8bb123
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,28 @@
*/
package org.modelix.model.server.store

import com.google.common.collect.MultimapBuilder
import mu.KotlinLogging
import org.apache.ignite.Ignite
import org.apache.ignite.IgniteCache
import org.apache.ignite.Ignition
import org.modelix.model.IKeyListener
import org.modelix.model.persistent.HashUtil
import java.io.File
import java.io.FileReader
import java.io.IOException
import java.util.*
import java.util.concurrent.Executors
import java.util.stream.Collectors

private val LOG = KotlinLogging.logger { }

class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) : IStoreClient, AutoCloseable {
private val ignite: Ignite
private val ENTRY_CHANGED_TOPIC = "entryChanged"
private lateinit var ignite: Ignite
private val cache: IgniteCache<String, String?>
private val timer = Executors.newScheduledThreadPool(1)
private val listeners = MultimapBuilder.hashKeys().hashSetValues().build<String, IKeyListener>()
private val changeNotifier = ChangeNotifier(this)
private val pendingChangeMessages = PendingChangeMessages {
ignite.message().send(ENTRY_CHANGED_TOPIC, it)
}

/**
* Istantiate an IgniteStoreClient
Expand Down Expand Up @@ -68,6 +73,13 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
// timer.scheduleAtFixedRate(() -> {
// System.out.println("stats: " + cache.metrics());
// }, 10, 10, TimeUnit.SECONDS);

ignite.message().localListen(ENTRY_CHANGED_TOPIC) { nodeId: UUID?, key: Any? ->
if (key is String) {
changeNotifier.notifyListeners(key)
}
true
}
}

override fun get(key: String): String? {
Expand All @@ -94,44 +106,27 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
override fun putAll(entries: Map<String, String?>, silent: Boolean) {
val deletes = entries.filterValues { it == null }
val puts = entries.filterValues { it != null }
if (deletes.isNotEmpty()) cache.removeAll(deletes.keys)
if (puts.isNotEmpty()) cache.putAll(puts)
if (!silent) {
for ((key, value) in entries) {
ignite.message().send(key, value ?: IKeyListener.NULL_VALUE)
runTransaction {
if (deletes.isNotEmpty()) cache.removeAll(deletes.keys)
if (puts.isNotEmpty()) cache.putAll(puts)
if (!silent) {
for (key in entries.keys) {
if (HashUtil.isSha256(key)) continue
pendingChangeMessages.entryChanged(key)
}
}
}
}

override fun listen(key: String, listener: IKeyListener) {
synchronized(listeners) {
val wasSubscribed = listeners.containsKey(key)
listeners.put(key, listener)
if (!wasSubscribed) {
ignite.message()
.localListen(
key,
) { nodeId: UUID?, value: Any? ->
if (value is String) {
synchronized(listeners) {
for (l in listeners[key].toList()) {
try {
l.changed(key, if (value == IKeyListener.NULL_VALUE) null else value)
} catch (ex: Exception) {
println(ex.message)
ex.printStackTrace()
}
}
}
}
true
}
}
}
// Entries where the key is the SHA hash over the value are not expected to change and listening is unnecessary.
require(!HashUtil.isSha256(key)) { "Listener for $key will never get notified." }

changeNotifier.addListener(key, listener)
}

override fun removeListener(key: String, listener: IKeyListener) {
synchronized(listeners) { listeners.remove(key, listener) }
changeNotifier.removeListener(key, listener)
}

override fun generateId(key: String): Long {
Expand All @@ -144,6 +139,7 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
transactions.txStart().use { tx ->
val result = body()
tx.commit()
pendingChangeMessages.flushChangeMessages()
return result
}
} else {
Expand All @@ -160,3 +156,62 @@ class IgniteStoreClient(jdbcConfFile: File? = null, inmemory: Boolean = false) :
dispose()
}
}

class PendingChangeMessages(private val notifier: (String) -> Unit) {
private val pendingChangeMessages = Collections.synchronizedSet(HashSet<String>())

@Synchronized
fun flushChangeMessages() {
for (pendingChangeMessage in pendingChangeMessages) {
notifier(pendingChangeMessage)
}
pendingChangeMessages.clear()
}

@Synchronized
fun entryChanged(key: String) {
pendingChangeMessages += key
}
}

class ChangeNotifier(val store: IStoreClient) {
private val changeNotifiers = HashMap<String, EntryChangeNotifier>()

@Synchronized
fun notifyListeners(key: String) {
changeNotifiers[key]?.notifyIfChanged()
}

@Synchronized
fun addListener(key: String, listener: IKeyListener) {
changeNotifiers.getOrPut(key) { EntryChangeNotifier(key) }.listeners.add(listener)
}

@Synchronized
fun removeListener(key: String, listener: IKeyListener) {
val notifier = changeNotifiers[key] ?: return
notifier.listeners.remove(listener)
if (notifier.listeners.isEmpty()) {
changeNotifiers.remove(key)
}
}

private inner class EntryChangeNotifier(val key: String) {
val listeners = HashSet<IKeyListener>()
private var lastNotifiedValue: String? = null

fun notifyIfChanged() {
val value = store.get(key)
if (value == lastNotifiedValue) return
lastNotifiedValue = value

for (listener in listeners) {
try {
listener.changed(key, value)
} catch (ex: Exception) {
LOG.error("Exception in listener of $key", ex)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class InMemoryStoreClient : IStoreClient {

private val values: MutableMap<String, String?> = HashMap()
private var transactionValues: MutableMap<String, String?>? = null
private val listeners: MutableMap<String?, MutableSet<IKeyListener>> = HashMap()
private val changeNotifier = ChangeNotifier(this)
private val pendingChangeMessages = PendingChangeMessages(changeNotifier::notifyListeners)

@Synchronized
override fun get(key: String): String? {
Expand All @@ -62,35 +63,31 @@ class InMemoryStoreClient : IStoreClient {

@Synchronized
override fun put(key: String, value: String?, silent: Boolean) {
(transactionValues ?: values)[key] = value
if (!silent) {
listeners[key]?.toList()?.forEach {
try {
it.changed(key, value)
} catch (ex: Exception) {
println(ex.message)
ex.printStackTrace()
LOG.error("Failed to notify listeners after put '$key' = '$value'", ex)
}
runTransaction {
(transactionValues ?: values)[key] = value
if (!silent) {
pendingChangeMessages.entryChanged(key)
}
}
}

@Synchronized
override fun putAll(entries: Map<String, String?>, silent: Boolean) {
for ((key, value) in entries) {
put(key, value, silent)
runTransaction {
for ((key, value) in entries) {
put(key, value, silent)
}
}
}

@Synchronized
override fun listen(key: String, listener: IKeyListener) {
listeners.getOrPut(key) { LinkedHashSet() }.add(listener)
changeNotifier.addListener(key, listener)
}

@Synchronized
override fun removeListener(key: String, listener: IKeyListener) {
listeners[key]?.remove(listener)
changeNotifier.removeListener(key, listener)
}

@Synchronized
Expand All @@ -110,6 +107,7 @@ class InMemoryStoreClient : IStoreClient {
return result
} finally {
transactionValues = null
pendingChangeMessages.flushChangeMessages()
}
} else {
return body()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

package org.modelix.model.server

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import org.modelix.model.IKeyListener
import org.modelix.model.server.store.IStoreClient
import org.modelix.model.server.store.IgniteStoreClient
import org.modelix.model.server.store.InMemoryStoreClient
import java.util.Collections
import kotlin.random.Random
import kotlin.test.AfterTest
import kotlin.test.Test
Expand Down Expand Up @@ -92,4 +95,49 @@ abstract class StoreClientTest(val store: IStoreClient) {
}
assertEquals(value1, store.get(key)) // failed transaction should be rolled back
}

@Test
fun `listeners don't see incomplete transaction`() = runTest {
val key = "nbmndsyr"
val value1 = "a"
val value2 = "b"
val value3 = "c"

val valuesSeenByListener = Collections.synchronizedSet(HashSet<String?>())
store.listen(key, object : IKeyListener {
override fun changed(key: String, value: String?) {
valuesSeenByListener += value
valuesSeenByListener += store.get(key)
}
})

store.put(key, value1)
assertEquals(value1, store.get(key))

assertEquals(setOf<String?>(value1), valuesSeenByListener)
valuesSeenByListener.clear()

coroutineScope {
launch {
assertFailsWith(NullPointerException::class) {
store.runTransaction {
assertEquals(value1, store.get(key))
store.put(key, value2, silent = false)
assertEquals(value2, store.get(key))
throw NullPointerException()
}
}
}

launch {
store.runTransaction {
assertEquals(value1, store.get(key))
store.put(key, value3, silent = false)
assertEquals(value3, store.get(key))
}
}
}

assertEquals(setOf<String?>(value3), valuesSeenByListener)
}
}

0 comments on commit c8bb123

Please sign in to comment.