首頁 > 軟體

python區塊鏈實現簡版網路

2022-05-25 18:01:17

說明

本文根據https://github.com/liuchengxu/blockchain-tutorial的內容,用python實現的,但根據個人的理解進行了一些修改,大量參照了原文的內容。文章末尾有"本節完整原始碼實現地址"。

引言

到目前為止,我們所構建的原型已經具備了區塊鏈所有的關鍵特性:匿名,安全,隨機生成的地址;區塊鏈資料儲存;工作量證明系統;可靠地儲存交易。儘管這些特性都不可或缺,但是仍有不足。能夠使得這些特性真正發光發熱,使得加密貨幣成為可能的,是網路(network)。如果實現的這樣一個區塊鏈僅僅執行在單一節點上,有什麼用呢?如果只有一個使用者,那麼這些基於密碼學的特性,又有什麼用呢?正是由於網路,才使得整個機制能夠運轉和發光發熱。

你可以將這些區塊鏈特性認為是規則(rule),類似於人類在一起生活,繁衍生息建立的規則,一種社會安排。區塊鏈網路就是一個程式社群,裡面的每個程式都遵循同樣的規則,正是由於遵循著同一個規則,才使得網路能夠長存。類似的,當人們都有著同樣的想法,就能夠將拳頭攥在一起構建一個更好的生活。如果有人遵循著不同的規則,那麼他們就將生活在一個分裂的社群(州,公社,等等)中。同樣的,如果有區塊鏈節點遵循不同的規則,那麼也會形成一個分裂的網路。

重點在於:如果沒有網路,或者大部分節點都不遵守同樣的規則,那麼規則就會形同虛設,毫無用處!

區塊鏈網路

區塊鏈網路是去中心化的,這意味著沒有伺服器,使用者端也不需要依賴伺服器來獲取或處理資料。在區塊鏈網路中,有的是節點,每個節點是網路的一個完全(full-fledged)成員。節點就是一切:它既是一個使用者端,也是一個伺服器。這一點需要牢記於心,因為這與傳統的網頁應用非常不同。

區塊鏈網路是一個 P2P(Peer-to-Peer,端到端)的網路,即節點直接連線到其他節點。它的拓撲是扁平的,因為在節點的世界中沒有層級之分。下面是它的示意圖:

Business vector created by Dooder - Freepik.com

要實現這樣一個網路節點更加困難,因為它們必須執行很多操作。每個節點必須與很多其他節點進行互動,它必須請求其他節點的狀態,與自己的狀態進行比較,當狀態過時時進行更新。

kademlia發現協定

kademlia是p2p的一種節點發現協定,其核心是通過計算節點之間的邏輯距離來發現附近節點以實現節點查詢的收斂。

kademlia詳細介紹

簡化協定

這裡我們為了說明原理儘可能的簡化協定。我們只實現三種請求:

  • 節點握手
  • 獲取區塊資料
  • 交易廣播為了方便,其中又將節點握手作為心跳傳送,並根據心跳資訊進行區塊同步。

網路協定方面,借鑑以太坊的做法,UDP做協定發現,TCP做資料傳輸。每當發現一個節點,就通過TCP建立連線,並行送心跳資料,以保證資料的一致性。

訊息

定義訊息類,分別定義了無意義迴應和以上三種請求。為了方便處理,這裡統一使用字串而不是二進位制資料進行資料傳輸。

class Msg(object):
    NONE_MSG = 0
    HAND_SHAKE_MSG = 1
    GET_BLOCK_MSG = 2
    TRANSACTION_MSG = 3
    def __init__(self, code, data):
        self.code = code
        self.data = data

TCP伺服器端

