设为首页收藏本站
网站公告 | 这是第一条公告
     

 找回密码
 立即注册
缓存时间07 现在时间07 缓存数据 给自己一个目标,给自己一个希望,给自己一份爱、一份温暖,只为今天快乐,不为昨天烦恼,自己照顾好自己,我的朋友。

给自己一个目标,给自己一个希望,给自己一份爱、一份温暖,只为今天快乐,不为昨天烦恼,自己照顾好自己,我的朋友。

查看: 631|回复: 3

python3 实现mysql数据库连接池的示例代码

[复制链接]

  离线 

TA的专栏

  • 打卡等级:热心大叔
  • 打卡总天数:221
  • 打卡月天数:0
  • 打卡总奖励:3485
  • 最近打卡:2025-03-22 06:57:18
等级头衔

等級:晓枫资讯-上等兵

在线时间
0 小时

积分成就
威望
0
贡献
305
主题
236
精华
0
金钱
4303
积分
599
注册时间
2023-1-26
最后登录
2025-5-31

发表于 2023-5-24 09:12:59 | 显示全部楼层 |阅读模式
dbutils封装文件传送门
DBUtils是一套Python数据库连接池包,并允许对非线程安全的数据库接口进行线程安全包装。DBUtils来自Webware for Python。
DBUtils提供两种外部接口:
       
  • PersistentDB :提供线程专用的数据库连接,并自动管理连接。   
  • PooledDB :提供线程间可共享的数据库连接,并自动管理连接。
需要库
1、DBUtils pip install DBUtils
2、pymysql pip install pymysql/MySQLdb
创建DButils组件
db_config.py 配置文件
  1. # -*- coding: UTF-8 -*-
  2. import pymysql

  3. # 数据库信息
  4. DB_TEST_HOST = "127.0.0.1"
  5. DB_TEST_PORT = 3306
  6. DB_TEST_DBNAME = "ball"
  7. DB_TEST_USER = "root"
  8. DB_TEST_PASSWORD = "123456"

  9. # 数据库连接编码
  10. DB_CHARSET = "utf8"

  11. # mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
  12. DB_MIN_CACHED = 10

  13. # maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
  14. DB_MAX_CACHED = 10

  15. # maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
  16. DB_MAX_SHARED = 20

  17. # maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
  18. DB_MAX_CONNECYIONS = 100

  19. # blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<tomany......> 其他代表阻塞直到连接数减少,连接被分配)
  20. DB_BLOCKING = True

  21. # maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
  22. DB_MAX_USAGE = 0

  23. # setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
  24. DB_SET_SESSION = None

  25. # creator : 使用连接数据库的模块
  26. DB_CREATOR = pymysql
复制代码
db_dbutils_init.py 创建数据池初始化
  1. from DBUtils.PooledDB import PooledDB
  2. import db_config as config

  3. """
  4. @功能:创建数据库连接池
  5. """


  6. class MyConnectionPool(object):
  7.     __pool = None

  8.     # def __init__(self):
  9.     #     self.conn = self.__getConn()
  10.     #     self.cursor = self.conn.cursor()

  11.     # 创建数据库连接conn和游标cursor
  12.     def __enter__(self):
  13.         self.conn = self.__getconn()
  14.         self.cursor = self.conn.cursor()

  15.     # 创建数据库连接池
  16.     def __getconn(self):
  17.         if self.__pool is None:
  18.             self.__pool = PooledDB(
  19.                 creator=config.DB_CREATOR,
  20.                 mincached=config.DB_MIN_CACHED,
  21.                 maxcached=config.DB_MAX_CACHED,
  22.                 maxshared=config.DB_MAX_SHARED,
  23.                 maxconnections=config.DB_MAX_CONNECYIONS,
  24.                 blocking=config.DB_BLOCKING,
  25.                 maxusage=config.DB_MAX_USAGE,
  26.                 setsession=config.DB_SET_SESSION,
  27.                 host=config.DB_TEST_HOST,
  28.                 port=config.DB_TEST_PORT,
  29.                 user=config.DB_TEST_USER,
  30.                 passwd=config.DB_TEST_PASSWORD,
  31.                 db=config.DB_TEST_DBNAME,
  32.                 use_unicode=False,
  33.                 charset=config.DB_CHARSET
  34.             )
  35.         return self.__pool.connection()

  36.     # 释放连接池资源
  37.     def __exit__(self, exc_type, exc_val, exc_tb):
  38.         self.cursor.close()
  39.         self.conn.close()

  40.     # 关闭连接归还给链接池
  41.     # def close(self):
  42.     #     self.cursor.close()
  43.     #     self.conn.close()

  44.     # 从连接池中取出一个连接
  45.     def getconn(self):
  46.         conn = self.__getconn()
  47.         cursor = conn.cursor()
  48.         return cursor, conn


  49. # 获取连接池,实例化
  50. def get_my_connection():
  51.     return MyConnectionPool()
