首頁 > 軟體

python使用socket實現TCP協定長連線框架

2022-02-14 19:00:03

分析多了協定就會發現,很多的應用,特別是遊戲類和IM類應用,它們的協定會使用長連線的方式,來保持使用者端與伺服器的聯絡,這些長連線,通常是TCP承載的。

如果我們要模擬這個使用者端的行為,根據不同應用伺服器的實現情況,有些長連線不是必須的,但有些長連線,就必須去實現它。例如最近分析的某應用,雖然它主要使用HTTP協定進行互動,但它在TCP長連線中傳輸了一些必須的資訊,如果不實現長連線,就會有很多資訊無法處理。

在python中,很容易實現HTTP協定,當然,也容易實現TCP協定,它的TCP實現,使用socket庫就可以了,只是需要注意,TCP長連線中通常傳輸的是十六進位制資料,協定非標準的,需要自行根據協定分析結果來封裝資料格式。

這裡以一個使用到TCP長連線的協定為樣例,來給出協定的TCP長連線框架,大家有需要可以參考實現,當然,程式碼也是從樣例中摘出來的,並不是完整的。

TCP長連線框架,首先是外部的包裝,初始化一些引數,例如長連線使用到的ip埠及socket通訊端等:

self.longip='im.langren001.com'
        self.longport= 6656
        self.threadLock = threading.Lock()
        self.sockmain = socket.socket(socket.AF_INET, socket.SOCK_STREAM);
        self.longlinktcpstart2()
        tlonglink = threading.Thread(target=lrsuser.longlinktcpth2,name='mainlink_'+ self.playinfo['uid'], args=(self,))
        tlonglink.start()
        self.threadinfo.append(tlonglink)

這個裡面呼叫了兩個函數,一個是longlinktcpstart2函數,作用是建立socket連線,並對一些連線建立初始時的互動進行實現,另一個是longlinktcpth2函數,是一個執行緒,實現對連線內的資料進行收發處理。一般來說,這兩個可以在一起實現,但為了方便socket異常斷開的處理,分成了兩個函數。

 longlinktcpstart2的實現如下:

def longlinktcpstart2(self):
        server_address = (self.longip, int(self.longport))
        self.savelogs('longlinktcpstart2', 'Connecting to %s:%d.' % server_address)
        self.sockmain.connect(server_address)
        self.databuf = b''
        message = genbaseinfo.genalive()
        self.sockmain.sendall(message)
        message = genbaseinfo.genfirstdata()
        if len(message)==0:
            self.savelogs('longlinktcpstart2', 'genfirstdata error ')
            return False
        self.sockmain.sendall(message)
        self.longlinkcnt=2
        cnt = 0
        while (cnt < 2):
            try:
                buf = self.sockmain.recv(2048)
                sz = len(buf)
                self.savelogs('longlinktcpstart2', "recv data len "+str(sz) )
                if sz > 0:
                    self.databuf +=buf
                    self.dealdatabuf()
                    if cnt == 0:
                        alivemsg =  genbaseinfo.genalive()
                        self.sockmain.sendall(alivemsg)
                        self.savelogs('longlinktcpstart2', "sendalive")
                        regtime=int(round(time.time() * 1000))-random.randint(14400000,25200000)
                        regtime=regtime*1000
                        pcode = self.versionstr + '.0'
                        message =  genbaseinfo.genseconddata()
                        if len(message) == 0:
                            self.savelogs('longlinktcpstart2', 'genseconddata error ')
                            return False
                        self.sockmain.sendall(message)
                        self.longlinkcnt = self.longlinkcnt + 1
                    elif cnt == 1:
                        pcode = self.versionstr + '.0'
                        message =  genbaseinfo.genotherdata()
                        if len(message) == 0:
                            self.savelogs('longlinktcpstart2', 'genthirddata error ')
                            return False
                        self.sockmain.sendall(message)
                        self.longlinkcnt = self.longlinkcnt + 1
                    cnt = cnt + 1
                else:
                    self.savelogs('longlinktcpstart2', 'recv data alive')
            except:  # socket.error
                self.savelogs('longlinktcpstart2', 'socket error,do connect fail')
                return False
 
 
        return True

