From 254cfefe133eefeda2f511cb6eb82574bdfddbf4 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 28 Dec 2023 12:40:06 +0100 Subject: [PATCH] - Store headers in RelayHeader before routing and restore on reception (RELAY3: https://issues.redhat.com/browse/JGRP-2744) - DiagnosticsHandler checks for address compatibility when joining mcast groups on network interfaces --- src/org/jgroups/BaseMessage.java | 55 +------ src/org/jgroups/protocols/relay/RELAY2.java | 9 +- src/org/jgroups/protocols/relay/RELAY3.java | 19 ++- .../jgroups/protocols/relay/RelayHeader.java | 23 ++- src/org/jgroups/protocols/relay/Route.java | 26 ++-- src/org/jgroups/stack/DiagnosticsHandler.java | 8 +- src/org/jgroups/stack/Protocol.java | 2 +- src/org/jgroups/stack/ProtocolStack.java | 3 +- src/org/jgroups/util/Headers.java | 49 +++--- src/org/jgroups/util/Util.java | 2 +- .../org/jgroups/tests/RelayTest.java | 146 +++++++++--------- .../org/jgroups/tests/RelayTests.java | 39 +++-- .../org/jgroups/tests/SizeTest.java | 98 +++++------- 13 files changed, 226 insertions(+), 253 deletions(-) diff --git a/src/org/jgroups/BaseMessage.java b/src/org/jgroups/BaseMessage.java index 4b89e6830c3..6c685e13f84 100644 --- a/src/org/jgroups/BaseMessage.java +++ b/src/org/jgroups/BaseMessage.java @@ -2,15 +2,12 @@ package org.jgroups; -import org.jgroups.conf.ClassConfigurator; -import org.jgroups.util.ByteArray; import org.jgroups.util.Headers; import org.jgroups.util.Util; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.List; import java.util.Map; /** @@ -51,15 +48,9 @@ public BaseMessage(Address dest) { public Message setSrc(Address new_src) {sender=new_src; return this;} public int getNumHeaders() {return Headers.size(this.headers);} public Map getHeaders() {return Headers.getHeaders(this.headers);} + public Header[] headers() {return headers;} // don't modify! + public Message headers(Header[] hdrs) {this.headers=hdrs; return this;} // use with caution! public String printHeaders() {return Headers.printHeaders(this.headers);} - public ByteArray writeHeaders() throws IOException {return Headers.writeHeaders(this.headers);} - public Message readHeaders(ByteArray ba) throws IOException, ClassNotFoundException { - List
hdrs=Headers.readHeaders(ba); - for(Header h: hdrs) - putHeader(h.getProtId(), h); - return this; - } - /** * Sets a number of flags in a message @@ -268,7 +259,7 @@ public void writeTo(DataOutput out) throws IOException { Util.writeAddress(sender, out); // write the headers - writeHeaders(this.headers, out, (short[])null); + Headers.writeHeaders(this.headers, out, (short[])null); // finally write the payload writePayload(out); @@ -293,7 +284,7 @@ public void writeToNoAddrs(Address src, DataOutput out, short... excluded_header Util.writeAddress(sender, out); // write the headers - writeHeaders(this.headers, out, excluded_headers); + Headers.writeHeaders(this.headers, out, excluded_headers); // finally write the payload writePayload(out); @@ -316,14 +307,7 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException { sender=Util.readAddress(in); // 5. headers - int len=in.readShort(); - if(this.headers == null || len > this.headers.length) - this.headers=createHeaders(len); - for(int i=0; i < len; i++) { - short id=in.readShort(); - Header hdr=readHeader(in).setProtId(id); - this.headers[i]=hdr; - } + this.headers=Headers.readHeaders(in); readPayload(in); } @@ -333,35 +317,6 @@ protected Message copyPayload(Message copy) { return copy; } - protected static void writeHeaders(Header[] hdrs, DataOutput out, short ... excluded_headers) throws IOException { - int size=Headers.size(hdrs, excluded_headers); - out.writeShort(size); - if(size > 0) { - for(Header hdr : hdrs) { - if(hdr == null) - break; - short id=hdr.getProtId(); - if(Util.containsId(id, excluded_headers)) - continue; - out.writeShort(id); - writeHeader(hdr, out); - } - } - } - - protected static void writeHeader(Header hdr, DataOutput out) throws IOException { - short magic_number=hdr.getMagicId(); - out.writeShort(magic_number); - hdr.writeTo(out); - } - - protected static Header readHeader(DataInput in) throws IOException, ClassNotFoundException { - short magic_number=in.readShort(); - Header hdr=ClassConfigurator.create(magic_number); - hdr.readFrom(in); - return hdr; - } - protected static Header[] createHeaders(int size) { return size > 0? new Header[size] : new Header[Util.DEFAULT_HEADERS]; } diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java index 3c490a4f9d4..eb2917f42fb 100644 --- a/src/org/jgroups/protocols/relay/RELAY2.java +++ b/src/org/jgroups/protocols/relay/RELAY2.java @@ -262,13 +262,10 @@ protected void handleRelayMessage(Message msg) { log.warn("%s: received a message without a relay header; discarding it", local_addr); return; } - try { - msg.clearHeaders(); // remove all headers added by the bridge cluster + Header[] original_hdrs=hdr.originalHeaders(); + if(original_hdrs != null && Headers.size(original_hdrs) > 0) { // only true with RELAY3 + ((BaseMessage)msg).headers(original_hdrs); // overwrites headers added by the bridge cluster msg.putHeader(id, hdr); - ((BaseMessage)msg).readHeaders(hdr.originalHeaders()); - } - catch(Exception ex) { - log.error("%s: failed handling message relayed from %s: %s", local_addr, msg.src(), ex); } if(hdr.final_dest != null) { diff --git a/src/org/jgroups/protocols/relay/RELAY3.java b/src/org/jgroups/protocols/relay/RELAY3.java index 89d060a6367..17f40631331 100644 --- a/src/org/jgroups/protocols/relay/RELAY3.java +++ b/src/org/jgroups/protocols/relay/RELAY3.java @@ -285,11 +285,14 @@ protected void handleRelayMessage(Message msg) { return; } try { + Header[] original_hdrs=hdr.originalHeaders(); Message copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender); - copy.clearHeaders(); // remove all headers added by the bridge cluster + copy.clearHeaders(); + if(original_hdrs != null && Headers.size(original_hdrs) > 0) + ((BaseMessage)copy).headers(original_hdrs); // removes/overwrites all headers added by the bridge cluster copy.putHeader(id, hdr); - ((BaseMessage)copy).readHeaders(hdr.originalHeaders()); - copy.setFlag(hdr.originalFlags(), false); + if(msg.dest() != null) + copy.setFlag(hdr.originalFlags(), false); // todo: check if copy is needed! process(true, copy); } @@ -379,7 +382,7 @@ protected Object routeThen(Message msg, List sites, Supplier act return action != null? action.get() : null; } - /** This method has all of the routing logic, for both site masters and regular members */ + /** This method has all the routing logic, for both site masters and regular members */ protected Object process(boolean down, Message msg) { Address dest=msg.dest(); SiteAddress dst=null; @@ -389,7 +392,7 @@ protected Object process(boolean down, Message msg) { case ALL: if(down) return routeThen(msg, null,() -> deliver(null, msg, true)); - return dontRoute(msg)? passUp(msg) : routeThen(msg, null, () -> passUp(msg)); + return mustBeRouted(msg)? routeThen(msg, null, () -> passUp(msg)) : passUp(msg); case SM_ALL: return routeThen(msg, null, () -> passUp(msg)); case SM: @@ -437,12 +440,12 @@ protected Object process(boolean down, Message msg) { * multiple site masters, and this site master is picked to route the message, then return true, else return false. * JIRA: https://issues.redhat.com/browse/JGRP-2696 */ - protected boolean dontRoute(Message msg) { + protected boolean mustBeRouted(Message msg) { if(msg.isFlagSet(Flag.NO_RELAY)) - return true; // don't route + return false; // don't route final List
sms=site_masters; if(sms == null || sms.size() < 2) - return false; // do route + return true; // do route Address first_sm=sms.get(0); return local_addr.equals(first_sm); } diff --git a/src/org/jgroups/protocols/relay/RelayHeader.java b/src/org/jgroups/protocols/relay/RelayHeader.java index 55568313a69..72f0d3de5e4 100644 --- a/src/org/jgroups/protocols/relay/RelayHeader.java +++ b/src/org/jgroups/protocols/relay/RelayHeader.java @@ -4,7 +4,7 @@ import org.jgroups.Global; import org.jgroups.Header; import org.jgroups.util.Bits; -import org.jgroups.util.ByteArray; +import org.jgroups.util.Headers; import org.jgroups.util.Util; import java.io.DataInput; @@ -35,7 +35,10 @@ public class RelayHeader extends Header { protected Set visited_sites; // used with TOPO_REQ: when set, return the entire cache, otherwise only information about the local members protected boolean return_entire_cache; - protected ByteArray original_hdrs; // marshalled headers (https://issues.redhat.com/browse/JGRP-2729) + + // marshalled headers (https://issues.redhat.com/browse/JGRP-2729), + // changed in https://issues.redhat.com/browse/JGRP-2744 + protected Header[] original_hdrs; protected short original_flags; @@ -63,8 +66,8 @@ public RelayHeader(byte type, Address final_dest, Address original_sender) { public boolean hasSites() {return sites != null && !sites.isEmpty();} public boolean returnEntireCache() {return return_entire_cache;} public RelayHeader returnEntireCache(boolean b) {return_entire_cache=b; return this;} - public ByteArray originalHeaders() {return original_hdrs;} - public RelayHeader originalHeaders(ByteArray ba) {original_hdrs=ba; return this;} + public Header[] originalHeaders() {return original_hdrs;} + public RelayHeader originalHeaders(Header[] hdrs) {original_hdrs=hdrs; return this;} public short originalFlags() {return original_flags;} public RelayHeader originalFlags(short fl) {original_flags=fl; return this;} @@ -128,7 +131,8 @@ public int serializedSize() { assertNonNullSites(); return Global.BYTE_SIZE*2 + Util.size(final_dest) + Util.size(original_sender) + sizeOf(sites) + sizeOf(visited_sites) + - Global.BYTE_SIZE + (original_hdrs != null? originalHeaders().serializedSize() : 0) + Short.BYTES; // orig-flags + Short.BYTES /* num headers */ + Headers.marshalledSize(original_hdrs) + + Short.BYTES; // orig-flags } @Override @@ -148,9 +152,7 @@ public void writeTo(DataOutput out) throws IOException { Bits.writeString(s, out); } assertNonNullSites(); - out.writeBoolean(original_hdrs != null); - if(original_hdrs != null) - original_hdrs.writeTo(out); + Headers.writeHeaders(original_hdrs, out, (short[])null); out.writeShort(original_flags); } @@ -173,10 +175,7 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException { visited_sites.add(Bits.readString(in)); } assertNonNullSites(); - if(in.readBoolean()) { - original_hdrs=new ByteArray(null, 0, 0); - original_hdrs.readFrom(in); - } + original_hdrs=Headers.readHeaders(in); original_flags=in.readShort(); } diff --git a/src/org/jgroups/protocols/relay/Route.java b/src/org/jgroups/protocols/relay/Route.java index b2524ef1db1..7fdd5d3943c 100644 --- a/src/org/jgroups/protocols/relay/Route.java +++ b/src/org/jgroups/protocols/relay/Route.java @@ -1,16 +1,14 @@ package org.jgroups.protocols.relay; -import org.jgroups.Address; -import org.jgroups.BaseMessage; -import org.jgroups.JChannel; -import org.jgroups.Message; +import org.jgroups.*; import org.jgroups.logging.Log; -import org.jgroups.util.ByteArray; +import org.jgroups.util.Headers; import org.jgroups.util.Util; import java.io.IOException; import java.util.Collection; +import java.util.Objects; import static org.jgroups.protocols.relay.RelayHeader.DATA; @@ -34,7 +32,7 @@ public Route(Address site_master, JChannel bridge, RELAY relay, Log log) { this.bridge=bridge; this.relay=relay; this.log=log; - this.relay3=relay instanceof RELAY3; + this.relay3=Objects.requireNonNull(relay) instanceof RELAY3; } public JChannel bridge() {return bridge;} @@ -90,15 +88,19 @@ public String toString() { protected Message createMessage(Address target, Address final_destination, Address original_sender, final Message msg, Collection visited_sites) throws IOException { Message copy=relay.copy(msg).setDest(target).setSrc(null); - ByteArray marshalled_hdrs=((BaseMessage)copy).writeHeaders(); RelayHeader tmp=msg.getHeader(relay.getId()); RelayHeader hdr=tmp != null? tmp.copy().setFinalDestination(final_destination).setOriginalSender(original_sender) : new RelayHeader(DATA, final_destination, original_sender); - hdr.addToVisitedSites(visited_sites) - // to prevent local headers getting mixed up with bridge headers: https://issues.redhat.com/browse/JGRP-2729 - .originalHeaders(marshalled_hdrs) - .originalFlags(copy.getFlags()); // store the original flags, will be restored at the receiver - copy.clearHeaders(); + hdr.addToVisitedSites(visited_sites) + .originalFlags(copy.getFlags()); // store the original flags, will be restored at the receiver + if(relay3) { + // to prevent local headers getting mixed up with bridge headers: https://issues.redhat.com/browse/JGRP-2729 + Header[] original_hdrs=((BaseMessage)copy).headers(); + if(Headers.size(original_hdrs) > 0) { + hdr.originalHeaders(original_hdrs); + copy.clearHeaders(); + } + } copy.putHeader(relay.getId(), hdr); return copy; } diff --git a/src/org/jgroups/stack/DiagnosticsHandler.java b/src/org/jgroups/stack/DiagnosticsHandler.java index 810b24875cb..9090f1afa28 100644 --- a/src/org/jgroups/stack/DiagnosticsHandler.java +++ b/src/org/jgroups/stack/DiagnosticsHandler.java @@ -385,7 +385,7 @@ protected void bindToInterfaces(List interfaces, MulticastSock try { if(Util.isUp(i)) { List inet_addrs=i.getInterfaceAddresses(); - if(inet_addrs != null && !inet_addrs.isEmpty()) { // fix for VM crash - suggested by JJalenak@netopia.com + if(inet_addrs != null && !inet_addrs.isEmpty() && isCompatible(mcast_addr, inet_addrs)) { // fix for VM crash - suggested by JJalenak@netopia.com s.joinGroup(group_addr, i); log.trace("joined %s on %s", group_addr, i.getName()); } @@ -396,7 +396,11 @@ protected void bindToInterfaces(List interfaces, MulticastSock } } } - + + /** Checks if there's any address in the address list that's compatible (same address family) to addr */ + protected static boolean isCompatible(InetAddress addr, List addrs) { + return addrs.stream().map(InterfaceAddress::getAddress).anyMatch(a -> Objects.equals(a.getClass(), addr.getClass())); + } public interface ProbeHandler { /** diff --git a/src/org/jgroups/stack/Protocol.java b/src/org/jgroups/stack/Protocol.java index 712d11f6ba1..79a6725a4d9 100644 --- a/src/org/jgroups/stack/Protocol.java +++ b/src/org/jgroups/stack/Protocol.java @@ -323,7 +323,7 @@ public Object down(Message msg) { * An event was received from the protocol below. Usually the current protocol will want to examine the event type * and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating * the internal membership list when receiving a VIEW_CHANGE event). - * Finally the event is either a) discarded, or b) an event is sent down the stack using {@code down_prot.down()} + * Finally, the event is either a) discarded, or b) an event is sent down the stack using {@code down_prot.down()} * or c) the event (or another event) is sent up the stack using {@code up_prot.up()}. */ public Object up(Event evt) { diff --git a/src/org/jgroups/stack/ProtocolStack.java b/src/org/jgroups/stack/ProtocolStack.java index cac5f157a25..4eb83dfc44d 100644 --- a/src/org/jgroups/stack/ProtocolStack.java +++ b/src/org/jgroups/stack/ProtocolStack.java @@ -944,8 +944,7 @@ public void up(MessageBatch batch) { } public Object down(Event evt) { - if(top_prot != null) - return top_prot.down(evt); + if(top_prot != null) return top_prot.down(evt); return null; } diff --git a/src/org/jgroups/util/Headers.java b/src/org/jgroups/util/Headers.java index 4b9ff34d1d3..60b6254ab88 100644 --- a/src/org/jgroups/util/Headers.java +++ b/src/org/jgroups/util/Headers.java @@ -4,10 +4,10 @@ import org.jgroups.Header; import org.jgroups.conf.ClassConfigurator; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -111,7 +111,7 @@ public static String printHeaders(final Header[] hdrs) { * @param headers The headers array * @param id The protocol ID of the header * @param hdr The header - * @param replace_if_present Whether or not to overwrite an existing header + * @param replace_if_present Whether to overwrite an existing header * @return A new copy of headers if the array needed to be expanded, or null otherwise */ public static Header[] putHeader(final Header[] headers, short id, Header hdr, boolean replace_if_present) { @@ -138,38 +138,37 @@ public static Header[] putHeader(final Header[] headers, short id, Header hdr, b throw new IllegalStateException("unable to add element " + id + ", index=" + i); // we should never come here } - public static ByteArray writeHeaders(Header[] hdrs) throws IOException { - int size=Headers.size(hdrs); - ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(marshalledSize(hdrs) + Global.SHORT_SIZE); + public static void writeHeaders(Header[] hdrs, DataOutput out, short... excluded_headers) throws IOException { + int size=Headers.size(hdrs, excluded_headers); out.writeShort(size); if(size > 0) { - for(Header hdr : hdrs) { + for(Header hdr: hdrs) { if(hdr == null) break; short id=hdr.getProtId(); + if(Util.containsId(id, excluded_headers)) + continue; out.writeShort(id); - short magic_number=hdr.getMagicId(); - out.writeShort(magic_number); - hdr.writeTo(out); + writeHeader(hdr, out); } } - return out.getBuffer(); } - public static List
readHeaders(ByteArray buf) throws IOException, ClassNotFoundException { - ByteArrayDataInputStream in=new ByteArrayDataInputStream(buf); + + public static Header[] readHeaders(DataInput in) throws IOException, ClassNotFoundException { int len=in.readShort(); - List
list=new ArrayList<>(len); + if(len == 0) + return new Header[Util.DEFAULT_HEADERS]; + Header[] headers=new Header[len]; for(int i=0; i < len; i++) { short id=in.readShort(); - short magic_number=in.readShort(); - Header hdr=ClassConfigurator.create(magic_number); - hdr.readFrom(in); - list.add(hdr.setProtId(id)); + Header hdr=readHeader(in).setProtId(id); + headers[i]=hdr; } - return list; + return headers; } + /** * Increases the capacity of the array and copies the contents of the old into the new array */ @@ -226,5 +225,17 @@ public static int size(Header[] hdrs, short... excluded_ids) { return retval; } + private static void writeHeader(Header hdr, DataOutput out) throws IOException { + short magic_number=hdr.getMagicId(); + out.writeShort(magic_number); + hdr.writeTo(out); + } + + private static Header readHeader(DataInput in) throws IOException, ClassNotFoundException { + short magic_number=in.readShort(); + Header hdr=ClassConfigurator.create(magic_number); + hdr.readFrom(in); + return hdr; + } } diff --git a/src/org/jgroups/util/Util.java b/src/org/jgroups/util/Util.java index e120d9b3b04..7b9c9c1808b 100644 --- a/src/org/jgroups/util/Util.java +++ b/src/org/jgroups/util/Util.java @@ -2864,7 +2864,7 @@ public static List newElements(List old_list,List new_list) { public static boolean contains(T key,T[] list) { if(list == null) return false; - for(T tmp : list) + for(T tmp: list) if(tmp == key || tmp.equals(key)) return true; return false; diff --git a/tests/junit-functional/org/jgroups/tests/RelayTest.java b/tests/junit-functional/org/jgroups/tests/RelayTest.java index a996a3bd2e0..8af1aa1942a 100644 --- a/tests/junit-functional/org/jgroups/tests/RelayTest.java +++ b/tests/junit-functional/org/jgroups/tests/RelayTest.java @@ -596,7 +596,7 @@ public void localMulticastForwardedToAllSites(Class cl) throws b.send(null, "b-req"); // non site-master (A is SM) d.send(null, "d-req"); // non site-master (C is SM) - // all members in all sites should received the 2 multicasts: + // all members in all sites should receive the 2 multicasts: Util.waitUntil(5000, 200, () -> allChannels().stream().peek(RelayTests::printMessages) .map(ch -> getReceiver(ch).list()) .allMatch(l -> l.size() >= 2)); @@ -610,7 +610,7 @@ public void localMulticastForwardedToAllSites(Class cl) throws /** Tests sending of unicasts to different local members, varying between site masters and non site masters */ public void testLocalUnicasts(Class cl) throws Exception { - createSymmetricNetwork(cl, ch -> new UnicastResponseSender<>(ch).rawMsgs(true)); + createSymmetricNetwork(cl, ch -> new UnicastResponseSender<>(ch).rawMsgs(true), LON); try(JChannel _c=createNode(cl, LON, "C", BRIDGE_CLUSTER, LON, NYC, SFO)){ Util.waitUntilAllChannelsHaveSameView(5000, 100, a,b,_c); _c.setReceiver(new UnicastResponseSender<>(_c).rawMsgs(true)); @@ -658,7 +658,7 @@ public void testLocalUnicasts(Class cl) throws Exception { /** Sends unicasts between members of different sites */ public void testUnicasts(Class cl) throws Exception { - createSymmetricNetwork(cl, ch -> new UnicastResponseSender<>(ch).rawMsgs(true)); + createSymmetricNetwork(cl, ch -> new UnicastResponseSender<>(ch).rawMsgs(true), LON, NYC); // because an address is not a SiteUUID in RELAY2 (just a regular address), A would not know C // we therefore wrap C's address into a SiteUUID (for RELAY2 only) @@ -690,16 +690,17 @@ public void testUnicasts(Class cl) throws Exception { /** Tests sending a large message across sites (from A:lon -> C:nyc); the fragmentation protocol should * fragment/unfragment it */ public void testFragmentation(Class cl) throws Exception { - createSymmetricNetwork(cl, ch -> new UnicastResponseSender<>(ch).rawMsgs(true)); + createSymmetricNetwork(cl, ch -> new UnicastResponseSender<>(ch).rawMsgs(true), LON, NYC); String s="hello".repeat(1000); - a.send(b.getAddress(), new Data(REQ, s)); - List list_a=getReceiver(a).list(), list_b=getReceiver(b).list(); - Util.waitUntil(2000, 100, () -> list_a.size() == 1 && list_b.size() == 1); + Address target=addr(cl, c, NYC); + a.send(target, new Data(REQ, s)); + List list_a=getReceiver(a).list(), list_c=getReceiver(c).list(); + Util.waitUntil(2000, 100, () -> list_a.size() == 1 && list_c.size() == 1); } /** Tests state transfer between sites: from C:nyc to A:lon */ public void testStateTransfer(Class cl) throws Exception { - createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true)); + createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true), LON, NYC); MyReceiver r_a=getReceiver(a); MyReceiver r_c=getReceiver(c); @@ -709,7 +710,7 @@ public void testStateTransfer(Class cl) throws Exception { // because an address is not a SiteUUID in RELAY2 (just a regular address), A would not know C // we therefore wrap C's address into a SiteUUID (for RELAY2 only) - Address target=addr(cl, c, "nyc"); + Address target=addr(cl, c, NYC); a.getState(target, 2000); assert r_a.state().size() == 2; } @@ -744,12 +745,12 @@ public void testMulticastWithMultipleSiteMasters(Class cl) thro ch.connect(SFO); Util.waitUntilAllChannelsHaveSameView(5000, 100, _g, _h, _i); - waitUntilRoute(NYC, true, 5000, 500, a); - waitUntilRoute(SFO, true, 5000, 500, a); - waitUntilRoute(LON, true, 5000, 500, d); - waitUntilRoute(SFO, true, 5000, 500, d); - waitUntilRoute(LON, true, 5000, 500, _g); - waitUntilRoute(NYC, true, 5000, 500, _g); + waitUntilRoute(NYC, true, 5000, 500, a,b); + waitUntilRoute(SFO, true, 5000, 500, a,b); + waitUntilRoute(LON, true, 5000, 500, d,e); + waitUntilRoute(SFO, true, 5000, 500, d,e); + waitUntilRoute(LON, true, 5000, 500, _g,_h); + waitUntilRoute(NYC, true, 5000, 500, _g,_h); assert Stream.of(a,b,d,e,_g,_h).map(ch -> ch.getProtocolStack().findProtocol(RELAY.class)) .allMatch(r -> ((RELAY)r).isSiteMaster()); @@ -757,7 +758,7 @@ public void testMulticastWithMultipleSiteMasters(Class cl) thro .noneMatch(r -> ((RELAY)r).isSiteMaster()); - generator.get().forEach(ch -> ch.setReceiver(new MyReceiver().rawMsgs(true))); + generator.get().forEach(ch -> ch.setReceiver(new MyReceiver().rawMsgs(true).name(ch.getName()))); // A and B (site masters) multicast 1 message each: every receiver should have exactly 2 messages a.send(null, "from A"); @@ -770,37 +771,38 @@ public void testMulticastWithMultipleSiteMasters(Class cl) thro System.out.printf("received messages:\n%s\n", printMessages(generator.get())); generator.get().forEach(ch -> getReceiver(ch).reset()); - // destination of SiteMaster(null) is only available in RELAY3: - if(cl.equals(RELAY3.class)) { - // send to all site masters, but only *one* site master from each site is picked - - a.send(new SiteMaster(null), "from A"); - b.send(new SiteMaster(null), "from B"); - - // A sends to itself, plus site masters from NYC (D or E) and SFO (G or H) - // B sends to itself, plus site masters from NYC (D or E) and SFO (G or H) - // -> the default SiteMasterPicker impl in RELAY pick a random site master / route; if we disabled - // this and always picked the first site master / route in the list, only D and G would - // receive messages (2 each); E and H would receive 0 messages - Util.waitUntil(3000, 100, - () -> Stream.of(a,b).map(RelayTests::getReceiver).allMatch(r -> r.size() == 1)); - - // all other site masters (D or E, G or H) get A's and B's message: - Util.waitUntil(3000, 100, - () -> Stream.of(d,e,_g,_h) - // a site master receives 0, 1 or 2 messages: - .map(RelayTests::getReceiver).allMatch(r -> r.size() >= 0 && r.size() <= 2), - () -> printMessages(generator.get())); - System.out.printf("-- received messages:\n%s\n", printMessages(generator.get())); - generator.get().forEach(ch -> getReceiver(ch).reset()); - - c.send(new SiteMaster(null), "from C"); - // same as above: A or B receives 1 message, D or E and G or H - Util.waitUntil(3000, 100, () -> Stream.of(a,b,d,e,_g,_h).map(RelayTests::getReceiver) - .allMatch(r -> r.size() >= 0 && r.size() <= 2), () -> printMessages(generator.get())); - System.out.printf("-- received messages:\n%s\n", printMessages(generator.get())); - generator.get().forEach(ch -> getReceiver(ch).reset()); - } + // cl must be RELAY3: destination of SiteMaster(null) is only available in RELAY3: + // send to all site masters, but only *one* site master from each site is picked + + a.send(new SiteMaster(null), "from A"); + b.send(new SiteMaster(null), "from B"); + + // A sends to itself, plus site masters from NYC (D or E) and SFO (G or H) + // B sends to itself, plus site masters from NYC (D or E) and SFO (G or H) + // -> the default SiteMasterPicker impl in RELAY pick a random site master / route; if we disabled + // this and always picked the first site master / route in the list, only D and G would + // receive messages (2 each); E and H would receive 0 messages + Util.waitUntil(3000, 100, + () -> Stream.of(a,b).map(RelayTests::getReceiver).allMatch(r -> r.size() == 1)); + // D and E must receive a total of 2 messages (from A, from B): + Util.waitUntil(3000, 100, () -> RelayTests.receivedMessages(d,e) == 2, () -> msgs(d, e)); + // G and H must receive a total of 2 messages (from A, from B): + Util.waitUntil(3000, 100, () -> RelayTests.receivedMessages(_g, _h) == 2, () -> msgs(_g,_h)); + + System.out.printf("-- received messages:\n%s\n", printMessages(generator.get())); + generator.get().forEach(ch -> getReceiver(ch).reset()); + + c.send(new SiteMaster(null), "from C"); + // same as above: {A or B} receives 1 message, {D or E} 1 and {G or H} 1 as well + Util.waitUntil(3000, 100, () -> RelayTests.receivedMessages(a,b) == 1, () -> msgs(a,b)); + + // D and E must receive a total of 2 messages (from A, from B): + Util.waitUntil(3000, 100, () -> RelayTests.receivedMessages(d,e) == 1, () -> msgs(d, e)); + // G and H must receive a total of 2 messages (from A, from B): + Util.waitUntil(3000, 100, () -> RelayTests.receivedMessages(_g, _h) == 1, () -> msgs(_g,_h)); + + System.out.printf("-- received messages:\n%s\n", printMessages(generator.get())); + generator.get().forEach(ch -> getReceiver(ch).reset()); // C sends a multicast; A *or* B (but not both) should forward it to the other sites NYC and SFO c.send(null, "from C"); @@ -819,8 +821,7 @@ public void testMulticastWithMultipleSiteMasters(Class cl) thro public void testFailover(Class cl) throws Exception { if(cl.equals(RELAY2.class)) return; - createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.name())); - Util.close(f,e); + createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.name()), LON, NYC); waitForBridgeView(2, 5000, 100, BRIDGE_CLUSTER, a,c); waitForSiteMasters(true, a, c); @@ -852,8 +853,7 @@ public void testFailover(Class cl) throws Exception { public void testFailover2(Class cl) throws Exception { if(cl.equals(RELAY2.class)) return; - createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.name())); - Util.close(f,e); + createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.name()), LON, NYC); waitForBridgeView(2, 5000, 100, BRIDGE_CLUSTER, a,c); waitForSiteMasters(true, a, c); @@ -887,8 +887,7 @@ public void testFailover2(Class cl) throws Exception { public void testFailover3(Class cl) throws Exception { if(cl.equals(RELAY2.class)) return; - createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.getName())); - Util.close(f,e); + createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.getName()), LON, NYC); waitForBridgeView(2, 5000, 100, BRIDGE_CLUSTER, a,c); waitForSiteMasters(true, a, c); @@ -916,8 +915,7 @@ public void testFailover3(Class cl) throws Exception { public void testFailover4(Class cl) throws Exception { if(cl.equals(RELAY2.class)) return; - createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.name())); - Util.close(f, e); + createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.name()), LON, NYC); waitForBridgeView(2, 5000, 100, BRIDGE_CLUSTER, a,c); waitForSiteMasters(true, a, c); @@ -948,9 +946,8 @@ public void testFailoverSiteDown2(Class cl) throws Exception { protected void _testFailoverSiteDown(Class cl, Supplier
s) throws Exception { if(cl.equals(RELAY2.class)) return; - createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true)); + createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true), LON, NYC); Address target=s.get(); - Util.close(f, e); // close NYC, we don't need it for this test waitForBridgeView(2, 5000, 100, BRIDGE_CLUSTER, a,c); waitForSiteMasters(true, a, c); @@ -979,10 +976,9 @@ protected void _testFailoverSiteDown(Class cl, Supplier
cl) throws Exception { - createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true)); + createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true), LON, NYC); RELAY r=a.stack().findProtocol(RELAY.class); boolean relay2=cl.equals(RELAY2.class); - Util.close(f, e); waitForBridgeView(2, 5000, 100, BRIDGE_CLUSTER, a,c); waitForSiteMasters(true, a, c); @@ -997,10 +993,9 @@ public void testSiteDown(Class cl) throws Exception { public void testViewChange(Class cl) throws Exception { if(cl.equals(RELAY2.class)) // since UNICAST3 is bypassed when RELAY2 is above it, we don't need to test RELAY2 return; - createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.name())); + createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.name()), LON, NYC); UNICAST3 unicast=a.getProtocolStack().findProtocol(UNICAST3.class); - Util.close(f, e); waitForBridgeView(2, 5000, 100, BRIDGE_CLUSTER, a,c); waitForSiteMasters(true, a, c); a.send(b.address(), "hello"); @@ -1025,9 +1020,8 @@ public void testViewChange(Class cl) throws Exception { public void testViewChangeDuringRetransmission(Class cl) throws Exception { if(cl.equals(RELAY2.class)) // since UNICAST3 is bypassed when RELAY2 is above it, we don't need to test RELAY2 return; - createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.name())); + createSymmetricNetwork(cl, ch -> new MyReceiver<>().rawMsgs(true).verbose(true).name(ch.name()), LON, NYC); UNICAST3 unicast=a.getProtocolStack().findProtocol(UNICAST3.class); - Util.close(f, e); waitForBridgeView(2, 5000, 100, BRIDGE_CLUSTER, a,c); waitForSiteMasters(true, a, c); @@ -1084,27 +1078,37 @@ protected static JChannel createNode(Class cl, String site_name } + protected void createSymmetricNetwork(Class cl, Function r, String...sites) throws Exception { + if(sites.length == 0 || Util.contains(LON, sites)) + createLON(cl); + if(sites.length == 0 || Util.contains(NYC, sites)) + createNYC(cl); + if(sites.length == 0 || Util.contains(SFO, sites)) + createSFO(cl); + if(r != null) + allChannels().forEach(ch -> ch.setReceiver(r.apply(ch))); + } - protected void createSymmetricNetwork(Class cl, Function r) throws Exception { + protected void createLON(Class cl) throws Exception { a=createNode(cl, LON, "A", BRIDGE_CLUSTER, LON, NYC, SFO); b=createNode(cl, LON, "B", BRIDGE_CLUSTER, LON, NYC, SFO); + Util.waitUntilAllChannelsHaveSameView(5000, 200, a,b); + } + protected void createNYC(Class cl) throws Exception { c=createNode(cl, NYC, "C", BRIDGE_CLUSTER, LON, NYC, SFO); d=createNode(cl, NYC, "D", BRIDGE_CLUSTER, LON, NYC, SFO); + Util.waitUntilAllChannelsHaveSameView(5000, 200, c,d); + } + protected void createSFO(Class cl) throws Exception { e=createNode(cl, SFO, "E", BRIDGE_CLUSTER, LON, NYC, SFO); f=createNode(cl, SFO, "F", BRIDGE_CLUSTER, LON, NYC, SFO); - - if(r != null) - allChannels().forEach(ch -> ch.setReceiver(r.apply(ch))); - - Util.waitUntilAllChannelsHaveSameView(5000, 200, a,b); - Util.waitUntilAllChannelsHaveSameView(5000, 200, c,d); Util.waitUntilAllChannelsHaveSameView(5000, 200, e,f); } protected List allChannels() { - return Arrays.asList(a,b,c,d,e,f); + return Stream.of(a,b,c,d,e,f).filter(Objects::nonNull).collect(Collectors.toList()); } diff --git a/tests/junit-functional/org/jgroups/tests/RelayTests.java b/tests/junit-functional/org/jgroups/tests/RelayTests.java index a7e22c5ac3d..82652106821 100644 --- a/tests/junit-functional/org/jgroups/tests/RelayTests.java +++ b/tests/junit-functional/org/jgroups/tests/RelayTests.java @@ -41,6 +41,10 @@ public class RelayTests { } protected static Protocol[] defaultStack(RELAY relay) { + return defaultStack(relay, new STATE_TRANSFER()); + } + + protected static Protocol[] defaultStack(RELAY relay, Protocol state_transfer) { RELAY2 r2=relay instanceof RELAY2? (RELAY2)relay : null; RELAY3 r3=relay instanceof RELAY3? (RELAY3)relay : null; if(r3 != null) @@ -59,7 +63,7 @@ protected static Protocol[] defaultStack(RELAY relay) { new MFC().setMaxCredits(2_000_000).setMinThreshold(0.4), new FRAG2().setFragSize(1024), r2, - new STATE_TRANSFER() + state_transfer }; return Util.combine(Protocol.class, protocols); } @@ -111,7 +115,7 @@ protected static RELAY createSymmetricRELAY(Class cl, String lo .delaySitesDown(false); // for compatibility with testSitesUp() for(String site: sites) { SiteConfig cfg=new SiteConfig(site) - .addBridge(new RelayConfig.ProgrammaticBridgeConfig(bridge, defaultStack(null))); + .addBridge(new RelayConfig.ProgrammaticBridgeConfig(bridge, defaultStack(null, null))); relay.addSite(site, cfg); } return relay; @@ -145,15 +149,17 @@ protected static void retransmissionsDone(UNICAST3 unicast, Address dest) throws } protected static void waitUntilRoute(String site_name, boolean present, - long timeout, long interval, JChannel ch) throws Exception { - RELAY relay=ch.getProtocolStack().findProtocol(RELAY.class); - if(relay == null) - throw new IllegalArgumentException("protocol RELAY not found"); + long timeout, long interval, JChannel ... channels) throws Exception { + for(JChannel ch: channels) { + RELAY relay=ch.getProtocolStack().findProtocol(RELAY.class); + if(relay == null) + throw new IllegalArgumentException("protocol RELAY not found"); - Util.waitUntil(timeout, interval, () -> { - Route route=relay.getRoute(site_name); - return ((route != null && present) || (route == null && !present)); - }); + Util.waitUntil(timeout, interval, () -> { + Route route=relay.getRoute(site_name); + return ((route != null && present) || (route == null && !present)); + }); + } } protected static Route getRoute(JChannel ch, String site_name) { @@ -210,6 +216,13 @@ protected static int receivedMessages(JChannel ch) { return getReceiver(ch).list().size(); } + protected static int receivedMessages(JChannel ... channels) { + int sum=0; + for(JChannel ch: channels) + sum+=receivedMessages(ch); + return sum; + } + protected static void assertNumMessages(int expected, JChannel ... channels) throws TimeoutException { assertNumMessages(expected, Arrays.asList(channels)); } @@ -303,7 +316,7 @@ public void receive(Message msg) { Object obj=msg.getObject(); Data data=(Data)obj; if(data.type == Data.Type.REQ) { - Message rsp=new ObjectMessage(msg.src(), new Data(Data.Type.RSP,String.valueOf(ch.getAddress()))); + Message rsp=new ObjectMessage(msg.src(), new Data(Data.Type.RSP, java.lang.String.valueOf(ch.getAddress()))); if(msg.isFlagSet(Message.Flag.NO_RELAY)) rsp.setFlag(Message.Flag.NO_RELAY); try { @@ -346,7 +359,7 @@ protected MySiteConfig addForward(String to, String gateway) { } protected static class Data implements SizeStreamable { - enum Type {REQ,RSP} + protected enum Type {REQ,RSP} protected Type type; protected String payload; @@ -374,7 +387,7 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException { } public String toString() { - return String.format("%s: %s", type, payload); + return java.lang.String.format("%s: %s", type, payload); } } } diff --git a/tests/junit-functional/org/jgroups/tests/SizeTest.java b/tests/junit-functional/org/jgroups/tests/SizeTest.java index 9977a94b71a..52c827a1684 100644 --- a/tests/junit-functional/org/jgroups/tests/SizeTest.java +++ b/tests/junit-functional/org/jgroups/tests/SizeTest.java @@ -194,21 +194,6 @@ public void testFdSockHeaders() throws Exception { _testSize(hdr) ; } - public void testHeaderMarshalling() throws IOException, ClassNotFoundException { - Header[] headers={ - NakAckHeader2.createMessageHeader(322649), - UnicastHeader3.createDataHeader(1024, (short)22, true), - new Frag3Header(22, 2, 3), - null, null - }; - ByteArray buf=Headers.writeHeaders(headers); - List
hdrs=Headers.readHeaders(buf); - assert hdrs.size() == 3; - assert hdrs.get(0).getClass().equals(NakAckHeader2.class); - assert ((UnicastHeader3)hdrs.get(1)).first(); - assert ((Frag3Header)hdrs.get(2)).getFragId() == 2; - } - public void testUnicast3Header() throws Exception { UnicastHeader3 hdr=UnicastHeader3.createDataHeader(322649, (short)127, false); _testSize(hdr); @@ -619,7 +604,8 @@ public void testRelayHeader() throws Exception { .addToVisitedSites(List.of("nyc", "sfc", "lon")); _testSize(hdr); - hdr.originalHeaders(new ByteArray(new byte[]{'1','2', '3', '4'}, 0, 4)); + Header[] hdrs=headers(); + hdr.originalHeaders(hdrs); _testSize(hdr); RelayHeader hdr2=hdr.copy(); @@ -674,9 +660,6 @@ public static void testIpAddress3() throws Exception { } - - - public static void testWriteAddress() throws Exception { Address uuid=UUID.randomUUID(); _testWriteAddress(uuid); @@ -691,24 +674,6 @@ public static void testWriteAddress() throws Exception { _testWriteAddress(addr); } - private static void _testWriteAddress(Address addr) throws Exception { - int len=Util.size(addr); - ByteArrayOutputStream output=new ByteArrayOutputStream(); - DataOutputStream out=new DataOutputStream(output); - Util.writeAddress(addr, out); - out.flush(); - byte[] buf=output.toByteArray(); - out.close(); - - System.out.println("\nlen=" + len + ", serialized length=" + buf.length); - assert len == buf.length; - DataInputStream in=new DataInputStream(new ByteArrayInputStream(buf)); - Address new_addr=Util.readAddress(in); - System.out.println("old addr=" + addr + "\nnew addr=" + new_addr); - assert addr.equals(new_addr); - } - - public static void testWriteAddresses() throws Exception { List
list=new ArrayList<>(); @@ -723,24 +688,6 @@ public static void testWriteAddresses() throws Exception { _testWriteAddresses(list); } - private static void _testWriteAddresses(List
list) throws Exception { - long len=Util.size(list); - ByteArrayOutputStream output=new ByteArrayOutputStream(); - DataOutputStream out=new DataOutputStream(output); - Util.writeAddresses(list, out); - out.flush(); - byte[] buf=output.toByteArray(); - out.close(); - - System.out.println("\nlen=" + len + ", serialized length=" + buf.length); - assert len == buf.length; - DataInputStream in=new DataInputStream(new ByteArrayInputStream(buf)); - Collection
new_list=Util.readAddresses(in, ArrayList::new); - System.out.println("old list=" + list + "\nnew list=" + new_list); - assert list.equals(new_list); - } - - public void testUUID() throws Exception { org.jgroups.util.UUID uuid=org.jgroups.util.UUID.randomUUID(); @@ -836,6 +783,39 @@ protected static void _test(DH_KEY_EXCHANGE.DhHeader hdr) throws Exception { assert Arrays.equals(hdr.dhKey(), hdr2.dhKey()); } + private static void _testWriteAddresses(List
list) throws Exception { + long len=Util.size(list); + ByteArrayOutputStream output=new ByteArrayOutputStream(); + DataOutputStream out=new DataOutputStream(output); + Util.writeAddresses(list, out); + out.flush(); + byte[] buf=output.toByteArray(); + out.close(); + + System.out.println("\nlen=" + len + ", serialized length=" + buf.length); + assert len == buf.length; + DataInputStream in=new DataInputStream(new ByteArrayInputStream(buf)); + Collection
new_list=Util.readAddresses(in, ArrayList::new); + System.out.println("old list=" + list + "\nnew list=" + new_list); + assert list.equals(new_list); + } + + private static void _testWriteAddress(Address addr) throws Exception { + int len=Util.size(addr); + ByteArrayOutputStream output=new ByteArrayOutputStream(); + DataOutputStream out=new DataOutputStream(output); + Util.writeAddress(addr, out); + out.flush(); + byte[] buf=output.toByteArray(); + out.close(); + + System.out.println("\nlen=" + len + ", serialized length=" + buf.length); + assert len == buf.length; + DataInputStream in=new DataInputStream(new ByteArrayInputStream(buf)); + Address new_addr=Util.readAddress(in); + System.out.println("old addr=" + addr + "\nnew addr=" + new_addr); + assert addr.equals(new_addr); + } private static void _testMarshalling(UnicastHeader3 hdr) throws Exception { byte[] buf=Util.streamableToByteBuffer(hdr); @@ -930,7 +910,6 @@ private static void _testSize(JoinRsp rsp) throws Exception { assert Util.match(rsp.getFailReason(), rsp2.getFailReason()); } - private static void _testSize(SizeStreamable data) throws Exception { System.out.println("\ndata: " + data); long size=data.serializedSize(); @@ -939,5 +918,12 @@ private static void _testSize(SizeStreamable data) throws Exception { assert serialized_form.length == size : "serialized length=" + serialized_form.length + ", size=" + size; } + private static Header[] headers() { + return new Header[] { + UnicastHeader3.createDataHeader(322649L, (short)1, false), + new RequestCorrelator.Header((byte)1, 33L, (short)2), + new FORK.ForkHeader("foo", "bar") + }; + } }