Skip to content

Commit

Permalink
feat: process messages in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
dev-737 committed Nov 11, 2024
1 parent fb21c87 commit 78c9473
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 48 deletions.
111 changes: 98 additions & 13 deletions src/services/BroadcastService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// src/services/BroadcastService.ts
import { Message, WebhookClient, WebhookMessageCreateOptions, HexColorString } from 'discord.js';
import { Hub, connectedList } from '@prisma/client';
import HubSettingsManager from '#main/managers/HubSettingsManager.js';
Expand All @@ -13,7 +12,32 @@ import { censor } from '#utils/ProfanityUtils.js';
import { trimAndCensorBannedWebhookWords } from '#utils/Utils.js';
import { ReferredMsgData, BroadcastOpts } from '#main/utils/network/Types.js';

// Number of connections processed per batch of the broadcast loop.
const BATCH_SIZE = 15;
// Upper bound on how many sends within a batch are awaited at once.
const CONCURRENCY_LIMIT = 10;

export class BroadcastService {
private webhookClients: Map<string, WebhookClient> = new Map();

constructor() {
  // Periodically evict cached webhook clients so clients for URLs that are
  // no longer in use do not accumulate for the life of the process.
  // unref() the handle so this housekeeping timer alone cannot keep the
  // Node.js event loop (and thus the process) alive.
  setInterval(() => this.cleanupWebhookClients(), 5 * 60 * 1000).unref(); // 5 minutes
}

private getWebhookClient(webhookURL: string): WebhookClient {
  // Return the cached client for this URL, lazily creating and caching one
  // on a miss so repeated sends reuse the same underlying client.
  const cached = this.webhookClients.get(webhookURL);
  if (cached) return cached;

  const created = new WebhookClient({ url: webhookURL });
  this.webhookClients.set(webhookURL, created);
  return created;
}

private cleanupWebhookClients() {
  // Destroy every cached client and forget them all; the next send to any
  // still-active URL will transparently rebuild its client via
  // getWebhookClient.
  // NOTE(review): a client destroyed here could in principle have a send in
  // flight — confirm that is acceptable or track in-use clients first.
  for (const client of this.webhookClients.values()) {
    client.destroy();
  }
  this.webhookClients.clear();
}

async broadcastMessage(
message: Message<true>,
hub: Hub,
Expand All @@ -22,15 +46,26 @@ export class BroadcastService {
connection: connectedList,
) {
const attachmentURL = await this.resolveAttachmentURL(message);
const referredMessage = await this.fetchReferredMessage(message);
const referredMsgData = await getReferredMsgData(referredMessage);

const username = this.getUsername(settings, message);
const censoredContent = censor(message.content);
const referredMessage = await this.fetchReferredMessage(message);
const referredMsgData = await getReferredMsgData(referredMessage);
const referredContent = this.getReferredContent(referredMsgData);

const sendResult = await Promise.all(
hubConnections.map((conn) =>
// Sort connections by last active first
const sortedHubConnections = hubConnections.sort(
(a, b) => b.lastActive.getTime() - a.lastActive.getTime(),
);

Logger.debug(`Broadcasting message to ${sortedHubConnections.length} connections`);

// Split connections into batches
const batches = this.chunkArray(sortedHubConnections, BATCH_SIZE);
const allResults: NetworkWebhookSendResult[] = [];

// Process batches with concurrency limit
for (const batch of batches) {
const batchPromises = batch.map((conn) =>
this.sendToConnection(message, hub, conn, {
attachmentURL,
referredMsgData,
Expand All @@ -39,12 +74,56 @@ export class BroadcastService {
censoredContent,
referredContent,
}),
),
);
);

Logger.debug(`Sending batch of ${batch.length} messages`);
const batchResults = await this.processWithConcurrency(batchPromises, CONCURRENCY_LIMIT);
allResults.push(...batchResults);
Logger.debug(`Sent batch of ${batch.length} messages`);
}

await storeMessageData(message, sendResult, connection.hubId, referredMsgData.dbReferrence);
// Batch store message data
await storeMessageData(message, allResults, connection.hubId, referredMsgData.dbReferrence);
}

private chunkArray<T>(array: T[], size: number): T[][] {
  // Partition `array` into consecutive slices of at most `size` elements,
  // preserving order. An empty input yields an empty list of chunks.
  const chunkCount = Math.ceil(array.length / size);
  return Array.from({ length: chunkCount }, (_, chunkIndex) =>
    array.slice(chunkIndex * size, (chunkIndex + 1) * size),
  );
}

/**
 * Awaits the given promises, at most `concurrency` at a time, and returns
 * their results in input order. A rejection does not abort the batch: the
 * rejection reason is stored at that promise's index instead (callers treat
 * such entries as error results).
 *
 * NOTE(review): the promises handed to this method are created — and thus
 * already executing — before they arrive here, so this only bounds how many
 * are *awaited* concurrently; it does not throttle when the underlying sends
 * start. To truly limit concurrency, change the parameter to an array of
 * `() => Promise<T>` factories and invoke them inside the workers.
 */
private async processWithConcurrency<T>(
  promises: Promise<T>[],
  concurrency: number,
): Promise<T[]> {
  const results: T[] = new Array(promises.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unclaimed index. A loop is used
  // instead of the previous recursive scheme so large inputs cannot build an
  // unbounded chain of pending awaits.
  const worker = async (): Promise<void> => {
    while (nextIndex < promises.length) {
      const currentIndex = nextIndex++;
      try {
        results[currentIndex] = await promises[currentIndex];
      }
      catch (error) {
        // Preserve the existing contract of storing the rejection reason
        // in-place. The cast is deliberate: under strict mode `unknown` is
        // not assignable to T, and callers already expect mixed entries.
        results[currentIndex] = error as T;
      }
    }
  };

  const workerCount = Math.min(concurrency, promises.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
async resolveAttachmentURL(message: Message) {
return (
message.attachments.first()?.url ?? (await getAttachmentURL(message.content)) ?? undefined
Expand Down Expand Up @@ -91,7 +170,10 @@ export class BroadcastService {
return { messageRes, webhookURL: connection.webhookURL, mode };
}
catch (e) {
Logger.error(`Failed to send message to ${connection.channelId} in server ${connection.serverId}`, e);
Logger.error(
`Failed to send message to ${connection.channelId} in server ${connection.serverId}`,
e,
);
return { error: e.message, webhookURL: connection.webhookURL };
}
}
Expand All @@ -109,7 +191,11 @@ export class BroadcastService {
): WebhookMessageCreateOptions {
const { dbReferrence, referredAuthor } = opts.referredMsgData;
const author = { username: opts.username, avatarURL: message.author.displayAvatarURL() };
const jumpButton = this.getJumpButton(referredAuthor?.username ?? 'Unknown', connection, dbReferrence);
const jumpButton = this.getJumpButton(
referredAuthor?.username ?? 'Unknown',
connection,
dbReferrence,
);
const servername = trimAndCensorBannedWebhookWords(message.guild.name);

const messageFormatter = new MessageFormattingService(connection);
Expand All @@ -134,8 +220,7 @@ export class BroadcastService {
}

private async sendMessage(webhookUrl: string, data: WebhookMessageCreateOptions) {
  // Send through the cached per-URL client rather than constructing a new
  // WebhookClient for every message.
  const webhook = this.getWebhookClient(webhookUrl);
  return await webhook.send(data);
}
}

81 changes: 46 additions & 35 deletions src/utils/network/messageUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,43 +45,54 @@ export const addBroadcasts = async (
originalMsgId: Snowflake,
...broadcasts: Broadcast[]
) => {
const redis = getRedis();
const broadcastsKey = `${RedisKeys.broadcasts}:${originalMsgId}:${hubId}`;

// Single loop to process all broadcast data
const { broadcastEntries, reverseLookups } = broadcasts.reduce(
(acc, broadcast) => {
const { messageId, channelId, mode } = broadcast;
const broadcastInfo = JSON.stringify({ mode, messageId, channelId, originalMsgId });

// Add to broadcasts hash
acc.broadcastEntries.push(channelId, broadcastInfo);

// Prepare reverse lookup
acc.reverseLookups.push(
`${RedisKeys.messageReverse}:${messageId}`,
`${originalMsgId}:${hubId}`,
);

return acc;
},
{ broadcastEntries: [] as string[], reverseLookups: [] as string[] },
);

Logger.debug(`Adding ${broadcasts.length} broadcasts for message ${originalMsgId}`);

// Add all broadcasts to the hash in a single operation
await redis.hset(broadcastsKey, broadcastEntries);
await redis.expire(broadcastsKey, 86400);
await redis.mset(reverseLookups);
try {
const redis = getRedis();
const broadcastsKey = `${RedisKeys.broadcasts}:${originalMsgId}:${hubId}`;
const pipeline = redis.pipeline();

// Prepare all operations in a single reduce to minimize iterations
const { broadcastEntries, reverseLookupKeys } = broadcasts.reduce(
(acc, broadcast) => {
const { messageId, channelId, mode } = broadcast;
const broadcastInfo = JSON.stringify({ mode, messageId, channelId, originalMsgId });

// Add to broadcasts entries
acc.broadcastEntries.push(channelId, broadcastInfo);

// Store reverse lookup key for later expiry setting
const reverseKey = `${RedisKeys.messageReverse}:${messageId}`;
acc.reverseLookupKeys.push(reverseKey);

// Add reverse lookup to pipeline
pipeline.set(reverseKey, `${originalMsgId}:${hubId}`);

return acc;
},
{
broadcastEntries: [] as string[],
reverseLookupKeys: [] as string[],
},
);

Logger.debug(`Adding ${broadcasts.length} broadcasts for message ${originalMsgId}`);

// Add main broadcast hash
pipeline.hset(broadcastsKey, broadcastEntries);
pipeline.expire(broadcastsKey, 86400);

// Set expiry for all reverse lookups in the same pipeline
reverseLookupKeys.forEach((key) => {
pipeline.expire(key, 86400);
});

Logger.debug(`Added ${broadcasts.length} broadcasts for message ${originalMsgId}`);
// Execute all Redis operations in a single pipeline
await pipeline.exec();

reverseLookups
.filter((_, i) => i % 2 === 0)
.forEach(async (key) => {
await redis.expire(key, 86400);
});
Logger.debug(`Added ${broadcasts.length} broadcasts for message ${originalMsgId}`);
}
catch (error) {
Logger.error('Failed to add broadcasts', error);
}
};

export const getBroadcasts = async (originalMsgId: string, hubId: string) => {
Expand Down

0 comments on commit 78c9473

Please sign in to comment.