复制代码
制作mysqlhelper.py
  1. from db_dbutils_init import get_my_connection

  2. """执行语句查询有结果返回结果没有返回0;增/删/改返回变更数据条数,没有返回0"""


  3. class MySqLHelper(object):
  4.     def __init__(self):
  5.         self.db = get_my_connection()  # 从数据池中获取连接

  6.     def __new__(cls, *args, **kwargs):
  7.         if not hasattr(cls, 'inst'):  # 单例
  8.             cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs)
  9.         return cls.inst

  10.     # 封装执行命令
  11.     def execute(self, sql, param=None, autoclose=False):
  12.         """
  13.         【主要判断是否有参数和是否执行完就释放连接】
  14.         :param sql: 字符串类型,sql语句
  15.         :param param: sql语句中要替换的参数"select %s from tab where id=%s" 其中的%s就是参数
  16.         :param autoclose: 是否关闭连接
  17.         :return: 返回连接conn和游标cursor
  18.         """
  19.         cursor, conn = self.db.getconn()  # 从连接池获取连接
  20.         count = 0
  21.         try:
  22.             # count : 为改变的数据条数
  23.             if param:
  24.                 count = cursor.execute(sql, param)
  25.             else:
  26.                 count = cursor.execute(sql)
  27.             conn.commit()
  28.             if autoclose:
  29.                 self.close(cursor, conn)
  30.         except Exception as e:
  31.             pass
  32.         return cursor, conn, count

  33.     # 执行多条命令
  34.     # def executemany(self, lis):
  35.     #     """
  36.     #     :param lis: 是一个列表,里面放的是每个sql的字典'[{"sql":"xxx","param":"xx"}....]'
  37.     #     :return:
  38.     #     """
  39.     #     cursor, conn = self.db.getconn()
  40.     #     try:
  41.     #         for order in lis:
  42.     #             sql = order['sql']
  43.     #             param = order['param']
  44.     #             if param:
  45.     #                 cursor.execute(sql, param)
  46.     #             else:
  47.     #                 cursor.execute(sql)
  48.     #         conn.commit()
  49.     #         self.close(cursor, conn)
  50.     #         return True
  51.     #     except Exception as e:
  52.     #         print(e)
  53.     #         conn.rollback()
  54.     #         self.close(cursor, conn)
  55.     #         return False

  56.     # 释放连接
  57.     def close(self, cursor, conn):
  58.         """释放连接归还给连接池"""
  59.         cursor.close()
  60.         conn.close()

  61.     # 查询所有
  62.     def selectall(self, sql, param=None):
  63.         try:
  64.             cursor, conn, count = self.execute(sql, param)
  65.             res = cursor.fetchall()
  66.             return res
  67.         except Exception as e:
  68.             print(e)
  69.             self.close(cursor, conn)
  70.             return count

  71.     # 查询单条
  72.     def selectone(self, sql, param=None):
  73.         try:
  74.             cursor, conn, count = self.execute(sql, param)
  75.             res = cursor.fetchone()
  76.             self.close(cursor, conn)
  77.             return res
  78.         except Exception as e:
  79.             print("error_msg:", e.args)
  80.             self.close(cursor, conn)
  81.             return count

  82.     # 增加
  83.     def insertone(self, sql, param):
  84.         try:
  85.             cursor, conn, count = self.execute(sql, param)
  86.             # _id = cursor.lastrowid()  # 获取当前插入数据的主键id,该id应该为自动生成为好
  87.             conn.commit()
  88.             self.close(cursor, conn)
  89.             return count
  90.             # 防止表中没有id返回0
  91.             # if _id == 0:
  92.             #     return True
  93.             # return _id
  94.         except Exception as e:
  95.             print(e)
  96.             conn.rollback()
  97.             self.close(cursor, conn)
  98.             return count

  99.     # 增加多行
  100.     def insertmany(self, sql, param):
  101.         """
  102.         :param sql:
  103.         :param param: 必须是元组或列表[(),()]或((),())
  104.         :return:
  105.         """
  106.         cursor, conn, count = self.db.getconn()
  107.         try:
  108.             cursor.executemany(sql, param)
  109.             conn.commit()
  110.             return count
  111.         except Exception as e:
  112.             print(e)
  113.             conn.rollback()
  114.             self.close(cursor, conn)
  115.             return count

  116.     # 删除
  117.     def delete(self, sql, param=None):
  118.         try:
  119.             cursor, conn, count = self.execute(sql, param)
  120.             self.close(cursor, conn)
  121.             return count
  122.         except Exception as e:
  123.             print(e)
  124.             conn.rollback()
  125.             self.close(cursor, conn)
  126.             return count

  127.     # 更新
  128.     def update(self, sql, param=None):
  129.         try:
  130.             cursor, conn, count = self.execute(sql, param)
  131.             conn.commit()
  132.             self.close(cursor, conn)
  133.             return count
  134.         except Exception as e:
  135.             print(e)
  136.             conn.rollback()
  137.             self.close(cursor, conn)
  138.             return count


  139. if __name__ == '__main__':
  140.     db = MySqLHelper()
  141.     # # 查询单条
  142.     # sql1 = 'select * from userinfo where name=%s'
  143.     # args = 'python'
  144.     # ret = db.selectone(sql=sql1, param=args)
  145.     # print(ret)  # (None, b'python', b'123456', b'0')
  146.     # 增加单条
  147.     # sql2 = 'insert into userinfo (name,password) VALUES (%s,%s)'
  148.     # ret = db.insertone(sql2, ('old2','22222'))
  149.     # print(ret)
  150.     # 增加多条
  151.     # sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)'
  152.     # li = li = [
  153.     #     ('分省', '123'),
  154.     #     ('到达','456')
  155.     # ]
  156.     # ret = db.insertmany(sql3,li)
  157.     # print(ret)
  158.     # 删除
  159.     # sql4 = 'delete from  userinfo WHERE name=%s'
  160.     # args = 'xxxx'
  161.     # ret = db.delete(sql4, args)
  162.     # print(ret)
  163.     # 更新
  164.     # sql5 = r'update userinfo set password=%s WHERE name LIKE %s'
  165.     # args = ('993333993', '%old%')
  166.     # ret = db.update(sql5, args)
  167.     # print(ret)
