From 0dbbd269db1c988c59f840c1b0e7955677536b39 Mon Sep 17 00:00:00 2001 From: slominskir Date: Wed, 14 Feb 2024 16:13:51 -0500 Subject: [PATCH] Log when dropped message thresholds are reached --- .../jlab/epics2web/epics/ChannelMonitor.java | 2 +- .../websocket/WebSocketSessionManager.java | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/jlab/epics2web/epics/ChannelMonitor.java b/src/main/java/org/jlab/epics2web/epics/ChannelMonitor.java index 21c4f5a..50d20b0 100644 --- a/src/main/java/org/jlab/epics2web/epics/ChannelMonitor.java +++ b/src/main/java/org/jlab/epics2web/epics/ChannelMonitor.java @@ -266,7 +266,7 @@ public void run() { handleRegularConnectionOrReconnect(); } } else { - LOGGER.log(Level.FINE, "Notifying clients of disconnect from channel: {0}", pv); + LOGGER.log(Level.FINEST, "Notifying clients of disconnect from channel: {0}", pv); state.set(MonitorState.DISCONNECTED); notifyPvInfoAll(false); diff --git a/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java b/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java index 452b418..7eee3df 100644 --- a/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java +++ b/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java @@ -335,14 +335,17 @@ public void sendUpdate(Session session, String pv, DBR dbr) { @SuppressWarnings("unchecked") public void send(Session session, String pv, String msg) { if (session.isOpen()) { + String id = session.toString(); if (Application.WRITE_STRATEGY == WriteStrategy.ASYNC_QUEUE) { ConcurrentLinkedQueue writequeue = (ConcurrentLinkedQueue) session.getUserProperties().get("writequeue"); - - //System.out.println("Queue Size: " + writequeue.size()); + if (writequeue.size() > Application.WRITE_QUEUE_SIZE_LIMIT) { - //LOGGER.log(Level.INFO, "Dropping message: {0}", msg); AtomicLong dropCount = (AtomicLong)session.getUserProperties().get("droppedMessageCount"); - dropCount.getAndIncrement(); + long count = dropCount.getAndIncrement() + 1; // getAndIncrement is actually returning previous value, not newly updated, so we add 1. + // Limit log file output by only reporting when thresholds are reached + if(count == 1 || count == 1000 || count == 10000 || count == 100000) { + LOGGER.log(Level.FINEST, "Session {0} queue full (limit={1}); Dropping pv {2} message: {3}; total dropped: {4}", new Object[]{id, Application.WRITE_QUEUE_SIZE_LIMIT, pv, msg, count}); + } } else { writequeue.offer(msg); } @@ -355,9 +358,12 @@ public void send(Session session, String pv, String msg) { boolean success = writequeue.offer(msg); if(!success) { - //LOGGER.log(Level.INFO, "Dropping message: {0}", msg); AtomicLong dropCount = (AtomicLong)session.getUserProperties().get("droppedMessageCount"); - dropCount.getAndIncrement(); + long count = dropCount.getAndIncrement() + 1; // getAndIncrement is actually returning previous value, not newly updated, so we add 1. + // Limit log file output by only reporting when thresholds are reached + if(count == 1 || count == 1000 || count == 10000 || count == 100000) { + LOGGER.log(Level.FINEST, "Session {0} queue full (limit={1}); Dropping pv {2} message: {3}; total dropped: {4}", new Object[]{id, Application.WRITE_QUEUE_SIZE_LIMIT, pv, msg, count}); + } } } else { try {