
Commit

Added the ability for a persistence to auto-prune snapshot iterations and assets according to the specified retention policy
dimitribouniol committed Oct 10, 2024
1 parent 231a9fb commit c9569e4
Showing 10 changed files with 660 additions and 15 deletions.
5 changes: 2 additions & 3 deletions README.md
@@ -261,8 +261,7 @@ Note that in the example above, even though the author is persisted first, if an

As this project matures towards release, the project will focus on the functionality and work listed below:
- Force migration methods
- Composite indexes (via macros?)
- Cleaning up old resources on disk
- Composite indexes
- Ranged deletes
- Controls for the edit history
- Helper types to use with SwiftUI/Observability/Combine that can make data available on the main actor and filter and stay up to date
@@ -271,7 +270,7 @@ As this project matures towards release, the project will focus on the functiona
- An example app
- A memory persistence useful for testing apps with
- A pre-configured data store tuned to storing pure Data, useful for types like Images
- Cleaning up memory leaks
- Cleaning up memory and file descriptor leaks

The above list will be kept up to date during development and will likely see additions during that process.

@@ -98,17 +98,6 @@ extension DiskPersistence.Datastore.Index {
case .secondary(let index, let manifest): self = .secondary(index: index, manifest: manifest)
}
}

init(_ id: DatastoreRootManifest.IndexManifestID) {
switch id {
case .primary(let manifest):
self = .primary(manifest: manifest)
case .direct(let index, let manifest):
self = .direct(index: index, manifest: manifest)
case .secondary(let index, let manifest):
self = .secondary(index: index, manifest: manifest)
}
}
}
}

@@ -86,6 +86,15 @@ extension DatastoreIndexManifest {
}
}

extension DatastoreIndexManifest {
func pagesToPrune(for mode: SnapshotPruneMode) -> Set<DatastorePageIdentifier> {
switch mode {
case .pruneRemoved: Set(removedPageIDs)
case .pruneAdded: Set(addedPageIDs)
}
}
}

// MARK: - Decoding

extension DatastoreIndexManifest {
@@ -101,3 +101,25 @@ extension DatastoreRootManifest {
}
}
}

extension DatastoreRootManifest {
func indexesToPrune(for mode: SnapshotPruneMode) -> Set<IndexID> {
switch mode {
case .pruneRemoved: removedIndexes
case .pruneAdded: addedIndexes
}
}

func indexManifestsToPrune(
for mode: SnapshotPruneMode,
options: SnapshotPruneOptions
) -> Set<IndexManifestID> {
switch (mode, options) {
case (.pruneRemoved, .pruneAndDelete): removedIndexManifests
case (.pruneAdded, .pruneAndDelete): addedIndexManifests
/// Flip the results when we aren't deleting, but only when removing from the bottom end.
case (.pruneRemoved, .pruneOnly): addedIndexManifests
case (.pruneAdded, .pruneOnly): []
}
}
}
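Neither `SnapshotPruneMode` nor `SnapshotPruneOptions` appears in this diff. Judging from the switch statements above, they are plausibly minimal enums along these lines (a sketch, not the actual declarations):

```swift
/// Hypothetical reconstruction; the real declarations live elsewhere in the package.
enum SnapshotPruneMode {
    /// Prune assets an iteration removed (used when dropping iterations off the oldest edge).
    case pruneRemoved
    /// Prune assets an iteration added (used when unwinding reverted branches off the newest edge).
    case pruneAdded
}

enum SnapshotPruneOptions {
    /// Prune the entries and delete their backing files on disk.
    case pruneAndDelete
    /// Prune the entries without deleting any files.
    case pruneOnly
}
```

The two axes explain the flipped case in `indexManifestsToPrune(for:options:)`: the mode selects which edge of the history is being pruned, while the options control whether files are actually unlinked during that pass.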
@@ -156,6 +156,110 @@ extension DiskPersistence.Datastore {
}
}

func pruneRootObject(with identifier: RootObject.ID, mode: SnapshotPruneMode, shouldDelete: Bool) async throws {
let fileManager = FileManager()
let rootObject = try loadRootObject(for: identifier, shouldCache: false)
// let rootObject = trackedRootObjects[identifier]?.value ?? RootObject(datastore: self, id: identifier, rootObject: rootObjectManifest) // TODO: Investigate why this is commented out

/// Collect the indexes and related manifests we'll be deleting.
/// - For indexes, only collect the ones we'll be deleting, since the ones we keep won't reference other deletable assets.
/// - For the manifests, we'll delete the entries being removed relative to the edge we are pruning from: the removed entries when pruning from the oldest edge, and the added entries when pruning from the newest edge, as determined by the caller. We'll also check entries that have just been added for pages to remove, but only when pruning from the oldest edge, because pages "removed" from the newest edge are actually being _restored_ rather than replaced, which maintains symmetry in a non-obvious way.
let indexesToPruneAndDelete = rootObject.indexesToPrune(for: mode)
let indexManifestsToPruneAndDelete = rootObject.indexManifestsToPrune(for: mode, options: .pruneAndDelete)
let indexManifestsToPrune = rootObject.indexManifestsToPrune(for: mode, options: .pruneOnly)

/// Delete the index manifests and pages we know to be removed.
for indexManifestID in indexManifestsToPruneAndDelete {
let indexID = Index.ID(indexManifestID)
defer {
trackedIndexes.removeValue(forKey: indexID)
loadedIndexes.remove(indexID)
}
/// Skip any manifests for indexes being deleted, since we'll just unlink the whole directory in that case.
guard !indexesToPruneAndDelete.contains(indexID.indexID) else { continue }

let manifestURL = manifestURL(for: indexID)
let manifest: DatastoreIndexManifest?
do {
manifest = try await DatastoreIndexManifest(contentsOf: manifestURL, id: indexID.manifestID)
} catch URLError.fileDoesNotExist, CocoaError.fileReadNoSuchFile, CocoaError.fileNoSuchFile, POSIXError.ENOENT {
manifest = nil
} catch {
print("Uncaught Manifest Error: \(error)")
throw error
}

guard let manifest else { continue }

/// Only delete the pages we know to be removed
let pagesToPruneAndDelete = manifest.pagesToPrune(for: mode)
for pageID in pagesToPruneAndDelete {
let indexedPageID = Page.ID(index: indexID, page: pageID)
defer {
trackedPages.removeValue(forKey: indexedPageID.withoutManifest)
loadedPages.remove(indexedPageID.withoutManifest)
}

let pageURL = pageURL(for: indexedPageID)

try? fileManager.removeItem(at: pageURL)
try? fileManager.removeDirectoryIfEmpty(url: pageURL.deletingLastPathComponent(), recursivelyRemoveParents: true)
}

try? fileManager.removeItem(at: manifestURL)
}

/// Prune the index manifests that were just added, as they themselves refer to other deleted pages.
for indexManifestID in indexManifestsToPrune {
let indexID = Index.ID(indexManifestID)
/// Skip any manifests for indexes being deleted, since we'll just unlink the whole directory in that case.
guard !indexesToPruneAndDelete.contains(indexID.indexID) else { continue }

let manifestURL = manifestURL(for: indexID)
let manifest: DatastoreIndexManifest?
do {
manifest = try await DatastoreIndexManifest(contentsOf: manifestURL, id: indexID.manifestID)
} catch URLError.fileDoesNotExist, CocoaError.fileReadNoSuchFile, CocoaError.fileNoSuchFile, POSIXError.ENOENT {
manifest = nil
} catch {
print("Uncaught Manifest Error: \(error)")
throw error
}

guard let manifest else { continue }

/// Only delete the pages we know to be removed
let pagesToPruneAndDelete = manifest.pagesToPrune(for: mode)
for pageID in pagesToPruneAndDelete {
let indexedPageID = Page.ID(index: indexID, page: pageID)
defer {
trackedPages.removeValue(forKey: indexedPageID.withoutManifest)
loadedPages.remove(indexedPageID.withoutManifest)
}

let pageURL = pageURL(for: indexedPageID)

try? fileManager.removeItem(at: pageURL)
try? fileManager.removeDirectoryIfEmpty(url: pageURL.deletingLastPathComponent(), recursivelyRemoveParents: true)
}
}

/// Delete any indexes in their entirety.
for indexID in indexesToPruneAndDelete {
try? fileManager.removeItem(at: indexURL(for: indexID))
}

/// If we are deleting the root object itself, do so at the very end as everything else would have been cleaned up.
if shouldDelete {
trackedRootObjects.removeValue(forKey: identifier)
loadedRootObjects.remove(identifier)

let rootURL = rootURL(for: rootObject.id)
try? fileManager.removeItem(at: rootURL)
try? fileManager.removeDirectoryIfEmpty(url: rootURL.deletingLastPathComponent(), recursivelyRemoveParents: true)
}
}

func index(for identifier: Index.ID) -> Index {
if let index = trackedIndexes[identifier]?.value {
return index
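The `removeDirectoryIfEmpty(url:recursivelyRemoveParents:)` calls above are not Foundation API, so the method is presumably a small `FileManager` helper defined elsewhere in the repository. A minimal sketch of what such a helper could look like, assuming it stops quietly at the first non-empty parent:

```swift
import Foundation

extension FileManager {
    /// Hypothetical sketch; the project's actual helper is not shown in this diff.
    /// Removes the directory at `url` only if it contains no entries, optionally
    /// walking up the tree to remove parents that become empty as a result.
    func removeDirectoryIfEmpty(url: URL, recursivelyRemoveParents: Bool) throws {
        guard try contentsOfDirectory(atPath: url.path).isEmpty else { return }
        try removeItem(at: url)
        if recursivelyRemoveParents {
            /// Stop silently once a parent is non-empty or cannot be inspected.
            try? removeDirectoryIfEmpty(
                url: url.deletingLastPathComponent(),
                recursivelyRemoveParents: true
            )
        }
    }
}
```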
@@ -29,6 +29,11 @@ public actor DiskPersistence<AccessMode: _AccessMode>: Persistence {

var lastTransaction: Transaction?

var _transactionRetentionPolicy: SnapshotRetentionPolicy = .indefinite

var nextSnapshotIterationCandidateToEnforce: (snapshot: Snapshot<ReadWrite>, iteration: SnapshotIteration)?
var snapshotIterationPruningTask: Task<Void, Never>?

/// Shared caches across all snapshots and datastores.
var rollingRootObjectCacheIndex = 0
var rollingRootObjectCache: [Datastore.RootObject] = []
@@ -59,6 +64,10 @@ public actor DiskPersistence<AccessMode: _AccessMode>: Persistence {
storeURL = readOnlyURL
}

deinit {
snapshotIterationPruningTask?.cancel()
}

/// The default URL to use for disk persistences.
static var defaultURL: URL {
// TODO: Make non-throwing: https://github.com/mochidev/CodableDatastore/issues/15
@@ -517,7 +526,7 @@ extension DiskPersistence {
else { throw DiskPersistenceError.cannotWrite }

/// If we are read-write, apply the updated root objects to the snapshot.
try await self.updatingCurrentSnapshot { snapshot in
let (currentSnapshot, persistedIteration) = try await self.updatingCurrentSnapshot { snapshot in
try await snapshot.updatingManifest { manifest, iteration in
iteration.actionName = actionName
iteration.addedDatastoreRoots = addedDatastoreRoots
@@ -529,8 +538,159 @@
root: root.id
)
}
return (snapshot, iteration)
}
}

enforceRetentionPolicy(snapshot: currentSnapshot, fromIteration: persistedIteration)
}
}

// MARK: - Retention Policy

extension DiskPersistence where AccessMode == ReadWrite {
/// The current transaction retention policy for snapshot iterations written to disk.
public var transactionRetentionPolicy: SnapshotRetentionPolicy {
get async {
_transactionRetentionPolicy
}
}

/// Update the transaction retention policy for snapshot iterations written to disk.
///
/// - Parameter policy: The new policy to enforce on write.
///
/// - SeeAlso: ``SnapshotRetentionPolicy``.
public func setTransactionRetentionPolicy(_ policy: SnapshotRetentionPolicy) async {
_transactionRetentionPolicy = policy
for (_, snapshot) in snapshots {
await snapshot.setExtendedIterationCacheEnabled(!_transactionRetentionPolicy.isIndefinite)
}
}
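`SnapshotRetentionPolicy` is defined outside this diff; the only requirements visible here are `.indefinite`, `isIndefinite`, and `shouldIterationBePruned(iteration:distance:)`. One plausible shape, with a hypothetical count-based case added for illustration:

```swift
/// Hypothetical reconstruction; only the members this diff exercises are sketched,
/// and the `transactionCount` case is an illustrative assumption.
public enum SnapshotRetentionPolicy: Sendable {
    /// Keep every snapshot iteration indefinitely.
    case indefinite
    /// Keep roughly the most recent `count` iterations.
    case transactionCount(Int)

    var isIndefinite: Bool {
        if case .indefinite = self { return true }
        return false
    }

    /// `iteration` is unused in this sketch, but a date-based policy would inspect it.
    func shouldIterationBePruned(iteration: SnapshotIteration, distance: Int) -> Bool {
        switch self {
        case .indefinite: false
        case .transactionCount(let count): distance >= count
        }
    }
}
```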

/// Enforce the retention policy on the persistence immediately.
///
/// - Note: Transaction retention policies are enforced after every write transaction, so calling this method directly is often unnecessary. However, it can be useful if the user needs disk space reclaimed immediately.
public func enforceRetentionPolicy() async {
// TODO: Don't create any snapshots if they don't exist yet
let info = try? await self.readingCurrentSnapshot { snapshot in
try await snapshot.readingManifest { manifest, iteration in
(snapshot: snapshot, iteration: iteration)
}
}

if let (snapshot, iteration) = info {
enforceRetentionPolicy(snapshot: snapshot, fromIteration: iteration)
}

await finishTransactionCleanup()
}
}
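Putting the public surface together, a hedged usage sketch (the initializer label and the policy case below are assumptions for illustration, not API confirmed by this diff):

```swift
// Hypothetical usage; adjust the initializer and policy case to the real API.
let storeURL = URL(fileURLWithPath: "/tmp/Example.persistencestore")
let persistence = try DiskPersistence(readWriteURL: storeURL)
await persistence.setTransactionRetentionPolicy(.transactionCount(10))

// ... perform write transactions here ...

// Reclaim disk space now rather than waiting for the next write transaction.
await persistence.enforceRetentionPolicy()
await persistence.finishTransactionCleanup()
```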

extension DiskPersistence {
/// Internal method to enforce the retention policy after a transaction is written.
private func enforceRetentionPolicy(snapshot: Snapshot<ReadWrite>, fromIteration iteration: SnapshotIteration) {
nextSnapshotIterationCandidateToEnforce = (snapshot, iteration)

if let snapshotIterationPruningTask {
/// Update the next snapshot iteration we should be checking, and cancel the existing task so we can move on to checking this iteration.
snapshotIterationPruningTask.cancel()
return
}

/// Update the next snapshot iteration we should be checking, and enqueue a task since we know one isn't currently running.
checkNextSnapshotIterationCandidateForPruning()
}

/// Private method to check the next candidate for pruning.
///
/// First, this method walks down the linked list that defines the iteration chain, from newest to oldest, collecting the iterations that should be pruned. Then, it iterates that list in reverse (from oldest to newest), removing each iteration as it is encountered.
/// - Note: This method should only ever be called when it is known that no `snapshotIterationPruningTask` is ongoing (it is nil), or when one just finishes.
@discardableResult
private func checkNextSnapshotIterationCandidateForPruning() -> Task<Void, Never>? {
let transactionRetentionPolicy = _transactionRetentionPolicy
let iterationCandidate = nextSnapshotIterationCandidateToEnforce

snapshotIterationPruningTask = nil
nextSnapshotIterationCandidateToEnforce = nil

guard let (snapshot, iteration) = iterationCandidate, !transactionRetentionPolicy.isIndefinite
else { return nil }

snapshotIterationPruningTask = Task.detached(priority: .background) {
await snapshot.setExtendedIterationCacheEnabled(true)
do {
var iterations: [SnapshotIteration] = []
var distance = 1
var mainlineSuccessorIteration = iteration
var currentIteration = iteration

/// First, walk the preceding iteration chain to the oldest iteration we can open, collecting the ones that should be pruned.
while let precedingIterationID = currentIteration.precedingIteration, let precedingIteration = try? await snapshot.loadIteration(for: precedingIterationID) {
try Task.checkCancellation()

if !iterations.isEmpty || transactionRetentionPolicy.shouldIterationBePruned(iteration: precedingIteration, distance: distance) {
iterations.append(precedingIteration)
} else {
mainlineSuccessorIteration = precedingIteration
}
currentIteration = precedingIteration

distance += 1
await Task.yield()
}

/// Prune iterations from oldest to newest.
for (index, iteration) in iterations.enumerated().reversed() {
let mainlineSuccessorIteration = index > 0 ? iterations[index-1] : mainlineSuccessorIteration

var iterationsToPrune: [SnapshotIteration] = []
var successorCandidatesToCheck = iteration.successiveIterations
successorCandidatesToCheck.removeAll { $0 == mainlineSuccessorIteration.id }

/// Walk the successor candidates all the way back up so newer iterations are pruned before the ones that reference them. We pull items off from the end, and add new ones to the beginning to make sure they stay in graph order.
while let successorCandidateID = successorCandidatesToCheck.popLast() {
try Task.checkCancellation()
guard let successorIteration = try? await snapshot.loadIteration(for: successorCandidateID)
else { continue }

iterationsToPrune.append(successorIteration)
successorCandidatesToCheck.insert(contentsOf: successorIteration.successiveIterations, at: 0)
await Task.yield()
}

/// First, remove the branch of iterations based on the one we are removing, but representing a history that was previously reverted.
/// Prune the iterations in atomic tasks so they don't get cancelled mid-way, and instead check for cancellation in between iterations.
for iteration in iterationsToPrune.reversed() {
try Task.checkCancellation()
try await Task { try await snapshot.pruneIteration(iteration, mode: .pruneAdded, shouldDelete: true) }.value
await Task.yield()
}

/// Finally, prune the iteration itself.
try Task.checkCancellation()
try await Task { try await snapshot.pruneIteration(iteration, mode: .pruneRemoved, shouldDelete: true) }.value
await Task.yield()
}

try Task.checkCancellation()
try await Task { try await snapshot.pruneIteration(mainlineSuccessorIteration, mode: .pruneRemoved, shouldDelete: false) }.value
await Task.yield()
} catch {
print("Pruning stopped: \(error)")
}

await self.checkNextSnapshotIterationCandidateForPruning()?.value
}

return snapshotIterationPruningTask
}
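Taken together, `enforceRetentionPolicy(snapshot:fromIteration:)` and `checkNextSnapshotIterationCandidateForPruning()` implement a coalescing pattern: keep at most one background task alive, remember only the newest candidate, and let a finishing task pick up whatever arrived while it ran. Reduced to its essentials (all names invented for the example):

```swift
/// Illustrative reduction of the coalescing pattern; not code from this project.
actor LatestOnlyScheduler {
    private var pendingValue: Int?
    private var currentTask: Task<Void, Never>?

    func submit(_ value: Int) {
        pendingValue = value
        if let currentTask {
            /// A run is already in flight: cancel it so it wraps up early and
            /// re-checks `pendingValue`, which now holds the newest submission.
            currentTask.cancel()
            return
        }
        startNextIfNeeded()
    }

    private func startNextIfNeeded() {
        guard let value = pendingValue else {
            currentTask = nil
            return
        }
        pendingValue = nil
        currentTask = Task {
            /// Stand-in for cancellable work; `Task.sleep` throws on cancellation.
            try? await Task.sleep(nanoseconds: 100_000_000)
            print("Processed \(value)")
            /// Loop back for anything submitted while we were running.
            self.startNextIfNeeded()
        }
    }
}
```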

/// Await any cleanup since the last complete write transaction to the persistence.
///
/// - Note: An application is not required to await cleanup, as it will eventually be completed on future runs. It is, however, useful when disk resources must be cleared before progressing to another step.
public func finishTransactionCleanup() async {
await snapshotIterationPruningTask?.value
}
}
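For reference, the iteration graph that the pruning walk traverses is encoded on `SnapshotIteration` itself, which is not part of this diff. Judging purely from the members used here, a simplified sketch might be:

```swift
/// Hypothetical reconstruction; only the members referenced by the pruning logic
/// are sketched, with identifier types simplified to `String` for the example.
struct SnapshotIteration: Codable, Identifiable {
    var id: String
    /// The iteration this one was written on top of; `nil` for the first iteration.
    var precedingIteration: String?
    /// Iterations written directly after this one; more than one entry means the
    /// history branched, for example after a revert.
    var successiveIterations: [String] = []
    /// The user-facing name of the transaction that produced this iteration.
    var actionName: String?
}
```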
