Skip to content

Commit

Permalink
fully break down _applyWriteOpts
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody committed Oct 17, 2023
1 parent 0844d26 commit 126b512
Showing 1 changed file with 21 additions and 33 deletions.
54 changes: 21 additions & 33 deletions packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { ExecutionQueue } from './execution-queue.js'
import { RunningState } from './running-state.js'
import type { Dispatcher } from '../dispatcher.js'
import type { HandlersMap } from '../handlers-map.js'
import { catchError, EMPTY, Observable, Subject, Subscription, takeUntil, concatMap } from 'rxjs'
import { catchError, concatMap, EMPTY, Observable, Subject, Subscription, takeUntil } from 'rxjs'
import { StateCache } from './state-cache.js'
import { SnapshotState } from './snapshot-state.js'
import { IKVStore } from '../store/ikv-store.js'
Expand Down Expand Up @@ -462,13 +462,17 @@ export class Repository {
return state$ // nothing changed
}

await this._applyWriteOpts(state$, opts, OperationType.UPDATE)
if (opts.anchor) {
await this.anchor(state$, opts)
}
await this._handlePinOpts(state$, opts as PinningOpts, OperationType.UPDATE)
await this._updateStateIfPinned(state$)

state$.next(updatedState) // emit the new state
if (opts.publish) {
this._publishTip(state$)
}

this.logger.verbose(`Stream ${state$.id} successfully updated to tip ${state$.tip}`)

return state$
})
}
Expand Down Expand Up @@ -733,31 +737,6 @@ export class Repository {
}
}

/**
* Apply options relating to authoring a new commit
*
* @param state$ - Running State
* @param opts - Initialization options (request anchor, publish to pubsub, etc.)
* @param opType - If we load, create or update a stream
* @private
*/
private async _applyWriteOpts(
state$: RunningState,
opts: CreateOpts | UpdateOpts,
opType: OperationType
) {
const anchor = opts.anchor
const publish = opts.publish
if (anchor) {
await this.anchor(state$, opts)
}
if (publish && opType !== OperationType.LOAD) {
this._publishTip(state$)
}

await this._handlePinOpts(state$, opts as PinningOpts, opType)
}

private _publishTip(state$: RunningState): void {
this.dispatcher.publishTip(state$.id, state$.tip, state$.state.metadata.model)
}
Expand Down Expand Up @@ -807,12 +786,21 @@ export class Repository {
* @param opts
*/
async applyCreateOpts(streamId: StreamID, opts: CreateOpts): Promise<RunningState> {
const state = await this.load(streamId, opts)
const state$ = await this.load(streamId, opts)

// Create operations can actually be load operations when using deterministic streams, so we
// ensure that the stream only has a single commit in its log to properly consider it a create.
const opType = state.state.log.length == 1 ? OperationType.CREATE : OperationType.LOAD
await this._applyWriteOpts(state, opts, opType)
return state
const opType = state$.state.log.length == 1 ? OperationType.CREATE : OperationType.LOAD

if (opts.anchor) {
await this.anchor(state$, opts)
}
if (opts.publish && opType !== OperationType.LOAD) {
this._publishTip(state$)
}

await this._handlePinOpts(state$, opts as PinningOpts, opType)
return state$
}

/**
Expand Down

0 comments on commit 126b512

Please sign in to comment.