复制代码
python3 实现mysql数据库连接池


原理
101348qqx2fmfth2x2bmnb.png

python编程中可以使用MySQLdb进行数据库的连接及诸如查询/插入/更新等操作,但是每次连接mysql数据库请求时,都是独立的去请求访问,相当浪费资源,
而且访问数量达到一定数量时,对mysql的性能会产生较大的影响。
因此,实际使用中,通常会使用数据库的连接池技术,来访问数据库达到资源复用的目的。
安装数据库连接池模块DBUtils

  1. pip3 install DBUtils
复制代码
DBUtils是一套Python数据库连接池包,并允许对非线程安全的数据库接口进行线程安全包装。DBUtils来自Webware for Python。
DBUtils提供两种外部接口:
       
  • * PersistentDB :提供线程专用的数据库连接,并自动管理连接。   
  • * PooledDB :提供线程间可共享的数据库连接,并自动管理连接。
下载地址:DBUtils   下载解压后,使用python setup.py install 命令进行安装
下面利用MySQLdb和DBUtils建立自己的mysql数据库连接池工具包

在工程目录下新建package命名为:dbConnecttion,并新建module命名为MySqlConn,下面是MySqlConn.py,该模块创建Mysql的连接池对象,并创建了如查询/插入等通用的操作方法。该部分代码实现如下:
还有很多其他参数可以配置:
    dbapi :数据库接口
    mincached :启动时开启的空连接数量
    maxcached :连接池最大可用连接数量
    maxshared :连接池最大可共享连接数量
    maxconnections :最大允许连接数量
    blocking :达到最大数量时是否阻塞
    maxusage :单个连接最大复用次数
