diff --git a/src/services/BroadcastService.ts b/src/services/BroadcastService.ts index 88674ecb..32e3d606 100644 --- a/src/services/BroadcastService.ts +++ b/src/services/BroadcastService.ts @@ -1,4 +1,3 @@ -// src/services/BroadcastService.ts import { Message, WebhookClient, WebhookMessageCreateOptions, HexColorString } from 'discord.js'; import { Hub, connectedList } from '@prisma/client'; import HubSettingsManager from '#main/managers/HubSettingsManager.js'; @@ -13,7 +12,32 @@ import { censor } from '#utils/ProfanityUtils.js'; import { trimAndCensorBannedWebhookWords } from '#utils/Utils.js'; import { ReferredMsgData, BroadcastOpts } from '#main/utils/network/Types.js'; +const BATCH_SIZE = 15; +const CONCURRENCY_LIMIT = 10; + export class BroadcastService { + private webhookClients: Map = new Map(); + + constructor() { + setInterval(() => this.cleanupWebhookClients(), 5 * 60 * 1000); // 5 minutes + } + + private getWebhookClient(webhookURL: string): WebhookClient { + let client = this.webhookClients.get(webhookURL); + if (!client) { + client = new WebhookClient({ url: webhookURL }); + this.webhookClients.set(webhookURL, client); + } + return client; + } + + private cleanupWebhookClients() { + this.webhookClients.forEach((client, url) => { + client.destroy(); + this.webhookClients.delete(url); + }); + } + async broadcastMessage( message: Message, hub: Hub, @@ -22,15 +46,26 @@ export class BroadcastService { connection: connectedList, ) { const attachmentURL = await this.resolveAttachmentURL(message); - const referredMessage = await this.fetchReferredMessage(message); - const referredMsgData = await getReferredMsgData(referredMessage); - const username = this.getUsername(settings, message); const censoredContent = censor(message.content); + const referredMessage = await this.fetchReferredMessage(message); + const referredMsgData = await getReferredMsgData(referredMessage); const referredContent = this.getReferredContent(referredMsgData); - const sendResult = await Promise.all( - hubConnections.map((conn) => + // Sort connections by last active first + const sortedHubConnections = hubConnections.sort( + (a, b) => b.lastActive.getTime() - a.lastActive.getTime(), + ); + + Logger.debug(`Broadcasting message to ${sortedHubConnections.length} connections`); + + // Split connections into batches + const batches = this.chunkArray(sortedHubConnections, BATCH_SIZE); + const allResults: NetworkWebhookSendResult[] = []; + + // Process batches with concurrency limit + for (const batch of batches) { + const batchPromises = batch.map((conn) => this.sendToConnection(message, hub, conn, { attachmentURL, referredMsgData, @@ -39,12 +74,56 @@ export class BroadcastService { censoredContent, referredContent, }), - ), - ); + ); + + Logger.debug(`Sending batch of ${batch.length} messages`); + const batchResults = await this.processWithConcurrency(batchPromises, CONCURRENCY_LIMIT); + allResults.push(...batchResults); + Logger.debug(`Sent batch of ${batch.length} messages`); + } - await storeMessageData(message, sendResult, connection.hubId, referredMsgData.dbReferrence); + // Batch store message data + await storeMessageData(message, allResults, connection.hubId, referredMsgData.dbReferrence); } + private chunkArray(array: T[], size: number): T[][] { + const chunks: T[][] = []; + for (let i = 0; i < array.length; i += size) { + chunks.push(array.slice(i, i + size)); + } + return chunks; + } + + private async processWithConcurrency( + promises: Promise[], + concurrency: number, + ): Promise { + const results: T[] = []; + let index = 0; + + async function next(): Promise { + const currentIndex = index++; + if (currentIndex >= promises.length) return; + + try { + const result = await promises[currentIndex]; + results[currentIndex] = result; + } + catch (error) { + results[currentIndex] = error; + } + + await next(); + } + + // Start initial batch of promises + const initialPromises = Array(Math.min(concurrency, promises.length)) + .fill(null) + .map(() => next()); + + await Promise.all(initialPromises); + return results; + } async resolveAttachmentURL(message: Message) { return ( message.attachments.first()?.url ?? (await getAttachmentURL(message.content)) ?? undefined @@ -91,7 +170,10 @@ export class BroadcastService { return { messageRes, webhookURL: connection.webhookURL, mode }; } catch (e) { - Logger.error(`Failed to send message to ${connection.channelId} in server ${connection.serverId}`, e); + Logger.error( + `Failed to send message to ${connection.channelId} in server ${connection.serverId}`, + e, + ); return { error: e.message, webhookURL: connection.webhookURL }; } } @@ -109,7 +191,11 @@ export class BroadcastService { ): WebhookMessageCreateOptions { const { dbReferrence, referredAuthor } = opts.referredMsgData; const author = { username: opts.username, avatarURL: message.author.displayAvatarURL() }; - const jumpButton = this.getJumpButton(referredAuthor?.username ?? 'Unknown', connection, dbReferrence); + const jumpButton = this.getJumpButton( + referredAuthor?.username ?? 'Unknown', + connection, + dbReferrence, + ); const servername = trimAndCensorBannedWebhookWords(message.guild.name); const messageFormatter = new MessageFormattingService(connection); @@ -134,8 +220,7 @@ export class BroadcastService { } private async sendMessage(webhookUrl: string, data: WebhookMessageCreateOptions) { - const webhook = new WebhookClient({ url: webhookUrl }); + const webhook = this.getWebhookClient(webhookUrl); return await webhook.send(data); } } - diff --git a/src/utils/network/messageUtils.ts b/src/utils/network/messageUtils.ts index fcbca526..62daa70f 100644 --- a/src/utils/network/messageUtils.ts +++ b/src/utils/network/messageUtils.ts @@ -45,43 +45,54 @@ export const addBroadcasts = async ( originalMsgId: Snowflake, ...broadcasts: Broadcast[] ) => { - const redis = getRedis(); - const broadcastsKey = `${RedisKeys.broadcasts}:${originalMsgId}:${hubId}`; - - // Single loop to process all broadcast data - const { broadcastEntries, reverseLookups } = broadcasts.reduce( - (acc, broadcast) => { - const { messageId, channelId, mode } = broadcast; - const broadcastInfo = JSON.stringify({ mode, messageId, channelId, originalMsgId }); - - // Add to broadcasts hash - acc.broadcastEntries.push(channelId, broadcastInfo); - - // Prepare reverse lookup - acc.reverseLookups.push( - `${RedisKeys.messageReverse}:${messageId}`, - `${originalMsgId}:${hubId}`, - ); - - return acc; - }, - { broadcastEntries: [] as string[], reverseLookups: [] as string[] }, - ); - - Logger.debug(`Adding ${broadcasts.length} broadcasts for message ${originalMsgId}`); - - // Add all broadcasts to the hash in a single operation - await redis.hset(broadcastsKey, broadcastEntries); - await redis.expire(broadcastsKey, 86400); - await redis.mset(reverseLookups); + try { + const redis = getRedis(); + const broadcastsKey = `${RedisKeys.broadcasts}:${originalMsgId}:${hubId}`; + const pipeline = redis.pipeline(); + + // Prepare all operations in a single reduce to minimize iterations + const { broadcastEntries, reverseLookupKeys } = broadcasts.reduce( + (acc, broadcast) => { + const { messageId, channelId, mode } = broadcast; + const broadcastInfo = JSON.stringify({ mode, messageId, channelId, originalMsgId }); + + // Add to broadcasts entries + acc.broadcastEntries.push(channelId, broadcastInfo); + + // Store reverse lookup key for later expiry setting + const reverseKey = `${RedisKeys.messageReverse}:${messageId}`; + acc.reverseLookupKeys.push(reverseKey); + + // Add reverse lookup to pipeline + pipeline.set(reverseKey, `${originalMsgId}:${hubId}`); + + return acc; + }, + { + broadcastEntries: [] as string[], + reverseLookupKeys: [] as string[], + }, + ); + + Logger.debug(`Adding ${broadcasts.length} broadcasts for message ${originalMsgId}`); + + // Add main broadcast hash + pipeline.hset(broadcastsKey, broadcastEntries); + pipeline.expire(broadcastsKey, 86400); + + // Set expiry for all reverse lookups in the same pipeline + reverseLookupKeys.forEach((key) => { + pipeline.expire(key, 86400); + }); - Logger.debug(`Added ${broadcasts.length} broadcasts for message ${originalMsgId}`); + // Execute all Redis operations in a single pipeline + await pipeline.exec(); - reverseLookups - .filter((_, i) => i % 2 === 0) - .forEach(async (key) => { - await redis.expire(key, 86400); - }); + Logger.debug(`Added ${broadcasts.length} broadcasts for message ${originalMsgId}`); + } + catch (error) { + Logger.error('Failed to add broadcasts', error); + } }; export const getBroadcasts = async (originalMsgId: string, hubId: string) => {