首頁 > 軟體

Python非同步傳送紀錄檔到遠端伺服器詳情

2022-07-05 18:02:05

背景

在Python中使用紀錄檔最常用的方式就是在控制檯和檔案中輸出紀錄檔了,logging模組也很好的提供的相應 的類,使用起來也非常方便,但是有時我們可能會有一些需求,如還需要將紀錄檔傳送到遠端,或者直接寫入數 據庫,這種需求該如何實現呢?

StreamHandler和FileHandler

首先我們先來寫一套簡單輸出到cmd和檔案中的程式碼:

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
 File Name:   loger
 Description :
 Author :    yangyanxing
 date:     2020/9/23
-------------------------------------------------
"""
import logging
import sys
import os
# 初始化logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# 設定紀錄檔格式
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d
%H:%M:%S')
# 新增cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# 新增檔案的handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 將cmd和file handler新增到logger中
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天氣不錯")

先初始化一個logger, 並且設定它的紀錄檔級別是DEBUG,然後添初始化了 cmd_handler和 file_handler,最後將它們新增到logger中, 執行指令碼,會在cmd中列印出

[2020-09-23 10:45:56] [DEBUG] 今天天氣不錯且會寫入到當前目錄下的debug.log檔案中

新增HTTPHandler

如果想要在記錄時將紀錄檔傳送到遠端伺服器上,可以新增一個 HTTPHandler , 在python標準庫logging.handler中,已經為我們定義好了很多handler,有些我們可以直接用,本地使用tornado寫一個接收 紀錄檔的介面,將接收到的引數全都列印出來

# 新增一個httphandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天氣不錯")
結果在伺服器端我們收到了很多資訊

{
'name': [b 'yyx'],
'msg': [b
'xe4xbbx8axe5xa4xa9xe5xa4xa9xe6xb0x94xe4xb8x8dxe9x94x99'],
'args': [b '()'],
'levelname': [b 'DEBUG'],
'levelno': [b '10'],
'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
'filename': [b 'loger.py'],
'module': [b 'loger'],
'exc_info': [b 'None'],
'exc_text': [b 'None'],
'stack_info': [b 'None'],
'lineno': [b '41'],
'funcName': [b '<module>'],
'created': [b '1600831054.8881223'],
'msecs': [b '888.1223201751709'],
'relativeCreated': [b '22.99976348876953'],
'thread': [b '14876'],
'threadName': [b 'MainThread'],
'processName': [b 'MainProcess'],
'process': [b '8648'],
'message': [b
'xe4xbbx8axe5xa4xa9xe5xa4xa9xe6xb0x94xe4xb8x8dxe9x94x99'],
'asctime': [b '2020-09-23 11:17:34']
}

可以說是資訊非常之多,但是卻並不是我們想要的樣子,我們只是想要類似於

[2020-09-23 10:45:56][DEBUG] 今天天氣不錯這樣的紀錄檔
logging.handlers.HTTPHandler 只是簡單的將紀錄檔所有資訊傳送給伺服器端,至於伺服器端要怎麼組織內 容是由伺服器端來完成. 所以我們可以有兩種方法,一種是改伺服器端程式碼,根據傳過來的紀錄檔資訊重新組織一 下紀錄檔內容, 第二種是我們重新寫一個類,讓它在傳送的時候將重新格式化紀錄檔內容傳送到伺服器端。

我們採用第二種方法,因為這種方法比較靈活, 伺服器端只是用於記錄,傳送什麼內容應該是由使用者端來決定。

我們需要重新定義一個類,我們可以參考 logging.handlers.HTTPHandler 這個類,重新寫一個httpHandler類

每個紀錄檔類都需要重寫emit方法,記錄紀錄檔時真正要執行是也就是這個emit方法:

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  def emit(self, record):
    '''
   重寫emit方法,這裡主要是為了把初始化時的baseParam新增進來
   :param record:
   :return:
   '''
    msg = self.format(record)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      requests.get(url, timeout=1)
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      requests.post(self.url, data={'log': msg}, headers=headers,
timeout=1)

上面程式碼中有一行定義傳送的引數 msg = self.format(record)這行程式碼錶示,將會根據紀錄檔物件設定的格式返回對應的內容。

之後再將內容通過requests庫進行傳送,無論使用get 還是post方式,伺服器端都可以正常的接收到紀錄檔

{'log': [b'[2020-09-23 11:39:45] [DEBUG]
xe4xbbx8axe5xa4xa9xe5xa4xa9xe6xb0x94xe4xb8x8dxe9x94x99']}

將bytes型別轉一下就得到了:

[2020-09-23 11:43:50] [DEBUG] 今天天氣不錯

非同步的傳送遠端紀錄檔

現在我們考慮一個問題,當紀錄檔傳送到遠端伺服器過程中,如果遠端伺服器處理的很慢,會耗費一定的時間, 那麼這時記錄紀錄檔就會都變慢修改伺服器紀錄檔處理類,讓其停頓5秒鐘,模擬長時間的處理流程

async def post(self):
  print(self.getParam('log'))
  await asyncio.sleep(5)
  self.write({"msg": 'ok'})

此時我們再列印上面的紀錄檔:

logger.debug("今天天氣不錯")
logger.debug("是風和日麗的")

得到的輸出為:

[2020-09-23 11:47:33] [DEBUG] 今天天氣不錯
[2020-09-23 11:47:38] [DEBUG] 是風和日麗的

我們注意到,它們的時間間隔也是5秒。
那麼現在問題來了,原本只是一個記錄紀錄檔,現在卻成了拖累整個指令碼的累贅,所以我們需要非同步的來 處理遠端寫紀錄檔。

1使用多執行緒處理

首先想的是應該是用多執行緒來執行傳送紀錄檔方法;

def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    t = threading.Thread(target=requests.get, args=(url,))
    t.start()
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    t = threading.Thread(target=requests.post, args=(self.url,), kwargs=
{"data":{'log': msg},

這種方法是可以達到不阻塞主目的,但是每列印一條紀錄檔就需要開啟一個執行緒,也是挺浪費資源的。我們也 可以使用執行緒池來處理

2使用執行緒池處理

python 的 concurrent.futures 中有ThreadPoolExecutor, ProcessPoolExecutor類,是執行緒池和程序池, 就是在初始化的時候先定義幾個執行緒,之後讓這些執行緒來處理相應的函數,這樣不用每次都需要新建立執行緒

執行緒池的基本使用:

exector = ThreadPoolExecutor(max_workers=1) # 初始化一個執行緒池,只有一個執行緒
exector.submit(fn, args, kwargs) # 將函數submit到執行緒池中

如果執行緒池中有n個執行緒,當提交的task數量大於n時,則多餘的task將放到佇列中。
再次修改上面的emit函數

exector = ThreadPoolExecutor(max_workers=1)
def emit(self, record):
  msg = self.format(record)
  timeout = aiohttp.ClientTimeout(total=6)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    exector.submit(requests.get, url, timeout=6)
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    exector.submit(requests.post, self.url, data={'log': msg},
headers=headers, timeout=6)

這裡為什麼要只初始化一個只有一個執行緒的執行緒池? 因為這樣的話可以保證先進佇列裡的紀錄檔會先被髮 送,如果池子中有多個執行緒,則不一定保證順序了。

3使用非同步aiohttp庫來傳送請求

上面的CustomHandler類中的emit方法使用的是requests.post來傳送紀錄檔,這個requests本身是阻塞運 行的,也正上由於它的存在,才使得指令碼卡了很長時間,所們我們可以將阻塞執行的requests庫替換為非同步 的aiohttp來執行get和post方法, 重寫一個CustomHandler中的emit方法

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  async def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
      async with session.get(self.url) as resp:
          print(await resp.text())
      else:
        headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
      async with session.post(self.url, data={'log': msg}) as resp:
          print(await resp.text())

這時程式碼執行崩潰了:

C:Python37liblogging__init__.py:894: RuntimeWarning: coroutine
'CustomHandler.emit' was never awaited
self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

伺服器端也沒有收到傳送紀錄檔的請求。
究其原因是由於emit方法中使用 async with session.post 函數,它需要在一個使用async 修飾的函數 裡執行,所以修改emit函數,使用async來修飾,這裡emit函數變成了非同步的函數, 返回的是一個 coroutine 物件,要想執行coroutine物件,需要使用await, 但是指令碼裡卻沒有在哪裡呼叫 await emit() ,所以崩潰資訊 中顯示 coroutine 'CustomHandler.emit' was never awaited。

既然emit方法返回的是一個coroutine物件,那麼我們將它放一個loop中執行

async def main():
  await logger.debug("今天天氣不錯")
  await logger.debug("是風和日麗的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

執行依然報錯:

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

意思是需要的是一個coroutine,但是傳進來的物件不是。
這似乎就沒有辦法了,想要使用非同步庫來傳送,但是卻沒有可以呼叫await的地方。

解決辦法是有的,我們使用 asyncio.get_event_loop() 獲取一個事件迴圈物件, 我們可以在這個物件上註冊很多協程物件,這樣當執行事件迴圈的時候,就是去執行註冊在該事件迴圈上的協程,

我們通過一個小例子來看一下:

import asyncio
async def test(n):
 while n > 0:
   await asyncio.sleep(1)
   print("test {}".format(n))
   n -= 1
 return n

async def test2(n):
 while n >0:
   await asyncio.sleep(1)
   print("test2 {}".format(n))
   n -= 1
def stoploop(task):
 print("執行結束, task n is {}".format(task.result()))
 loop.stop()
loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
loop.run_forever()

我們使用 loop = asyncio.get_event_loop() 建立了一個事件迴圈物件loop, 並且在loop上建立了兩個task, 並且給task1新增了一個回撥函數,在task1它執行結束以後,將loop停掉。
注意看上面的程式碼,我們並沒有在某處使用await來執行協程,而是通過將協程註冊到某個事件迴圈物件上, 然後呼叫該回圈的 run_forever() 函數,從而使該回圈上的協程物件得以正常的執行。

上面得到的輸出為:

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
執行結束, task n is 0

可以看到,使用事件迴圈物件建立的task,在該回圈執行run_forever() 以後就可以執行了如果不執行 loop.run_forever() 函數,則註冊在它上面的協程也不會執行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)
# loop.run_forever()

上面的程式碼將loop.run_forever() 註釋掉,換成time.sleep(5) 停5秒, 這時指令碼不會有任何輸出,在停了5秒 以後就中止了,
回到之前的紀錄檔傳送遠端伺服器的程式碼,我們可以使用aiohttp封裝一個傳送資料的函數, 然後在emit中將 這個函數註冊到全域性的事件迴圈物件loop中,最後再執行loop.run_forever()

loop = asyncio.get_event_loop()
class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  # 使用aiohttp封裝傳送資料函數
  async def submit(self, data):
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if self.url.find("?") >= 0:
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
data}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
        async with session.post(self.url, data={'log': data}) as resp:
          print(await resp.text())
    return True
  def emit(self, record):
    msg = self.format(record)
    loop.create_task(self.submit(msg))
# 新增一個httphandler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天氣不錯")
logger.debug("是風和日麗的")
loop.run_forever()

這時指令碼就可以正常的非同步執行了:

loop.create_task(self.submit(msg)) 也可以使用
asyncio.ensure_future(self.submit(msg), loop=loop) 來代替,目的都是將協程物件註冊到事件迴圈中。

但這種方式有一點要注意,loop.run_forever() 將會一直阻塞,所以需要有個地方呼叫 loop.stop() 方法. 可以註冊到某個task的回撥中。

到此這篇關於Python非同步傳送紀錄檔到遠端伺服器詳情的文章就介紹到這了,更多相關Python非同步傳送內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com