-
Notifications
You must be signed in to change notification settings - Fork 2
/
monitor.py
189 lines (156 loc) · 6.08 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import psutil
import common
import threading
import time
from collections import defaultdict
import datetime
import exporter
import platform
def showSystemInfo():
    """Print a summary of the host's CPU, memory and swap usage to stdout.

    The swap section is skipped on Windows.  A CPU count that psutil cannot
    determine (``psutil.cpu_count`` returns ``None``) is printed as ``N/A``
    instead of raising ``TypeError`` from a ``%d`` format.
    """
    def format_count(count):
        # psutil.cpu_count() returns None when the value is undetermined.
        return "N/A" if count is None else "%d" % count

    mem = psutil.virtual_memory()
    print("======== CPU =======")
    print("CPU 核心数: %s" % format_count(psutil.cpu_count(logical=False)))
    print("CPU 线程数: %s" % format_count(psutil.cpu_count()))
    print()
    print("======== MEM =======")
    print("总计内存 : %0.2f GB" % common.trans_B2GB(mem.total))
    print("已用内存 : %0.2f GB" % common.trans_B2GB(mem.used))
    print("可用内存 : %0.2f GB" % common.trans_B2GB(mem.available))
    if platform.system() != "Windows":
        # Swap figures are only shown here, so only query them here.
        swap = psutil.swap_memory()
        print()
        print("======== SWAP =======")
        print("交换空间 : %0.2f GB" % common.trans_B2GB(swap.total))
        print("已用空间 : %0.2f GB" % common.trans_B2GB(swap.used))
        print("可用空间 : %0.2f GB" % common.trans_B2GB(swap.free))
class ProcessMonitor:
    """Sample CPU, memory and I/O usage for all processes sharing one name.

    Every PID handed over through ``setPidSet`` is treated as part of one
    logical process, i.e. child processes are merged into the same totals.
    Sampling runs on a background thread started by ``start``.
    """

    def __init__(self, process_name: str) -> None:
        """Create an idle monitor for processes named ``process_name``."""
        self.__process_name = process_name
        self.__pids = set()
        self.__is_running = False
        self.__interval = 10000  # sampling period in milliseconds (10 s default)
        self.__thread = threading.Thread(target=self.__monitorThread)
        # Collected samples plus start/end timestamps.
        self.__statistic = {}
        self.__clearStatistic()

    def start(self, interval):
        """Start the sampling thread.

        Args:
            interval: Sampling period in milliseconds.

        Raises:
            RuntimeError: If called more than once (a thread can only be
                started once).
        """
        self.__is_running = True
        self.__interval = interval
        self.__thread.start()

    def stop(self):
        """Ask the sampling thread to exit after its current pass."""
        self.__is_running = False

    def wait(self):
        """Block until the sampling thread has finished.

        Returns immediately when the thread was never started or has
        already exited.
        """
        if self.__thread.is_alive():
            self.__thread.join()

    def setPidSet(self, pids: set):
        """Replace the PIDs to sample.

        A copy is stored so that clearing the tracked PIDs once all
        processes have ended does not mutate the caller's set.
        """
        self.__pids = set(pids)

    def getStatistic(self):
        """Return the live statistics dict.

        Keys: ``"Time"``, ``"CPU"``, ``"MEM"``, ``"IO"`` (parallel lists, one
        entry per sample) plus ``"START_TIME"`` and ``"END_TIME"``
        (``None`` until all tracked processes have been seen ended).
        """
        return self.__statistic

    def __monitorThread(self):
        """Thread body: sample every tracked PID once per interval until stopped."""
        # pid -> psutil.Process; handles are reused across passes.
        process_map = {}
        # True once the "all ended" transition has been recorded, so that
        # END_TIME is stamped once per end instead of on every idle pass.
        ended = False
        while self.__is_running:
            # Local reference: setPidSet may rebind the attribute meanwhile.
            pids = self.__pids
            pid_num = 0
            cpu_loaded = 0.0
            mem = 0.0
            io = 0.0
            for pid in pids:
                try:
                    process = process_map.get(pid)
                    if process is None:
                        process = psutil.Process(pid=pid)
                        if process.name() != self.__process_name:
                            # The PID was reused by another program: neither
                            # cache nor count it.
                            continue
                        process_map[pid] = process
                    # Read everything first so a failure half-way does not
                    # leave partial values in the totals.
                    cpu = process.cpu_percent()
                    rss = common.trans_B2MB(process.memory_info().rss)
                    written = common.trans_B2MB(
                        process.io_counters().write_bytes)
                except psutil.NoSuchProcess:
                    # Process is gone: drop its cached handle.
                    process_map.pop(pid, None)
                    continue
                except psutil.AccessDenied:
                    # Alive but unreadable: contributes nothing to the
                    # totals, yet must not be mistaken for "ended".
                    pid_num += 1
                    continue
                cpu_loaded += cpu
                mem += rss
                io += written
                pid_num += 1
            if pid_num == 0:
                # Every tracked process has ended.
                if not ended:
                    self.__statistic["END_TIME"] = datetime.datetime.now()
                    ended = True
                pids.clear()
                process_map.clear()
            else:
                ended = False
            self.__recordStat(cpu_loaded, mem, io)
            time.sleep(self.__interval / 1000)

    def __clearStatistic(self):
        """Reset all samples and stamp START_TIME with the current time."""
        self.__statistic.clear()
        self.__statistic["Time"] = []
        self.__statistic["CPU"] = []
        self.__statistic["MEM"] = []
        self.__statistic["IO"] = []
        self.__statistic["START_TIME"] = datetime.datetime.now()
        self.__statistic["END_TIME"] = None

    def __recordStat(self, cpu, mem, io):
        """Append one sample, timestamped with the current time."""
        self.__statistic["Time"].append(datetime.datetime.now())
        self.__statistic["CPU"].append(cpu)
        self.__statistic["MEM"].append(mem)
        self.__statistic["IO"].append(io)
