<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Zabbix 是一款強大的開源網管監控工具,該工具的使用者端與伺服器端是分開的,我們可以直接使用自帶的zabbix_get
命令來實現拉取使用者端上的各種資料,在本地組裝引數並使用Popen開子執行緒執行該命令,即可實現批次監測。
封裝Engine類: 該類的主要封裝了Zabbix介面的呼叫,包括最基本的引數收集.
import subprocess,datetime,time,math class Engine(): def __init__(self,address,port): self.address = address self.port = port def GetValue(self,key): try: command = "get.exe -s {0} -p {1} -k {2}".format(self.address,self.port,key).split(" ") start = datetime.datetime.now() process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) while process.poll() is None: time.sleep(1) now = datetime.datetime.now() if (now - start).seconds > 2: return 0 return str(process.stdout.readlines()[0].split()[0],"utf-8") except Exception: return 0 # ping檢測 def GetPing(self): ref_dict = {"Address":0,"Ping":0} ref_dict["Address"] = self.address ref_dict["Ping"] = self.GetValue("agent.ping") if ref_dict["Ping"] == "1": return ref_dict else: ref_dict["Ping"] = "0" return ref_dict return ref_dict # 獲取主機組基本資訊 def GetSystem(self): ref_dict = { "Address" : 0 ,"HostName" : 0,"Uname":0 } ref_dict["Address"] = self.address ref_dict["HostName"] = self.GetValue("system.hostname") ref_dict["Uname"] = self.GetValue("system.uname") return ref_dict # 獲取CPU利用率 def GetCPU(self): ref_dict = { "Address": 0 ,"Core": 0,"Active":0 , "Avg1": 0 ,"Avg5":0 , "Avg15":0 } ref_dict["Address"] = self.address ref_dict["Core"] = self.GetValue("system.cpu.num") ref_dict["Active"] = math.ceil(float(self.GetValue("system.cpu.util"))) ref_dict["Avg1"] = self.GetValue("system.cpu.load[,avg1]") ref_dict["Avg5"] = self.GetValue("system.cpu.load[,avg5]") ref_dict["Avg15"] = self.GetValue("system.cpu.load[,avg15]") return ref_dict # 獲取記憶體利用率 def GetMemory(self): ref_dict = { "Address":"0","Total":"0","Free":0,"Percentage":"0" } ref_dict["Address"] = self.address fps = self.GetPing() if fps['Ping'] != "0": ref_dict["Total"] = self.GetValue("vm.memory.size[total]") ref_dict["Free"] = self.GetValue("vm.memory.size[free]") # 計算百分比: percentage = 100 - int(Free/int(Total/100)) ref_dict["Percentage"] = str( 100 - int( int(ref_dict.get("Free")) / (int(ref_dict.get("Total"))/100)) ) + "%" return ref_dict else: return ref_dict # 獲取磁碟資料 def GetDisk(self): ref_list = [] fps = self.GetPing() if fps['Ping'] != "0": disk_ = eval( self.GetValue("vfs.fs.discovery")) for x in range(len(disk_)): dict_ = {"Address": 0, "Name": 0, "Type": 0, "Free": 0} dict_["Address"] = self.address dict_["Name"] = disk_[x].get("{#FSNAME}") dict_["Type"] = disk_[x].get("{#FSTYPE}") if dict_["Type"] != "UNKNOWN": pfree = self.GetValue("vfs.fs.size["{0}",pfree]".format(dict_["Name"])) dict_["Free"] = str(math.ceil(float(pfree))) else: dict_["Free"] = -1 ref_list.append(dict_) return ref_list return ref_list # 獲取程序狀態 def GetProcessStatus(self,process_name): fps = self.GetPing() dict_ = {"Address": '0', "ProcessName": '0', "ProcessCount": '0', "Status": '0'} if fps['Ping'] != "0": proc_id = self.GetValue("proc.num["{}"]".format(process_name)) dict_['Address'] = self.address dict_['ProcessName'] = process_name if proc_id != "0": dict_['ProcessCount'] = proc_id dict_['Status'] = "True" else: dict_['Status'] = "False" return dict_ return dict_ # 獲取埠開放狀態 def GetNetworkPort(self,port): dict_ = {"Address": '0', "Status": 'False'} dict_['Address'] = self.address fps = self.GetPing() if fps['Ping'] != "0": port_ = self.GetValue("net.tcp.listen[{}]".format(port)) if port_ == "1": dict_['Status'] = "True" else: dict_['Status'] = "False" return dict_ return dict_ # 檢測Web伺服器狀態 通過本地地址:埠 => 檢測目標地址:埠 def CheckWebServerStatus(self,check_addr,check_port): dict_ = {"local_address": "0", "remote_address": "0", "remote_port": "0", "Status":"False"} fps = self.GetPing() dict_['local_address'] = self.address dict_['remote_address'] = check_addr dict_['remote_port'] = check_port if fps['Ping'] != "0": check_ = self.GetValue("net.tcp.port["{}","{}"]".format(check_addr,check_port)) if check_ == "1": dict_['Status'] = "True" else: dict_['Status'] = "False" return dict_ return dict_
當我們需要使用時,只需要定義變數呼叫即可,其呼叫程式碼如下。
from engine import Engine if __name__ == "__main__": ptr_windows = Engine("127.0.0.1","10050") ret = ptr_windows.GetDisk() if len(ret) != 0: for item in ret: addr = item.get("Address") name = item.get("Name") type = item.get("Type") space = item.get("Free") if type != "UNKNOWN" and space != -1: print("地址: {} --> 碟符: {} --> 格式: {} --> 剩餘空間: {}".format(addr,name,type,space))
到此這篇關於Python封裝zabbix-get介面的程式碼分享的文章就介紹到這了,更多相關Python封裝zabbix-get介面內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45