Skip to content

Commit

Permalink
add api/pipelinesource and setup basic listen test
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-hui committed Nov 1, 2024
1 parent 422723a commit ee64690
Show file tree
Hide file tree
Showing 13 changed files with 458 additions and 60 deletions.

This file was deleted.

2 changes: 1 addition & 1 deletion packages/firestore/src/api/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import {
connectFirestoreEmulator,
Firestore as LiteFirestore
} from '../lite-api/database';
import { PipelineSource } from '../lite-api/pipeline-source';
import { PipelineSource } from './pipeline_source';
import { DocumentReference, Query } from '../lite-api/reference';
import { newUserDataReader } from '../lite-api/user_data_reader';
import {
Expand Down
49 changes: 36 additions & 13 deletions packages/firestore/src/api/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
import { PipelineResult } from '../lite-api/pipeline-result';
import { DocumentData, DocumentReference } from '../lite-api/reference';
import { AddFields, Stage } from '../lite-api/stage';
import {AddFields, Sort, Stage, Where} from '../lite-api/stage';
import { UserDataReader } from '../lite-api/user_data_reader';
import { AbstractUserDataWriter } from '../lite-api/user_data_writer';
import { DocumentKey } from '../model/document_key';
Expand All @@ -15,6 +15,8 @@ import { DocumentSnapshot, PipelineSnapshot } from './snapshot';
import { FirestoreError } from '../util/error';
import { Unsubscribe } from './reference_impl';
import { cast } from '../util/input_validation';
import {Field, FilterCondition} from '../api';
import {Expr} from '../lite-api/expressions';

export class Pipeline<
AppModelType = DocumentData
Expand Down Expand Up @@ -49,6 +51,20 @@ export class Pipeline<
);
}

where(condition: FilterCondition & Expr): Pipeline<AppModelType> {
const copy = this.stages.map(s => s);
super.readUserData('where', condition);
copy.push(new Where(condition));
return new Pipeline(
this.db,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
copy,
this.converter
);
}

/**
* Executes this pipeline and returns a Promise to represent the asynchronous operation.
*
Expand Down Expand Up @@ -106,23 +122,30 @@ export class Pipeline<
* @internal
* @private
*/
_onSnapshot(observer: {
next?: (snapshot: PipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}): Unsubscribe {
_onSnapshot(
next: (snapshot: PipelineSnapshot) => void,
error?: (error: FirestoreError) => void,
complete?: () => void
): Unsubscribe {
// this.stages.push(
// new AddFields(
// this.selectablesToMap([
// '__name__',
// '__create_time__',
// '__update_time__'
// ])
// )
// );

this.stages.push(
new AddFields(
this.selectablesToMap([
'__name__',
'__create_time__',
'__update_time__'
])
new Sort([
Field.of('__name__').ascending()
]
)
);

const client = ensureFirestoreConfigured(this.db);
firestoreClientListenPipeline(client, this, observer);
firestoreClientListenPipeline(client, this, {next, error, complete});

return () => {};
}
Expand Down
91 changes: 91 additions & 0 deletions packages/firestore/src/api/pipeline_source.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import { DocumentKey } from '../model/document_key';

import { Firestore } from './database';
import { Pipeline } from './pipeline';
import { DocumentReference } from './reference';
import {
CollectionGroupSource,
CollectionSource,
DatabaseSource,
DocumentsSource
} from '../lite-api/stage';
import {PipelineSource as LitePipelineSource} from '../lite-api/pipeline-source';
import { UserDataReader } from '../lite-api/user_data_reader';
import { AbstractUserDataWriter } from '../lite-api/user_data_writer';

/**
* Represents the source of a Firestore {@link Pipeline}.
* @beta
*/
export class PipelineSource extends LitePipelineSource{
/**
* @internal
* @private
* @param db
* @param userDataReader
* @param userDataWriter
* @param documentReferenceFactory
*/
constructor(
db: Firestore,
userDataReader: UserDataReader,
userDataWriter: AbstractUserDataWriter,
documentReferenceFactory: (id: DocumentKey) => DocumentReference
) {
super(db, userDataReader, userDataWriter, documentReferenceFactory);
}

collection(collectionPath: string): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[new CollectionSource(collectionPath)]
);
}

collectionGroup(collectionId: string): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[new CollectionGroupSource(collectionId)]
);
}

database(): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[new DatabaseSource()]
);
}

documents(docs: DocumentReference[]): Pipeline {
return new Pipeline(
this.db as Firestore,
this.userDataReader,
this.userDataWriter,
this.documentReferenceFactory,
[DocumentsSource.of(docs)]
);
}
}
5 changes: 5 additions & 0 deletions packages/firestore/src/core/sync_engine_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,11 @@ function removeAndCleanupTarget(
): void {
syncEngineImpl.sharedClientState.removeLocalQueryTarget(targetId);

// TODO(pipeline): REMOVE this hack.
if(!syncEngineImpl.queriesByTarget.has(targetId)||syncEngineImpl.queriesByTarget.get(targetId)!.length !== 0){
return;
}

debugAssert(
syncEngineImpl.queriesByTarget.has(targetId) &&
syncEngineImpl.queriesByTarget.get(targetId)!.length !== 0,
Expand Down
8 changes: 4 additions & 4 deletions packages/firestore/src/lite-api/pipeline-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ export class PipelineSource {
* @param documentReferenceFactory
*/
constructor(
private db: Firestore,
private userDataReader: UserDataReader,
private userDataWriter: AbstractUserDataWriter,
private documentReferenceFactory: (id: DocumentKey) => DocumentReference
protected db: Firestore,
protected userDataReader: UserDataReader,
protected userDataWriter: AbstractUserDataWriter,
protected documentReferenceFactory: (id: DocumentKey) => DocumentReference
) {}

collection(collectionPath: string): Pipeline {
Expand Down
6 changes: 3 additions & 3 deletions packages/firestore/src/lite-api/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export class Pipeline<AppModelType = DocumentData> implements ProtoSerializable<
*/
constructor(
private liteDb: Firestore,
private userDataReader: UserDataReader,
protected userDataReader: UserDataReader,
/**
* @internal
* @private
Expand All @@ -144,7 +144,7 @@ export class Pipeline<AppModelType = DocumentData> implements ProtoSerializable<
protected stages: Stage[],
// TODO(pipeline) support converter
//private converter: FirestorePipelineConverter<AppModelType> = defaultPipelineConverter()
private converter: unknown = {}
protected converter: unknown = {}
) {}

/**
Expand Down Expand Up @@ -265,7 +265,7 @@ export class Pipeline<AppModelType = DocumentData> implements ProtoSerializable<
* @return the expressionMap argument.
* @private
*/
private readUserData<
protected readUserData<
T extends
| Map<string, ReadableUserData>
| ReadableUserData[]
Expand Down
7 changes: 7 additions & 0 deletions packages/firestore/src/local/local_store_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,13 @@ export async function localStoreReleaseTarget(
): Promise<void> {
const localStoreImpl = debugCast(localStore, LocalStoreImpl);
const targetData = localStoreImpl.targetDataByTarget.get(targetId);

// TODO(pipeline): this is a hack that only works because pipelines are the only ones returning nulls here.
// REMOVE ASAP.
if(targetData === null) {
return;
}

debugAssert(
targetData !== null,
`Tried to release nonexistent target: ${targetId}`
Expand Down
Loading

0 comments on commit ee64690

Please sign in to comment.