Python小程序(一):监测设备并发送告警
编写一个程序,实现如下目的:周期性检测IP设备的运行状态,一旦发现设备无法通信,就通过企业微信持续发送告警,直至设备恢复正常。
main.py:读取设备列表文件(ip_list.xlsx),获取每台设备的名称和IP地址;借助datetime模块控制循环,每隔一段时间检测一次,从而实现持续监控。
1 #! python
2 # -*- coding:utf-8 -*-
3 # 时间:20220812
4 # 作者:ColoFly
5 # 转载请注明出处及作者
6
7 import openpyxl
8 from openpyxl.utils import get_column_letter
9 from IPy import IP
10 import os
11 import time,datetime
12 from send_text import work_wecat
13
# The monitor keeps polling until this timestamp has passed.
End_time = datetime.datetime(2033, 12, 31, 23, 59, 59)


def Myping(host, name=None):
    """Ping *host* once and record the result in the module-level tables.

    Args:
        host: Address to ping (already validated by the caller).
        name: Label of the device, used as the key in the result tables and
            in the console messages. Defaults to *host* when omitted.

    Returns:
        The string ``"It's IP: <host>"``, whatever the ping outcome.

    Side effects:
        Adds ``{name: host}`` to the global ``Ip_active_list`` when ping exits
        with status 0, otherwise to the global ``Ip_error_list``.
    """
    # Local imports keep this block self-contained.
    import platform
    import subprocess

    label = host if name is None else name
    # Windows ping counts packets with -n, other platforms use -c.
    count_flag = '-n' if platform.system().lower() == 'windows' else '-c'
    try:
        # Argument list (no shell): the address is never parsed as shell text.
        check = subprocess.run(
            ['ping', count_flag, '1', str(host)],
            stdout=subprocess.DEVNULL,
        ).returncode
    except OSError:
        # A ping binary that cannot be started counts as a failed check.
        check = 1

    if check == 0:
        Ip_active_list[label] = host
        print("Communication with this %s device is normal." % label)
    else:
        Ip_error_list[label] = host
        print("Unable to communicate with this %s device. There may be a warning! !" % label)
    return "It's IP: %s" % host


while datetime.datetime.now() < End_time:
    # The workbook is reloaded on every cycle, so the device list is re-read
    # from disk each time.
    wb = openpyxl.load_workbook('ip_list.xlsx')
    Ip_sheet = wb.active

    # Fresh result tables for this cycle (device name -> IP).
    Ip_error_list = {}
    Ip_list = {}
    Ip_active_list = {}

    # Row 1 is skipped as a header; column A is the name, column B the IP.
    for Device_Name, Device_Ip in Ip_sheet.iter_rows(min_row=2, max_col=2, values_only=True):
        Ip_list[Device_Name] = Device_Ip

    print("\nDetecting, please wait >>>")
    for k, v in Ip_list.items():
        try:
            # IP() rejects malformed addresses; only that failure is caught
            # here so real errors inside Myping are not hidden.
            IP(v)
        except (ValueError, TypeError):
            print("The IP address of this %s device is not compliant." % k)
            print("It's IP: %s" % v)
            continue
        print(Myping(v, name=k))

    # Push one alert per unreachable device, then wait for the next cycle.
    work_wecat(Ip_error_list)
    time.sleep(20)
send_text.py:通过企业微信的应用消息接口,把告警信息推送给指定的接收人。
1 import requests
2 import json
3 import faker
4 import config
# Shared Faker instance, used to generate a User-Agent header for each request.
fk = faker.Faker()


def work_wecat(Ip_error_list):
    """Send one WeCom text alert for every device in *Ip_error_list*.

    Args:
        Ip_error_list: Mapping whose keys are the device names to alert on.
            Only the keys are used; an empty mapping sends nothing.

    Each request's response body is printed. A request that fails at the
    network level is reported on the console and skipped, so that one failed
    delivery does not stop the remaining alerts or the caller.
    """
    url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={config.access_token}&random=69152"
    for k in Ip_error_list:
        payload = json.dumps({
            "touser": config.touser,
            "toparty": config.toparty,
            "totag": config.totag,
            "msgtype": "text",
            "agentid": config.agentid_ceshi1,
            "text": {
                # "\n" (not "/n") so the message really breaks onto two lines.
                "content": "This %s device is alerting. \n Please deal with the fault in time" % k
            },
            "safe": 0,
            "enable_id_trans": 0,
            "enable_duplicate_check": 0
        })
        headers = {
            'User-Agent': fk.user_agent(),
            'Content-Type': 'application/json'
        }

        try:
            # The timeout keeps a stalled connection from blocking the monitor.
            response = requests.request("POST", url, headers=headers, data=payload, timeout=10)
        except requests.RequestException as exc:
            print("Failed to send alert for %s: %s" % (k, exc))
            continue

        print(response.text)
config.py:相关参数文件
# Parameters read by send_text.py. Replace the placeholder values with real
# credentials before running.

# Access token appended to the message-send URL.
access_token = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'

# Message recipients: user, department ("party") and tag.
touser = "test"
toparty = "1"
totag = "1"

# Agent id placed in the "agentid" field of the message payload.
agentid_ceshi1 = 1000002

# Not referenced by send_text.py as shown; kept for other scripts.
type_image = "image"
secretid = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
cropid = "XXXXXXXXXXXXXX"
备注:send_text.py、config.py相关代码均来自我的口袋有三十三(https://blog.csdn.net/xkukeer/article/details/124124245),膜拜大佬。
实现效果截图:

本人Python小白,欢迎大佬指正。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/python/280184.html