class TCPServer(object):
    def __init__(self, ip='0.0.0.0', port=listen_port):
        self.sock = socket.socket()
        self.ip = ip
        self.port = port
    def listen(self):
        self.sock.bind((self.ip, self.port))
        self.sock.listen(5)
    def run(self):
        t = threading.Thread(target=self.listen_loop, args=())
        t.start()
    def handle_loop(self, conn, addr):
        while True:
            recv_data = conn.recv(4096)
            log.info("recv_data:"+str(recv_data))
            try:
                recv_msg = json.loads(recv_data)
            except ValueError as e:
                conn.sendall('{"code": 0, "data": ""}'.encode())
            send_data = self.handle(recv_msg)
            log.info("tcpserver_send:"+send_data)
            conn.sendall(send_data.encode())
    def listen_loop(self):
        while True:
            conn, addr = self.sock.accept()
            t = threading.Thread(target=self.handle_loop, args=(conn, addr))
            t.start()
    def handle(self, msg):
        code = msg.get("code", 0)
        log.info("code:"+str(code))
        if code == Msg.HAND_SHAKE_MSG:
            res_msg = self.handle_handshake(msg)
        elif code == Msg.GET_BLOCK_MSG:
            res_msg = self.handle_get_block(msg)
        elif code == Msg.TRANSACTION_MSG:
            res_msg = self.handle_transaction(msg)
        else:
            return '{"code": 0, "data":""}'
        return json.dumps(res_msg.__dict__)
    def handle_handshake(self, msg):
        block_chain = BlockChain()
        block = block_chain.get_last_block()
        try:
            genesis_block = block_chain[0]
        except IndexError as e:
            genesis_block = None
        data = {
            "last_height": -1,
            "genesis_block": ""
        }
        if genesis_block:
            data = {
                "last_height": block.block_header.height,
                "genesis_block": genesis_block.serialize()
            }
        msg = Msg(Msg.HAND_SHAKE_MSG, data)
        return msg
    def handle_get_block(self, msg):
        height = msg.get("data", 1)
        block_chain = BlockChain()
        block = block_chain.get_block_by_height(height)
        data = block.serialize()
        msg = Msg(Msg.GET_BLOCK_MSG, data)
        return msg
    def handle_transaction(self, msg):
        tx_pool = TxPool()
        txs = msg.get("data", {})
        for tx_data in txs:
            tx = Transaction.deserialize(tx_data)
            tx_pool.add(tx)
        if tx_pool.is_full():
            bc = BlockChain()
            bc.add_block(tx_pool.txs)
            log.info("add block")
            tx_pool.clear()
        msg = Msg(Msg.NONE_MSG, "")
        return msg

TCP端比較簡單,listen_loop方法監聽新的請求並開啟一個新執行緒處理連線中的資料互動。

handle_loop方法呼叫了handle分發處理請求。

handle_handshake處理握手請求,這裡將最新塊高度和創世塊傳送出去了,方便和本地資料進行比較,如果遠端資料更新,那麼就獲取新的部分的區塊。

handle_get_block獲取對應的區塊並將資料傳送給使用者端。

handle_transaction 處理使用者端傳送來的交易資訊。把使用者端傳送來的交易新增到未確認交易池,如果交易池滿了就新增到區塊。這裡是方便處理才這麼做的,實際上,位元幣中並不是這樣做的,而是由礦工根據情況進行打包區塊的。

TCP使用者端

class TCPClient(object):
    def __init__(self, ip, port):
        self.txs = []
        self.sock = socket.socket()
        log.info("connect ip:"+ip+"tport:"+str(port))
        self.sock.connect((ip, port))
    def add_tx(self, tx):
        self.txs.append(tx)
    def send(self, msg):
        data = json.dumps(msg.__dict__)
        self.sock.sendall(data.encode())
        log.info("send:"+data)
        recv_data = self.sock.recv(4096)
        log.info("client_recv_data:"+str(recv_data))
        try:
            recv_msg = json.loads(recv_data)
        except json.decoder.JSONDecodeError as e:
            return
        self.handle(recv_msg)
    def handle(self, msg):
        code = msg.get("code", 0)
        log.info("recv code:"+str(code))
        if code == Msg.HAND_SHAKE_MSG:
            self.handle_shake(msg)
        elif code == Msg.GET_BLOCK_MSG:
            self.handle_get_block(msg)
        elif code == Msg.TRANSACTION_MSG:
            self.handle_transaction(msg)
    def shake_loop(self):
        while True:
            if self.txs:
                data = [tx.serialize() for tx in self.txs]
                msg = Msg(Msg.TRANSACTION_MSG, data)
                self.send(msg)
                self.txs.clear()
            else:
                log.info("shake")
                block_chain = BlockChain()
                block = block_chain.get_last_block()
                try:
                    genesis_block = block_chain[0]
                except IndexError as e:
                    genesis_block = None
                data = {
                    "last_height": -1,
                    "genesis_block": ""
                }
                if genesis_block:
                    data = {
                        "last_height": block.block_header.height,
                        "genesis_block": genesis_block.serialize()
                    }
                msg = Msg(Msg.HAND_SHAKE_MSG, data)
                self.send(msg)
                time.sleep(5)
    def handle_shake(self, msg):
        data = msg.get("data", "")
        last_height = data.get("last_height", 0)
        block_chain = BlockChain()
        block = block_chain.get_last_block()
        if block:
            local_last_height = block.block_header.height
        else:
            local_last_height = -1
        log.info("local_last_height %d, last_height %d" %(local_last_height, last_height))
        for i in range(local_last_height + 1, last_height+1):
            send_msg = Msg(Msg.GET_BLOCK_MSG, i)
            self.send(send_msg)
    def handle_get_block(self, msg):
        data = msg.get("data", "")
        block = Block.deserialize(data)
        bc = BlockChain()
        try:
            bc.add_block_from_peers(block)
        except ValueError as e:
            log.info(str(e))
    def handle_transaction(self, msg):
        data = msg.get("data", {})
        tx = Transaction.deserialize(data)
        tx_pool = TxPool()
        tx_pool.add(tx)
        if tx_pool.is_full():
            bc.add_block(tx_pool.txs)
            log.info("mined a block")
            tx_pool.clear()
    def close(self):
        self.sock.close()

