Skip to content

Commit

Permalink
Minor update on queue name (#44)
Browse files Browse the repository at this point in the history
* Minor update on the queue name change from standard to full, as well as returning some msg for each queue job process

* Add redis connect checker

* Update full processor to show its status on the UI for each job
  • Loading branch information
wjrjerome authored Dec 22, 2023
1 parent 2960b6f commit 33df7c0
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 129 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Relayer is a communication bridge between XDC parent chain and its subnet. It pr
To build and run this app locally you will need:

- Install [Node.js](https://nodejs.org/en/). Note, we use node 14
- If you are running the service on your local machine, then you need a redis

# Getting Started

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"cors": "^2.8.5",
"dotenv": "^8.2.0",
"express": "^4.18.2",
"ioredis": "^5.3.2",
"lodash": "^4.17.21",
"node-cache": "^5.1.2",
"node-fetch": "2",
Expand Down
2 changes: 1 addition & 1 deletion src/processors/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# How to add new processors?

1. Read our `lite.ts`(simple version) or the `standard.ts`(more complex version) as examples
1. Read our `lite.ts`(simple version) or the `full.ts`(more complex version) as examples
2. Assume you plan to add a new processor called `XXX`. First create the file `XXX.ts` under current directory.
3. Add `export class XXX extends BaseProcessor` where all our processors has some common methods such as `init` and `reset`. Implement those methods.
4. Go to `index.ts` in this directory, register your processors with `enum Mode`, `private processors` (class property), `reset` method and add your custom start up condition in `getRunningModes` method
Expand Down
148 changes: 49 additions & 99 deletions src/processors/standard.ts → src/processors/full.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,23 @@ import bunyan from "bunyan";
import { config } from "../config";
import { MainnetService, SmartContractData } from "../service/mainnet";
import { SubnetBlockInfo, SubnetService } from "../service/subnet";
import { Cache } from "../service/cache";
import { chunkBy, sleep } from "../utils";
import { ForkingError } from "../errors/forkingError";
import { BaseProcessor } from "./base";

const chunkByMaxFetchSize = chunkBy(config.chunkSize);
export const NAME = "STANDARD";
export const NAME = "FULL";
const REPEAT_JOB_OPT = { jobId: NAME, repeat: { cron: config.cronJob.jobExpression}};
export class Standard extends BaseProcessor {
export class Full extends BaseProcessor {
private mainnetService: MainnetService;
private subnetService: SubnetService;
cache: Cache;
logger: bunyan;

constructor(logger: bunyan) {
super(NAME);
this.logger = logger;
this.mainnetService = new MainnetService(config.mainnet, logger);
this.subnetService = new SubnetService(config.subnet, logger);
this.cache = this.cache = new Cache(logger);
}

getQueue() {
Expand All @@ -35,7 +32,7 @@ export class Standard extends BaseProcessor {
try {
done(null, await this.processEvent());
} catch (error) {
this.logger.error("Fail to process standard relayer job", {
this.logger.error("Fail to process full relayer job", {
message: error.message,
});
// Report the error
Expand All @@ -48,73 +45,30 @@ export class Standard extends BaseProcessor {

// Reset and start the state sync until success
async reset() {
try {
// Stop and remove repeatable jobs
await this.queue.removeRepeatable(NAME, REPEAT_JOB_OPT.repeat);
// Clean timer
this.cache.cleanCache();
// Pull latest confirmed tx from mainnet
const smartContractData = await this.mainnetService.getLastAudittedBlock();
// Pull latest confirm block from subnet
const lastestSubnetCommittedBlock =
await this.subnetService.getLastCommittedBlockInfo();
const { shouldProcess, from } = await this.shouldProcessSync(
smartContractData,
lastestSubnetCommittedBlock
);

if (shouldProcess) {
await this.submitTxs(
from,
lastestSubnetCommittedBlock.subnetBlockNumber
);
// Store subnet block into cache
this.cache.setLastSubmittedSubnetHeader(lastestSubnetCommittedBlock);
}
// Keep the "jobId: NAME" and its repeat configuration here so that bull won't create a new repeated job each time we run this code.
await this.queue.add({}, REPEAT_JOB_OPT);
} catch (error) {
this.logger.error(
`Error while bootstraping, system will go into sleep mode for ${
config.reBootstrapWaitingTime / 1000 / 60
} minutes before re-processing!, message: ${error?.message}`
);
await sleep(config.reBootstrapWaitingTime);
this.reset();
}
await this.queue.add({}, REPEAT_JOB_OPT);
}

async processEvent() {
// Pull subnet's latest committed block
const lastSubmittedSubnetBlock = await this.getLastSubmittedSubnetHeader();
const lastCommittedBlockInfo = await this.subnetService.getLastCommittedBlockInfo();
if (
lastCommittedBlockInfo.subnetBlockNumber <=
lastSubmittedSubnetBlock.subnetBlockNumber
) {
this.logger.info(
`Already on the latest, nothing to subnet, Subnet latest: ${lastCommittedBlockInfo.subnetBlockNumber}, smart contract latest: ${lastSubmittedSubnetBlock.subnetBlockNumber}`
// Pull latest confirmed tx from mainnet
const smartContractData = await this.mainnetService.getLastAuditedBlock();
// Pull latest confirmed block from subnet
const lastestSubnetCommittedBlock =
await this.subnetService.getLastCommittedBlockInfo();

const { shouldProcess, from, msg } = await this.shouldProcessSync(
smartContractData,
lastestSubnetCommittedBlock
);

if (shouldProcess) {
await this.submitTxs(
from,
lastestSubnetCommittedBlock.subnetBlockNumber
);
return;
return `Completed sync from ${from} to ${lastestSubnetCommittedBlock.subnetBlockNumber}`;
}
await this.submitTxs(
lastSubmittedSubnetBlock.subnetBlockNumber,
lastCommittedBlockInfo.subnetBlockNumber
);
this.cache.setLastSubmittedSubnetHeader(lastCommittedBlockInfo);
return msg;
};


async getLastSubmittedSubnetHeader(): Promise<SubnetBlockInfo> {
const lastSubmittedSubnetBlock = this.cache.getLastSubmittedSubnetHeader();
if (lastSubmittedSubnetBlock) return lastSubmittedSubnetBlock;
// Else, our cache don't have such data
const smartContractData = await this.mainnetService.getLastAudittedBlock();
return await this.subnetService.getCommittedBlockInfoByNum(
smartContractData.smartContractHeight
);
}


// "from" is exclusive, we submit blocks "from + 1" till "to"
private async submitTxs(from: number, to: number): Promise<void> {
Expand All @@ -140,7 +94,7 @@ export class Standard extends BaseProcessor {
private async shouldProcessSync(
smartContractData: SmartContractData,
lastestSubnetCommittedBlock: SubnetBlockInfo
): Promise<{ shouldProcess: boolean; from?: number }> {
): Promise<{ shouldProcess: boolean; msg?: string, from?: number }> {
const { subnetBlockHash, subnetBlockNumber } = lastestSubnetCommittedBlock;
const {
smartContractHash,
Expand All @@ -163,10 +117,8 @@ export class Standard extends BaseProcessor {
subnetBlockHash
);
}
this.logger.info(
"Smart contract is ahead of subnet, nothing needs to be done, just wait"
);
return { shouldProcess: false };
const msg = "Smart contract is ahead of subnet, nothing needs to be done, just wait";
return { shouldProcess: false, msg };
} else if (subnetBlockNumber == smartContractCommittedHeight) {
if (smartContractCommittedHash != subnetBlockHash) {
this.logger.error(
Expand All @@ -178,10 +130,7 @@ export class Standard extends BaseProcessor {
subnetBlockHash
);
}
this.logger.info(
"Smart contract committed and subnet are already in sync, nothing needs to be done, waiting for new blocks"
);
return { shouldProcess: false };
return { shouldProcess: false, msg: "Smart contract committed and subnet are already in sync, nothing needs to be done, waiting for new blocks" };
} else {
// Check the committed
const auditedCommittedBlockInfoInSubnet =
Expand All @@ -204,12 +153,9 @@ export class Standard extends BaseProcessor {
// Verification for committed blocks are completed! We need to check where we shall start sync based on the last audited block (smartContractHash and height) in mainnet
if (smartContractHash == subnetBlockHash) {
// Same block height and hash
this.logger.info(
"Smart contract latest and subnet are already in sync, nothing needs to be done, waiting for new blocks"
);
return { shouldProcess: false };
return { shouldProcess: false, msg: "Smart contract latest and subnet are already in sync, nothing needs to be done, waiting for new blocks" };
} else if (subnetBlockNumber < smartContractHeight) {
// This is when subnet is behind the mainnet latest auditted
// This is when subnet is behind the mainnet latest audited
const subnetHashInSmartContract =
await this.mainnetService.getBlockHashByNumber(subnetBlockNumber);
if (subnetHashInSmartContract != subnetBlockHash) {
Expand All @@ -220,25 +166,26 @@ export class Standard extends BaseProcessor {
return {
shouldProcess: true,
from: divergingHeight,
msg: `Forking happened but not yet committed on mainnet, we will need to recursively submit subnet headers from diverging point of ${divergingHeight}`
};
}
this.logger.warn(
"Subnet is behind mainnet latest auditted blocks! This usually means there is another relayer on a different node who is ahead of this relayer in terms of mining and submitting txs. OR there gonna be forking soon!"
);
return { shouldProcess: false };
return {
shouldProcess: false,
msg: "Subnet is behind mainnet latest audited blocks! This usually means there is another relayer on a different node who is ahead of this relayer in terms of mining and submitting txs. OR there gonna be forking soon!"
};
}
// Below is the case where subnet is ahead of mainnet and we need to do some more checks before submit txs
const audittedBlockInfoInSubnet =
const auditedBlockInfoInSubnet =
await this.subnetService.getCommittedBlockInfoByNum(
smartContractHeight
);
if (audittedBlockInfoInSubnet.subnetBlockHash != smartContractHash) {
if (auditedBlockInfoInSubnet.subnetBlockHash != smartContractHash) {
const { divergingHeight } = await this.findDivergingPoint(
smartContractHeight
);
return {
shouldProcess: true,
from: divergingHeight,
from: divergingHeight
};
}
// Everything seems normal, we will just submit txs from this point onwards.
Expand All @@ -249,24 +196,27 @@ export class Standard extends BaseProcessor {
}
}


// Find the point where after this "divering block", chain start to split (fork)
private async findDivergingPoint(
heightToSearchFrom: number
): Promise<{ divergingHeight: number; divergingHash: string }> {
const mainnetHash = await this.mainnetService.getBlockHashByNumber(
heightToSearchFrom
);
const subnetBlockInfo = await this.subnetService.getCommittedBlockInfoByNum(
heightToSearchFrom
);
if (mainnetHash != subnetBlockInfo.subnetBlockHash) {
return this.findDivergingPoint(heightToSearchFrom - 1);
let currentHeight = heightToSearchFrom;
let mainnetHash: string;
let subnetBlockInfo: SubnetBlockInfo;

while (currentHeight > 0) {
mainnetHash = await this.mainnetService.getBlockHashByNumber(currentHeight);
subnetBlockInfo = await this.subnetService.getCommittedBlockInfoByNum(currentHeight);

if (mainnetHash != subnetBlockInfo.subnetBlockHash) {
currentHeight--;
} else {
break;
}
}
return {
divergingHash: mainnetHash,
divergingHeight: heightToSearchFrom,
divergingHeight: currentHeight
};
}

}
22 changes: 11 additions & 11 deletions src/processors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { ExpressAdapter } from '@bull-board/express';

import { Zero } from "./zero";
import { Zero, NAME as zeroName } from "./zero";
import { config } from "./../config";
import { Lite } from "./lite";
import { Standard } from "./standard";
import { Lite, NAME as liteName } from "./lite";
import { Full, NAME as fullName } from "./full";
import { MainnetService } from "../service/mainnet";

enum Mode {
LITE = "LITE",
STANDARD = "STANDARD",
ZERO = "ZERO"
LITE = liteName,
FULL = fullName,
ZERO = zeroName
}

export class Processors {
logger: bunyan;
private processors: {
lite: Lite;
standard: Standard;
full: Full;
zero: Zero;
}
private mainnetService: MainnetService;
Expand All @@ -29,7 +29,7 @@ export class Processors {
this.logger = logger;
this.processors = {
lite: new Lite(logger),
standard: new Standard(logger),
full: new Full(logger),
zero: new Zero(logger)
// Register more processors here
};
Expand Down Expand Up @@ -68,8 +68,8 @@ export class Processors {
await this.processors.lite.reset();
break;
// TODO: Add more processors here. e.g XDC-ZERO
case Mode.STANDARD:
await this.processors.standard.reset();
case Mode.FULL:
await this.processors.full.reset();
break;
case Mode.ZERO:
await this.processors.zero.reset();
Expand All @@ -89,7 +89,7 @@ export class Processors {
modes.push(Mode.LITE);
break;
case "full":
modes.push(Mode.STANDARD);
modes.push(Mode.FULL);
break;
default:
throw new Error("No avaiable mode from mainnet smart contract API");
Expand Down
6 changes: 4 additions & 2 deletions src/processors/lite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class Lite extends BaseProcessor {

private async processEvent() {
// Pull latest confirmed tx from mainnet
const latestBlock = await this.liteMainnetService.getLastAudittedBlock();
const latestBlock = await this.liteMainnetService.getLastAuditedBlock();
// Pull latest confirm block from subnet
const lastestSubnetCommittedBlock =
await this.subnetService.getLastCommittedBlockInfo();
Expand All @@ -54,6 +54,8 @@ export class Lite extends BaseProcessor {
latestBlock,
lastestSubnetCommittedBlock.subnetBlockNumber
);

return `Successfully submitted subnet header up till ${lastestSubnetCommittedBlock.subnetBlockNumber} into parent chain`;
}

private async liteSubmitTxs(
Expand Down Expand Up @@ -123,7 +125,7 @@ export class Lite extends BaseProcessor {
await this.liteMainnetService.submitTxs(results);
}

const last = await this.liteMainnetService.getLastAudittedBlock();
const last = await this.liteMainnetService.getLastAuditedBlock();
scCommittedHeight = last.smartContractCommittedHeight;
scHash = last.smartContractHash;
}
Expand Down
Loading

0 comments on commit 33df7c0

Please sign in to comment.