From 4d1d52ccf9619ecdb1211648783b35c99861e734 Mon Sep 17 00:00:00 2001 From: Adrian Lopez Date: Tue, 22 Aug 2023 14:36:45 +0200 Subject: [PATCH] feat(inputs.procstat): add tcp related metrics For a given process, return the number of connections in each state: SYN_SENT, SYN_RECV, ESTABLISHED, etc. Also added a new measurement, procstat_tcp, that returns the endpoints where the process is connected to and where it is listening. If it is listening in "[::]" or "0.0.0.0", resolve with the local IPs. --- docs/LICENSE_OF_DEPENDENCIES.md | 1 + go.mod | 2 + go.sum | 3 + plugins/inputs/procstat/README.md | 44 ++ plugins/inputs/procstat/connections.go | 36 + .../inputs/procstat/connections_fallback.go | 35 + plugins/inputs/procstat/connections_linux.go | 235 +++++++ plugins/inputs/procstat/procstat.go | 178 +++-- plugins/inputs/procstat/procstat_fallback.go | 17 + plugins/inputs/procstat/procstat_linux.go | 110 +++ .../inputs/procstat/procstat_linux_test.go | 624 ++++++++++++++++++ plugins/inputs/procstat/procstat_test.go | 55 +- plugins/inputs/procstat/sample.conf | 2 + 13 files changed, 1284 insertions(+), 58 deletions(-) create mode 100644 plugins/inputs/procstat/connections.go create mode 100644 plugins/inputs/procstat/connections_fallback.go create mode 100644 plugins/inputs/procstat/connections_linux.go create mode 100644 plugins/inputs/procstat/procstat_fallback.go create mode 100644 plugins/inputs/procstat/procstat_linux.go create mode 100644 plugins/inputs/procstat/procstat_linux_test.go diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index ae7c13ea33142..1544eab9b243d 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -121,6 +121,7 @@ following works: - github.com/eapache/queue [MIT License](https://github.com/eapache/queue/blob/master/LICENSE) - github.com/eclipse/paho.golang [Eclipse Public License - v 2.0](https://github.com/eclipse/paho.golang/blob/master/LICENSE) - 
github.com/eclipse/paho.mqtt.golang [Eclipse Public License - v 2.0](https://github.com/eclipse/paho.mqtt.golang/blob/master/LICENSE) +- github.com/elastic/gosigar [Apache License 2.0](https://github.com/elastic/gosigar/blob/master/LICENSE) - github.com/emicklei/go-restful [MIT License](https://github.com/emicklei/go-restful/blob/v3/LICENSE) - github.com/fatih/color [MIT License](https://github.com/fatih/color/blob/master/LICENSE.md) - github.com/form3tech-oss/jwt-go [MIT License](https://github.com/form3tech-oss/jwt-go/blob/master/LICENSE) diff --git a/go.mod b/go.mod index f2a0098221bc7..e44d562f71da2 100644 --- a/go.mod +++ b/go.mod @@ -213,6 +213,8 @@ require ( modernc.org/sqlite v1.24.0 ) +require github.com/elastic/gosigar v0.14.2 + require ( cloud.google.com/go v0.110.4 // indirect cloud.google.com/go/compute v1.20.1 // indirect diff --git a/go.sum b/go.sum index 52b1a2daf02d6..7f5fea44c51c5 100644 --- a/go.sum +++ b/go.sum @@ -494,6 +494,8 @@ github.com/eclipse/paho.golang v0.11.0 h1:6Avu5dkkCfcB61/y1vx+XrPQ0oAl4TPYtY0uw3 github.com/eclipse/paho.golang v0.11.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs= github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4= +github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/emicklei/go-restful/v3 v3.10.1 h1:rc42Y5YTp7Am7CS630D7JmhRjq4UlEUuEKfrDac4bSQ= github.com/emicklei/go-restful/v3 v3.10.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -1779,6 +1781,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0 
h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/plugins/inputs/procstat/README.md b/plugins/inputs/procstat/README.md index f28560471abef..0d6507332404b 100644 --- a/plugins/inputs/procstat/README.md +++ b/plugins/inputs/procstat/README.md @@ -84,6 +84,8 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## - "mem": to enable collection of memory percentage used ## - "mem_percent": to enable collection of procs' limits ## - "limits": to enable collection of procs' limits + ## - "tcp_stats": tcp_* and upd_socket metrics + ## - "connections_endpoints": new metric procstat_tcp with connections and listeners endpoints ## Default value: # metrics_include = [ # "threads", @@ -198,6 +200,46 @@ the `win_perf_counters` input plugin as a more mature alternative. 
- running (int) - result_code (int, success = 0, lookup_error = 1) +If `tcp_stats` is enabled, the following fields are added: + +- procstat + - fields: + - tcp_close (int) + - tcp_close_wait (int) + - tcp_closing (int) + - tcp_established (int) + - tcp_fin_wait1 (int) + - tcp_fin_wait2 (int) + - tcp_last_ack (int) + - tcp_listen (int) + - tcp_none (int) + - tcp_syn_recv (int) + - tcp_syn_sent (int) + +If `connections_endpoints` is enabled, the following measurement is added: + +- procstat_tcp + - tags: + - pid (when `pid_tag` is true) + - cmdline (when `cmdline_tag` is true) + - process_name + - pidfile (when defined) + - exe (when defined) + - pattern (when defined) + - user (when selected) + - systemd_unit (when defined) + - cgroup (when defined) + - fields: + - conn (string, comma-separated list of remote endpoints) + - listen (string, comma-separated list of local listen endpoints) + +To gather connection info, if Telegraf is not run as root, it needs the +following capabilities: + +```text +sudo setcap "CAP_DAC_READ_SEARCH,CAP_SYS_PTRACE+ep" telegraf +``` + *NOTE: Resource limit > 2147483647 will be reported as 2147483647.* ## Example Output @@ -205,4 +247,6 @@ the `win_perf_counters` input plugin as a more mature alternative. 
```text procstat_lookup,host=prash-laptop,pattern=influxd,pid_finder=pgrep,result=success pid_count=1i,running=1i,result_code=0i 1582089700000000000 procstat,host=prash-laptop,pattern=influxd,process_name=influxd,user=root involuntary_context_switches=151496i,child_minor_faults=1061i,child_major_faults=8i,cpu_time_user=2564.81,cpu_time_idle=0,cpu_time_irq=0,cpu_time_guest=0,pid=32025i,major_faults=8609i,created_at=1580107536000000000i,voluntary_context_switches=1058996i,cpu_time_system=616.98,cpu_time_steal=0,cpu_time_guest_nice=0,memory_swap=0i,memory_locked=0i,memory_usage=1.7797634601593018,num_threads=18i,cpu_time_nice=0,cpu_time_iowait=0,cpu_time_soft_irq=0,memory_rss=148643840i,memory_vms=1435688960i,memory_data=0i,memory_stack=0i,minor_faults=1856550i 1582089700000000000 +procstat,host=laptop,pattern=httpd,process_name=httpd,user=root child_major_faults=0i,child_minor_faults=70i,cpu_time=0i,cpu_time_guest=0,cpu_time_guest_nice=0,cpu_time_idle=0,cpu_time_iowait=0,cpu_time_irq=0,cpu_time_nice=0,cpu_time_soft_irq=0,cpu_time_steal=0,cpu_time_system=0.01,cpu_time_user=0.02,cpu_usage=0,created_at=1611738400000000000i,involuntary_context_switches=15i,listen=1i,major_faults=0i,memory_data=999424i,memory_locked=0i,memory_rss=4677632i,memory_stack=135168i,memory_swap=0i,memory_usage=0.013990458101034164,memory_vms=6078464i,minor_faults=1636i,nice_priority=20i,num_fds=8i,num_threads=1i,pid=1738811i,read_bytes=0i,read_count=4397i,realtime_priority=0i,rlimit_cpu_time_hard=2147483647i,rlimit_cpu_time_soft=2147483647i,rlimit_file_locks_hard=2147483647i,rlimit_file_locks_soft=2147483647i,rlimit_memory_data_hard=2147483647i,rlimit_memory_data_soft=2147483647i,rlimit_memory_locked_hard=65536i,rlimit_memory_locked_soft=65536i,rlimit_memory_rss_hard=2147483647i,rlimit_memory_rss_soft=2147483647i,rlimit_memory_stack_hard=2147483647i,rlimit_memory_stack_soft=8388608i,rlimit_memory_vms_hard=2147483647i,rlimit_memory_vms_soft=2147483647i,rlimit_nice_priority_hard=0i,rlimit_nice_prio
rity_soft=0i,rlimit_num_fds_hard=1048576i,rlimit_num_fds_soft=1048576i,rlimit_realtime_priority_hard=0i,rlimit_realtime_priority_soft=0i,rlimit_signals_pending_hard=127473i,rlimit_signals_pending_soft=127473i,signals_pending=0i,tcp_close=0i,tcp_close_wait=0i,tcp_closing=0i,tcp_established=0i,tcp_fin_wait1=0i,tcp_fin_wait2=0i,tcp_last_ack=0i,tcp_listen=1i,tcp_syn_recv=0i,tcp_syn_sent=0i,voluntary_context_switches=169i,write_bytes=53248i,write_count=10i 1611738522000000000 +procstat_tcp,host=laptop,pattern=httpd,process_name=httpd,user=root conn="",listen="192.168.1.35:80,192.168.1.48:80,[da01:beef:234:3830:aeda:f001:a00c:0091]:80,[aa01:beef:234:3830:e8e:0000:000a:6b0f]:80" 1611738522000000000 ``` diff --git a/plugins/inputs/procstat/connections.go b/plugins/inputs/procstat/connections.go new file mode 100644 index 0000000000000..db85b4e57acac --- /dev/null +++ b/plugins/inputs/procstat/connections.go @@ -0,0 +1,36 @@ +package procstat + +import ( + "fmt" + "net" +) + +const ( + // dockerMACPrefix https://macaddress.io/faq/how-to-recognise-a-docker-container-by-its-mac-address + dockerMACPrefix = "02:42" + //nolint:lll // avoid splitting the link + // virtualBoxMACPrefix https://github.com/mdaniel/virtualbox-org-svn-vbox-trunk/blob/2d259f948bc352ee400f9fd41c4a08710cd9138a/src/VBox/HostDrivers/VBoxNetAdp/VBoxNetAdp.c#L93 + virtualBoxMACPrefix = "0a:00:27" + // hardwareAddrLength is the number of bytes of a MAC address + hardwareAddrLength = 6 +) + +// errPIDNotFound is the error generated when the pid does not have network info +var errPIDNotFound = fmt.Errorf("pid not found") + +// inodeInfo represents information of a proc associated with an inode +type inodeInfo struct { + pid uint32 +} + +// networkInfo implements networkInfo using the netlink calls and parsing /proc to map sockets to PIDs +type networkInfo struct { + // tcp contains the connection info for each pid + tcp map[uint32][]connInfo + // listenPorts is a map with the listen ports in the host, used to 
ignore inbound connections + listenPorts map[uint32]interface{} + // publicIPs list of IPs considered "public" (used to connect to other hosts) + publicIPs []net.IP + // privateIPs list of IPs considered "private" (loopback, virtual interfaces, point2point, etc) + privateIPs []net.IP +} diff --git a/plugins/inputs/procstat/connections_fallback.go b/plugins/inputs/procstat/connections_fallback.go new file mode 100644 index 0000000000000..cb6f0251559ec --- /dev/null +++ b/plugins/inputs/procstat/connections_fallback.go @@ -0,0 +1,35 @@ +//go:build !linux +// +build !linux + +package procstat + +import ( + "fmt" + "net" +) + +type connInfo struct{} + +func (n *networkInfo) IsAListenPort(port uint32) bool { + return false +} + +func (n *networkInfo) Fetch() error { + return nil +} + +func (n *networkInfo) GetConnectionsByPid(pid uint32) (conn []connInfo, err error) { + return conn, fmt.Errorf("platform not supported") +} + +func (n *networkInfo) GetPublicIPs() []net.IP { + return []net.IP{} +} + +func (n *networkInfo) GetPrivateIPs() []net.IP { + return []net.IP{} +} + +func (n *networkInfo) IsPidListeningInAddr(pid uint32, ip net.IP, port uint32) bool { + return false +} diff --git a/plugins/inputs/procstat/connections_linux.go b/plugins/inputs/procstat/connections_linux.go new file mode 100644 index 0000000000000..eb003d274c9e3 --- /dev/null +++ b/plugins/inputs/procstat/connections_linux.go @@ -0,0 +1,235 @@ +//go:build linux +// +build linux + +package procstat + +import ( + "fmt" + "io" + "net" + "os" + "strconv" + "strings" + + "github.com/elastic/gosigar/sys/linux" +) + +// connInfo represents a single TCP connection of a process: its state and the source and destination endpoints +type connInfo struct { + state linux.TCPState + srcIP net.IP + srcPort uint32 + dstIP net.IP + dstPort uint32 +} + +// IsAListenPort returns true if the port param is associated with a listener found in the host connections +func (n *networkInfo) IsAListenPort(port uint32) bool { + _, ok := 
n.listenPorts[port] + return ok +} + +// Fetch gathers the host network info: TCP connections and the host's IPs. +// TCP connections and listening ports are obtained with getTCPProcInfo. +// Local IPs are obtained by passing the result of net.Interfaces to getLocalIPs, +// they are needed to resolve procs listening in 0.0.0.0 or :: +func (n *networkInfo) Fetch() error { + var err error + n.tcp, n.listenPorts, err = getTCPProcInfo() + if err != nil { + return fmt.Errorf("gathering host TCP info: %w", err) + } + + // Get IPs, to be able to resolve procs listening in 0.0.0.0 or :: + ifaces, err := net.Interfaces() + if err != nil { + return fmt.Errorf("getting network interfaces: %w", err) + } + + n.publicIPs, n.privateIPs, err = getLocalIPs(ifaces) + if err != nil { + return fmt.Errorf("procstat getting local IPs: %w", err) + } + + return nil +} + +// GetConnectionsByPid returns the connections of a particular PID, or errPIDNotFound if there is no entry for it +func (n *networkInfo) GetConnectionsByPid(pid uint32) (conn []connInfo, err error) { + conn, ok := n.tcp[pid] + if !ok { + return conn, errPIDNotFound + } + return conn, nil +} + +// GetPublicIPs returns the list of public IPs (used to connect to other hosts) +func (n *networkInfo) GetPublicIPs() []net.IP { + return n.publicIPs +} + +// GetPrivateIPs returns the list of private IPs (loopback devices, virtual, point2point) +func (n *networkInfo) GetPrivateIPs() []net.IP { + return n.privateIPs +} + +// IsPidListeningInAddr returns true if the pid has a connection whose source is that ip and port +// Returns false if pid=0 +func (n *networkInfo) IsPidListeningInAddr(pid uint32, ip net.IP, port uint32) bool { + if pid == 0 { + return false + } + + for _, c := range n.tcp[pid] { + if c.srcIP.Equal(ip) && c.srcPort == port { + return true + } + } + + return false +} + +// getLocalIPs returns the IPv4/v6 addresses active in the current host divided in two groups: +// "publicIPs" contains addresses to connect with other external hosts. 
+// "privateIPs" contains loopback addresses, virtual interfaces, etc. +// This division is a best effort and probably does not contain all the possibilities. +// The information is extracted from the list of interfaces passed as a parameter. +func getLocalIPs(ifaces []net.Interface) (publicIPs, privateIPs []net.IP, err error) { + for _, i := range ifaces { + // Ignore down interfaces + if i.Flags&net.FlagUp == 0 { + continue + } + + addresses, err := i.Addrs() + if err != nil { + return nil, nil, fmt.Errorf("getting addresses from interfaces: %w", err) + } + + ips, err := extractIPs(addresses) + if err != nil { + return nil, nil, fmt.Errorf("getting IPs from interface addresses: %w", err) + } + + if i.Flags&net.FlagLoopback != 0 || // private: loopback interfaces + i.Flags&net.FlagPointToPoint != 0 || // private: point-to-point (VPN) interfaces + len(i.HardwareAddr) != hardwareAddrLength || // private: interfaces without a MAC address + strings.HasPrefix(i.HardwareAddr.String(), dockerMACPrefix) || // private: docker virtual interfaces + strings.HasPrefix(i.HardwareAddr.String(), virtualBoxMACPrefix) { // private: VirtualBox virtual interfaces + privateIPs = append(privateIPs, ips...) + } else { + for _, i := range ips { + if i.IsLinkLocalUnicast() { + // Do not add link-local IPs: 169.254.0.0/16 or fe80::/10 + continue + } + publicIPs = append(publicIPs, i) + } + } + } + + return publicIPs, privateIPs, nil +} + +// getTCPProcInfo returns the connections grouped by pid and a map of listening ports. 
+// Both results are for IPv4 and IPv6 +func getTCPProcInfo() (connectionsByPid map[uint32][]connInfo, listeners map[uint32]interface{}, err error) { + req := linux.NewInetDiagReq() + var diagWriter io.Writer + msgs, err := linux.NetlinkInetDiagWithBuf(req, nil, diagWriter) + if err != nil { + return nil, nil, fmt.Errorf("calling netlink to get sockets: %w", err) + } + + listeners = map[uint32]interface{}{} + connectionsByPid = map[uint32][]connInfo{} + + inodeToPid, err := mapInodesToPid() + if err != nil { + return nil, nil, fmt.Errorf("mapping inodes to pid: %w", err) + } + + for _, diag := range msgs { + inodeInfo := inodeToPid[diag.Inode] + + for _, proc := range inodeInfo { + if linux.TCPState(diag.State) == linux.TCP_LISTEN { + listeners[uint32(diag.SrcPort())] = nil + } + + connectionsByPid[proc.pid] = append(connectionsByPid[proc.pid], connInfo{ + state: linux.TCPState(diag.State), + srcIP: diag.SrcIP(), + srcPort: uint32(diag.SrcPort()), + dstIP: diag.DstIP(), + dstPort: uint32(diag.DstPort()), + }) + } + } + + return connectionsByPid, listeners, nil +} + +// mapInodesToPid return a map with the procs associated to each inode. +func mapInodesToPid() (map[uint32][]inodeInfo, error) { + ret := map[uint32][]inodeInfo{} + + fd, err := os.Open("/proc") + if err != nil { + return nil, fmt.Errorf("opening /proc: %w", err) + } + defer fd.Close() + + dirContents, err := fd.Readdirnames(0) + if err != nil { + return nil, fmt.Errorf("reading /proc files: %w", err) + } + + for _, pidStr := range dirContents { + readPidFDs(pidStr, ret) + } + + return ret, nil +} + +// readPidFDs given a PID, add to the ret map info about its inodes +func readPidFDs(pidStr string, ret map[uint32][]inodeInfo) { + pid, err := strconv.ParseUint(pidStr, 10, 32) + if err != nil { + // exclude files with a not numeric name. 
We only want to access pid directories + return + } + + pidDir, err := os.Open("/proc/" + pidStr + "/fd/") + if err != nil { + // ignore errors: + // - missing directory, pid has already finished + // - permission denied + return + } + defer pidDir.Close() + + fds, err := pidDir.Readdirnames(0) + if err != nil { + return + } + + for _, fd := range fds { + link, err := os.Readlink("/proc/" + pidStr + "/fd/" + fd) + if err != nil { + continue + } + + var inode uint32 + + _, err = fmt.Sscanf(link, "socket:[%d]", &inode) + if err != nil { + // this inode is not a socket + continue + } + + ret[inode] = append(ret[inode], inodeInfo{ + pid: uint32(pid), + }) + } +} diff --git a/plugins/inputs/procstat/procstat.go b/plugins/inputs/procstat/procstat.go index fd0783f407542..b58d227b3231e 100644 --- a/plugins/inputs/procstat/procstat.go +++ b/plugins/inputs/procstat/procstat.go @@ -4,7 +4,9 @@ package procstat import ( "bytes" _ "embed" + "errors" "fmt" + "net" "os" "os/exec" "path/filepath" @@ -27,43 +29,54 @@ var ( defaultProcess = NewProc // defaultCollection is the default group of metrics to gather defaultCollection = []string{ - MetricsThreads, - MetricsFDs, - MetricsContextSwitches, - MetricsPageFaults, - MetricsIO, - MetricsCreateTime, - MetricsCPU, - MetricsCPUPercent, - MetricsMemory, - MetricsMemoryPercent, - MetricsLimits, + metricsThreads, + metricsFDs, + metricsContextSwitches, + metricsPageFaults, + metricsIO, + metricsCreateTime, + metricsCPU, + metricsCPUPercent, + metricsMemory, + metricsMemoryPercent, + metricsLimits, } ) const ( - // MetricsThreads to enable collection of number of threads - MetricsThreads = "threads" - // MetricsFDs to enable collection of number of file descriptors - MetricsFDs = "fds" - // MetricsContextSwitches to enable collection of context switches - MetricsContextSwitches = "ctx_switches" - // MetricsPageFaults to enable collection of page faults - MetricsPageFaults = "page_faults" - // MetricsIO to enable collection of IO - 
MetricsIO = "io" - // MetricsCreateTime to enable collection of proc creation time - MetricsCreateTime = "create_time" - // MetricsCPU to enable collection of CPU time used - MetricsCPU = "cpu" - // MetricsCPUPercent to enable collection of percentage of CPU used - MetricsCPUPercent = "cpu_percent" - // MetricsMemory to enable collection of memory used - MetricsMemory = "mem" - // MetricsMemoryPercent to enable collection of memory percentage used - MetricsMemoryPercent = "mem_percent" - // MetricsLimits to enable collection of procs' limits - MetricsLimits = "limits" + // metricsThreads to enable collection of number of threads + metricsThreads = "threads" + // metricsFDs to enable collection of number of file descriptors + metricsFDs = "fds" + // metricsContextSwitches to enable collection of context switches + metricsContextSwitches = "ctx_switches" + // metricsPageFaults to enable collection of page faults + metricsPageFaults = "page_faults" + // metricsIO to enable collection of IO + metricsIO = "io" + // metricsCreateTime to enable collection of proc creation time + metricsCreateTime = "create_time" + // metricsCPU to enable collection of CPU time used + metricsCPU = "cpu" + // metricsCPUPercent to enable collection of percentage of CPU used + metricsCPUPercent = "cpu_percent" + // metricsMemory to enable collection of memory used + metricsMemory = "mem" + // metricsMemoryPercent to enable collection of memory percentage used + metricsMemoryPercent = "mem_percent" + // metricsLimits to enable collection of procs' limits + metricsLimits = "limits" + // metricsTCPStats to enable collection of procs' TCP stats + metricsTCPStats = "tcp_stats" + // metricsConnectionsEndpoints to enable collection of metric procstat_tcp + metricsConnectionsEndpoints = "connections_endpoints" + + // metricNameTCPConnections is the measurement name for TCP connections metrics + metricNameTCPConnections = "procstat_tcp" + // tcpConnectionKey is the field key that holds all the connection 
endpoints + tcpConnectionKey = "conn" + // tcpListenKey is the field key that holds all the listen endpoints + tcpListenKey = "listen" ) type PID int32 @@ -83,7 +96,8 @@ type Procstat struct { PidTag bool WinService string `toml:"win_service"` Mode string - MetricsInclude []string `toml:"metrics_include"` + MetricsInclude []string `toml:"metrics_include"` + Log telegraf.Logger `toml:"-"` solarisMode bool @@ -148,8 +162,20 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error { } p.procs = newProcs + + // Initialize the conn object. Gather info about all TCP connections organized per PID + // Avoid repeating this task for each proc + netInfo := networkInfo{} + // Only collect this info if we are going to use it (avoid reading all /proc/N/fd dirs) + if (p.metricEnabled(metricsTCPStats) || p.metricEnabled(metricsConnectionsEndpoints)) && len(p.procs) > 0 { + err := netInfo.Fetch() + if err != nil { + acc.AddError(fmt.Errorf("getting TCP network info: %w", err)) + } + } + for _, proc := range p.procs { - p.addMetric(proc, acc, now) + p.addMetric(proc, acc, now, netInfo) } fields := map[string]interface{}{ @@ -166,7 +192,7 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error { } // Add metrics a single Process -func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time) { +func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time, netInfo networkInfo) { var prefix string if p.Prefix != "" { prefix = p.Prefix + "_" @@ -205,21 +231,21 @@ func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time } } - if p.metricEnabled(MetricsThreads) { + if p.metricEnabled(metricsThreads) { numThreads, err := proc.NumThreads() if err == nil { fields[prefix+"num_threads"] = numThreads } } - if p.metricEnabled(MetricsFDs) { + if p.metricEnabled(metricsFDs) { fds, err := proc.NumFDs() if err == nil { fields[prefix+"num_fds"] = fds } } - if p.metricEnabled(MetricsContextSwitches) { + if 
p.metricEnabled(metricsContextSwitches) { ctx, err := proc.NumCtxSwitches() if err == nil { fields[prefix+"voluntary_context_switches"] = ctx.Voluntary @@ -227,7 +253,7 @@ func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time } } - if p.metricEnabled(MetricsPageFaults) { + if p.metricEnabled(metricsPageFaults) { faults, err := proc.PageFaults() if err == nil { fields[prefix+"minor_faults"] = faults.MinorFaults @@ -237,7 +263,7 @@ func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time } } - if p.metricEnabled(MetricsIO) { + if p.metricEnabled(metricsIO) { io, err := proc.IOCounters() if err == nil { fields[prefix+"read_count"] = io.ReadCount @@ -247,14 +273,14 @@ func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time } } - if p.metricEnabled(MetricsCreateTime) { + if p.metricEnabled(metricsCreateTime) { createdAt, err := proc.CreateTime() // Returns epoch in ms if err == nil { fields[prefix+"created_at"] = createdAt * 1000000 // Convert ms to ns } } - if p.metricEnabled(MetricsCPU) { + if p.metricEnabled(metricsCPU) { cpuTime, err := proc.Times() if err == nil { fields[prefix+"cpu_time_user"] = cpuTime.User @@ -270,7 +296,7 @@ func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time } } - if p.metricEnabled(MetricsCPUPercent) { + if p.metricEnabled(metricsCPUPercent) { cpuPerc, err := proc.Percent(time.Duration(0)) if err == nil { if p.solarisMode { @@ -281,7 +307,7 @@ func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time } } - if p.metricEnabled(MetricsMemory) { + if p.metricEnabled(metricsMemory) { mem, err := proc.MemoryInfo() if err == nil { fields[prefix+"memory_rss"] = mem.RSS @@ -293,14 +319,14 @@ func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time } } - if p.metricEnabled(MetricsMemoryPercent) { + if p.metricEnabled(metricsMemoryPercent) { memPerc, err := proc.MemoryPercent() if err == nil { 
fields[prefix+"memory_usage"] = memPerc } } - if p.metricEnabled(MetricsLimits) { + if p.metricEnabled(metricsLimits) { rlims, err := proc.RlimitUsage(true) if err == nil { for _, rlim := range rlims { @@ -351,7 +377,40 @@ func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time fields[prefix+"status"] = status[0] } + if p.metricEnabled(metricsTCPStats) { + // Add values with the number of connections in each TCP state + pidConnections, err := netInfo.GetConnectionsByPid(uint32(proc.PID())) + if err == nil { + addConnectionStats(pidConnections, fields, prefix) + } else { + // Ignore errors because pid was not found. It is normal to have procs without connections + if !errors.Is(err, errPIDNotFound) { + p.Log.Debugf("not able to get connections for pid=%v: %v", proc.PID(), err) + } + } + } + acc.AddFields("procstat", fields, proc.Tags(), t) + + if p.metricEnabled(metricsConnectionsEndpoints) { + // add measurement procstat_tcp with tcp listeners and connections for each proccess + err := addConnectionEndpoints(acc, proc, netInfo) + if err != nil { + p.Log.Debugf("not able to generate network metrics for pid=%v: %v", proc.PID(), err) + } + } +} + +// extractIPs extract and return IPs from addresses +func extractIPs(addreses []net.Addr) (ret []net.IP, err error) { + for _, a := range addreses { + ip, _, err := net.ParseCIDR(a.String()) + if err != nil { + return nil, fmt.Errorf("parsing interface address: %w", err) + } + ret = append(ret, ip) + } + return ret, nil } // Update monitored Processes @@ -581,6 +640,31 @@ func (p *Procstat) Init() error { return nil } +func containsIP(a []net.IP, x net.IP) bool { + for _, n := range a { + if x.Equal(n) { + return true + } + } + return false +} + +func isIPV4(ip net.IP) bool { + return ip.To4() != nil +} + +func isIPV6(ip net.IP) bool { + return ip.To4() == nil +} + +// endpointString return the correct representation of ip and port for IPv4 and IPv6 +func endpointString(ip net.IP, port uint32) string 
{ + if isIPV6(ip) { + return fmt.Sprintf("[%s]:%d", ip, port) + } + return fmt.Sprintf("%s:%d", ip, port) +} + // metricEnabled check is some group of metrics are enabled in the config file func (p *Procstat) metricEnabled(m string) bool { for _, n := range p.MetricsInclude { diff --git a/plugins/inputs/procstat/procstat_fallback.go b/plugins/inputs/procstat/procstat_fallback.go new file mode 100644 index 0000000000000..114e004d28cdc --- /dev/null +++ b/plugins/inputs/procstat/procstat_fallback.go @@ -0,0 +1,17 @@ +//go:build !linux +// +build !linux + +package procstat + +import ( + "fmt" + + "github.com/influxdata/telegraf" +) + +func addConnectionStats(pidConnections []connInfo, fields map[string]interface{}, prefix string) { +} + +func addConnectionEndpoints(acc telegraf.Accumulator, proc Process, netInfo networkInfo) error { + return fmt.Errorf("platform not supported") +} diff --git a/plugins/inputs/procstat/procstat_linux.go b/plugins/inputs/procstat/procstat_linux.go new file mode 100644 index 0000000000000..fe02006883f01 --- /dev/null +++ b/plugins/inputs/procstat/procstat_linux.go @@ -0,0 +1,110 @@ +//go:build linux +// +build linux + +package procstat + +import ( + "errors" + "fmt" + "sort" + "strings" + + "github.com/elastic/gosigar/sys/linux" + "github.com/influxdata/telegraf" +) + +// addConnectionStats counts the number of connections in each TCP state and adds those values to the fields map +func addConnectionStats(pidConnections []connInfo, fields map[string]interface{}, prefix string) { + counts := make(map[linux.TCPState]int) + for _, netcon := range pidConnections { + counts[netcon.state]++ + } + + fields[prefix+"tcp_established"] = counts[linux.TCP_ESTABLISHED] + fields[prefix+"tcp_syn_sent"] = counts[linux.TCP_SYN_SENT] + fields[prefix+"tcp_syn_recv"] = counts[linux.TCP_SYN_RECV] + fields[prefix+"tcp_fin_wait1"] = counts[linux.TCP_FIN_WAIT1] + fields[prefix+"tcp_fin_wait2"] = counts[linux.TCP_FIN_WAIT2] + // Do not add TIME-WAIT as connections does 
not have a pid associated + fields[prefix+"tcp_close"] = counts[linux.TCP_CLOSE] + fields[prefix+"tcp_close_wait"] = counts[linux.TCP_CLOSE_WAIT] + fields[prefix+"tcp_last_ack"] = counts[linux.TCP_LAST_ACK] + fields[prefix+"tcp_listen"] = counts[linux.TCP_LISTEN] + fields[prefix+"tcp_closing"] = counts[linux.TCP_CLOSING] +} + +// addConnectionEndpoints adds listen and connection endpoints to the procstat_tcp metric. +// If the listen address is 0.0.0.0 or ::, one value is added for each of the public IP addresses of the host. +// Listeners in private IPs are ignored (maybe a flag could be added, but now the reasoning is matching connections between hosts). +// Connections made to this server are ignored (the local port is one of the listening ports). +func addConnectionEndpoints(acc telegraf.Accumulator, proc Process, netInfo networkInfo) error { + tcpListen := map[string]interface{}{} + tcpConn := map[string]interface{}{} + + pidConnections, err := netInfo.GetConnectionsByPid(uint32(proc.PID())) + if err != nil { + if errors.Is(err, errPIDNotFound) { + return nil + } + + return fmt.Errorf("not able to get connections for pid=%v: %w", proc.PID(), err) + } + + // In case of error, ppid=0 and will be ignored in IsPidListeningInAddr + ppid, _ := proc.Ppid() + + for _, c := range pidConnections { + // Ignore listeners or connections in/to localhost or private IPs + if c.srcIP.IsLoopback() || containsIP(netInfo.GetPrivateIPs(), c.srcIP) { + continue + } + + if c.state == linux.TCP_LISTEN { + if netInfo.IsPidListeningInAddr(uint32(ppid), c.srcIP, c.srcPort) { + continue + } + + if c.srcIP.IsUnspecified() { + // 0.0.0.0 listens in all IPv4 addresses + // :: listens in all IPv4 + IPv6 addresses + for _, ip := range netInfo.GetPublicIPs() { + if isIPV4(ip) || isIPV6(c.srcIP) { + tcpListen[endpointString(ip, c.srcPort)] = nil + } + } + } else { + tcpListen[endpointString(c.srcIP, c.srcPort)] = nil + } + } else if c.state != linux.TCP_SYN_SENT { // All TCP states except LISTEN (already 
processed) and SYN_SENT imply a connection between the hosts + // Ignore connections from outside hosts to listeners in this host (status != LISTEN and localPort in listenPorts) + if !netInfo.IsAListenPort(c.srcPort) { + tcpConn[endpointString(c.dstIP, c.dstPort)] = nil + } + } + } + + // Only add metrics if we have data + if len(tcpConn) > 0 || len(tcpListen) > 0 { + tcpConnections := []string{} + tcpListeners := []string{} + + for k := range tcpConn { + tcpConnections = append(tcpConnections, k) + } + sort.Strings(tcpConnections) // sort to make testing simplier + + for k := range tcpListen { + tcpListeners = append(tcpListeners, k) + } + sort.Strings(tcpListeners) + + fields := map[string]interface{}{ + tcpConnectionKey: strings.Join(tcpConnections, ","), + tcpListenKey: strings.Join(tcpListeners, ","), + } + + acc.AddFields(metricNameTCPConnections, fields, proc.Tags()) + } + + return nil +} diff --git a/plugins/inputs/procstat/procstat_linux_test.go b/plugins/inputs/procstat/procstat_linux_test.go new file mode 100644 index 0000000000000..24639e3c59e5f --- /dev/null +++ b/plugins/inputs/procstat/procstat_linux_test.go @@ -0,0 +1,624 @@ +package procstat + +import ( + "net" + "testing" + "time" + + "github.com/elastic/gosigar/sys/linux" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +func TestAddConnectionEndpoints(t *testing.T) { + tests := []struct { + name string + pid PID + ppid int32 + listenPorts map[uint32]interface{} + tcp map[uint32][]connInfo + publicIPs []net.IP + privateIPs []net.IP + metrics []telegraf.Metric + err string + }{ + { + name: "no connections, no metrics", + }, + { + name: "outside connection", + pid: 100, + listenPorts: map[uint32]interface{}{}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 34567, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 80, + state: linux.TCP_ESTABLISHED, + }, + }, + }, + publicIPs: []net.IP{}, + 
privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpConnectionKey: "1.1.1.1:80", + }, + time.Now(), + ), + }, + }, + { + name: "TCP states except SYN_SENT are used for connections", + pid: 100, + listenPorts: map[uint32]interface{}{}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 10000, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 80, + state: linux.TCP_ESTABLISHED, + }, + { // this is ignore, is a host trying to connect but the other end has not replied + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 10001, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 81, + state: linux.TCP_SYN_SENT, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 10002, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 82, + state: linux.TCP_SYN_RECV, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 10003, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 83, + state: linux.TCP_FIN_WAIT1, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 10004, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 84, + state: linux.TCP_FIN_WAIT2, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 10005, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 85, + state: linux.TCP_TIME_WAIT, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 10006, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 86, + state: linux.TCP_CLOSE, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 10007, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 87, + state: linux.TCP_CLOSE_WAIT, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 10008, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 88, + state: linux.TCP_LAST_ACK, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 10009, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 89, + state: linux.TCP_CLOSING, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + 
metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpConnectionKey: "1.1.1.1:80,1.1.1.1:82,1.1.1.1:83,1.1.1.1:84,1.1.1.1:85,1.1.1.1:86,1.1.1.1:87,1.1.1.1:88,1.1.1.1:89", + }, + time.Now(), + ), + }, + }, + { + name: "IPv4 listener", + pid: 100, + listenPorts: map[uint32]interface{}{80: nil}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{net.ParseIP("192.168.0.2")}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpListenKey: "192.168.0.2:80", + }, + time.Now(), + ), + }, + }, + { + name: "process listening in a IP not present in the local IPs will generate metric anyway", + pid: 100, + listenPorts: map[uint32]interface{}{80: nil}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpListenKey: "192.168.0.2:80", + }, + time.Now(), + ), + }, + }, + { + name: "process listening in a port not present in the listeners list will generate metric anyway", + pid: 100, + listenPorts: map[uint32]interface{}{}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpListenKey: "192.168.0.2:80", + }, + time.Now(), + ), + }, + }, + { + name: "IPv6 listener", + pid: 100, + listenPorts: map[uint32]interface{}{80: nil}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("dead::beef"), + srcPort: 
80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpListenKey: "[dead::beef]:80", + }, + time.Now(), + ), + }, + }, + { + name: "private IPv4 listener do not generate metrics", + pid: 100, + listenPorts: map[uint32]interface{}{ + 80: nil, + }, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{net.ParseIP("192.168.0.2")}, + metrics: []telegraf.Metric{}, + }, + { + name: "private IPv6 listener do not generate metrics", + pid: 100, + listenPorts: map[uint32]interface{}{80: nil}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("dead::beef"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{net.ParseIP("dead::beef")}, + metrics: []telegraf.Metric{}, + }, + { + name: "0.0.0.0 listener listen in all public IPv4s", + pid: 100, + listenPorts: map[uint32]interface{}{80: nil}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("0.0.0.0"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{net.ParseIP("192.168.0.2"), net.ParseIP("10.10.0.2"), net.ParseIP("dead::beef")}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpListenKey: "10.10.0.2:80,192.168.0.2:80", + }, + time.Now(), + ), + }, + }, + { + name: ":: listener listen in all public IPv4 and IPv6s", + pid: 100, + listenPorts: map[uint32]interface{}{80: nil}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("::"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{net.ParseIP("192.168.0.2"), net.ParseIP("10.10.0.2"), net.ParseIP("dead::beef")}, + 
privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpListenKey: "10.10.0.2:80,192.168.0.2:80,[dead::beef]:80", + }, + time.Now(), + ), + }, + }, + { + name: "ignore listeners in loopback IPs", + pid: 100, + listenPorts: map[uint32]interface{}{80: nil}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("127.0.0.1"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{}, + }, + { + name: "ignore connections from external hosts to local listeners", + pid: 100, + listenPorts: map[uint32]interface{}{80: nil}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("127.0.0.1"), + srcPort: 80, + dstIP: net.ParseIP("54.89.89.54"), + dstPort: 30123, + state: linux.TCP_ESTABLISHED, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{}, + }, + { + name: "ignore connections from internal procs to other internal procs using the public IPs", + pid: 100, + listenPorts: map[uint32]interface{}{80: nil}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 30000, + dstIP: net.ParseIP("192.168.0.2"), + dstPort: 80, + state: linux.TCP_ESTABLISHED, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{net.ParseIP("192.168.0.2")}, + metrics: []telegraf.Metric{}, + }, + { // We are testing how behaves addConnectionEndpoints it if received a "pid not found" kind of error + name: "proc without network info does not generates an error, nor metrics", + pid: 100, + listenPorts: map[uint32]interface{}{}, + tcp: map[uint32][]connInfo{}, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{}, + }, + { + name: "process listening in two differents ports using :: with differents public IPs", + }, + { // same schema valid for: apache httpd, php-fpm + name: "service with a 
parent process and several child, only the parent should report the listeners, parent case (nginx style)", + pid: 101, // parent + listenPorts: map[uint32]interface{}{ + 80: nil, + }, + tcp: map[uint32][]connInfo{ + 100: { // parent + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + 101: { // child + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpListenKey: "192.168.0.2:80", + }, + time.Now(), + ), + }, + }, + { + name: "service with a parent process and several child, only the parent should report the listeners, child case (nginx style)", + pid: 101, // child + ppid: 100, + listenPorts: map[uint32]interface{}{ + 80: nil, + }, + tcp: map[uint32][]connInfo{ + 100: { // parent + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + 101: { // child + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{}, + }, + { + name: "child process listening in parent process plus other port, generate metric with the extra listener", + pid: 101, // child + ppid: 100, + listenPorts: map[uint32]interface{}{ + 80: nil, + 443: nil, + }, + tcp: map[uint32][]connInfo{ + 100: { // parent + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + 101: { // child + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 443, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + 
map[string]interface{}{ + tcpListenKey: "192.168.0.2:443", + }, + time.Now(), + ), + }, + }, + { + name: "process listening in 0.0.0.0 and also in some IPv4 address, avoid duplication", + pid: 100, + listenPorts: map[uint32]interface{}{80: nil}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("0.0.0.0"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + { + srcIP: net.ParseIP("172.17.0.1"), + srcPort: 80, + state: linux.TCP_LISTEN, + }, + }, + }, + publicIPs: []net.IP{net.ParseIP("192.168.0.2"), net.ParseIP("10.10.0.2"), net.ParseIP("dead::beef")}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpListenKey: "10.10.0.2:80,172.17.0.1:80,192.168.0.2:80", + }, + time.Now(), + ), + }, + }, + { + name: "avoid duplication in outboun connections", + pid: 100, + listenPorts: map[uint32]interface{}{}, + tcp: map[uint32][]connInfo{ + 100: { + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 34567, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 80, + state: linux.TCP_ESTABLISHED, + }, + { + srcIP: net.ParseIP("192.168.0.2"), + srcPort: 34568, + dstIP: net.ParseIP("1.1.1.1"), + dstPort: 80, + state: linux.TCP_ESTABLISHED, + }, + }, + }, + publicIPs: []net.IP{}, + privateIPs: []net.IP{}, + metrics: []telegraf.Metric{ + testutil.MustMetric( + metricNameTCPConnections, + map[string]string{}, + map[string]interface{}{ + tcpConnectionKey: "1.1.1.1:80", + }, + time.Now(), + ), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var acc testutil.Accumulator + + proc := &testProc{ + pid: test.pid, + ppid: test.ppid, + } + + netInfo := networkInfo{ + tcp: test.tcp, + listenPorts: test.listenPorts, + publicIPs: test.publicIPs, + privateIPs: test.privateIPs, + } + + err := addConnectionEndpoints(&acc, proc, netInfo) + if err != nil { + 
assert.EqualError(t, err, test.err) + } + + // Function has generated the same number of metrics defined in the test + assert.Len(t, acc.GetTelegrafMetrics(), len(test.metrics)) + + for _, m := range test.metrics { + for _, value := range m.FieldList() { + assert.Truef( + t, + acc.HasPoint(m.Name(), m.Tags(), value.Key, value.Value), + "Missing point: %s,%v %s=%s\nMetrics: %v", + m.Name(), + m.Tags(), + value.Key, + value.Value, + acc.GetTelegrafMetrics(), + ) + } + } + }) + } +} diff --git a/plugins/inputs/procstat/procstat_test.go b/plugins/inputs/procstat/procstat_test.go index 2dda8939e0126..f176ec228651f 100644 --- a/plugins/inputs/procstat/procstat_test.go +++ b/plugins/inputs/procstat/procstat_test.go @@ -97,6 +97,7 @@ func (pg *testPgrep) FullPattern(_ string) ([]PID, error) { type testProc struct { pid PID + ppid int32 tags map[string]string } @@ -168,7 +169,7 @@ func (p *testProc) RlimitUsage(_ bool) ([]process.RlimitStat, error) { } func (p *testProc) Ppid() (int32, error) { - return 0, nil + return p.ppid, nil } func (p *testProc) Status() ([]string, error) { @@ -253,7 +254,7 @@ func TestGather_PidTag(t *testing.T) { Exe: exe, PidTag: true, createPIDFinder: pidFinder([]PID{pid}), - MetricsInclude: []string{MetricsThreads}, + MetricsInclude: []string{metricsThreads}, createProcess: newTestProc, } require.NoError(t, acc.GatherError(p.Gather)) @@ -268,7 +269,7 @@ func TestGather_Prefix(t *testing.T) { Exe: exe, Prefix: "custom_prefix", createPIDFinder: pidFinder([]PID{pid}), - MetricsInclude: []string{MetricsFDs}, + MetricsInclude: []string{metricsFDs}, createProcess: newTestProc, } require.NoError(t, acc.GatherError(p.Gather)) @@ -348,7 +349,7 @@ func TestGather_PercentFirstPass(t *testing.T) { Pattern: "foo", PidTag: true, createPIDFinder: pidFinder([]PID{pid}), - MetricsInclude: []string{MetricsCPU, MetricsCPUPercent}, + MetricsInclude: []string{metricsCPU, metricsCPUPercent}, createProcess: NewProc, } require.NoError(t, acc.GatherError(p.Gather)) @@ 
-365,7 +366,7 @@ func TestGather_PercentSecondPass(t *testing.T) { Pattern: "foo", PidTag: true, createPIDFinder: pidFinder([]PID{pid}), - MetricsInclude: []string{MetricsCPU, MetricsCPUPercent}, + MetricsInclude: []string{metricsCPU, metricsCPUPercent}, createProcess: NewProc, } require.NoError(t, acc.GatherError(p.Gather)) @@ -445,13 +446,13 @@ func TestGather_SameTimestamps(t *testing.T) { func TestMetricEnabled(t *testing.T) { p := Procstat{ - MetricsInclude: []string{MetricsCPU, MetricsMemory, MetricsIO}, + MetricsInclude: []string{metricsCPU, metricsMemory, metricsIO}, } - require.True(t, p.metricEnabled(MetricsCPU)) - require.True(t, p.metricEnabled(MetricsMemory)) - require.True(t, p.metricEnabled(MetricsIO)) - require.False(t, p.metricEnabled(MetricsContextSwitches)) + require.True(t, p.metricEnabled(metricsCPU)) + require.True(t, p.metricEnabled(metricsMemory)) + require.True(t, p.metricEnabled(metricsIO)) + require.False(t, p.metricEnabled(metricsContextSwitches)) } // TestGatherFilter tests that the metricEnabled filter works. @@ -463,7 +464,7 @@ func TestGatherFilter(t *testing.T) { Pattern: "foo", PidTag: true, createPIDFinder: pidFinder([]PID{pid}), - MetricsInclude: []string{MetricsCPU, MetricsMemory}, + MetricsInclude: []string{metricsCPU, metricsMemory}, createProcess: NewProc, } require.NoError(t, acc.GatherError(p.Gather)) @@ -473,3 +474,35 @@ func TestGatherFilter(t *testing.T) { require.False(t, acc.HasUIntField("procstat", "memory_usage")) require.False(t, acc.HasUIntField("procstat", "rlimit_num_fds_hard")) } + +func BenchmarkDefaultCollectionPlusEndpoints(b *testing.B) { + var acc testutil.Accumulator + pattern := "." 
+ + p := Procstat{ + Pattern: pattern, + CmdLineTag: true, + MetricsInclude: append(defaultCollection, metricsConnectionsEndpoints), + } + + for i := 0; i < b.N; i++ { + err := acc.GatherError(p.Gather) + require.NoError(b, err) + } +} + +func BenchmarkConnEndpointsOnly(b *testing.B) { + var acc testutil.Accumulator + pattern := "." + + p := Procstat{ + Pattern: pattern, + CmdLineTag: true, + MetricsInclude: []string{metricsConnectionsEndpoints}, + } + + for i := 0; i < b.N; i++ { + err := acc.GatherError(p.Gather) + require.NoError(b, err) + } +} diff --git a/plugins/inputs/procstat/sample.conf b/plugins/inputs/procstat/sample.conf index ee3e0af295c99..12357560ca880 100644 --- a/plugins/inputs/procstat/sample.conf +++ b/plugins/inputs/procstat/sample.conf @@ -56,6 +56,8 @@ ## - "mem": to enable collection of memory percentage used ## - "mem_percent": to enable collection of procs' limits ## - "limits": to enable collection of procs' limits + ## - "tcp_stats": tcp_* and udp_socket metrics + ## - "connections_endpoints": new metric procstat_tcp with connections and listeners endpoints ## Default value: # metrics_include = [ # "threads",