handle_transaction處理伺服器傳送來的交易,將交易新增到交易池,如果交易池滿了就新增到區塊鏈中。 

handle_get_block處理伺服器傳送來的區塊,並將區塊更新到鏈上。 

handle_shake處理伺服器響應的握手資訊,如果發現當前的的區塊高度低於資料中響應的區塊高高度,則發起請求獲取新的幾個區塊。 

shake_loop 每間隔10秒傳送一次握手資訊(5秒同步一次區塊),如果發現有需要廣播的交易則進行交易的廣播。

P2P伺服器

p2p節點發現部分,使用了kademlia協定,並使用了kademlia庫,安裝方法pip3 install kademlia

class P2p(object):
    def __init__(self):
        self.server = Server()
        self.loop = None
    def run(self):
        loop = asyncio.get_event_loop()
        self.loop = loop
        loop.run_until_complete(self.server.listen(listen_port))
        self.loop.run_until_complete(self.server.bootstrap([(bootstrap_host, bootstrap_port)]))
        loop.run_forever()
    def get_nodes(self):
        nodes = []
        for bucket in self.server.protocol.router.buckets:
            nodes.extend(bucket.get_nodes())
        return nodes

其中run方法啟動節點監聽並連線一個初始節點,並執行p2p節點監聽。get_nodes方法獲取當前所有的節點。

連線節點

class PeerServer(Singleton):
    def __init__(self):
        if not hasattr(self, "peers"):
            self.peers = []
        if not hasattr(self, "nodes"):
            self.nodes = []
    def nodes_find(self, p2p_server):
        local_ip = socket.gethostbyname(socket.getfqdn(socket.gethostname()))
        while True:
            nodes = p2p_server.get_nodes()
            for node in nodes:
                if node not in self.nodes:
                    ip = node.ip
                    port = node.port
                    if local_ip == ip:
                        continue
                    client = TCPClient(ip, port)
                    t = threading.Thread(target=client.shake_loop, args=())
                    t.start()
                    self.peers.append(client)
                    self.nodes.append(node)
            time.sleep(1)
    def broadcast_tx(self, tx):
        for peer in self.peers:
            peer.add_tx(tx)
    def run(self, p2p_server):
        t = threading.Thread(target=self.nodes_find, args=(p2p_server,))
        t.start()

nodes_find為節點發現方法,每隔1秒進行查詢當前是否有新的節點,並開啟執行緒進行連線。broadcast_tx為廣播交易的方法,將交易新增到待廣播交易池。

RPC

開啟網路監聽後,主執行緒就被p2p網路佔用了,我們需要另外的方法進行互動操作。RPC就是常用的方法。我們將命令列操作都通過rpc匯出,然後通過rpc呼叫獲取資訊。

class Cli(object):
    def get_balance(self, addr):
        bc = BlockChain()
        balance = 0
        utxo = UTXOSet()
        utxo.reindex(bc)
        utxos = utxo.find_utxo(addr)
        print(utxos)
        for fout in utxos:
            balance += fout.txoutput.value
        print('%s balance is %d' %(addr, balance))
        return balance
    def create_wallet(self):
        w = Wallet.generate_wallet()
        ws = Wallets()
        ws[w.address] = w
        ws.save()
        return w.address
    def print_all_wallet(self):
        ws = Wallets()
        wallets = []
        for k, _ in ws.items():
            wallets.append(k)
        return wallets
    def send(self, from_addr, to_addr, amount):
        bc = BlockChain()
        tx = bc.new_transaction(from_addr, to_addr, amount)
        # bc.add_block([tx])
        tx_pool = TxPool()
        tx_pool.add(tx)
        from network import log
        log.info("tx_pool:"+str(id(tx_pool)))
        log.info("txs_len:"+str(len(tx_pool.txs)))
        try:
            server = PeerServer()
            server.broadcast_tx(tx)
            log.info("tx_pool is full:"+str(tx_pool.is_full()))
            log.info("tx_pool d :"+str(tx_pool))
            if tx_pool.is_full():
                bc.add_block(tx_pool.txs)
                log.info("add block")
                tx_pool.clear()
        except Exception as e:
            import traceback
            msg = traceback.format_exc()
            log.info("error_msg:"+msg)
        print('send %d from %s to %s' %(amount, from_addr, to_addr))
    def print_chain(self, height):
        bc = BlockChain()
        return bc[height].block_header.serialize()
    def create_genesis_block(self):
        bc = BlockChain()
        w = Wallet.generate_wallet()
        ws = Wallets()
        ws[w.address] = w
        ws.save()
        tx = bc.coin_base_tx(w.address)
        bc.new_genesis_block(tx)
        return w.address

