diff --git a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt index 437f2c7fac..e6766dd4c0 100644 --- a/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt +++ b/model-client/src/commonMain/kotlin/org/modelix/model/client2/ModelClientV2.kt @@ -337,31 +337,6 @@ class ModelClientV2( httpClient.close() } - private suspend fun HttpResponse.readVersionDelta(): VersionDeltaStream { - return if (contentType()?.match(VersionDeltaStream.CONTENT_TYPE) == true) { - val content = bodyAsChannel() - val versionHash = checkNotNull(content.readUTF8Line()) { "No objects received" } - val versionObject = content.readUTF8Line() - return if (versionObject == null) { - VersionDeltaStream(versionHash, emptyFlow()) - } else { - VersionDeltaStream( - versionHash, - flow { - emit(versionHash to versionObject) - while (true) { - val key = content.readUTF8Line() ?: break - val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" } - emit(key to value) - } - }, - ) - } - } else { - body().asStream() - } - } - private suspend fun createVersion(baseVersion: CLVersion?, delta: VersionDeltaStream): CLVersion { delta.getObjectsAsFlow().collect { HashUtil.checkObjectHash(it.first, it.second) @@ -405,10 +380,6 @@ class ModelClientV2( } } - private fun HttpRequestBuilder.useVersionStreamFormat() { - headers.set(HttpHeaders.Accept, VersionDeltaStream.CONTENT_TYPE.toString()) - } - companion object { private val LOG = mu.KotlinLogging.logger {} fun builder(): ModelClientV2Builder = ModelClientV2PlatformSpecificBuilder() @@ -507,6 +478,35 @@ private fun URLBuilder.appendPathSegmentsEncodingSlash(vararg components: String fun VersionDelta.getAllObjects(): Map = objectsMap + objects.associateBy { HashUtil.sha256(it) } +suspend fun HttpResponse.readVersionDelta(): VersionDeltaStream { + return if (contentType()?.match(VersionDeltaStream.CONTENT_TYPE) == true) { + val content = bodyAsChannel() + val versionHash = checkNotNull(content.readUTF8Line()) { "No objects received" } + val versionObject = content.readUTF8Line() + return if (versionObject == null) { + VersionDeltaStream(versionHash, emptyFlow()) + } else { + VersionDeltaStream( + versionHash, + flow { + emit(versionHash to versionObject) + while (true) { + val key = content.readUTF8Line() ?: break + val value = checkNotNull(content.readUTF8Line()) { "Object missing for hash $key" } + emit(key to value) + } + }, + ) + } + } else { + body().asStream() + } +} + +fun HttpRequestBuilder.useVersionStreamFormat() { + headers.set(HttpHeaders.Accept, VersionDeltaStream.CONTENT_TYPE.toString()) +} + /** * Performs a write transaction on the root node of the given branch. * diff --git a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt index 6a8f4ffa06..c2a11299ee 100644 --- a/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt +++ b/model-server/src/main/kotlin/org/modelix/model/server/handlers/RepositoriesManager.kt @@ -38,6 +38,7 @@ import org.modelix.model.lazy.KVEntryReference import org.modelix.model.lazy.RepositoryId import org.modelix.model.lazy.computeDelta import org.modelix.model.metameta.MetaModelBranch +import org.modelix.model.persistent.CPVersion import org.modelix.model.server.store.IStoreClient import org.modelix.model.server.store.LocalModelClient import org.modelix.model.server.store.pollEntry @@ -240,16 +241,23 @@ class RepositoriesManager(val client: LocalModelClient) { val version = CLVersion(versionHash, objectStore) // Use a bulk query to make as few request to the underlying store as possible. val bulkQuery = objectStore.newBulkQuery() + // It is unsatisfactory that we have to keep already emitted hashes in memory. + // But without changing the underlying model, + // we have to do this to not emit objects more than once. + val seenHashes = mutableSetOf() fun emitObjects(entry: KVEntryReference<*>) { bulkQuery.get(entry).onSuccess { channel.trySend(entry.getHash() to it!!.serialize()) - for (referencedEntry in it!!.getReferencedEntries()) { - emitObjects(referencedEntry) + for (referencedEntry in it.getReferencedEntries()) { + val wasSeenBefore = !seenHashes.add(referencedEntry.getHash()) + // Do not emit the object if we already emitted it. + if (!wasSeenBefore) { + emitObjects(referencedEntry) + } } } } - channel.send(versionHash to kvStore.get(versionHash)!!) - emitObjects(version.treeHash!!) + emitObjects(KVEntryReference(versionHash, CPVersion.DESERIALIZER)) (bulkQuery as? BulkQuery)?.process() } } diff --git a/model-server/src/test/kotlin/org/modelix/model/server/ModelClientV2Test.kt b/model-server/src/test/kotlin/org/modelix/model/server/ModelClientV2Test.kt index e6bd79c283..2ee5da0709 100644 --- a/model-server/src/test/kotlin/org/modelix/model/server/ModelClientV2Test.kt +++ b/model-server/src/test/kotlin/org/modelix/model/server/ModelClientV2Test.kt @@ -28,10 +28,12 @@ import org.modelix.model.api.IConceptReference import org.modelix.model.api.ITree import org.modelix.model.api.PBranch import org.modelix.model.client2.ModelClientV2 +import org.modelix.model.client2.runWrite import org.modelix.model.lazy.CLTree import org.modelix.model.lazy.CLVersion import org.modelix.model.lazy.RepositoryId import org.modelix.model.operations.OTBranch +import org.modelix.model.persistent.IKVValue import org.modelix.model.server.handlers.ModelReplicationServer import org.modelix.model.server.store.InMemoryStoreClient import org.modelix.modelql.core.count @@ -184,4 +186,31 @@ class ModelClientV2Test { assertFalse(success) assertFalse(containsRepository) } + + @Test + fun `pulling existing versions pulls all referenced objects`() = runTest { + // Arrange + val url = "http://localhost/v2" + val modelClientForArrange = ModelClientV2.builder().url(url).client(client).build().also { it.init() } + val modelClientForAssert = ModelClientV2.builder().url(url).client(client).build().also { it.init() } + val repositoryId = RepositoryId("repo1") + val branchId = repositoryId.getBranchReference("my-branch") + modelClientForArrange.runWrite(branchId) { root -> + root.addNewChild("aChild", -1, null as IConceptReference?) + } + + // Act + val versionPulled = modelClientForAssert.pullIfExists(branchId)!! as CLVersion + + // Assert + fun checkAllReferencedEntriesExistInStore(referencingEntry: IKVValue) { + for (entryReference in referencingEntry.getReferencedEntries()) { + // Check that the store also provides each referenced KVEntry. + // `getValue` would fail if this is not the case. + val referencedEntry = entryReference.getValue(versionPulled.store) + checkAllReferencedEntriesExistInStore(referencedEntry) + } + } + checkAllReferencedEntriesExistInStore(versionPulled.data!!) + } } diff --git a/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt b/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt new file mode 100644 index 0000000000..07e68f30dc --- /dev/null +++ b/model-server/src/test/kotlin/org/modelix/model/server/handlers/ModelReplicationServerTest.kt @@ -0,0 +1,101 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.modelix.model.server.handlers + +import io.ktor.client.request.get +import io.ktor.http.appendPathSegments +import io.ktor.http.takeFrom +import io.ktor.serialization.kotlinx.json.json +import io.ktor.server.application.install +import io.ktor.server.plugins.contentnegotiation.ContentNegotiation +import io.ktor.server.resources.Resources +import io.ktor.server.routing.IgnoreTrailingSlash +import io.ktor.server.testing.ApplicationTestBuilder +import io.ktor.server.testing.testApplication +import io.ktor.server.websocket.WebSockets +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope +import org.modelix.authorization.installAuthentication +import org.modelix.model.api.IConceptReference +import org.modelix.model.client2.ModelClientV2 +import org.modelix.model.client2.readVersionDelta +import org.modelix.model.client2.runWrite +import org.modelix.model.client2.useVersionStreamFormat +import org.modelix.model.lazy.RepositoryId +import org.modelix.model.server.store.InMemoryStoreClient +import org.modelix.model.server.store.LocalModelClient +import kotlin.test.Test +import kotlin.test.fail + +class ModelReplicationServerTest { + + private fun runTest(block: suspend ApplicationTestBuilder.(scope: CoroutineScope) -> Unit) = testApplication { + application { + installAuthentication(unitTestMode = true) + install(ContentNegotiation) { + json() + } + install(WebSockets) + install(Resources) + install(IgnoreTrailingSlash) + val repositoriesManager = RepositoriesManager(LocalModelClient(InMemoryStoreClient())) + ModelReplicationServer(repositoriesManager).init(this) + KeyValueLikeModelServer(repositoriesManager).init(this) + } + + coroutineScope { + block(this) + } + } + + @Test + fun `pulling delta does not return objects twice`() = runTest { + // Arrange + val url = "http://localhost/v2" + val modelClient = ModelClientV2.builder().url(url).client(client).build().also { it.init() } + val repositoryId = RepositoryId("repo1") + val branchId = repositoryId.getBranchReference("my-branch") + // By calling modelClient.runWrite twice, we create to versions. + // Those two versions will share objects. + modelClient.runWrite(branchId) { root -> + root.addNewChild("aChild", -1, null as IConceptReference?) + } + modelClient.runWrite(branchId) { root -> + root.addNewChild("aChild", -1, null as IConceptReference?) + } + + // Act + val response = client.get { + url { + takeFrom(url) + appendPathSegments("repositories", repositoryId.id, "branches", branchId.branchName) + } + useVersionStreamFormat() + } + val versionDelta = response.readVersionDelta() + + // Assert + val seenHashes = mutableSetOf() + versionDelta.getObjectsAsFlow().collect { (hash, _) -> + val wasSeenBefore = !seenHashes.add(hash) + if (wasSeenBefore) { + // We should not send the same object (with the same hash more than once) + // even if we got with versions that share data. + fail("Hash $hash sent more than once.") + } + } + } +}