<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
前言:
執行緒安全問題:當2個執行緒同時用到執行緒池時,會同時建立2個執行緒池。如果多個執行緒,錯開用到執行緒池,就只會建立一個執行緒池,會共用一個執行緒池。我用的註解方式的單例模式,感覺就是這個註解的單例方式,解決了多執行緒問題,但是沒解決執行緒安全問題,需要優化這個單例模式。
主要通過 PooledDB 模組實現。
db_config.py
# -*- coding: UTF-8 -*- import pymysql # 資料庫資訊 DB_TEST_HOST = "127.0.0.1" DB_TEST_PORT = 3308 DB_TEST_DBNAME = "bt" DB_TEST_USER = "root" DB_TEST_PASSWORD = "123456" # 資料庫連線編碼 DB_CHARSET = "utf8" # mincached : 啟動時開啟的閒置連線數量(預設值 0 開始時不建立連線) DB_MIN_CACHED = 5 # maxcached : 連線池中允許的閒置的最多連線數量(預設值 0 代表不閒置連線池大小) DB_MAX_CACHED = 0 # maxshared : 共用連線數允許的最大數量(預設值 0 代表所有連線都是專用的)如果達到了最大數量,被請求為共用的連線將會被共用使用 DB_MAX_SHARED = 5 # maxconnecyions : 建立連線池的最大數量(預設值 0 代表不限制) DB_MAX_CONNECYIONS = 300 # blocking : 設定在連線池達到最大數量時的行為(預設值 0 或 False 代表返回一個錯誤<toMany......> 其他代表阻塞直到連線數減少,連線被分配) DB_BLOCKING = True # maxusage : 單個連線的最大允許複用次數(預設值 0 或 False 代表不限制的複用).當達到最大數時,連線會自動重新連線(關閉和重新開啟) DB_MAX_USAGE = 0 # setsession : 一個可選的SQL命令列表用於準備每個對談,如["set datestyle to german", ...] DB_SET_SESSION = None # creator : 使用連線資料庫的模組 DB_CREATOR = pymysql
設定連線池最大最小為5個。則啟動連線池時,就會建立5個連線。
singleton.py
#單例模式函數,用來修飾類 def singleton(cls,*args,**kw): instances = {} def _singleton(): if cls not in instances: instances[cls] = cls(*args,**kw) return instances[cls] return _singleton
db_dbutils_init.py
from dbutils.pooled_db import PooledDB import db_config as config # import random from singleton import singleton """ @功能:建立資料庫連線池 """ class MyConnectionPool(object): # 私有屬性 # 能通過物件直接存取,但是可以在本類內部存取; __pool = None # def __init__(self): # self.conn = self.__getConn() # self.cursor = self.conn.cursor() # 建立資料庫連線conn和遊標cursor def __enter__(self): self.conn = self.__getconn() self.cursor = self.conn.cursor() # 建立資料庫連線池 def __getconn(self): if self.__pool is None: # i = random.randint(1, 100) # print("建立執行緒池的數量"+str(i)) self.__pool = PooledDB( creator=config.DB_CREATOR, mincached=config.DB_MIN_CACHED, maxcached=config.DB_MAX_CACHED, maxshared=config.DB_MAX_SHARED, maxconnections=config.DB_MAX_CONNECYIONS, blocking=config.DB_BLOCKING, maxusage=config.DB_MAX_USAGE, setsession=config.DB_SET_SESSION, host=config.DB_TEST_HOST, port=config.DB_TEST_PORT, user=config.DB_TEST_USER, passwd=config.DB_TEST_PASSWORD, db=config.DB_TEST_DBNAME, use_unicode=False, charset=config.DB_CHARSET ) return self.__pool.connection() # 釋放連線池資源 def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() # 關閉連線歸還給連結池 # def close(self): # self.cursor.close() # self.conn.close() # 從連線池中取出一個連線 def getconn(self): conn = self.__getconn() cursor = conn.cursor() return cursor, conn # 獲取連線池,範例化 @singleton def get_my_connection(): return MyConnectionPool()
mysqlhelper.py
import time from db_dbutils_init import get_my_connection """執行語句查詢有結果返回結果沒有返回0;增/刪/改返回變更資料條數,沒有返回0""" class MySqLHelper(object): def __init__(self): self.db = get_my_connection() # 從資料池中獲取連線 # # def __new__(cls, *args, **kwargs): # if not hasattr(cls, 'inst'): # 單例 # cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs) # return cls.inst # 封裝執行命令 def execute(self, sql, param=None, autoclose=False): """ 【主要判斷是否有引數和是否執行完就釋放連線】 :param sql: 字串型別,sql語句 :param param: sql語句中要替換的引數"select %s from tab where id=%s" 其中的%s就是引數 :param autoclose: 是否關閉連線 :return: 返回連線conn和遊標cursor """ cursor, conn = self.db.getconn() # 從連線池獲取連線 count = 0 try: # count : 為改變的資料條數 if param: count = cursor.execute(sql, param) else: count = cursor.execute(sql) conn.commit() if autoclose: self.close(cursor, conn) except Exception as e: pass return cursor, conn, count # 釋放連線 def close(self, cursor, conn): """釋放連線歸還給連線池""" cursor.close() conn.close() # 查詢所有 def selectall(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) res = cursor.fetchall() return res except Exception as e: print(e) self.close(cursor, conn) return count # 查詢單條 def selectone(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) res = cursor.fetchone() self.close(cursor, conn) return res except Exception as e: print("error_msg:", e.args) self.close(cursor, conn) return count # 增加 def insertone(self, sql, param): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) # _id = cursor.lastrowid() # 獲取當前插入資料的主鍵id,該id應該為自動生成為好 conn.commit() self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 增加多行 def insertmany(self, sql, param): """ :param sql: :param param: 必須是元組或列表[(),()]或((),()) :return: """ cursor, conn, count = self.db.getconn() try: cursor.executemany(sql, param) conn.commit() return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 刪除 def delete(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 更新 def update(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) conn.commit() self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # if __name__ == '__main__': # db = MySqLHelper() # sql = "SELECT SLEEP(10)" # db.execute(sql) # time.sleep(20) # TODO 查詢單條 # sql1 = 'select * from userinfo where name=%s' # args = 'python' # ret = db.selectone(sql=sql1, param=args) # print(ret) # (None, b'python', b'123456', b'0') # TODO 增加單條 # sql2 = 'insert into hotel_urls(cname,hname,cid,hid,url) values(%s,%s,%s,%s,%s)' # ret = db.insertone(sql2, ('1', '2', '1', '2', '2')) # print(ret) # TODO 增加多條 # sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)' # li = li = [ # ('分省', '123'), # ('到達','456') # ] # ret = db.insertmany(sql3,li) # print(ret) # TODO 刪除 # sql4 = 'delete from userinfo WHERE name=%s' # args = 'xxxx' # ret = db.delete(sql4, args) # print(ret) # TODO 更新 # sql5 = r'update userinfo set password=%s WHERE name LIKE %s' # args = ('993333993', '%old%') # ret = db.update(sql5, args) # print(ret)
修改 db_dbutils_init.py 檔案,在建立連線池def __getconn(self):方法下,加一個列印亂數,方便將來我們定位是否時單例的執行緒池。
修改後的db_dbutils_init.py 檔案:
from dbutils.pooled_db import PooledDB import db_config as config import random from singleton import singleton """ @功能:建立資料庫連線池 """ class MyConnectionPool(object): # 私有屬性 # 能通過物件直接存取,但是可以在本類內部存取; __pool = None # def __init__(self): # self.conn = self.__getConn() # self.cursor = self.conn.cursor() # 建立資料庫連線conn和遊標cursor def __enter__(self): self.conn = self.__getconn() self.cursor = self.conn.cursor() # 建立資料庫連線池 def __getconn(self): if self.__pool is None: i = random.randint(1, 100) print("執行緒池的亂數"+str(i)) self.__pool = PooledDB( creator=config.DB_CREATOR, mincached=config.DB_MIN_CACHED, maxcached=config.DB_MAX_CACHED, maxshared=config.DB_MAX_SHARED, maxconnections=config.DB_MAX_CONNECYIONS, blocking=config.DB_BLOCKING, maxusage=config.DB_MAX_USAGE, setsession=config.DB_SET_SESSION, host=config.DB_TEST_HOST, port=config.DB_TEST_PORT, user=config.DB_TEST_USER, passwd=config.DB_TEST_PASSWORD, db=config.DB_TEST_DBNAME, use_unicode=False, charset=config.DB_CHARSET ) return self.__pool.connection() # 釋放連線池資源 def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() # 關閉連線歸還給連結池 # def close(self): # self.cursor.close() # self.conn.close() # 從連線池中取出一個連線 def getconn(self): conn = self.__getconn() cursor = conn.cursor() return cursor, conn # 獲取連線池,範例化 @singleton def get_my_connection(): return MyConnectionPool()
開始測試:
from mysqlhelper import MySqLHelper import time if __name__ == '__main__': sql = "SELECT SLEEP(10)" sql1 = "SELECT SLEEP(15)" db = MySqLHelper() db.execute(sql) db.execute(sql1) time.sleep(20)
在資料庫中,使用 show processlist;
show processlist;
當執行第一個sql時。資料庫連線顯示。
當執行第二個sql時。資料庫連線顯示:
當執行完sql,程式sleep時。資料庫連線顯示:
程式列印結果:
執行緒池的亂數43
由以上可以得出結論:
執行緒池啟動後,生成了5個連線。執行第一個sql時,使用了1個連線。執行完第一個sql後,使用了另外1個連線。 這是一個線性的,執行緒池中一共5個連線,但是每次執行,只使用了其中一個。
有個疑問,連線池如果不支援並行是不是就毫無意義?
如上,雖然開了執行緒池5個連線,但是每次執行sql,只用到了一個連線。那為何不設定執行緒池大小為1呢?設定執行緒池大小的意義何在呢?(如果在非並行的場景下,是不是設定大小無意義?)
相比於不用執行緒池的優點:
如果不用執行緒池,則每次執行一個sql都要建立、斷開連線。 像我們這樣使用連線池,不用反覆建立、斷開連線,拿現成的連線直接用就好了。
from mysqlhelper import MySqLHelper import time if __name__ == '__main__': db = MySqLHelper() db1 = MySqLHelper() sql = "SELECT SLEEP(10)" sql1 = "SELECT SLEEP(15)" db.execute(sql) db1.execute(sql1) time.sleep(20)
第一個範例db,執行sql。執行緒池啟動了5個連線
第二個範例db1,執行sql:
程式睡眠時,一共5個執行緒池:
列印結果:
結果證明:
雖然我們依次建立了2個範例,但是(1)建立執行緒池的列印結果,只列印1次,且從始至終,執行緒池一共只啟動了5個連線,且連線的id沒有發生改變,說明一直是這5個連線。
證明,我們雖然建立了2個範例,但是這2個範例其實是一個範例。(單例模式是生效的)
import threading from mysqlhelper import MySqLHelper import time def sl1(): time.sleep(2) db = MySqLHelper() sql = "SELECT SLEEP(6)" db.execute(sql) def sl2(): time.sleep(4) db = MySqLHelper() sql = "SELECT SLEEP(15)" db.execute(sql) if __name__ == '__main__': threads = [] t1 = threading.Thread(target=sl1) threads.append(t1) t2 = threading.Thread(target=sl2) threads.append(t2) for t in threads: t.setDaemon(True) t.start() time.sleep(20)
2個執行緒間隔了2秒。
觀察資料庫的連線數量:
列印結果:
在並行執行2個sql時,共用了這5個連線,且列印結果只列印了一次,說明雖然並行建立了2次範例,但真正只建立了一個連線池。
import threading from mysqlhelper import MySqLHelper import time if __name__ == '__main__': db = MySqLHelper() sql = "SELECT SLEEP(6)" sql1 = "SELECT SLEEP(15)" threads = [] t1 = threading.Thread(target=db.execute, args=(sql,)) threads.append(t1) t2 = threading.Thread(target=db.execute, args=(sql1,)) threads.append(t2) for t in threads: t.setDaemon(True) t.start() time.sleep(20)
觀察資料庫連線 :
列印結果:
結果表明:
終端列印了2次,資料庫建立了10個連線,說明建立了2個執行緒池。這樣的單例模式,存線上程安全問題。
到此這篇關於Python封裝資料庫連線池詳解的文章就介紹到這了,更多相關Python連線池內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45