Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Reader class and refactor shield sync #471

Merged
merged 4 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions scripts/reader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
export class Reader {
#i = 0;
#maxBytes = 0;
#availableBytes;
#done = false;
/**
* @type{()=>{} | null} Called when bytes are available.
* There can't be more than 1 awaiter
*/
#awaiter = null;

/**
* @returns {number} Content length if available, or an estimante
*/
get contentLength() {
return this.#availableBytes.length;
}

/**
* @returns {number} Number or bytes read
*/
get readBytes() {
return this.#i;
}
/**
* @param
*/
constructor(req) {
this.#availableBytes = new Uint8Array(
req.headers?.get('Content-Length') || 1024
);
const stream = req.body.getReader();
(async () => {
while (true) {
const { done, value } = await stream.read();
if (value) {
this.#appendBytes(value);
}
if (done) {
this.#done = true;
break;
}
}
})();
}

#resizeArray(newLength) {
if (newLength <= this.#availableBytes.length) {
throw new Error(
'New length must be greater than the current length.'
);
}

const newArray = new Uint8Array(newLength);
newArray.set(this.#availableBytes);
this.#availableBytes = newArray;
}

#appendBytes(bytes) {
// If we have content-length, there should never be a need to
// resize
if (bytes.length + this.#maxBytes > this.#availableBytes.length) {
this.#resizeArray((bytes.length + this.#maxBytes) * 2);
}

this.#availableBytes.set(bytes, this.#maxBytes);
this.#maxBytes += bytes.length;
// Notify the awaiter if there is one
if (this.#awaiter) this.#awaiter();
}

/**
* @param{number} byteLength
* @returns {Promise<Uint8Array | null>} bytes or null if there are no more bytes
*/
async read(byteLength) {
if (this.#awaiter) throw new Error('Called read more than once');
while (true) {
if (this.#maxBytes - this.#i >= byteLength) {
this.#awaiter = null;
// We have enough bytes to respond
const res = this.#availableBytes.subarray(
this.#i,
this.#i + byteLength
);
this.#i += byteLength;
return res;
}

// There are no more bytes to await, so we can return null
if (this.#done) return null;
// If we didn't respond, wait for the next batch of bytes, then try again
await new Promise((res) => {
this.#awaiter = res;
});
}
}
}
106 changes: 44 additions & 62 deletions scripts/wallet.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { validateMnemonic } from 'bip39';
import { Reader } from './reader.js';
import { decrypt } from './aes-gcm.js';
import { bytesToNum, parseWIF } from './encoding.js';
import { beforeUnloadListener, blockCount } from './global.js';
Expand Down Expand Up @@ -787,100 +788,81 @@ export class Wallet {
wallet.#shield.getLastSyncedBlock() + 1
);
if (!req.ok) throw new Error("Couldn't sync shield");
const reader = req.body.getReader();
const reader = new Reader(req);

/** @type{string[]} Array of txs in the current block */
let txs = [];
let processedBytes = 0;
const length = req.headers.get('Content-Length');
const length = reader.contentLength;
/** @type {Uint8Array} Array of bytes that we are processing **/
const processing = new Uint8Array(length);
getEventEmitter().emit(
'shield-sync-status-update',
0,
length,
false
);
let i = 0;
let max = 0;
while (true) {
/**
* @type {{done: boolean, value: Uint8Array?}}
*/
const { done, value } = await reader.read();
/**
* Array of blocks ready to pass to the shield library
* @type {{txs: string[]; height: number; time: number}[]}
*/
const blocksArray = [];

if (value) {
// Append received bytes in the processing array
processing.set(value, max);
max += value.length;
processedBytes += value.length;
// Loop until we have less than 4 bytes (length)
while (max - i >= 4) {
const length = Number(
bytesToNum(processing.subarray(i, i + 4))
);
// If we have less bytes than the length break and wait for the next
// batch of bytes
if (max - i < length) break;

i += 4;
const bytes = processing.subarray(i, length + i);
i += length;
// 0x5d rapresents the block
if (bytes[0] === 0x5d) {
const height = Number(
bytesToNum(bytes.slice(1, 5))
);
const time = Number(bytesToNum(bytes.slice(5, 9)));

blocksArray.push({ txs, height, time });
txs = [];
} else if (bytes[0] === 0x03) {
// 0x03 is the tx version. We should only get v3 transactions
const hex = bytesToHex(bytes);
txs.push({
hex,
txid: Transaction.getTxidFromHex(hex),
});
} else {
// This is neither a block or a tx.
throw new Error('Failed to parse shield binary');
}
}
}

/**
* Array of blocks ready to pass to the shield library
* @type {{txs: string[]; height: number; time: number}[]}
*/
let blocksArray = [];
const handleAllBlocks = async () => {
// Process the current batch of blocks before starting to parse the next one
if (blocksArray.length) {
await this.#shield.handleBlocks(blocksArray);
}
blocksArray = [];
// Emit status update
getEventEmitter().emit(
'shield-sync-status-update',
processedBytes,
reader.readBytes,
length,
false
);
if (done) break;
};
while (true) {
const packetLengthBytes = await reader.read(4);
if (!packetLengthBytes) break;
const packetLength = Number(bytesToNum(packetLengthBytes));

const bytes = await reader.read(packetLength);
if (!bytes) throw new Error('Stream was cut short');
if (bytes[0] === 0x5d) {
const height = Number(bytesToNum(bytes.slice(1, 5)));
const time = Number(bytesToNum(bytes.slice(5, 9)));

blocksArray.push({ txs, height, time });
txs = [];
} else if (bytes[0] === 0x03) {
// 0x03 is the tx version. We should only get v3 transactions
const hex = bytesToHex(bytes);
txs.push({
hex,
txid: Transaction.getTxidFromHex(hex),
});
} else {
// This is neither a block or a tx.
throw new Error('Failed to parse shield binary');
}
if (blocksArray.length > 1000) {
await handleAllBlocks();
}
}

getEventEmitter().emit('shield-sync-status-update', 0, 0, true);
await handleAllBlocks();
// At this point it should be safe to assume that shield is ready to use
await this.saveShieldOnDisk();
} catch (e) {
debugError(DebugTopics.WALLET, e);
}

