diff --git a/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes b/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes new file mode 100644 index 00000000000..bcf94007380 --- /dev/null +++ b/remote/src/main/mima-filters/1.1.x.backwards.excludes/quarantine.backwards.excludes @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# changes made due to issues with downing during harmless quarantine +# https://github.com/apache/pekko/issues/578 +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.artery.AssociationState#QuarantinedTimestamp.unapply") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.artery.AssociationState$QuarantinedTimestamp$") diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala index 44103d42eda..76e618b758a 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/ArteryTransport.scala @@ -108,9 +108,9 @@ private[remote] object AssociationState { quarantined = ImmutableLongMap.empty[QuarantinedTimestamp], new AtomicReference(UniqueRemoteAddressValue(None, Nil))) - final case class QuarantinedTimestamp(nanoTime: Long) { + final case class QuarantinedTimestamp(nanoTime: Long, harmless: Boolean = false) { override def toString: String = - s"Quarantined ${TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - nanoTime)} seconds ago" + s"Quarantined ${TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - nanoTime)} seconds ago (harmless=$harmless)" } private final case class UniqueRemoteAddressValue( @@ -159,6 +159,13 @@ private[remote] final class AssociationState private ( def isQuarantined(uid: Long): Boolean = quarantined.contains(uid) + def quarantinedButHarmless(uid: Long): Boolean = { + quarantined.get(uid) match { + case OptionVal.Some(qt) => 
qt.harmless + case _ => false + } + } + @tailrec def completeUniqueRemoteAddress(peer: UniqueAddress): Unit = { val current = _uniqueRemoteAddress.get() if (current.uniqueRemoteAddress.isEmpty) { @@ -196,14 +203,14 @@ private[remote] final class AssociationState private ( quarantined, new AtomicReference(UniqueRemoteAddressValue(Some(remoteAddress), Nil))) - def newQuarantined(): AssociationState = + def newQuarantined(harmless: Boolean = false): AssociationState = uniqueRemoteAddress() match { case Some(a) => new AssociationState( incarnation, lastUsedTimestamp = new AtomicLong(System.nanoTime()), controlIdleKillSwitch, - quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime())), + quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime(), harmless)), _uniqueRemoteAddress) case None => this } diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala index 6bf999338ff..b97fef185cf 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala @@ -538,7 +538,7 @@ private[remote] class Association( current.uniqueRemoteAddress() match { case Some(peer) if peer.uid == u => if (!current.isQuarantined(u)) { - val newState = current.newQuarantined() + val newState = current.newQuarantined(harmless) if (swapState(current, newState)) { // quarantine state change was performed if (harmless) { diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala index 7e845457124..b06940dfe9f 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/InboundQuarantineCheck.scala @@ -45,17 +45,26 @@ private[remote] class 
InboundQuarantineCheck(inboundContext: InboundContext) env.association match { case OptionVal.Some(association) => if (association.associationState.isQuarantined(env.originUid)) { - if (log.isDebugEnabled) - log.debug( - "Dropping message [{}] from [{}#{}] because the system is quarantined", + if (association.associationState.quarantinedButHarmless(env.originUid)) { + log.info( + "Message [{}] from [{}#{}] was dropped. " + + "The system is quarantined but the UID is known to be harmless.", Logging.messageClassName(env.message), association.remoteAddress, env.originUid) - // avoid starting outbound stream for heartbeats - if (!env.message.isInstanceOf[Quarantined] && !isHeartbeat(env.message)) - inboundContext.sendControl( - association.remoteAddress, - Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid))) + } else { + if (log.isDebugEnabled) + log.debug( + "Dropping message [{}] from [{}#{}] because the system is quarantined", + Logging.messageClassName(env.message), + association.remoteAddress, + env.originUid) + // avoid starting outbound stream for heartbeats + if (!env.message.isInstanceOf[Quarantined] && !isHeartbeat(env.message)) + inboundContext.sendControl( + association.remoteAddress, + Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid))) + } pull(in) } else push(out, env) diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala index 39beddb4a23..c57350a745e 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/OutboundIdleShutdownSpec.scala @@ -116,6 +116,8 @@ class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s""" val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid localArtery.quarantine(remoteAddress, 
Some(remoteUid), "Test") + association.associationState.isQuarantined(remoteUid) shouldBe true + association.associationState.quarantinedButHarmless(remoteUid) shouldBe false eventually { assertStreamActive(association, Association.ControlQueueIndex, expected = false) @@ -128,6 +130,80 @@ class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s""" } } + "eliminate quarantined association when not used (harmless=true)" in withAssociation { + (_, remoteAddress, _, localArtery, _) => + val association = localArtery.association(remoteAddress) + val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid + + localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", harmless = true) + association.associationState.isQuarantined(remoteUid) shouldBe true + association.associationState.quarantinedButHarmless(remoteUid) shouldBe true + + eventually { + assertStreamActive(association, Association.ControlQueueIndex, expected = false) + assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false) + } + + // the outbound streams are inactive and association quarantined, then it's completely removed + eventually { + localArtery.remoteAddresses should not contain remoteAddress + } + } + + "eliminate quarantined association when not used - echo test" in withAssociation { + (remoteSystem, remoteAddress, _, localArtery, localProbe) => + // event to watch out for, indicator of the issue + remoteSystem.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent]) + + val remoteEcho = remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue + + val localAddress = RARP(system).provider.getDefaultAddress + + val localEchoRef = + remoteSystem.actorSelection(RootActorPath(localAddress) / localProbe.ref.path.elements).resolveOne( + remainingOrDefault).futureValue + remoteEcho.tell("ping", localEchoRef) + localProbe.expectMsg("ping") + + val association = localArtery.association(remoteAddress) + val 
remoteUid = futureUniqueRemoteAddress(association).futureValue.uid + localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") + association.associationState.isQuarantined(remoteUid) shouldBe true + association.associationState.quarantinedButHarmless(remoteUid) shouldBe false + + eventually { + remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local, which will trigger local to wrongfully notify remote that it is quarantined + expectMsgType[ThisActorSystemQuarantinedEvent] // this is what remote emits when it learns it is quarantined by local + } + } + + "eliminate quarantined association when not used - echo test (harmless=true)" in withAssociation { + (remoteSystem, remoteAddress, _, localArtery, localProbe) => + // event to watch out for, indicator of the issue + remoteSystem.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent]) + + val remoteEcho = remoteSystem.actorSelection("/user/echo").resolveOne(remainingOrDefault).futureValue + + val localAddress = RARP(system).provider.getDefaultAddress + + val localEchoRef = + remoteSystem.actorSelection(RootActorPath(localAddress) / localProbe.ref.path.elements).resolveOne( + remainingOrDefault).futureValue + remoteEcho.tell("ping", localEchoRef) + localProbe.expectMsg("ping") + + val association = localArtery.association(remoteAddress) + val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid + localArtery.quarantine(remoteAddress, Some(remoteUid), "HarmlessTest", harmless = true) + association.associationState.isQuarantined(remoteUid) shouldBe true + association.associationState.quarantinedButHarmless(remoteUid) shouldBe true + + eventually { + remoteEcho.tell("ping", localEchoRef) // trigger sending message from remote to local; because the quarantine is harmless, local must drop it without notifying remote that it is quarantined + expectNoMessage() + } + } + + "remove inbound compression after quarantine" in withAssociation { (_, remoteAddress, _, localArtery, _) => + val 
association = localArtery.association(remoteAddress) val remoteUid = futureUniqueRemoteAddress(association).futureValue.uid