Python小程序(一):监测设备并发送告警
编写一个程序,实现如下目的:检测IP设备的运行状态,如果出现异常,则通过工具(企业微信)持续发送相关告警,直至设备恢复正常。
main.py:读取设备列表文件(ip_list.xlsx),获取设备名称和IP地址;借助datetime模块设定截止时间,在此之前循环执行检测,从而实现实时监控。
#! python
# -*- coding:utf-8 -*-
# Date: 20220812
# Author: ColoFly
# Please credit the source and author when reposting.
"""Poll the devices listed in ip_list.xlsx and push an alert for each unreachable one."""

import openpyxl
from openpyxl.utils import get_column_letter
from IPy import IP
import os
import subprocess
import time
import datetime
from send_text import work_wecat

# The monitoring loop below stops once the wall clock passes this moment.
End_time = datetime.datetime(2033, 12, 31, 23, 59, 59)

# Spreadsheet layout used by the loop: column 1 holds the device name,
# column 2 holds its IP address; row 1 is skipped (the loop starts at row 2).
NAME_COLUMN = get_column_letter(1)
IP_COLUMN = get_column_letter(2)

# Results of the current polling cycle, keyed by device name.
Ip_active_list = {}
Ip_error_list = {}


def Myping(host, name=None):
    """Ping *host* once and record the outcome in the module-level result dicts.

    Args:
        host: Address to ping.
        name: Device name used in the printed message and as the key in
            ``Ip_active_list`` / ``Ip_error_list``. Defaults to *host*.

    Returns:
        The string ``"It's IP: <host>"``, whether or not the ping succeeded.
    """
    host = str(host)
    label = host if name is None else name
    try:
        # Argument list instead of a shell string, so the spreadsheet value is
        # never interpreted by a shell. "-n 1" is the Windows ping syntax the
        # original command used (one echo request).
        check = subprocess.run(
            ["ping", "-n", "1", host],
            stdout=subprocess.DEVNULL,
        ).returncode
    except OSError:
        # ping could not be started at all; treat it like a failed ping.
        check = 1

    if check == 0:
        Ip_active_list[label] = host
        print("Communication with this %s device is normal." % label)
    else:
        Ip_error_list[label] = host
        print("Unable to communicate with this %s device. There may be a warning! !" % label)
    return "It's IP: %s" % host


while datetime.datetime.now() < End_time:
    # The workbook is loaded again on every cycle.
    wb = openpyxl.load_workbook('ip_list.xlsx')
    Ip_sheet = wb.active

    Ip_list = {}
    Ip_active_list.clear()
    Ip_error_list.clear()

    for Row in range(2, Ip_sheet.max_row + 1):
        Device_Name = Ip_sheet[NAME_COLUMN + str(Row)].value
        Device_Ip = Ip_sheet[IP_COLUMN + str(Row)].value
        Ip_list[Device_Name] = Device_Ip

    print("\nDetecting, please wait >>>")
    for k, v in Ip_list.items():
        # Only the address validation is guarded, so that an error raised
        # while pinging is not misreported as a malformed address.
        try:
            IP(v)
        except (ValueError, TypeError):
            print("The IP address of this %s device is not compliant." % k)
            print("It's IP: %s" % v)
            continue
        print(Myping(v, k))

    # Top-level boundary: a failure while sending alerts must not end the
    # monitor, so report it and try again on the next cycle.
    try:
        work_wecat(Ip_error_list)
    except Exception as err:
        print("Failed to send alerts: %s" % err)
    time.sleep(20)
send_text.py:代码用于实现企业微信小程序推送相关数据
"""Push device alerts as text messages through the WeCom message-send endpoint."""

import requests
import json
import faker
import config

fk = faker.Faker()


def work_wecat(Ip_error_list):
    """Send one text alert per device in *Ip_error_list*.

    Args:
        Ip_error_list: Mapping whose keys are device names; only the keys are
            used in the message text. An empty mapping sends nothing.

    Each response body is printed. A request that fails is reported and
    skipped so the remaining devices are still alerted.
    """
    url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={config.access_token}&random=69152"
    for k in Ip_error_list:
        payload = json.dumps({
            "touser": config.touser,
            "toparty": config.toparty,
            "totag": config.totag,
            "msgtype": "text",
            "agentid": config.agentid_ceshi1,
            "text": {
                # "\n" (the original had the typo "/n") puts the instruction on its own line.
                "content": "This %s device is alerting.\nPlease deal with the fault in time" % k
            },
            "safe": 0,
            "enable_id_trans": 0,
            "enable_duplicate_check": 0
        })
        headers = {
            'User-Agent': fk.user_agent(),
            'Content-Type': 'application/json'
        }

        try:
            # Without a timeout a stalled connection would block the caller indefinitely.
            response = requests.request("POST", url, headers=headers, data=payload, timeout=10)
        except requests.RequestException as err:
            print("Failed to send alert for %s: %s" % (k, err))
            continue

        print(response.text)
config.py:相关参数文件
# Parameters read by send_text.py. The X-filled values are placeholders
# and must be replaced with real credentials before use.

# Token interpolated into the message-send URL.
access_token = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'

# Values placed in the "touser" / "toparty" / "totag" fields of the message payload.
touser = "test"
toparty = "1"
totag = "1"

# Value placed in the "agentid" field of the message payload.
agentid_ceshi1 = 1000002

# Not referenced by the visible send_text.py code.
type_image = "image"
secretid = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
cropid = "XXXXXXXXXXXXXX"
备注:send_text.py、config.py相关代码均来自我的口袋有三十三(https://blog.csdn.net/xkukeer/article/details/124124245),膜拜大佬。
实现效果截图:
本人Python小白,欢迎大佬指正。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/280184.html