第5节,mysql-connector 创建数据库连接池

1. 什么时候使用数据库连接池

如果的程序需要以非常高的并发向mysql写入数据库时,该如何提高写入的速度呢?这里只考虑客户端程序,不考虑mysql服务端,假设mysql服务器的性能足够好。很容易想到采用多线程或者使用多进程。

使用多进程时,使用数据库连接池是没有效果的,因为数据库连接池只在多线程条件下才会使用。多线程条件下,如果你的程序与数据库之间只有一个连接,就会出现多个线程争抢一个数据库连接的情况,你不得不用多线程互斥技术避免出现混乱。

数据库连接池则很好的解决了多线程争抢数据库连接的问题。所谓数据库连接池,只是提前申请一定数量的连接,每个线程在需要向数据库写入数据时,先从连接池里申请连接,得到真实连接后进行数据库操作,操作结束后再将连接放回到连接池。假设你起的是10个多线程,综合实际并发,你或许只申请大小为5的连接池就足够了,这能保证同一时刻有5个线程同时操作数据库。

你去申请连接,未必总是能顺利的得到连接,因为连接池里的连接可能全部都处于busy状态,正在被其他的线程使用,因此你需要处理无法获取到连接的异常,或者将连接池的大小设置的与线程数相同,这样理论上每个线程都能得到一个连接。但这样做,并不理智,失去了创建连接池的部分意义。

为什么不能每个线程维护一个连接呢,这样就不需要数据库连接池了。这个想法很普遍,可这样做在技术上是一个糟糕的选择,因为在线程里维护连接是非常麻烦的。连接池则统一帮助你维护这些连接,如果某个连接断开了,连接池还可以帮你恢复连接。

2. mysql-connector 创建连接池

使用mysql-connector 创建连接池是非常容易的

from mysql.connector.pooling import MySQLConnectionPool

# 第一步,创建连接
mysql_pool = MySQLConnectionPool(
    host="10.110.30.3",     # 数据库主机地址
    user="flink_user",      # 数据库用户名
    passwd="123456",        # 数据库密码
    port=6606,
    database='flink_db',
    pool_size= 5
)

sql = "select * from city"
conn = mysql_pool.get_connection()
cursor = conn.cursor()
cursor.execute(sql)

for data in cursor.fetchall():
    print(data)

cursor.close()
conn.close()            # 将连接放回到连接池

接下来,我结合源码为你讲解连接池如何工作

2.1 建立连接池

创建连接池,用到了MySQLConnectionPool类,咱们来看一下它的初始化函数

class MySQLConnectionPool(object):
    """Class defining a pool of MySQL connections"""
    def __init__(self, pool_size=5, pool_name=None, pool_reset_session=True,
                 **kwargs):
                 
        if kwargs:
            self.set_config(**kwargs)
            cnt = 0
            while cnt < self._pool_size:
                self.add_connection()
                cnt += 1

为了易于阅读,我去掉了一些无关紧要的代码,后面的源码也做相同处理。在MySQLConnectionPool类的初始化函数里,根据_pool_size属性创建足够多的连接。在add_connecttion方法里,新创建的连接都放在了self._cnx_queue 中。

2.2 从连接池里获取连接

从连接池里获取连接使用get_connection方法,我们看看它是如何工作的

    def get_connection(self):
        with CONNECTION_POOL_LOCK:
            try:
                cnx = self._cnx_queue.get(block=False)      # 从self._cnx_queue 里获取连接
            except queue.Empty:
                raise errors.PoolError(
                    "Failed getting connection; pool exhausted")

            # pylint: disable=W0201,W0212
            if not cnx.is_connected() \
                    or self._config_version != cnx._pool_config_version:
                cnx.config(**self._cnx_config)
                try:
                    cnx.reconnect()     # 重新建立连接
                except errors.InterfaceError:
                    # Failed to reconnect, give connection back to pool
                    self._queue_connection(cnx)
                    raise
                cnx._pool_config_version = self._config_version
            # pylint: enable=W0201,W0212

            return PooledMySQLConnection(self, cnx)     # 返回PooledMySQLConnection对象

get_connection 做了3件事情:

  1. 从self._cnx_queue 获取连接
  2. 如果连接已经失效了,则重新建立连接
  3. 返回PooledMySQLConnection对象(内部包含cnx连接),这才是用户真正使用的可以操作数据库的对象

2.3 close将连接释放,放回到连接池

在使用完连接后,一定要调用close方法()释放连接,来看一下PooledMySQLConnection的close方法如何工作

    def close(self):
        try:
            cnx = self._cnx
            if self._cnx_pool.reset_session:
                cnx.reset_session()
        finally:
            self._cnx_pool.add_connection(cnx)
            self._cnx = None

close方法会重新将连接加入到连接池_cnx_pool中,在这之前,要将连接的session重置。

扫描关注, 与我技术互动

QQ交流群: 211426309

加入知识星球, 每天收获更多精彩内容

分享日常研究的python技术和遇到的问题及解决方案