MySQL 原生协议
1 MySQL 连接配置
配置数据库连接方式
conf/config.py
DATABASES = {
"default": "mysql+retrypool://root:password@127.0.0.1:3306/test?max_connections=300&stale_timeout=300",
}
Butterfly 访问 MySQL 基于 Peewee 库,Peewee 是一个简单小巧的 Python ORM,它非常容易学习,使用起来非常方便。
2 Butterfly handler 访问 MySQL 数据库
2.1 前言
以下例子 SQL 语句来自 菜鸟教程 并转为 handler 例子
备注
如果 SQL 语句中使用了 '%' 的地方,则需要使用 '%%' 代替
SQL 语句拼接时使用 format 方法拼接,而不是 %, 当然也不要使用 + 号进行拼接(比较容易 SQL 注入)
常见场景:
(1)时间格式:
sql = ("select date_format(c_time, '%%Y-%%m-%%d' ) days, count(*) count "
"from "
"qingnang_job "
"where job_type = 'failover' "
"group by days;"
)
(2) 模糊匹配:
sql = "SELECT * FROM EMPLOYEE WHERE FIRST_NAME LIKE '%%{first_name}%%'".format(first_name="M")
2.2 创建数据库表
code
# coding=utf8
from xlib.httpgateway import Request
from xlib import retstat
from xlib.middleware import funcattr
from xlib import db
@funcattr.api
def create_table(req):
"""
带参数请求例子
Args:
req : Request
Returns:
json_status, Content, headers
"""
isinstance(req, Request)
sql = """CREATE TABLE EMPLOYEE (
FIRST_NAME CHAR(20) NOT NULL,
LAST_NAME CHAR(20),
AGE INT,
SEX CHAR(1),
INCOME FLOAT )"""
db.my_databases["default"].execute_sql(sql)
return retstat.OK, {}, [(__info, __version)]
数据库信息
mysql> desc EMPLOYEE;
+------------+----------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------+----------+------+-----+---------+-------+
| FIRST_NAME | char(20) | NO | | NULL | |
| LAST_NAME | char(20) | YES | | NULL | |
| AGE | int(11) | YES | | NULL | |
| SEX | char(1) | YES | | NULL | |
| INCOME | float | YES | | NULL | |
+------------+----------+------+-----+---------+-------+
2.3 数据库插入操作
code
# coding=utf8
from xlib.httpgateway import Request
from xlib import retstat
from xlib.middleware import funcattr
from xlib import db
@funcattr.api
def create(req):
"""
Args:
req : Request
Returns:
json_status, Content, headers
"""
isinstance(req, Request)
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
db.my_databases["default"].execute_sql(sql)
return retstat.OK, {}, [(__info, __version)]
数据库信息
mysql> select * from EMPLOYEE;
+------------+-----------+------+------+--------+
| FIRST_NAME | LAST_NAME | AGE | SEX | INCOME |
+------------+-----------+------+------+--------+
| Mac | Mohan | 20 | M | 2000 |
+------------+-----------+------+------+--------+
1 row in set (0.00 sec)
2.4 数据库查询操作
# coding=utf8
from xlib.httpgateway import Request
from xlib import retstat
from xlib.middleware import funcattr
from xlib import db
@funcattr.api
def get(req):
"""
Args:
req : Request
Returns:
json_status, Content, headers
"""
isinstance(req, Request)
data_list = []
sql = "SELECT * FROM EMPLOYEE WHERE INCOME > {INCOME}".format(INCOME=1000)
record_list = db.my_databases["default"].execute_sql(sql).fetchall()
for record in record_list:
item_dict = {}
item_dict["fname"] = record[0]
item_dict["lname"] = record[1]
item_dict["age"] = record[2]
item_dict["sex"] = record[3]
item_dict["income"] = record[4]
data_list.append(item_dict)
return retstat.OK, {"data": data_list}, [(__info, __version)]
output
{
"stat": "OK",
"data": [{'lname': u'Mohan', 'age': 20, 'income': 2000.0, 'fname': u'Mac', 'sex': u'M'}]
}
fetchone(): 该方法获取下一个查询结果集。结果集是一个对象
record = db.my_databases["default"].execute_sql(sql).fetchone()
record = (u'Mac', u'Mohan', 20, u'M', 2000.0)
如果执行的 SQL 没有符合条件的记录时,返回 None
fetchall(): 接收全部的返回结果行
如果执行的 SQL 没有符合条件的记录时,返回 ()
2.4.1 模糊匹配
模糊匹配时语句,% 号需要使用
%%
转义
from xlib import db
sql = "select * from EMPLOYEE where FIRST_NAME like '%%{FIRST_NAME}%%'".format(FIRST_NAME="M")
record_list = db.my_databases["default"].execute_sql(sql).fetchall()
print record_list
如果需要拼接 SQL 时,使用 format
进行拼接,而不是使用 %
2.4.2 in
数字
sql = "select xxx from xxx where id in ({id_set});".format(id_set = ",".join(id_list))
字符串
例如有这么一个查询语句:
select * from server where ip in (....)
同时一个存放 ip 的列表 :['1.1.1.1','2.2.2.2','2.2.2.2']
我们希望在查询语句的 in 中放入这个 Ip 列表,这里我们首先会想到的是用 join 来对这个列表处理成一个字符串,如下:
>>> a=['1.1.1.1','2.2.2.2','2.2.2.2']
>>> ','.join(a)
'1.1.1.1,2.2.2.2,2.2.2.2'
可以看到,join 后的结果并不是我们想要的结果,因为引号的问题。所以我们会想到另外的办法:
>>> a=['1.1.1.1','2.2.2.2','2.2.2.2']
>>> ','.join(["'{item}'".format(item=item) for item in a])
"'1.1.1.1','2.2.2.2','2.2.2.2'"
2.5 数据库更新操作
code
# coding=utf8
from xlib.httpgateway import Request
from xlib import retstat
from xlib.middleware import funcattr
from xlib import db
@funcattr.api
def update(req):
"""
Args:
req : Request
Returns:
json_status, Content, headers
"""
isinstance(req, Request)
data_list = []
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')
rowcount = db.my_databases["default"].execute_sql(sql).rowcount
if rowcount:
return retstat.OK, {}, [(__info, __version)]
else:
return retstat.ERR, {}, [(__info, __version)]
rowcount
rowcount: 这是一个只读属性,并返回执行 execute() 方法后影响的行数。
2.6 数据库删除操作
删除操作用于删除数据表中的数据,以下实例演示了删除数据表 EMPLOYEE 中 AGE 大于 20 的所有数据:
# coding=utf8
from xlib.httpgateway import Request
from xlib import retstat
from xlib.middleware import funcattr
from xlib import db
@funcattr.api
def delete(req):
"""
Args:
req : Request
Returns:
json_status, Content, headers
"""
isinstance(req, Request)
data_list = []
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
rowcount = db.my_databases["default"].execute_sql(sql).rowcount
if rowcount:
return retstat.OK, {}, [(__info, __version)]
else:
return retstat.ERR, {}, [(__info, __version)]
3 脚本通过库进行访问 MySQL
3.1 使用 Butterfly 库以及配置
#!/usr/bin/python
# coding=utf8
"""
单纯只是需要使用库时
"""
import sys
import os
# 将 butterfly 目录添加到系统路径中
cur_path = os.path.split(os.path.realpath(__file__))[0]
sys.path.insert(0, os.path.join(cur_path, '../../'))
from xlib import db
mysql_config_url = "mysql+pool://root:password@127.0.0.1:3306/test?max_connections=300&stale_timeout=300"
my_database = db.connect(url=mysql_config_url)
sql = "SELECT * FROM xxx LIMIT 3"
record_list = my_database.execute_sql(sql).fetchall()
for record in record_list:
print record
3.2 直接使用 peewee 库
import peewee
my_database = peewee.MySQLDatabase("test", user = "root", passwd = "123456", host = "127.0.0.1", port=3306, charset = "utf8")
sql = "select * from wuxing_group where wuxing_group.key ='qn_failover' and wuxing_group.value = 'off'"
record_list = my_database.execute_sql(sql).fetchall()
for record in record_list:
...
Last updated