diff --git a/Makefile b/Makefile index 092be7c2eb..ff962a7b12 100644 --- a/Makefile +++ b/Makefile @@ -115,7 +115,6 @@ feedernode: juno-cached --p2p-feeder-node \ --p2p-addr=/ip4/0.0.0.0/tcp/7777 \ --p2p-private-key="5f6cdc3aebcc74af494df054876100368ef6126e3a33fa65b90c765b381ffc37a0a63bbeeefab0740f24a6a38dabb513b9233254ad0020c721c23e69bc820089" \ - --metrics-port=9090 node1: juno-cached ./build/juno \ diff --git a/node/node.go b/node/node.go index abd0b93990..123a5b7085 100644 --- a/node/node.go +++ b/node/node.go @@ -240,6 +240,7 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen } else if p2pService != nil { // regular p2p node p2pService.WithListener(makeSyncMetrics(&sync.NoopSynchronizer{}, chain)) + p2pService.WithGossipTracer() } } if cfg.GRPC { diff --git a/p2p/gossip_tracer.go b/p2p/gossip_tracer.go new file mode 100644 index 0000000000..da07eabfc2 --- /dev/null +++ b/p2p/gossip_tracer.go @@ -0,0 +1,218 @@ +package p2p + +import ( + "strings" + + "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + _ pubsub.RawTracer = (*gossipTracer)(nil) + knownAgentVersions = []string{"juno", "papyrus", "pathfinder", "madara"} +) + +// Metrics collection for gossipsub +type gossipTracer struct { + host host.Host + + p2PPeerCount *prometheus.GaugeVec + psMsgDeliver *prometheus.CounterVec + psMsgUndeliverable *prometheus.CounterVec + psMsgValidate *prometheus.CounterVec + psMsgDuplicate *prometheus.CounterVec + psMsgReject *prometheus.CounterVec + psTopicsActive *prometheus.GaugeVec + psTopicsPrune *prometheus.CounterVec + psTopicsGraft *prometheus.CounterVec + psThrottlePeer *prometheus.CounterVec + psRPCRecv *prometheus.CounterVec + psRPCSubRecv prometheus.Counter + psRPCPubRecv *prometheus.CounterVec + psRPCDrop *prometheus.CounterVec + psRPCSubDrop prometheus.Counter + psRPCPubDrop *prometheus.CounterVec + psRPCSent *prometheus.CounterVec + psRPCSubSent prometheus.Counter + psRPCPubSent *prometheus.CounterVec +} + +func NewGossipTracer(h host.Host) *gossipTracer { + return &gossipTracer{ + host: h, + p2PPeerCount: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "p2p_peer_count", + Help: "The number of connected libp2p peers by agent string", + }, []string{"agent"}), + psMsgDeliver: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_deliver_total", + Help: "The number of messages received for delivery", + }, []string{"topic"}), + psMsgUndeliverable: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_undeliverable_total", + Help: "The number of messages received which weren't able to be delivered", + }, []string{"topic"}), + psMsgValidate: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_validate_total", + Help: "The number of messages received for validation", + }, []string{"topic"}), + psMsgDuplicate: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_duplicate_total", + Help: "The number of duplicate messages sent", + }, []string{"topic"}), + psMsgReject: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_reject_total", + Help: "The number of messages rejected", + }, []string{"topic", "reason"}), + psTopicsActive: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "p2p_pubsub_topic_active", + Help: "The topics that the peer is participating in gossipsub", + }, []string{"topic"}), + psTopicsPrune: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_prune_total", + Help: "The number of prune messages sent by the peer", + }, []string{"topic"}), + psTopicsGraft: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_graft_total", + Help: "The number of graft messages sent by the peer", + }, []string{"topic"}), + psThrottlePeer: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_throttle_total", + Help: "The number of times the peer has been throttled", + }, []string{"topic"}), + psRPCRecv: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_rpc_recv_total", + Help: "The number of messages received via rpc for a particular control message", + }, []string{"control_message"}), + psRPCSubRecv: promauto.NewCounter(prometheus.CounterOpts{ + Name: "p2p_pubsub_rpc_recv_sub_total", + Help: "The number of subscription messages received via rpc", + }), + psRPCPubRecv: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_rpc_recv_pub_total", + Help: "The number of publish messages received via rpc", + }, []string{"topic"}), + psRPCDrop: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_rpc_drop_total", + Help: "The number of messages dropped via rpc for a particular control message", + }, []string{"control_message"}), + psRPCSubDrop: promauto.NewCounter(prometheus.CounterOpts{ + Name: "p2p_pubsub_rpc_drop_sub_total", + Help: "The number of subscription messages dropped via rpc", + }), + psRPCPubDrop: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_rpc_drop_pub_total", + Help: "The number of publish messages dropped via rpc", + }, []string{"topic"}), + psRPCSent: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_rpc_sent_total", + Help: "The number of messages sent via rpc for a particular control message", + }, []string{"control_message"}), + psRPCSubSent: promauto.NewCounter(prometheus.CounterOpts{ + Name: "p2p_pubsub_rpc_sent_sub_total", + Help: "The number of subscription messages sent via rpc", + }), + psRPCPubSent: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "p2p_pubsub_rpc_sent_pub_total", + Help: "The number of publish messages sent via rpc", + }, []string{"topic"}), + } +} + +func (g *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) { + g.p2PPeerCount.WithLabelValues(agentNameFromPeerID(p, g.host.Peerstore())).Inc() +} + +func (g *gossipTracer) RemovePeer(p peer.ID) { + g.p2PPeerCount.WithLabelValues(agentNameFromPeerID(p, g.host.Peerstore())).Dec() +} + +func (g *gossipTracer) Join(topic string) { + g.psTopicsActive.WithLabelValues(topic).Set(1) +} + +func (g *gossipTracer) Leave(topic string) { + g.psTopicsActive.WithLabelValues(topic).Set(0) +} + +func (g *gossipTracer) Graft(p peer.ID, topic string) { + g.psTopicsGraft.WithLabelValues(topic).Inc() +} + +func (g *gossipTracer) Prune(p peer.ID, topic string) { + g.psTopicsPrune.WithLabelValues(topic).Inc() +} + +func (g *gossipTracer) DeliverMessage(msg *pubsub.Message) { + g.psMsgDeliver.WithLabelValues(*msg.Topic).Inc() +} + +func (g *gossipTracer) ValidateMessage(msg *pubsub.Message) { + g.psMsgValidate.WithLabelValues(*msg.Topic).Inc() +} + +func (g *gossipTracer) RejectMessage(msg *pubsub.Message, reason string) { + g.psMsgReject.WithLabelValues(*msg.Topic, reason).Inc() +} + +func (g *gossipTracer) DuplicateMessage(msg *pubsub.Message) { + g.psMsgDuplicate.WithLabelValues(*msg.Topic).Inc() +} + +func (g *gossipTracer) UndeliverableMessage(msg *pubsub.Message) { + g.psMsgUndeliverable.WithLabelValues(*msg.Topic).Inc() +} + +func (g *gossipTracer) ThrottlePeer(p peer.ID) { + g.psThrottlePeer.WithLabelValues(agentNameFromPeerID(p, g.host.Peerstore())).Inc() +} + +func (g *gossipTracer) RecvRPC(rpc *pubsub.RPC) { + g.setRPCMetrics(g.psRPCSubRecv, g.psRPCPubRecv, g.psRPCRecv, rpc) +} + +func (g *gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { + g.setRPCMetrics(g.psRPCSubSent, g.psRPCPubSent, g.psRPCSent, rpc) +} + +func (g *gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) { + g.setRPCMetrics(g.psRPCSubDrop, g.psRPCPubDrop, g.psRPCDrop, rpc) +} + +func (g *gossipTracer) setRPCMetrics(subCtr prometheus.Counter, pubCtr, ctrlCtr *prometheus.CounterVec, rpc *pubsub.RPC) { + subCtr.Add(float64(len(rpc.Subscriptions))) + if rpc.Control != nil { + ctrlCtr.WithLabelValues("ihave").Add(float64(len(rpc.Control.Ihave))) + ctrlCtr.WithLabelValues("iwant").Add(float64(len(rpc.Control.Iwant))) + ctrlCtr.WithLabelValues("graft").Add(float64(len(rpc.Control.Graft))) + ctrlCtr.WithLabelValues("prune").Add(float64(len(rpc.Control.Prune))) + ctrlCtr.WithLabelValues("idontwant").Add(float64(len(rpc.Control.Idontwant))) + } + + for _, msg := range rpc.Publish { + pubCtr.WithLabelValues(*msg.Topic).Inc() + } +} + +func agentNameFromPeerID(pid peer.ID, store peerstore.PeerMetadata) string { + const unknownAgent = "unknown" + + rawAgent, err := store.Get(pid, "AgentVersion") + agent, ok := rawAgent.(string) + if err != nil || !ok { + return unknownAgent + } + + agent = strings.ToLower(agent) + for _, knownAgent := range knownAgentVersions { + if strings.Contains(agent, knownAgent) { + return knownAgent + } + } + return unknownAgent +} diff --git a/p2p/metrics.go b/p2p/metrics.go deleted file mode 100644 index 4e0b6b3102..0000000000 --- a/p2p/metrics.go +++ /dev/null @@ -1,18 +0,0 @@ -package p2p - -import ( - "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/prometheus/client_golang/prometheus" -) - -func runMetrics(store peerstore.Peerstore) { - numberOfPeersGauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Namespace: "p2p", - Name: "number_of_peers", - }, func() float64 { - peersLen := store.Peers().Len() - return float64(peersLen) - }) - - prometheus.MustRegister(numberOfPeersGauge) -} diff --git a/p2p/p2p.go b/p2p/p2p.go index a8e3a30d61..49633f49ee 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -49,6 +49,7 @@ type Service struct { topicsLock sync.RWMutex synchroniser *syncService + gossipTracer *gossipTracer feederNode bool database db.DB @@ -244,7 +245,12 @@ func (s *Service) Run(ctx context.Context) error { return err } - s.pubsub, err = pubsub.NewGossipSub(ctx, s.host) + var options []pubsub.Option + if s.gossipTracer != nil { + options = append(options, pubsub.WithRawTracer(s.gossipTracer)) + } + + s.pubsub, err = pubsub.NewGossipSub(ctx, s.host, options...) if err != nil { return err } @@ -399,10 +405,13 @@ func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Strea } func (s *Service) WithListener(l junoSync.EventListener) { - runMetrics(s.host.Peerstore()) s.synchroniser.WithListener(l) } +func (s *Service) WithGossipTracer() { + s.gossipTracer = NewGossipTracer(s.host) +} + // persistPeers stores the given peers in the peers database func (s *Service) persistPeers() error { txn, err := s.database.NewTransaction(true)