百木园-与人分享,
就是让自己快乐。

zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱

互联网故障一般表现为丢包和时延增大。持续性故障不难排查,难的是间歇性故障或凌晨发生的故障:后者往往来不及等我们测试就已经恢复正常,拿不到异常时的mtr结果,就无法判断故障点在哪里

故此有了根据丢包率和时延变化联动mtr的需求

前段时间使用Mysql实现了这个功能,缺点是占用太多系统资源,且脚本繁重;优点是数据可复用,可以做多种形式的展示

后续使用socket+deque实现低能耗与轻量,也可以通过开放互联网API来做分布式监控,缺点是历史数据不留存,用完即丢

系统环境

  Ubuntu 18.04.5 LTS+Python 3.6.9 

python库

  自带基本库,考虑到系统权限问题没有使用第三方库

ip查询

  http://ip-api.com,免费版,限制频率45次/分钟,国外归属地准确率较高,国内查询一塌糊涂,国内推荐使用ipip

1 #!/usr/bin/env python3
2 #-*-coding:utf-8-*-
3 from collections import deque
4 import itertools,time
5 import queue,json
6 import argparse,sys,re,os,subprocess
7 import time,socket,random,string
8 import threading
9 from functools import reduce
10 import logging
11
# Targets waiting for a worker thread: Server.server() appends each newly
# seen target here and Server.create() pops them to start a probe worker.
ipqli = deque()
# Absolute path of this script; used to relaunch it as the background server
# and to look that server up in the process list.
filename = os.path.realpath(sys.argv[0])
def logger():
    """Return the root logger, configured to write DEBUG records to a file.

    The log file is named ``log`` and lives in the same directory as the
    running script. The handler is installed only once, so calling this
    function again does not duplicate log lines. If the log file cannot be
    opened (for example the script directory is not writable), a
    ``NullHandler`` is installed instead so the program keeps working
    without logging rather than failing at import time.

    Returns:
        logging.Logger: the root logger.
    """
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    # The handler is tagged with a name so a repeated call can recognise it.
    if any(handler.get_name() == 'pingd' for handler in root.handlers):
        return root
    log_name = os.path.dirname(os.path.realpath(sys.argv[0])) + '/log'
    try:
        handler = logging.FileHandler(log_name)
    except OSError:
        handler = logging.NullHandler()
    handler.set_name('pingd')
    handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"))
    root.addHandler(handler)
    return root


