Skip to content

Commit

Permalink
- Added more tests to RelayTest
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Sep 28, 2023
1 parent 25e88eb commit f2d2bb6
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 7 deletions.
4 changes: 4 additions & 0 deletions src/org/jgroups/protocols/relay/RELAY.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,10 @@ public String toString() {
return String.format("%s%s", getClass().getSimpleName(), local_addr != null? String.format(" (%s)", local_addr) : "");
}

protected boolean isLocal(SiteAddress addr) {
return Objects.equals(site, addr.getSite());
}

protected boolean isLocal(SiteMaster sm) {
return is_site_master && Objects.equals(site, sm.getSite());
}
Expand Down
49 changes: 48 additions & 1 deletion tests/junit-functional/org/jgroups/tests/RelayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.stream.Stream;

import static org.jgroups.tests.RelayTests.Data.Type.REQ;
import static org.jgroups.tests.RelayTests.Data.Type.RSP;

/**
* Various RELAY-related tests ({@link RELAY2} and {@link RELAY3})
Expand Down Expand Up @@ -437,7 +438,7 @@ assert allChannels().stream().filter(ch -> !isSiteMaster(ch)).map(ch -> getRecei
// Note that the SiteMaster("X") to self message will result in a unicast (self) dest address, so 5 SM dests;
// this was changed in JGRP-2729 (only in RELAY3)
for(JChannel ch: allChannels()) {
List<Message> list=((MyReceiver<Message>)ch.getReceiver()).list();
List<Message> list=getReceiver(ch).list();
RELAY relay=ch.getProtocolStack().findProtocol(RELAY.class);
int sm=0, loopbacks=0;
for(Message msg: list) {
Expand Down Expand Up @@ -498,6 +499,23 @@ assert allChannels().stream().filter(RelayTests::isSiteMaster)
.allMatch(l -> l.stream().filter(m -> m.dest() instanceof SiteMaster).count() == 6);
}

/** Tests A sending to SM("lon") (A, loopback) and B sending to SM("lon") (A) */
public void testSendingToLocalSiteMaster(Class<? extends RELAY> cl) throws Exception {
createSymmetricNetwork(cl, ch -> new UnicastResponseSender<Message>(ch).rawMsgs(true));
a.send(new SiteMaster(LON), new Data(REQ, "hello"));
List<Message> list_a=getReceiver(a).list();
List<Message> list_b=getReceiver(b).list();
Util.waitUntil(2000, 100, () -> list_a.size() == 2);
assert list_a.stream().map(Message::getObject).filter(obj -> ((Data)obj).type == REQ).count() == 1;
assert list_a.stream().map(Message::getObject).filter(obj -> ((Data)obj).type == RSP).count() == 1;

list_a.clear();
b.send(new SiteMaster(LON), new Data(REQ, "hello"));
Util.waitUntil(2000, 100, () -> list_a.size() == 1 && list_b.size() == 1);
assert list_a.stream().map(Message::getObject).filter(obj -> ((Data)obj).type == REQ).count() == 1;
assert list_b.stream().map(Message::getObject).filter(obj -> ((Data)obj).type == RSP).count() == 1;
}

/** Sends a message to all members of the local site only */
public void testMulticastsToLocalSiteOnly(Class<? extends RELAY> cl) throws Exception {
createSymmetricNetwork(cl, ch -> new ResponseSender<Message>(ch).rawMsgs(true));
Expand Down Expand Up @@ -614,6 +632,35 @@ public void testUnicasts(Class<? extends RELAY> cl) throws Exception {
assertNumMessages(1, a,d);
}

/** Tests sending a large message across sites (from A:lon -> C:nyc); the fragmentation protocol should
* fragment/unfragment it */
public void testFragmentation(Class<? extends RELAY> cl) throws Exception {
createSymmetricNetwork(cl, ch -> new UnicastResponseSender<>(ch).rawMsgs(true));
String s="hello".repeat(1000);
a.send(b.getAddress(), new Data(REQ, s));
List<Message> list_a=getReceiver(a).list(), list_b=getReceiver(b).list();
Util.waitUntil(2000, 100, () -> list_a.size() == 1 && list_b.size() == 1);
}

/** Tests state transfer between sites: from C:nyc to A:lon */
public void testStateTransfer(Class<? extends RELAY> cl) throws Exception {
createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true));
MyReceiver<Message> r_a=(MyReceiver<Message>)a.getReceiver();
MyReceiver<Message> r_c=(MyReceiver<Message>)c.getReceiver();
boolean relay2=cl.equals(RELAY2.class);

// set state in C:
r_c.state().put("name", "Bela");
r_c.state().put("id", "322649");

// because an address is not a SiteUUID in RELAY2 (just a regular address), A would not know C
// we therefore wrap C's address into a SiteUUID (for RELAY2 only)
Address target=relay2? makeSiteUUID(c.getAddress(), "nyc") : c.getAddress();

a.getState(target, 2000);
assert r_a.state().size() == 2;
}

/** Tests https://issues.redhat.com/browse/JGRP-2696 */
public void testMulticastWithMultipleSiteMasters(Class<? extends RELAY> cl) throws Exception {
if(cl.equals(RELAY2.class))
Expand Down
16 changes: 10 additions & 6 deletions tests/junit-functional/org/jgroups/tests/RelayTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
import org.jgroups.*;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.LOCAL_PING;
import org.jgroups.protocols.MERGE3;
import org.jgroups.protocols.TCP;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.protocols.pbcast.STATE_TRANSFER;
import org.jgroups.protocols.relay.*;
import org.jgroups.protocols.relay.config.RelayConfig;
import org.jgroups.protocols.relay.config.RelayConfig.SiteConfig;
Expand Down Expand Up @@ -51,8 +50,13 @@ protected static Protocol[] defaultStack(RELAY relay) {
new NAKACK2().useMcastXmit(false),
r3,
new UNICAST3(),
new GMS().printLocalAddress(false),
r2
new STABLE().setDesiredAverageGossip(50000).setMaxBytes(8_000_000),
new GMS().printLocalAddress(false).setJoinTimeout(500),
new UFC().setMaxCredits(2_000_000).setMinThreshold(0.4),
new MFC().setMaxCredits(2_000_000).setMinThreshold(0.4),
new FRAG2().setFragSize(1024),
r2,
new STATE_TRANSFER()
};
return Util.combine(Protocol.class, protocols);
}
Expand Down

0 comments on commit f2d2bb6

Please sign in to comment.