RPC匯出:

rpc = RPCServer(export_instance=Cli())
    rpc.start(False)

測試

分別開啟兩臺主機A和B:A主機:

$python3 cli.py start

將B主機的conf.py中的bootstrap_host和bootstrap_port修改為A主機的ip和埠。然後啟動B主機。

$python3 cli.py start

任意一臺主機開啟新的視窗執行生成創世塊:

$python3 cli.py genesis_block
Genesis Wallet is: 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r

分別在兩臺機器上檢視餘額:

$python3 cli.py balance 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r
1LYHea8NjTxaYboXJbR7LemvUZjyQc839r balance is 1000

分別在兩臺機器上建立地址:

$python3 cli.py createwallet
Wallet address is 14sQYjj3n2fReJyVNoqHCmCFjNKEZAVcEB

檢視當前機器的所有地址

python3 cli.py printwallet
Wallet are:
	19zR4zT9eSFsbSNvnQ1RCrhjN71VzPFTnH
	1MVUrxPuRgtkyLQvAoma4yEarzcMzvQqym
	18kruspe7jAbggR1sUF8fCFsZLn6efSeFk
	14sQYjj3n2fReJyVNoqHCmCFjNKEZAVcEB

轉賬(至少要轉兩筆才能確認哦,可以修改txpool.py的SIZE屬性來調整區塊大小)。注意:只有當前有這個地址(即有這個私鑰)才能作為from轉賬給其他地址。

$python3 cli.py send --from 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r --to 19zR4zT9eSFsbSNvnQ1RCrhjN71VzPFTnH --amount 100
$python3 cli.py send --from 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r --to 19zR4zT9eSFsbSNvnQ1RCrhjN71VzPFTnH --amount 100

分別在兩臺機器上檢視餘額:

python3 cli.py balance 1LYHea8NjTxaYboXJbR7LemvUZjyQc839r
1LYHea8NjTxaYboXJbR7LemvUZjyQc839r balance is 1900

注意:這裡因為重複轉了兩筆賬,使用了同一個UTXO,所以第二筆會失敗,由於1LYHea8NjTxaYboXJbR7LemvUZjyQc839r為被獎勵地址,所以獲得了1000得挖礦獎勵所以餘額為:1000-100+900=1900。

列印區塊資訊:

$python3 cli.py print 1
{'timestamp': '1551347915.3271294', 'prev_block_hash': '9f12dad81ab988f247884d7d06de46c6951688dcbedb87df2159669594a44f0d', 'hash': 'a9d02b72690398805fb83efd4680cb710ed4f3c67ea7926fe8faab256c1cad1c', 'hash_merkle_root': 'fe768edf1040c504674e8a468c89f00574a181b88ad2297ef29d307695adb38e', 'height': 1, 'nonce': 3}

區塊同步方式

為了簡單,區塊採用最簡單的方式進行同步。方法如下:

如果發現對方區塊高度低於自己,則不做處理。

如果發現對方區塊高度高於自己

(1) 當前最新區塊在對應區塊能找到,那麼就更新最新的區塊

(2) 當前最新區塊在對應區塊不能找到,那麼回滾當前區塊,直到回到交叉點,再進行更新區塊。

涉及到的原始碼修改較多,這裡就不貼原始碼了。移步到本節完整實現原始碼檢視完整原始碼。

問題

  • 為了簡單,將握手和廣播交易合一了,這導致了廣播交易不及時。
  • 新區塊沒有實時進行廣播,而是被動等待同步,這也導致了區塊同步較慢。
  • 在區塊未確認的情況下用同一個地址的幣進行轉賬有隻有第一筆會成功,後面的都會失敗。這是由目前獲取UTXO的方式決定的。

總結

我們已經實現了一個簡版的位元幣,並且實現了任意節點加入和區塊的同步等功能。為了簡化並說明原理,忽略掉了很多細節,並且忽略掉了效能問題,但它可以說明區塊鏈的基本原理。

參考:

[1] 本節完整實現原始碼

以上就是python區塊鏈實現簡版網路的詳細內容,更多關於python區塊鏈網路的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com