Skip to content
This repository has been archived by the owner on Nov 9, 2022. It is now read-only.

Multiple try/catch+zombie fixes #97

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ node_modules
package-lock.json
.env
.cache
cache
cache/*.csv
.rescan

snapshot
.snapshot
Expand All @@ -16,4 +17,4 @@ snapshot
**/Thumbs.db

# Webstorm
.idea
.idea
Empty file added cache/.gitkeep
Empty file.
31 changes: 19 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@
"repository": "git@github.com:ArweaveTeam/gateway.git",
"author": "Arweave <hello@arweave.org>",
"license": "MIT",
"prettier": {
"singleQuote": true
},
"scripts": {
"dev:test": "npm run dev:build && nyc mocha dist/test dist/test/**/*.js",
"dev:codecov": "nyc report --reporter=text-lcov | codecov --pipe --token=cea7a229-8289-4a7c-9dc6-4fb694978cb2",
"dev:lint": "eslint \"test/*.ts\" \"test/**/*.ts\" \"src/*.ts\" \"src/**/*.ts\" \"migrations/*.ts\" \"knexfile.ts\"",
"dev:gql": "npx graphql-codegen --config codegen.yml",
"dev:build": "npm run dev:gql && tsc",
"dev:start": "npm run dev:build && node dist/src/Gateway.js",
"dev:restart": "npm run migrate:down && npm run migrate:latest && npm run dev:start",
"import:transaction": "npm run dev:build && node dist/src/import/transaction.import.js",
"import:tags": "npm run dev:build && node dist/src/import/tags.import.ts",
"verify:ans": "npm run dev:build && node dist/src/caching/ans.verify.js",
"verify:prune": "npm run dev:build && node dist/src/verify/prune.verify.js",
"verify:transaction": "npm run dev:build && node dist/src/verify/transaction.verify.js",
"test": "npm run build && nyc mocha dist/test dist/test/**/*.js",
"codecov": "nyc report --reporter=text-lcov | codecov --pipe --token=cea7a229-8289-4a7c-9dc6-4fb694978cb2",
"lint": "eslint \"test/*.ts\" \"test/**/*.ts\" \"src/*.ts\" \"src/**/*.ts\" \"migrations/*.ts\" \"knexfile.ts\"",
"gql": "npx graphql-codegen --config codegen.yml",
"build": "npm run gql && tsc",
"start": "npm run build && node dist/src/Gateway.js",
"restart": "npm run migrate:down && npm run migrate:latest && npm run start",
"import:transaction": "npm run build && node dist/src/import/transaction.import.js",
"import:tags": "npm run build && node dist/src/import/tags.import.ts",
"verify:ans": "npm run build && node dist/src/caching/ans.verify.js",
"verify:prune": "npm run build && node dist/src/verify/prune.verify.js",
"verify:transaction": "npm run build && node dist/src/verify/transaction.verify.js",
"migrate:down": "knex migrate:down",
"migrate:up": "knex migrate:up",
"migrate:latest": "knex migrate:latest",
Expand All @@ -42,6 +45,7 @@
"express": "^4.17.1",
"express-pg-session": "^1.1.0",
"express-session": "^1.17.1",
"fluture": "^14.0.0",
"fs": "^0.0.1-security",
"fs-jetpack": "^4.1.0",
"graphql": "^15.5.0",
Expand All @@ -55,7 +59,9 @@
"morgan": "^1.10.0",
"node-cron": "^2.0.3",
"pg": "^8.5.1",
"pg-copy-streams": "^5.1.1",
"progress": "^2.0.3",
"random-weighted-choice": "^0.1.4",
"rfc4648": "^1.4.0",
"shortid": "^2.2.16",
"superagent": "^6.1.0",
Expand All @@ -75,6 +81,7 @@
"@types/node": "^14.14.22",
"@types/node-cron": "^2.0.3",
"@types/pg": "^7.14.11",
"@types/pg-copy-streams": "^1.2.1",
"@types/progress": "^2.0.3",
"@types/shortid": "^0.0.29",
"@types/superagent": "^4.1.10",
Expand Down
113 changes: 79 additions & 34 deletions src/database/import.database.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,94 @@
import {config} from 'dotenv';
import {indices} from '../utility/order.utility';
import {connection} from '../database/connection.database';
import {transactionFields} from '../database/transaction.database';
import fs from 'fs';
import { PoolClient } from 'pg';
import { config } from 'dotenv';
import { indices } from '../utility/order.utility';
import { pgConnection } from '../database/connection.database';
import { transactionFields } from '../database/transaction.database';
import { from as copyFrom } from 'pg-copy-streams';

config();

export async function importBlocks(path: string) {
export function importBlocks(path: string) {
return new Promise(async (resolve, reject) => {
try {
const encoding = '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL("height"))';
await connection.raw(`COPY blocks ("id", "previous_block", "mined_at", "height", "txs", "extended") FROM '${path}' WITH ${encoding}`);

return resolve(true);
} catch (error) {
return reject(error);
}
const client: PoolClient = await pgConnection.connect();
const encoding =
"(FORMAT CSV, HEADER, ESCAPE '\\', DELIMITER '|', FORCE_NULL(\"height\"))";
const stream = client.query(
copyFrom(
`COPY blocks ("id", "previous_block", "mined_at", "height", "txs", "extended") FROM STDIN WITH ${encoding}`
)
);
const fileStream = fs.createReadStream(path);
fileStream.on('error', reject);
fileStream
.pipe(stream)
.on('finish', () => {
client.release();
resolve(true);
})
.on('error', (err: unknown) => {
client.release();
reject(new String(err));
});
});
}

export async function importTransactions(path: string) {
export function importTransactions(path: string) {
return new Promise(async (resolve, reject) => {
try {
const fields = transactionFields
.concat(indices)
.map((field) => `"${field}"`);

const encoding = '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL("format", "height", "data_size"))';
await connection.raw(`COPY transactions (${fields.join(',')}) FROM '${path}' WITH ${encoding}`);
const client: PoolClient = await pgConnection.connect();
const fields = transactionFields
.concat(indices)
.map((field) => `"${field}"`);

return resolve(true);
} catch (error) {
return reject(error);
}
const encoding =
'(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL("format", "height", "data_size"))';
const stream = client.query(
copyFrom(
`COPY transactions (${fields.join(',')}) FROM STDIN WITH ${encoding}`
)
);
const fileStream = fs.createReadStream(path);
fileStream.on('error', () => {
client.release();
reject();
});
fileStream
.pipe(stream)
.on('finish', () => {
client.release();
resolve(true);
})
.on('error', (err: unknown) => {
client.release();
reject(new String(err));
});
});
}

export async function importTags(path: string) {
export function importTags(path: string) {
return new Promise(async (resolve, reject) => {
try {
const encoding = '(FORMAT CSV, HEADER, ESCAPE \'\\\', DELIMITER \'|\', FORCE_NULL(index))';
await connection.raw(`COPY tags ("tx_id", "index", "name", "value") FROM '${path}' WITH ${encoding}`);

return resolve(true);
} catch (error) {
return reject(error);
}
const client: PoolClient = await pgConnection.connect();
const encoding =
"(FORMAT CSV, HEADER, ESCAPE '\\', DELIMITER '|', FORCE_NULL(index))";
const stream = client.query(
copyFrom(
`COPY tags ("tx_id", "index", "name", "value") FROM STDIN WITH ${encoding}`
)
);
const fileStream = fs.createReadStream(path);
fileStream.on('error', () => {
client.release();
reject();
});
fileStream
.pipe(stream)
.on('finish', () => {
client.release();
resolve(true);
})
.on('error', (err: unknown) => {
client.release();
reject(new String(err));
});
});
}
Loading