Skip to content

Commit

Permalink
- Store headers in RelayHeader before routing and restore on receptio…
Browse files Browse the repository at this point in the history
…n (RELAY3: https://issues.redhat.com/browse/JGRP-2744)

- DiagnosticsHandler checks for address compatibility when joining mcast groups on network interfaces
  • Loading branch information
belaban committed Dec 28, 2023
1 parent df1ee90 commit 254cfef
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 253 deletions.
55 changes: 5 additions & 50 deletions src/org/jgroups/BaseMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@
package org.jgroups;


import org.jgroups.conf.ClassConfigurator;
import org.jgroups.util.ByteArray;
import org.jgroups.util.Headers;
import org.jgroups.util.Util;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -51,15 +48,9 @@ public BaseMessage(Address dest) {
public Message setSrc(Address new_src) {sender=new_src; return this;}
public int getNumHeaders() {return Headers.size(this.headers);}
public Map<Short,Header> getHeaders() {return Headers.getHeaders(this.headers);}
public Header[] headers() {return headers;} // don't modify!
public Message headers(Header[] hdrs) {this.headers=hdrs; return this;} // use with caution!
public String printHeaders() {return Headers.printHeaders(this.headers);}
public ByteArray writeHeaders() throws IOException {return Headers.writeHeaders(this.headers);}
public Message readHeaders(ByteArray ba) throws IOException, ClassNotFoundException {
List<Header> hdrs=Headers.readHeaders(ba);
for(Header h: hdrs)
putHeader(h.getProtId(), h);
return this;
}


/**
* Sets a number of flags in a message
Expand Down Expand Up @@ -268,7 +259,7 @@ public void writeTo(DataOutput out) throws IOException {
Util.writeAddress(sender, out);

// write the headers
writeHeaders(this.headers, out, (short[])null);
Headers.writeHeaders(this.headers, out, (short[])null);

// finally write the payload
writePayload(out);
Expand All @@ -293,7 +284,7 @@ public void writeToNoAddrs(Address src, DataOutput out, short... excluded_header
Util.writeAddress(sender, out);

// write the headers
writeHeaders(this.headers, out, excluded_headers);
Headers.writeHeaders(this.headers, out, excluded_headers);

// finally write the payload
writePayload(out);
Expand All @@ -316,14 +307,7 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
sender=Util.readAddress(in);

// 5. headers
int len=in.readShort();
if(this.headers == null || len > this.headers.length)
this.headers=createHeaders(len);
for(int i=0; i < len; i++) {
short id=in.readShort();
Header hdr=readHeader(in).setProtId(id);
this.headers[i]=hdr;
}
this.headers=Headers.readHeaders(in);
readPayload(in);
}

Expand All @@ -333,35 +317,6 @@ protected Message copyPayload(Message copy) {
return copy;
}

protected static void writeHeaders(Header[] hdrs, DataOutput out, short ... excluded_headers) throws IOException {
int size=Headers.size(hdrs, excluded_headers);
out.writeShort(size);
if(size > 0) {
for(Header hdr : hdrs) {
if(hdr == null)
break;
short id=hdr.getProtId();
if(Util.containsId(id, excluded_headers))
continue;
out.writeShort(id);
writeHeader(hdr, out);
}
}
}

protected static void writeHeader(Header hdr, DataOutput out) throws IOException {
short magic_number=hdr.getMagicId();
out.writeShort(magic_number);
hdr.writeTo(out);
}

protected static Header readHeader(DataInput in) throws IOException, ClassNotFoundException {
short magic_number=in.readShort();
Header hdr=ClassConfigurator.create(magic_number);
hdr.readFrom(in);
return hdr;
}

protected static Header[] createHeaders(int size) {
return size > 0? new Header[size] : new Header[Util.DEFAULT_HEADERS];
}
Expand Down
9 changes: 3 additions & 6 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,10 @@ protected void handleRelayMessage(Message msg) {
log.warn("%s: received a message without a relay header; discarding it", local_addr);
return;
}
try {
msg.clearHeaders(); // remove all headers added by the bridge cluster
Header[] original_hdrs=hdr.originalHeaders();
if(original_hdrs != null && Headers.size(original_hdrs) > 0) { // only true with RELAY3
((BaseMessage)msg).headers(original_hdrs); // overwrites headers added by the bridge cluster
msg.putHeader(id, hdr);
((BaseMessage)msg).readHeaders(hdr.originalHeaders());
}
catch(Exception ex) {
log.error("%s: failed handling message relayed from %s: %s", local_addr, msg.src(), ex);
}

if(hdr.final_dest != null) {
Expand Down
19 changes: 11 additions & 8 deletions src/org/jgroups/protocols/relay/RELAY3.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,14 @@ protected void handleRelayMessage(Message msg) {
return;
}
try {
Header[] original_hdrs=hdr.originalHeaders();
Message copy=copy(msg).dest(hdr.final_dest).src(hdr.original_sender);
copy.clearHeaders(); // remove all headers added by the bridge cluster
copy.clearHeaders();
if(original_hdrs != null && Headers.size(original_hdrs) > 0)
((BaseMessage)copy).headers(original_hdrs); // removes/overwrites all headers added by the bridge cluster
copy.putHeader(id, hdr);
((BaseMessage)copy).readHeaders(hdr.originalHeaders());
copy.setFlag(hdr.originalFlags(), false);
if(msg.dest() != null)
copy.setFlag(hdr.originalFlags(), false);
// todo: check if copy is needed!
process(true, copy);
}
Expand Down Expand Up @@ -379,7 +382,7 @@ protected Object routeThen(Message msg, List<String> sites, Supplier<Object> act
return action != null? action.get() : null;
}

/** This method has all of the routing logic, for both site masters and regular members */
/** This method has all the routing logic, for both site masters and regular members */
protected Object process(boolean down, Message msg) {
Address dest=msg.dest();
SiteAddress dst=null;
Expand All @@ -389,7 +392,7 @@ protected Object process(boolean down, Message msg) {
case ALL:
if(down)
return routeThen(msg, null,() -> deliver(null, msg, true));
return dontRoute(msg)? passUp(msg) : routeThen(msg, null, () -> passUp(msg));
return mustBeRouted(msg)? routeThen(msg, null, () -> passUp(msg)) : passUp(msg);
case SM_ALL:
return routeThen(msg, null, () -> passUp(msg));
case SM:
Expand Down Expand Up @@ -437,12 +440,12 @@ protected Object process(boolean down, Message msg) {
* multiple site masters, and this site master is picked to route the message, then return true, else return false.
* JIRA: https://issues.redhat.com/browse/JGRP-2696
*/
protected boolean dontRoute(Message msg) {
protected boolean mustBeRouted(Message msg) {
if(msg.isFlagSet(Flag.NO_RELAY))
return true; // don't route
return false; // don't route
final List<Address> sms=site_masters;
if(sms == null || sms.size() < 2)
return false; // do route
return true; // do route
Address first_sm=sms.get(0);
return local_addr.equals(first_sm);
}
Expand Down
23 changes: 11 additions & 12 deletions src/org/jgroups/protocols/relay/RelayHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.jgroups.Global;
import org.jgroups.Header;
import org.jgroups.util.Bits;
import org.jgroups.util.ByteArray;
import org.jgroups.util.Headers;
import org.jgroups.util.Util;

import java.io.DataInput;
Expand Down Expand Up @@ -35,7 +35,10 @@ public class RelayHeader extends Header {
protected Set<String> visited_sites;
// used with TOPO_REQ: when set, return the entire cache, otherwise only information about the local members
protected boolean return_entire_cache;
protected ByteArray original_hdrs; // marshalled headers (https://issues.redhat.com/browse/JGRP-2729)

// marshalled headers (https://issues.redhat.com/browse/JGRP-2729),
// changed in https://issues.redhat.com/browse/JGRP-2744
protected Header[] original_hdrs;
protected short original_flags;


Expand Down Expand Up @@ -63,8 +66,8 @@ public RelayHeader(byte type, Address final_dest, Address original_sender) {
public boolean hasSites() {return sites != null && !sites.isEmpty();}
public boolean returnEntireCache() {return return_entire_cache;}
public RelayHeader returnEntireCache(boolean b) {return_entire_cache=b; return this;}
public ByteArray originalHeaders() {return original_hdrs;}
public RelayHeader originalHeaders(ByteArray ba) {original_hdrs=ba; return this;}
public Header[] originalHeaders() {return original_hdrs;}
public RelayHeader originalHeaders(Header[] hdrs) {original_hdrs=hdrs; return this;}
public short originalFlags() {return original_flags;}
public RelayHeader originalFlags(short fl) {original_flags=fl; return this;}

Expand Down Expand Up @@ -128,7 +131,8 @@ public int serializedSize() {
assertNonNullSites();
return Global.BYTE_SIZE*2 + Util.size(final_dest) + Util.size(original_sender) +
sizeOf(sites) + sizeOf(visited_sites) +
Global.BYTE_SIZE + (original_hdrs != null? originalHeaders().serializedSize() : 0) + Short.BYTES; // orig-flags
Short.BYTES /* num headers */ + Headers.marshalledSize(original_hdrs)
+ Short.BYTES; // orig-flags
}

@Override
Expand All @@ -148,9 +152,7 @@ public void writeTo(DataOutput out) throws IOException {
Bits.writeString(s, out);
}
assertNonNullSites();
out.writeBoolean(original_hdrs != null);
if(original_hdrs != null)
original_hdrs.writeTo(out);
Headers.writeHeaders(original_hdrs, out, (short[])null);
out.writeShort(original_flags);
}

Expand All @@ -173,10 +175,7 @@ public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
visited_sites.add(Bits.readString(in));
}
assertNonNullSites();
if(in.readBoolean()) {
original_hdrs=new ByteArray(null, 0, 0);
original_hdrs.readFrom(in);
}
original_hdrs=Headers.readHeaders(in);
original_flags=in.readShort();
}

Expand Down
26 changes: 14 additions & 12 deletions src/org/jgroups/protocols/relay/Route.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package org.jgroups.protocols.relay;


import org.jgroups.Address;
import org.jgroups.BaseMessage;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.*;
import org.jgroups.logging.Log;
import org.jgroups.util.ByteArray;
import org.jgroups.util.Headers;
import org.jgroups.util.Util;

import java.io.IOException;
import java.util.Collection;
import java.util.Objects;

import static org.jgroups.protocols.relay.RelayHeader.DATA;

Expand All @@ -34,7 +32,7 @@ public Route(Address site_master, JChannel bridge, RELAY relay, Log log) {
this.bridge=bridge;
this.relay=relay;
this.log=log;
this.relay3=relay instanceof RELAY3;
this.relay3=Objects.requireNonNull(relay) instanceof RELAY3;
}

public JChannel bridge() {return bridge;}
Expand Down Expand Up @@ -90,15 +88,19 @@ public String toString() {
protected Message createMessage(Address target, Address final_destination, Address original_sender,
final Message msg, Collection<String> visited_sites) throws IOException {
Message copy=relay.copy(msg).setDest(target).setSrc(null);
ByteArray marshalled_hdrs=((BaseMessage)copy).writeHeaders();
RelayHeader tmp=msg.getHeader(relay.getId());
RelayHeader hdr=tmp != null? tmp.copy().setFinalDestination(final_destination).setOriginalSender(original_sender)
: new RelayHeader(DATA, final_destination, original_sender);
hdr.addToVisitedSites(visited_sites)
// to prevent local headers getting mixed up with bridge headers: https://issues.redhat.com/browse/JGRP-2729
.originalHeaders(marshalled_hdrs)
.originalFlags(copy.getFlags()); // store the original flags, will be restored at the receiver
copy.clearHeaders();
hdr.addToVisitedSites(visited_sites)
.originalFlags(copy.getFlags()); // store the original flags, will be restored at the receiver
if(relay3) {
// to prevent local headers getting mixed up with bridge headers: https://issues.redhat.com/browse/JGRP-2729
Header[] original_hdrs=((BaseMessage)copy).headers();
if(Headers.size(original_hdrs) > 0) {
hdr.originalHeaders(original_hdrs);
copy.clearHeaders();
}
}
copy.putHeader(relay.getId(), hdr);
return copy;
}
Expand Down
8 changes: 6 additions & 2 deletions src/org/jgroups/stack/DiagnosticsHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ protected void bindToInterfaces(List<NetworkInterface> interfaces, MulticastSock
try {
if(Util.isUp(i)) {
List<InterfaceAddress> inet_addrs=i.getInterfaceAddresses();
if(inet_addrs != null && !inet_addrs.isEmpty()) { // fix for VM crash - suggested by JJalenak@netopia.com
if(inet_addrs != null && !inet_addrs.isEmpty() && isCompatible(mcast_addr, inet_addrs)) { // fix for VM crash - suggested by JJalenak@netopia.com
s.joinGroup(group_addr, i);
log.trace("joined %s on %s", group_addr, i.getName());
}
Expand All @@ -396,7 +396,11 @@ protected void bindToInterfaces(List<NetworkInterface> interfaces, MulticastSock
}
}
}


/** Checks if there's any address in the address list that's compatible (same address family) to addr */
protected static boolean isCompatible(InetAddress addr, List<InterfaceAddress> addrs) {
return addrs.stream().map(InterfaceAddress::getAddress).anyMatch(a -> Objects.equals(a.getClass(), addr.getClass()));
}

public interface ProbeHandler {
/**
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/stack/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public Object down(Message msg) {
* An event was received from the protocol below. Usually the current protocol will want to examine the event type
* and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating
* the internal membership list when receiving a VIEW_CHANGE event).
* Finally the event is either a) discarded, or b) an event is sent down the stack using {@code down_prot.down()}
* Finally, the event is either a) discarded, or b) an event is sent down the stack using {@code down_prot.down()}
* or c) the event (or another event) is sent up the stack using {@code up_prot.up()}.
*/
public Object up(Event evt) {
Expand Down
3 changes: 1 addition & 2 deletions src/org/jgroups/stack/ProtocolStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -944,8 +944,7 @@ public void up(MessageBatch batch) {
}

public Object down(Event evt) {
if(top_prot != null)
return top_prot.down(evt);
if(top_prot != null) return top_prot.down(evt);
return null;
}

Expand Down
Loading

0 comments on commit 254cfef

Please sign in to comment.