Merge pull request #345 from modelix/fix/bulk-sync-large-modules
MODELIX-640 Handle large modules in bulk sync
mhuster23 authored Dec 19, 2023
2 parents d029ea9 + 75a5afb commit 5567d28
Showing 56 changed files with 834 additions and 1,547 deletions.
@@ -30,9 +30,12 @@ import org.gradle.api.tasks.TaskAction
import org.modelix.model.ModelFacade
import org.modelix.model.api.ILanguage
import org.modelix.model.api.ILanguageRepository
import org.modelix.model.client2.ModelClientV2PlatformSpecificBuilder
import org.modelix.model.api.INode
import org.modelix.model.api.PNodeAdapter
import org.modelix.model.client2.ModelClientV2
import org.modelix.model.client2.runWrite
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.operations.OTBranch
import org.modelix.model.sync.bulk.ModelImporter
import org.modelix.model.sync.bulk.importFilesAsRootChildren
import org.modelix.model.sync.bulk.isModuleIncluded
@@ -75,7 +78,7 @@ abstract class ImportIntoModelServer @Inject constructor(of: ObjectFactory) : DefaultTask() {
val repoId = RepositoryId(repositoryId.get())

val branchRef = ModelFacade.createBranchReference(repoId, branchName.get())
val client = ModelClientV2PlatformSpecificBuilder().url(url.get()).build()
val client = ModelClientV2.builder().url(url.get()).build()
val files = inputDir.listFiles()?.filter {
it.extension == "json" && isModuleIncluded(it.nameWithoutExtension, includedModules.get(), includedModulePrefixes.get())
}
@@ -84,11 +87,20 @@
runBlocking {
client.init()
client.runWrite(branchRef) { rootNode ->
logger.info("Got root node: {}", rootNode)
logger.info("Importing...")
ModelImporter(rootNode, continueOnError.get()).importFilesAsRootChildren(files)
logger.info("Import finished")
rootNode.runBulkUpdate {
logger.info("Got root node: {}", rootNode)
logger.info("Importing...")
ModelImporter(rootNode, continueOnError.get()).importFilesAsRootChildren(files)
logger.info("Import finished")
}
}
}
}
}

/**
* Memory optimization that doesn't record individual change operations, but only the result.
*/
private fun INode.runBulkUpdate(body: () -> Unit) {
((this as PNodeAdapter).branch as OTBranch).runBulkUpdate(body = body)
}
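
Taken together, the fix routes the whole import through a single bulk update: the OT branch then records one composite result instead of one operation per imported node, which is what keeps large modules from exhausting memory. The following is a minimal usage sketch assembled only from APIs visible in this diff — the URL, repository name, branch name, and input directory are placeholders, and the bulk-update helper is re-declared here because the extension above is private to the Gradle task.

import kotlinx.coroutines.runBlocking
import org.modelix.model.ModelFacade
import org.modelix.model.api.INode
import org.modelix.model.api.PNodeAdapter
import org.modelix.model.client2.ModelClientV2
import org.modelix.model.client2.runWrite
import org.modelix.model.lazy.RepositoryId
import org.modelix.model.operations.OTBranch
import org.modelix.model.sync.bulk.ModelImporter
import org.modelix.model.sync.bulk.importFilesAsRootChildren
import java.io.File

// Same unwrapping as the private extension above: reach the OTBranch and batch all writes.
fun INode.runBulkUpdate(body: () -> Unit) {
    ((this as PNodeAdapter).branch as OTBranch).runBulkUpdate(body = body)
}

fun main() = runBlocking {
    val branchRef = ModelFacade.createBranchReference(RepositoryId("my-repo"), "master")
    val client = ModelClientV2.builder().url("http://localhost:28101/v2").build()
    client.init()
    val files = File("model-export").listFiles()?.filter { it.extension == "json" }.orEmpty()
    client.runWrite(branchRef) { rootNode ->
        // One recorded bulk operation for the entire import, not one per node.
        rootNode.runBulkUpdate {
            ModelImporter(rootNode, false).importFilesAsRootChildren(files)
        }
    }
}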
@@ -51,7 +51,7 @@ class ModelImporter(private val root: INode, private val continueOnError: Boolean) {
// updated to the constructor with two arguments.
constructor(root: INode) : this(root, false)

private fun doAndPotentiallyContinueOnErrors(block: () -> Unit) {
private inline fun doAndPotentiallyContinueOnErrors(block: () -> Unit) {
try {
block()
} catch (e: Exception) {
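
The only change in this hunk is the `inline` modifier. The helper wraps every node, property, and reference sync, so without inlining each call allocates a function object for its lambda; inlining copies the try/catch and the lambda body into the call site. A self-contained illustration of the effect — the names below are ours, not from the commit:

// Because the function is inline, the compiler pastes the try/catch and the
// lambda body into each call site; no Function0 instance is allocated.
inline fun continueOnErrors(continueOnError: Boolean, block: () -> Unit) {
    try {
        block()
    } catch (e: Exception) {
        if (!continueOnError) throw e
        println("Continuing after error: ${e.message}")
    }
}

fun main() {
    continueOnErrors(continueOnError = true) {
        error("boom") // swallowed and logged; subsequent work continues
    }
}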
@@ -70,34 +70,38 @@
*/
@JvmName("importData")
fun import(data: ModelData) {
logImportSize(data.root, logger)
logger.info { "Building indices for import..." }
originalIdToExisting.clear()
postponedReferences.clear()
nodesToRemove.clear()
numExpectedNodes = countExpectedNodes(data.root)
currentNodeProgress = 0
buildExistingIndex(root)

logger.info { "Importing nodes..." }
data.root.originalId()?.let { originalIdToExisting[it] = root }
syncNode(root, data.root)

logger.info { "Synchronizing references..." }
postponedReferences.forEach {
doAndPotentiallyContinueOnErrors {
it.invoke()
INodeResolutionScope.runWithAdditionalScope(root.getArea()) {
logImportSize(data.root, logger)
logger.info { "Building indices for import..." }
originalIdToExisting.clear()
postponedReferences.clear()
nodesToRemove.clear()
numExpectedNodes = countExpectedNodes(data.root)
currentNodeProgress = 0
buildExistingIndex(root)

logger.info { "Importing nodes..." }
data.root.originalId()?.let { originalIdToExisting[it] = root }
syncNode(root, data.root)

logger.info { "Synchronizing references..." }
postponedReferences.forEach {
doAndPotentiallyContinueOnErrors {
it.invoke()
}
}
}

logger.info { "Removing extra nodes..." }
nodesToRemove.forEach {
doAndPotentiallyContinueOnErrors {
it.remove()
logger.info { "Removing extra nodes..." }
nodesToRemove.forEach {
doAndPotentiallyContinueOnErrors {
if (it.isValid) { // if it's invalid then it's already removed
it.remove()
}
}
}
}

logger.info { "Synchronization finished." }
logger.info { "Synchronization finished." }
}
}

private fun countExpectedNodes(data: NodeData): Int =
@@ -110,9 +114,7 @@ class ModelImporter(private val root: INode, private val continueOnError: Boolean) {
doAndPotentiallyContinueOnErrors {
syncProperties(node, data)
syncChildren(node, data)
INodeResolutionScope.runWithAdditionalScope(node.getArea()) {
syncReferences(node, data)
}
syncReferences(node, data)
}
}
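
Note the counterpart to the first hunk of this file: the per-node `INodeResolutionScope.runWithAdditionalScope(node.getArea())` wrapper is removed from `syncNode`, because `import` now enters the resolution scope once around the entire run. Resolving references inside one already-active scope avoids re-entering it for every single node, which is part of the same large-model optimization.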

@@ -122,6 +124,17 @@ class ModelImporter(private val root: INode, private val continueOnError: Boolean) {
val expectedNodes = data.children.filter { it.role == role }
val existingNodes = node.getChildren(role).toList()

// optimization that uses the bulk operation .addNewChildren
if (existingNodes.isEmpty() && expectedNodes.all { originalIdToExisting[it.originalId()] == null }) {
node.addNewChildren(role, -1, expectedNodes.map { it.concept?.let { ConceptReference(it) } }).zip(expectedNodes).forEach { (newChild, expected) ->
val expectedId = checkNotNull(expected.originalId()) { "Specified node '$expected' has no id" }
newChild.setPropertyValue(NodeData.idPropertyKey, expectedId)
originalIdToExisting[expectedId] = newChild
syncNode(newChild, expected)
}
continue
}

// optimization for when there is no change in the child list
// size check first to avoid querying the original ID
if (expectedNodes.size == existingNodes.size && expectedNodes.map { it.originalId() } == existingNodes.map { it.originalId() }) {
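
The new fast path above applies only when the role has no existing children and none of the incoming nodes are already known to the index (so nothing has to be matched or moved): all children are then created by a single `addNewChildren` call and paired back to their source data positionally via `zip`. A toy sketch of that create-then-zip pattern — the node type below is ours, purely for illustration; the real code calls `INode.addNewChildren(role, -1, concepts)`:

// Minimal stand-in for a node that can create several children in one operation.
class ToyNode(val id: String) {
    val children = mutableListOf<ToyNode>()
    fun addNewChildren(count: Int): List<ToyNode> =
        List(count) { ToyNode("$id.$it") }.also { children += it }
}

fun main() {
    val parent = ToyNode("root")
    val expected = listOf("moduleA", "moduleB", "moduleC") // incoming node data
    parent.addNewChildren(expected.size) // one bulk creation instead of three separate adds
        .zip(expected) // pair each fresh child with its source data by position
        .forEach { (child, data) -> println("initializing ${child.id} from $data") }
}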
@@ -16,6 +16,9 @@

package org.modelix.model.sync.bulk

import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.encodeToStream
import org.modelix.model.api.INode
import org.modelix.model.data.ModelData
import java.io.File
@@ -27,9 +30,11 @@ actual class ModelExporter actual constructor(private val root: INode) {
*
* @param outputFile target file of the export
*/
@OptIn(ExperimentalSerializationApi::class)
fun export(outputFile: File) {
val modelData = ModelData(root = root.asExported())
outputFile.parentFile.mkdirs()
outputFile.writeText(modelData.toJson())

val modelData = ModelData(root = root.asExported())
Json.encodeToStream(modelData, outputFile.outputStream())
}
}
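
The export no longer builds the entire JSON document in memory — previously `modelData.toJson()` produced one large `String` before anything hit disk; `Json.encodeToStream` serializes directly into the file's output stream. A hedged sketch of the same idea with a toy serializable type (the `use` block for deterministic closing is our addition; the committed code passes the stream directly):

import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.encodeToStream
import java.io.File

@Serializable
data class ToyModel(val name: String, val nodes: List<String>) // stand-in for ModelData

@OptIn(ExperimentalSerializationApi::class)
fun main() {
    val model = ToyModel("big.module", List(5) { "node$it" })
    val outputFile = File("exported/big.module.json")
    outputFile.parentFile?.mkdirs()
    outputFile.outputStream().use { stream ->
        Json.encodeToStream(model, stream) // streams JSON to disk, no full-document String
    }
}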
@@ -16,6 +16,9 @@

package org.modelix.model.sync.bulk

import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
import org.modelix.model.data.ModelData
import java.io.File

@@ -35,8 +38,11 @@ fun ModelImporter.import(jsonFile: File) {
import(data)
}

@OptIn(ExperimentalSerializationApi::class)
fun ModelImporter.importFilesAsRootChildren(files: Collection<File>) {
val models = files.map { ModelData.fromJson(it.readText()) }
val models: List<ModelData> = files.map {
Json.decodeFromStream(it.inputStream())
}
import(mergeModelData(models))
}

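
The import side mirrors that change: `ModelData.fromJson(it.readText())` loaded every file as one big `String`, while `Json.decodeFromStream` parses straight from the input stream. A minimal sketch of the decode counterpart, again with a toy type and a `use` block of ours:

import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
import java.io.File

@Serializable
data class ToyModel(val name: String, val nodes: List<String>) // stand-in for ModelData

@OptIn(ExperimentalSerializationApi::class)
fun main() {
    val models: List<ToyModel> = File("model-export")
        .listFiles { f -> f.extension == "json" }
        .orEmpty()
        .map { file -> file.inputStream().use { Json.decodeFromStream<ToyModel>(it) } }
    println("Loaded ${models.size} model files")
}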

This file was deleted.
