Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Feb 13, 2024
1 parent a04f8d2 commit eed49c6
Showing 1 changed file with 24 additions and 7 deletions.
31 changes: 24 additions & 7 deletions tests/pubsub/testgossipinternal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

{.used.}

import std/[options, deques, sequtils, enumerate, algorithm]
import std/[options, deques, sequtils, enumerate, algorithm, os]
import stew/byteutils
import ../../libp2p/builders
import ../../libp2p/errors
Expand Down Expand Up @@ -718,15 +718,19 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
proc setupTest(maxDurationInNonPriorityQueue1: Opt[Duration] = Opt.none(Duration)):
Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
let
nodes = generateNodes(2, gossip = true, verifySignature = false)
discard await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start()
)
var gossip0: GossipSub = GossipSub(nodes[0])
var gossip1: GossipSub = GossipSub(nodes[1])

await nodes[1].switch.connect(nodes[0].switch.peerInfo.peerId, nodes[0].switch.peerInfo.addrs)
gossip1.parameters.maxDurationInNonPriorityQueue = maxDurationInNonPriorityQueue1
await gossip1.switch.connect(gossip0.switch.peerInfo.peerId, gossip0.switch.peerInfo.addrs)

var receivedMessages = new(HashSet[seq[byte]])

Expand All @@ -736,12 +740,10 @@ suite "GossipSub internal":
proc handlerB(topic: string, data: seq[byte]) {.async.} =
discard

nodes[0].subscribe("foobar", handlerA)
nodes[1].subscribe("foobar", handlerB)
gossip0.subscribe("foobar", handlerA)
gossip1.subscribe("foobar", handlerB)
await waitSubGraph(nodes, "foobar")

var gossip0: GossipSub = GossipSub(nodes[0])
var gossip1: GossipSub = GossipSub(nodes[1])

return (gossip0, gossip1, receivedMessages)

Expand Down Expand Up @@ -844,3 +846,18 @@ suite "GossipSub internal":
check receivedMessages[].len == 1

await teardownTest(gossip0, gossip1)

asyncTest "e2e - drop msg if it is in the non-priority queue for too long":
# This test checks if two messages, both below the maxSize, are correctly processed and sent.
# Expected: Both messages should be received.
let maxDurationInNonPriorityQueueGossip1 = 1.millis
let (gossip0, gossip1, receivedMessages) = await setupTest(Opt.some(maxDurationInNonPriorityQueueGossip1))

let topic = "foobar"
gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](35))]), false)
sleep(2) # pause all tasks to ensure that the message stay in the non-priority queue longer than maxDurationInNonPriorityQueueGossip1
gossip1.broadcast(gossip1.mesh[topic], RPCMsg(messages: @[Message(topicIDs: @[topic], data: newSeq[byte](36))]), false)
await sleepAsync(10.milliseconds) # wait for the messages to be processed
check: receivedMessages[].len == 1 # only the second message should be received

await teardownTest(gossip0, gossip1)

0 comments on commit eed49c6

Please sign in to comment.