Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: various resilience and stability improvements #25

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/dist
_unstable_apply.js
_unstable_store.js
_unstable_local.js
path.js
.eslintrc.cjs
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ coverage

# legacy exports
_unstable_apply.js
_unstable_store.js
_unstable_local.js
path.js
tsconfig.tsbuildinfo
132 changes: 15 additions & 117 deletions examples/web/App.tsx
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
import {
createClient,
type MutationEvent as ClientMutationEvent,
type ReconnectEvent,
type SanityClient,
type WelcomeEvent,
} from '@sanity/client'
import {createClient} from '@sanity/client'
import {CollapseIcon, ExpandIcon} from '@sanity/icons'
import {
createIfNotExists,
del,
type Mutation,
type Path,
type SanityDocumentBase,
SanityEncoder,
} from '@sanity/mutate'
import {
createContentLakeStore,
type ListenerSyncEvent,
createLocalDataset,
type MutationGroup,
type RemoteDocumentEvent,
type SanityMutation,
} from '@sanity/mutate/_unstable_store'
} from '@sanity/mutate/_unstable_local'
import {
draft,
type Infer,
Expand Down Expand Up @@ -53,23 +44,12 @@ import {
TabPanel,
Text,
} from '@sanity/ui'
import {type RawPatch} from 'mendoza'
import {Fragment, type ReactNode, useCallback, useEffect, useState} from 'react'
import {
concatMap,
defer,
filter,
from,
map,
merge,
of,
share,
shareReplay,
tap,
timer,
} from 'rxjs'
import {concatMap, filter, from, merge, tap} from 'rxjs'
import styled from 'styled-components'

import {createDocumentObserver} from '../../src/local/listener/createDocumentObserver'
import {createGlobalMutationEventsListener} from '../../src/local/listener/createGlobalMutationEventsListener'
import {DocumentView} from './DocumentView'
import {personForm} from './forms/person'
import {
Expand Down Expand Up @@ -176,60 +156,6 @@ interface SharedListenerOptions {
includeMutations?: boolean
}

function createSharedListener(
client: SanityClient,
options: SharedListenerOptions = {},
) {
const {shutdownDelay, includeMutations} = options
const allEvents$ = client
.listen(
'*[!(_id in path("_.**"))]',
{},
{
events: ['welcome', 'mutation', 'reconnect'],
includeResult: false,
includePreviousRevision: false,
visibility: 'transaction',
effectFormat: 'mendoza',
...(includeMutations ? {} : {includeMutations: false}),
},
)
.pipe(
share({
resetOnRefCountZero: shutdownDelay ? () => timer(shutdownDelay) : true,
}),
)

// Reconnect events emitted in case the connection is lost
const reconnect = allEvents$.pipe(
filter((event): event is ReconnectEvent => event.type === 'reconnect'),
)

// Welcome events are emitted when the listener is (re)connected
const welcome = allEvents$.pipe(
filter((event): event is WelcomeEvent => event.type === 'welcome'),
)

// Mutation events coming from the listener
const mutations = allEvents$.pipe(
filter((event): event is ClientMutationEvent => event.type === 'mutation'),
)

// Replay the latest connection event that was emitted either when the connection was disconnected ('reconnect'), established or re-established ('welcome')
const connectionEvent = merge(welcome, reconnect).pipe(
shareReplay({bufferSize: 1, refCount: true}),
)

// Emit the welcome event if the latest connection event was the 'welcome' event.
// Downstream subscribers will typically map the welcome event to an initial fetch
const replayWelcome = connectionEvent.pipe(
filter(latestConnectionEvent => latestConnectionEvent.type === 'welcome'),
)

// Combine into a single stream
return merge(replayWelcome, mutations, reconnect)
}

const sanityClient = createClient({
projectId: import.meta.env.VITE_SANITY_API_PROJECT_ID,
dataset: import.meta.env.VITE_SANITY_API_DATASET,
Expand All @@ -238,45 +164,17 @@ const sanityClient = createClient({
token: import.meta.env.VITE_SANITY_API_TOKEN,
})

const listener = createSharedListener(sanityClient)

const RECONNECT_EVENT: ReconnectEvent = {type: 'reconnect'}
const globalMutationEventsListener = createGlobalMutationEventsListener({
client: sanityClient,
})

function observe(documentId: string) {
return defer(() => listener).pipe(
filter(
(event): event is WelcomeEvent | ClientMutationEvent | ReconnectEvent =>
event.type === 'welcome' ||
event.type === 'reconnect' ||
(event.type === 'mutation' && event.documentId === documentId),
),
concatMap(event =>
event.type === 'reconnect'
? of(RECONNECT_EVENT)
: event.type === 'welcome'
? sanityClient.observable.getDocument(documentId).pipe(
map(
(doc: undefined | SanityDocumentBase): ListenerSyncEvent => ({
type: 'sync',
transactionId: doc?._id,
document: doc,
}),
),
)
: of({
type: 'mutation' as const,
transactionId: event.transactionId,
effects: event.effects as {apply: RawPatch},
previousRev: event.previousRev!,
resultRev: event.resultRev!,
mutations: event.mutations as SanityMutation[],
}),
),
)
}
const documentObserver = createDocumentObserver({
client: sanityClient,
globalEvents: globalMutationEventsListener,
})

const datastore = createContentLakeStore({
observe,
const datastore = createLocalDataset({
observe: documentObserver,
submit: transactions => {
return from(transactions).pipe(
concatMap(transaction =>
Expand Down
2 changes: 1 addition & 1 deletion examples/web/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"incremental": true,
"paths": {
"@sanity/mutate": ["../../src"],
"@sanity/mutate/_unstable_store": ["../../src/_unstable_store"],
"@sanity/mutate/_unstable_local": ["../../src/_unstable_local"],
"@sanity/mutate/path": ["../../src/_path"]
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/web/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export default defineConfig({
resolve: {
alias: {
'@sanity/mutate': path.resolve(__dirname, '../../src'),
'@sanity/mutate/_unstable_store': path.resolve(__dirname, '../../src'),
'@sanity/mutate/_unstable_local': path.resolve(__dirname, '../../src'),
},
},
})
14 changes: 7 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
"require": "./dist/_path.cjs",
"default": "./dist/_path.js"
},
"./_unstable_store": {
"source": "./src/_unstable_store.ts",
"import": "./dist/_unstable_store.js",
"require": "./dist/_unstable_store.cjs",
"default": "./dist/_unstable_store.js"
"./_unstable_local": {
"source": "./src/_unstable_local.ts",
"import": "./dist/_unstable_local.js",
"require": "./dist/_unstable_local.cjs",
"default": "./dist/_unstable_local.js"
},
"./_unstable_apply": {
"source": "./src/_unstable_apply.ts",
Expand All @@ -57,8 +57,8 @@
"_unstable_apply": [
"./dist/_unstable_apply.d.ts"
],
"_unstable_store": [
"./dist/_unstable_store.d.ts"
"_unstable_local": [
"./dist/_unstable_local.d.ts"
]
}
},
Expand Down
4 changes: 2 additions & 2 deletions src/_unstable_store.ts → src/_unstable_local.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export {type Insert} from './encoders/sanity'
export * from './store'
export * from './local'
export {
type SanityCreateIfNotExistsMutation,
type SanityCreateMutation,
Expand All @@ -13,4 +13,4 @@ export {
type SanitySetIfMissingPatch,
type SanitySetPatch,
type SanityUnsetPatch,
} from './store/sanityApiTypes'
} from './local/sanityApiTypes'
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import {concat, delay, NEVER, of, take} from 'rxjs'
import {describe, expect, test} from 'vitest'

import {createContentLakeStore} from '../contentLakeStore'
import {createLocalDataset} from '../createLocalDataset'
import {allValuesFrom, collectNotifications, sleep} from './helpers'

describe('observing documents', () => {
test('observing a document that does not exist on the backend', async () => {
const store = createContentLakeStore({
const store = createLocalDataset({
observe: id => of({type: 'sync', id, document: undefined}),
submit: () => NEVER,
})
Expand All @@ -24,7 +24,7 @@ describe('observing documents', () => {
})
test('observing a document that exist on the backend', async () => {
const doc = {_id: 'foo', _type: 'foo'}
const store = createContentLakeStore({
const store = createLocalDataset({
observe: id =>
of({type: 'sync', id, document: doc} as const).pipe(delay(10)),
submit: () => NEVER,
Expand All @@ -47,7 +47,7 @@ describe('observing documents', () => {

test("observing a document that doesn't exist initially, but later is created", async () => {
const doc = {_id: 'foo', _type: 'foo'}
const store = createContentLakeStore({
const store = createLocalDataset({
observe: id =>
concat(
of({type: 'sync', id, document: undefined} as const),
Expand Down Expand Up @@ -80,7 +80,7 @@ describe('observing documents', () => {
})
describe('local mutations', () => {
test('mutating a document that does not exist on the backend', () => {
const store = createContentLakeStore({
const store = createLocalDataset({
observe: id => of({type: 'sync', id, document: undefined}),
submit: () => NEVER,
})
Expand Down Expand Up @@ -137,7 +137,7 @@ describe('local mutations', () => {

test("observing a document that doesn't exist initially, but later is created locally", async () => {
const doc = {_id: 'foo', _type: 'foo'}
const store = createContentLakeStore({
const store = createLocalDataset({
observe: id =>
concat(
of({type: 'sync', id, document: undefined} as const).pipe(delay(10)),
Expand Down Expand Up @@ -248,7 +248,7 @@ describe('local mutations', () => {

test("error when creating a document locally using 'create', when it turns out later that it exists on the server ", async () => {
const doc = {_id: 'foo', _type: 'foo'}
const store = createContentLakeStore({
const store = createLocalDataset({
observe: id =>
concat(of({type: 'sync', id, document: doc} as const).pipe(delay(10))),
submit: () => NEVER,
Expand Down
File renamed without changes.
32 changes: 17 additions & 15 deletions src/store/contentLakeStore.ts → src/local/createLocalDataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,34 @@ import {

import {decodeAll, type SanityMutation} from '../encoders/sanity'
import {type Transaction} from '../mutations/types'
import {applyMutationEventEffects} from './datasets/applyMendoza'
import {applyMutations} from './datasets/applyMutations'
import {commit} from './datasets/commit'
import {createDataset} from './datasets/createDataset'
import {applyMutationEventEffects} from './documentMap/applyMendoza'
import {applyMutations} from './documentMap/applyMutations'
import {commit} from './documentMap/commit'
import {createDocumentMap} from './documentMap/createDocumentMap'
import {squashDMPStrings} from './optimizations/squashDMPStrings'
import {squashMutationGroups} from './optimizations/squashMutations'
import {rebase} from './rebase'
import {
type ContentLakeStore,
type ListenerEvent,
type LocalDataset,
type MutationGroup,
type OptimisticDocumentEvent,
type RemoteDocumentEvent,
type RemoteListenerEvent,
type RemoteMutationEvent,
type SubmitResult,
type TransactionalMutationGroup,
} from './types'
import {createReplayMemoizer} from './utils/createReplayMemoizer'
import {filterMutationGroupsById} from './utils/filterMutationGroups'

export interface StoreBackend {
export interface LocalDatasetBackend {
/**
* Sets up a subscription to a document
* The first event should either be a sync event or an error event.
* After that, it should emit mutation events, error events or sync events
* @param id
*/
observe: (id: string) => Observable<RemoteListenerEvent>
observe: (id: string) => Observable<ListenerEvent>
submit: (mutationGroups: Transaction[]) => Observable<SubmitResult>
}

Expand All @@ -57,7 +57,7 @@ function warnNoMutationsReceived() {
// eslint-disable-next-line no-console
console.warn(
new Error(
'No mutation received from backend. The listener is likely set up with `excludeMutations: true`. If your app need to now about mutations, make sure the listener is set up to include mutations',
'No mutation received from backend. The listener is likely set up with `excludeMutations: true`. If your app need to know about mutations, make sure the listener is set up to include mutations',
),
)
didEmitMutationsAccessWarning = true
Expand All @@ -66,11 +66,13 @@ function warnNoMutationsReceived() {

const EMPTY_ARRAY: any[] = []

export function createContentLakeStore(
backend: StoreBackend,
): ContentLakeStore {
const local = createDataset()
const remote = createDataset()
/**
* Creates a local dataset that allows subscribing to documents by id and submitting mutations to be optimistically applied
* @param backend
*/
export function createLocalDataset(backend: LocalDatasetBackend): LocalDataset {
const local = createDocumentMap()
const remote = createDocumentMap()
const memoize = createReplayMemoizer(1000)
let stagedChanges: MutationGroup[] = []

Expand All @@ -91,7 +93,7 @@ export function createContentLakeStore(
function getRemoteEvents(id: string) {
return backend.observe(id).pipe(
filter(
(event): event is Exclude<RemoteListenerEvent, ReconnectEvent> =>
(event): event is Exclude<ListenerEvent, ReconnectEvent> =>
event.type !== 'reconnect',
),
mergeMap((event): Observable<RemoteDocumentEvent> => {
Expand Down
Loading
Loading