From a7796bf7687224f80eab3d7f1f4168e1ec85b67b Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Thu, 29 Aug 2024 15:18:10 +0200 Subject: [PATCH 1/9] Implement D parameter tests. --- tests/pubsub/testgossipsub.nim | 70 ++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index e5def7852a..c22280de83 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -37,6 +37,9 @@ import ../helpers, ../utils/[async, futures, async, tests] proc `$`(peer: PubSubPeer): string = shortLog(peer) +proc voidTopicHandler(topic: string, data: seq[byte]) {.async.} = + discard + template tryPublish( call: untyped, require: int, wait = 10.milliseconds, timeout = 5.seconds ): untyped = @@ -1252,3 +1255,70 @@ suite "GossipSub": await allFuturesThrowing(node0.switch.stop(), node1.switch.stop()) await allFuturesThrowing(nodesFut.concat()) + +suite "Gossipsub Parameters": + teardown: + checkTrackers() + + asyncTest "dont prune peers if mesh len is less than d_high": + let + numberOfNodes = 5 + topic = "foobar" + nodes = generateNodes(numberOfNodes, gossip = true) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + + await subscribeNodes(nodes) + + for node in nodes: + node.subscribe(topic, voidTopicHandler) + + for x in 0 ..< numberOfNodes: + for y in 0 ..< numberOfNodes: + if x != y: + await waitSub(nodes[x], nodes[y], topic) + + let expectedNumberOfPeers = numberOfNodes - 1 + for i in 0 ..< numberOfNodes: + var gossip = GossipSub(nodes[i]) + check: + gossip.gossipsub[topic].len == expectedNumberOfPeers + gossip.mesh[topic].len == expectedNumberOfPeers + gossip.fanout.len == 0 + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + asyncTest "prune peers if mesh len is higher than d_high": + let + numberofNodes = 15 + topic = "foobar" + nodes = generateNodes(numberofNodes, gossip = true) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + + await subscribeNodes(nodes) + + for node in nodes: + node.subscribe(topic, voidTopicHandler) + + for x in 0 ..< numberofNodes: + for y in 0 ..< numberofNodes: + if x != y: + await waitSub(nodes[x], nodes[y], topic) + + # Give it time for a heartbeat + await sleepAsync(DURATION_TIMEOUT) + + let + expectedNumberOfPeers = numberofNodes - 1 + dHigh = 12 + d = 6 + dLow = 4 + + for i in 0 ..< numberofNodes: + var gossip = GossipSub(nodes[i]) + + check: + gossip.gossipsub[topic].len == expectedNumberOfPeers + gossip.mesh[topic].len >= dLow and gossip.mesh[topic].len <= dHigh + gossip.fanout.len == 0 + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) From 4bffaf1a0635b67dfaf5031c2b0698db6127a213 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Mon, 2 Sep 2024 18:35:08 +0200 Subject: [PATCH 2/9] Implement gossip propagation test. --- tests/pubsub/testgossipsub.nim | 35 ++++++++++++++++++++++++++++++++++ tests/pubsub/utils.nim | 34 ++++++++++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index c22280de83..23569f753d 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -13,6 +13,7 @@ import sequtils, options, tables, sets, sugar import chronos, stew/byteutils, chronos/ratelimit import chronicles import metrics +import std/options import ../../libp2p/protocols/pubsub/gossipsub/behavior import utils, @@ -34,6 +35,8 @@ import import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../helpers, ../utils/[async, futures, async, tests] +from ../../libp2p/protocols/pubsub/mcache import window + proc `$`(peer: PubSubPeer): string = shortLog(peer) @@ -1322,3 +1325,35 @@ suite "Gossipsub Parameters": gossip.fanout.len == 0 await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + asyncTest "messages sent to peers not in the mesh are propagated via gossip": + var validatorFut = newFuture[bool]() + proc validator( + topic: string, message: Message + ): Future[ValidationResult] {.async.} = + check topic == "foobar" + validatorFut.complete(true) + result = ValidationResult.Accept + + let + numberOfNodes = 5 + topic = "foobar" + dValues = DValues(dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1)) + nodes = generateNodes(numberOfNodes, gossip = true, dValues = some(dValues)) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + + await subscribeNodes(nodes) + + for node in nodes: + node.subscribe(topic, voidTopicHandler) + + # await waitForMesh(nodes[0], nodes[1], topic) + await sleepAsync(1.seconds) + + # try: + let mcache = GossipSub(nodes[0]).mcache + + discard nodes[0].publish(topic, "Hello!".toBytes()) + await sleepAsync(3.seconds) + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index faa77cd5c6..ba517bde6e 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -6,6 +6,7 @@ const import hashes, random, tables, sets, sequtils import chronos, stew/[byteutils, results], chronos/ratelimit +import std/options import ../../libp2p/[ builders, @@ -24,7 +25,15 @@ export builders randomize() -type TestGossipSub* = ref object of GossipSub +type + TestGossipSub* = ref object of GossipSub + DValues* = object + d*: Option[int] + dLow*: Option[int] + dHigh*: Option[int] + dScore*: Option[int] + dOut*: Option[int] + dLazy*: Option[int] proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer = proc getConn(): Future[Connection] = @@ -71,6 +80,7 @@ proc generateNodes*( overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] = Opt.none(tuple[bytes: int, interval: Duration]), gossipSubVersion: string = "", + dValues: Option[DValues] = DValues.none(), ): seq[PubSub] = for i in 0 ..< num: let switch = newStandardSwitch( @@ -94,6 +104,28 @@ proc generateNodes*( p.unsubscribeBackoff = unsubscribeBackoff p.enablePX = enablePX p.overheadRateLimit = overheadRateLimit + + if dValues.isSome: + let dValuesSome = dValues.get + + if dValuesSome.d.isSome: + p.d = dValuesSome.d.get + + if dValuesSome.dLow.isSome: + p.dLow = dValuesSome.dLow.get + + if dValuesSome.dHigh.isSome: + p.dHigh = dValuesSome.dHigh.get + + if dValuesSome.dScore.isSome: + p.dScore = dValuesSome.dScore.get + + if dValuesSome.dOut.isSome: + p.dOut = dValuesSome.dOut.get + + if dValuesSome.dLazy.isSome: + p.dLazy = dValuesSome.dLazy.get + p ), ) From 952b08cff5ef69dfead6feb0c7733e1dfc709671 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Tue, 3 Sep 2024 19:35:12 +0200 Subject: [PATCH 3/9] Implement message should not go back to source test. --- tests/pubsub/testgossipsub.nim | 55 +++++++++++++++++++++++++++++++--- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 23569f753d..cf0b19093a 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -13,6 +13,7 @@ import sequtils, options, tables, sets, sugar import chronos, stew/byteutils, chronos/ratelimit import chronicles import metrics +import results import std/options import ../../libp2p/protocols/pubsub/gossipsub/behavior import @@ -1347,13 +1348,59 @@ suite "Gossipsub Parameters": for node in nodes: node.subscribe(topic, voidTopicHandler) - # await waitForMesh(nodes[0], nodes[1], topic) await sleepAsync(1.seconds) - # try: - let mcache = GossipSub(nodes[0]).mcache - discard nodes[0].publish(topic, "Hello!".toBytes()) await sleepAsync(3.seconds) await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + asyncTest "messages are not sent back to source or forwarding peer": + # Given 3 handlers, one for each node + var + handlerFuture0 = newFuture[bool]() + handlerFuture1 = newFuture[bool]() + handlerFuture2 = newFuture[bool]() + + proc handler0(topic: string, data: seq[byte]) {.async.} = + handlerFuture0.complete(true) + + proc handler1(topic: string, data: seq[byte]) {.async.} = + handlerFuture1.complete(true) + + proc handler2(topic: string, data: seq[byte]) {.async.} = + handlerFuture2.complete(true) + + # Instantiate 3 nodes + let + numberOfNodes = 3 + topic = "foobar" + nodes = generateNodes(numberOfNodes, gossip = true) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + node0 = nodes[0] + node1 = nodes[1] + node2 = nodes[2] + + # Connect them in a ring + await node0.switch.connect(node1.peerInfo.peerId, node1.peerInfo.addrs) + await node1.switch.connect(node2.peerInfo.peerId, node2.peerInfo.addrs) + await node2.switch.connect(node0.peerInfo.peerId, node0.peerInfo.addrs) + await sleepAsync(DURATION_TIMEOUT) + + # Subscribe them all to the same topic + nodes[0].subscribe(topic, handler0) + nodes[1].subscribe(topic, handler1) + nodes[2].subscribe(topic, handler2) + await sleepAsync(DURATION_TIMEOUT) + + # When node 0 sends a message + discard nodes[0].publish(topic, "Hello!".toBytes()) + await sleepAsync(DURATION_TIMEOUT) + + # Nodes 1 and 2 should receive the message, but node 0 shouldn't receive it back + check: + (await handlerFuture0.waitForResult()).isErr + (await handlerFuture1.waitForResult()).isOk + (await handlerFuture2.waitForResult()).isOk + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) From a2d2e5d2875ebb11e51245cb9dd9278e56d87dc8 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Wed, 4 Sep 2024 13:27:09 +0200 Subject: [PATCH 4/9] Fix gossip check test. --- tests/pubsub/testgossipsub.nim | 62 ++++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index cf0b19093a..02fb0e0c5d 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -1328,14 +1328,7 @@ suite "Gossipsub Parameters": await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) asyncTest "messages sent to peers not in the mesh are propagated via gossip": - var validatorFut = newFuture[bool]() - proc validator( - topic: string, message: Message - ): Future[ValidationResult] {.async.} = - check topic == "foobar" - validatorFut.complete(true) - result = ValidationResult.Accept - + # Given 5 nodes let numberOfNodes = 5 topic = "foobar" @@ -1343,20 +1336,55 @@ suite "Gossipsub Parameters": nodes = generateNodes(numberOfNodes, gossip = true, dValues = some(dValues)) nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + # All of them are checking for iHave messages + var receivedIHaves: seq[bool] = repeat(false, numberOfNodes) + for i in 0 ..< numberOfNodes: + var pubsubObserver: PubSubObserver + capture i: + let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let iHave = msgs.control.get.ihave + if iHave.len > 0: + for msg in iHave: + if msg.topicID == topic: + receivedIHaves[i] = true + break + + pubsubObserver = PubSubObserver(onRecv: checkForIhaves) + + nodes[i].addObserver(pubsubObserver) + + # All of them are interconnected await subscribeNodes(nodes) + # And subscribed to the same topic for node in nodes: node.subscribe(topic, voidTopicHandler) + await sleepAsync(DURATION_TIMEOUT) - await sleepAsync(1.seconds) - + # When node 0 sends a message discard nodes[0].publish(topic, "Hello!".toBytes()) - await sleepAsync(3.seconds) + await sleepAsync(DURATION_TIMEOUT) + + # At least one of the nodes should have received an iHave message + # The check is made this way because the mesh structure changes from run to run + check: + anyIt(receivedIHaves, it == true) await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) asyncTest "messages are not sent back to source or forwarding peer": - # Given 3 handlers, one for each node + # Instantiate 3 nodes + let + numberOfNodes = 3 + topic = "foobar" + nodes = generateNodes(numberOfNodes, gossip = true) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + node0 = nodes[0] + node1 = nodes[1] + node2 = nodes[2] + + # Each node with a handler var handlerFuture0 = newFuture[bool]() handlerFuture1 = newFuture[bool]() @@ -1371,16 +1399,6 @@ suite "Gossipsub Parameters": proc handler2(topic: string, data: seq[byte]) {.async.} = handlerFuture2.complete(true) - # Instantiate 3 nodes - let - numberOfNodes = 3 - topic = "foobar" - nodes = generateNodes(numberOfNodes, gossip = true) - nodesFut = await allFinished(nodes.mapIt(it.switch.start())) - node0 = nodes[0] - node1 = nodes[1] - node2 = nodes[2] - # Connect them in a ring await node0.switch.connect(node1.peerInfo.peerId, node1.peerInfo.addrs) await node1.switch.connect(node2.peerInfo.peerId, node2.peerInfo.addrs) From aac457ec5ba24fa7d553d1456f6364ee8f6605e9 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Wed, 4 Sep 2024 18:35:28 +0200 Subject: [PATCH 5/9] Implement flood publish with score test --- tests/pubsub/testgossipsub.nim | 51 ++++++++++++++++++++++++++++++++++ tests/pubsub/utils.nim | 3 +- 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 02fb0e0c5d..6abe6707e7 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -1422,3 +1422,54 @@ suite "Gossipsub Parameters": (await handlerFuture2.waitForResult()).isOk await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + asyncTest "flood publish to all peers with score above threshold, regardless of subscription": + # Given 3 nodes + let + numberOfNodes = 3 + topic = "foobar" + nodes = generateNodes(numberOfNodes, gossip = true, floodPublish = true) + nodesFut = nodes.mapIt(it.switch.start()) + g0 = GossipSub(nodes[0]) + + # Nodes 1 and 2 are connected to node 0 + await nodes[0].switch.connect(nodes[1].peerInfo.peerId, nodes[1].peerInfo.addrs) + await nodes[0].switch.connect(nodes[2].peerInfo.peerId, nodes[2].peerInfo.addrs) + + # Given 2 handlers + var + handlerFut1 = newFuture[bool]() + handlerFut2 = newFuture[bool]() + + proc handler1(topic: string, data: seq[byte]) {.async.} = + handlerFut1.complete(true) + + proc handler2(topic: string, data: seq[byte]) {.async.} = + handlerFut2.complete(true) + + # Nodes are subscribed to the same topic + nodes[1].subscribe(topic, handler1) + nodes[2].subscribe(topic, handler2) + await sleepAsync(1.seconds) + + # Given node 2's score is below the threshold + for peer in g0.gossipsub.getOrDefault(topic): + if peer.peerId == nodes[2].peerInfo.peerId: + peer.score = (g0.parameters.publishThreshold - 1) + + # When node 0 publishes a message to topic "foo" + let message = "Hello!".toBytes() + check (await nodes[0].publish(topic, message)) > 0 + await sleepAsync(3.seconds) + + # Then only node 1 should receive the message + let + result1 = await handlerFut1.waitForResult(DURATION_TIMEOUT) + result2 = await handlerFut2.waitForResult(DURATION_TIMEOUT) + check: + result1.isOk and result1.get == true + result2.isErr + + # Cleanup + await allFuturesThrowing(nodes.mapIt(it.switch.stop())) + await allFuturesThrowing(nodesFut) diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index ba517bde6e..662ea204ad 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -80,6 +80,7 @@ proc generateNodes*( overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] = Opt.none(tuple[bytes: int, interval: Duration]), gossipSubVersion: string = "", + floodPublish: bool = false, dValues: Option[DValues] = DValues.none(), ): seq[PubSub] = for i in 0 ..< num: @@ -98,7 +99,7 @@ proc generateNodes*( maxMessageSize = maxMessageSize, parameters = ( var p = GossipSubParams.init() - p.floodPublish = false + p.floodPublish = floodPublish p.historyLength = 20 p.historyGossip = 20 p.unsubscribeBackoff = unsubscribeBackoff From e6a06dcf329e62a75e195a6151f89148f88c364e Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Wed, 4 Sep 2024 22:05:16 +0200 Subject: [PATCH 6/9] Implement gossip adaptive dissemination tests. --- tests/pubsub/testgossipsub.nim | 162 ++++++++++++++++++++++++++++++++- tests/pubsub/utils.nim | 4 + 2 files changed, 162 insertions(+), 4 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 6abe6707e7..d62a3452a2 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -1337,7 +1337,7 @@ suite "Gossipsub Parameters": nodesFut = await allFinished(nodes.mapIt(it.switch.start())) # All of them are checking for iHave messages - var receivedIHaves: seq[bool] = repeat(false, numberOfNodes) + var receivedIHaves: seq[int] = repeat(0, numberOfNodes) for i in 0 ..< numberOfNodes: var pubsubObserver: PubSubObserver capture i: @@ -1347,8 +1347,7 @@ suite "Gossipsub Parameters": if iHave.len > 0: for msg in iHave: if msg.topicID == topic: - receivedIHaves[i] = true - break + receivedIHaves[i] += 1 pubsubObserver = PubSubObserver(onRecv: checkForIhaves) @@ -1369,7 +1368,7 @@ suite "Gossipsub Parameters": # At least one of the nodes should have received an iHave message # The check is made this way because the mesh structure changes from run to run check: - anyIt(receivedIHaves, it == true) + anyIt(receivedIHaves, it > 0) await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) @@ -1473,3 +1472,158 @@ suite "Gossipsub Parameters": # Cleanup await allFuturesThrowing(nodes.mapIt(it.switch.stop())) await allFuturesThrowing(nodesFut) + + asyncTest "adaptive gossip dissemination, dLazy and gossipFactor to 0": + # Given 20 nodes + let + numberOfNodes = 20 + topic = "foobar" + dValues = DValues( + dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(0) + ) + nodes = generateNodes( + numberOfNodes, + gossip = true, + dValues = some(dValues), + gossipFactor = some(0.float), + ) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + + # All of them are checking for iHave messages + var receivedIHaves: seq[int] = repeat(0, numberOfNodes) + for i in 0 ..< numberOfNodes: + var pubsubObserver: PubSubObserver + capture i: + let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let iHave = msgs.control.get.ihave + if iHave.len > 0: + for msg in iHave: + if msg.topicID == topic: + receivedIHaves[i] += 1 + + pubsubObserver = PubSubObserver(onRecv: checkForIhaves) + + nodes[i].addObserver(pubsubObserver) + + # All of them are connected to node 0 + for i in 1 ..< numberOfNodes: + await nodes[0].switch.connect(nodes[i].peerInfo.peerId, nodes[i].peerInfo.addrs) + + # And subscribed to the same topic + for node in nodes: + node.subscribe(topic, voidTopicHandler) + await sleepAsync(DURATION_TIMEOUT) + + # When node 0 sends a message + discard nodes[0].publish(topic, "Hello!".toBytes()) + await sleepAsync(DURATION_TIMEOUT) + + # None of the nodes should have received an iHave message + check: + filterIt(receivedIHaves, it > 0).len == 0 + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + asyncTest "adaptive gossip dissemination, with gossipFactor priority": + # Given 20 nodes + let + numberOfNodes = 20 + topic = "foobar" + dValues = DValues( + dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(4) + ) + nodes = generateNodes( + numberOfNodes, gossip = true, dValues = some(dValues), gossipFactor = some(0.5) + ) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + + # All of them are checking for iHave messages + var receivedIHaves: seq[int] = repeat(0, numberOfNodes) + for i in 0 ..< numberOfNodes: + var pubsubObserver: PubSubObserver + capture i: + let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let iHave = msgs.control.get.ihave + if iHave.len > 0: + for msg in iHave: + if msg.topicID == topic: + receivedIHaves[i] += 1 + + pubsubObserver = PubSubObserver(onRecv: checkForIhaves) + + nodes[i].addObserver(pubsubObserver) + + # All of them are connected to node 0 + for i in 1 ..< numberOfNodes: + await nodes[0].switch.connect(nodes[i].peerInfo.peerId, nodes[i].peerInfo.addrs) + + # And subscribed to the same topic + for node in nodes: + node.subscribe(topic, voidTopicHandler) + await sleepAsync(DURATION_TIMEOUT) + + # When node 0 sends a message + discard nodes[0].publish(topic, "Hello!".toBytes()) + await sleepAsync(DURATION_TIMEOUT) + + # At least 8 of the nodes should have received an iHave message + # That's because the gossip factor is 0.5 over 16 available nodes + check: + filterIt(receivedIHaves, it > 0).len >= 8 + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + asyncTest "adaptive gossip dissemination, with dLazy priority": + # Given 20 nodes + let + numberOfNodes = 20 + topic = "foobar" + dValues = DValues( + dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(6) + ) + nodes = generateNodes( + numberOfNodes, + gossip = true, + dValues = some(dValues), + gossipFactor = some(0.float), + ) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + + # All of them are checking for iHave messages + var receivedIHaves: seq[int] = repeat(0, numberOfNodes) + for i in 0 ..< numberOfNodes: + var pubsubObserver: PubSubObserver + capture i: + let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let iHave = msgs.control.get.ihave + if iHave.len > 0: + for msg in iHave: + if msg.topicID == topic: + receivedIHaves[i] += 1 + + pubsubObserver = PubSubObserver(onRecv: checkForIhaves) + + nodes[i].addObserver(pubsubObserver) + + # All of them are connected to node 0 + for i in 1 ..< numberOfNodes: + await nodes[0].switch.connect(nodes[i].peerInfo.peerId, nodes[i].peerInfo.addrs) + + # And subscribed to the same topic + for node in nodes: + node.subscribe(topic, voidTopicHandler) + await sleepAsync(DURATION_TIMEOUT) + + # When node 0 sends a message + discard nodes[0].publish(topic, "Hello!".toBytes()) + await sleepAsync(DURATION_TIMEOUT) + + # At least 6 of the nodes should have received an iHave message + # That's because the dLazy is 6 + check: + filterIt(receivedIHaves, it > 0).len == 6 + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 662ea204ad..1da5a9b9c7 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -82,6 +82,7 @@ proc generateNodes*( gossipSubVersion: string = "", floodPublish: bool = false, dValues: Option[DValues] = DValues.none(), + gossipFactor: Option[float] = float.none(), ): seq[PubSub] = for i in 0 ..< num: let switch = newStandardSwitch( @@ -106,6 +107,9 @@ proc generateNodes*( p.enablePX = enablePX p.overheadRateLimit = overheadRateLimit + if gossipFactor.isSome: + p.gossipFactor = gossipFactor.get + if dValues.isSome: let dValuesSome = dValues.get From 37458458de3af932a9756241097f04fb20674916 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Thu, 5 Sep 2024 18:46:34 +0200 Subject: [PATCH 7/9] iDontWant broadcasting test --- tests/pubsub/testgossipsub.nim | 64 ++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index d62a3452a2..1d7b2c79ca 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -1627,3 +1627,67 @@ suite "Gossipsub Parameters": filterIt(receivedIHaves, it > 0).len == 6 await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + asyncTest "iDontWant messages are broadcast immediately after receiving the first message instance": + # Given 3 nodes + let + numberOfNodes = 3 + topic = "foobar" + nodes = generateNodes(numberOfNodes, gossip = true) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + node0 = nodes[0] + node1 = nodes[1] + node2 = nodes[2] + + # And with iDontWant observers + var + iDontWantReceived0 = newFuture[bool]() + iDontWantReceived1 = newFuture[bool]() + iDontWantReceived2 = newFuture[bool]() + + proc observer0(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let iDontWant = msgs.control.get.idontwant + if iDontWant.len > 0: + iDontWantReceived0.complete(true) + + proc observer1(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let iDontWant = msgs.control.get.idontwant + if iDontWant.len > 0: + iDontWantReceived1.complete(true) + + proc observer2(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let iDontWant = msgs.control.get.idontwant + if iDontWant.len > 0: + iDontWantReceived2.complete(true) + + node0.addObserver(PubSubObserver(onRecv: observer0)) + node1.addObserver(PubSubObserver(onRecv: observer1)) + node2.addObserver(PubSubObserver(onRecv: observer2)) + + # Connect them in a line + await node0.switch.connect(node1.peerInfo.peerId, node1.peerInfo.addrs) + await node1.switch.connect(node2.peerInfo.peerId, node2.peerInfo.addrs) + await sleepAsync(DURATION_TIMEOUT) + + # Subscribe them all to the same topic + nodes[0].subscribe(topic, voidTopicHandler) + nodes[1].subscribe(topic, voidTopicHandler) + nodes[2].subscribe(topic, voidTopicHandler) + await sleepAsync(DURATION_TIMEOUT) + + # When node 0 sends a large message + # let largeMsg = newSeq[byte](1000) + let largeMsg = newSeq[byte](300) + discard nodes[0].publish(topic, largeMsg) + await sleepAsync(DURATION_TIMEOUT) + + # Only node 2 should have received the iDontWant message + check: + (await iDontWantReceived0.waitForResult()).isErr + (await iDontWantReceived1.waitForResult()).isErr + (await iDontWantReceived2.waitForResult()).isOk + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) From 264e0dcca0a09fc29b64bc3194b26d341a595507 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Thu, 5 Sep 2024 22:11:27 +0200 Subject: [PATCH 8/9] Refactor utils for naming clarity --- tests/helpers.nim | 6 ++-- tests/pubsub/testgossipsub.nim | 2 +- tests/pubsub/testgossipsub2.nim | 5 ++-- tests/testdiscovery.nim | 2 +- tests/utils/async.nim | 30 ------------------- .../{asyncunit.nim => utils/async_tests.nim} | 0 tests/utils/futures.nim | 30 ++++++++++++++++++- tests/utils/{tests.nim => xtests.nim} | 0 8 files changed, 38 insertions(+), 37 deletions(-) delete mode 100644 tests/utils/async.nim rename tests/{asyncunit.nim => utils/async_tests.nim} (100%) rename tests/utils/{tests.nim => xtests.nim} (100%) diff --git a/tests/helpers.nim b/tests/helpers.nim index 3b8c90f2a2..7264a2f308 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -14,8 +14,10 @@ import ../libp2p/protocols/secure/secure import ../libp2p/switch import ../libp2p/nameresolving/[nameresolver, mockresolver] -import "."/[asyncunit, errorhelpers] -export asyncunit, errorhelpers, mockresolver +import errorhelpers +import utils/async_tests + +export async_tests, errorhelpers, mockresolver const StreamTransportTrackerName = "stream.transport" diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 1d7b2c79ca..3577da1feb 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -34,7 +34,7 @@ import protocols/pubsub/rpc/messages, ] import ../../libp2p/protocols/pubsub/errors as pubsub_errors -import ../helpers, ../utils/[async, futures, async, tests] +import ../helpers, ../utils/[futures, async_tests, xtests] from ../../libp2p/protocols/pubsub/mcache import window diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index ffc5c24e40..13ccb140c3 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -25,8 +25,9 @@ import protocols/pubsub/pubsubpeer, protocols/pubsub/peertable, protocols/pubsub/rpc/messages, - ] -import ../helpers + ], + ../utils/[futures, async_tests], + ../helpers template tryPublish( call: untyped, require: int, wait = 10.milliseconds, timeout = 10.seconds diff --git a/tests/testdiscovery.nim b/tests/testdiscovery.nim index 7d92c2f2c9..09510a42fb 100644 --- a/tests/testdiscovery.nim +++ b/tests/testdiscovery.nim @@ -18,7 +18,7 @@ import discovery/discoverymngr, discovery/rendezvousinterface, ] -import ./helpers, ./asyncunit, ./utils/[async, assertions, futures] +import ./helpers, ./utils/[futures, assertions, async_tests] proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch = SwitchBuilder diff --git a/tests/utils/async.nim b/tests/utils/async.nim deleted file mode 100644 index b65fcb3385..0000000000 --- a/tests/utils/async.nim +++ /dev/null @@ -1,30 +0,0 @@ -import chronos/futures, stew/results, chronos -import ./futures - -proc toOk(future: Future[void]): Result[void, string] = - return results.ok() - -proc toOk[T](future: Future[T]): Result[T, string] = - return results.ok(future.read()) - -proc toResult*[T](future: Future[T]): Result[T, string] = - if future.cancelled(): - return results.err("Future cancelled/timed out.") - elif future.finished(): - if not future.failed(): - return future.toOk() - else: - return results.err("Future finished but failed.") - else: - return results.err("Future still not finished.") - -proc waitForResult*[T]( - future: Future[T], timeout = DURATION_TIMEOUT -): Future[Result[T, string]] {.async.} = - discard await future.withTimeout(timeout) - return future.toResult() - -proc reset*[T](future: Future[T]): void = - # Likely an incomplete reset, but good enough for testing purposes (for now) - future.internalError = nil - future.internalState = FutureState.Pending diff --git a/tests/asyncunit.nim b/tests/utils/async_tests.nim similarity index 100% rename from tests/asyncunit.nim rename to tests/utils/async_tests.nim diff --git a/tests/utils/futures.nim b/tests/utils/futures.nim index e98d6526e1..5166638178 100644 --- a/tests/utils/futures.nim +++ b/tests/utils/futures.nim @@ -1,5 +1,33 @@ -import chronos +import chronos/futures, stew/results, chronos const DURATION_TIMEOUT* = 1.seconds DURATION_TIMEOUT_EXTENDED* = 1500.milliseconds + +proc toOk(future: Future[void]): Result[void, string] = + return results.ok() + +proc toOk[T](future: Future[T]): Result[T, string] = + return results.ok(future.read()) + +proc toResult*[T](future: Future[T]): Result[T, string] = + if future.cancelled(): + return results.err("Future cancelled/timed out.") + elif future.finished(): + if not future.failed(): + return future.toOk() + else: + return results.err("Future finished but failed.") + else: + return results.err("Future still not finished.") + +proc waitForResult*[T]( + future: Future[T], timeout = DURATION_TIMEOUT +): Future[Result[T, string]] {.async.} = + discard await future.withTimeout(timeout) + return future.toResult() + +proc reset*[T](future: Future[T]): void = + # Likely an incomplete reset, but good enough for testing purposes (for now) + future.internalError = nil + future.internalState = FutureState.Pending diff --git a/tests/utils/tests.nim b/tests/utils/xtests.nim similarity index 100% rename from tests/utils/tests.nim rename to tests/utils/xtests.nim From dff60c2f8a23a3d94c9258ac9154636e419c1471 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Thu, 5 Sep 2024 22:11:46 +0200 Subject: [PATCH 9/9] Fix test and add remaining direct peers test. --- tests/pubsub/testgossipsub.nim | 3 +-- tests/pubsub/testgossipsub2.nim | 46 +++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 3577da1feb..3ab3d851b4 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -1679,8 +1679,7 @@ suite "Gossipsub Parameters": await sleepAsync(DURATION_TIMEOUT) # When node 0 sends a large message - # let largeMsg = newSeq[byte](1000) - let largeMsg = newSeq[byte](300) + let largeMsg = newSeq[byte](1000) discard nodes[0].publish(topic, largeMsg) await sleepAsync(DURATION_TIMEOUT) diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index 13ccb140c3..8f051d3882 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -259,6 +259,52 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "GossipSub directPeers: send message to unsubscribed direct peer": + # Given 2 nodes + let + numberOfNodes = 2 + nodes = generateNodes(numberOfNodes, gossip = true) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + node0 = nodes[0] + node1 = nodes[1] + g0 = GossipSub(node0) + g1 = GossipSub(node1) + + # With message observers + var + messageReceived0 = newFuture[bool]() + messageReceived1 = newFuture[bool]() + + proc observer0(peer: PubSubPeer, msgs: var RPCMsg) = + for message in msgs.messages: + if message.topic == "foobar": + messageReceived0.complete(true) + + proc observer1(peer: PubSubPeer, msgs: var RPCMsg) = + for message in msgs.messages: + if message.topic == "foobar": + messageReceived1.complete(true) + + node0.addObserver(PubSubObserver(onRecv: observer0)) + node1.addObserver(PubSubObserver(onRecv: observer1)) + + # Connect them as direct peers + await g0.addDirectPeer(node1.peerInfo.peerId, node1.peerInfo.addrs) + await g1.addDirectPeer(node0.peerInfo.peerId, node0.peerInfo.addrs) + + # When node 0 sends a message + let message = "Hello!".toBytes() + discard node0.publish("foobar", message) + + # None should receive the message as they are not subscribed to the topic + check: + (await messageReceived0.waitForResult()).isErr + (await messageReceived1.waitForResult()).isErr + + # Cleanup + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + await allFuturesThrowing(nodesFut) + asyncTest "GossipSub peers disconnections mechanics": var runs = 10