log = logger()
# Ping implementation; ping3 is not used, to avoid system permission problems.
class Ping:
    """Probe one target over ICMP, TCP or UDP and summarise latency and loss.

    Args:
        ip: five strings ``(source ip, target ip, probe type, port,
            interval)``. The probe type is lower-cased and selects the
            ``_icmp`` / ``_tcp`` / ``_udp`` method.
        count: number of probes per measurement round.
        udp_length: payload length, in characters, of each UDP probe.

    The constructor looks up three module-level objects that the server
    creates for the target before starting the worker: the two result deques
    and the ``threading.Event``, all keyed by the joined ``ip`` fields with
    the dots removed. A ``KeyError`` is raised if they do not exist.
    """

    # Round-trip time as printed by ping, e.g. "time=12.3 ms" or "time=5 ms".
    RTT_PATTERN = r'(?<=time=)\d+(?:\.\d+)?(?= ms)'

    def __init__(self, ip, count=20, udp_length=64):
        ip = tuple(ip)
        self.sip, self.tip, self.type, self.port, self.inver = ip
        self.type = self.type.lower()
        self.port = int(self.port)
        self.count = count
        self.inver = float(self.inver)
        self.udp_length = udp_length
        key = ''.join(ip).replace('.', '')
        # Per-instance scratch buffers; they must be able to hold one full
        # round of `count` probes.
        self.restime_deque = deque(maxlen=max(60, count))
        self.pkloss_deque = deque(maxlen=max(60, count))
        # Shared result buffers and event, created by the server.
        self.ret_restime_deque = globals()['restime_deque' + key]
        self.ret_pkloss_deque = globals()['pkloss_deque' + key]
        self.ipqevent = globals()['event' + key]
        self.compile = self.RTT_PATTERN

    def _record(self, rtt_ms):
        """Store one probe result (``None`` = lost); return 1 if answered."""
        if rtt_ms is None:
            self.restime_deque.append(0)
            self.pkloss_deque.append(1)
            return 0
        self.restime_deque.append(rtt_ms)
        self.pkloss_deque.append(0)
        return 1

    def _pause(self, start_time):
        """Return how long to sleep before the next probe.

        This is the rest of the interval when the probe finished within it,
        otherwise one full interval.
        """
        usetime = time.time() - start_time
        return self.inver - usetime if usetime < self.inver else self.inver

    @staticmethod
    def _summarize(rtts, losses):
        """Return ``(mean rtt of answered probes, loss percentage)``."""
        if not losses:
            return 0.0, 0.0
        lost = sum(int(flag) for flag in losses)
        answered = len(losses) - lost
        total = sum(float(value) for value in rtts)
        # Lost probes are stored with rtt 0, so they add nothing to `total`;
        # dividing by the answered count keeps them out of the mean.
        restime = round(total / answered, 2) if answered else 0.0
        pkloss = round(lost / len(losses) * 100, 2)
        return restime, pkloss

    def _tcp(self):
        """Time one TCP connect from ``sip`` to ``tip:port`` (1 s timeout).

        Returns:
            tuple: ``(seconds to sleep before the next probe, 1 or 0)``.
        """
        rtt = None
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            start_time = time.time()
            try:
                s.bind((self.sip, 0))
                s.connect((self.tip, self.port))
                s.shutdown(socket.SHUT_RD)
                rtt = (time.time() - start_time) * 1000
            except (socket.timeout, ConnectionError):
                pass
            except OSError as e:
                # Counted as a lost probe so that every probe leaves exactly
                # one sample for ping_value() to collect.
                log.debug(e)
        return self._pause(start_time), self._record(rtt)

    def _udp(self):
        """Send one random UDP payload to ``tip:port`` and time the reply.

        Returns:
            tuple: ``(seconds to sleep before the next probe, 1 or 0)``.
        """
        rtt = None
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.settimeout(1)
            start_time = time.time()
            data = ''.join(random.choice(string.ascii_letters + string.digits)
                           for _ in range(self.udp_length))
            try:
                s.sendto(data.encode('utf-8'), (self.tip, self.port))
                s.recv(1024)
                rtt = (time.time() - start_time) * 1000
            except socket.timeout:
                pass
            except OSError as e:
                log.debug(e)
        return self._pause(start_time), self._record(rtt)

    def _icmp(self):
        """Run the system ``ping`` once and parse the round-trip time.

        Returns:
            tuple: ``(seconds to sleep before the next probe, 1 or 0)``.
        """
        start_time = time.time()
        # Argument list instead of a shell string: the addresses come from
        # the request and must not be interpreted by a shell.
        cmd = ['ping', '-i', '%s' % self.inver, '-c', '1', '-W', '1',
               '-I', '%s' % self.sip, '%s' % self.tip]
        rtt = None
        try:
            out = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            ).communicate()[0].decode('utf8', 'replace')
            match = re.search(self.compile, out)
            if match:
                rtt = float(match.group())
        except OSError as e:
            # e.g. the ping binary is missing
            log.debug(e)
        return self._pause(start_time), self._record(rtt)

    def fastping(self):
        """Run a single probe of the configured type (thread target)."""
        getattr(self, '_' + self.type)()

    def slow_ping(self):
        """Probe sequentially, sleeping between probes.

        Sets the shared event first. Stops early as soon as either shared
        result deque holds fewer than two entries.

        Returns:
            tuple: ``(probes sent, probes answered)``.
        """
        index = 0
        res_count = 0
        self.ipqevent.set()
        probe = getattr(self, '_' + self.type)
        while index < self.count:
            sleep_time, count = probe()
            index += 1
            res_count += count
            if len(self.ret_restime_deque) < 2 or len(self.ret_pkloss_deque) < 2:
                break
            time.sleep(sleep_time)
        return index, res_count

    def ping_value(self):
        """Run one measurement round and summarise it.

        While either shared result deque holds fewer than two entries, all
        ``count`` probes are fired in parallel threads; otherwise they are
        sent one by one via ``slow_ping``.

        Returns:
            tuple: ``(mean rtt in ms over answered probes, loss in percent,
            elapsed seconds)``; ``(0, 0, 0)`` when no sample was produced.
        """
        start_time = time.time()
        count = self.count
        if len(self.ret_restime_deque) < 2 or len(self.ret_pkloss_deque) < 2:
            workers = [threading.Thread(target=self.fastping)
                       for _ in range(self.count)]
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()
        else:
            count, _ = self.slow_ping()
        use_time = round(time.time() - start_time, 4)
        # Never pop more than was recorded (a probe thread may have died).
        available = min(count, len(self.restime_deque), len(self.pkloss_deque))
        rtts = [self.restime_deque.pop() for _ in range(available)]
        losses = [self.pkloss_deque.pop() for _ in range(available)]
        if not rtts:
            log.debug('no probe samples collected for %s' % self.tip)
            return 0, 0, 0
        restime, pkloss = self._summarize(rtts, losses)
        return restime, pkloss, use_time