// At this point it should be safe to assume that shield is ready to use
await this.saveShieldOnDisk();
const networkSaplingRoot = (
await getNetwork().getBlock(this.#shield.getLastSyncedBlock())
).finalsaplingroot;
if (networkSaplingRoot)
await this.#checkShieldSaplingRoot(networkSaplingRoot);
this.#isSynced = true;

getEventEmitter().emit('shield-sync-status-update', 0, 0, true);
}

/**
Expand Down
130 changes: 130 additions & 0 deletions tests/unit/reader.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { describe, it, expect } from 'vitest';
import { Reader } from '../../scripts/reader.js';

function createMockStream(chunks, contentLength = null) {
let i = 0;
return {
headers: {
get: (key) => (key === 'Content-Length' ? contentLength : null),
},
body: {
getReader: () => {
return {
read: async () => {
if (i < chunks.length) {
const value = chunks[i];
i++;
return { done: false, value };
}
return { done: true, value: null };
},
};
},
},
};
}

describe('Reader without content length', () => {
it('should read bytes correctly when available', async () => {
const mockStream = createMockStream([
new Uint8Array([1, 2, 3, 4]),
new Uint8Array([5, 6, 7, 8]),
]);

const reader = new Reader(mockStream);

const result1 = await reader.read(4);
expect(result1).toEqual(new Uint8Array([1, 2, 3, 4]));

const result2 = await reader.read(4);
expect(result2).toEqual(new Uint8Array([5, 6, 7, 8]));

// Reads after the stream is done should yield null
expect(await reader.read(10)).toBe(null);
});

it('should wait for more bytes if not enough are available', async () => {
const mockStream = createMockStream([
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
]);

const reader = new Reader(mockStream);

const result = await reader.read(6);
expect(result).toEqual(new Uint8Array([1, 2, 3, 4, 5, 6]));
// Reads after the stream is done should yield null
expect(await reader.read(1)).toBe(null);
});

it('should throw an error if read is called multiple times concurrently', async () => {
const mockStream = createMockStream([new Uint8Array([1, 2, 3])]);

const reader = new Reader(mockStream);

const read1 = reader.read(2);
const read2 = reader.read(2);

await expect(read2).rejects.toThrow('Called read more than once');
await expect(read1).resolves.toEqual(new Uint8Array([1, 2]));
});

it('should handle reading less than available bytes', async () => {
const mockStream = createMockStream([new Uint8Array([1, 2, 3, 4, 5])]);

const reader = new Reader(mockStream);

const result1 = await reader.read(3);
expect(result1).toEqual(new Uint8Array([1, 2, 3]));

const result2 = await reader.read(2);
expect(result2).toEqual(new Uint8Array([4, 5]));
});
});

describe('Reader with Content-Length', () => {
it('should initialize buffer size based on Content-Length header', async () => {
const mockStream = createMockStream([], 2048);
const reader = new Reader(mockStream);

// Read some bytes to indirectly validate initialization
const readPromise = reader.read(0); // No bytes to read, but ensures no errors
await expect(readPromise).resolves.toEqual(new Uint8Array(0));
});

it('should work if Content-Length is not set', async () => {
const mockStream = createMockStream([]);
const reader = new Reader(mockStream);

// Read some bytes to validate no Content-Length doesn't break initialization
const readPromise = reader.read(0);
await expect(readPromise).resolves.toEqual(new Uint8Array(0));
});

it('should handle reading bytes when Content-Length is specified', async () => {
const mockStream = createMockStream(
[new Uint8Array([1, 2, 3, 4])],
2048 // Content-Length
);

const reader = new Reader(mockStream);

const result = await reader.read(4);
expect(result).toEqual(new Uint8Array([1, 2, 3, 4]));
});

it('should resize the buffer if more bytes are received than Content-Length', async () => {
const mockStream = createMockStream(
[new Uint8Array([1, 2, 3, 4]), new Uint8Array([5, 6, 7, 8])],
4 // Content-Length is smaller than total bytes received
);

const reader = new Reader(mockStream);

const result1 = await reader.read(4);
expect(result1).toEqual(new Uint8Array([1, 2, 3, 4]));

const result2 = await reader.read(4);
expect(result2).toEqual(new Uint8Array([5, 6, 7, 8]));
});
});