IMReceiving message to minimize redundant large message transmissions #1070

Draft · wants to merge 1 commit into base: p2p-research
64 changes: 37 additions & 27 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -275,6 +275,7 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
 
   var respControl: ControlMessage
   g.handleIDontWant(peer, control.idontwant)
+  g.handleIMReceiving(peer, control.imreceiving)
   let iwant = g.handleIHave(peer, control.ihave)
   if iwant.messageIds.len > 0:
     respControl.iwant.add(iwant)
@@ -320,10 +321,39 @@ proc validateAndRelay(g: GossipSub,
                       msg: Message,
                       msgId, msgIdSalted: MessageId,
                       peer: PubSubPeer) {.async.} =
+
+  var seenPeers: HashSet[PubSubPeer]
+  var toSendPeers = HashSet[PubSubPeer]()
+  for t in msg.topicIds: # for every topic in the message
+    if t notin g.topics:
+      continue
+
+    g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
+    g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
+    # add direct peers
+    toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t))
+  # Don't send it to source peer
+  toSendPeers.excl(peer)
+
+  for peer in toSendPeers:
+    for heDontWant in peer.heDontWants:
+      if msgId in heDontWant:
+        seenPeers.incl(peer)
+        libp2p_gossipsub_idontwant_saved_messages.inc
+        libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"])
+        break
+  toSendPeers.excl(seenPeers)
+
   try:
-    let validation = await g.validate(msg)
+    # IDontWant is only worth it if the message is substantially
+    # bigger than the messageId
+    if msg.data.len > msgId.len * 10:
+      g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
+        idontwant: @[ControlIWant(messageIds: @[msgId])]
+      ))), isHighPriority = true)
 
-    var seenPeers: HashSet[PubSubPeer]
+    seenPeers.clear()
+    let validation = await g.validate(msg)
     discard g.validationSeen.pop(msgIdSalted, seenPeers)
     libp2p_gossipsub_duplicate_during_validation.inc(seenPeers.len.int64)
     libp2p_gossipsub_saved_bytes.inc((msg.data.len * seenPeers.len).int64, labelValues = ["validation_duplicate"])
@@ -346,35 +376,15 @@ proc validateAndRelay(g: GossipSub,
 
     g.rewardDelivered(peer, msg.topicIds, true)
 
-    var toSendPeers = HashSet[PubSubPeer]()
-    for t in msg.topicIds: # for every topic in the message
-      if t notin g.topics:
-        continue
-
-      g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
-      g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
-
-      # add direct peers
-      toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(t))
-
-    # Don't send it to source peer, or peers that
-    # sent it during validation
-    toSendPeers.excl(peer)
+    # Don't send it to peers that sent it during validation
     toSendPeers.excl(seenPeers)
 
-    # IDontWant is only worth it if the message is substantially
-    # bigger than the messageId
-    if msg.data.len > msgId.len * 10:
-      g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
-        idontwant: @[ControlIWant(messageIds: @[msgId])]
-      ))), isHighPriority = true)
-
+    # We have received IMReceiving from these peers; we should not exclude them outright.
+    # Ideally we should wait (TxTime + a large safety cushion) before sending to these peers.
    for peer in toSendPeers:
-      for heDontWant in peer.heDontWants:
-        if msgId in heDontWant:
+      for heIsReceiving in peer.heIsReceivings:
+        if msgId in heIsReceiving:
           seenPeers.incl(peer)
           libp2p_gossipsub_idontwant_saved_messages.inc
           libp2p_gossipsub_saved_bytes.inc(msg.data.len.int64, labelValues = ["idontwant"])
           break
     toSendPeers.excl(seenPeers)
 
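The hunk above moves both the heDontWants duplicate check and the IDontWant announcement ahead of validation, so mesh peers can suppress a redundant transmission while the message is still being validated. As a standalone illustration of the size heuristic, a minimal Nim sketch (the 20-byte id is an assumption; any hash-sized message id behaves similarly):

const idFactor = 10  # same factor as `msg.data.len > msgId.len * 10` in the diff

proc worthIDontWant(dataLen, msgIdLen: int): bool =
  # IDontWant puts one extra message id on the wire, so it only pays off
  # when the payload it may save is much larger than the id itself
  dataLen > msgIdLen * idFactor

assert worthIDontWant(1024, 20)    # ~1 KiB payload: announce IDontWant
assert not worthIDontWant(64, 20)  # tiny payload: skip the control traffic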
31 changes: 31 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -272,6 +272,37 @@ proc handleIDontWant*(g: GossipSub,
       if messageId.len > 100: continue
       peer.heDontWants[^1].incl(messageId)
 
+      var toSendPeers = HashSet[PubSubPeer]()
+
+      # Experimental change for quick performance evaluation only (ideally for very large messages):
+      #[
+        1) An IDontWant is immediately followed by the message itself, so IMReceiving
+           informs our peers that we are currently receiving this message.
+        2) Prototype implementation for a single topic ("test"); a complete
+           implementation needs the topic ID in IDontWant.
+        3) A more reasonable solution could be to send message details ahead of the
+           message itself, which IMReceiving could then build on.
+      ]#
+      g.floodsub.withValue("test", peers): toSendPeers.incl(peers[])
+      g.mesh.withValue("test", peers): toSendPeers.incl(peers[])
+
+      # add direct peers
+      toSendPeers.incl(g.subscribedDirectPeers.getOrDefault("test"))
+
+      g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
+        imreceiving: @[ControlIWant(messageIds: @[messageId])]
+      ))), isHighPriority = true)
+
+proc handleIMReceiving*(g: GossipSub,
+                        peer: PubSubPeer,
+                        imreceivings: seq[ControlIWant]) =
+  for imreceiving in imreceivings:
+    for messageId in imreceiving.messageIds:
+      if peer.heIsReceivings[^1].len > 1000: break
+      if messageId.len > 100: continue
+      peer.heIsReceivings[^1].incl(messageId)
+
 proc handleIWant*(g: GossipSub,
                   peer: PubSubPeer,
                   iwants: seq[ControlIWant]): seq[Message] {.raises: [].} =
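The comments above sketch the intended refinement: a peer that announced IMReceiving should not be dropped from the send set permanently, only deferred until its in-flight transmission has had time to complete. A hedged sketch of that idea in Nim, where the throughput estimate and the 2x cushion are assumptions for illustration (nim-libp2p itself schedules with chronos rather than std/asyncdispatch):

import std/asyncdispatch

proc estimateTxTimeMs(msgLen: int): int =
  # naive transmission-time estimate, assuming roughly 1 MiB/s of goodput
  msgLen div 1024

proc sendDeferred(msgLen: int, send: proc () {.closure.}) {.async.} =
  # wait TxTime plus a large safety cushion; the caller would re-check
  # heIsReceivings/heDontWants inside `send` before actually transmitting
  await sleepAsync(2 * estimateTxTimeMs(msgLen))
  send()

waitFor sendDeferred(2_000_000, proc () = echo "still wanted: sending")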
2 changes: 2 additions & 0 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -75,6 +75,7 @@ type
     score*: float64
     sentIHaves*: Deque[HashSet[MessageId]]
     heDontWants*: Deque[HashSet[MessageId]]
+    heIsReceivings*: Deque[HashSet[MessageId]]
     iHaveBudget*: int
     pingBudget*: int
     maxMessageSize: int
@@ -480,4 +481,5 @@ proc new*(
   )
   result.sentIHaves.addFirst(default(HashSet[MessageId]))
   result.heDontWants.addFirst(default(HashSet[MessageId]))
+  result.heIsReceivings.addFirst(default(HashSet[MessageId]))
   result.startSendNonPriorityTask()
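heIsReceivings deliberately mirrors heDontWants: a deque of per-window message-id sets, seeded with a single window at construction. A self-contained sketch of that pattern; the heartbeat rotation and the history length of 5 are assumptions, since the diff only shows the initial window being created:

import std/[deques, sets, hashes]

type MessageId = seq[byte]

var heIsReceivings = initDeque[HashSet[MessageId]]()
heIsReceivings.addFirst(default(HashSet[MessageId]))  # initial window, as in `new` above

# handleIMReceiving records ids into the window indexed by [^1]
heIsReceivings[^1].incl(@[0x01'u8, 0x02, 0x03])

# assumed rotation on heartbeat: open a fresh window, drop the oldest
const historyLength = 5
heIsReceivings.addFirst(default(HashSet[MessageId]))
if heIsReceivings.len > historyLength:
  discard heIsReceivings.popLast()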
6 changes: 4 additions & 2 deletions libp2p/protocols/pubsub/rpc/messages.nim
@@ -51,6 +51,7 @@ type
     graft*: seq[ControlGraft]
     prune*: seq[ControlPrune]
     idontwant*: seq[ControlIWant]
+    imreceiving*: seq[ControlIWant]
 
   ControlIHave* = object
     topicId*: string
@@ -163,11 +164,12 @@ static: expectedFields(ControlPrune, @["topicId", "peers", "backoff"])
 proc byteSize(controlPrune: ControlPrune): int =
   controlPrune.topicId.len + controlPrune.peers.foldl(a + b.byteSize, 0) + 8 # 8 bytes for uint64
 
-static: expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant"])
+static: expectedFields(ControlMessage, @["ihave", "iwant", "graft", "prune", "idontwant", "imreceiving"])
 proc byteSize(control: ControlMessage): int =
   control.ihave.foldl(a + b.byteSize, 0) + control.iwant.foldl(a + b.byteSize, 0) +
   control.graft.foldl(a + b.byteSize, 0) + control.prune.foldl(a + b.byteSize, 0) +
-  control.idontwant.foldl(a + b.byteSize, 0)
+  control.idontwant.foldl(a + b.byteSize, 0) +
+  control.imreceiving.foldl(a + b.byteSize, 0)
 
 static: expectedFields(RPCMsg, @["subscriptions", "messages", "control", "ping", "pong"])
 proc byteSize*(rpc: RPCMsg): int =
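The byteSize fold counts only payload bytes, not protobuf framing, and ControlIWant's own byteSize elsewhere in this file sums the lengths of its message ids, so imreceiving is accounted for exactly like idontwant. A small self-contained check, with the ControlIWant type and its byteSize restated locally for illustration:

import std/sequtils

type ControlIWant = object
  messageIds: seq[seq[byte]]

proc byteSize(c: ControlIWant): int =
  c.messageIds.foldl(a + b.len, 0)  # sum of message-id lengths

let imreceiving = @[ControlIWant(messageIds: @[newSeq[byte](20), newSeq[byte](32)])]
assert imreceiving.foldl(a + b.byteSize, 0) == 52  # 20 + 32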
6 changes: 6 additions & 0 deletions libp2p/protocols/pubsub/rpc/protobuf.nim
@@ -89,6 +89,8 @@ proc write*(pb: var ProtoBuffer, field: int, control: ControlMessage) =
     ipb.write(4, prune)
   for idontwant in control.idontwant:
     ipb.write(5, idontwant)
+  for imreceiving in control.imreceiving:
+    ipb.write(6, imreceiving)
   if len(ipb.buffer) > 0:
     ipb.finish()
     pb.write(field, ipb)
@@ -213,6 +215,7 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
   var graftpbs: seq[seq[byte]]
   var prunepbs: seq[seq[byte]]
   var idontwant: seq[seq[byte]]
+  var imreceiving: seq[seq[byte]]
   if ? cpb.getRepeatedField(1, ihavepbs):
     for item in ihavepbs:
       control.ihave.add(? decodeIHave(initProtoBuffer(item)))
@@ -228,6 +231,9 @@ proc decodeControl*(pb: ProtoBuffer): ProtoResult[Option[ControlMessage]] {.
   if ? cpb.getRepeatedField(5, idontwant):
     for item in idontwant:
       control.idontwant.add(? decodeIWant(initProtoBuffer(item)))
+  if ? cpb.getRepeatedField(6, imreceiving):
+    for item in imreceiving:
+      control.imreceiving.add(? decodeIWant(initProtoBuffer(item)))
   trace "decodeControl: message statistics", graft_count = len(control.graft),
                                              prune_count = len(control.prune),
                                              ihave_count = len(control.ihave),
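On the wire, imreceiving simply takes the next free field number of the gossipsub control message and reuses the length-delimited ControlIWant encoding, exactly as idontwant does on field 5. A standalone check of the resulting protobuf tag bytes (standard protobuf key encoding, not code from this PR):

proc fieldKey(fieldNumber, wireType: int): int =
  # protobuf tag byte: field number shifted left by 3, OR'd with the wire type
  (fieldNumber shl 3) or wireType

const lengthDelimited = 2  # wire type used for embedded messages
assert fieldKey(5, lengthDelimited) == 0x2A  # idontwant
assert fieldKey(6, lengthDelimited) == 0x32  # imreceiving, new in this PR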