# Server-side code
class Server():
    """Local TCP service that runs the probe workers and serves their results.

    A client sends a JSON list ``[ip, item]`` where ``ip`` is the list of
    five strings ``[source ip, target ip, probe type, port, interval]`` and
    ``item`` is ``'restime'`` or ``'pkloss'``. For every target the server
    keeps two module-level deques and a ``threading.Event``, stored in
    ``globals()`` under names derived from the joined ``ip`` fields with the
    dots removed; the ``Ping`` worker looks them up by the same names.
    """

    # Longest time sendvalue() waits for samples. It matches the 3 second
    # socket timeout used by socket_client(): a later reply has no reader.
    WAIT_TIMEOUT = 3

    def __init__(self, sock):
        self.ipqli = ipqli      # targets waiting for a worker thread
        self.thli = []          # service threads started by start()
        self.ipli = []          # targets that currently have a worker
        self.sock = sock
        self.basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
        self.env = threading.Event()   # set when ipqli may hold new targets

    @staticmethod
    def _key(ip):
        """Return the name suffix shared by a target's globals."""
        return ''.join(ip).replace('.', '')

    @classmethod
    def start(cls):
        """Bind 127.0.0.1:6590, start the service threads and block on them."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        address = ('127.0.0.1', 6590)
        s.bind(address)
        obj = cls(s)
        for target in (obj.server, obj.create):
            thread = threading.Thread(target=target)
            thread.start()
            obj.thli.append(thread)
        for t in obj.thli:
            t.join()

    def server(self):
        """Accept requests forever; register new targets and reply to each."""
        self.sock.listen(100)
        while True:
            # Reset on every pass so the cleanup below never touches a
            # connection that accept() failed to produce.
            conn = None
            try:
                conn, _addr = self.sock.accept()
                data = json.loads(conn.recv(1024).decode('utf-8'))
                ip, item = data
                key = self._key(ip)
                restime_ipq = 'restime_deque' + key
                pkloss_ipq = 'pkloss_deque' + key
                if ip not in self.ipli:
                    globals()[restime_ipq] = deque(maxlen=30)
                    globals()[pkloss_ipq] = deque(maxlen=30)
                    globals()['event' + key] = threading.Event()
                    self.ipqli.append(ip)
                    self.ipli.append(ip)
                    log.debug('create ipdeque %s %s' % (restime_ipq, pkloss_ipq))
                self.env.set()
                self.sendvalue(conn, ip, item)
            except Exception as e:
                log.debug(str(e))
            finally:
                if conn is not None:
                    conn.close()

    def create(self):
        """Start one worker thread per queued target, forever."""
        while True:
            self.env.wait()
            # Clear before draining: a target queued after this point sets
            # the event again, so it is picked up on the next pass.
            self.env.clear()
            while self.ipqli:
                try:
                    ip = self.ipqli.pop()
                    log.debug('create %s' % (ip,))
                    t = threading.Thread(target=self.makevalue, args=(ip,))
                    t.start()
                except Exception as a:
                    log.debug(str(a))

    def makevalue(self, ip):
        """Worker: probe ``ip`` until both result deques hold 30 samples.

        Nothing has consumed the results once both deques are full, so the
        worker stops. The target is always removed from ``ipli`` on exit,
        also after a failure, so the next request starts a fresh worker. The
        entries in ``globals()`` are left in place; the next request for the
        target overwrites them.
        """
        key = self._key(ip)
        restime_name = 'restime_deque' + key
        pkloss_name = 'pkloss_deque' + key
        restime_ipq = globals()[restime_name]
        pkloss_ipq = globals()[pkloss_name]
        try:
            obj = Ping(ip)
            while len(restime_ipq) < 30 or len(pkloss_ipq) < 30:
                restime, pkloss, use_time = obj.ping_value()
                restime_ipq.append((restime, use_time))
                pkloss_ipq.append((pkloss, use_time))
        except Exception as e:
            log.debug(str(e))
        finally:
            if ip in self.ipli:
                self.ipli.remove(ip)
            log.debug('delete ipdeque %s %s' % (restime_name, pkloss_name))

    def _start_mtr(self, tip):
        """Launch ``mtr.py`` for ``tip``, logging to today's file in mtr_log/."""
        mtr_dir = (self.basedir + '/mtr_log/' + tip + '-'
                   + time.strftime('%Y-%m-%d', time.localtime()) + '.log')
        try:
            # Argument list (no shell) because `tip` comes from the request.
            # Output is discarded: nothing ever reads it.
            subprocess.Popen([self.basedir + '/mtr.py', tip, mtr_dir],
                             stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL)
        except OSError as e:
            log.debug(str(e))

    def sendvalue(self, conn, ip, item):
        """Send the newest sample of ``item`` for ``ip`` to the client.

        When either deque holds fewer than two samples the call waits, for at
        most ``WAIT_TIMEOUT`` seconds, until the worker sets the event.
        ``mtr.py`` is launched when latency rose by more than 20 over the
        previous sample, or when loss is above 20 and below 100 percent. Any
        failure is sent to the client as the exception text.
        """
        key = self._key(ip)
        _, tip, *_rest = ip
        restime_ipq = globals()['restime_deque' + key]
        pkloss_ipq = globals()['pkloss_deque' + key]
        ipqevent = globals()['event' + key]
        if len(restime_ipq) < 2 or len(pkloss_ipq) < 2:
            ipqevent.clear()
        try:
            if not ipqevent.wait(self.WAIT_TIMEOUT):
                raise TimeoutError('no samples for %s yet' % tip)
            if item == 'restime':
                ret, use_time = restime_ipq.pop()
                hisret, _ = restime_ipq[-1]
                if ret - hisret > 20:
                    self._start_mtr(tip)
            elif item == 'pkloss':
                ret, use_time = pkloss_ipq.pop()
                if 100 > ret > 20:
                    self._start_mtr(tip)
            else:
                raise ValueError('unknown item: %s' % item)
        except Exception as a:
            ret = a
            log.debug(str(ret))
        conn.sendall(str(ret).encode())
