安装wmi包
pip install wmi 说明:执行上面命令会自动安装依赖包pywin32
代码演示
import json as JSON import re import paramiko import suds from suds.wsse import * import wmi class WindowsDisListen(): """Windows系统""" def getSpaceInfo(self, P_IP, P_User, P_Pwd): # GB = 1024 ** 3 # total_b, used_b, free_b = shutil.disk_usage('C:') # 查看磁盘的使用情况 # print('总的磁盘空间: {:.2f}GB '.format(total_b / GB)) # print('已经使用的 : {:.2f}GB '.format(used_b / GB)) # print('未使用的 : {:.2f}GB '.format(free_b / GB)) IP, User, Pwd = P_IP, P_User, P_Pwd # 初始化返回数据格式 Ret = {"code": "0", "msg": "OK", "data": []} # 连接服务器 try: Win = wmi.WMI(computer=IP, user=User, password=Pwd) except Exception as e: Ret["code"] = "-1" Ret["msg"] = "目标服务器" + IP + "连接失败!" + str(e) return JSON.dumps(Ret, ensure_ascii=False) # 获取硬盘使用百分情况 GB = 1024 ** 3 Data = [] for Physical_Disk in Win.Win32_DiskDrive(): for Partition in Physical_Disk.associators("Win32_DiskDriveToDiskPartition"): for Logical_Disk in Partition.associators("Win32_LogicalDiskToPartition"): TmpDict = {} TmpDict["盘符"] = Logical_Disk.Caption TmpDict["总量"] = format(int(Logical_Disk.Size) / GB, '.2f') TmpDict["使用量"] = format((int(Logical_Disk.Size) - int(Logical_Disk.FreeSpace)) / GB, '.2f') TmpDict["空闲量"] = format(int(Logical_Disk.FreeSpace) / GB, '.2f') TmpDict["使用率"] = format(int(100.0 * (int(Logical_Disk.Size) - int(Logical_Disk.FreeSpace)) / int(Logical_Disk.Size)), '.2f') + "%" Data.append(TmpDict) Ret["data"] = Data return JSON.dumps(Ret, ensure_ascii=False) class Msg(): """短信接口""" @classmethod def sendListenMsg(cls, Contetn, Phone=""): """调用短信平台,发送短信""" Wsdl_Url = "http://100.169.130.50:57772/dthealth/web/wcw.soap.ShortMsg.cls?WSDL=1&CacheUserName=dhwebservice&CachePassword=dhwebservice&CacheNoRedirect=1" Soap = suds.Client(Wsdl_Url) # 设置Token安全登录验证 security = Security() token = UsernameToken('dhwebservice', 'dhwebservice') security.tokens.append(token) Soap.set_options(wsse=security) # 接收电话(多个手机号用","拼接) if Phone=="": Phone = "180***65,156***67,176***69" # 调用接口 Ret = Soap.service.sendMsg(Contetn, Phone) return Ret def runWindows(P_IP, P_User, P_Pwd): """执行Windows监测程序并发送预警短信""" IP, User, Pwd = P_IP, P_User, P_Pwd WinDis = WindowsDisListen() DataList = JSON.loads(WinDis.getSpaceInfo(IP, User, Pwd)) NeedSendCont = [] for Item in DataList["data"]: for Name, Value in Item.items(): if Name == "使用率" and float(Value.strip("%")) >= 90: NeedSendCont.append("【" + Item["盘符"] + "盘】空间使用率为【" + Item["使用率"] + "】,空间不足。") # 短息内容 MsgContent = "服务器监测预警:服务器" + IP + "".join(NeedSendCont) + "请及时报给信息科处理!" # 发短信 MsgRet = "" if len(NeedSendCont): MsgRet = Msg.sendListenMsg(MsgContent) if MsgRet == "1": MsgRet = "发送短信成功。" else: MsgRet = "未发送短信,或发送短信失败。" print(MsgRet + "短信内容为:" + MsgContent) if __name__ == '__main__': # 程序入口 runWindows("100.169.130.110", "Hisweb110", "Hisweb#2019#11") runWindows("100.169.130.120", "Hisweb120", "Hisweb#2019#12")
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/283087.html