From 6a417a8ace11ae40201bfce611319465c310da3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Thu, 8 Jul 2021 17:37:44 +0200 Subject: [PATCH 01/11] stream csv data into copy command to bypass superuser perm --- package.json | 1 + src/database/import.database.ts | 21 ++++++++++++++------- yarn.lock | 12 ++++++++++++ 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/package.json b/package.json index 6c2efa1..864b7a5 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "morgan": "^1.10.0", "node-cron": "^2.0.3", "pg": "^8.5.1", + "pg-copy-streams": "^5.1.1", "progress": "^2.0.3", "rfc4648": "^1.4.0", "shortid": "^2.2.16", diff --git a/src/database/import.database.ts b/src/database/import.database.ts index 82d2910..729a7d4 100644 --- a/src/database/import.database.ts +++ b/src/database/import.database.ts @@ -1,7 +1,9 @@ +import fs from 'fs'; import {config} from 'dotenv'; import {indices} from '../utility/order.utility'; import {connection} from '../database/connection.database'; import {transactionFields} from '../database/transaction.database'; +import {from as copyFrom} from 'pg-copy-streams'; config(); @@ -9,9 +11,10 @@ export async function importBlocks(path: string) { return new Promise(async (resolve, reject) => { try { const encoding = '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL("height"))'; - await connection.raw(`COPY blocks ("id", "previous_block", "mined_at", "height", "txs", "extended") FROM '${path}' WITH ${encoding}`); - - return resolve(true); + const stream = connection.query(`COPY blocks ("id", "previous_block", "mined_at", "height", "txs", "extended") FROM STDIN WITH ${encoding}`); + const fileStream = fs.createReadStream(path); + fileStream.on('error', reject); + fileStream.pipe(stream).on('finish', resolve).on('error', reject); } catch (error) { return reject(error); } @@ -26,9 +29,11 @@ export async function importTransactions(path: string) { .map((field) => `"${field}"`); const encoding = '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL("format", "height", "data_size"))'; - await connection.raw(`COPY transactions (${fields.join(',')}) FROM '${path}' WITH ${encoding}`); + const stream = connection.query(`COPY transactions (${fields.join(',')}) FROM STDIN WITH ${encoding}`); + const fileStream = fs.createReadStream(path); + fileStream.on('error', reject); + fileStream.pipe(stream).on('finish', resolve).on('error', reject); - return resolve(true); } catch (error) { return reject(error); } @@ -39,9 +44,11 @@ export async function importTags(path: string) { return new Promise(async (resolve, reject) => { try { const encoding = '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL(index))'; - await connection.raw(`COPY tags ("tx_id", "index", "name", "value") FROM '${path}' WITH ${encoding}`); + const stream = connection.query(`COPY tags ("tx_id", "index", "name", "value") FROM STDIN WITH ${encoding}`); + const fileStream = fs.createReadStream(path); + fileStream.on('error', reject); + fileStream.pipe(stream).on('finish', resolve).on('error', reject); - return resolve(true); } catch (error) { return reject(error); } diff --git a/yarn.lock b/yarn.lock index f9aded6..05b4a2a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6082,6 +6082,11 @@ object.pick@^1.2.0, object.pick@^1.3.0: dependencies: isobject "^3.0.1" +obuf@^1.1.2: + version "1.1.2" + resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e" + integrity 
sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg== + on-finished@~2.3.0: version "2.3.0" resolved "https://registry.yarnpkg.com/on-finished/-/on-finished-2.3.0.tgz#20f1336481b083cd75337992a16971aa2d906947" @@ -6385,6 +6390,13 @@ pg-connection-string@^2.5.0: resolved "https://registry.yarnpkg.com/pg-connection-string/-/pg-connection-string-2.5.0.tgz#538cadd0f7e603fc09a12590f3b8a452c2c0cf34" integrity sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ== +pg-copy-streams@^5.1.1: + version "5.1.1" + resolved "https://registry.yarnpkg.com/pg-copy-streams/-/pg-copy-streams-5.1.1.tgz#012f48c332cc31490aa0b5330bc571795963230f" + integrity sha512-ieW6JuiIo/4WQ7n+Wevr9zYvpM1AwUs6EwNCCA0VgKZ6ZQ7Y9k3IW00vqc6svX9FtENhbaTbLN7MxekraCrbfg== + dependencies: + obuf "^1.1.2" + pg-int8@1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/pg-int8/-/pg-int8-1.0.1.tgz#943bd463bf5b71b4170115f80f8efc9a0c0eb78c" From 55dcaac50c43dd4792a42b09472062e50c1c2c0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Thu, 8 Jul 2021 18:07:01 +0200 Subject: [PATCH 02/11] fixing pool client query --- package.json | 4 ++ src/database/import.database.ts | 97 +++++++++++++++++++++++++-------- yarn.lock | 19 ++++++- 3 files changed, 97 insertions(+), 23 deletions(-) diff --git a/package.json b/package.json index 864b7a5..1bf317b 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,9 @@ "repository": "git@github.com:ArweaveTeam/gateway.git", "author": "Arweave ", "license": "MIT", + "prettier": { + "singleQuote": true + }, "scripts": { "dev:test": "npm run dev:build && nyc mocha dist/test dist/test/**/*.js", "dev:codecov": "nyc report --reporter=text-lcov | codecov --pipe --token=cea7a229-8289-4a7c-9dc6-4fb694978cb2", @@ -76,6 +79,7 @@ "@types/node": "^14.14.22", "@types/node-cron": "^2.0.3", "@types/pg": "^7.14.11", + "@types/pg-copy-streams": "^1.2.1", "@types/progress": "^2.0.3", "@types/shortid": "^0.0.29", "@types/superagent": "^4.1.10", diff --git a/src/database/import.database.ts b/src/database/import.database.ts index 729a7d4..baff98f 100644 --- a/src/database/import.database.ts +++ b/src/database/import.database.ts @@ -1,20 +1,37 @@ -import fs from 'fs'; -import {config} from 'dotenv'; -import {indices} from '../utility/order.utility'; -import {connection} from '../database/connection.database'; -import {transactionFields} from '../database/transaction.database'; -import {from as copyFrom} from 'pg-copy-streams'; +import fs from "fs"; +import { PoolClient } from "pg"; +import { config } from "dotenv"; +import { indices } from "../utility/order.utility"; +import { pgConnection } from "../database/connection.database"; +import { transactionFields } from "../database/transaction.database"; +import { from as copyFrom } from "pg-copy-streams"; config(); export async function importBlocks(path: string) { return new Promise(async (resolve, reject) => { + let client: PoolClient; try { - const encoding = '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL("height"))'; - const stream = connection.query(`COPY blocks ("id", "previous_block", "mined_at", "height", "txs", "extended") FROM STDIN WITH ${encoding}`); + client = await pgConnection.connect(); + const encoding = + "(FORMAT CSV, HEADER, ESCAPE '\\', DELIMITER '|', FORCE_NULL(\"height\"))"; + const stream = client.query( + copyFrom( + `COPY blocks ("id", "previous_block", "mined_at", "height", "txs", "extended") FROM STDIN WITH 
${encoding}` + ) + ); const fileStream = fs.createReadStream(path); - fileStream.on('error', reject); - fileStream.pipe(stream).on('finish', resolve).on('error', reject); + fileStream.on("error", reject); + fileStream + .pipe(stream) + .on("finish", () => { + client.release(); + resolve(true); + }) + .on("error", (err) => { + client.release(); + reject(err); + }); } catch (error) { return reject(error); } @@ -23,17 +40,35 @@ export async function importBlocks(path: string) { export async function importTransactions(path: string) { return new Promise(async (resolve, reject) => { + let client: PoolClient; try { + client = await pgConnection.connect(); const fields = transactionFields - .concat(indices) - .map((field) => `"${field}"`); + .concat(indices) + .map((field) => `"${field}"`); - const encoding = '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL("format", "height", "data_size"))'; - const stream = connection.query(`COPY transactions (${fields.join(',')}) FROM STDIN WITH ${encoding}`); + const encoding = + '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL("format", "height", "data_size"))'; + const stream = client.query( + copyFrom( + `COPY transactions (${fields.join(",")}) FROM STDIN WITH ${encoding}` + ) + ); const fileStream = fs.createReadStream(path); - fileStream.on('error', reject); - fileStream.pipe(stream).on('finish', resolve).on('error', reject); - + fileStream.on("error", () => { + client.release(); + reject(); + }); + fileStream + .pipe(stream) + .on("finish", () => { + client.release(); + resolve(true); + }) + .on("error", (err) => { + client.release(); + reject(err); + }); } catch (error) { return reject(error); } @@ -42,13 +77,31 @@ export async function importTransactions(path: string) { export async function importTags(path: string) { return new Promise(async (resolve, reject) => { + let client: PoolClient; try { - const encoding = '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL(index))'; - const stream = connection.query(`COPY tags ("tx_id", "index", "name", "value") FROM STDIN WITH ${encoding}`); + client = await pgConnection.connect(); + const encoding = + "(FORMAT CSV, HEADER, ESCAPE '\\', DELIMITER '|', FORCE_NULL(index))"; + const stream = client.query( + copyFrom( + `COPY tags ("tx_id", "index", "name", "value") FROM STDIN WITH ${encoding}` + ) + ); const fileStream = fs.createReadStream(path); - fileStream.on('error', reject); - fileStream.pipe(stream).on('finish', resolve).on('error', reject); - + fileStream.on("error", () => { + client.release(); + reject(); + }); + fileStream + .pipe(stream) + .on("finish", () => { + client.release(); + resolve(true); + }) + .on("error", (err) => { + client.release(); + reject(err); + }); } catch (error) { return reject(error); } diff --git a/yarn.lock b/yarn.lock index 05b4a2a..27ed382 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1340,6 +1340,23 @@ resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.0.tgz#2f8bb441434d163b35fb8ffdccd7138927ffb8c0" integrity sha512-//oorEZjL6sbPcKUaCdIGlIUeH26mgzimjBB77G6XRgnDl/L5wOnpyBGRe/Mmf5CVW3PwEBE1NjiMZ/ssFh4wA== +"@types/pg-copy-streams@^1.2.1": + version "1.2.1" + resolved "https://registry.yarnpkg.com/@types/pg-copy-streams/-/pg-copy-streams-1.2.1.tgz#f598cee10b59c20c8f155358bd99a6c438c42aae" + integrity sha512-7gsqXeYd4CypX4ZslvOhCuquL6Uo6B/ariCxw67fw6k+YL/Y1XncraDN/7qVlbM5WE5tmhxPxmacufrZ1/iloQ== + dependencies: + "@types/node" "*" + "@types/pg" "*" + +"@types/pg@*": + version "8.6.1" + resolved 
"https://registry.yarnpkg.com/@types/pg/-/pg-8.6.1.tgz#099450b8dc977e8197a44f5229cedef95c8747f9" + integrity sha512-1Kc4oAGzAl7uqUStZCDvaLFqZrW9qWSjXOmBfdgyBP5La7Us6Mg4GBvRlSoaZMhQF/zSj1C8CtKMBkoiT8eL8w== + dependencies: + "@types/node" "*" + pg-protocol "*" + pg-types "^2.2.0" + "@types/pg@^7.14.11": version "7.14.11" resolved "https://registry.yarnpkg.com/@types/pg/-/pg-7.14.11.tgz#daf5555504a1f7af4263df265d91f140fece52e3" @@ -6407,7 +6424,7 @@ pg-pool@^3.3.0: resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-3.3.0.tgz#12d5c7f65ea18a6e99ca9811bd18129071e562fc" integrity sha512-0O5huCql8/D6PIRFAlmccjphLYWC+JIzvUhSzXSpGaf+tjTZc4nn+Lr7mLXBbFJfvwbP0ywDv73EiaBsxn7zdg== -pg-protocol@^1.2.0, pg-protocol@^1.5.0: +pg-protocol@*, pg-protocol@^1.2.0, pg-protocol@^1.5.0: version "1.5.0" resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.5.0.tgz#b5dd452257314565e2d54ab3c132adc46565a6a0" integrity sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ== From d3cbd2ad26c430528a534133f4c090513326abb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Thu, 8 Jul 2021 19:00:35 +0200 Subject: [PATCH 03/11] fix type error --- src/database/import.database.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/database/import.database.ts b/src/database/import.database.ts index baff98f..41dd64d 100644 --- a/src/database/import.database.ts +++ b/src/database/import.database.ts @@ -28,9 +28,9 @@ export async function importBlocks(path: string) { client.release(); resolve(true); }) - .on("error", (err) => { + .on("error", (err: unknown) => { client.release(); - reject(err); + reject(new String(err)); }); } catch (error) { return reject(error); @@ -65,9 +65,9 @@ export async function importTransactions(path: string) { client.release(); resolve(true); }) - .on("error", (err) => { + .on("error", (err: unknown) => { client.release(); - reject(err); + reject(new String(err)); }); } catch (error) { return reject(error); @@ -98,9 +98,9 @@ export async function importTags(path: string) { client.release(); resolve(true); }) - .on("error", (err) => { + .on("error", (err: unknown) => { client.release(); - reject(err); + reject(new String(err)); }); } catch (error) { return reject(error); From 6c20c55b5603e7734461d931b7a454ecd5a2b727 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Mon, 19 Jul 2021 15:19:17 +0200 Subject: [PATCH 04/11] remove unneccecary async try-catches --- src/database/import.database.ts | 165 +++++++++++++++----------------- src/database/sync.database.ts | 145 +++++++++++++++++++--------- 2 files changed, 173 insertions(+), 137 deletions(-) diff --git a/src/database/import.database.ts b/src/database/import.database.ts index 41dd64d..3ef130d 100644 --- a/src/database/import.database.ts +++ b/src/database/import.database.ts @@ -1,109 +1,94 @@ -import fs from "fs"; -import { PoolClient } from "pg"; -import { config } from "dotenv"; -import { indices } from "../utility/order.utility"; -import { pgConnection } from "../database/connection.database"; -import { transactionFields } from "../database/transaction.database"; -import { from as copyFrom } from "pg-copy-streams"; +import fs from 'fs'; +import { PoolClient } from 'pg'; +import { config } from 'dotenv'; +import { indices } from '../utility/order.utility'; +import { pgConnection } from '../database/connection.database'; +import { transactionFields } from '../database/transaction.database'; 
+import { from as copyFrom } from 'pg-copy-streams'; config(); -export async function importBlocks(path: string) { +export function importBlocks(path: string) { return new Promise(async (resolve, reject) => { - let client: PoolClient; - try { - client = await pgConnection.connect(); - const encoding = - "(FORMAT CSV, HEADER, ESCAPE '\\', DELIMITER '|', FORCE_NULL(\"height\"))"; - const stream = client.query( - copyFrom( - `COPY blocks ("id", "previous_block", "mined_at", "height", "txs", "extended") FROM STDIN WITH ${encoding}` - ) - ); - const fileStream = fs.createReadStream(path); - fileStream.on("error", reject); - fileStream - .pipe(stream) - .on("finish", () => { - client.release(); - resolve(true); - }) - .on("error", (err: unknown) => { - client.release(); - reject(new String(err)); - }); - } catch (error) { - return reject(error); - } + const client: PoolClient = await pgConnection.connect(); + const encoding = + "(FORMAT CSV, HEADER, ESCAPE '\\', DELIMITER '|', FORCE_NULL(\"height\"))"; + const stream = client.query( + copyFrom( + `COPY blocks ("id", "previous_block", "mined_at", "height", "txs", "extended") FROM STDIN WITH ${encoding}` + ) + ); + const fileStream = fs.createReadStream(path); + fileStream.on('error', reject); + fileStream + .pipe(stream) + .on('finish', () => { + client.release(); + resolve(true); + }) + .on('error', (err: unknown) => { + client.release(); + reject(new String(err)); + }); }); } -export async function importTransactions(path: string) { +export function importTransactions(path: string) { return new Promise(async (resolve, reject) => { - let client: PoolClient; - try { - client = await pgConnection.connect(); - const fields = transactionFields - .concat(indices) - .map((field) => `"${field}"`); + const client: PoolClient = await pgConnection.connect(); + const fields = transactionFields + .concat(indices) + .map((field) => `"${field}"`); - const encoding = - '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL("format", "height", "data_size"))'; - const stream = client.query( - copyFrom( - `COPY transactions (${fields.join(",")}) FROM STDIN WITH ${encoding}` - ) - ); - const fileStream = fs.createReadStream(path); - fileStream.on("error", () => { + const encoding = + '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL("format", "height", "data_size"))'; + const stream = client.query( + copyFrom( + `COPY transactions (${fields.join(',')}) FROM STDIN WITH ${encoding}` + ) + ); + const fileStream = fs.createReadStream(path); + fileStream.on('error', () => { + client.release(); + reject(); + }); + fileStream + .pipe(stream) + .on('finish', () => { client.release(); - reject(); + resolve(true); + }) + .on('error', (err: unknown) => { + client.release(); + reject(new String(err)); }); - fileStream - .pipe(stream) - .on("finish", () => { - client.release(); - resolve(true); - }) - .on("error", (err: unknown) => { - client.release(); - reject(new String(err)); - }); - } catch (error) { - return reject(error); - } }); } -export async function importTags(path: string) { +export function importTags(path: string) { return new Promise(async (resolve, reject) => { - let client: PoolClient; - try { - client = await pgConnection.connect(); - const encoding = - "(FORMAT CSV, HEADER, ESCAPE '\\', DELIMITER '|', FORCE_NULL(index))"; - const stream = client.query( - copyFrom( - `COPY tags ("tx_id", "index", "name", "value") FROM STDIN WITH ${encoding}` - ) - ); - const fileStream = fs.createReadStream(path); - fileStream.on("error", () => { + 
const client: PoolClient = await pgConnection.connect(); + const encoding = + "(FORMAT CSV, HEADER, ESCAPE '\\', DELIMITER '|', FORCE_NULL(index))"; + const stream = client.query( + copyFrom( + `COPY tags ("tx_id", "index", "name", "value") FROM STDIN WITH ${encoding}` + ) + ); + const fileStream = fs.createReadStream(path); + fileStream.on('error', () => { + client.release(); + reject(); + }); + fileStream + .pipe(stream) + .on('finish', () => { + client.release(); + resolve(true); + }) + .on('error', (err: unknown) => { client.release(); - reject(); + reject(new String(err)); }); - fileStream - .pipe(stream) - .on("finish", () => { - client.release(); - resolve(true); - }) - .on("error", (err: unknown) => { - client.release(); - reject(new String(err)); - }); - } catch (error) { - return reject(error); - } }); } diff --git a/src/database/sync.database.ts b/src/database/sync.database.ts index f07f42c..0c65485 100644 --- a/src/database/sync.database.ts +++ b/src/database/sync.database.ts @@ -1,21 +1,34 @@ import ProgressBar from 'progress'; -import {DataItemJson} from 'arweave-bundles'; -import {config} from 'dotenv'; -import {getLastBlock} from '../utility/height.utility'; -import {serializeBlock, serializeTransaction, serializeAnsTransaction, serializeTags} from '../utility/serialize.utility'; -import {streams, initStreams, resetCacheStreams} from '../utility/csv.utility'; -import {log} from '../utility/log.utility'; -import {ansBundles} from '../utility/ans.utility'; -import {mkdir} from '../utility/file.utility'; -import {sleep} from '../utility/sleep.utility'; -import {TestSuite} from '../utility/mocha.utility'; -import {getNodeInfo} from '../query/node.query'; -import {block} from '../query/block.query'; -import {transaction, tagValue, Tag} from '../query/transaction.query'; -import {getDataFromChunks} from '../query/node.query'; -import {importBlocks, importTransactions, importTags} from './import.database'; -import {DatabaseTag} from './transaction.database'; -import {cacheANSEntries} from '../caching/ans.entry.caching'; +import { DataItemJson } from 'arweave-bundles'; +import { config } from 'dotenv'; +import { getLastBlock } from '../utility/height.utility'; +import { + serializeBlock, + serializeTransaction, + serializeAnsTransaction, + serializeTags, +} from '../utility/serialize.utility'; +import { + streams, + initStreams, + resetCacheStreams, +} from '../utility/csv.utility'; +import { log } from '../utility/log.utility'; +import { ansBundles } from '../utility/ans.utility'; +import { mkdir } from '../utility/file.utility'; +import { sleep } from '../utility/sleep.utility'; +import { TestSuite } from '../utility/mocha.utility'; +import { getNodeInfo } from '../query/node.query'; +import { block } from '../query/block.query'; +import { transaction, tagValue, Tag } from '../query/transaction.query'; +import { getDataFromChunks } from '../query/node.query'; +import { + importBlocks, + importTransactions, + importTags, +} from './import.database'; +import { DatabaseTag } from './transaction.database'; +import { cacheANSEntries } from '../caching/ans.entry.caching'; config(); mkdir('snapshot'); @@ -32,14 +45,11 @@ export let currentHeight = 0; export let timer = setTimeout(() => {}, 0); export function configureSyncBar(start: number, end: number) { - bar = new ProgressBar( - ':current/:total blocks synced [:bar] :percent :etas', - { - complete: '|', - incomplete: ' ', - total: end - start, - }, - ); + bar = new ProgressBar(':current/:total blocks synced [:bar] :percent :etas', { 
+ complete: '|', + incomplete: ' ', + total: end - start, + }); } export async function startSync() { @@ -47,7 +57,9 @@ export async function startSync() { currentHeight = startHeight; if (parallelization > 0) { - log.info(`[database] starting sync, parallelization is set to ${parallelization}`); + log.info( + `[database] starting sync, parallelization is set to ${parallelization}` + ); if (storeSnapshot) { log.info('[snapshot] also writing new blocks to the snapshot folder'); } @@ -60,7 +72,9 @@ export async function startSync() { configureSyncBar(startHeight, nodeInfo.height); topHeight = nodeInfo.height; - log.info(`[database] database is currently at height ${startHeight}, resuming sync to ${topHeight}`); + log.info( + `[database] database is currently at height ${startHeight}, resuming sync to ${topHeight}` + ); bar.tick(); await parallelize(startHeight + 1); @@ -89,7 +103,9 @@ export async function parallelize(height: number) { await sleep(30000); const nodeInfo = await getNodeInfo(); if (nodeInfo.height > topHeight) { - log.info(`[database] updated height from ${topHeight} to ${nodeInfo.height} syncing new blocks`); + log.info( + `[database] updated height from ${topHeight} to ${nodeInfo.height} syncing new blocks` + ); topHeight = nodeInfo.height; } @@ -108,25 +124,30 @@ export async function parallelize(height: number) { try { await importBlocks(`${process.cwd()}/cache/block.csv`); } catch (error) { - log.error('[sync] importing new blocks failed most likely due to it already being in the DB'); + log.error( + '[sync] importing new blocks failed most likely due to it already being in the DB' + ); log.error(error); } try { await importTransactions(`${process.cwd()}/cache/transaction.csv`); } catch (error) { - log.error('[sync] importing new transactions failed most likely due to it already being in the DB'); + log.error( + '[sync] importing new transactions failed most likely due to it already being in the DB' + ); log.error(error); } try { await importTags(`${process.cwd()}/cache/tags.csv`); } catch (error) { - log.error('[sync] importing new tags failed most likely due to it already being in the DB'); + log.error( + '[sync] importing new tags failed most likely due to it already being in the DB' + ); log.error(error); } - resetCacheStreams(); if (!bar.complete) { @@ -144,7 +165,7 @@ export async function parallelize(height: number) { export async function storeBlock(height: number, retry: number = 0) { try { const currentBlock = await block(height); - const {formattedBlock, input} = serializeBlock(currentBlock, height); + const { formattedBlock, input } = serializeBlock(currentBlock, height); streams.block.cache.write(input); @@ -153,15 +174,22 @@ export async function storeBlock(height: number, retry: number = 0) { } if (height > 0) { - await storeTransactions(JSON.parse(formattedBlock.txs) as Array, height); + await storeTransactions( + JSON.parse(formattedBlock.txs) as Array, + height + ); } } catch (error) { if (SIGKILL === false) { if (retry >= 25) { - log.info(`[snapshot] there were problems retrieving ${height}, restarting the server`); + log.info( + `[snapshot] there were problems retrieving ${height}, restarting the server` + ); await startSync(); } else { - log.info(`[snapshot] could not retrieve block at height ${height}, retrying`); + log.info( + `[snapshot] could not retrieve block at height ${height}, retrying` + ); await storeBlock(height, retry + 1); } } @@ -179,10 +207,17 @@ export async function storeTransactions(txs: Array, height: number) { await 
Promise.all(batch); } -export async function storeTransaction(tx: string, height: number, retry: boolean = true) { +export async function storeTransaction( + tx: string, + height: number, + retry: boolean = true +) { try { const currentTransaction = await transaction(tx); - const {formattedTransaction, preservedTags, input} = serializeTransaction(currentTransaction, height); + const { formattedTransaction, preservedTags, input } = serializeTransaction( + currentTransaction, + height + ); streams.transaction.cache.write(input); @@ -199,7 +234,13 @@ export async function storeTransaction(tx: string, height: number, retry: boolea } } catch (error) { console.log(''); - log.info(`[database] could not retrieve tx ${tx} at height ${height} ${retry ? ', attempting to retrieve again' : ', missing tx stored in .rescan'}`); + log.info( + `[database] could not retrieve tx ${tx} at height ${height} ${ + retry + ? ', attempting to retrieve again' + : ', missing tx stored in .rescan' + }` + ); if (retry) { await storeTransaction(tx, height, false); } else { @@ -211,7 +252,11 @@ export async function storeTransaction(tx: string, height: number, retry: boolea } } -export async function processAns(id: string, height: number, retry: boolean = true) { +export async function processAns( + id: string, + height: number, + retry: boolean = true +) { try { const ansPayload = await getDataFromChunks(id); const ansTxs = await ansBundles.unbundleData(ansPayload.toString('utf-8')); @@ -222,7 +267,9 @@ export async function processAns(id: string, height: number, retry: boolean = tr if (retry) { await processAns(id, height, false); } else { - log.info(`[database] malformed ANS payload at height ${height} for tx ${id}`); + log.info( + `[database] malformed ANS payload at height ${height} for tx ${id}` + ); streams.rescan.cache.write(`${id}|${height}|ans\n`); if (storeSnapshot) { streams.rescan.snapshot.write(`${id}|${height}|ans\n`); @@ -231,10 +278,13 @@ export async function processAns(id: string, height: number, retry: boolean = tr } } -export async function processANSTransaction(ansTxs: Array, height: number) { +export async function processANSTransaction( + ansTxs: Array, + height: number +) { for (let i = 0; i < ansTxs.length; i++) { const ansTx = ansTxs[i]; - const {ansTags, input} = serializeAnsTransaction(ansTx, height); + const { ansTags, input } = serializeAnsTransaction(ansTx, height); streams.transaction.cache.write(input); @@ -244,7 +294,7 @@ export async function processANSTransaction(ansTxs: Array, height: for (let ii = 0; ii < ansTags.length; ii++) { const ansTag = ansTags[ii]; - const {name, value} = ansTag; + const { name, value } = ansTag; const tag: DatabaseTag = { tx_id: ansTx.id, @@ -267,7 +317,7 @@ export async function processANSTransaction(ansTxs: Array, height: export function storeTags(tx_id: string, tags: Array) { for (let i = 0; i < tags.length; i++) { const tag = tags[i]; - const {input} = serializeTags(tx_id, i, tag); + const { input } = serializeTags(tx_id, i, tag); streams.tags.cache.write(input); if (storeSnapshot) { streams.tags.snapshot.write(input); @@ -275,11 +325,12 @@ export function storeTags(tx_id: string, tags: Array) { } } - export function signalHook() { if (!TestSuite) { process.on('SIGINT', () => { - log.info('[database] ensuring all blocks are stored before exit, you may see some extra output in console'); + log.info( + '[database] ensuring all blocks are stored before exit, you may see some extra output in console' + ); SIGKILL = true; setInterval(() => { if (SIGINT === 
false) { From f4aaa96a224516f42826b8bc3e3a9a521be94304 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Mon, 19 Jul 2021 15:30:06 +0200 Subject: [PATCH 05/11] better crash reports from block parser --- src/database/sync.database.ts | 61 ++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/src/database/sync.database.ts b/src/database/sync.database.ts index 0c65485..a69d091 100644 --- a/src/database/sync.database.ts +++ b/src/database/sync.database.ts @@ -162,38 +162,41 @@ export async function parallelize(height: number) { } } -export async function storeBlock(height: number, retry: number = 0) { - try { - const currentBlock = await block(height); - const { formattedBlock, input } = serializeBlock(currentBlock, height); +export function storeBlock(height: number, retry: number = 0) { + return new Promise(async (resolve, reject) => { + block(height) + .then((currentBlock) => { + const { formattedBlock, input } = serializeBlock(currentBlock, height); - streams.block.cache.write(input); + streams.block.cache.write(input); - if (storeSnapshot) { - streams.block.snapshot.write(input); - } + if (storeSnapshot) { + streams.block.snapshot.write(input); + } - if (height > 0) { - await storeTransactions( - JSON.parse(formattedBlock.txs) as Array, - height - ); - } - } catch (error) { - if (SIGKILL === false) { - if (retry >= 25) { - log.info( - `[snapshot] there were problems retrieving ${height}, restarting the server` - ); - await startSync(); - } else { - log.info( - `[snapshot] could not retrieve block at height ${height}, retrying` - ); - await storeBlock(height, retry + 1); - } - } - } + if (height > 0) { + storeTransactions( + JSON.parse(formattedBlock.txs) as Array, + height + ).resolve(); + } + }) + .catch((error) => { + if (SIGKILL === false) { + if (retry >= 25) { + log.info( + `[snapshot] there were problems retrieving ${height}, restarting the server` + ); + startSync().then(reject); + } else { + log.info( + `[snapshot] could not retrieve block at height ${height}, retrying` + ); + storeBlock(height, retry + 1).then(reject); + } + } + }); + }); } export async function storeTransactions(txs: Array, height: number) { From 6384e87fd00c670382438c20b816b55510ecdad8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Mon, 19 Jul 2021 18:20:49 +0200 Subject: [PATCH 06/11] Improve randomized NODE selection Prevent zombie processes with proper promisified recursion --- package.json | 1 + src/database/sync.database.ts | 168 ++++++++++++++++--------------- src/declerations.d.ts | 1 + src/query/block.query.ts | 182 ++++++++++++++++++---------------- src/query/node.query.ts | 92 ++++++++++++----- src/route/status.route.ts | 26 +++-- src/route/sync.route.ts | 28 +++--- yarn.lock | 7 ++ 8 files changed, 293 insertions(+), 212 deletions(-) create mode 100644 src/declerations.d.ts diff --git a/package.json b/package.json index 1bf317b..3809982 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "pg": "^8.5.1", "pg-copy-streams": "^5.1.1", "progress": "^2.0.3", + "random-weighted-choice": "^0.1.4", "rfc4648": "^1.4.0", "shortid": "^2.2.16", "superagent": "^6.1.0", diff --git a/src/database/sync.database.ts b/src/database/sync.database.ts index a69d091..8118273 100644 --- a/src/database/sync.database.ts +++ b/src/database/sync.database.ts @@ -52,41 +52,46 @@ export function configureSyncBar(start: number, end: number) { }); } -export async function startSync() { - const startHeight 
= await getLastBlock(); - currentHeight = startHeight; - - if (parallelization > 0) { - log.info( - `[database] starting sync, parallelization is set to ${parallelization}` - ); - if (storeSnapshot) { - log.info('[snapshot] also writing new blocks to the snapshot folder'); - } - - initStreams(); - signalHook(); - - if (startHeight > 0) { - const nodeInfo = await getNodeInfo(); - configureSyncBar(startHeight, nodeInfo.height); - topHeight = nodeInfo.height; - +export function startSync() { + getLastBlock().then(async (startHeight) => { + if (parallelization > 0) { log.info( - `[database] database is currently at height ${startHeight}, resuming sync to ${topHeight}` + `[database] starting sync, parallelization is set to ${parallelization}` ); - - bar.tick(); - await parallelize(startHeight + 1); - } else { - const nodeInfo = await getNodeInfo(); - configureSyncBar(0, nodeInfo.height); - topHeight = nodeInfo.height; - log.info(`[database] syncing from block 0 to ${topHeight}`); - bar.tick(); - await parallelize(0); + if (storeSnapshot) { + log.info('[snapshot] also writing new blocks to the snapshot folder'); + } + initStreams(); + signalHook(); + if (startHeight > 0) { + const nodeInfo = await getNodeInfo(); + if (nodeInfo) { + nodeInfo && configureSyncBar(startHeight, nodeInfo.height); + topHeight = nodeInfo.height; + + log.info( + `[database] database is currently at height ${startHeight}, resuming sync to ${topHeight}` + ); + + bar.tick(); + await parallelize(startHeight + 1); + } + } else { + const nodeInfo = await getNodeInfo(); + if (nodeInfo) { + configureSyncBar(0, nodeInfo.height); + topHeight = nodeInfo.height; + bar.tick(); + await parallelize(0); + } else { + console.error( + 'Failed to establish any connection to Nodes after 100 retries' + ); + process.exit(1); + } + } } - } + }); } export async function parallelize(height: number) { @@ -102,7 +107,7 @@ export async function parallelize(height: number) { log.info('[database] fully synced, monitoring for new blocks'); await sleep(30000); const nodeInfo = await getNodeInfo(); - if (nodeInfo.height > topHeight) { + if (nodeInfo && nodeInfo.height > topHeight) { log.info( `[database] updated height from ${topHeight} to ${nodeInfo.height} syncing new blocks` ); @@ -114,39 +119,26 @@ export async function parallelize(height: number) { const batch = []; for (let i = height; i < height + parallelization && i < topHeight; i++) { - batch.push(storeBlock(i)); + const blok = storeBlock(i); + if (!blok) { + // basically telling typescript to stfu + log.error( + `[database] shouldn't happen, but it seems a block creation went wrong` + ); + process.exit(1); + } + batch.push(blok); } SIGINT = true; - await Promise.all(batch); - - try { - await importBlocks(`${process.cwd()}/cache/block.csv`); - } catch (error) { - log.error( - '[sync] importing new blocks failed most likely due to it already being in the DB' - ); - log.error(error); - } + await Promise.all(batch).catch((error) => { + throw new Error(error); + }); - try { - await importTransactions(`${process.cwd()}/cache/transaction.csv`); - } catch (error) { - log.error( - '[sync] importing new transactions failed most likely due to it already being in the DB' - ); - log.error(error); - } - - try { - await importTags(`${process.cwd()}/cache/tags.csv`); - } catch (error) { - log.error( - '[sync] importing new tags failed most likely due to it already being in the DB' - ); - log.error(error); - } + await importBlocks(`${process.cwd()}/cache/block.csv`); + await 
importTransactions(`${process.cwd()}/cache/transaction.csv`); + await importTags(`${process.cwd()}/cache/tags.csv`); resetCacheStreams(); @@ -162,37 +154,51 @@ export async function parallelize(height: number) { } } -export function storeBlock(height: number, retry: number = 0) { - return new Promise(async (resolve, reject) => { +export function storeBlock(height: number, retry: number = 0): Promise { + return new Promise((resolve, reject) => { block(height) .then((currentBlock) => { - const { formattedBlock, input } = serializeBlock(currentBlock, height); + if (currentBlock) { + const { formattedBlock, input } = serializeBlock( + currentBlock, + height + ); - streams.block.cache.write(input); + streams.block.cache.write(input); - if (storeSnapshot) { - streams.block.snapshot.write(input); - } + if (storeSnapshot) { + streams.block.snapshot.write(input); + } - if (height > 0) { - storeTransactions( - JSON.parse(formattedBlock.txs) as Array, - height - ).resolve(); + if (height > 0) { + storeTransactions( + JSON.parse(formattedBlock.txs) as Array, + height + ).then(resolve); + } else { + resolve(); + } + } else { + return new Promise((res) => setTimeout(res, 10)).then(() => { + if (retry >= 25) { + log.info( + `[snapshot] could not retrieve block at height ${height}` + ); + return reject('Failed to fetch block after 25 retries'); + } else { + return resolve(storeBlock(height, retry + 1)); + } + }); } }) .catch((error) => { + log.error(`[snapshot] error ${error}`); if (SIGKILL === false) { if (retry >= 25) { - log.info( - `[snapshot] there were problems retrieving ${height}, restarting the server` - ); - startSync().then(reject); + log.info(`[snapshot] there were problems retrieving ${height}`); + return reject(error); } else { - log.info( - `[snapshot] could not retrieve block at height ${height}, retrying` - ); - storeBlock(height, retry + 1).then(reject); + return resolve(storeBlock(height, retry + 1)); } } }); diff --git a/src/declerations.d.ts b/src/declerations.d.ts new file mode 100644 index 0000000..bf652c4 --- /dev/null +++ b/src/declerations.d.ts @@ -0,0 +1 @@ +declare module 'random-weighted-choice'; diff --git a/src/query/block.query.ts b/src/query/block.query.ts index 5c3fb80..4c18bc5 100644 --- a/src/query/block.query.ts +++ b/src/query/block.query.ts @@ -1,93 +1,105 @@ -import {get} from 'superagent'; -import {grabNode} from './node.query'; +import { get } from 'superagent'; +import { grabNode, warmNode, coolNode } from './node.query'; export interface BlockType { - nonce: string; - previous_block: string; - timestamp: number; - last_retarget: number; - diff: string; - height: number; - hash: string; - indep_hash: string; - txs: Array; - tx_root: string; - tx_tree: Array; - wallet_list: string; - reward_addr: string; - tags: Array; - reward_pool: number; - weave_size: number; - block_size: number; - cumulative_diff: string; - hash_list_merkle: string; - poa: { - option: string; - tx_path: string; - chunk: string; - }; + nonce: string; + previous_block: string; + timestamp: number; + last_retarget: number; + diff: string; + height: number; + hash: string; + indep_hash: string; + txs: Array; + tx_root: string; + tx_tree: Array; + wallet_list: string; + reward_addr: string; + tags: Array; + reward_pool: number; + weave_size: number; + block_size: number; + cumulative_diff: string; + hash_list_merkle: string; + poa: { + option: string; + tx_path: string; + chunk: string; + }; } -export async function block(height: number): Promise { - const payload = await 
get(`${grabNode()}/block/height/${height}`); - const body = JSON.parse(payload.text); - - return { - nonce: body.nonce, - previous_block: body.previous_block, - timestamp: body.timestamp, - last_retarget: body.last_retarget, - diff: body.diff, - height: body.height, - hash: body.hash, - indep_hash: body.indep_hash, - txs: body.txs, - tx_root: body.tx_root, - tx_tree: body.tx_tree, - wallet_list: body.wallet_list, - reward_addr: body.reward_addr, - tags: body.tags, - reward_pool: body.reward_pool, - weave_size: body.weave_size, - block_size: body.block_size, - cumulative_diff: body.cumulative_diff, - hash_list_merkle: body.hash_list_merkle, - poa: { - option: body.poa?.option, - tx_path: body.poa?.tx_path, - chunk: body.poa?.chunk, - }, - }; +export function block(height: number): Promise { + const tryNode = grabNode(); + return get(`${tryNode}/block/height/${height}`) + .then((payload) => { + const body = JSON.parse(payload.text); + warmNode(tryNode); + return { + nonce: body.nonce, + previous_block: body.previous_block, + timestamp: body.timestamp, + last_retarget: body.last_retarget, + diff: body.diff, + height: body.height, + hash: body.hash, + indep_hash: body.indep_hash, + txs: body.txs, + tx_root: body.tx_root, + tx_tree: body.tx_tree, + wallet_list: body.wallet_list, + reward_addr: body.reward_addr, + tags: body.tags, + reward_pool: body.reward_pool, + weave_size: body.weave_size, + block_size: body.block_size, + cumulative_diff: body.cumulative_diff, + hash_list_merkle: body.hash_list_merkle, + poa: { + option: body.poa?.option, + tx_path: body.poa?.tx_path, + chunk: body.poa?.chunk, + }, + }; + }) + .catch(() => { + coolNode(tryNode); + }); } -export async function currentBlock(): Promise { - const payload = await get(`${grabNode()}/block/current`); - const body = JSON.parse(payload.text); +export async function currentBlock(): Promise { + const tryNode = grabNode(); + return get(`${tryNode}/block/current`) + .then((payload) => { + const body = JSON.parse(payload.text); - return { - nonce: body.nonce, - previous_block: body.previous_block, - timestamp: body.timestamp, - last_retarget: body.last_retarget, - diff: body.diff, - height: body.height, - hash: body.hash, - indep_hash: body.indep_hash, - txs: body.txs, - tx_root: body.tx_root, - tx_tree: body.tx_tree, - wallet_list: body.wallet_list, - reward_addr: body.reward_addr, - tags: body.tags, - reward_pool: body.reward_pool, - weave_size: body.weave_size, - block_size: body.block_size, - cumulative_diff: body.cumulative_diff, - hash_list_merkle: body.hash_list_merkle, - poa: { - option: body.poa.option, - tx_path: body.poa.tx_path, - chunk: body.poa.chunk, - }, - }; + return { + nonce: body.nonce, + previous_block: body.previous_block, + timestamp: body.timestamp, + last_retarget: body.last_retarget, + diff: body.diff, + height: body.height, + hash: body.hash, + indep_hash: body.indep_hash, + txs: body.txs, + tx_root: body.tx_root, + tx_tree: body.tx_tree, + wallet_list: body.wallet_list, + reward_addr: body.reward_addr, + tags: body.tags, + reward_pool: body.reward_pool, + weave_size: body.weave_size, + block_size: body.block_size, + cumulative_diff: body.cumulative_diff, + hash_list_merkle: body.hash_list_merkle, + poa: { + option: body.poa.option, + tx_path: body.poa.tx_path, + chunk: body.poa.chunk, + }, + }; + }) + .catch(() => { + coolNode(tryNode); + }); } diff --git a/src/query/node.query.ts b/src/query/node.query.ts index f67b930..b88522f 100644 --- a/src/query/node.query.ts +++ b/src/query/node.query.ts @@ -1,13 +1,37 
@@ -import {config} from 'dotenv'; -import {get} from 'superagent'; -import {getTransactionOffset, getChunk} from './chunk.query'; +import { config } from 'dotenv'; +import { get } from 'superagent'; +import rwc from 'random-weighted-choice'; +import { getTransactionOffset, getChunk } from './chunk.query'; config(); -export const NODES = process.env.ARWEAVE_NODES ? JSON.parse(process.env.ARWEAVE_NODES) : ['http://lon-1.eu-west-1.arweave.net:1984']; +export const NODES = process.env.ARWEAVE_NODES + ? JSON.parse(process.env.ARWEAVE_NODES) + : ['http://lon-1.eu-west-1.arweave.net:1984']; + +type WeightedNode = { id: string; weight: number }; + +const nodeTemperatures: WeightedNode[] = NODES.map((url: string) => ({ + id: url, + weight: 1, +})); export function grabNode() { - return NODES[Math.floor(Math.random() * NODES.length)]; + return rwc(nodeTemperatures); +} + +export function warmNode(url: string) { + const item = nodeTemperatures.find((i: WeightedNode) => i.id === url); + if (item) { + item['weight'] = Math.max(item['weight'] + 1, 99); + } +} + +export function coolNode(url: string) { + const item = nodeTemperatures.find((i: WeightedNode) => i.id === url); + if (item) { + item['weight'] = Math.min(item['weight'] - 1, 1); + } } export interface InfoType { @@ -22,21 +46,37 @@ export interface InfoType { node_state_latency: number; } -export async function getNodeInfo(): Promise { - const payload = await get(`${grabNode()}/info`); - const body = JSON.parse(payload.text); - - return { - network: body.network, - version: body.version, - release: body.release, - height: body.height, - current: body.current, - blocks: body.blocks, - peers: body.peers, - queue_length: body.queue_length, - node_state_latency: body.node_state_latency, - }; +export function getNodeInfo(retry = 0): Promise { + const tryNode = grabNode(); + + return get(`${tryNode}/info`) + .then((payload) => { + const body = JSON.parse(payload.text); + warmNode(tryNode); + return { + network: body.network, + version: body.version, + release: body.release, + height: body.height, + current: body.current, + blocks: body.blocks, + peers: body.peers, + queue_length: body.queue_length, + node_state_latency: body.node_state_latency, + }; + }) + .catch(() => { + return new Promise((res) => setTimeout(res, 10 + 2 * retry)).then(() => { + if (retry < 100) { + return getNodeInfo(retry + 1); + } else { + console.error( + 'Failed to establish connection to any specified node after 100 retries' + ); + process.exit(1); + } + }); + }); } export async function getData(id: string): Promise { @@ -48,9 +88,12 @@ export function getDataAsStream(id: string) { return get(`${grabNode()}/${id}`); } -export async function getDataFromChunks(id: string, retry: boolean = true): Promise { +export async function getDataFromChunks( + id: string, + retry: boolean = true +): Promise { try { - const {startOffset, endOffset} = await getTransactionOffset(id); + const { startOffset, endOffset } = await getTransactionOffset(id); let byte = 0; let chunks = Buffer.from(''); @@ -64,7 +107,10 @@ export async function getDataFromChunks(id: string, retry: boolean = true): Prom return chunks; } catch (error) { if (retry) { - console.error(`error retrieving data from ${id}, please note that this may be a cancelled transaction`.red.bold); + console.error( + `error retrieving data from ${id}, please note that this may be a cancelled transaction` + .red.bold + ); return await getDataFromChunks(id, false); } else { throw error; diff --git a/src/route/status.route.ts 
b/src/route/status.route.ts index 673c8e8..d51bb23 100644 --- a/src/route/status.route.ts +++ b/src/route/status.route.ts @@ -1,18 +1,22 @@ -import {Request, Response} from 'express'; -import {currentHeight} from '../database/sync.database'; -import {getNodeInfo} from '../query/node.query'; +import { Request, Response } from 'express'; +import { currentHeight } from '../database/sync.database'; +import { getNodeInfo } from '../query/node.query'; -export const start = Number(new Date); +export const start = Number(new Date()); export async function statusRoute(req: Request, res: Response) { const info = await getNodeInfo(); - const delta = info.height - currentHeight; + if (info) { + const delta = info.height - currentHeight; - return res.status(200).send({ - status: 'OK', - gatewayHeight: currentHeight, - arweaveHeight: info.height, - delta, - }); + return res.status(200).send({ + status: 'OK', + gatewayHeight: currentHeight, + arweaveHeight: info.height, + delta, + }); + } else { + return res.status(404).send(); + } } diff --git a/src/route/sync.route.ts b/src/route/sync.route.ts index 1fa7c65..6e17c50 100644 --- a/src/route/sync.route.ts +++ b/src/route/sync.route.ts @@ -1,19 +1,23 @@ -import {Request, Response} from 'express'; -import {currentHeight} from '../database/sync.database'; -import {getNodeInfo} from '../query/node.query'; +import { Request, Response } from 'express'; +import { currentHeight } from '../database/sync.database'; +import { getNodeInfo } from '../query/node.query'; -export const start = Number(new Date); +export const start = Number(new Date()); export async function syncRoute(req: Request, res: Response) { const info = await getNodeInfo(); - const delta = info.height - currentHeight; - const status = delta < 3 ? 200 : 400; + if (info) { + const delta = info.height - currentHeight; + const status = delta < 3 ? 
200 : 400; - return res.status(status).send({ - status: 'OK', - gatewayHeight: currentHeight, - arweaveHeight: info.height, - delta, - }); + return res.status(status).send({ + status: 'OK', + gatewayHeight: currentHeight, + arweaveHeight: info.height, + delta, + }); + } else { + return res.status(404).send(); + } } diff --git a/yarn.lock b/yarn.lock index 27ed382..dbc801b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6620,6 +6620,13 @@ random-bytes@~1.0.0: resolved "https://registry.yarnpkg.com/random-bytes/-/random-bytes-1.0.0.tgz#4f68a1dc0ae58bd3fb95848c30324db75d64360b" integrity sha1-T2ih3Arli9P7lYSMMDJNt11kNgs= +random-weighted-choice@^0.1.4: + version "0.1.4" + resolved "https://registry.yarnpkg.com/random-weighted-choice/-/random-weighted-choice-0.1.4.tgz#7bf4971f76c9d33e937eea1152e005024b3d55f6" + integrity sha512-c1aBMOpBjtDW641lCIvOpmkTV76KbTb+s833OZCyyKa+QxE8UrHWmxVXhTKXIwLoMhq1A4EaSL2d3kSQPOLJwQ== + dependencies: + debug "^2.2.0" + randombytes@^2.0.0, randombytes@^2.0.1, randombytes@^2.0.5, randombytes@^2.1.0: version "2.1.0" resolved "https://registry.yarnpkg.com/randombytes/-/randombytes-2.1.0.tgz#df6f84372f0270dc65cdf6291349ab7a473d4f2a" From bcaf41b405c34aef6b73b2c125fcff7c008a13b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Tue, 20 Jul 2021 14:28:36 +0200 Subject: [PATCH 07/11] stack safe parallel fetching with fluture --- package.json | 25 ++-- src/database/sync.database.ts | 248 ++++++++++++++-------------------- src/declerations.d.ts | 1 + src/route/data.route.ts | 57 ++++---- yarn.lock | 18 +++ 5 files changed, 167 insertions(+), 182 deletions(-) diff --git a/package.json b/package.json index 3809982..7144b8a 100644 --- a/package.json +++ b/package.json @@ -9,18 +9,18 @@ "singleQuote": true }, "scripts": { - "dev:test": "npm run dev:build && nyc mocha dist/test dist/test/**/*.js", - "dev:codecov": "nyc report --reporter=text-lcov | codecov --pipe --token=cea7a229-8289-4a7c-9dc6-4fb694978cb2", - "dev:lint": "eslint \"test/*.ts\" \"test/**/*.ts\" \"src/*.ts\" \"src/**/*.ts\" \"migrations/*.ts\" \"knexfile.ts\"", - "dev:gql": "npx graphql-codegen --config codegen.yml", - "dev:build": "npm run dev:gql && tsc", - "dev:start": "npm run dev:build && node dist/src/Gateway.js", - "dev:restart": "npm run migrate:down && npm run migrate:latest && npm run dev:start", - "import:transaction": "npm run dev:build && node dist/src/import/transaction.import.js", - "import:tags": "npm run dev:build && node dist/src/import/tags.import.ts", - "verify:ans": "npm run dev:build && node dist/src/caching/ans.verify.js", - "verify:prune": "npm run dev:build && node dist/src/verify/prune.verify.js", - "verify:transaction": "npm run dev:build && node dist/src/verify/transaction.verify.js", + "test": "npm run build && nyc mocha dist/test dist/test/**/*.js", + "codecov": "nyc report --reporter=text-lcov | codecov --pipe --token=cea7a229-8289-4a7c-9dc6-4fb694978cb2", + "lint": "eslint \"test/*.ts\" \"test/**/*.ts\" \"src/*.ts\" \"src/**/*.ts\" \"migrations/*.ts\" \"knexfile.ts\"", + "gql": "npx graphql-codegen --config codegen.yml", + "build": "npm run gql && tsc", + "start": "npm run build && node dist/src/Gateway.js", + "restart": "npm run migrate:down && npm run migrate:latest && npm run start", + "import:transaction": "npm run build && node dist/src/import/transaction.import.js", + "import:tags": "npm run build && node dist/src/import/tags.import.ts", + "verify:ans": "npm run build && node dist/src/caching/ans.verify.js", + "verify:prune": "npm run build && 
node dist/src/verify/prune.verify.js", + "verify:transaction": "npm run build && node dist/src/verify/transaction.verify.js", "migrate:down": "knex migrate:down", "migrate:up": "knex migrate:up", "migrate:latest": "knex migrate:latest", @@ -45,6 +45,7 @@ "express": "^4.17.1", "express-pg-session": "^1.1.0", "express-session": "^1.17.1", + "fluture": "^14.0.0", "fs": "^0.0.1-security", "fs-jetpack": "^4.1.0", "graphql": "^15.5.0", diff --git a/src/database/sync.database.ts b/src/database/sync.database.ts index 8118273..309f60d 100644 --- a/src/database/sync.database.ts +++ b/src/database/sync.database.ts @@ -1,4 +1,6 @@ import ProgressBar from 'progress'; +import Fluture from 'fluture'; +import * as F from 'fluture'; import { DataItemJson } from 'arweave-bundles'; import { config } from 'dotenv'; import { getLastBlock } from '../utility/height.utility'; @@ -33,9 +35,9 @@ import { cacheANSEntries } from '../caching/ans.entry.caching'; config(); mkdir('snapshot'); mkdir('cache'); +F.debugMode(true); export const storeSnapshot = process.env.SNAPSHOT === '1' ? true : false; -export const parallelization = parseInt(process.env.PARALLEL || '8'); export let SIGINT: boolean = false; export let SIGKILL: boolean = false; @@ -45,164 +47,115 @@ export let currentHeight = 0; export let timer = setTimeout(() => {}, 0); export function configureSyncBar(start: number, end: number) { - bar = new ProgressBar(':current/:total blocks synced [:bar] :percent :etas', { - complete: '|', - incomplete: ' ', - total: end - start, + bar = new ProgressBar(':current/:total blocks synced :percent', { + curr: start, + total: end, }); + bar.curr = start; } export function startSync() { - getLastBlock().then(async (startHeight) => { - if (parallelization > 0) { - log.info( - `[database] starting sync, parallelization is set to ${parallelization}` - ); - if (storeSnapshot) { - log.info('[snapshot] also writing new blocks to the snapshot folder'); - } - initStreams(); - signalHook(); - if (startHeight > 0) { - const nodeInfo = await getNodeInfo(); - if (nodeInfo) { - nodeInfo && configureSyncBar(startHeight, nodeInfo.height); - topHeight = nodeInfo.height; - - log.info( - `[database] database is currently at height ${startHeight}, resuming sync to ${topHeight}` - ); - - bar.tick(); - await parallelize(startHeight + 1); - } - } else { - const nodeInfo = await getNodeInfo(); - if (nodeInfo) { - configureSyncBar(0, nodeInfo.height); - topHeight = nodeInfo.height; - bar.tick(); - await parallelize(0); - } else { - console.error( - 'Failed to establish any connection to Nodes after 100 retries' - ); - process.exit(1); - } - } - } - }); -} - -export async function parallelize(height: number) { - clearTimeout(timer); - timer = setTimeout(async () => { - log.info('[database] sync timed out, restarting server'); - process.exit(); - }, 300 * 1000); - - currentHeight = height; + getLastBlock().then((startHeight) => { + log.info(`[database] starting sync`); - if (height >= topHeight) { - log.info('[database] fully synced, monitoring for new blocks'); - await sleep(30000); - const nodeInfo = await getNodeInfo(); - if (nodeInfo && nodeInfo.height > topHeight) { - log.info( - `[database] updated height from ${topHeight} to ${nodeInfo.height} syncing new blocks` - ); - topHeight = nodeInfo.height; + if (storeSnapshot) { + log.info('[snapshot] also writing new blocks to the snapshot folder'); } - - await parallelize(height); - } else { - const batch = []; - - for (let i = height; i < height + parallelization && i < topHeight; i++) { - const 
blok = storeBlock(i); - if (!blok) { - // basically telling typescript to stfu - log.error( - `[database] shouldn't happen, but it seems a block creation went wrong` + initStreams(); + signalHook(); + + getNodeInfo().then((nodeInfo) => { + if (nodeInfo) { + configureSyncBar(startHeight, nodeInfo.height); + topHeight = nodeInfo.height; + bar.tick(); + // await parallelize(0); + + F.fork(() => console.error('Ooops'))(() => console.log('DONE!'))( + F.parallel( + (isNaN as any)(process.env['PARALLEL']) + ? 36 + : parseInt(process.env['PARALLEL'] || '36') + )( + Array.from( + Array(Math.abs(nodeInfo.height) - Math.abs(startHeight)).keys() + ).map((h) => { + return storeBlock(h + startHeight, bar); + }) + // Array.from(Array(topHeight - startHeight).keys()).map((h) => + // storeBlock(h + startHeight) + // ) + ) + ); + } else { + console.error( + 'Failed to establish any connection to Nodes after 100 retries' ); process.exit(1); } - batch.push(blok); - } - - SIGINT = true; - - await Promise.all(batch).catch((error) => { - throw new Error(error); }); - - await importBlocks(`${process.cwd()}/cache/block.csv`); - await importTransactions(`${process.cwd()}/cache/transaction.csv`); - await importTags(`${process.cwd()}/cache/tags.csv`); - - resetCacheStreams(); - - if (!bar.complete) { - bar.tick(batch.length); - } - - SIGINT = false; - - if (SIGKILL === false) { - await parallelize(height + batch.length); - } - } + }); } -export function storeBlock(height: number, retry: number = 0): Promise { - return new Promise((resolve, reject) => { - block(height) - .then((currentBlock) => { - if (currentBlock) { - const { formattedBlock, input } = serializeBlock( - currentBlock, - height - ); - - streams.block.cache.write(input); - - if (storeSnapshot) { - streams.block.snapshot.write(input); - } - - if (height > 0) { - storeTransactions( - JSON.parse(formattedBlock.txs) as Array, - height - ).then(resolve); - } else { - resolve(); - } - } else { - return new Promise((res) => setTimeout(res, 10)).then(() => { - if (retry >= 25) { - log.info( - `[snapshot] could not retrieve block at height ${height}` - ); - return reject('Failed to fetch block after 25 retries'); - } else { - return resolve(storeBlock(height, retry + 1)); - } - }); - } - }) - .catch((error) => { - log.error(`[snapshot] error ${error}`); - if (SIGKILL === false) { - if (retry >= 25) { - log.info(`[snapshot] there were problems retrieving ${height}`); - return reject(error); - } else { - return resolve(storeBlock(height, retry + 1)); - } - } - }); - }); +export function storeBlock(height: number, bar: ProgressBar): Promise { + return Fluture( + (reject: (reason: string | void) => void, resolve: () => void) => { + let isCancelled = false; + function getBlock(retry = 0) { + !isCancelled && + block(height) + .then((currentBlock) => { + if (currentBlock) { + const { formattedBlock, input } = serializeBlock( + currentBlock, + height + ); + + streams.block.cache.write(input); + + if (storeSnapshot) { + streams.block.snapshot.write(input); + } + + storeTransactions( + JSON.parse(formattedBlock.txs) as Array, + height + ); + bar.tick(); + resolve(); + } else { + new Promise((res) => setTimeout(res, 10)).then(() => { + if (retry >= 25) { + log.info( + `[snapshot] could not retrieve block at height ${height}` + ); + reject('Failed to fetch block after 25 retries'); + } else { + return getBlock(retry + 1); + } + }); + } + }) + .catch((error) => { + log.error(`[snapshot] error ${error}`); + if (SIGKILL === false) { + if (retry >= 25) { + log.info( + 
`[snapshot] there were problems retrieving ${height}` + ); + reject(error); + } else { + return getBlock(retry + 1); + } + } + }); + } + getBlock(); + return () => { + isCancelled = true; + }; + } + ); } export async function storeTransactions(txs: Array, height: number) { @@ -344,6 +297,7 @@ export function signalHook() { setInterval(() => { if (SIGINT === false) { log.info('[database] block sync state preserved, now exiting'); + console.log(''); process.exit(); } }, 100); diff --git a/src/declerations.d.ts b/src/declerations.d.ts index bf652c4..f840b85 100644 --- a/src/declerations.d.ts +++ b/src/declerations.d.ts @@ -1 +1,2 @@ declare module 'random-weighted-choice'; +declare module 'fluture'; diff --git a/src/route/data.route.ts b/src/route/data.route.ts index ef31704..d1bfd3b 100644 --- a/src/route/data.route.ts +++ b/src/route/data.route.ts @@ -1,11 +1,14 @@ -import {config} from 'dotenv'; -import {read, exists} from 'fs-jetpack'; -import {Request, Response} from 'express'; -import {connection} from '../database/connection.database'; -import {ManifestV1} from '../types/manifest.types'; -import {log} from '../utility/log.utility'; -import {transaction as getTransaction, tagValue} from '../query/transaction.query'; -import {cacheFolder, cacheFile, cacheAnsFile} from '../caching/file.caching'; +import { config } from 'dotenv'; +import { read, exists } from 'fs-jetpack'; +import { Request, Response } from 'express'; +import { connection } from '../database/connection.database'; +import { ManifestV1 } from '../types/manifest.types'; +import { log } from '../utility/log.utility'; +import { + transaction as getTransaction, + tagValue, +} from '../query/transaction.query'; +import { cacheFolder, cacheFile, cacheAnsFile } from '../caching/file.caching'; config(); @@ -37,11 +40,16 @@ export async function dataRoute(req: Request, res: Response) { const ans102 = tagValue(metadata.tags, 'Bundle-Type') === 'ANS-102'; if (req.hostname !== `${transaction}.${manifestPrefix}`) { - if (contentType === 'application/x.arweave-manifest+json' || contentType === 'application/x.arweave-manifest') { + if ( + contentType === 'application/x.arweave-manifest+json' || + contentType === 'application/x.arweave-manifest' + ) { const manifestFile = read(`${cacheFolder}/${transaction}`) || '{}'; const manifest: ManifestV1 = JSON.parse(manifestFile.toString()); - const cachePaths = Object.keys(manifest.paths).map((key) => cacheFile(manifest.paths[key].id)); + const cachePaths = Object.keys(manifest.paths).map((key) => + cacheFile(manifest.paths[key].id) + ); await Promise.all(cachePaths); for (let i = 0; i < Object.keys(manifest.paths).length; i++) { @@ -49,22 +57,22 @@ export async function dataRoute(req: Request, res: Response) { const manifest_path = manifest.paths[path_url]; const existingTx = await connection - .table('manifest') - .where('manifest_id', transaction) - .where('tx_id', manifest_path.id); + .table('manifest') + .where('manifest_id', transaction) + .where('tx_id', manifest_path.id); if (existingTx && existingTx.length > 0) { - return res.redirect(`http://${transaction}.${manifestPrefix}:${port}`); + return res.redirect( + `http://${transaction}.${manifestPrefix}:${port}` + ); } - await connection - .table('manifest') - .insert({ - manifest_url: transaction.toLowerCase(), - manifest_id: transaction, - path: path_url, - tx_id: manifest_path.id, - }); + await connection.table('manifest').insert({ + manifest_url: transaction.toLowerCase(), + manifest_id: transaction, + path: path_url, + tx_id: 
manifest_path.id, + }); } return res.redirect(`http://${transaction}.${manifestPrefix}:${port}`); @@ -92,6 +100,9 @@ export async function dataRoute(req: Request, res: Response) { console.error(error); res.status(500); - return res.json({status: 'ERROR', message: 'Could not retrieve transaction'}); + return res.json({ + status: 'ERROR', + message: 'Could not retrieve transaction', + }); } } diff --git a/yarn.lock b/yarn.lock index dbc801b..17aecef 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3934,6 +3934,14 @@ flatted@^3.1.0: resolved "https://registry.yarnpkg.com/flatted/-/flatted-3.1.1.tgz#c4b489e80096d9df1dfc97c79871aea7c617c469" integrity sha512-zAoAQiudy+r5SvnSw3KJy5os/oRJYHzrzja/tBDqrZtNhUw8bt6y8OBzMWcjWr+8liV8Eb6yOhw8WZ7VFZ5ZzA== +fluture@^14.0.0: + version "14.0.0" + resolved "https://registry.yarnpkg.com/fluture/-/fluture-14.0.0.tgz#81e1cae996a262e214aa2a8d7845fe1359018e3b" + integrity sha512-pENtLF948a8DfduVKugT8edTAbFi4rBS94xjHwzLanQqIu5PYtLGl+xqs6H8TaIRL7z/B0cDpswdINzH/HRUGA== + dependencies: + sanctuary-show "^2.0.0" + sanctuary-type-identifiers "^3.0.0" + fn.name@1.x.x: version "1.1.0" resolved "https://registry.yarnpkg.com/fn.name/-/fn.name-1.1.0.tgz#26cad8017967aea8731bc42961d04a3d5988accc" @@ -7002,6 +7010,16 @@ safe-regex@^1.1.0: resolved "https://registry.yarnpkg.com/safer-buffer/-/safer-buffer-2.1.2.tgz#44fa161b0187b9549dd84bb91802f9bd8385cd6a" integrity sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg== +sanctuary-show@^2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/sanctuary-show/-/sanctuary-show-2.0.0.tgz#2326b4744f4b0f993f18ca56a29f68a50f514637" + integrity sha512-REj4ZiioUXnDLj6EpJ9HcYDIEGaEexmB9Fg5o6InZR9f0x5PfnnC21QeU9SZ9E7G8zXSZPNjy8VRUK4safbesw== + +sanctuary-type-identifiers@^3.0.0: + version "3.0.0" + resolved "https://registry.yarnpkg.com/sanctuary-type-identifiers/-/sanctuary-type-identifiers-3.0.0.tgz#51cb488d2ed9f194946a64ffe2b41dd49a348c0b" + integrity sha512-YFXYcG0Ura1dSPd/1xLYtE2XAWUEsBHhMTZvYBOvwT8MeFQwdUOCMm2DC+r94z6H93FVq0qxDac8/D7QpJj6Mg== + scuid@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/scuid/-/scuid-1.1.0.tgz#d3f9f920956e737a60f72d0e4ad280bf324d5dab" From 176bad9efe4ef1aae1ecb40ddf1ed722495e738d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Tue, 20 Jul 2021 14:50:58 +0200 Subject: [PATCH 08/11] add cache since it's needed intially --- .gitignore | 5 +++-- cache/.gitkeep | 0 2 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 cache/.gitkeep diff --git a/.gitignore b/.gitignore index edc7766..dd74b4e 100644 --- a/.gitignore +++ b/.gitignore @@ -3,7 +3,8 @@ node_modules package-lock.json .env .cache -cache +cache/*.csv +.rescan snapshot .snapshot @@ -16,4 +17,4 @@ snapshot **/Thumbs.db # Webstorm -.idea \ No newline at end of file +.idea diff --git a/cache/.gitkeep b/cache/.gitkeep new file mode 100644 index 0000000..e69de29 From c75cb906a273c43a4fb4079658cb1e122e0d85de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Tue, 20 Jul 2021 14:57:03 +0200 Subject: [PATCH 09/11] force a crash if 250 retries fail --- src/database/sync.database.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/database/sync.database.ts b/src/database/sync.database.ts index 309f60d..3d3f238 100644 --- a/src/database/sync.database.ts +++ b/src/database/sync.database.ts @@ -71,7 +71,10 @@ export function startSync() { bar.tick(); // await parallelize(0); - F.fork(() 
=> console.error('Ooops'))(() => console.log('DONE!'))( + F.fork((reason) => { + console.error('Ooops', reason); + process.exit(1); + })(() => console.log('DONE!'))( F.parallel( (isNaN as any)(process.env['PARALLEL']) ? 36 @@ -82,9 +85,6 @@ export function startSync() { ).map((h) => { return storeBlock(h + startHeight, bar); }) - // Array.from(Array(topHeight - startHeight).keys()).map((h) => - // storeBlock(h + startHeight) - // ) ) ); } else { @@ -124,12 +124,12 @@ export function storeBlock(height: number, bar: ProgressBar): Promise { bar.tick(); resolve(); } else { - new Promise((res) => setTimeout(res, 10)).then(() => { - if (retry >= 25) { + new Promise((res) => setTimeout(res, 100)).then(() => { + if (retry >= 250) { log.info( `[snapshot] could not retrieve block at height ${height}` ); - reject('Failed to fetch block after 25 retries'); + reject('Failed to fetch block after 250 retries'); } else { return getBlock(retry + 1); } @@ -139,7 +139,7 @@ export function storeBlock(height: number, bar: ProgressBar): Promise { .catch((error) => { log.error(`[snapshot] error ${error}`); if (SIGKILL === false) { - if (retry >= 25) { + if (retry >= 250) { log.info( `[snapshot] there were problems retrieving ${height}` ); From 13fa5246f58004ea621cf0a417b7f41998a94cbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Tue, 20 Jul 2021 15:11:43 +0200 Subject: [PATCH 10/11] better retry mechanism over getTransaction --- src/database/sync.database.ts | 34 +++-------- src/query/transaction.query.ts | 108 ++++++++++++++++++++------------- src/route/data.route.ts | 26 +++++--- 3 files changed, 91 insertions(+), 77 deletions(-) diff --git a/src/database/sync.database.ts b/src/database/sync.database.ts index 3d3f238..afbd4ee 100644 --- a/src/database/sync.database.ts +++ b/src/database/sync.database.ts @@ -71,8 +71,8 @@ export function startSync() { bar.tick(); // await parallelize(0); - F.fork((reason) => { - console.error('Ooops', reason); + F.fork((reason: string | void) => { + console.error('Fatal', reason || ''); process.exit(1); })(() => console.log('DONE!'))( F.parallel( @@ -169,13 +169,9 @@ export async function storeTransactions(txs: Array, height: number) { await Promise.all(batch); } -export async function storeTransaction( - tx: string, - height: number, - retry: boolean = true -) { - try { - const currentTransaction = await transaction(tx); +export async function storeTransaction(tx: string, height: number) { + const currentTransaction = await transaction(tx); + if (currentTransaction) { const { formattedTransaction, preservedTags, input } = serializeTransaction( currentTransaction, height @@ -194,23 +190,9 @@ export async function storeTransaction( if (ans102) { await processAns(formattedTransaction.id, height); } - } catch (error) { - console.log(''); - log.info( - `[database] could not retrieve tx ${tx} at height ${height} ${ - retry - ? 
', attempting to retrieve again' - : ', missing tx stored in .rescan' - }` - ); - if (retry) { - await storeTransaction(tx, height, false); - } else { - streams.rescan.cache.write(`${tx}|${height}|normal\n`); - if (storeSnapshot) { - streams.rescan.snapshot.write(`${tx}|${height}|normal\n`); - } - } + } else { + console.error('Fatal network error'); + process.exit(1); } } diff --git a/src/query/transaction.query.ts b/src/query/transaction.query.ts index d30e879..cc0b599 100644 --- a/src/query/transaction.query.ts +++ b/src/query/transaction.query.ts @@ -1,60 +1,82 @@ -import {get} from 'superagent'; -import {TagFilter} from '../graphql/types'; -import {Base64UrlEncodedString, WinstonString, fromB64Url} from '../utility/encoding.utility'; -import {grabNode} from './node.query'; +import { get } from 'superagent'; +import { TagFilter } from '../graphql/types'; +import { + Base64UrlEncodedString, + WinstonString, + fromB64Url, +} from '../utility/encoding.utility'; +import { grabNode, coolNode, warmNode } from './node.query'; export interface Tag { - name: Base64UrlEncodedString; - value: Base64UrlEncodedString; + name: Base64UrlEncodedString; + value: Base64UrlEncodedString; } export interface TransactionType { - format: number; - id: string; - height?: number; - last_tx: string; - owner: string; - tags: Array; - target: string; - quantity: WinstonString; - data: Base64UrlEncodedString; - data_size: string; - data_tree: Array; - data_root: string; - reward: string; - signature: string; + format: number; + id: string; + height?: number; + last_tx: string; + owner: string; + tags: Array; + target: string; + quantity: WinstonString; + data: Base64UrlEncodedString; + data_size: string; + data_tree: Array; + data_root: string; + reward: string; + signature: string; } -export async function transaction(id: string): Promise { - const payload = await get(`${grabNode()}/tx/${id}`); - const body = JSON.parse(payload.text); +export function transaction( + id: string, + retry = 0 +): Promise { + const tryNode = grabNode(); - return { - format: body.format, - id: body.id, - last_tx: body.last_tx, - owner: body.owner, - tags: body.tags, - target: body.target, - quantity: body.quantity, - data: body.data, - data_size: body.data_size, - data_tree: body.data_tree, - data_root: body.data_root, - reward: body.reward, - signature: body.signature, - }; + return get(`${tryNode}/tx/${id}`) + .then((payload) => { + const body = JSON.parse(payload.text); + warmNode(tryNode); + return { + format: body.format, + id: body.id, + last_tx: body.last_tx, + owner: body.owner, + tags: body.tags, + target: body.target, + quantity: body.quantity, + data: body.data, + data_size: body.data_size, + data_tree: body.data_tree, + data_root: body.data_root, + reward: body.reward, + signature: body.signature, + }; + }) + .catch(() => { + return new Promise((res) => setTimeout(res, 10 + 2 * retry)).then(() => { + if (retry < 100) { + return transaction(id, retry + 1); + } else { + console.error( + 'Failed to establish connection to any specified node after 100 retries' + ); + process.exit(1); + } + }); + }); } export function toB64url(input: string): Base64UrlEncodedString { return Buffer.from(input) - .toString('base64') - .replace(/\+/g, '-') - .replace(/\//g, '_') - .replace(/=/g, ''); + .toString('base64') + .replace(/\+/g, '-') + .replace(/\//g, '_') + .replace(/=/g, ''); } - export function tagValue(tags: Array, name: string): string { for (let i = 0; i < tags.length; i++) { const tag = tags[i]; diff --git 
a/src/route/data.route.ts b/src/route/data.route.ts index d1bfd3b..62db0fb 100644 --- a/src/route/data.route.ts +++ b/src/route/data.route.ts @@ -21,13 +21,16 @@ export const manifestPrefix = process.env.MANIFEST_PREFIX || 'amp-gw.online'; export async function dataHeadRoute(req: Request, res: Response) { const path = req.path.match(pathRegex) || []; const transaction = path.length > 1 ? path[1] : ''; - const metadata = await getTransaction(transaction); - - res.status(200); - res.setHeader('accept-ranges', 'bytes'); - res.setHeader('content-length', Number(metadata.data_size)); - - res.end(); + const metadata = await getTransaction(transaction, 95); + if (!metadata) { + res.status(404); + res.end(); + } else { + res.status(200); + res.setHeader('accept-ranges', 'bytes'); + metadata && res.setHeader('content-length', Number(metadata.data_size)); + res.end(); + } } export async function dataRoute(req: Request, res: Response) { @@ -35,7 +38,14 @@ export async function dataRoute(req: Request, res: Response) { const transaction = path.length > 1 ? path[1] : ''; try { - const metadata = await getTransaction(transaction); + const metadata = await getTransaction(transaction, 95); + if (!metadata) { + res.status(404); + return res.json({ + status: 'ERROR', + message: 'Transaction was not found', + }); + } const contentType = tagValue(metadata.tags, 'Content-Type'); const ans102 = tagValue(metadata.tags, 'Bundle-Type') === 'ANS-102'; From 629c73697c992f45baf1bf3ec0f63f3533b1d301 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?= Date: Tue, 20 Jul 2021 17:12:08 +0200 Subject: [PATCH 11/11] remove write batching and never snapshot --- src/database/sync.database.ts | 51 +++-------------------------------- 1 file changed, 4 insertions(+), 47 deletions(-) diff --git a/src/database/sync.database.ts b/src/database/sync.database.ts index afbd4ee..cb4a4ae 100644 --- a/src/database/sync.database.ts +++ b/src/database/sync.database.ts @@ -33,12 +33,9 @@ import { DatabaseTag } from './transaction.database'; import { cacheANSEntries } from '../caching/ans.entry.caching'; config(); -mkdir('snapshot'); mkdir('cache'); F.debugMode(true); -export const storeSnapshot = process.env.SNAPSHOT === '1' ? 
true : false; - export let SIGINT: boolean = false; export let SIGKILL: boolean = false; export let bar: ProgressBar; @@ -58,9 +55,6 @@ export function startSync() { getLastBlock().then((startHeight) => { log.info(`[database] starting sync`); - if (storeSnapshot) { - log.info('[snapshot] also writing new blocks to the snapshot folder'); - } initStreams(); signalHook(); @@ -113,11 +107,7 @@ export function storeBlock(height: number, bar: ProgressBar): Promise { streams.block.cache.write(input); - if (storeSnapshot) { - streams.block.snapshot.write(input); - } - - storeTransactions( + storeTransaction( JSON.parse(formattedBlock.txs) as Array, height ); @@ -126,9 +116,7 @@ export function storeBlock(height: number, bar: ProgressBar): Promise { } else { new Promise((res) => setTimeout(res, 100)).then(() => { if (retry >= 250) { - log.info( - `[snapshot] could not retrieve block at height ${height}` - ); + log.info(`Could not retrieve block at height ${height}`); reject('Failed to fetch block after 250 retries'); } else { return getBlock(retry + 1); @@ -137,12 +125,10 @@ export function storeBlock(height: number, bar: ProgressBar): Promise { } }) .catch((error) => { - log.error(`[snapshot] error ${error}`); + log.error(`error ${error}`); if (SIGKILL === false) { if (retry >= 250) { - log.info( - `[snapshot] there were problems retrieving ${height}` - ); + log.info(`there were problems retrieving ${height}`); reject(error); } else { return getBlock(retry + 1); @@ -158,17 +144,6 @@ export function storeBlock(height: number, bar: ProgressBar): Promise { ); } -export async function storeTransactions(txs: Array, height: number) { - const batch = []; - - for (let i = 0; i < txs.length; i++) { - const tx = txs[i]; - batch.push(storeTransaction(tx, height)); - } - - await Promise.all(batch); -} - export async function storeTransaction(tx: string, height: number) { const currentTransaction = await transaction(tx); if (currentTransaction) { @@ -179,10 +154,6 @@ export async function storeTransaction(tx: string, height: number) { streams.transaction.cache.write(input); - if (storeSnapshot) { - streams.transaction.snapshot.write(input); - } - storeTags(formattedTransaction.id, preservedTags); const ans102 = tagValue(preservedTags, 'Bundle-Type') === 'ANS-102'; @@ -215,9 +186,6 @@ export async function processAns( `[database] malformed ANS payload at height ${height} for tx ${id}` ); streams.rescan.cache.write(`${id}|${height}|ans\n`); - if (storeSnapshot) { - streams.rescan.snapshot.write(`${id}|${height}|ans\n`); - } } } } @@ -232,10 +200,6 @@ export async function processANSTransaction( streams.transaction.cache.write(input); - if (storeSnapshot) { - streams.transaction.snapshot.write(input); - } - for (let ii = 0; ii < ansTags.length; ii++) { const ansTag = ansTags[ii]; const { name, value } = ansTag; @@ -250,10 +214,6 @@ export async function processANSTransaction( const input = `"${tag.tx_id}"|"${tag.index}"|"${tag.name}"|"${tag.value}"\n`; streams.tags.cache.write(input); - - if (storeSnapshot) { - streams.tags.snapshot.write(input); - } } } } @@ -263,9 +223,6 @@ export function storeTags(tx_id: string, tags: Array) { const tag = tags[i]; const { input } = serializeTags(tx_id, i, tag); streams.tags.cache.write(input); - if (storeSnapshot) { - streams.tags.snapshot.write(input); - } } }
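
For reference, a minimal standalone sketch of the Fluture wiring these patches converge on: one cancellable Future per block height, bounded parallelism via F.parallel, and a single top-level fork that crashes the process on unrecoverable errors. This is not code from the commits; the fetchAndStoreBlock helper, the fixed height range, and the 36-way concurrency are illustrative assumptions standing in for the real storeBlock/getNodeInfo plumbing, and it leans on the repo's `declare module 'fluture'` shim for typings.

import Fluture from 'fluture';
import * as F from 'fluture';

// Illustrative stand-in for storeBlock: pretend to fetch one block and
// write it to the CSV cache streams, resolving when done.
function fetchAndStoreBlock(height: number) {
  return Fluture((reject: (reason: string) => void, resolve: () => void) => {
    const timer = setTimeout(() => {
      // a real implementation would call block(height) here and write the
      // serialized rows to streams.block.cache before resolving
      resolve();
    }, 10);
    // Fluture computations return a cancellation function.
    return () => clearTimeout(timer);
  });
}

// Hypothetical bounds; startSync derives these from getLastBlock() and getNodeInfo().
const startHeight = 0;
const topHeight = 100;

const futures = Array.from(Array(topHeight - startHeight).keys()).map((h) =>
  fetchAndStoreBlock(h + startHeight)
);

// At most 36 blocks in flight at once; a single fork at the top level decides
// whether the whole sync succeeded or the process should exit.
F.fork((reason: string | void) => {
  console.error('Fatal', reason || '');
  process.exit(1);
})(() => console.log('DONE!'))(F.parallel(36)(futures));

Compared with the removed parallelize loop, F.parallel keeps up to 36 fetches in flight continuously instead of awaiting whole batches, and the single fork handler gives one place to terminate the process when a block cannot be retrieved.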