Skip to content

Commit

Permalink
Better error handling for Barrage connections
Browse files Browse the repository at this point in the history
  • Loading branch information
stanbrub committed Oct 28, 2024
1 parent dcd7c86 commit 8eccadf
Showing 1 changed file with 16 additions and 21 deletions.
37 changes: 16 additions & 21 deletions src/main/java/io/deephaven/benchmark/connect/BarrageConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
package io.deephaven.benchmark.connect;

import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand All @@ -14,15 +11,7 @@
import org.apache.arrow.memory.RootAllocator;
import io.deephaven.benchmark.metric.Metrics;
import io.deephaven.benchmark.metric.MetricsFuture;
import io.deephaven.client.impl.BarrageSession;
import io.deephaven.client.impl.BarrageSessionFactory;
import io.deephaven.client.impl.BarrageSnapshot;
import io.deephaven.client.impl.BarrageSubscription;
import io.deephaven.client.impl.ConsoleSession;
import io.deephaven.client.impl.DaggerDeephavenBarrageRoot;
import io.deephaven.client.impl.FieldInfo;
import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableHandleManager;
import io.deephaven.client.impl.*;
import io.deephaven.client.impl.script.Changes;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
Expand Down Expand Up @@ -64,12 +53,13 @@ public class BarrageConnector implements AutoCloseable {
*/
public BarrageConnector(String hostPort) {
String[] split = hostPort.split(":");
this.channel = getManagedChannel(split[0], Integer.parseInt(split[1]));
this.session = getSession(channel);
try {
this.channel = getManagedChannel(split[0], Integer.parseInt(split[1]));
this.session = getSession(channel);
this.console = session.session().console("python").get();
} catch (Exception ex) {
throw new RuntimeException("Failed to get console for session on host: " + hostPort);
close();
throw new RuntimeException("Failed to get console for session on host: " + hostPort, ex);
}
}

Expand Down Expand Up @@ -178,17 +168,22 @@ public void close() {
});
subscriptions.clear();
variableNames.clear();
console.close();
session.close();
if (console != null)
console.close();
if (session != null)
session.close();
} catch (Exception ex) {
throw new RuntimeException("Failed to close Session", ex);
} finally {
try {
session.session().closeFuture().get(5, TimeUnit.SECONDS);
if (session != null)
session.session().closeFuture().get(5, TimeUnit.SECONDS);
if (scheduler != null)
scheduler.shutdownNow();
if (channel != null)
channel.shutdownNow();
} catch (Exception ex) {
}
scheduler.shutdownNow();
channel.shutdownNow();
}
}

Expand Down

0 comments on commit 8eccadf

Please sign in to comment.