Skip to content

Commit

Permalink
Allow setting bundler instance before init
Browse files Browse the repository at this point in the history
This change allows setting the bundler instance on TP before it is initialized/channel is created
  • Loading branch information
cfredri4 authored and belaban committed Nov 18, 2024
1 parent 71703ca commit f190fdd
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 31 deletions.
37 changes: 15 additions & 22 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,18 @@ public <T extends TP> T setLocalTransport(String tp_class) throws Exception {
public Bundler getBundler() {return bundler;}

/** Installs a bundler */
public <T extends TP> T setBundler(Bundler bundler) {
if(this.bundler != null)
this.bundler.stop();
bundler.init(this);
bundler.start();
this.bundler=bundler;
public <T extends TP> T setBundler(Bundler new_bundler) {
String old_bundler_class=null;
if(bundler != null) {
bundler.stop();
old_bundler_class=bundler.getClass().getName();
new_bundler.init(this);
new_bundler.start();
}
bundler=new_bundler;
bundler_type=bundler.getClass().getName();
if(old_bundler_class != null)
log.debug("%s: replaced bundler %s with %s", local_addr, old_bundler_class, bundler.getClass().getName());
return (T)this;
}

Expand Down Expand Up @@ -766,10 +772,9 @@ public String toString() {
else
msg_processing_policy.init(this);

if(bundler == null) {
if(bundler == null)
bundler=createBundler(bundler_type, getClass());
bundler.init(this);
}
bundler.init(this);
rtt.init(this);
// When stats is false, we'll set msg_stats.enabled to false, too. However, msg_stats.enabled=false can be
// set to false even if stats is true
Expand Down Expand Up @@ -819,19 +824,7 @@ public void destroy() {
@ManagedOperation(description="Creates and sets a new bundler. Type has to be either a bundler_type or the fully " +
"qualified classname of a Bundler impl. Stops the current bundler (if running)")
public <T extends TP> T bundler(String type) throws Exception {
Bundler new_bundler=createBundler(type, getClass());
String old_bundler_class=null;
if(bundler != null) {
bundler.stop();
old_bundler_class=bundler.getClass().getName();
}
new_bundler.init(this);
new_bundler.start();
bundler=new_bundler;
bundler_type=type;
if(old_bundler_class != null)
log.debug("%s: replaced bundler %s with %s", local_addr, old_bundler_class, bundler.getClass().getName());
return (T)this;
return setBundler(createBundler(type, getClass()));
}


Expand Down
18 changes: 9 additions & 9 deletions tests/stress/org/jgroups/tests/BundlerStressTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class BundlerStressTest {
protected int num_sender_threads=1;
protected boolean details;

protected String cfg="tcp.xml";
protected String cfg="udp.xml";
protected JChannel[] channels;
protected final Map<Long,Promise<Long>> sender_threads=new ConcurrentHashMap<>();

Expand All @@ -62,16 +62,16 @@ protected BundlerStressTest createChannels() throws Exception {
String name=String.valueOf(ch);
channels[i]=new JChannel(cfg).name(name);
GMS gms=channels[i].getProtocolStack().findProtocol(GMS.class);
if(gms != null)
gms.printLocalAddress(false);
// if(gms != null)
// gms.printLocalAddress(false);

channels[i].connect("bst");
System.out.print(".");
if(i == 0) {
TP transport=channels[0].getProtocolStack().getTransport();
transport.bundler(bundler);
}
else
//if(i == 0) {
// TP transport=channels[0].getProtocolStack().getTransport();
// transport.bundler(bundler);
//}
//else
channels[i].setReceiver(new BundlerTestReceiver());
}
Util.waitUntilAllChannelsHaveSameView(10000, 500, channels);
Expand Down Expand Up @@ -259,7 +259,7 @@ protected void sendMessages(boolean is_warmup) throws Exception {


public static void main(String[] args) throws Exception {
String bundler="transfer-queue", props="tcp.xml";
String bundler="transfer-queue", props="udp.xml";
int time=60, warmup=time/2, nodes=4, num_sender_threads=1, msg_size=1000;
boolean interactive=true;
for(int i=0; i < args.length; i++) {
Expand Down

0 comments on commit f190fdd

Please sign in to comment.