首頁 > 軟體

Python利用socket實現多程序的埠掃描器

2022-12-07 14:00:44

作為開發人員經常需要檢視服務的埠開啟狀態判斷服務是否宕機。

特別是部署的服務比較多的情況下,可能存在幾個甚至幾十個伺服器埠的佔用,於是我利用socket不斷向服務傳送請求的方式來判斷埠服務是否已經完成開啟。

其中加入多程序的呼叫方式來提高階口掃描的效率,供大家參考!

首先,我們將需要的python模組全部匯入到我們的程式碼塊中,若是沒有安裝的模組使用pip的當時安裝一下即可。

# Importing the socket module.
import socket

# Importing the datetime module from the datetime package.
from datetime import datetime

# It's a shortcut for `from multiprocessing import Pool`
from multiprocessing.dummy import Pool

# It's a shortcut for `from loguru import logger`
from loguru import logger

然後,建立一個埠掃描類PortScanner來完成對整個業務邏輯的處理,另外,封裝到類中也便於大家參考和修改。

class PortScanner:
    def __init__(self):
        """
        A constructor. It is called when an object is created from a class and it allows the class to initialize the
        attributes of a class.
        """
        super(PortScanner, self).__init__()
        self.remote_ip = None
        self.ports = []

    def scanner(self, port):
        """
        It scans the port.

        :param port: The port you want to scan
        """
        try:
            socket_ = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            result_ = socket_.connect_ex((self.remote_ip, port))
            if result_ == 0:
                logger.info('地址:{} 埠:{} 已成功開啟!'.format(self.remote_ip, port))
            else:
                logger.info('地址:{} 埠:{} 未開啟!'.format(self.remote_ip, port))
        except Exception as e:
            logger.error('埠掃描出現異常!')
        finally:
            socket_.close()

    def start(self):
        """
        It starts the game.
        """
        remote_server = input("輸入要掃描的主機地址(127.0.0.1):")
        if remote_server.strip() == '':
            remote_server = '127.0.0.1'
        self.remote_ip = socket.gethostbyname(remote_server)
        port_range = input("輸入要掃描的埠範圍(1,50000):")
        scanner_ports = []
        if port_range.strip() == '':
            port_range = '1,50000'
        scanner_ports = [n for n in range(int(port_range.split(',')[0]), int(port_range.split(',')[1]))]
        socket.setdefaulttimeout(0.5)
        start_ = datetime.now()
        pool = Pool(processes=10)
        pool.map(self.scanner, scanner_ports)
        pool.close()
        pool.join()
        end_ = datetime.now()
        logger.info('所有埠掃描已完成,總共耗時:{}'.format(str(end_ - start_)))

使用python模組中的main函數呼叫整個埠掃描器執行掃描任務。

# It's a common idiom to determine if the script is being run directly or being imported.
if __name__ == '__main__':
    scanner_ = PortScanner()
    scanner_.start()

到此這篇關於Python利用socket實現多程序的埠掃描器的文章就介紹到這了,更多相關Python socket多程序埠掃描內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com