Skip to content

Commit

Permalink
- UNICAST3: removes only local non-members on view change, not remote…
Browse files Browse the repository at this point in the history
… (different site addresses)
  • Loading branch information
belaban committed Oct 4, 2023
1 parent 9341409 commit 968663a
Showing 1 changed file with 30 additions and 6 deletions.
36 changes: 30 additions & 6 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.protocols.relay.RELAY;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.*;

import java.util.*;
Expand Down Expand Up @@ -124,15 +126,19 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
@ManagedAttribute(description="tracing is enabled or disabled for the given log",writable=true)
protected boolean is_trace=log.isTraceEnabled();

@ManagedAttribute(description="Whether or not a RELAY protocol was found below in the stack")
protected boolean relay_present;

/* --------------------------------------------- Fields ------------------------------------------------ */


protected final ConcurrentMap<Address, SenderEntry> send_table=Util.createConcurrentMap();
protected final ConcurrentMap<Address, ReceiverEntry> recv_table=Util.createConcurrentMap();
protected final ConcurrentMap<Address,SenderEntry> send_table=Util.createConcurrentMap();
protected final ConcurrentMap<Address,ReceiverEntry> recv_table=Util.createConcurrentMap();

protected final ReentrantLock recv_table_lock=new ReentrantLock();

/** Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539) */
/** Used by the retransmit task to keep the last retransmitted seqno per member (applicable only
* for received messages (ReceiverEntry)): https://issues.redhat.com/browse/JGRP-1539 */
protected final Map<Address,Long> xmit_task_map=new HashMap<>();

/** RetransmitTask running every xmit_interval ms */
Expand Down Expand Up @@ -171,6 +177,12 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {

protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR=MessageBatch::add;

/** Used for testing only! */
public Table<Message> getSendWindow(Address target) {
SenderEntry entry=send_table.get(target);
return entry != null? entry.msgs : null;
}

@ManagedAttribute(description="Returns the number of outgoing (send) connections")
public int getNumSendConnections() {
return send_table.size();
Expand Down Expand Up @@ -409,6 +421,7 @@ public void init() throws Exception {
throw new IllegalStateException(e);
}
}
relay_present=ProtocolStack.findProtocol(this.down_prot, RELAY.class) != null;
}


Expand Down Expand Up @@ -617,7 +630,8 @@ public Object down(Event evt) {

if(!non_members.isEmpty()) {
log.trace("%s: closing connections of non members %s", local_addr, non_members);
non_members.forEach(this::closeConnection);
// remove all non members, except those from remote sites: https://issues.redhat.com/browse/JGRP-2729
non_members.stream().filter(this::isLocal).forEach(this::closeConnection);
}
if(!new_members.isEmpty()) {
for(Address mbr: new_members) {
Expand Down Expand Up @@ -706,8 +720,18 @@ public Object down(Message msg) {
protected boolean isLocalSiteMaster(Address dest) {
// quick check to avoid the use of 'instanceof'; will be removed once https://bugs.openjdk.org/browse/JDK-8180450
// has been fixed (in Java 22, should be backported to older versions)
if(dest.isSiteMaster()) {
Object ret=down_prot.down(new Event(Event.IS_LOCAL, dest));
if(relay_present && dest.isSiteMaster()) {
Object ret=down_prot.down(new Event(Event.IS_LOCAL_SITEMASTER, dest));
return ret != null && (Boolean)ret;
}
return false;
}

protected boolean isLocal(Address addr) {
// quick check to avoid the use of 'instanceof'; will be removed once https://bugs.openjdk.org/browse/JDK-8180450
// has been fixed (in Java 22, should be backported to older versions)
if(relay_present && addr.isSiteAddress()) {
Object ret=down_prot.down(new Event(Event.IS_LOCAL, addr));
return ret != null && (Boolean)ret;
}
return false;
Expand Down

0 comments on commit 968663a

Please sign in to comment.