From 33df7c05bb4c90e6a54dd9bbdfeef457c39bd52a Mon Sep 17 00:00:00 2001 From: Banana-J Date: Fri, 22 Dec 2023 23:25:40 +1100 Subject: [PATCH] Minor update on queue name (#44) * Minor update on the queue name change from standard to full, as well as returning some msg for each queue job process * Add redis connect checker * Update full processor to show its status on the UI for each job --- README.md | 1 + package.json | 1 + src/processors/README.md | 2 +- src/processors/{standard.ts => full.ts} | 148 ++++++++---------------- src/processors/index.ts | 22 ++-- src/processors/lite.ts | 6 +- src/processors/zero.ts | 21 ++-- src/server.ts | 21 ++++ src/service/mainnet/index.ts | 8 +- 9 files changed, 101 insertions(+), 129 deletions(-) rename src/processors/{standard.ts => full.ts} (59%) diff --git a/README.md b/README.md index a950a94..f1bc818 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ Relayer is a communication bridge between XDC parent chain and its subnet. It pr To build and run this app locally you will need: - Install [Node.js](https://nodejs.org/en/). Note, we use node 14 +- If you are running the service on your local machine, then you need a redis # Getting Started diff --git a/package.json b/package.json index 594760a..34a512c 100644 --- a/package.json +++ b/package.json @@ -56,6 +56,7 @@ "cors": "^2.8.5", "dotenv": "^8.2.0", "express": "^4.18.2", + "ioredis": "^5.3.2", "lodash": "^4.17.21", "node-cache": "^5.1.2", "node-fetch": "2", diff --git a/src/processors/README.md b/src/processors/README.md index ab46a84..6777384 100644 --- a/src/processors/README.md +++ b/src/processors/README.md @@ -1,6 +1,6 @@ # How to add new processors? -1. Read our `lite.ts`(simple version) or the `standard.ts`(more complex version) as examples +1. Read our `lite.ts`(simple version) or the `full.ts`(more complex version) as examples 2. Assume you plan to add a new processor called `XXX`. First create the file `XXX.ts` under current directory. 3. Add `export class XXX extends BaseProcessor` where all our processors has some common methods such as `init` and `reset`. Implement those methods. 4. Go to `index.ts` in this directory, register your processors with `enum Mode`, `private processors` (class property), `reset` method and add your custom start up condition in `getRunningModes` method diff --git a/src/processors/standard.ts b/src/processors/full.ts similarity index 59% rename from src/processors/standard.ts rename to src/processors/full.ts index 2b4527b..b25b405 100644 --- a/src/processors/standard.ts +++ b/src/processors/full.ts @@ -2,18 +2,16 @@ import bunyan from "bunyan"; import { config } from "../config"; import { MainnetService, SmartContractData } from "../service/mainnet"; import { SubnetBlockInfo, SubnetService } from "../service/subnet"; -import { Cache } from "../service/cache"; import { chunkBy, sleep } from "../utils"; import { ForkingError } from "../errors/forkingError"; import { BaseProcessor } from "./base"; const chunkByMaxFetchSize = chunkBy(config.chunkSize); -export const NAME = "STANDARD"; +export const NAME = "FULL"; const REPEAT_JOB_OPT = { jobId: NAME, repeat: { cron: config.cronJob.jobExpression}}; -export class Standard extends BaseProcessor { +export class Full extends BaseProcessor { private mainnetService: MainnetService; private subnetService: SubnetService; - cache: Cache; logger: bunyan; constructor(logger: bunyan) { @@ -21,7 +19,6 @@ export class Standard extends BaseProcessor { this.logger = logger; this.mainnetService = new MainnetService(config.mainnet, logger); this.subnetService = new SubnetService(config.subnet, logger); - this.cache = this.cache = new Cache(logger); } getQueue() { @@ -35,7 +32,7 @@ export class Standard extends BaseProcessor { try { done(null, await this.processEvent()); } catch (error) { - this.logger.error("Fail to process standard relayer job", { + this.logger.error("Fail to process full relayer job", { message: error.message, }); // Report the error @@ -48,73 +45,30 @@ export class Standard extends BaseProcessor { // Reset and start the state sync until success async reset() { - try { - // Stop and remove repeatable jobs - await this.queue.removeRepeatable(NAME, REPEAT_JOB_OPT.repeat); - // Clean timer - this.cache.cleanCache(); - // Pull latest confirmed tx from mainnet - const smartContractData = await this.mainnetService.getLastAudittedBlock(); - // Pull latest confirm block from subnet - const lastestSubnetCommittedBlock = - await this.subnetService.getLastCommittedBlockInfo(); - const { shouldProcess, from } = await this.shouldProcessSync( - smartContractData, - lastestSubnetCommittedBlock - ); - - if (shouldProcess) { - await this.submitTxs( - from, - lastestSubnetCommittedBlock.subnetBlockNumber - ); - // Store subnet block into cache - this.cache.setLastSubmittedSubnetHeader(lastestSubnetCommittedBlock); - } - // Keep the "jobId: NAME" and its repeat configuration here so that bull won't create a new repeated job each time we run this code. - await this.queue.add({}, REPEAT_JOB_OPT); - } catch (error) { - this.logger.error( - `Error while bootstraping, system will go into sleep mode for ${ - config.reBootstrapWaitingTime / 1000 / 60 - } minutes before re-processing!, message: ${error?.message}` - ); - await sleep(config.reBootstrapWaitingTime); - this.reset(); - } + await this.queue.add({}, REPEAT_JOB_OPT); } async processEvent() { - // Pull subnet's latest committed block - const lastSubmittedSubnetBlock = await this.getLastSubmittedSubnetHeader(); - const lastCommittedBlockInfo = await this.subnetService.getLastCommittedBlockInfo(); - if ( - lastCommittedBlockInfo.subnetBlockNumber <= - lastSubmittedSubnetBlock.subnetBlockNumber - ) { - this.logger.info( - `Already on the latest, nothing to subnet, Subnet latest: ${lastCommittedBlockInfo.subnetBlockNumber}, smart contract latest: ${lastSubmittedSubnetBlock.subnetBlockNumber}` + // Pull latest confirmed tx from mainnet + const smartContractData = await this.mainnetService.getLastAuditedBlock(); + // Pull latest confirmed block from subnet + const lastestSubnetCommittedBlock = + await this.subnetService.getLastCommittedBlockInfo(); + + const { shouldProcess, from, msg } = await this.shouldProcessSync( + smartContractData, + lastestSubnetCommittedBlock + ); + + if (shouldProcess) { + await this.submitTxs( + from, + lastestSubnetCommittedBlock.subnetBlockNumber ); - return; + return `Completed sync from ${from} to ${lastestSubnetCommittedBlock.subnetBlockNumber}`; } - await this.submitTxs( - lastSubmittedSubnetBlock.subnetBlockNumber, - lastCommittedBlockInfo.subnetBlockNumber - ); - this.cache.setLastSubmittedSubnetHeader(lastCommittedBlockInfo); + return msg; }; - - - async getLastSubmittedSubnetHeader(): Promise { - const lastSubmittedSubnetBlock = this.cache.getLastSubmittedSubnetHeader(); - if (lastSubmittedSubnetBlock) return lastSubmittedSubnetBlock; - // Else, our cache don't have such data - const smartContractData = await this.mainnetService.getLastAudittedBlock(); - return await this.subnetService.getCommittedBlockInfoByNum( - smartContractData.smartContractHeight - ); - } - // "from" is exclusive, we submit blocks "from + 1" till "to" private async submitTxs(from: number, to: number): Promise { @@ -140,7 +94,7 @@ export class Standard extends BaseProcessor { private async shouldProcessSync( smartContractData: SmartContractData, lastestSubnetCommittedBlock: SubnetBlockInfo - ): Promise<{ shouldProcess: boolean; from?: number }> { + ): Promise<{ shouldProcess: boolean; msg?: string, from?: number }> { const { subnetBlockHash, subnetBlockNumber } = lastestSubnetCommittedBlock; const { smartContractHash, @@ -163,10 +117,8 @@ export class Standard extends BaseProcessor { subnetBlockHash ); } - this.logger.info( - "Smart contract is ahead of subnet, nothing needs to be done, just wait" - ); - return { shouldProcess: false }; + const msg = "Smart contract is ahead of subnet, nothing needs to be done, just wait"; + return { shouldProcess: false, msg }; } else if (subnetBlockNumber == smartContractCommittedHeight) { if (smartContractCommittedHash != subnetBlockHash) { this.logger.error( @@ -178,10 +130,7 @@ export class Standard extends BaseProcessor { subnetBlockHash ); } - this.logger.info( - "Smart contract committed and subnet are already in sync, nothing needs to be done, waiting for new blocks" - ); - return { shouldProcess: false }; + return { shouldProcess: false, msg: "Smart contract committed and subnet are already in sync, nothing needs to be done, waiting for new blocks" }; } else { // Check the committed const auditedCommittedBlockInfoInSubnet = @@ -204,12 +153,9 @@ export class Standard extends BaseProcessor { // Verification for committed blocks are completed! We need to check where we shall start sync based on the last audited block (smartContractHash and height) in mainnet if (smartContractHash == subnetBlockHash) { // Same block height and hash - this.logger.info( - "Smart contract latest and subnet are already in sync, nothing needs to be done, waiting for new blocks" - ); - return { shouldProcess: false }; + return { shouldProcess: false, msg: "Smart contract latest and subnet are already in sync, nothing needs to be done, waiting for new blocks" }; } else if (subnetBlockNumber < smartContractHeight) { - // This is when subnet is behind the mainnet latest auditted + // This is when subnet is behind the mainnet latest audited const subnetHashInSmartContract = await this.mainnetService.getBlockHashByNumber(subnetBlockNumber); if (subnetHashInSmartContract != subnetBlockHash) { @@ -220,25 +166,26 @@ export class Standard extends BaseProcessor { return { shouldProcess: true, from: divergingHeight, + msg: `Forking happened but not yet committed on mainnet, we will need to recursively submit subnet headers from diverging point of ${divergingHeight}` }; } - this.logger.warn( - "Subnet is behind mainnet latest auditted blocks! This usually means there is another relayer on a different node who is ahead of this relayer in terms of mining and submitting txs. OR there gonna be forking soon!" - ); - return { shouldProcess: false }; + return { + shouldProcess: false, + msg: "Subnet is behind mainnet latest audited blocks! This usually means there is another relayer on a different node who is ahead of this relayer in terms of mining and submitting txs. OR there gonna be forking soon!" + }; } // Below is the case where subnet is ahead of mainnet and we need to do some more checks before submit txs - const audittedBlockInfoInSubnet = + const auditedBlockInfoInSubnet = await this.subnetService.getCommittedBlockInfoByNum( smartContractHeight ); - if (audittedBlockInfoInSubnet.subnetBlockHash != smartContractHash) { + if (auditedBlockInfoInSubnet.subnetBlockHash != smartContractHash) { const { divergingHeight } = await this.findDivergingPoint( smartContractHeight ); return { shouldProcess: true, - from: divergingHeight, + from: divergingHeight }; } // Everything seems normal, we will just submit txs from this point onwards. @@ -249,24 +196,27 @@ export class Standard extends BaseProcessor { } } - // Find the point where after this "divering block", chain start to split (fork) private async findDivergingPoint( heightToSearchFrom: number ): Promise<{ divergingHeight: number; divergingHash: string }> { - const mainnetHash = await this.mainnetService.getBlockHashByNumber( - heightToSearchFrom - ); - const subnetBlockInfo = await this.subnetService.getCommittedBlockInfoByNum( - heightToSearchFrom - ); - if (mainnetHash != subnetBlockInfo.subnetBlockHash) { - return this.findDivergingPoint(heightToSearchFrom - 1); + let currentHeight = heightToSearchFrom; + let mainnetHash: string; + let subnetBlockInfo: SubnetBlockInfo; + + while (currentHeight > 0) { + mainnetHash = await this.mainnetService.getBlockHashByNumber(currentHeight); + subnetBlockInfo = await this.subnetService.getCommittedBlockInfoByNum(currentHeight); + + if (mainnetHash != subnetBlockInfo.subnetBlockHash) { + currentHeight--; + } else { + break; + } } return { divergingHash: mainnetHash, - divergingHeight: heightToSearchFrom, + divergingHeight: currentHeight }; } - } \ No newline at end of file diff --git a/src/processors/index.ts b/src/processors/index.ts index 8b90675..e8340b2 100644 --- a/src/processors/index.ts +++ b/src/processors/index.ts @@ -4,23 +4,23 @@ import { createBullBoard } from '@bull-board/api'; import { BullAdapter } from '@bull-board/api/bullAdapter'; import { ExpressAdapter } from '@bull-board/express'; -import { Zero } from "./zero"; +import { Zero, NAME as zeroName } from "./zero"; import { config } from "./../config"; -import { Lite } from "./lite"; -import { Standard } from "./standard"; +import { Lite, NAME as liteName } from "./lite"; +import { Full, NAME as fullName } from "./full"; import { MainnetService } from "../service/mainnet"; enum Mode { - LITE = "LITE", - STANDARD = "STANDARD", - ZERO = "ZERO" + LITE = liteName, + FULL = fullName, + ZERO = zeroName } export class Processors { logger: bunyan; private processors: { lite: Lite; - standard: Standard; + full: Full; zero: Zero; } private mainnetService: MainnetService; @@ -29,7 +29,7 @@ export class Processors { this.logger = logger; this.processors = { lite: new Lite(logger), - standard: new Standard(logger), + full: new Full(logger), zero: new Zero(logger) // Register more processors here }; @@ -68,8 +68,8 @@ export class Processors { await this.processors.lite.reset(); break; // TODO: Add more processors here. e.g XDC-ZERO - case Mode.STANDARD: - await this.processors.standard.reset(); + case Mode.FULL: + await this.processors.full.reset(); break; case Mode.ZERO: await this.processors.zero.reset(); @@ -89,7 +89,7 @@ export class Processors { modes.push(Mode.LITE); break; case "full": - modes.push(Mode.STANDARD); + modes.push(Mode.FULL); break; default: throw new Error("No avaiable mode from mainnet smart contract API"); diff --git a/src/processors/lite.ts b/src/processors/lite.ts index 84297f4..16b2c91 100644 --- a/src/processors/lite.ts +++ b/src/processors/lite.ts @@ -43,7 +43,7 @@ export class Lite extends BaseProcessor { private async processEvent() { // Pull latest confirmed tx from mainnet - const latestBlock = await this.liteMainnetService.getLastAudittedBlock(); + const latestBlock = await this.liteMainnetService.getLastAuditedBlock(); // Pull latest confirm block from subnet const lastestSubnetCommittedBlock = await this.subnetService.getLastCommittedBlockInfo(); @@ -54,6 +54,8 @@ export class Lite extends BaseProcessor { latestBlock, lastestSubnetCommittedBlock.subnetBlockNumber ); + + return `Successfully submitted subnet header up till ${lastestSubnetCommittedBlock.subnetBlockNumber} into parent chain`; } private async liteSubmitTxs( @@ -123,7 +125,7 @@ export class Lite extends BaseProcessor { await this.liteMainnetService.submitTxs(results); } - const last = await this.liteMainnetService.getLastAudittedBlock(); + const last = await this.liteMainnetService.getLastAuditedBlock(); scCommittedHeight = last.smartContractCommittedHeight; scHash = last.smartContractHash; } diff --git a/src/processors/zero.ts b/src/processors/zero.ts index d5014cb..f3abac0 100644 --- a/src/processors/zero.ts +++ b/src/processors/zero.ts @@ -44,10 +44,9 @@ export class Zero extends BaseProcessor { async processEvent() { const payloads = await this.zeroService.getPayloads(); if (payloads.length == 0) { - this.logger.info( - "Nothing to process in xdc-zero, wait for the next event log" - ); - return; + const msg = "Nothing to process in xdc-zero, wait for the next event log"; + this.logger.info(msg); + return msg; } const lastPayload = payloads[payloads.length - 1]; const lastIndexFromSubnet = lastPayload[0]; @@ -58,13 +57,9 @@ export class Zero extends BaseProcessor { const lastBlockNumber = lastPayload[7]; const cscBlockNumber = await this.zeroService.getLatestBlockNumberFromCsc(); if (cscBlockNumber < lastBlockNumber) { - this.logger.info( - "wait for csc block lastBlockNumber:" + - lastBlockNumber + - " cscBlockNumber:" + - cscBlockNumber - ); - return; + const msg = `Wait for csc block lastBlockNumber: ${lastBlockNumber}, cscBlockNumber: ${cscBlockNumber}`; + this.logger.info(msg); + return msg; } if (lastIndexFromSubnet > lastIndexfromParentnet) { @@ -81,6 +76,8 @@ export class Zero extends BaseProcessor { } } } - this.logger.info("Completed the xdc-zero sync, wait for the next cycle"); + const msg = `Completed the xdc-zero sync up till ${lastIndexFromSubnet} from subnet, wait for the next cycle`; + this.logger.info(msg); + return msg; } } diff --git a/src/server.ts b/src/server.ts index 486c06c..ecd010c 100644 --- a/src/server.ts +++ b/src/server.ts @@ -3,6 +3,7 @@ import express from "express"; import cors from "cors"; import bodyParser from "body-parser"; import { ExpressAdapter } from '@bull-board/express'; +import Redis from 'ioredis'; import { config } from "./config"; import { Processors } from "./processors"; @@ -29,6 +30,26 @@ app.use('/', serverAdapter.getRouter()); app.listen(config.port, async () => { logger.info(`Relayer running on port ${config.port}`); + await checkConnection(); await processors.init(serverAdapter).reset(); }); + +// Check if necessary infrastructures are up running, such as redis +const checkConnection = async () => { + logger.info("Checking redis connection"); + const redisClient = new Redis({ + maxRetriesPerRequest: 2 + }); + try { + // Test command using await + const result = await redisClient.ping(); + logger.info('Redis is connected! PING response:', result); + } catch (error) { + logger.error('Make sure you have redis running, error connecting to Redis:', error); + process.exit(1); + } finally { + // Close the Redis connection + redisClient.disconnect(); + } +}; \ No newline at end of file diff --git a/src/service/mainnet/index.ts b/src/service/mainnet/index.ts index 9f7de05..500d7b2 100644 --- a/src/service/mainnet/index.ts +++ b/src/service/mainnet/index.ts @@ -52,7 +52,7 @@ export class MainnetService { /* A method to fetch the last subnet block that has been stored/audited in mainnet XDC **/ - async getLastAudittedBlock(): Promise { + async getLastAuditedBlock(): Promise { try { const result = await this.smartContractInstance.methods .getLatestBlocks() @@ -72,7 +72,7 @@ export class MainnetService { latestSmComittedHash, latestSmHeight ); - throw new Error("Unable to get last auditted block informations"); + throw new Error("Unable to get last audited block informations"); } return { smartContractHash: latestBlockHash, @@ -182,7 +182,7 @@ export class LiteMainnetService { /* A method to fetch the last subnet block that has been stored/audited in mainnet XDC **/ - async getLastAudittedBlock(): Promise { + async getLastAuditedBlock(): Promise { try { const result = await this.liteSmartContractInstance.methods .getLatestBlocks() @@ -202,7 +202,7 @@ export class LiteMainnetService { latestSmComittedHash, latestSmHeight ); - throw new Error("Unable to get last auditted block informations"); + throw new Error("Unable to get last audited block informations"); } return { smartContractHash: latestBlockHash,