Skip to content

Commit

Permalink
Merge pull request #1 from my-folder-online/feat/peerjs
Browse files Browse the repository at this point in the history
feat: peerjs
  • Loading branch information
ido-pluto authored Nov 4, 2023
2 parents dfcc01c + 799127f commit f5936e2
Show file tree
Hide file tree
Showing 22 changed files with 382 additions and 640 deletions.
358 changes: 232 additions & 126 deletions package-lock.json

Large diffs are not rendered by default.

9 changes: 2 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
"@emotion/react": "^11.11.1",
"@emotion/styled": "^11.11.0",
"browser-fs-access": "^0.35.0",
"bson": "^6.2.0",
"color-scheme-detector": "^1.0.1",
"emittery": "^1.0.1",
"framer-motion": "^10.16.4",
"ipull": "^1.1.0",
"local-storage-proxy": "^4.0.4",
"lodash": "^4.17.21",
"native-file-system-adapter": "^3.0.0",
"peerjs": "^1.5.1",
"pretty-bytes": "^6.1.1",
"react": "^18.2.0",
"react-dom": "^18.2.0",
Expand All @@ -36,20 +35,16 @@
"react-router-dom": "^6.17.0",
"react-text-gradients": "^1.0.2",
"react-type-animation": "^3.2.0",
"simple-peer": "^9.11.1",
"sleep-promise": "^9.1.0",
"tsparticles-slim": "^2.12.0",
"use-array-state": "^1.0.2",
"use-async-effect": "^2.2.7",
"usehooks-ts": "^2.9.1",
"uuid": "^9.0.1",
"vite-plugin-top-level-await": "^1.3.1"
"uuid": "^9.0.1"
},
"devDependencies": {
"@types/react": "^18.2.15",
"@types/react-dom": "^18.2.7",
"@types/rollup-plugin-node-builtins": "^2.1.4",
"@types/simple-peer": "^9.11.7",
"@types/uuid": "^9.0.6",
"@types/wicg-file-system-access": "^2023.10.2",
"@typescript-eslint/eslint-plugin": "^6.0.0",
Expand Down
14 changes: 2 additions & 12 deletions src/config/const.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,4 @@
export const WEB_SERVER = import.meta.env.VITE_WEB_SERVER || 'localhost:8080';
export const WEB_SERVER = import.meta.env.VITE_WEB_SERVER || '';
export const SERVER_SECURE = import.meta.env.VITE_SERVER_SECURE;
export const ICE_SERVERS: RTCIceServer[] =
import.meta.env.VITE_ICE_SERVERS && JSON.parse(import.meta.env.VITE_ICE_SERVERS) ||
[
{
urls: 'turn:turn.anyfirewall.com:443?transport=tcp',
credential: 'webrtc',
username: 'webrtc'
},
{
urls: ['stun:stun.l.google.com:19302']
}
];
import.meta.env.VITE_ICE_SERVERS && JSON.parse(import.meta.env.VITE_ICE_SERVERS) || [];
6 changes: 3 additions & 3 deletions src/config/pwa.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {ManifestOptions, VitePWAOptions} from 'vite-plugin-pwa';
import {Preset} from '@vite-pwa/assets-generator/config';
import {DeepPartial} from '@chakra-ui/react';
import type {ManifestOptions, VitePWAOptions} from 'vite-plugin-pwa';
import type {Preset} from '@vite-pwa/assets-generator/config';
import type {DeepPartial} from '@chakra-ui/react';

export const pwaOptions: Partial<VitePWAOptions> = {
includeAssets: ['logo.svg', 'pwa.svg', 'pwa.png'],
Expand Down
26 changes: 15 additions & 11 deletions src/core/app-store/server-settings.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import {settings} from './localstorage.ts';
import cloneDeep from 'lodash/cloneDeep.js';
import {PeerOptions} from 'peerjs';

export default class ServerSettings {
private static get _secureChar() {
return settings.secure ? 's' : '';
}
export function getPeerOptions() {
const options: PeerOptions = {};

static get wsServer() {
return `ws${this._secureChar}://${settings.webServer}`;
if (settings.iceServers.length) {
options.config = {
iceServers: cloneDeep(settings.iceServers)
};
}

static get httpServer() {
return `http${this._secureChar}://${settings.webServer}`;
if (settings.webServer) {
const url = new URL(`https://${settings.webServer}`);
options.host = url.hostname;
options.port = Number(url.port);
options.path = url.pathname;
options.secure = settings.secure;
}

static get iceServers() {
return settings.iceServers;
}
return options;
}
21 changes: 0 additions & 21 deletions src/core/peer-to-peer/bson.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
import SimplePeer from 'simple-peer';
import {v4 as uuid} from 'uuid';
import {MessageType} from './types.ts';
import StreamSignals from '../share/remote-download/stream-signals.ts';
import sleep from 'sleep-promise';
import type {Document} from 'bson';
import {BSON} from 'bson';
import deserializeBSON from './bson.ts';
import ReceiveChunk from './receive-chunk.ts';
import Peer, {DataConnection, PeerOptions} from 'peerjs';
import {getPeerOptions} from '../app-store/server-settings.ts';

const QUEUE_IS_FULL_ERROR = 'RTCDataChannel send queue is full';
const FULL_DELAY = 100; // 100ms
const MAX_RETRY_SEND = 15;
export type CallbackChunk<T> = (response: T, chunkIndex?: number, totalChunks?: number) => any;

export default class PeerRequest {
export default class PeerDataConnection {
/**
* on **response** data from the peer
*/
Expand All @@ -39,24 +37,43 @@ export default class PeerRequest {
}
} = {};

public constructor(private _peerConnected: InstanceType<typeof SimplePeer>) {
public constructor(private _dataConnection: DataConnection) {
this._listenForData();
this._initEvents();
}

public connect() {
return new Promise(res => {
this._peerConnected.on('connect', res);
private _initEvents() {
this._dataConnection.on('iceStateChanged', (state: any) => {
console.log('iceStateChanged', state);
});

this._dataConnection.on('error', (error: any) => {
console.log('error', error);
});

this._dataConnection.on('close', () => {
console.log(`close ${this._dataConnection.peer}`);
});

this._dataConnection.on('open', () => {
console.log(`open ${this._dataConnection.peer}`);
});
}

private _sendBSON(data: Document) {
const bson = BSON.serialize(data);
this._peerConnected.send(bson);
public async waitOpen(): Promise<void> {
if (this._dataConnection.open) {
throw new Error('Connection already open');
}

return await new Promise((res, rej) => {
this._dataConnection.once('open', res);
this._dataConnection.once('error', rej);
});
}

private _listenForData() {
this._peerConnected.on('data', data => {
const {type, resource, requestId, body, chunkIndex, totalChunks} = deserializeBSON(data);
this._dataConnection.on('data', (data: any) => {
const {type, resource, requestId, body, chunkIndex, totalChunks} = data;

switch (type) {
case MessageType.REQUEST:
Expand Down Expand Up @@ -147,7 +164,7 @@ export default class PeerRequest {
while (retrySend) {
await this._handelRequestPreSend(requestId);
try {
this._sendBSON({
this._dataConnection.send({
type: MessageType.RESPONSE,
requestId,
body,
Expand All @@ -172,7 +189,7 @@ export default class PeerRequest {

if (!responseInfo) {
console.error(`No callback for requestId ${requestId}`);
this._sendBSON({
this._dataConnection.send({
type: MessageType.ABORT,
requestId
});
Expand All @@ -184,15 +201,15 @@ export default class PeerRequest {

public request<Response, Body>(resource: string, body: Body, callback: CallbackChunk<Response>, signal?: StreamSignals) {
const requestId = uuid();
const removeEvents = signal?.addEvents(requestId, this._sendBSON.bind(this));
const removeEvents = signal?.addEvents(requestId, this._dataConnection.send.bind(this._dataConnection));

const onClose = () => {
delete this._receiveInfo[requestId];
removeEvents?.();
};

this._receiveInfo[requestId] = new ReceiveChunk(callback, onClose);
this._sendBSON({
this._dataConnection.send({
type: MessageType.REQUEST,
resource,
requestId,
Expand All @@ -203,4 +220,14 @@ export default class PeerRequest {
public listen<Body, Response>(resource: string, callback: (body: Body, sendChunk: CallbackChunk<Response>) => Promise<void>) {
this._requestEvent[resource] = callback;
}

public static async newPeerConnection(options: PeerOptions = {}): Promise<{ peer: Peer, id: string }> {
const peer = new Peer({...getPeerOptions(), ...options});
return await new Promise((res, rej) => {
peer.once('open', id => {
res({peer, id});
});
peer.once('error', rej);
});
}
}
124 changes: 22 additions & 102 deletions src/core/peer-to-peer/peer-manager.ts
Original file line number Diff line number Diff line change
@@ -1,126 +1,46 @@
import SimplePeer from 'simple-peer';
import WebSocketRequest from './web-socket-request.ts';
import {v4 as uuid} from 'uuid';
import PeerRequest, {CallbackChunk} from './peer-request.ts';
import ServerSettings from '../app-store/server-settings.ts';
import PeerDataConnection, {CallbackChunk} from './peer-data-connection.ts';
import Peer from 'peerjs';

type PeerManagerRequestCallback = (body: any, sendChunk: CallbackChunk<any>, peer: PeerRequest) => any;
type PeerManagerRequestCallback = (body: any, sendChunk: CallbackChunk<any>, peer: PeerDataConnection) => any;
export type NewPeerResponse = {
peerId: string,
connectInfo: string
error?: string
}

export default class PeerManager {
private static _serverWS = new WebSocketRequest();
private static _serverWSActiveDirectories = 0;
private _destroyed = false;
public shareId = uuid();

private _peers: {
[id: string]: SimplePeer.Instance
} = {};

private _requestInfo: {
[resource: string]: PeerManagerRequestCallback
} = {};
private _peer?: Peer;
public peerId?: string;

public constructor() {
this._createNewPeer = this._createNewPeer.bind(this);
this._signalPeer = this._signalPeer.bind(this);
}

public async initServerWS() {
const serverWS = await this._makeSureServerWSConnected();
public async init() {
const {peer, id} = await PeerDataConnection.newPeerConnection();

await serverWS.request('new-share', this.shareId);
this._peer = peer;
this.peerId = id;

PeerManager._serverWSActiveDirectories++;
serverWS.listen(`new-peer/${this.shareId}`, this._createNewPeer);
serverWS.listen(`signal-peer/${this.shareId}`, this._signalPeer);
}

public listen(resource: string, callback: PeerManagerRequestCallback) {
this._requestInfo[resource] = callback;
}

private _createNewPeer(): Promise<NewPeerResponse> {
const peerId = uuid();
const peer = new SimplePeer({
initiator: true,
trickle: true,
config: {
iceServers: ServerSettings.iceServers
}
})

this._peers[peerId] = peer;
this._initPeerMethods(peer, peerId);

return new Promise(res => {
peer.once('signal', data => {
res({
peerId,
connectInfo: JSON.stringify(data)
});
});
this._peer.on('connection', conn => {
console.log('new connection');
const dataConnection = new PeerDataConnection(conn);
this._initDataConnectionMethods(dataConnection);
});
}

private _signalPeer({peerId, connectInfo}: {peerId: string, connectInfo: string}) {
const peer = this._peers[peerId];

if (!peer) {
console.error(`No peer with id ${peerId}`);
return {error: `No peer with id ${peerId}`};
}

peer.signal(JSON.parse(connectInfo));
return {ok: true};
}

destroy() {
if (this._destroyed || !PeerManager._serverWS)
return;


const serverWS = PeerManager._serverWS;

serverWS.request('closed-share', this.shareId);
serverWS.unregisterListen(`new-peer/${this.shareId}`);
serverWS.unregisterListen(`signal-peer/${this.shareId}`);

for(const peer of Object.values(this._peers)){
peer.destroy();
}

PeerManager._serverWSActiveDirectories--;
this._destroyed = true;

if (PeerManager._serverWSActiveDirectories === 0) {
serverWS.close();
private _initDataConnectionMethods(peer: PeerDataConnection) {
for (const [resource, callback] of Object.entries(this._requestInfo)) {
peer.listen(resource, (body, sendChunk) =>
callback(body, sendChunk, peer)
);
}
}

private async _makeSureServerWSConnected() {
PeerManager._serverWS ??= new WebSocketRequest();
if (!PeerManager._serverWS.connected) {
await PeerManager._serverWS.connect(ServerSettings.wsServer);
}

return PeerManager._serverWS;
public listen(resource: string, callback: PeerManagerRequestCallback) {
this._requestInfo[resource] = callback;
}

private _initPeerMethods(peer: InstanceType<typeof SimplePeer>, peerId: string){
peer.once('close', () => {
delete this._peers[peerId];
});

const peerRequest = new PeerRequest(peer);
for(const [resource, callback] of Object.entries(this._requestInfo)){
peerRequest.listen(resource, (body, sendChunk) =>
callback(body, sendChunk, peerRequest)
);
}
public destroy() {
this._peer?.destroy();
}
}
Loading

0 comments on commit f5936e2

Please sign in to comment.