Python--DBUtil
Python--DBUtil包
1 簡介
? ? DBUtils是一套Python數據庫連接池包,并允許對非線程安全的數據庫接口進行線程安全包裝。DBUtils來自Webware for Python。
? ? DBUtils提供兩種外部接口:
- PersistentDB :提供線程專用的數據庫連接,并自動管理連接。
 - PooledDB :提供線程間可共享的數據庫連接,并自動管理連接。
 
實測證明 PersistentDB 的速度是最高的,但是在某些特殊情況下,數據庫的連接過程可能異常緩慢,而此時的PooledDB則可以提供相對來說平均連接時間比較短的管理方式。
另外,實際使用的數據庫驅動也有所依賴,比如SQLite數據庫只能使用PersistentDB作連接池。 下載地址:http://www.webwareforpython.org/downloads/DBUtils/
安裝
pip install DBUtils?
2 使用方法
連接池對象只初始化一次,一般可以作為模塊級代碼來確保。 PersistentDB的連接例子:
import DBUtils.PersistentDB persist=DBUtils.PersistentDB.PersistentDB(dbpai=MySQLdb,maxusage=1000,**kwargs)這里的參數dbpai指使用的底層數據庫模塊,兼容DB-API的。maxusage則為一個連接最大使用次數,參考了官方例子。后面的**kwargs則為實際傳遞給MySQLdb的參數。
獲取連接: conn=persist.connection() 實際編程中用過的連接直接關閉 conn.close() 即可將連接交還給連接池。
PooledDB使用方法同PersistentDB,只是參數有所不同。
- * dbapi :數據庫接口,使用鏈接數據庫的模塊
 - * mincached :初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建
 - * maxcached :連接池最大可用連接數量
 - * maxshared :連接池最大可共享連接數量
 - * maxconnections :?連接池允許的最大連接數,0和None表示不限制連接數
 - * blocking :達到最大數量時是否阻塞
 - * maxusage :單個連接最大復用次數
 - * setsession :用于傳遞到數據庫的準備會話,如 [”set name UTF-8″] 。
 
