Skip to content

Commit

Permalink
Merge pull request #519 from modelix/fix/MODELIX-754
Browse files Browse the repository at this point in the history
fix(model-server): respond with all objects if no base version was given
  • Loading branch information
odzhychko authored Feb 21, 2024
2 parents 4117793 + 7d1fa8a commit 3c161d3
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,31 +337,6 @@ class ModelClientV2(
httpClient.close()
}

private suspend fun HttpResponse.readVersionDelta(): VersionDeltaStream {
return if (contentType()?.match(VersionDeltaStream.CONTENT_TYPE) == true) {
val content = bodyAsChannel()
val versionHash = checkNotNull(content.readUTF8Line()) { "No objects received" }
val versionObject = content.readUTF8Line()
return if (versionObject == null) {
VersionDeltaStream(versionHash, emptyFlow())
} else {
VersionDeltaStream(
versionHash,
flow {
emit(versionHash to versionObject)
while (true) {
val key = content.readUTF8Line() ?: break
val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" }
emit(key to value)
}
},
)
}
} else {
body<VersionDelta>().asStream()
}
}

private suspend fun createVersion(baseVersion: CLVersion?, delta: VersionDeltaStream): CLVersion {
delta.getObjectsAsFlow().collect {
HashUtil.checkObjectHash(it.first, it.second)
Expand Down Expand Up @@ -405,10 +380,6 @@ class ModelClientV2(
}
}

private fun HttpRequestBuilder.useVersionStreamFormat() {
headers.set(HttpHeaders.Accept, VersionDeltaStream.CONTENT_TYPE.toString())
}

companion object {
private val LOG = mu.KotlinLogging.logger {}
fun builder(): ModelClientV2Builder = ModelClientV2PlatformSpecificBuilder()
Expand Down Expand Up @@ -507,6 +478,35 @@ private fun URLBuilder.appendPathSegmentsEncodingSlash(vararg components: String

fun VersionDelta.getAllObjects(): Map<String, String> = objectsMap + objects.associateBy { HashUtil.sha256(it) }

suspend fun HttpResponse.readVersionDelta(): VersionDeltaStream {
return if (contentType()?.match(VersionDeltaStream.CONTENT_TYPE) == true) {
val content = bodyAsChannel()
val versionHash = checkNotNull(content.readUTF8Line()) { "No objects received" }
val versionObject = content.readUTF8Line()
return if (versionObject == null) {
VersionDeltaStream(versionHash, emptyFlow())
} else {
VersionDeltaStream(
versionHash,
flow {
emit(versionHash to versionObject)
while (true) {
val key = content.readUTF8Line() ?: break
val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" }
emit(key to value)
}
},
)
}
} else {
body<VersionDelta>().asStream()
}
}

fun HttpRequestBuilder.useVersionStreamFormat() {
headers.set(HttpHeaders.Accept, VersionDeltaStream.CONTENT_TYPE.toString())
}

/**
* Performs a write transaction on the root node of the given branch.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.modelix.model.lazy.KVEntryReference
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.lazy.computeDelta
import org.modelix.model.metameta.MetaModelBranch
import org.modelix.model.persistent.CPVersion
import org.modelix.model.server.store.IStoreClient
import org.modelix.model.server.store.LocalModelClient
import org.modelix.model.server.store.pollEntry
Expand Down Expand Up @@ -240,16 +241,23 @@ class RepositoriesManager(val client: LocalModelClient) {
val version = CLVersion(versionHash, objectStore)
// Use a bulk query to make as few request to the underlying store as possible.
val bulkQuery = objectStore.newBulkQuery()
// It is unsatisfactory that we have to keep already emitted hashes in memory.
// But without changing the underlying model,
// we have to do this to not emit objects more than once.
val seenHashes = mutableSetOf<String>()
fun emitObjects(entry: KVEntryReference<*>) {
bulkQuery.get(entry).onSuccess {
channel.trySend(entry.getHash() to it!!.serialize())
for (referencedEntry in it!!.getReferencedEntries()) {
emitObjects(referencedEntry)
for (referencedEntry in it.getReferencedEntries()) {
val wasSeenBefore = !seenHashes.add(referencedEntry.getHash())
// Do not emit the object if we already emitted it.
if (!wasSeenBefore) {
emitObjects(referencedEntry)
}
}
}
}
channel.send(versionHash to kvStore.get(versionHash)!!)
emitObjects(version.treeHash!!)
emitObjects(KVEntryReference(versionHash, CPVersion.DESERIALIZER))
(bulkQuery as? BulkQuery)?.process()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import org.modelix.model.api.IConceptReference
import org.modelix.model.api.ITree
import org.modelix.model.api.PBranch
import org.modelix.model.client2.ModelClientV2
import org.modelix.model.client2.runWrite
import org.modelix.model.lazy.CLTree
import org.modelix.model.lazy.CLVersion
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.operations.OTBranch
import org.modelix.model.persistent.IKVValue
import org.modelix.model.server.handlers.ModelReplicationServer
import org.modelix.model.server.store.InMemoryStoreClient
import org.modelix.modelql.core.count
Expand Down Expand Up @@ -184,4 +186,31 @@ class ModelClientV2Test {
assertFalse(success)
assertFalse(containsRepository)
}

@Test
fun `pulling existing versions pulls all referenced objects`() = runTest {
// Arrange
val url = "http://localhost/v2"
val modelClientForArrange = ModelClientV2.builder().url(url).client(client).build().also { it.init() }
val modelClientForAssert = ModelClientV2.builder().url(url).client(client).build().also { it.init() }
val repositoryId = RepositoryId("repo1")
val branchId = repositoryId.getBranchReference("my-branch")
modelClientForArrange.runWrite(branchId) { root ->
root.addNewChild("aChild", -1, null as IConceptReference?)
}

// Act
val versionPulled = modelClientForAssert.pullIfExists(branchId)!! as CLVersion

// Assert
fun checkAllReferencedEntriesExistInStore(referencingEntry: IKVValue) {
for (entryReference in referencingEntry.getReferencedEntries()) {
// Check that the store also provides each referenced KVEntry.
// `getValue` would fail if this is not the case.
val referencedEntry = entryReference.getValue(versionPulled.store)
checkAllReferencedEntriesExistInStore(referencedEntry)
}
}
checkAllReferencedEntriesExistInStore(versionPulled.data!!)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.modelix.model.server.handlers

import io.ktor.client.request.get
import io.ktor.http.appendPathSegments
import io.ktor.http.takeFrom
import io.ktor.serialization.kotlinx.json.json
import io.ktor.server.application.install
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
import io.ktor.server.resources.Resources
import io.ktor.server.routing.IgnoreTrailingSlash
import io.ktor.server.testing.ApplicationTestBuilder
import io.ktor.server.testing.testApplication
import io.ktor.server.websocket.WebSockets
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import org.modelix.authorization.installAuthentication
import org.modelix.model.api.IConceptReference
import org.modelix.model.client2.ModelClientV2
import org.modelix.model.client2.readVersionDelta
import org.modelix.model.client2.runWrite
import org.modelix.model.client2.useVersionStreamFormat
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.server.store.InMemoryStoreClient
import org.modelix.model.server.store.LocalModelClient
import kotlin.test.Test
import kotlin.test.fail

class ModelReplicationServerTest {

private fun runTest(block: suspend ApplicationTestBuilder.(scope: CoroutineScope) -> Unit) = testApplication {
application {
installAuthentication(unitTestMode = true)
install(ContentNegotiation) {
json()
}
install(WebSockets)
install(Resources)
install(IgnoreTrailingSlash)
val repositoriesManager = RepositoriesManager(LocalModelClient(InMemoryStoreClient()))
ModelReplicationServer(repositoriesManager).init(this)
KeyValueLikeModelServer(repositoriesManager).init(this)
}

coroutineScope {
block(this)
}
}

@Test
fun `pulling delta does not return objects twice`() = runTest {
// Arrange
val url = "http://localhost/v2"
val modelClient = ModelClientV2.builder().url(url).client(client).build().also { it.init() }
val repositoryId = RepositoryId("repo1")
val branchId = repositoryId.getBranchReference("my-branch")
// By calling modelClient.runWrite twice, we create to versions.
// Those two versions will share objects.
modelClient.runWrite(branchId) { root ->
root.addNewChild("aChild", -1, null as IConceptReference?)
}
modelClient.runWrite(branchId) { root ->
root.addNewChild("aChild", -1, null as IConceptReference?)
}

// Act
val response = client.get {
url {
takeFrom(url)
appendPathSegments("repositories", repositoryId.id, "branches", branchId.branchName)
}
useVersionStreamFormat()
}
val versionDelta = response.readVersionDelta()

// Assert
val seenHashes = mutableSetOf<String>()
versionDelta.getObjectsAsFlow().collect { (hash, _) ->
val wasSeenBefore = !seenHashes.add(hash)
if (wasSeenBefore) {
// We should not send the same object (with the same hash more than once)
// even if we got with versions that share data.
fail("Hash $hash sent more than once.")
}
}
}
}

0 comments on commit 3c161d3

Please sign in to comment.