class SysMonitor:
    """Periodically scan running processes and keep one ProcessMonitor per name."""

    def __init__(self) -> None:
        """Create an idle monitor; nothing runs until ``start`` is called."""
        self.__is_running = False
        self.__interval = 1000  # scan period in milliseconds
        self.__thread = None
        # process name -> ProcessMonitor.  A plain dict is used because
        # ProcessMonitor requires a name argument and therefore cannot act
        # as a defaultdict factory.
        self.__process_map = {}

    def start(self, interval, filter=None):
        """Start the scanning thread.

        Args:
            interval: Scan period in milliseconds; also passed to every
                ProcessMonitor as its sampling period.
            filter: Iterable of process names to monitor.  ``None`` or an
                empty iterable monitors every process.
        """
        self.__is_running = True
        self.__interval = interval
        names = set(filter) if filter is not None else set()
        self.__thread = threading.Thread(
            target=self.__threadFunc, args=(names,))
        self.__thread.start()

    def export(self, data_path):
        """Create ``data_path`` if needed and hand the monitors to ``exporter.export``."""
        import os
        os.makedirs(data_path, exist_ok=True)
        exporter.export(self.__process_map, data_path, self.__interval)

    def stop(self):
        """Stop scanning and wait for the scanning thread to finish.

        Safe to call when ``start`` was never called.
        """
        self.__is_running = False
        if self.__thread is not None:
            self.__thread.join()

    def __threadFunc(self, filter):
        """Thread body: scan all processes, group them by name and dispatch
        each group to its ProcessMonitor; stop every monitor on exit.

        Args:
            filter: Set of process names to keep; an empty set keeps all.
        """
        try:
            while self.__is_running:
                for process_name, ids in self.__getProcessesInfo().items():
                    if filter and process_name not in filter:
                        continue
                    self.__distributeProcessName(process_name, ids)
                time.sleep(self.__interval / 1000)
        finally:
            # Stop all monitors first, then wait, so they wind down in
            # parallel.  Done in ``finally`` so a failing scan cannot leave
            # their threads running.
            monitors = list(self.__process_map.values())
            for pm in monitors:
                pm.stop()
            for pm in monitors:
                pm.wait()

    def __getProcessesInfo(self):
        """Return a mapping of process name -> set of PIDs currently running."""
        process_info = defaultdict(set)
        for pid in psutil.pids():
            try:
                name = psutil.Process(pid).name()
            except psutil.NoSuchProcess:
                # The process exited between pids() and the lookup.
                print("warn:", pid, "has been exit")
                continue
            except psutil.AccessDenied:
                # Not allowed to inspect this process; skip it rather than
                # let the scanning thread die.
                continue
            process_info[name].add(pid)
        return process_info

    def __distributeProcessName(self, process_name: str, ids: set):
        """Hand the current PID set to the monitor for ``process_name``,
        creating and starting that monitor the first time the name is seen.
        """
        monitor = self.__process_map.get(process_name)
        if monitor is not None:
            monitor.setPidSet(ids)
            return
        # New process name: set the PIDs before starting so the first
        # sample already sees them.
        monitor = ProcessMonitor(process_name)
        self.__process_map[process_name] = monitor
        monitor.setPidSet(ids)
        monitor.start(self.__interval)

    def __processMonitorFunc(self):
        # NOTE(review): not referenced anywhere in this class; looks like an
        # unfinished placeholder.  It busy-waits if it is ever called.
        while(self.__is_running):
            pass
        # Cancel all monitoring