這裡面的genbaseinfo 相關的函數可以忽略,是用來生成傳送的訊息資料的實現,用自己的函數去替換即可。dealdatabuf函數是用來處理收到的訊息資料實現,這兩個都要根據具體的協定分析情況去實現,注意,生成的用來傳送的資料和接收到的需要處理的資料,都需要按十六進位制處理,這裡不做詳述。

執行緒longlinktcpth2是一個迴圈,協定不退出,迴圈不結束,實現如下:

def longlinktcpth2(self):
        tmalive = 0;
        r_inputs = set()
        r_inputs.add(self.sockmain)
        w_inputs = set()
        w_inputs.add(self.sockmain)
        e_inputs = set()
        e_inputs.add(self.sockmain)
        tm=int(round(time.time()))
        self.savelogs('longlinktcpth2', 'enter' )
        while (self.quitflag==0):
            try:
                r_list, w_list, e_list = select.select(r_inputs, w_inputs, e_inputs, 1)
                for event in r_list:
                    try:
                        buf = event.recv(2048)
                        sz = len(buf)
                        self.savelogs('longlinktcpth2', "loop recv data len:"+ str(sz) )
                        if sz > 0:
                            self.databuf += buf
                            self.dealdatabuf()
                            alivemsg = genbaseinfo.genalive()
                            self.sockmain.sendall(alivemsg)
                            self.savelogs('longlinktcpth2', "sendalive")
                        else:
                            self.savelogs('longlinktcpth2', "遠端斷開連線,do reconnect")
                            r_inputs.clear()
                            time.sleep(3)
                            self.sockmain = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                            self.longlinktcpstart2()
                            r_inputs = set()
                            r_inputs.add(self.sockmain)
                            w_inputs = set()
                            w_inputs.add(self.sockmain)
                            e_inputs = set()
                            e_inputs.add(self.sockmain)
                    except Exception as e:
                        self.savelogs('longlinktcpth2', str(e))
                self.threadLock.acquire()
                if (len(self.msglist) > 0):
                    msg = self.msglist.pop(0)
                    self.threadLock.release()
                    self.sockmain.sendall(msg)
                    self.savelogs('longlinktcpth2',"send a msg")
                else:
                    self.threadLock.release()
                tmnow=int(round(time.time()))
                if tmnow-tm>30:
 
 
                    message = genbaseinfo.genotherdata()
                    if len(message) == 0:
                        self.savelogs('longlinktcpth2', 'genalivedata error ')
                        return False
                    self.sockmain.sendall(message)
                    self.savelogs('longlinktcpth2', "send alivemsg"+str(self.longlinkcnt))
                    self.longlinkcnt = self.longlinkcnt + 1 #這個要一條連線統一,不能亂,回頭加鎖
                    tm=tmnow
                if len(w_list) > 0:  # 產生了可寫的事件,即連線完成
                    self.savelogs('longlinktcpth2',str(w_list))
                    w_inputs.clear()  # 當連線完成之後,清除掉完成連線的socket
 
 
                if len(e_list) > 0:  # 產生了錯誤的事件,即連線錯誤
                    self.savelogs('longlinktcpth2', str(e_list))
                    e_inputs.clear()  # 當連線有錯誤發生時,清除掉髮生錯誤的socket
            except OSError as e:
                self.savelogs('longlinktcpth2', 'socket error,do reconnect')
                time.sleep(3)
                self.sockmain = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.longlinktcpstart2()
                r_inputs = set()
                r_inputs.add(self.sockmain)
                w_inputs = set()
                w_inputs.add(self.sockmain)
                e_inputs = set()
                e_inputs.add(self.sockmain)
                
        self.savelogs('longlinktcpth2', 'leave')

由於這個程式碼主要是在windows上使用,因此,longlinktcpth2執行緒採用了select來實現,而沒有使用epoll。在迴圈中,對異常進行了處理,如果發生異常,連線被斷開,則呼叫longlinktcpstart2重新連線,而不退出迴圈,其餘的和longlinktcpstart2裡面一致。

由於TCP連線是流的概念,因此,需要對資料進行快取拼接,這就是上面程式碼中databuf的作用,防止每次收到的資料不完整或者太多,方便後續的處理,這才是一個合格的碼農的信仰的自我昇華。

到此這篇關於python使用socket實現TCP協定長連線框架的文章就介紹到這了,更多相關python使用socket實現TCP協定長連線框架內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com