Skip to content

Commit

Permalink
Log when dropped message thresholds are reached
Browse files Browse the repository at this point in the history
  • Loading branch information
slominskir committed Feb 14, 2024
1 parent cb74247 commit ba5bc2c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/jlab/epics2web/epics/ChannelMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public void run() {
handleRegularConnectionOrReconnect();
}
} else {
LOGGER.log(Level.FINE, "Notifying clients of disconnect from channel: {0}", pv);
LOGGER.log(Level.FINEST, "Notifying clients of disconnect from channel: {0}", pv);

state.set(MonitorState.DISCONNECTED);
notifyPvInfoAll(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,14 +335,17 @@ public void sendUpdate(Session session, String pv, DBR dbr) {
@SuppressWarnings("unchecked")
public void send(Session session, String pv, String msg) {
if (session.isOpen()) {
String id = session.toString();
if (Application.WRITE_STRATEGY == WriteStrategy.ASYNC_QUEUE) {
ConcurrentLinkedQueue<String> writequeue = (ConcurrentLinkedQueue<String>) session.getUserProperties().get("writequeue");

//System.out.println("Queue Size: " + writequeue.size());

if (writequeue.size() > Application.WRITE_QUEUE_SIZE_LIMIT) {
//LOGGER.log(Level.INFO, "Dropping message: {0}", msg);
AtomicLong dropCount = (AtomicLong)session.getUserProperties().get("droppedMessageCount");
dropCount.getAndIncrement();
long count = dropCount.getAndIncrement() + 1; // getAndIncrement is actually returning previous value, not newly updated, so we add 1.
// Limit log file output by only reporting when thresholds are reached
if(count == 1 || count == 1000 || count == 10000 || count == 100000) {
LOGGER.log(Level.FINEST, "Session {0} queue full (limit={1}); Dropping pv {2} message: {3}; total dropped: {4}", new Object[]{id, Application.WRITE_QUEUE_SIZE_LIMIT, pv, msg, count});
}
} else {
writequeue.offer(msg);
}
Expand All @@ -355,9 +358,12 @@ public void send(Session session, String pv, String msg) {
boolean success = writequeue.offer(msg);

if(!success) {
//LOGGER.log(Level.INFO, "Dropping message: {0}", msg);
AtomicLong dropCount = (AtomicLong)session.getUserProperties().get("droppedMessageCount");
dropCount.getAndIncrement();
long count = dropCount.getAndIncrement() + 1; // getAndIncrement is actually returning previous value, not newly updated, so we add 1.
// Limit log file output by only reporting when thresholds are reached
if(count == 1 || count == 1000 || count == 10000 || count == 100000) {
LOGGER.log(Level.FINEST, "Session {0} queue full (limit={1}); Dropping pv {2} message: {3}; total dropped: {4}", new Object[]{id, Application.WRITE_QUEUE_SIZE_LIMIT, pv, msg, count});
}
}
} else {
try {
Expand Down

0 comments on commit ba5bc2c

Please sign in to comment.