255
# Format check for the user-supplied IP addresses and options
class Ipcheck():
    """Validate the command-line values before a request is sent.

    Args:
        sip: source IPv4 address, or a falsy value to skip checking it.
        tip: target IPv4 address.
        item: requested metric, ``'restime'`` or ``'pkloss'``.
        ping_type: ``'icmp'``, ``'tcp'`` or ``'udp'`` (case-insensitive).
        inver: probe interval in seconds; must be a number >= 0.2.
    """

    # Four dot-separated groups of ASCII digits, nothing before or after.
    _IPV4 = re.compile(r'[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+')

    def __init__(self, sip, tip, item, ping_type, inver):
        self.sip = sip
        self.tip = tip
        self.item = item
        self.type = ping_type.lower()
        try:
            self.inver = float(inver)
        except (TypeError, ValueError):
            # A non-numeric interval makes check() return False instead of
            # raising here.
            self.inver = None

    def check(self):
        """Return True only when every supplied value is acceptable."""
        if self.item not in ('restime', 'pkloss'):
            return False
        if self.type not in ('icmp', 'tcp', 'udp'):
            return False
        # `not >=` also rejects NaN
        if self.inver is None or not self.inver >= 0.2:
            return False
        return self.checkipformat()

    def check_fun(self, ip):
        """Return True when one address field is below 256."""
        return int(ip) < 256

    def checkipformat(self):
        """Return True when tip, and sip if given, are dotted-quad IPv4."""
        addresses = [self.tip]
        if self.sip:
            addresses.append(self.sip)
        fields = []
        for address in addresses:
            if not isinstance(address, str) or not self._IPV4.fullmatch(address):
                return False
            fields.extend(address.split('.'))
        return self.checkiplength(fields)

    def checkiplength(self, check_ipli):
        """Return True when every field in ``check_ipli`` is below 256."""
        return all(self.check_fun(field) for field in check_ipli)
