Skip to content

Commit

Permalink
Merge pull request #11 from 13x54n/auth
Browse files Browse the repository at this point in the history
Multi Node from different connection enabled
  • Loading branch information
13x54n authored Jul 10, 2024
2 parents d8eb10f + 5edcd81 commit 527dd12
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 105 deletions.
63 changes: 31 additions & 32 deletions client.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
/**
* @author: Lexy <not.so.lexy@gmail.com, 13x54n>
*/
const net = require("net");
const WebSocket = require("ws");

const TCP_PORT = 18943;
const TCP_HOST = "2.tcp.ngrok.io";
const WS_PORT = 8080;

// TCP Client Setup
const tcpClient = new net.Socket();
tcpClient.connect(TCP_PORT, TCP_HOST, () => {
console.log("Connected to TCP server");
});

tcpClient.on("data", (data) => {
console.log(`${data}`);
try {
// Parse incoming data assuming it's JSON
const bufferData = Buffer.from(data);
const bufferString = bufferData.toString("utf8");
const jsonData = JSON.parse(bufferString);

console.log("This looks awesome now: ", jsonData);
} catch (error) {
console.error(
`Worker ${process.pid}: Error parsing JSON or handling message: ${error.message}`
);
}
});

tcpClient.on("close", () => {
Expand All @@ -22,52 +34,39 @@ tcpClient.on("error", (err) => {
console.error(`TCP Client Error: ${err.message}`);
});

const WebSocket = require("ws");

const WS_PORT = 8080;
const wss = new WebSocket.Server({ port: WS_PORT });

wss.on("listening", () => {
// WebSocket Server Setup
const wss = new WebSocket.Server({ port: WS_PORT }, () => {
console.log(`WS Server: ws://localhost:${WS_PORT}`);
});

wss.on("connection", (ws) => {
console.log("Client connected");

ws.on("message", async (data) => {
/**
* data format
* {
* method_name: string,
* payload:,
* from: Lexy
* }
*/
try {
const bufferData = Buffer.from(data);
const bufferString = bufferData.toString("utf8");
const jsonObject = JSON.parse(bufferString);

console.log("Parsed JSON object:", jsonObject);
const message = JSON.parse(data); // Assuming data is JSON string

// Forward message to TCP server
tcpClient.write(data, () => {
if (!tcpClient.writable) {
console.log(
"Server might be dead! Restarting both client & server might help."
"TCP server might be dead! Restarting both client & server might help."
);
return res.status(500).json({
error:
"Server might be dead! Restarting both client & server might help.",
});
// Handle TCP server error
}

console.log(`Sent to TCP server: ${data}`);
});

// Handle the jsonObject as needed
// Broadcast message to all WebSocket clients
wss.clients.forEach((client) => {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(data);
console.log(client);
}
});
} catch (error) {
console.error("Error parsing JSON:", error.message);
// Handle parsing error or invalid data scenario
console.error("Error handling message:", error.message);
// Handle error scenario
}
});

Expand Down
161 changes: 88 additions & 73 deletions server.js
Original file line number Diff line number Diff line change
@@ -1,95 +1,110 @@
/**
* @author: Lexy <not.so.lexy@gmail.com, 13x54n>
*/
const cluster = require("cluster");
const net = require("net"); // tcp
const numCPUs = require("os").cpus().length;

const port = 7070;
const host = "127.0.0.1";

/**
* @dev Cluster Module(Improved Performance): Refer to https://nodejs.org/api/cluster.html#cluster
*/
if (cluster.isMaster) {
// Fork workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Code for TCP server
const server = net.createServer();

cluster.on("exit", (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died`);
// Fork a new worker in case of worker death
cluster.fork();
});
} else {
// Code for TCP server inside each worker
const server = net.createServer();
/**
* Determining the ideal maximum number of connections (server.maxConnections) for a TCP server
* depends on various factors including hardware resources, expected workload, and the nature of
* the application.
* Here are some considerations to help determine an appropriate server.maxConnections value:
*
* server.maxConnections=numCPUs×connections per core=4×50=200
*/
server.maxConnections = 100;

server.listen(port, host, () => {
// @dev need to benchmark whats the ideal connections
server.maxConnections = 100;

// Start listening on defined port and host
server.listen(port, host, () => {
console.log(`Worker ${process.pid}: TCP Server is running on port ${port}.`);
});

let sockets = [];

// Event handler for new connections
server.on("connection", function (sock) {
if (sockets.length >= server.maxConnections) {
// Handle exceeding max connections -> reject new connections
console.log(
`Worker ${process.pid}: TCP Server is running on port ${port}.`
`Worker ${process.pid}: Max connections reached. Rejecting new connection from ${sock.remoteAddress}`
);
});
sock.destroy();
return;
}

// Log new connection
console.log(
`Worker ${process.pid}: CONNECTED: ${sock.remoteAddress}:${sock.remotePort}`
);

// Add the socket to the sockets array
sockets.push(sock);

let sockets = [];
// Event handler for incoming data
sock.on("data", function (data) {
try {
// Parse incoming data assuming it's JSON
const bufferData = Buffer.from(data);
const bufferString = bufferData.toString("utf8");
const jsonData = JSON.parse(bufferString);

server.on("connection", function (sock) {
if (sockets.length >= server.maxConnections) {
/**
* Handle exceeding max connections -> reject new connections, but better algorithm can be implemented
*/
console.log(
`Worker ${process.pid}: Max connections reached. Rejecting new connection from ${sock.remoteAddress}`
`Worker ${process.pid}: Received data from ${sock.remoteAddress}:${sock.remotePort}:`,
jsonData
);

// Handle different message types
if (jsonData.method_name === "broadcast") {
// Broadcast message to all connected clients
broadcastToAll("Some Calculated Output!");
} else if (jsonData.method_name === "private_message") {
// Example of sending a private message to a specific client
sendPrivateMessage(sock, "This is a private message!");
}
} catch (error) {
console.error(
`Worker ${process.pid}: Error parsing JSON or handling message: ${error.message}`
);
sock.destroy();
return;
}
});

// Event handler for connection close
sock.on("close", function () {
console.log(
`Worker ${process.pid}: CONNECTED: ${sock.remoteAddress}:${sock.remotePort}`// this should be user public address
`Worker ${process.pid}: CLOSED: ${sock.remoteAddress} ${sock.remotePort}`
);

sock.on("data", function (data) {
// @dev socket connection must only be listed if node is authorized
sockets.push(sock);
// console.log(`Worker ${process.pid}: DATA ${sock.remoteAddress}: ${data}`);

sock.write(`${sock.remoteAddress}:${sock.remotePort} said ${data}, and Helllo, from Lexy!\n`);
// Write the data back to all the connected clients
// sockets.forEach(function (sock) {
// });

// @dev receive socket information
// switch(data){
// case '':
// return;
// default:
// return;
// }
});

// Add a 'close' event handler to this instance of socket
sock.on("close", function (data) {
console.log(
`Worker ${process.pid}: CLOSED: ${sock.remoteAddress} ${sock.remotePort}`
);
let index = sockets.findIndex(function (o) {
return (
o.remoteAddress === sock.remoteAddress &&
o.remotePort === sock.remotePort
);
});
if (index !== -1) sockets.splice(index, 1);
});
// Remove the closed socket from the sockets array
sockets = sockets.filter((socket) => socket !== sock);
});
});

// Function to broadcast message to all connected sockets
function broadcastToAll(message) {
sockets.forEach(function (socket) {
if (!socket.destroyed) {
socket.write(JSON.stringify(message));
}
});
}

// Function to get all connected peers
function getAllPeers() {
const peers = sockets.map((socket) => ({
remoteAddress: socket.remoteAddress,
remotePort: socket.remotePort,
// connectedAt: socket.connectTime, // Assuming connectTime is a custom property or you have a way to track connection time
}));

return peers;
}

// Function to send a private message to a specific client
function sendPrivateMessage(target, message) {
if (!target.destroyed) {
const peers = getAllPeers();
target.write(`${JSON.stringify(peers)}\n`);
} else {
console.log(
`Worker ${process.pid}: Unable to send private message: Client ${target.remoteAddress}:${target.remotePort} is disconnected.`
);
}
}

0 comments on commit 527dd12

Please sign in to comment.