一個使用過程:
import os import cx_Oracle # 用于以清晰、可讀的形式輸出 Python 數據結構 from pprint import pprint from sys import modules from DBUtils.PooledDB import PooledDBpool= PooledDB(cx_Oracle,user='test',password='test',dsn='testDB',mincached=5, maxcached=20) print(pool.connection()) print(connection.version)# 獲得游標對象 cursor = pool.connection().cursor ()try:# 解析sql語句cursor.parse("select * dual")# 捕獲SQL異常 except cx_Oracle.DatabaseError as e:print(e) # ORA-00923: 未找到要求的 FROM 關鍵字# 執行sql 語句 cursor.execute ("select * from dual") # 提取一條數據,返回一個元祖 row = cursor.fetchone() pprint(row) # ('X',)3.DBUtil功能
功能
SimplePooledDB
DBUtils.SimplePooledDB 是一個非常簡單的數據庫連接池實現。他比完善的 PooledDB 模塊缺少很多功能。 DBUtils.SimplePooledDB 本質上類似于 MiscUtils.DBPool 這個Webware的組成部分。你可以把它看作一種演示程序。
SteadyDB
DBUtils.SteadyDB 是一個模塊實現了”強硬”的數據庫連接,基于DB-API 2建立的原始連接。一個”強硬”的連接意味著在連接關閉之后,或者使用次數操作限制時會重新連接。
一個典型的例子是數據庫重啟時,而你的程序仍然在運行并需要訪問數據庫,或者當你的程序連接了一個防火墻后面的遠程數據庫,而防火墻重啟時丟失了狀態時。
一般來說你不需要直接使用 SteadyDB 它只是給接下來的兩個模塊提供基本服務, PersistentDB 和 PooledDB 。
PersistentDB
DBUtils.PersistentDB 實現了強硬的、線程安全的、頑固的數據庫連接,使用DB-API 2模塊。如下圖展示了使用 PersistentDB 時的連接層步驟:
persist.gif當一個線程首次打開一個數據庫連接時,一個連接會打開并僅供這個線程使用。當線程關閉連接時,連接仍然持續打開供這個線程下次請求時使用這個已經打開的連接。連接在線程死亡時自動關閉。
簡單的來說 PersistentDB 嘗試重用數據庫連接來提高線程化程序的數據庫訪問性能,并且他確保連接不會被線程之間共享。
因此, PersistentDB 可以在底層DB-API模塊并非線程安全的時候同樣工作的很好,并且他會在其他線程改變數據庫會話或者使用多語句事務時同樣避免問題的發生。
PooledDB
DBUtils.PooledDB 實現了一個強硬的、線程安全的、有緩存的、可復用的數據庫連接,使用任何DB-API 2模塊。如下圖展示了使用 PooledDB 時的工作流程:
pool.gif如圖所示 PooledDB 可以在不同線程之間共享打開的數據庫連接。這在你連接并指定 maxshared 參數,并且底層的DB-API 2接口是線程安全才可以,但是你仍然可以使用專用數據庫連接而不在線程之間共享連接。除了共享連接以外,還可以設立一個至少 mincached 的連接池,并且最多允許使用 maxcached 個連接,這可以同時用于專用和共享連接池。當一個線程關閉了一個非共享連接,則會返還到空閑連接池中等待下次使用。
如果底層DB-API模塊是非線程安全的,線程鎖會確保使用 PooledDB 是線程安全的。所以你并不需要為此擔心,但是你在使用專用連接來改變數據庫會話或執行多命令事務時必須小心。
該選擇哪一個?
PersistentDB 和 PooledDB 都是為了重用數據庫連接來提高性能,并保持數據庫的穩定性。
所以選擇何種模塊,可以參考上面的解釋。 PersistentDB 將會保持一定數量的連接供頻繁使用。在這種情況下你總是保持固定數量的連接。如果你的程序頻繁的啟動和關閉線程,最好使用 PooledDB 。后面將會提到更好的調整,尤其在使用線程安全的DB-API 2模塊時。
當然,這兩個模塊的接口是很相似的,你可以方便的在他們之間轉換,并查看哪個更好一些。
?
4.使用PooledDB 操作ORALCE數據庫案例
settings.py
import os''' 日志文件設置 ''' LOG_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) LOG_LEVEL = 'DEBUG' LOG_FILE = 'ops.log'""" 數據庫設置 """ DB_USER = 'XXX' DB_PASSWORD = 'XXX' DB_SID = 'XXX'print(LOG_DIR)?
?my_logset.py
#!/usr/bin/python # -*- coding: utf-8 -*- # @Time : 2018/4/23 8:55 # @Author : hyang # @File : my_logset.py # @Software: PyCharmimport logging import os import sys from logging import handlers BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) # 加入環境變量from utils import settings# 日志格式 log_format = '[%(asctime)s - %(levelname)s - %(name)s - %(filename)s - %(funcName)s- %(lineno)d ] %(message)s 'def get_mylogger(name):"""get log:param name::return:"""logger = logging.getLogger(name)logger.setLevel(settings.LOG_LEVEL)console_handler = logging.StreamHandler()# 文件絕對路徑logfile_path = os.path.join(settings.LOG_DIR, "log", settings.LOG_FILE)if not os.path.exists(logfile_path):# 創建log目錄os.mkdir(os.path.join(settings.LOG_DIR, "log"))# 每天創建一個日志文件,文件數不超過20個file_handler = handlers.TimedRotatingFileHandler(logfile_path, when="D", interval=1, backupCount=25)logger.addHandler(console_handler)logger.addHandler(file_handler)file_format = logging.Formatter(fmt=log_format)console_format = logging.Formatter(fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S ')console_handler.setFormatter(console_format)file_handler.setFormatter(file_format)return loggerif __name__ == '__main__':log = get_mylogger('access')log.info('access')log.error('Error')# # log1 = get_mylogger('trans')# log1.info('trans')?
Oracle_util.py
#!/usr/bin/python # -*- coding: utf-8 -*- # @Time : 2018/5/22 13:17 # @Author : hyang # @File : Oracle_utils.py # @Software: PyCharm# 用于以清晰、可讀的形式輸出 Python 數據結構 from sys import modules import sys import os import cx_Oracle from DBUtils.PooledDB import PooledDBBASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) # 加入環境變量from utils import settings from utils import my_logset""" 通過PooledDB連接Oracle,并完成常用一些操作 """ class Oracle_util(object):__pool = None # 連接池對象_db_info = {'user': settings.DB_USER,'pwd': settings.DB_PASSWORD,'dsn': settings.DB_SID}def __init__(self, db_info=_db_info, arraysize = 500):# 日志self.db_log = my_logset.get_mylogger("oradb_access")self.db_info = db_infoself.conn = Oracle_util.__getConn(db_info)self.cursor = self.conn.cursor()# 每次從數據庫向Python的緩存返回arraysize=100條記錄self.cursor.arraysize = arraysize@staticmethoddef __getConn(db_info):# 靜態方法,從連接池中取出連接if Oracle_util.__pool is None:__pool = PooledDB(cx_Oracle,user=db_info['user'],password=db_info['pwd'],dsn=db_info['dsn'],mincached=20,maxcached=50)return __pool.connection()def get_columns(self, table):# 查詢表的所有列sql = ["select lower(column_name) column_name \from user_tab_columns where table_name=upper('%(table)s')"]rows = self.queryBySql(''.join(sql) % locals())col_list = [k["column_name"] for k in rows]# ['sjhm', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'status']return col_list# 根據表自動創建參數字典def create_params(self, table, args={}):col_list = self.get_columns(table)params = {}for k in col_list:if args.__contains__(k):params[k] = args[k]return params# 執行sqldef execute(self, sql, args={}):try:self.db_log.debug('execute sql:{}'.format(sql))return self.cursor.execute(sql, args)except Exception as e:self.close()raise e# 調用函數 函數名,返回類型, 參數('1',2)元祖類型def callfunc(self, func, ret_type=cx_Oracle.NUMBER, args=()):try:self.db_log.debug('call func:{} {}'.format(func, args))return self.cursor.callfunc(func,ret_type,args)except Exception as e:self.close()raise e# 調用過程 過程名,輸入參數('1',2)元祖類型def callproc(self, proc, in_val=()):try:self.db_log.debug('call proc:{} {}'.format(proc,in_val))return self.cursor.callproc(proc, in_val)except Exception as e:self.close()raise e# 解析sqldef parse(self,sql,args={}):try:# 解析sql語句return self.cursor.parse(sql,args)# 捕獲SQL異常except Exception as e:self.close()raise e# 批量執行def executemany(self, sql, args):try:self.db_log.debug('executemany sql:{}'.format(sql))return self.cursor.executemany(sql, args)except Exception as e:self.close()raise e# 執行sql,參數一:table,參數二:查詢列'col1,col2' 參數三:參數字典{'字段1':'值1','字段2':'值2'}def queryByTable(self, table, column='*',cond_dict={}):# self.execute(sql, args)cond_dict = self.create_params(table, cond_dict)cond_stmt = ' and '.join(['%s=:%s' % (k, k) for k in cond_dict.keys()])# del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'if not cond_dict:query_sql = 'select %(column)s FROM %(table)s'else:query_sql = 'select %(column)s FROM %(table)s where %(cond_stmt)s'self.execute(query_sql % locals(), cond_dict)return self.get_rows()# 執行sql,參數一:sql語句,如select * from python_modules where module_name=:module_name# 參數二:參數字典{'字段1':'值1','字段2':'值2'} 如{module_name:Oracle}def queryBySql(self, sql, args={}):self.execute(sql, args)return self.get_rows()# 導出結果為文件def exportTxt(self,file_name, sql, args={}, col_split='|', col_flg=True):""":param file_name: 文件位置:param sql: sql語句 如select module_name,china_name from python_modules where module_name=:module_name:param args: 參數 如{'module_name':'oracle'}:param col_split: 列分隔符:param col_flg: 是否輸出列名字段col1|col2:return:"""rt = self.queryBySql(sql, args)if rt:with open(file_name, 'w',encoding="utf-8") as fd:for row in rt:col_info = col_split.join(row.keys())val_info = ''if col_flg:fd.write(col_info+"\n")col_flg = Falseval_info += col_split.join(row.values())val_info += '\n'fd.write(val_info)# 分頁查詢,參數一:sql語句,參數二:參數字典{'字段1':'值1','字段2':'值2'},參數三:頁碼,參數四:分頁大小def query_pages(self, sql, args={}, page=1, page_size=30):_args, count_args = args, argspage = int(page)# print "page:%s" %(page,)# 下一頁next_page = page_size * page# 當前頁cur_page = page_size * (page - 1)if page == 1 or cur_page < 0:cur_page = 0next_page = page_sizesql = """SELECT * FROM(SELECT ROWNUM RN,T.* FROM(""" + sql + """)T WHERE ROWNUM<=:next_page)WHERE RN >=:cur_page """count_sql = """SELECT COUNT(1)CNT FROM (""" + sql + """)"""_args["cur_page"] = cur_page_args["next_page"] = next_pagerows = self.queryBySql(sql, _args)countrows = self.queryBySql(count_sql, count_args)return rows, countrows[0]['cnt']# oracle的參數名必須使用:代替,如 userid = :useriddef insertOne(self, table, column_dict):column_dict = self.create_params(table, column_dict)keys = ','.join(column_dict.keys())values = column_dict.values()placeholder = ','.join([':%s' % (v) for v in column_dict.keys()])ins_sql = 'INSERT INTO %(table)s (%(keys)s) VALUES (%(placeholder)s)'# print(ins_sql % locals())self.execute(ins_sql % locals(), column_dict)# 獲取序列的下一個值,傳入sequence的名稱def nextval(self, seq):self.cursor.execute("SELECT %(seq)s.nextval from dual " % locals())result = self.cursor.fetchall()return result[0][0]# 批量插入數據庫,參數一:表名,參數二:['字段1','字段2',...],參數二:[('值1','值2',...),('值1','值2',...)]def insertMany(self, table, columns=[], values=[]):keys = ','.join(columns)placeholder = ','.join([':%s' % (v) for v in columns])ins_sql = 'INSERT INTO %(table)s (%(keys)s) VALUES(%(placeholder)s)'self.executemany(ins_sql % locals(), values)return self._get_rows_num()# 更新,參數一:表名,參數二用于set 字段1=值1,字段2=值2...格式:{'字段1':'值1','字段2':'值2'},# 參數三:用于where條件,如 where 字段3=值3 and 字段4=值4,格式{'字段3':'值3','字段4':'值4'}def updateByTable(self, table, column_dict={}, cond_dict={}):column_dict = self.create_params(table, column_dict)cond_dict = self.create_params(table, cond_dict)set_stmt = ','.join(['%s=:%s' % (k, k) for k in column_dict.keys()])cond_stmt = ' and '.join(['%s=:%s' % (k, k) for k in cond_dict.keys()])if not cond_dict:upd_sql = 'UPDATE %(table)s set %(set_stmt)s'else:upd_sql = 'UPDATE %(table)s set %(set_stmt)s where %(cond_stmt)s'args = dict(column_dict, **cond_dict) # 合并成1個self.execute(upd_sql % locals(), args)return self._get_rows_num()# 刪除,參數一:表名,#參數二:用于where條件,如 where 字段3=值3 and 字段4=值4,格式{'字段3':'值3','字段4':'值4'}def deleteByTable(self, table, cond_dict={}):cond_dict = self.create_params(table, cond_dict)cond_stmt = ' and '.join(['%s=:%s' % (k, k) for k in cond_dict.keys()])# del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'if not cond_dict:del_sql = 'DELETE FROM %(table)s'else:del_sql = 'DELETE FROM %(table)s where %(cond_stmt)s'self.execute(del_sql % locals(), cond_dict)return self._get_rows_num()# 提取數據,參數一提取的記錄數,參數二,是否以字典方式提取。為true時返回:{'字段1':'值1','字段2':'值2'}def get_rows(self, size=None, is_dict=True):if size is None:rows = self.cursor.fetchall()else:rows = self.cursor.fetchmany(size)if rows is None:rows = []if is_dict:dict_rows = []dict_keys = [r[0].lower() for r in self.cursor.description]for row in rows:dict_rows.append(dict(zip(dict_keys, row)))rows = dict_rowsreturn rows# 獲取更改記錄數def _get_rows_num(self):return self.cursor.rowcount# 提交def commit(self):self.conn.commit()# 回滾def rollback(self):self.conn.rollback();# 銷毀def __del__(self):self.close()# 關閉連接def close(self):# self.commit() self.cursor.close()self.conn.close()if __name__ == '__main__':# exampleora = Oracle_util()create_table = """CREATE TABLE python_modules (module_name VARCHAR2(50) NOT NULL,file_path VARCHAR2(300) NOT NULL,china_name VARCHAR2(300) NOT NULL)"""# 執行創建表create_flag = ora.execute(create_table)# 得到表所有列print(ora.get_columns('python_modules'))# 添加模塊信息M = []for m_name, m_info in modules.items():try:M.append((m_name, m_info.__file__, '中國'))except AttributeError:passprint(len(M))print(ora.insertMany('python_modules',['module_name', 'file_path','china_name'],M))ora.commit()print(ora.queryBySql(sql="select * from python_modules where module_name=:module_name", args={'module_name':'DBUtils.PooledDB'}))print(ora.updateByTable(table='python_modules',column_dict={'china_name':'北京'},cond_dict={'module_name':'DBUtils.PooledDB'}))ora.commit()print(ora.queryBySql(sql="select * from python_modules where module_name=:module_name", args={'module_name':'DBUtils.PooledDB'}))print(ora.deleteByTable(table='python_modules', cond_dict={'module_name': 'DBUtils.PooledDB'}))ora.commit()print(ora.queryBySql(sql="select module_name,china_name from python_modules where module_name=:module_name", args={'module_name':'DBUtils.PooledDB'}))ora.updateByTable(table='python_modules', column_dict={'china_name': '河北'})ora.commit()ora.exportTxt("a.txt", sql="select * from python_modules", )print(ora.deleteByTable(table='python_modules'))ora.commit()print(ora.queryByTable(table="python_modules"))ora.execute("DROP TABLE python_modules PURGE")print(ora.callfunc('myfunc', cx_Oracle.NUMBER, ('abc', 2)))print(ora.callproc('myproc', (3,)))print(ora.queryByTable(table="ptab",column='mydata, myid',cond_dict={'myid':2}))?
轉載于:https://www.cnblogs.com/xiao-apple36/p/9071553.html
總結
以上是生活随笔為你收集整理的Python--DBUtil的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 大数据技能练习之爬虫
 - 下一篇: WIN2003 X64 系统上安装sql