Skip to content

Commit

Permalink
Refactor structure of presence updates
Browse files Browse the repository at this point in the history
This commit changes how we handle updates for presence spaces. An presence update becomes a PresenceMember:

export type PresenceMember = {
  data: {
    profileUpdate: {
      id: string | null;
      current: ProfileData;
    };
    locationUpdate: {
      id: string | null;
      previous: unknown;
      current: unknown;
    };
  };
} & Omit<Types.PresenceMessage, 'data'>;

Which then gets translated for the developer to a SpaceMember:

export type SpaceMember = {
  clientId: string;
  connectionId: string;
  isConnected: boolean;
  profileData: ProfileData;
  location: unknown;
  lastEvent: {
    name: Types.PresenceAction;
    timestamp: number;
  };
};

data on PresenceMember contains the last update for profileData an location. The current key is the value of these properties on SpaceMember.

profileUpdate and locationUpdate contain an id. This id is set on publish, but only when we are providing new data, not copying already set data.
The handlers check the id to decide if an update should be emitted (it will still be applied, and it should be the same).
  • Loading branch information
Dominik Piatek committed Aug 3, 2023
1 parent c296711 commit d501224
Show file tree
Hide file tree
Showing 21 changed files with 895 additions and 861 deletions.
10 changes: 8 additions & 2 deletions __mocks__/ably/promises/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ const mockPresence = {
enter: methodReturningVoidPromise,
leave: methodReturningVoidPromise,
subscriptions: {
once: async (_, fn) => {
return await fn();
once: async (_: unknown, fn: Function) => {
fn();
},
},
subscribe: () => {},
Expand Down Expand Up @@ -51,8 +51,11 @@ class MockRealtime {
};
public connection: {
id?: string;
state: string;
};

public time() {}

constructor() {
this.channels = {
get: () => mockChannel,
Expand All @@ -62,7 +65,10 @@ class MockRealtime {
};
this.connection = {
id: '1',
state: 'connected',
};

this['options'] = {};
}
}

Expand Down
7 changes: 7 additions & 0 deletions __mocks__/nanoid/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const nanoidId = 'NanoidID';

function nanoid(): string {
return nanoidId;
}

export { nanoid, nanoidId };
8 changes: 4 additions & 4 deletions src/CursorBatching.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Types } from 'ably';

import { CursorUpdate } from './Cursors.js';
import { CURSOR_UPDATE } from './utilities/Constants.js';
import type { StrictCursorsOptions } from './options/CursorsOptions.js';
import { CURSOR_UPDATE } from './Cursors.js';
import type { CursorUpdate } from './types.js';
import type { CursorsOptions } from './types.js';

type OutgoingBuffer = Pick<CursorUpdate, 'position' | 'data'>[];

Expand All @@ -20,7 +20,7 @@ export default class CursorBatching {
// Set to `true` if there is more than one user listening to cursors
shouldSend: boolean = false;

constructor(readonly outboundBatchInterval: StrictCursorsOptions['outboundBatchInterval']) {
constructor(readonly outboundBatchInterval: CursorsOptions['outboundBatchInterval']) {
this.batchTime = outboundBatchInterval;
}

Expand Down
6 changes: 3 additions & 3 deletions src/CursorHistory.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Types } from 'ably';

import type { CursorUpdate } from './Cursors.js';
import type { StrictCursorsOptions } from './options/CursorsOptions.js';
import type { CursorUpdate } from './types.js';
import type { CursorsOptions } from './types.js';

type ConnectionId = string;
type ConnectionsLastPosition = Record<ConnectionId, null | CursorUpdate>;
Expand Down Expand Up @@ -47,7 +47,7 @@ export default class CursorHistory {

async getLastCursorUpdate(
channel: Types.RealtimeChannelPromise,
paginationLimit: StrictCursorsOptions['paginationLimit'],
paginationLimit: CursorsOptions['paginationLimit'],
): Promise<ConnectionsLastPosition> {
const members = await channel.presence.get();

Expand Down
21 changes: 11 additions & 10 deletions src/Cursors.mockClient.test.ts → src/Cursors.test.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { it, describe, expect, vi, beforeEach, vitest, afterEach } from 'vitest';
import { Realtime, Types } from 'ably/promises';

import Space, { SpaceMember } from './Space.js';
import Cursors from './Cursors.js';
import Space from './Space.js';
import Cursors, { CURSOR_UPDATE } from './Cursors.js';
import { createPresenceMessage } from './utilities/test/fakes.js';
import CursorBatching from './CursorBatching.js';
import { CURSOR_UPDATE } from './utilities/Constants.js';
import CursorDispensing from './CursorDispensing.js';
import CursorHistory from './CursorHistory.js';
import type { CursorUpdate } from './Cursors.js';
import type { CursorUpdate, SpaceMember } from './types.js';

import type { RealtimeMessage } from './utilities/types.js';

interface CursorsTestContext {
client: Types.RealtimePromise;
Expand All @@ -18,9 +19,9 @@ interface CursorsTestContext {
batching: CursorBatching;
dispensing: CursorDispensing;
history: CursorHistory;
fakeMessageStub: Types.Message;
selfStub: SpaceMember;
lastCursorPositionsStub: Record<string, CursorUpdate>;
fakeMessageStub: RealtimeMessage;
}

vi.mock('ably/promises');
Expand All @@ -29,7 +30,7 @@ function createPresenceCount(length: number) {
return async () => Array.from({ length }, (_, i) => createPresenceMessage('enter', { clientId: '' + i }));
}

describe('Cursors (mockClient)', () => {
describe('Cursors', () => {
beforeEach<CursorsTestContext>((context) => {
const client = new Realtime({});
context.client = client;
Expand Down Expand Up @@ -470,15 +471,15 @@ describe('Cursors (mockClient)', () => {
selfStub,
}) => {
vi.spyOn(space.cursors, 'getAll').mockImplementation(async () => lastCursorPositionsStub);
vi.spyOn(space, 'getSelf').mockReturnValue(selfStub);
vi.spyOn(space.members, 'getSelf').mockReturnValue(selfStub);

const selfCursor = await space.cursors.getSelf();
expect(selfCursor).toEqual(lastCursorPositionsStub['connectionId1']);
});

it<CursorsTestContext>('returns an empty object if self is not present in cursors', async ({ space }) => {
vi.spyOn(space.cursors, 'getAll').mockResolvedValue({});
vi.spyOn(space, 'getSelf').mockReturnValue(undefined);
vi.spyOn(space.members, 'getSelf').mockReturnValue(undefined);

const others = await space.cursors.getOthers();
expect(others).toEqual({});
Expand All @@ -500,7 +501,7 @@ describe('Cursors (mockClient)', () => {
};

vi.spyOn(space.cursors, 'getAll').mockResolvedValue(onlyMyCursor);
vi.spyOn(space, 'getSelf').mockReturnValue(selfStub);
vi.spyOn(space.members, 'getSelf').mockReturnValue(selfStub);

const others = await space.cursors.getOthers();
expect(others).toEqual({});
Expand All @@ -512,7 +513,7 @@ describe('Cursors (mockClient)', () => {
lastCursorPositionsStub,
}) => {
vi.spyOn(space.cursors, 'getAll').mockResolvedValue(lastCursorPositionsStub);
vi.spyOn(space, 'getSelf').mockReturnValue(selfStub);
vi.spyOn(space.members, 'getSelf').mockReturnValue(selfStub);

const others = await space.cursors.getOthers();
expect(others).toEqual({
Expand Down
35 changes: 16 additions & 19 deletions src/Cursors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ import { Types } from 'ably';
import Space from './Space.js';
import CursorBatching from './CursorBatching.js';
import CursorDispensing from './CursorDispensing.js';
import { OUTGOING_BATCH_TIME_DEFAULT, PAGINATION_LIMIT_DEFAULT } from './utilities/Constants.js';
import EventEmitter, {
InvalidArgumentError,
inspect,
type EventKey,
type EventListener,
} from './utilities/EventEmitter.js';
import CursorHistory from './CursorHistory.js';
import { CURSOR_UPDATE } from './utilities/Constants.js';

import type { CursorUpdate } from './types.js';
import type { CursorsOptions, CursorUpdate } from './types.js';
import type { RealtimeMessage } from './utilities/types.js';

type CursorsEventMap = {
cursorsUpdate: Record<string, CursorUpdate>;
cursorsUpdate: CursorUpdate;
};

export const CURSOR_UPDATE = 'cursorUpdate';

const emitterHasListeners = (emitter) => {
const flattenEvents = (obj) =>
Object.entries(obj)
Expand All @@ -39,15 +39,12 @@ export default class Cursors extends EventEmitter<CursorsEventMap> {
private readonly cursorDispensing: CursorDispensing;
private readonly cursorHistory: CursorHistory;
private channel?: Types.RealtimeChannelPromise;
readonly options: StrictCursorsOptions;
readonly options: CursorsOptions;

constructor(private space: Space, options: CursorsOptions = {}) {
constructor(private space: Space) {
super();

this.options = {
outboundBatchInterval: options['outboundBatchInterval'] ?? OUTGOING_BATCH_TIME_DEFAULT,
paginationLimit: options['paginationLimit'] ?? PAGINATION_LIMIT_DEFAULT,
};
this.options = this.space.options.cursors;

this.cursorHistory = new CursorHistory();
this.cursorBatching = new CursorBatching(this.options.outboundBatchInterval);
Expand All @@ -64,7 +61,7 @@ export default class Cursors extends EventEmitter<CursorsEventMap> {
* @return {void}
*/
set(cursor: Pick<CursorUpdate, 'position' | 'data'>): void {
const self = this.space.getSelf();
const self = this.space.members.getSelf();

if (!self) {
throw new Error('Must enter a space before setting a cursor update');
Expand All @@ -79,7 +76,7 @@ export default class Cursors extends EventEmitter<CursorsEventMap> {
}

private initializeCursorsChannel(): Types.RealtimeChannelPromise {
const spaceChannelName = this.space.getChannelName();
const spaceChannelName = this.space.channelName;
const channel = this.space.client.channels.get(`${spaceChannelName}_cursors`);
channel.presence.subscribe(this.onPresenceUpdate.bind(this));
channel.presence.enter();
Expand Down Expand Up @@ -147,26 +144,26 @@ export default class Cursors extends EventEmitter<CursorsEventMap> {
}
}

async getAll() {
const channel = this.getChannel();
return await this.cursorHistory.getLastCursorUpdate(channel, this.options.paginationLimit);
}

async getSelf(): Promise<CursorUpdate | undefined> {
const self = this.space.getSelf();
const self = this.space.members.getSelf();
if (!self) return;

const allCursors = await this.getAll();
return allCursors[self.connectionId] as CursorUpdate;
}

async getOthers(): Promise<Record<string, null | CursorUpdate>> {
const self = this.space.getSelf();
const self = this.space.members.getSelf();
if (!self) return {};

const allCursors = await this.getAll();
const allCursorsFiltered = allCursors;
delete allCursorsFiltered[self.connectionId];
return allCursorsFiltered;
}

async getAll() {
const channel = this.getChannel();
return await this.cursorHistory.getLastCursorUpdate(channel, this.options.paginationLimit);
}
}
47 changes: 47 additions & 0 deletions src/Leavers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type Space from './Space.js';
import type { SpaceMember } from './types.js';

type SpaceLeaver = SpaceMember & {
timeoutId: ReturnType<typeof setTimeout>;
};

class Leavers {
private leavers: SpaceLeaver[] = [];

constructor(private space: Space) {}

getByConnectionId(connectionId: string): SpaceLeaver | undefined {
return this.leavers.find((leaver) => leaver.connectionId === connectionId);
}

addLeaver(connectionId: string) {
const timeoutCallback = () => {
this.space.members.removeMember(connectionId);
};

const member = this.space.members.getByConnectionId(connectionId);

if (member) {
this.leavers.push({
...member,
timeoutId: setTimeout(timeoutCallback, this.space.options.offlineTimeout),
});
}
}

removeLeaver(connectionId: string) {
const leaverIndex = this.leavers.findIndex((leaver) => leaver.connectionId === connectionId);

if (leaverIndex >= 0) {
clearTimeout(this.leavers[leaverIndex].timeoutId);
this.leavers.splice(leaverIndex, 1);
}
}

refreshTimeout(connectionId: string) {
this.removeLeaver(connectionId);
this.addLeaver(connectionId);
}
}

export default Leavers;
Loading

0 comments on commit d501224

Please sign in to comment.