根据自己的需要合理配置上述的资源参数,以满足自己的实际需要。
代码:
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import pymysql, os, configparser
  4. from pymysql.cursors import DictCursor
  5. from DBUtils.PooledDB import PooledDB


  6. class Config(object):
  7.     """
  8.     # Config().get_content("user_information")

  9.     配置文件里面的参数
  10.     [notdbMysql]
  11.     host = 192.168.1.101
  12.     port = 3306
  13.     user = root
  14.     password = python123
  15.     """

  16.     def __init__(self, config_filename="myProjectConfig.cnf"):
  17.         file_path = os.path.join(os.path.dirname(__file__), config_filename)
  18.         self.cf = configparser.ConfigParser()
  19.         self.cf.read(file_path)

  20.     def get_sections(self):
  21.         return self.cf.sections()

  22.     def get_options(self, section):
  23.         return self.cf.options(section)

  24.     def get_content(self, section):
  25.         result = {}
  26.         for option in self.get_options(section):
  27.             value = self.cf.get(section, option)
  28.             result[option] = int(value) if value.isdigit() else value
  29.         return result


  30. class BasePymysqlPool(object):
  31.     def __init__(self, host, port, user, password, db_name=None):
  32.         self.db_host = host
  33.         self.db_port = int(port)
  34.         self.user = user
  35.         self.password = str(password)
  36.         self.db = db_name
  37.         self.conn = None
  38.         self.cursor = None


  39. class MyPymysqlPool(BasePymysqlPool):
  40.     """
  41.     MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现获取连接对象:conn = Mysql.getConn()
  42.             释放连接对象;conn.close()或del conn
  43.     """
  44.     # 连接池对象
  45.     __pool = None

  46.     def __init__(self, conf_name=None):
  47.         self.conf = Config().get_content(conf_name)
  48.         super(MyPymysqlPool, self).__init__(**self.conf)
  49.         # 数据库构造函数,从连接池中取出连接,并生成操作游标
  50.         self._conn = self.__getConn()
  51.         self._cursor = self._conn.cursor()

  52.     def __getConn(self):
  53.         """
  54.         @summary: 静态方法,从连接池中取出连接
  55.         @return MySQLdb.connection
  56.         """
  57.         if MyPymysqlPool.__pool is None:
  58.             __pool = PooledDB(creator=pymysql,
  59.                               mincached=1,
  60.                               maxcached=20,
  61.                               host=self.db_host,
  62.                               port=self.db_port,
  63.                               user=self.user,
  64.                               passwd=self.password,
  65.                               db=self.db,
  66.                               use_unicode=False,
  67.                               charset="utf8",
  68.                               cursorclass=DictCursor)
  69.         return __pool.connection()

  70.     def getAll(self, sql, param=None):
  71.         """
  72.         @summary: 执行查询,并取出所有结果集
  73.         @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
  74.         @param param: 可选参数,条件列表值(元组/列表)
  75.         @return: result list(字典对象)/boolean 查询到的结果集
  76.         """
  77.         if param is None:
  78.             count = self._cursor.execute(sql)
  79.         else:
  80.             count = self._cursor.execute(sql, param)
  81.         if count > 0:
  82.             result = self._cursor.fetchall()
  83.         else:
  84.             result = False
  85.         return result

  86.     def getOne(self, sql, param=None):
  87.         """
  88.         @summary: 执行查询,并取出第一条
  89.         @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
  90.         @param param: 可选参数,条件列表值(元组/列表)
  91.         @return: result list/boolean 查询到的结果集
  92.         """
  93.         if param is None:
  94.             count = self._cursor.execute(sql)
  95.         else:
  96.             count = self._cursor.execute(sql, param)
  97.         if count > 0:
  98.             result = self._cursor.fetchone()
  99.         else:
  100.             result = False
  101.         return result

  102.     def getMany(self, sql, num, param=None):
  103.         """
  104.         @summary: 执行查询,并取出num条结果
  105.         @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
  106.         @param num:取得的结果条数
  107.         @param param: 可选参数,条件列表值(元组/列表)
  108.         @return: result list/boolean 查询到的结果集
  109.         """
  110.         if param is None:
  111.             count = self._cursor.execute(sql)
  112.         else:
  113.             count = self._cursor.execute(sql, param)
  114.         if count > 0:
  115.             result = self._cursor.fetchmany(num)
  116.         else:
  117.             result = False
  118.         return result

  119.     def insertMany(self, sql, values):
  120.         """
  121.         @summary: 向数据表插入多条记录
  122.         @param sql:要插入的SQL格式
  123.         @param values:要插入的记录数据tuple(tuple)/list[list]
  124.         @return: count 受影响的行数
  125.         """
  126.         count = self._cursor.executemany(sql, values)
  127.         return count

  128.     def __query(self, sql, param=None):
  129.         if param is None:
  130.             count = self._cursor.execute(sql)
  131.         else:
  132.             count = self._cursor.execute(sql, param)
  133.         return count

  134.     def update(self, sql, param=None):
  135.         """
  136.         @summary: 更新数据表记录
  137.         @param sql: SQL格式及条件,使用(%s,%s)
  138.         @param param: 要更新的  值 tuple/list
  139.         @return: count 受影响的行数
  140.         """
  141.         return self.__query(sql, param)

  142.     def insert(self, sql, param=None):
  143.         """
  144.         @summary: 更新数据表记录
  145.         @param sql: SQL格式及条件,使用(%s,%s)
  146.         @param param: 要更新的  值 tuple/list
  147.         @return: count 受影响的行数
  148.         """
  149.         return self.__query(sql, param)

  150.     def delete(self, sql, param=None):
  151.         """
  152.         @summary: 删除数据表记录
  153.         @param sql: SQL格式及条件,使用(%s,%s)
  154.         @param param: 要删除的条件 值 tuple/list
  155.         @return: count 受影响的行数
  156.         """
  157.         return self.__query(sql, param)

  158.     def begin(self):
  159.         """
  160.         @summary: 开启事务
  161.         """
  162.         self._conn.autocommit(0)

  163.     def end(self, option='commit'):
  164.         """
  165.         @summary: 结束事务
  166.         """
  167.         if option == 'commit':
  168.             self._conn.commit()
  169.         else:
  170.             self._conn.rollback()

  171.     def dispose(self, isEnd=1):
  172.         """
  173.         @summary: 释放连接池资源
  174.         """
  175.         if isEnd == 1:
  176.             self.end('commit')
  177.         else:
  178.             self.end('rollback')
  179.         self._cursor.close()
  180.         self._conn.close()


  181. if __name__ == '__main__':
  182.     mysql = MyPymysqlPool("notdbMysql")

  183.     sqlAll = "select * from myTest.aa;"
  184.     result = mysql.getAll(sqlAll)
  185.     print(result)

  186.     sqlAll = "select * from myTest.aa;"
  187.     result = mysql.getMany(sqlAll, 2)
  188.     print(result)

  189.     result = mysql.getOne(sqlAll)
  190.     print(result)

  191.     # mysql.insert("insert into myTest.aa set a=%s", (1))

  192.     # 释放资源
  193.     mysql.dispose()