def run():
    """Start this script as a background server process (``-S server``).

    The child is started without a shell, so a script path containing spaces
    or shell metacharacters is passed through intact. Its output is
    discarded because nothing reads it.
    """
    subprocess.Popen(
        [sys.executable or 'python3', filename, '-S', 'server'],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
# socket client: asks the server for a value and prints it for the user
def socket_client(ip, item):
    """Request ``item`` for ``ip`` from the local server and print the reply.

    Args:
        ip: the target description sent to the server as the first element
            of the JSON request.
        item: the metric name sent as the second element.

    On a socket timeout the process exits silently with status 0. On any
    other error ``server will start`` is printed and the process exits with
    status 0. The socket is closed on every path.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(3)
            s.connect(('127.0.0.1', 6590))
            s.sendall(json.dumps([ip, item]).encode())
            ret = s.recv(1024)
        print(ret.decode())
    except socket.timeout as t:
        log.debug(str(t))
        sys.exit(0)
    except Exception as e:
        print('server will start')
        log.debug(str(e))
        sys.exit(0)
318
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='icmp for monitor')
    parser.add_argument('-S', action='store', dest='server')
    parser.add_argument('-t', action='store', dest='tip')
    parser.add_argument('-s', action='store', dest='sip')
    parser.add_argument('-I', action='store', dest='item')
    parser.add_argument('-i', action='store', dest='inver', default='1')
    parser.add_argument('-T', action='store', dest='ping_type', default='icmp')
    parser.add_argument('-p', action='store', dest='port', default='0')
    args = parser.parse_args()

    # Server mode: serve until killed. Checked first so that it does not
    # depend on the process-list lookup below matching this very process.
    if args.server:
        Server.start()
        sys.exit(0)

    # Client mode: make sure a background server exists before asking it.
    server_status_cmd = "ps -ef | grep '%s -S server' | grep -v grep | cut -c 9-16" % filename
    server_status = subprocess.Popen(
        server_status_cmd, shell=True,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
    if not server_status:
        run()

    try:
        tip = socket.gethostbyname(args.tip)
    except (TypeError, ValueError, OSError):
        # Missing -t or an unresolvable name: stop here, because nothing
        # below can work without a target address.
        print('format error')
        sys.exit(1)
    sip = args.sip
    item = args.item
    ping_type = args.ping_type
    port = args.port
    inver = args.inver
    ip = (sip, tip, ping_type, port, inver)

    check = Ipcheck(sip, tip, item, ping_type, inver)
    if not check.check():
        print('''---------------------------Options-----------------------------------
-s --source ip address
-t --destination ip address
-I --item(restime/pkloss)
-T --type(icmp/tcp/udp default icmp)
-p --port(default 0)
-i --inver(default 1/min 0.2)
---------------------------Example-----------------------------------
------pingd -s 10.0.3.108 -t 10.0.0.1 -I restime -i 1 -T tcp -p 80-------
''')
        sys.exit(0)
    socket_client(ip, item)

来源:https://www.cnblogs.com/darkchen/p/15524856.html
图文来源于网络,如有侵权请联系删除。

未经允许不得转载:百木园 » zabbix 线路质量监控自定义python模块,集成ICMP/TCP/UDP探测,批量监控线路质量自定义阈值联动mtr保存线路故障日志并发送至noc邮箱

相关推荐

  • 暂无文章