1 MySQL 连接配置
1.1 default 配置
配置数据库连接方式
conf/config.py
Copy DATABASES = {
"default": "mysql+retrypool://root:password@127.0.0.1:3306/test?max_connections=300&stale_timeout=300",
}
Butterfly 访问 MySQL 基于 Peewee 库,Peewee 是一个简单小巧的 Python ORM,它非常容易学习,使用起来非常方便。
1.2 自定义配置(仅读取本 region 配置)
如 conf/servicer/db_x1.py 中,增加 url_x1 = "mysql+retrypool://root:password@127.0.0.1:3306/test?max_connections=300&stale_timeout=300"
Copy from xlib import db
from xlib.db import peewee
from xlib.util import config_util
def _get_db(name):
db_url = config_util.get_config(None, "db_x1", name)
db_obj = db.connect(db_url)
return db_obj
class ProcessRecords(peewee.Model):
"""
X1 task process 记录, 用于耗时统计
"""
id = peewee.BigAutoField()
task_id = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")], index=True)
task_batch_id = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")])
work_flow = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")])
entity = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")])
step = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")])
operation = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")])
created_at = peewee.DateTimeField(constraints=[peewee.SQL("DEFAULT CURRENT_TIMESTAMP")])
start_at = peewee.DateTimeField(constraints=[peewee.SQL("DEFAULT 0000-00-00 00:00:00")])
cost = peewee.IntegerField(constraints=[peewee.SQL("DEFAULT 0")])
parameters = peewee.TextField(null=True)
err_msg = peewee.TextField(null=True)
class Meta(object):
"""
meta
"""
database = _get_db("url_x1")
table_name = 'process_records'
1.3 自定义配置(多 region adapter)
lib_model.py
Copy from xlib import db
from xlib.util import config_util
def model_bind_for_region(req, model, db_url_name="url_scs"):
"""
model bind
Returns:
bool
"""
db_url = config_util.get_config(req, "db_x1", db_url_name)
db_obj = db.connect(db_url)
return model.bind(db_obj)
例子
Copy from xlib import retstat
from handlers.x1 import model_scs
from xlib.middleware import funcattr
from xlib.httpgateway import Request
from handlers.x1 import lib_model
@funcattr.api
def test_model_for_region(req, region=None):
"""
test
"""
assert isinstance(req, Request)
if region:
req.idc = region
cachecluster_model = model_scs.CacheCluster.alias("app")
print cachecluster_model._meta.database.get_tables()
data = lib_model.model_bind_for_region(req, cachecluster_model, db_url_name="url_scs")
print data
print cachecluster_model._meta.database.get_tables()
return retstat.OK
2 基本知识
编写 model.py
demo
Copy from datetime import datetime
from xlib.db.peewee import CharField
from xlib.db.peewee import IntegerField
from xlib.db.peewee import PrimaryKeyField
from xlib.db.peewee import DateTimeField
import xlib.db
# Define a model class
class Xxx(xlib.db.BaseModel):
# If none of the fields are initialized with primary_key=True,
# an auto-incrementing primary key will automatically be created and named 'id'.
ip = CharField(index=True, max_length=64)
port = IntegerField()
redis_id = PrimaryKeyField()
history_exception = IntegerField(default=1)
c_time = DateTimeField(column_name="c_time", default=datetime.now)
u_time = DateTimeField(column_name="u_time", default=datetime.now)
if __name__ == "__main__":
xlib.db.my_databases["default"].connect()
xlib.db.my_databases["default"].drop_tables([Xxx])
xlib.db.my_databases["default"].create_tables([Xxx])
在官方的 Quckstart 中,Peewee 中 Model 类、fields 和 model 实例与数据库的映射关系如下:
instance Column on a table
instance Row in a database table
也就是说
一个 model 类实例化对象则代表数据库中的一行。
Peewee 的实现原理可以结合 道生一,一生二,二生三,三生万物
2.1 字段
字段类用于描述模型属性到数据库字段的映射,每一个字段类型都有一个相应的 SQL 存储类型,如 varchar
, int
。并且 python 的数据类型和 SQL 存储类型之间的转换是透明的。
在创建模型类时,字段被定义为类属性。有一种特殊类型的字段 ForeignKeyField
,可以以更直观的方式表示模型之间的外键关系。
Copy class Message(Model):
user = ForeignKeyField(User, backref='messages')
body = TextField()
send_date = DateTimeField()
这允许你编写如下的代码:
Copy print(some_message.user.username)
for message in some_user.messages:
print(message.body)
2.1.1 字段类型表
字段类型
Sqlite
Postgresql
MySQL
2.1.2 字段初始参数
所有字段类型接受的参数与默认值
null = False
– 布尔值,表示是否允许存储空值
index = False
– 布尔值,表示是否在此列上创建索引
unique = False
– 布尔值,表示是否在此列上创建唯一索引
column_name = None
– 如果和属性名不同,底层的数据库字段使用这个值
default = None
– 字段默认值,可以是一个函数,将使用函数返回的值
primary_key = False
– 布尔值,此字段是否是主键
constraints = None
- 一个或多个约束的列表 例如:[Check('price > 0')]
sequence = None
– 序列填充字段(如果后端数据库支持)
collation = None
– 用于排序字段 / 索引的排序规则
unindexed = False
– 表示虚拟表上的字段应该是未索引的(仅用于 sqlite)
choices = None
– 一个可选的迭代器,包含两元数组(value, display)
help_text = None
– 表示字段的帮助文本
verbose_name = None
– 表示用户友好的字段名
一些字段的特殊参数
max_digits, decimal_places, auto_round, rounding
model, field, backref, on_delete, on_update, extra
2.1.3 字段默认值
创建对象时,peewee 可以为字段提供默认值,例如将字段的默认值null
设置为0
Copy class Message(Model):
context = TextField()
read_count = IntegerField(default=0)
如果想提供一个动态值,比如当前时间,可以传入一个函数
Copy class Message(Model):
context = TextField()
timestamp = DateTimeField(default=datetime.datetime.now)
数据库还可以提供字段的默认值。虽然 peewee 没有明确提供设置服务器端默认值的 API,但您可以使用 constraints
参数来指定服务器默认值:
Copy class Message(Model):
context = TextField()
timestamp = DateTimeField(constraints=[SQL('DEFAULT CURRENT_TIMESTAMP')])
2.1.4 外键字段
foreignkeyfield
是一种特殊的字段类型,允许一个模型引用另一个模型。通常外键将包含与其相关的模型的主键(但您可以通过指定一个字段来指定特定的列)。
可以通过追加 _id
的外键字段名称来访问原始外键值
Copy tweets = Tweet.select()
for tweet in tweets:
# Instead of "tweet.user", we will just get the raw ID value stored
# in the column.
print(tweet.user_id, tweet.message)
ForeignKeyField
允许将反向引用属性绑定到目标模型。隐含地,这个属性将被命名为 classname_set
,其中 classname 是类的小写名称,但可以通过参数覆盖 backref:
Copy class Message(Model):
from_user = ForeignKeyField(User)
to_user = ForeignKeyField(User, backref='received_messages')
text = TextField()
for message in some_user.message_set:
# We are iterating over all Messages whose from_user is some_user.
print(message)
for message in some_user.received_messages:
# We are iterating over all Messages whose to_user is some_user
print(message)
2.1.5 日期字段
DateField
TimeField
和 DateTimeField
字段
DateField
包含 year
month
day
TimeField
包含 hour
minute
second
DateTimeField
包含以上所有
3 使用说明
而使用过程,分成两步:
3.1 定义 Model,建立数据库
在使用的时候,根据需求先定义好 Model,然后可以通过 create_tables() 创建表,若是已经创建好数据库表了,可以通过 python -m pwiz 脚本工具直接创建 Model。
3.1.1 创建 Model
3.1.1.1 第一种方式
先定义 Model,然后通过 db.create_tables() 创建或 Model.create_table() 创建表。 例如,我们需要建一个 Person 表,里面有 name、birthday 和 is_relative 三个字段,我们定义的 Model 如下:
Copy from peewee import *
# 连接数据库
database = MySQLDatabase('test', user='root', host='localhost', port=3306)
# 定义 Person
class Person(Model):
name = CharField()
birthday = DateField()
is_relative = BooleanField()
class Meta(object):
database = database
然后,我们就可以创建表了
Copy # 创建表
Person.create_table()
# 创建表也可以这样,可以创建多个
# database.create_tables([Person])
其中,CharField、DateField、BooleanField 等这些类型与数据库中的数据类型一一对应,我们直接使用它就行,至于 CharField => varchar(255) 这种转换 Peewee 已经为我们做好了 。
完成之后,就会在数据库中看到 test 数据库中,创建好了 person 表
Copy mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| person |
+----------------+
1 rows in set (0.00 sec)
mysql> desc person;
+-------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(255) | NO | | NULL | |
| birthday | date | NO | | NULL | |
| is_relative | tinyint(1) | NO | | NULL | |
+-------------+--------------+------+-----+---------+----------------+
Copy 如果使用 autoconnect = True(默认值)初始化数据库,则在使用数据库之前无需显式连接到数据库。 明确地管理连接被认为是最佳实践,因此可以考虑禁用自动连接行为。
3.1.1.2 第二种方式
已经存在过数据库,则直接通过 python -m pwiz 批量创建 Model。
例如,上面我已经创建好了 test 库,并且创建了 person 表,表中拥有 id、name、birthday 和 is_relative 字段。那么,我可以使用下面命令:
Copy # 查看参数
python -m pwiz
# 指定 mysql,用户为 root,host 为 localhost,数据库为 test
python -m pwiz -e mysql -u root -H localhost -p 3306 --password test > testModel.py
然后,输入密码,pwiz 脚本会自动创建 Model,内容如下:
Copy from peewee import *
database = MySQLDatabase('test', **{'charset': 'utf8', 'use_unicode': True, 'host': 'localhost', 'user': 'root', 'password': ''})
class UnknownField(object):
def __init__(self, *_, **__): pass
class BaseModel(Model):
class Meta(object):
database = database
class Person(BaseModel):
birthday = DateField()
is_relative = IntegerField()
name = CharField()
class Meta(object):
table_name = 'person'
3.1.2 Model 定义
3.1.2.1 复合主键约束 (CompositeKey)
Copy class Person(Model):
first = CharField()
last = CharField()
class Meta(object):
primary_key = CompositeKey('first', 'last')
3.1.2.2 联合唯一索引 (indexes)
Copy class Meta(object):
indexes = (
(('字段 1', '字段 2'), True), # 字段 1 与字段 2 整体作为索引,True 代表唯一索引
(('字段 1', '字段 2'), False), # 字段 1 与字段 2 整体作为索引,False 代表普通索引
)
需要注意的是,上面语法,三层元组嵌套, 元组你懂得, 一个元素时需要加个 , 逗号。 别忘了。
3.1.3 设置数据库
使用 Peewee 配置数据库的三种方式
Copy # The usual way:
db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})
# Specify the details at run-time:
db = SqliteDatabase(None)
...
db.init(db_filename, pragmas={'journal_mode': 'wal'})
# Or use a placeholder:
db = DatabaseProxy()
...
db.initialize(SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'}))
详情
DatabaseProxy
Copy database_proxy = DatabaseProxy() # Create a proxy for our db.
class BaseModel(Model):
class Meta(object):
database = database_proxy # Use proxy for our DB.
class User(BaseModel):
username = CharField()
# Based on configuration, use a different database.
if app.config['DEBUG']:
database = SqliteDatabase('local.db')
elif app.config['TESTING']:
database = SqliteDatabase(':memory:')
else:
database = PostgresqlDatabase('mega_production_db')
# Configure our proxy to use the db we specified in config.
database_proxy.initialize(database)
仅当实际数据库驱动程序在运行时变化时才使用 DatabaseProxy
如果只是连接值在运行时发生变化,例如数据库文件的路径或数据库主机,则应改用 Database.init()
故 DatabaseProxy 暂时不需要使用
3.2 操作数据库
操作数据库,就是增、删、改和查。
3.2.1 增 (object.save,Model.create,Model.insert)
Copy # 第一种方法插入单条数据
# 不会返回插入的自增 pk,而是成功返回 1,失败返回 0;
# 直接创建示例,然后使用 save() 就添加了一条新数据
p = Person(name='liuchungui', birthday='1990-12-20', is_relative=True)
p.save()
# 第二种方法插入单条数据
# 返回值是一个 Person 对象
p = Person.create(name='liuchungui', birthday='1990-12-20', is_relative=True)
# 第三种方法插入单条数据
# 返回值是整型,值为 id
p = Person.insert(name='liuchungui', birthday='1990-12-20', is_relative=True).execute()
#-------------------------------------------------------------------
# (1) create 方法内部调用 save 方法
# (2) save 方法内部调用的 insert 方法
#-------------------------------------------------------------------
批量写数据
Copy def create():
"""批量添加数据"""
data_source = [
(avartar, 'Catherine', '2', pwd),
(avartar, 'Jane', '2', pwd),
(avartar 'Mary', '2', pwd),
]
field = [User.avartar, User.uname, User.gender, User.password]
uid = User.insert_many(data_source, field).execute()
print('uid=%d' % uid)
3.2.2 删 (object.delete_instance,Model.delete)
删除有两种方式
使用 object.delete_instance
Copy 使用 delete().where().execute() 进行删除,where() 是条件,execute() 负责执行语句。若是已经查询出来的实例,则直接使用 delete_instance() 删除。
# 删除姓名为 perter 的数据
# 执行之后会返回影响行数
Person.delete().where(Person.name == 'perter').execute()
# 已经实例化的数据,使用 delete_instance
p = Person(name='liuchungui', birthday='1990-12-20', is_relative=False)
p.id = 1
p.save()
p.delete_instance()
3.2.3 改 (Model.update, object.save)
若是,已经添加过数据的的实例或查询到的数据实例,且表拥有 primary key 时,此时使用 save() 就是修改数据;若是未拥有实例,则使用 update().where() 进行更新数据。
Copy # 已经实例化的数据,指定了 id 这个 primary key, 则此时保存就是更新数据
p = Person(name='liuchungui', birthday="1990-12-20", is_relative=False)
p.id = 1
p.save()
# 更新 birthday 数据
q = Person.update({Person.birthday: "1983-12-21"}).where(Person.name == 'liuchungui')
q.execute()
3.2.3.1 peewee 的 update 是原子的
需要注意的是,在使用 update 的时候千万不要在 Python 中使用计算再更新,要使用 SQL 语句来更新,这样才能具有原子性。
错误做法
Copy >>> for stat in Stat.select().where(Stat.url == request.url):
... stat.counter += 1
... stat.save()
正确做法
Copy >>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()
// output 影响行数
>>> 1
3.2.3.2 update 的几种方法
Copy # 方法一
Person.update({Person.Name: '赵六', Person.Remarks: 'abc'}).where(Person.Name=='王五').execute()
# 方法二
Person.update({'Name': '赵六', 'Remarks': 'abc'}).where(Person.Name=='张三').execute()
# 方法三
Person.update(Name='赵六', Remarks='abc').where(Person.Name=='李四').execute()
Person.update(Person.age=Person.age+1).where(Person.Name=='李四').execute()
3.2.3.3 无则插入,有则更新
两种方式
存在更新,不存在则插入 replace 与 on_conflict_replace() 是等效的
Copy class User(Model):
username = TextField(unique=True)
last_login = DateTimeField(null=True)
# last_login 值将更新,
user_id = User.replace(username='the-user', last_login=datetime.now()).execute()
user_id = User.insert(username='the-user', last_login=datetime.now()).on_conflict_replace().execute()
MySQL 提供了一种独有的语法 ON DUPLICATE KEY UPDATE 可以使用以下方法实现。
Copy class User(Model):
username = TextField(unique=True)
last_login = DateTimeField(null=True)
login_count = IntegerField()
#插入一个新用户
User.create(username='huey', login_count=0)
# 模拟用户登录。
登录计数和时间戳,要么正确创建,要么更新。
now = datetime.now()
rowid = User.insert(username='huey', last_login=now, login_count=1)
.on_conflict(preserve=[User.last_login], # 使用我们将插入的值
.update={User.login_count: User.login_count + 1}
).execute()
3.2.4 查(单条 Model.get)
3.2.4.1 get
单条数据使用 Person.get() 就行了,也可以使用 Person.select().where().get()。若是查询多条数据,则使用 Person.select().where(),去掉 get() 就行了。语法很直观,select() 就是查询,where 是条件,get 是获取第一条数据。
Copy # 查询单条数据
p = Person.get(Person.name == 'liuchungui')
print(p.name, p.birthday, p.is_relative)
# 使用 where().get() 查询
p = Person.select().where(Person.name == 'liuchungui').get()
print(p.name, p.birthday, p.is_relative)
# 查询多条数据
persons = Person.select().where(Person.is_relative == True)
for p in persons:
print(p.name, p.birthday, p.is_relative)
3.2.4.2 get_or_none
Copy 如果当获取的结果不存在时,不想报错,可以使用 Model.get_or_none() 方法,会返回 None,参数和 get 方法一致。
3.2.4.3 get_or_create
Peewee 有一个辅助方法来执行“获取 / 创建”类型的操作: Model.get_or_create() 首先尝试检索匹配的行。如果失败,将创建一个新行。
Copy p, created = Person.get_or_create(Name='赵六', defaults={'Age': 80, 'Birthday': date(1940, 1, 1)})
print(p, created)
Copy # SQL 语句
('SELECT "t1"."id", "t1"."Name", "t1"."Age", "t1"."Birthday", "t1"."Remarks" FROM "person" AS "t1" WHERE ("t1"."Name" = ?) LIMIT ? OFFSET ?', ['赵六', 1, 0])
('BEGIN', None)
('INSERT INTO "person" ("Name", "Age", "Birthday") VALUES (?, ?, ?)', ['赵六', 80, datetime.date(1940, 1, 1)])
参数:
Copy get_or_create 的参数是 **kwargs,其中 defaults 为非查询条件的参数,剩余的为尝试检索匹配的条件,这个看执行时的 SQL 语句就一目了然了。对于“创建或获取”类型逻辑,通常会依赖唯一 约束或主键来防止创建重复对象。但这并不是强制的,比如例子中,我以 Name 为条件,而 Name 并非主键。只是最好不要这样做。
返回值:
Copy get_or_create 方法有两个返回值,第一个是“获取 / 创建”的模型实例,第二个是是否新创建。
3.2.4.4 get_by_id
对于主键查找,还可以使用快捷方法 Model.get_by_id()
3.2.4.5 select
使用 Model.select() 查询获取多条数据。select 后可以添加 where 条件,如果不加则查询整个表。
select 代表 sql 语句中 select 后面的语句表示要展示的字段
where 代表 where 条件语句 得到一个数据集合
语法:
参数:
Copy fields:需要查询的字段,不传时返回所有字段。传递方式如下例所示。
示例:
Copy ps = Person.select(Person.Name, Person.Age).where(Person.Name == '张三')
select() 返回结果是一个 ModelSelect 对象,该对象可迭代、索引、切片。当查询不到结果时,不报错,返回 None。并且 select() 结果是延时返回的。如果想立即执行,可以调用 execute() 方法。
注意
Copy 注意:where 中的条件不支持 Name='张三' 这种写法,只能是 Person.Name == '张三'。
3.2.4.6 获取记录条数 count 方法
使用 .count() 方法可以获取记录条数。
Copy Person.select().count()
也许你会问,用 len() 方法可以吗?当然也是可以的,但是是一种不可取的方法。
这两者的实现方式天差地远。用 count() 方法,执行的 SQL 语句是:
Copy ('SELECT COUNT(1) FROM (SELECT 1 FROM "person" AS "t1") AS "_wrapped"', [])
而用 len() 方法执行的 SQL 语句却是:
Copy ('SELECT "t1"."id", "t1"."Name", "t1"."Age", "t1"."Birthday", "t1"."Remarks" FROM "person" AS "t1"', [])
直接返回所有记录然后获取长度,这种方法是非常不可取的。
3.2.4.7 排序 order_by 方法
Copy Person.select().order_by(Person.Age)
排序默认是升序排列,也可以用 + 或 asc() 来明确表示是升序排列:
Copy Person.select().order_by(+Person.Age)
Person.select().order_by(Person.Age.asc())
用 - 或 desc() 来表示降序:
Copy Person.select().order_by(-Person.Age)
Person.select().order_by(Person.Age.desc())
如要对多个字段进行排序,逗号分隔写就可以了
3.2.4.8 查询条件
当查询条件不止一个,需要使用逻辑运算符连接,而 Python 中的 and、or 在 Peewee 中是不支持的,此时我们需要使用 Peewee 封装好的运算符,如下:
Person.select().where((Person.Name == '张三') & (Person.Age == 30))
Person.select().where((Person.Name == '张三') | (Person.Age == 30))
Person.select().where(~Person.Name == '张三')
Copy 特别注意:有多个条件时,每个条件必须用 () 括起来。
当条件全为 and 时,也可以用逗号分隔,get 和 select 中都可以
Copy Person.get(Person.Name == '张三', Person.Age == 30)
3.2.4.9 支持的比较符
Copy 运算符 含义
== 等于
< 小于
<= 小于等于
> 大于
>= 大于等于
!= 不等于
<< x in y,其中 y 是列表或查询
>> x is y, 其中 y 可以是 None
% x like y
3.2.4.10 如何根据查询项,设置不同的搜索条件
搜索项支持 job_reqid,job_id 等
Copy query_cmd = job_model.Job.select()
expressions = []
if job_reqid is not None:
expressions.append(peewee.NodeList((job_model.Job.job_reqid, peewee.SQL('='), job_reqid)))
if job_id is not None:
expressions.append(peewee.NodeList((job_model.Job.job_id, peewee.SQL('='), job_id)))
...
if len(expressions):
query_cmd = query_cmd.where(*expressions)
# 用于返回分页总页数
record_count = query_cmd.count()
# 判断是否需要分页返回数据
if page_index is None:
record_list = query_cmd.order_by(job_model.Job.c_time.desc())
else:
record_list = query_cmd.order_by(job_model.Job.c_time.desc()).paginate(int(page_index), int(page_size))
3.3 查看 ORM 对应的原生 SQL 语句
后缀 .sql() 打印对应原生 sql
3.4 一些有用的拓展
3.4.1 模型转换成字典
除了在查询的时候使用 model.dicts 以外,还可以使用 model_to_dict(model) 这个函数。
Copy >>> user = User.create(username='meetbill')
>>> model_to_dict(user)
{'id': 1, 'username': 'meetbill'}
3.4.2 从数据库生成模型
可以使用 pwiz 工具从已有的数据库产生 peewee 的模型文件
Copy python -m pwiz -e postgresql charles_blog > blog_models.py
eg.
python -m pwiz -e mysql -u root -H <ip> -p <port> --password {database} > {database}.py
3.4.3 创建自己的 Field
peewee 中创建自己的 Field, 主要通过继承 Field 或其子类来完成,
如果有 mysql 中对应的字段,则将其赋值给 db_field 即可。这里对 set, enum 并没有找到通用的字段定义,但是对具体的业务可以进行如 GenderField 这样的个性化定制。
如果没有,则使用其父类的 db_field 字段,并定义 db_value 和 python_value 两个方法来完成与数据库中数据类型之间的转化
Copy # 时间戳字段
class TimeStampField(Field):
db_field = 'timestamp'
class SmallIntegerField(IntegerField):
db_field = 'smallint'
class PasswordField(FixedCharField):
def __init__(self, *args, **kwargs):
self.max_length =64
super(PasswordField, self).__init__(max_length=self.max_length, *args, **kwargs)
def db_value(self, value):
return encrypt(value)
def python_value(self, value):
return encrypt(value)
# 性别字段
class GenderField(Field):
db_field = 'enum("f", "m")'
使用
Copy class Base(Model):
class Meta(object):
database = db
class Person(Base):
name = CharField(max_length=20, default='haha')
intro = TextField(default='')
birth = DateTimeField(default=datetime.now())
Married = BooleanField(default=False)
height = FloatField(default=0)
wight = DoubleField(default=0)
salary = DecimalField(default=0)
Save = BigIntegerField(default=0)
family = SmallIntegerField(default=0)
age = IntegerField(default=0)
username = CharField(max_length=20)
password = PasswordField(default='')
ctime = TimeStampField()
today = DateField(default=datetime.date(datetime.today()))
now = TimeField(default=datetime.today())
secret = BlobField(default='')
gender = GenderField()
4 实践
4.1 user
ImportError: No module named dateutil //faker 使用
Copy pip install python-dateutil
Copy from xlib.db.peewee import *
from faker import Factory
from datetime import datetime
import xlib.db
# Create an instance of a Database
mysql_config_url="mysql+pool://root:123456@127.0.0.1:3306/test?max_connections=300&stale_timeout=300"
db = xlib.db.connect(url=mysql_config_url)
# Define a model class
class User(Model):
# If none of the fields are initialized with primary_key=True,
# an auto-incrementing primary key will automatically be created and named 'id'.
id = PrimaryKeyField()
email = CharField(index=True, max_length=64)
username = CharField(unique=True, max_length=32)
password = CharField(null=True, max_length=64)
createTime = DateTimeField(column_name="create_time", default=datetime.now)
class Meta(object):
database = db
table_name = 'tb_user'
# If Models without a Primary Key
# primary_key = False
def __str__(self):
return "User(id:{} email:{} username:{} password:{} createTime: {})".format(self.id, self.email, self.username, self.password, self.createTime)
db.connect()
db.drop_tables([User])
db.create_tables([User])
""" CREATE """
print("-------------CREATE")
# 创建 User 对象
user = User.create(email="meetbill@163.com", username="meetbill", password="meet")
# 保存 User
user.save()
# 创建 faker 工厂对象
faker = Factory.create()
# 利用 faker 创建多个 User 对象
fake_users = [{
'username': faker.name(),
'password': faker.word(),
'email': faker.email(),
} for i in range(5)]
# 批量插入
User.insert_many(fake_users).execute()
""" RETRIEVE/GET/FIND """
print("-------------RETRIEVE/GET/FIND")
user = User.select().where(User.id != 1).get()
print(user)
# User(id:2 email:bcalderon@hotmail.com username:Victoria Sullivan password:off createTime: 2019-08-15 23:25:59)
user = User.select().where(User.username.contains("meet")).get()
print(user)
# User(id:1 email:meetbill@163.com username:meetbill password:meet createTime: 2019-08-15 23:25:59)
count = User.select().filter(User.id >= 3).count()
print(count)
# 4
users = User.select().order_by(User.email)
for u in users:
print(u)
"""
User(id:2 email:bcalderon@hotmail.com username:Victoria Sullivan password:off createTime: 2019-08-15 23:25:59)
User(id:6 email:evanscatherine@johnson.com username:Tracy Santiago password:you createTime: 2019-08-15 23:25:59)
User(id:1 email:meetbill@163.com username:meetbill password:meet createTime: 2019-08-15 23:25:59)
User(id:5 email:nking@yahoo.com username:Marissa Mckay password:last createTime: 2019-08-15 23:25:59)
User(id:4 email:thopkins@powers-booth.biz username:Brian Wise password:country createTime: 2019-08-15 23:25:59)
User(id:3 email:xaviercastillo@robinson.com username:Victoria Turner password:him createTime: 2019-08-15 23:25:59)
"""
""" UPDATE """
print("-------------UPDATE")
effect_count = User.update({User.username: "lisi", User.email: "ls@163.com"}).where(User.id == 1).execute()
print(effect_count)
# 1
""" DELETE """
print("-------------DELETE")
effect_count = User().delete_by_id(6)
print(effect_count)
# 1
effect_count = User.delete().where(User.id >= 4).execute()
print(effect_count)
# 2
结果:
Copy -------------CREATE
-------------RETRIEVE/GET/FIND
User(id:2 email:bcalderon@hotmail.com username:Victoria Sullivan password:off createTime: 2019-08-15 23:25:59)
User(id:1 email:meetbill@163.com username:meetbill password:meet createTime: 2019-08-15 23:25:59)
4
User(id:2 email:bcalderon@hotmail.com username:Victoria Sullivan password:off createTime: 2019-08-15 23:25:59)
User(id:6 email:evanscatherine@johnson.com username:Tracy Santiago password:you createTime: 2019-08-15 23:25:59)
User(id:1 email:meetbill@163.com username:meetbill password:meet createTime: 2019-08-15 23:25:59)
User(id:5 email:nking@yahoo.com username:Marissa Mckay password:last createTime: 2019-08-15 23:25:59)
User(id:4 email:thopkins@powers-booth.biz username:Brian Wise password:country createTime: 2019-08-15 23:25:59)
User(id:3 email:xaviercastillo@robinson.com username:Victoria Turner password:him createTime: 2019-08-15 23:25:59)
-------------UPDATE
1
-------------DELETE
1
2
4.2 School
Copy # -*- coding:utf-8 -*-
#导入模块
import peewee
import datetime
#建立链接
#connect = peewee.MySQLDatabase(
# database = 'first_database', #数据库名字
# host = 'localhost', #数据库地址
# user = 'root', #数据库用户
# passwd = '123' #对应用户密码
# )
#建立链接
connect = peewee.SqliteDatabase("test.db") #运行该程序后就能在当前目录下创建“test.db”数据库
#创建表
#类名必须大写
#peewee 创建数据库的时候,默认会添加主键 id
#peewee 创建数据库字段默认不可为空
class School(peewee.Model):
name = peewee.CharField(max_length = 32) # 相当于在数据库中定义了 name char(32)
address = peewee.CharField(max_length = 32) # 相当于在数据库定义了 address char(32)
age = peewee.IntegerField() # age int
birthday = peewee.DateTimeField() #日期
#将表和数据库连接
class Meta(object):
database = connect
if __name__ =="__main__":
# peewee 的增删改查
#注意:peewee 最主要的是作增删改,查一般都用 sql 语句,查逻辑复杂 ORM 模型适应不了,
# 后面在介绍查的部分时会给出用 sql 语句查询的例子
#-----------------------------------------------------------------------------
#增
# 第一种方法,常用
T = School() #实例化表的类型
T.name = 'lishi'
T.age = 40
T.address = 'shayang'
T.birthday = datetime.datetime.now() #通过时间模块输入当前时间值
T.save() #提交
#第二种方法
T = School().insert(
name = 'shayang',
age = 40,
birthday = datetime.datetime.now(),
address = 'jinmen'
)
T.execute() #执行
#-------------------------------------------------------------------------------
#删
T =School.delete().where(School.id == 1)
T.execute() #执行指令
#-------------------------------------------------------------------------------
#改
#第一种方法,不常用
T =School.update(name = "beifang").where(School.id == 5)
T.execute()
#第二种方法,常用
T = School().get(id = 4) #实例化表的类型
T.name = 'mingzhu'
T.save()
#-------------------------------------------------------------------------------
#查
#查所有
#第一种方法
T_list = School.select()
#print(T_list)
for T in T_list:
#print(T)
print(T.name,T.age,T.address,T.birthday) #输出的第一个 u' 代表 unicode 编码
#第二种方法,按照给定的标准按顺序排列
T_list = School.select().order_by(School.age)
for T in T_list:
print(T.name,T.age,T.address,T.birthday)
#查多条,给定一个筛选条件
T_list = School.select().where(School.age == 50)
for T in T_list:
print(T.name,T.age,T.address,T.birthday)
#查一条
print "---------------------------"
T= School.get(id=3)
print(T.id,T.name,T.age,T.address,T.birthday)
#综合使用举例:
#综合举例
T_list = School.select().where(School.name == "lishi",School.age == 40)
for T in T_list:
print(T.name) #找不出则为空
#一般查,使用 SQL 语句,下面是用的 Mysql 语句,在 sqlite 里面无用
sql = "select * from school where name = \'lishi\' or age = 40"
help(School.raw(sql))
4.3 fn.FIND_IN_SET
以【星桥】列表为例, 搜索 job_tags 使用FIND_IN_SET 函数(使用 fn 功能),搜索 is_valid 使用 peewee.NodeList 进行拼接
Copy from xlib.taskflow import model
from xlib.db import shortcuts
from xlib.db import peewee
job_model = model.Job
query_cmd = job_model.select()
expressions = []
expressions.append(peewee.NodeList((job_model.is_valid, peewee.SQL('='), True)))
expressions.append(peewee.fn.FIND_IN_SET("flag=pega",job_model.job_tags))
query_cmd = query_cmd.where(*expressions)
record_list = query_cmd
print query_cmd.sql()
for record in record_list:
record_dict = shortcuts.model_to_dict(record)
print record_dict
SQL
Copy ('SELECT `t1`.`job_id`, `t1`.`job_client`, `t1`.`job_namespace`, `t1`.`job_reqid`, `t1`.`job_name`, '
'`t1`.`job_status`, `t1`.`job_type`, `t1`.`job_tags`, `t1`.`ret_stat`, `t1`.`ret_data`, '
'`t1`.`job_cost`, `t1`.`job_extra`, `t1`.`job_timeout`, `t1`.`job_lock`, `t1`.`exe_id`, '
'`t1`.`operator`, `t1`.`is_valid`, `t1`.`c_time`, `t1`.`s_time`, `t1`.`e_time` '
'FROM `workflow_job` AS `t1` '
'WHERE (`t1`.`is_valid` = %s AND FIND_IN_SET(%s, `t1`.`job_tags`))', [True, 'flag=pega'])
4.4 fn.SUM
查询统计总量:
Copy SELECT SUM("t1"."nums") AS "total" FROM "step_collect_infos" AS "t1"
使用的 peewee 的话,使用方法是:
Copy result = StepCollectInfos.select(fn.SUM(StepCollectInfos.nums).alias('total'))
说明,:
结果返回的是一个ModelSelect
示例:
Copy result = StepCollectInfos.select(fn.SUM(StepCollectInfos.nums).alias('total'))
print(result.sql())
print(type(result))
print(result.dicts()[0].get('total'))
for sds in result.dicts():
print(sds)
结果:
Copy ('SELECT SUM("t1"."nums") AS "total" FROM "step_collect_infos" AS "t1"', [])
<class 'peewee.ModelSelect'>
11633
{'total': 11633}
建议使用 result.dicts()[0]
Copy 使用 sum 的话,result 有且仅有 1 个记录
# 有结果
result.dicts()[0]: {'total': 11633}
# 没有结果
result.dicts()[0]: {'total': None}
4.5 JOIN.LEFT_OUTER
Copy class BaseModel(Model):
class Meta:
database = db
class Location(BaseModel):
location_key = CharField(primary_key=True)
lat = FloatField(null = False)
lon = FloatField(null = False)
class Household(BaseModel):
name = CharField(null=True)
location_id = CharField(null=True)
Copy query = (HouseHold
.select(HouseHold, Location)
.join(Location, on=(HouseHold.location_id == Location.location_key),
attr='location_obj', join_type=JOIN.LEFT_OUTER, src=HouseHold))
for house in query:
# if there was a match, get the location obj or None.
location_obj = getattr(house, 'location_obj', None)
# the location_id is still present.
print(house.location_id, location_obj)
可以通过 alias 将 table 自定义名
Copy unit_model = model_scs.CacheInstance
cluster_model = model_scs.CacheCluster.alias("cluster")
# select
query_cmd = unit_model.select(unit_model)
# left join
query_cmd = query_cmd.join(cluster_model, on=(unit_model.cluster_id == cluster_model.id),
attr='cluster_obj', join_type=peewee.JOIN.LEFT_OUTER, src=unit_model)
# where
expressions = []
expressions.append(peewee.NodeList((unit_model.floating_ip, peewee.SQL('='), ip)))
expressions.append(peewee.NodeList((cluster_model.status, peewee.SQL('!='), 10)))
query_cmd = query_cmd.where(*expressions)
record_list = query_cmd
record_dict_list = []
for record in record_list:
record_dict = shortcuts.model_to_dict(record)
record_dict["cache_instance_type_string"] = cache_instance_type_map[record_dict["cache_instance_type"]]
record_dict_list.append(record_dict)
连表操作: query_cmd = query_cmd.join(cluster_model, on=(unit_model.cluster_id == cluster_model.id), attr='cluster_obj', join_type=peewee.JOIN.LEFT_OUTER)
(1) query_cmd = unit_model.select(unit_model, cluster_model)
(2) cluster_obj = getattr(record, 'cluster_obj', None)
Copy 执行两次 peewee.JOIN.LEFT_OUTER 连表操作
---- query cmd
query_cmd = unit_model.select(unit_model, cluster_model, user_model)
query_cmd = query_cmd.join(cluster_model, on=(unit_model.cluster_id == cluster_model.id),attr='cluster_obj', join_type=peewee.JOIN.LEFT_OUTER)
query_cmd = query_cmd.join(user_model, on=(user_model.id == cluster_model.user_id),attr='user_obj', join_type=peewee.JOIN.LEFT_OUTER)
---- 获取结果(user_obj 需要从 cluster_obj 中获取)
cluster_obj = getattr(record, 'cluster_obj', None)
user_obj = getattr(cluster_obj, 'user_obj', None)
4.6 count
Copy qry=Contacts.select().where(Contacts.City=='Nasik').count()
print (qry)
4.7 limit
Copy one_ticket = Ticket.select().limit(1)
4.8 time 范围
如:
DateTimeField(column_name="c_time", default=datetime.now)
Copy job_model.c_time < "2022-07-25 00:00:33"
4.9 常见查询算子
在查找中(与 <<
),如 model.<field_name>.in_(<field_value>.split(",")
在哪里搜索 low <= value <= high
。