Skip to content

Commit

Permalink
- Added test to assert correct topology after start
Browse files Browse the repository at this point in the history
- Added test with broken link ("net1" down)
  • Loading branch information
belaban committed Aug 18, 2023
1 parent 9ab41af commit eaa3c25
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 34 deletions.
26 changes: 13 additions & 13 deletions tests/junit-functional/org/jgroups/tests/RelayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ public void testSendAndReceiveMulticasts(Class<? extends RELAY> cl) throws Excep
for(JChannel ch: allChannels())
ch.send(null, String.format("%s", ch.getAddress()));
Util.waitUntil(5000, 500,
() -> allChannels().stream().peek(RelayTest::printMessages)
.map(RelayTest::getReceiver)
() -> allChannels().stream().peek(RelayTests::printMessages)
.map(RelayTests::getReceiver)
.allMatch(r -> r.size() == 6));
}

Expand All @@ -386,8 +386,8 @@ public void testSendAndReceiveMulticastsAndUnicastResponses(Class<? extends RELA
for(JChannel ch: allChannels())
ch.send(null, String.format("%s", ch.getAddress()));
Util.waitUntil(5000, 500,
() -> allChannels().stream().peek(RelayTest::printMessages)
.map(RelayTest::getReceiver)
() -> allChannels().stream().peek(RelayTests::printMessages)
.map(RelayTests::getReceiver)
.allMatch(r -> r.size() == 6 * 2));

for(JChannel ch: allChannels()) {
Expand Down Expand Up @@ -481,8 +481,8 @@ public void testMulticastsToLocalSiteOnly(Class<? extends RELAY> cl) throws Exce
b.send(new ObjectMessage(null, "from-B").setFlag(Message.Flag.NO_RELAY));

Util.waitUntil(5000, 200,
() -> Stream.of(a,b).peek(RelayTest::printMessages)
.map(RelayTest::getReceiver)
() -> Stream.of(a,b).peek(RelayTests::printMessages)
.map(RelayTests::getReceiver)
.allMatch(r -> r.list().size() == 4));

assert Stream.of(a,b).map(ch -> getReceiver(ch).list())
Expand All @@ -502,13 +502,13 @@ public void localMulticastForwardedToAllSites(Class<? extends RELAY> cl) throws
d.send(null, "d-req"); // non site-master (C is SM)

// all members in all sites should received the 2 multicasts:
Util.waitUntil(5000, 200, () -> allChannels().stream().peek(RelayTest::printMessages)
Util.waitUntil(5000, 200, () -> allChannels().stream().peek(RelayTests::printMessages)
.map(ch -> getReceiver(ch).list())
.allMatch(l -> l.size() >= 2));

Util.waitUntil(5000, 200,
() -> Stream.of(b,d)
.peek(RelayTest::printMessages)
.peek(RelayTests::printMessages)
.map(ch -> getReceiver(ch).list())
.allMatch(l -> l.size() == 2 /* mcasts */ + 6 /* unicast rsps */));
}
Expand Down Expand Up @@ -639,7 +639,7 @@ public void testMulticastWithMultipleSiteMasters(Class<? extends RELAY> cl) thro
b.send(null, "from B");

Util.waitUntil(5000, 100,
() -> generator.get().map(RelayTest::getReceiver).allMatch(r -> r.size() == 2),
() -> generator.get().map(RelayTests::getReceiver).allMatch(r -> r.size() == 2),
() -> printMessages(generator.get()));

System.out.printf("received messages:\n%s\n", printMessages(generator.get()));
Expand All @@ -658,20 +658,20 @@ public void testMulticastWithMultipleSiteMasters(Class<? extends RELAY> cl) thro
// this and always picked the first site master / route in the list, only D and G would
// receive messages (2 each); E and H would receive 0 messages
Util.waitUntil(3000, 100,
() -> Stream.of(a,b).map(RelayTest::getReceiver).allMatch(r -> r.size() == 1));
() -> Stream.of(a,b).map(RelayTests::getReceiver).allMatch(r -> r.size() == 1));

// all other site masters (D or E, G or H) get A's and B's message:
Util.waitUntil(3000, 100,
() -> Stream.of(d,e,_g,_h)
// a site master receives 0, 1 or 2 messages:
.map(RelayTest::getReceiver).allMatch(r -> r.size() >= 0 && r.size() <= 2),
.map(RelayTests::getReceiver).allMatch(r -> r.size() >= 0 && r.size() <= 2),
() -> printMessages(generator.get()));
System.out.printf("-- received messages:\n%s\n", printMessages(generator.get()));
generator.get().forEach(ch -> getReceiver(ch).reset());

c.send(new SiteMaster(null), "from C");
// same as above: A or B receives 1 message, D or E and G or H
Util.waitUntil(3000, 100, () -> Stream.of(a,b,d,e,_g,_h).map(RelayTest::getReceiver)
Util.waitUntil(3000, 100, () -> Stream.of(a,b,d,e,_g,_h).map(RelayTests::getReceiver)
.allMatch(r -> r.size() >= 0 && r.size() <= 2), () -> printMessages(generator.get()));
System.out.printf("-- received messages:\n%s\n", printMessages(generator.get()));
generator.get().forEach(ch -> getReceiver(ch).reset());
Expand All @@ -680,7 +680,7 @@ public void testMulticastWithMultipleSiteMasters(Class<? extends RELAY> cl) thro
// C sends a multicast; A *or* B (but not both) should forward it to the other sites NYC and SFO
c.send(null, "from C");
Util.waitUntil(3000, 100,
() -> generator.get().map(RelayTest::getReceiver).allMatch(r -> r.size() == 1),
() -> generator.get().map(RelayTests::getReceiver).allMatch(r -> r.size() == 1),
() -> printMessages(generator.get()));
System.out.printf("-- received messages:\n%s\n", printMessages(generator.get()));
}
Expand Down
74 changes: 53 additions & 21 deletions tests/junit-functional/org/jgroups/tests/RelayTestAsym.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package org.jgroups.tests;

import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.*;
import org.jgroups.protocols.relay.RELAY3;
import org.jgroups.util.MyReceiver;
import org.jgroups.util.Util;
Expand All @@ -13,7 +10,9 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Tests asymmetric networks. The sites are setup as follows:
Expand Down Expand Up @@ -52,31 +51,64 @@ public void testMessageSending() throws Exception {
allChannels().forEach(ch -> ch.setReceiver(new MyReceiver<Message>().rawMsgs(true)));

// multicasts:
allChannels().forEach(ch -> {
try {
ch.send(null, String.format("from %s", ch.address()));
}
catch(Exception ex) {
throw new RuntimeException(ex);
}
});
allChannels().forEach(ch -> send(ch, null, String.format("from %s", ch.address())));
assertNumMessages(12, allChannels(), true); // reset receivers

// unicasts:
Collection<Address> all_addrs=allChannels().stream().map(JChannel::getAddress).collect(Collectors.toSet());
allChannels().forEach(ch -> {
all_addrs.forEach(target -> {
try {
ch.send(target, String.format("from %s", ch.address()));
}
catch(Exception ex) {
throw new RuntimeException(ex);
}
});
all_addrs.forEach(target -> send(ch, target, String.format("from %s", ch.address())));
});

assertNumMessages(12, allChannels(), true);
}

public void testTopology() throws Exception {
setup(true);
Util.waitUntilTrue(3000, 100, () -> assertTopo(allChannels()));
for(JChannel ch: allChannels()) {
RELAY3 r=ch.getProtocolStack().findProtocol(RELAY3.class);
Map<String,View> cache=r.topo().cache();
System.out.printf("%s", printTopo(List.of(ch)));
assert cache.size() == 4 : printTopo(List.of(ch)); // 4 sites - HF, NET1-3
assert cache.values().stream().allMatch(v -> v.size() == 3) : printTopo(List.of(ch));
}
}

/** Tests sending mcasts from HF and NET3 when NET1 is down: messages should not be received across the broken link */
public void testMessageSendingWithNet1Down() throws Exception {
setup(true);
// take NET1 down:
Util.close(d,e,f);
allChannels().stream().filter(ch -> !ch.isClosed())
.forEach(ch -> ch.setReceiver(new MyReceiver<Message>().rawMsgs(true)));
Stream.of(a,b,c,x,y,z).forEach(ch -> send(ch, null, String.format("from %s", ch.address())));
// we only receive 3 messages (from own site)
assertNumMessages(3, a,b,c,x,y,z);
}

protected static void send(JChannel ch, Address dest, Object payload) {
try {
ch.send(dest, payload);
}
catch(Exception ex) {
throw new RuntimeException(ex);
}
}

protected static boolean assertTopo(List<JChannel> channels) {
for(JChannel ch: channels) {
RELAY3 r=ch.getProtocolStack().findProtocol(RELAY3.class);
Map<String,View> cache=r.topo().cache();
if(cache.size() != 4 || !cache.values().stream().allMatch(v -> v.size() == 3))
return false;
}
return true;
}

protected static String printTopo(List<JChannel> channels) {
return channels.stream()
.map(ch -> String.format("%s:\n%s\n", ch.address(), ((RELAY3)ch.getProtocolStack().findProtocol(RELAY3.class))
.printTopology(true))).collect(Collectors.joining("\n"));
}

protected void setup(boolean connect) throws Exception {
Expand Down

0 comments on commit eaa3c25

Please sign in to comment.