复制代码
参考博客:https://www.cnblogs.com/renfanzi/p/7656142.html
到此这篇关于python3 实现mysql数据库连接池的示例代码的文章就介绍到这了,更多相关python3 mysql连接池内容请搜索晓枫资讯以前的文章或继续浏览下面的相关文章希望大家以后多多支持晓枫资讯!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
晓枫资讯-科技资讯社区-免责声明
免责声明:以上内容为本网站转自其它媒体,相关信息仅为传递更多信息之目的,不代表本网观点,亦不代表本网站赞同其观点或证实其内容的真实性。
      1、注册用户在本社区发表、转载的任何作品仅代表其个人观点,不代表本社区认同其观点。
      2、管理员及版主有权在不事先通知或不经作者准许的情况下删除其在本社区所发表的文章。
      3、本社区的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,举报反馈:点击这里给我发消息进行删除处理。
      4、本社区一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
      5、以上声明内容的最终解释权归《晓枫资讯-科技资讯社区》所有。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
12
积分
4
注册时间
2022-12-24
最后登录
2022-12-24

发表于 2025-2-14 04:03:06 | 显示全部楼层
顶顶更健康!!!
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

  • 打卡等级:无名新人
  • 打卡总天数:1
  • 打卡月天数:0
  • 打卡总奖励:10
  • 最近打卡:2024-07-10 07:28:03
等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
23
积分
6
注册时间
2023-10-21
最后登录
2024-7-10

发表于 2025-3-9 11:40:54 | 显示全部楼层
路过,支持一下
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
14
积分
8
注册时间
2022-12-25
最后登录
2022-12-25

发表于 3 天前 | 显示全部楼层
感谢楼主分享。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~
严禁发布广告,淫秽、色情、赌博、暴力、凶杀、恐怖、间谍及其他违反国家法律法规的内容。!晓枫资讯-社区
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

1楼
2楼
3楼
4楼

手机版|晓枫资讯--科技资讯社区 本站已运行

CopyRight © 2022-2025 晓枫资讯--科技资讯社区 ( BBS.yzwlo.com ) . All Rights Reserved .

晓枫资讯--科技资讯社区

本站内容由用户自主分享和转载自互联网,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。

如有侵权、违反国家法律政策行为,请联系我们,我们会第一时间及时清除和处理! 举报反馈邮箱:点击这里给我发消息

Powered by Discuz! X3.5

快速回复 返回顶部 返回列表