diff --git a/packages/core/src/state-management/repository.ts b/packages/core/src/state-management/repository.ts index d364927025..cb51d83f70 100644 --- a/packages/core/src/state-management/repository.ts +++ b/packages/core/src/state-management/repository.ts @@ -23,7 +23,7 @@ import { ExecutionQueue } from './execution-queue.js' import { RunningState } from './running-state.js' import type { Dispatcher } from '../dispatcher.js' import type { HandlersMap } from '../handlers-map.js' -import { catchError, EMPTY, Observable, Subject, Subscription, takeUntil, concatMap } from 'rxjs' +import { catchError, concatMap, EMPTY, Observable, Subject, Subscription, takeUntil } from 'rxjs' import { StateCache } from './state-cache.js' import { SnapshotState } from './snapshot-state.js' import { IKVStore } from '../store/ikv-store.js' @@ -462,13 +462,17 @@ export class Repository { return state$ // nothing changed } - await this._applyWriteOpts(state$, opts, OperationType.UPDATE) + if (opts.anchor) { + await this.anchor(state$, opts) + } + await this._handlePinOpts(state$, opts as PinningOpts, OperationType.UPDATE) await this._updateStateIfPinned(state$) - state$.next(updatedState) // emit the new state + if (opts.publish) { + this._publishTip(state$) + } this.logger.verbose(`Stream ${state$.id} successfully updated to tip ${state$.tip}`) - return state$ }) } @@ -733,31 +737,6 @@ export class Repository { } } - /** - * Apply options relating to authoring a new commit - * - * @param state$ - Running State - * @param opts - Initialization options (request anchor, publish to pubsub, etc.) - * @param opType - If we load, create or update a stream - * @private - */ - private async _applyWriteOpts( - state$: RunningState, - opts: CreateOpts | UpdateOpts, - opType: OperationType - ) { - const anchor = opts.anchor - const publish = opts.publish - if (anchor) { - await this.anchor(state$, opts) - } - if (publish && opType !== OperationType.LOAD) { - this._publishTip(state$) - } - - await this._handlePinOpts(state$, opts as PinningOpts, opType) - } - private _publishTip(state$: RunningState): void { this.dispatcher.publishTip(state$.id, state$.tip, state$.state.metadata.model) } @@ -807,12 +786,21 @@ export class Repository { * @param opts */ async applyCreateOpts(streamId: StreamID, opts: CreateOpts): Promise { - const state = await this.load(streamId, opts) + const state$ = await this.load(streamId, opts) + // Create operations can actually be load operations when using deterministic streams, so we // ensure that the stream only has a single commit in its log to properly consider it a create. - const opType = state.state.log.length == 1 ? OperationType.CREATE : OperationType.LOAD - await this._applyWriteOpts(state, opts, opType) - return state + const opType = state$.state.log.length == 1 ? OperationType.CREATE : OperationType.LOAD + + if (opts.anchor) { + await this.anchor(state$, opts) + } + if (opts.publish && opType !== OperationType.LOAD) { + this._publishTip(state$) + } + + await this._handlePinOpts(state$, opts as PinningOpts, opType) + return state$ } /**