第12节,tornado 使用aiomysql 实现对mysql的异步操作

1. aiomysql

aiomysql 是一个支持异步访问mysql的python库,它依赖并重用PyMySQL的大部分部分代码,tornado 结合aiomysql 能够异步且高效的对mysql进行访问操作。

安装方式

pip install aiomysql

异步代码的编写对于初学者而言,只需要注意使用await关键字就可以了,其他的与同步代码几乎相同,这也是async/await 关键字的优点,它让异步代码看起来与同步代码相似。

为了能在tornado 服务里更好的使用aiomysql, 我封装了一个AioMysqlClient 类,用来实现对mysql数据库的访问功能

db.py

import aiomysql.cursors
import asyncio
import traceback
from tornado.ioloop import IOLoop


class AioMysqlClient():
    def __init__(self, host, port, username, password, db_name, **kwargs):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.db_name = db_name
        self.kwargs = kwargs            # 其他参数
        self.conn_pool = None           # 连接池
        self.is_connected = False       # 是否处于连接状态
        self.lock = asyncio.Lock()      # 异步锁

    async def init_pool(self):
        """
        创建数据库连接
        :return:
        """
        print("init_pool")
        try:
            self.conn_pool = await aiomysql.create_pool(
                host=self.host,
                port=self.port,
                user=self.username,
                password=self.password,
                db=self.db_name,
                **self.kwargs
            )
            self.is_connected = True
        except:
            print(traceback.format_exc())
            self.is_connected = False

        return self

    async def insert(self, sql, args= None):
        conn = await self.conn_pool.acquire()
        cur = await self.execute(conn, sql, args)
        await conn.commit()
        conn.close()            # 不是关闭连接,而是还到连接池中
        return cur

    async def fetch_one(self, sql, args=None):
        conn = await self.conn_pool.acquire()
        cur = await self.execute(conn, sql, args)
        if cur.rowcount == 0:
            return None
        return await cur.fetchone()

    async def fetch_all(self, sql, args=None):
        conn = await self.conn_pool.acquire()
        cur = await self.execute(conn, sql, args)
        if cur.rowcount == 0:
            return None

        return await cur.fetchall()

    async def execute(self, conn, sql, args=None):
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql, args)            # 执行sql
            return cur

async def test_insert():
    mysql_client = AioMysqlClient("127.0.0.1", 6606, 'root', 'root', 'test')
    await mysql_client.init_pool()
    sql = "insert into student(name, age)values('小明', 14)"
    print("执行sql")
    cur = await mysql_client.insert(sql)
    print(cur.lastrowid)

async def test_select():
    mysql_client = AioMysqlClient("127.0.0.1", 6606, 'root', 'root', 'test')
    await mysql_client.init_pool()
    sql = "select * from student"
    data = await mysql_client.fetch_one(sql)
    print(data)

    datas = await mysql_client.fetch_all(sql)
    print(datas)


if __name__ == "__main__":
    asyncio.run(test_select())

AioMysqlClient 在内部维护了一个连接池,使用的时候,通过acquire方法获取一个空闲的连接,连接使用结束后,通过close方法归还给连接池。

2. tornado 结合 aiomysql 访问mysql

app.py

import json
import asyncio
import tornado.ioloop
from tornado.web import RequestHandler, Application
from tornado.httpserver import HTTPServer
from tornado.options import options, define
from tornado.ioloop import IOLoop
from db import AioMysqlClient

define('port', default=8000, help='监听端口')

async def connect_mysql():
    mysql_client = AioMysqlClient("127.0.0.1", 6606, 'root', 'root', 'test')
    await mysql_client.init_pool()
    return mysql_client

class BaseHnadler(RequestHandler):
    def initialize(self):
        self.mysql_client = self.settings['mysql']

class StudentsHandler(BaseHnadler):
    async def post(self):
        data = json.loads(self.request.body)
        sql = f"insert into student(name, age)values('{data['name']}', {data['age']})"
        cur = await self.mysql_client.insert(sql)
        return self.write(json.dumps({'id': cur.lastrowid}))


class StudentInfoHandler(BaseHnadler):
    async def get(self, id):
        sql = f"select * from student where id = {id}"
        data = await self.mysql_client.fetch_one(sql)
        return self.write(json.dumps(data))

def make_app(config):
    options.parse_command_line()
    handlers_routes = [
        (r'/students', StudentsHandler),
        (r'/students/(.*)', StudentInfoHandler)
    ]
    app = Application(handlers=handlers_routes, **config)
    return app


def main():
    config = {
        'mysql': IOLoop.current().run_sync(connect_mysql)
    }
    app = make_app(config)
    http_server = HTTPServer(app)
    http_server.listen(options.port)
    print('ok')
    tornado.ioloop.IOLoop.current().start()

if __name__ == '__main__':
    main()

2.1 初始化AioMysqlClient对象

RequestHandler 可以共用一个数据库连接池,初始化操作放在connect_mysql函数中进行,通过IOLoop.current().run_sync 方法运行,如果connect_mysql是异步的,那么run_sync 会一直运行直到有结果,必须这样处理,才能在server启动前完成mysql连接的初始化操作。

通过向Application 初始化函数传入config, 在RequestHandler 中就可以通过self.settings['mysql'] 获得AioMysqlClient 实例对象。

奇怪的是,以上代码在windows系统下无法正常运行,只能在linux系统下正确执行,问题就出在run_sync 上,总是报错

RuntimeError: no running event loop

关于这个问题,慢慢研究吧,等有了结果,我会更新这篇教程。

2.2 BaseHnadler

BaseHnadler 是所有处理请求类的父类,在这里可以编写所有子类都会用到的代码,RequestHandler 在被创建以后,会调用initialize 做初始化操作,在这里我定义了mysql_client, 后续的调用会更加方便。

2.3 测试代码

现在,服务已经可以正常启动了,接下来,要多一些测试来验证我们的代码是正确的

import requests

def test_add():
    data = {
        'name': '小刚',
        'age': 15
    }

    res = requests.post('http://127.0.0.1:8000/students', json=data)
    print(res.json())

if __name__ == '__main__':
    test_add()

这段代码用来测试mysql新增,测试查询请求,可以直接通过浏览器访问

http://127.0.0.1:8000/students/1

扫描关注, 与我技术互动

QQ交流群: 211426309

加入知识星球, 每天收获更多精彩内容

分享日常研究的python技术和遇到的问题及解决方案