🦋
Butterfly 用户手册
  • Introduction
  • 一 前言
  • 二 开始
    • 安装部署
    • 五分钟体验指南
    • 单机使用手册
    • 应用规范
      • handler specs
      • middleware specs
      • xingqiao_plugin specs
      • yiqiu_program specs
  • 三 客户端功能
    • MySQL 原生协议
    • MySQL ORM
    • Redis 原生协议
      • redis_config
      • redis_tls
    • Redis ORM
    • Redis mcpack
    • Localcache
    • Kazoo
  • 四 应用(通用服务)
    • API JSON 规范
    • 异步任务 BaiChuan(百川)
    • 任务调度 RuQi(如期)
    • 任务编排 XingQiao(星桥)
    • 配置管理 WuXing(五行)
    • 运筹决策 BaiCe(百策)
  • 五 部署运维
    • 单机容器化部署
    • 监控
    • 异常排查
      • CPU Load spike every 7 hours
    • 升级
    • 安全
    • 其他
  • 六 前端
    • butterfly_template
    • butterfly_fe
    • butterfly-admin(json2web)
      • amis
      • sso
      • pangu
    • NoahV
    • PyWebIO
  • 七 潘多拉魔盒
    • 装饰器
      • localcache_decorator
      • retry_decorator
      • custom_decorator
      • command2http_decorator
    • 算法
      • 算法-分位数
      • 算法-变异系数
    • 实用工具
      • host_util
      • shell_util
      • http_util
      • time_util
      • random_util
      • concurrent
      • jsonschema
      • blinker
      • toml
      • command_util
      • config_util
      • picobox
      • 对称加密
        • des
        • aes
      • ascii_art
        • ttable
        • chart
      • business_rules
      • python-mysql-replication
      • dict_util
    • 中间件
      • middleware_status
      • middleware_whitelist
    • test_handler.py
  • 八 最佳实践
    • 分布式架构
    • Code practice
    • Log practice
    • Daemon process
  • 附录
Powered by GitBook
On this page
  • 1 MySQL 连接配置
  • 1.1 default 配置
  • 1.2 自定义配置(仅读取本 region 配置)
  • 1.3 自定义配置(多 region adapter)
  • 2 基本知识
  • 2.1 字段
  • 3 使用说明
  • 3.1 定义 Model,建立数据库
  • 3.2 操作数据库
  • 3.3 查看 ORM 对应的原生 SQL 语句
  • 3.4 常见查询算子
  • 3.5 一些有用的拓展
  • 4 实践
  • 4.1 user
  • 4.2 School
  • 4.3 fn.FIND_IN_SET
  • 4.4 fn.SUM
  • 4.5 JOIN.LEFT_OUTER
  • 4.6 count
  • 4.7 limit
  • 4.8 time 范围
  • 4.9 输出某个字段的最新记录
  1. 三 客户端功能

MySQL ORM

PreviousMySQL 原生协议NextRedis 原生协议

Last updated 1 month ago

peewee:

1 MySQL 连接配置

1.1 default 配置

配置数据库连接方式

conf/config.py

DATABASES = {
    "default": "mysql+retrypool://root:password@127.0.0.1:3306/test?max_connections=300&stale_timeout=300",
}

Butterfly 访问 MySQL 基于 Peewee 库,Peewee 是一个简单小巧的 Python ORM,它非常容易学习,使用起来非常方便。

1.2 自定义配置(仅读取本 region 配置)

如 conf/servicer/db_x1.py 中,增加 url_x1 = "mysql+retrypool://root:password@127.0.0.1:3306/test?max_connections=300&stale_timeout=300"

from xlib import db
from xlib.db import peewee
from xlib.util import config_util


def _get_db(name):
    db_url = config_util.get_config(None, "db_x1", name)
    db_obj = db.connect(db_url)
    return db_obj
    
class ProcessRecords(peewee.Model):
    """
    X1 task process 记录, 用于耗时统计
    """
    id = peewee.BigAutoField()
    task_id = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")], index=True)
    task_batch_id = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")])
    work_flow = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")])
    entity = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")])
    step = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")])
    operation = peewee.CharField(constraints=[peewee.SQL("DEFAULT ''")])
    created_at = peewee.DateTimeField(constraints=[peewee.SQL("DEFAULT CURRENT_TIMESTAMP")])
    start_at = peewee.DateTimeField(constraints=[peewee.SQL("DEFAULT 0000-00-00 00:00:00")])
    cost = peewee.IntegerField(constraints=[peewee.SQL("DEFAULT 0")])
    parameters = peewee.TextField(null=True)
    err_msg = peewee.TextField(null=True)

    class Meta(object):
        """
        meta
        """
        database = _get_db("url_x1")
        table_name = 'process_records'

1.3 自定义配置(多 region adapter)

lib_model.py

from xlib import db
from xlib.util import config_util

def model_bind_for_region(req, model, db_url_name="url_scs"):
    """
    model bind

    Returns:
        bool
    """
    db_url = config_util.get_config(req, "db_x1", db_url_name)
    db_obj = db.connect(db_url)
    return model.bind(db_obj)

例子

from xlib import retstat
from handlers.x1 import model_scs
from xlib.middleware import funcattr
from xlib.httpgateway import Request

from handlers.x1 import lib_model

@funcattr.api
def test_model_for_region(req, region=None):
    """
    test
    """
    assert isinstance(req, Request)
    if region:
        req.idc = region

    cachecluster_model = model_scs.CacheCluster.alias("app")
    print cachecluster_model._meta.database.get_tables()

    data = lib_model.model_bind_for_region(req, cachecluster_model, db_url_name="url_scs")
    print data
    print cachecluster_model._meta.database.get_tables()
    return retstat.OK

2 基本知识

编写 model.py

demo

from datetime import datetime

from xlib.db.peewee import CharField
from xlib.db.peewee import IntegerField
from xlib.db.peewee import PrimaryKeyField
from xlib.db.peewee import DateTimeField
import xlib.db


# Define a model class
class Xxx(xlib.db.BaseModel):
    # If none of the fields are initialized with primary_key=True,
    # an auto-incrementing primary key will automatically be created and named 'id'.
    ip = CharField(index=True, max_length=64)
    port = IntegerField()
    redis_id = PrimaryKeyField()
    history_exception = IntegerField(default=1)
    c_time = DateTimeField(column_name="c_time", default=datetime.now)
    u_time = DateTimeField(column_name="u_time", default=datetime.now)


if __name__ == "__main__":
    xlib.db.my_databases["default"].connect()
    xlib.db.my_databases["default"].drop_tables([Xxx])
    xlib.db.my_databases["default"].create_tables([Xxx])

在官方的 Quckstart 中,Peewee 中 Model 类、fields 和 model 实例与数据库的映射关系如下:

Object
Corresponds to…

Model

class Database table

Field

instance Column on a table

Model

instance Row in a database table

也就是说

  • 一个 Model 类代表一个数据库的表

  • 一个 Field 字段代表数据库中的一个字段

  • 一个 model 类实例化对象则代表数据库中的一行。

2.1 字段

字段类用于描述模型属性到数据库字段的映射,每一个字段类型都有一个相应的 SQL 存储类型,如 varchar, int。并且 python 的数据类型和 SQL 存储类型之间的转换是透明的。

在创建模型类时,字段被定义为类属性。有一种特殊类型的字段 ForeignKeyField,可以以更直观的方式表示模型之间的外键关系。

class Message(Model):
    user = ForeignKeyField(User, backref='messages')
    body = TextField()
    send_date = DateTimeField()

这允许你编写如下的代码:

print(some_message.user.username)
for message in some_user.messages:
    print(message.body)

2.1.1 字段类型表

字段类型
Sqlite
Postgresql
MySQL

IntegerField

integer

integer

integer

BigIntegerField

integer

bigint

bigint

SmallIntegerField

integer

smallint

smallint

AutoField

integer

serial

integer

FloatField

real

real

real

DoubleField

real

double precision

double precision

DecimalField

decimal

numeric

numeric

CharField

varchar

varchar

varchar

FixedCharField

char

char

char

TextField

text

text

longtext

BlobField

blob

bytea

blob

BitField

integer

bigint

bigint

BigBitField

blob

bytea

blob

UUIDField

text

uuid

varchar(40)

DateTimeField

datetime

timestamp

datetime

DateField

date

date

date

TimeField

time

time

time

TimestampField

integer

integer

integer

IPField

integer

bigint

bigint

BooleanField

integer

boolean

bool

BareField

untyped

不支持

不支持

ForeignKeyField

integer

integer

integer

2.1.2 字段初始参数

所有字段类型接受的参数与默认值

  • null = False – 布尔值,表示是否允许存储空值

  • index = False – 布尔值,表示是否在此列上创建索引

  • unique = False – 布尔值,表示是否在此列上创建唯一索引

  • column_name = None – 如果和属性名不同,底层的数据库字段使用这个值

  • default = None – 字段默认值,可以是一个函数,将使用函数返回的值

  • primary_key = False – 布尔值,此字段是否是主键

  • constraints = None - 一个或多个约束的列表 例如:[Check('price > 0')]

  • sequence = None – 序列填充字段(如果后端数据库支持)

  • collation = None – 用于排序字段 / 索引的排序规则

  • unindexed = False – 表示虚拟表上的字段应该是未索引的(仅用于 sqlite)

  • choices = None – 一个可选的迭代器,包含两元数组(value, display)

  • help_text = None – 表示字段的帮助文本

  • verbose_name = None – 表示用户友好的字段名

一些字段的特殊参数

字段类型
特殊参数

CharField

max_length

FixedCharField

max_length

DateTimeField

formats

DateField

formats

TimeField

formats

TimestampField

resolution, utc

DecimalField

max_digits, decimal_places, auto_round, rounding

ForeignKeyField

model, field, backref, on_delete, on_update, extra

BareField

coerce

2.1.3 字段默认值

创建对象时,peewee 可以为字段提供默认值,例如将字段的默认值null设置为0

class Message(Model):
    context = TextField()
    read_count = IntegerField(default=0)

如果想提供一个动态值,比如当前时间,可以传入一个函数

class Message(Model):
    context = TextField()
    timestamp = DateTimeField(default=datetime.datetime.now)

数据库还可以提供字段的默认值。虽然 peewee 没有明确提供设置服务器端默认值的 API,但您可以使用 constraints 参数来指定服务器默认值:

class Message(Model):
    context = TextField()
    timestamp = DateTimeField(constraints=[SQL('DEFAULT CURRENT_TIMESTAMP')])

2.1.4 外键字段

foreignkeyfield 是一种特殊的字段类型,允许一个模型引用另一个模型。通常外键将包含与其相关的模型的主键(但您可以通过指定一个字段来指定特定的列)。

可以通过追加 _id 的外键字段名称来访问原始外键值

tweets = Tweet.select()
for tweet in tweets:
    # Instead of "tweet.user", we will just get the raw ID value stored
    # in the column.
    print(tweet.user_id, tweet.message)

ForeignKeyField 允许将反向引用属性绑定到目标模型。隐含地,这个属性将被命名为 classname_set,其中 classname 是类的小写名称,但可以通过参数覆盖 backref:

class Message(Model):
    from_user = ForeignKeyField(User)
    to_user = ForeignKeyField(User, backref='received_messages')
    text = TextField()

for message in some_user.message_set:
    # We are iterating over all Messages whose from_user is some_user.
    print(message)

for message in some_user.received_messages:
    # We are iterating over all Messages whose to_user is some_user
    print(message)

2.1.5 日期字段

DateField TimeField 和 DateTimeField 字段

DateField 包含 year month day TimeField 包含 hour minute second DateTimeField 包含以上所有

3 使用说明

而使用过程,分成两步:

  • 定义 Model,建立数据库

  • 操作数据库

3.1 定义 Model,建立数据库

在使用的时候,根据需求先定义好 Model,然后可以通过 create_tables() 创建表,若是已经创建好数据库表了,可以通过 python -m pwiz 脚本工具直接创建 Model。

3.1.1 创建 Model

3.1.1.1 第一种方式

先定义 Model,然后通过 db.create_tables() 创建或 Model.create_table() 创建表。 例如,我们需要建一个 Person 表,里面有 name、birthday 和 is_relative 三个字段,我们定义的 Model 如下:

from peewee import *

# 连接数据库
database = MySQLDatabase('test', user='root', host='localhost', port=3306)

# 定义 Person
class Person(Model):
    name = CharField()
    birthday = DateField()
    is_relative = BooleanField()

    class Meta(object):
        database = database

然后,我们就可以创建表了

# 创建表
Person.create_table()

# 创建表也可以这样,可以创建多个
# database.create_tables([Person])

其中,CharField、DateField、BooleanField 等这些类型与数据库中的数据类型一一对应,我们直接使用它就行,至于 CharField => varchar(255) 这种转换 Peewee 已经为我们做好了 。

完成之后,就会在数据库中看到 test 数据库中,创建好了 person 表

mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| person         |
+----------------+
1 rows in set (0.00 sec)

mysql> desc person;
+-------------+--------------+------+-----+---------+----------------+
| Field       | Type         | Null | Key | Default | Extra          |
+-------------+--------------+------+-----+---------+----------------+
| id          | int(11)      | NO   | PRI | NULL    | auto_increment |
| name        | varchar(255) | NO   |     | NULL    |                |
| birthday    | date         | NO   |     | NULL    |                |
| is_relative | tinyint(1)   | NO   |     | NULL    |                |
+-------------+--------------+------+-----+---------+----------------+
如果使用 autoconnect = True(默认值)初始化数据库,则在使用数据库之前无需显式连接到数据库。 明确地管理连接被认为是最佳实践,因此可以考虑禁用自动连接行为。

3.1.1.2 第二种方式

已经存在过数据库,则直接通过 python -m pwiz 批量创建 Model。

例如,上面我已经创建好了 test 库,并且创建了 person 表,表中拥有 id、name、birthday 和 is_relative 字段。那么,我可以使用下面命令:

# 查看参数
python -m pwiz

# 指定 mysql,用户为 root,host 为 localhost,数据库为 test
python -m pwiz -e mysql -u root -H localhost -p 3306 --password test > testModel.py

然后,输入密码,pwiz 脚本会自动创建 Model,内容如下:

from peewee import *

database = MySQLDatabase('test', **{'charset': 'utf8', 'use_unicode': True, 'host': 'localhost', 'user': 'root', 'password': ''})

class UnknownField(object):
    def __init__(self, *_, **__): pass

class BaseModel(Model):
    class Meta(object):
        database = database

class Person(BaseModel):
    birthday = DateField()
    is_relative = IntegerField()
    name = CharField()

    class Meta(object):
        table_name = 'person'

3.1.2 Model 定义

3.1.2.1 复合主键约束 (CompositeKey)

class Person(Model):
    first = CharField()
    last = CharField()
    class Meta(object):
        primary_key = CompositeKey('first', 'last')

3.1.2.2 联合唯一索引 (indexes)

class Meta(object):
    indexes = (
        (('字段 1', '字段 2'), True),    # 字段 1 与字段 2 整体作为索引,True 代表唯一索引
        (('字段 1', '字段 2'), False),   # 字段 1 与字段 2 整体作为索引,False 代表普通索引
    )

需要注意的是,上面语法,三层元组嵌套, 元组你懂得, 一个元素时需要加个 , 逗号。 别忘了。

3.1.3 设置数据库

使用 Peewee 配置数据库的三种方式

# The usual way:
db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})


# Specify the details at run-time:
db = SqliteDatabase(None)
...
db.init(db_filename, pragmas={'journal_mode': 'wal'})


# Or use a placeholder:
db = DatabaseProxy()
...
db.initialize(SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'}))

DatabaseProxy

database_proxy = DatabaseProxy()  # Create a proxy for our db.

class BaseModel(Model):
    class Meta(object):
        database = database_proxy  # Use proxy for our DB.

class User(BaseModel):
    username = CharField()

# Based on configuration, use a different database.
if app.config['DEBUG']:
    database = SqliteDatabase('local.db')
elif app.config['TESTING']:
    database = SqliteDatabase(':memory:')
else:
    database = PostgresqlDatabase('mega_production_db')

# Configure our proxy to use the db we specified in config.
database_proxy.initialize(database)

仅当实际数据库驱动程序在运行时变化时才使用 DatabaseProxy

如果只是连接值在运行时发生变化,例如数据库文件的路径或数据库主机,则应改用 Database.init()

故 DatabaseProxy 暂时不需要使用

3.2 操作数据库

操作数据库,就是增、删、改和查。

3.2.1 增 (object.save,Model.create,Model.insert)

# 第一种方法插入单条数据
# 不会返回插入的自增 pk,而是成功返回 1,失败返回 0;
# 直接创建示例,然后使用 save() 就添加了一条新数据
p = Person(name='liuchungui', birthday='1990-12-20', is_relative=True)
p.save()


# 第二种方法插入单条数据
# 返回值是一个 Person 对象
p = Person.create(name='liuchungui', birthday='1990-12-20', is_relative=True)

# 第三种方法插入单条数据
# 返回值是整型,值为 id
p = Person.insert(name='liuchungui', birthday='1990-12-20', is_relative=True).execute()


#-------------------------------------------------------------------
# (1) create 方法内部调用 save 方法
# (2) save 方法内部调用的 insert 方法
#-------------------------------------------------------------------

批量写数据

def create():
    """批量添加数据"""
    data_source = [
        (avartar, 'Catherine', '2', pwd),
        (avartar, 'Jane', '2', pwd),
        (avartar 'Mary', '2', pwd),
    ]
    field = [User.avartar, User.uname, User.gender, User.password]
    uid = User.insert_many(data_source, field).execute()
    print('uid=%d' % uid)

3.2.2 删 (object.delete_instance,Model.delete)

删除有两种方式

  • 使用 object.delete_instance

  • 使用 Model.delete

使用 delete().where().execute() 进行删除,where() 是条件,execute() 负责执行语句。若是已经查询出来的实例,则直接使用 delete_instance() 删除。
# 删除姓名为 perter 的数据
# 执行之后会返回影响行数
Person.delete().where(Person.name == 'perter').execute()

# 已经实例化的数据,使用 delete_instance
p = Person(name='liuchungui', birthday='1990-12-20', is_relative=False)
p.id = 1
p.save()
p.delete_instance()

3.2.3 改 (Model.update, object.save)

若是,已经添加过数据的的实例或查询到的数据实例,且表拥有 primary key 时,此时使用 save() 就是修改数据;若是未拥有实例,则使用 update().where() 进行更新数据。

# 已经实例化的数据,指定了 id 这个 primary key, 则此时保存就是更新数据
p = Person(name='liuchungui', birthday="1990-12-20", is_relative=False)
p.id = 1
p.save()

# 更新 birthday 数据
q = Person.update({Person.birthday: "1983-12-21"}).where(Person.name == 'liuchungui')
# 返回影响行数
q.execute()

3.2.3.1 peewee 的 update 是原子的

需要注意的是,在使用 update 的时候千万不要在 Python 中使用计算再更新,要使用 SQL 语句来更新,这样才能具有原子性。

错误做法

>>> for stat in Stat.select().where(Stat.url == request.url):
...     stat.counter += 1
...     stat.save()

正确做法

>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()

// output 影响行数
>>> 1

3.2.3.2 update 的几种方法

# 方法一
Person.update({Person.Name: '赵六', Person.Remarks: 'abc'}).where(Person.Name=='王五').execute()

# 方法二
Person.update({'Name': '赵六', 'Remarks': 'abc'}).where(Person.Name=='张三').execute()

# 方法三
Person.update(Name='赵六', Remarks='abc').where(Person.Name=='李四').execute()
Person.update(Person.age=Person.age+1).where(Person.Name=='李四').execute()

3.2.3.3 无则插入,有则更新

两种方式

存在更新,不存在则插入 replace 与 on_conflict_replace() 是等效的

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)

# last_login 值将更新,
user_id = User.replace(username='the-user', last_login=datetime.now()).execute()
user_id = User.insert(username='the-user', last_login=datetime.now()).on_conflict_replace().execute()

MySQL 提供了一种独有的语法 ON DUPLICATE KEY UPDATE 可以使用以下方法实现。

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)
    login_count = IntegerField()

#插入一个新用户
User.create(username='huey', login_count=0)

# 模拟用户登录。
登录计数和时间戳,要么正确创建,要么更新。

now = datetime.now()
rowid = User.insert(username='huey', last_login=now, login_count=1)
         .on_conflict(preserve=[User.last_login],  # 使用我们将插入的值
         .update={User.login_count: User.login_count + 1}
         ).execute()

3.2.4 查(单条 Model.get)

3.2.4.1 get

单条数据使用 Person.get() 就行了,也可以使用 Person.select().where().get()。若是查询多条数据,则使用 Person.select().where(),去掉 get() 就行了。语法很直观,select() 就是查询,where 是条件,get 是获取第一条数据。

# 查询单条数据
p = Person.get(Person.name == 'liuchungui')
print(p.name, p.birthday, p.is_relative)

# 使用 where().get() 查询
p = Person.select().where(Person.name == 'liuchungui').get()
print(p.name, p.birthday, p.is_relative)

# 查询多条数据
persons = Person.select().where(Person.is_relative == True)
for p in persons:
    print(p.name, p.birthday, p.is_relative)

3.2.4.2 get_or_none

如果当获取的结果不存在时,不想报错,可以使用 Model.get_or_none() 方法,会返回 None,参数和 get 方法一致。

3.2.4.3 get_or_create

Peewee 有一个辅助方法来执行“获取 / 创建”类型的操作: Model.get_or_create() 首先尝试检索匹配的行。如果失败,将创建一个新行。

p, created = Person.get_or_create(Name='赵六', defaults={'Age': 80, 'Birthday': date(1940, 1, 1)})
print(p, created)
# SQL 语句
('SELECT "t1"."id", "t1"."Name", "t1"."Age", "t1"."Birthday", "t1"."Remarks" FROM "person" AS "t1" WHERE ("t1"."Name" = ?) LIMIT ? OFFSET ?', ['赵六', 1, 0])
('BEGIN', None)
('INSERT INTO "person" ("Name", "Age", "Birthday") VALUES (?, ?, ?)', ['赵六', 80, datetime.date(1940, 1, 1)])

参数:

get_or_create 的参数是 **kwargs,其中 defaults 为非查询条件的参数,剩余的为尝试检索匹配的条件,这个看执行时的 SQL 语句就一目了然了。对于“创建或获取”类型逻辑,通常会依赖唯一 约束或主键来防止创建重复对象。但这并不是强制的,比如例子中,我以 Name 为条件,而 Name 并非主键。只是最好不要这样做。

返回值:

get_or_create 方法有两个返回值,第一个是“获取 / 创建”的模型实例,第二个是是否新创建。

3.2.4.4 get_by_id

对于主键查找,还可以使用快捷方法 Model.get_by_id()

Person.get_by_id(1)

3.2.4.5 select

使用 Model.select() 查询获取多条数据。select 后可以添加 where 条件,如果不加则查询整个表。

  • select 代表 sql 语句中 select 后面的语句表示要展示的字段

  • where 代表 where 条件语句 得到一个数据集合

语法:

select(*fields)

参数:

fields:需要查询的字段,不传时返回所有字段。传递方式如下例所示。

示例:

ps = Person.select(Person.Name, Person.Age).where(Person.Name == '张三')

select() 返回结果是一个 ModelSelect 对象,该对象可迭代、索引、切片。当查询不到结果时,不报错,返回 None。并且 select() 结果是延时返回的。如果想立即执行,可以调用 execute() 方法。

注意

注意:where 中的条件不支持 Name='张三' 这种写法,只能是 Person.Name == '张三'。

3.2.4.6 获取记录条数 count 方法

使用 .count() 方法可以获取记录条数。

Person.select().count()

也许你会问,用 len() 方法可以吗?当然也是可以的,但是是一种不可取的方法。

len(Person.select())

这两者的实现方式天差地远。用 count() 方法,执行的 SQL 语句是:

('SELECT COUNT(1) FROM (SELECT 1 FROM "person" AS "t1") AS "_wrapped"', [])

而用 len() 方法执行的 SQL 语句却是:

('SELECT "t1"."id", "t1"."Name", "t1"."Age", "t1"."Birthday", "t1"."Remarks" FROM "person" AS "t1"', [])

直接返回所有记录然后获取长度,这种方法是非常不可取的。

3.2.4.7 排序 order_by 方法

Person.select().order_by(Person.Age)

排序默认是升序排列,也可以用 + 或 asc() 来明确表示是升序排列:

Person.select().order_by(+Person.Age)
Person.select().order_by(Person.Age.asc())

用 - 或 desc() 来表示降序:

Person.select().order_by(-Person.Age)
Person.select().order_by(Person.Age.desc())

如要对多个字段进行排序,逗号分隔写就可以了

3.2.4.8 查询条件

当查询条件不止一个,需要使用逻辑运算符连接,而 Python 中的 and、or 在 Peewee 中是不支持的,此时我们需要使用 Peewee 封装好的运算符,如下:

逻辑符
含义
样例

&

and

Person.select().where((Person.Name == '张三') & (Person.Age == 30))

`

`

or

Person.select().where((Person.Name == '张三') | (Person.Age == 30))

~

not

Person.select().where(~Person.Name == '张三')

特别注意:有多个条件时,每个条件必须用 () 括起来。

当条件全为 and 时,也可以用逗号分隔,get 和 select 中都可以

Person.get(Person.Name == '张三', Person.Age == 30)

3.2.4.9 支持的比较符

运算符    含义
==    等于
<    小于
<=    小于等于
>    大于
>=    大于等于
!=    不等于
<<    x in y,其中 y 是列表或查询
>>    x is y, 其中 y 可以是 None
%    x like y

3.2.4.10 如何根据查询项,设置不同的搜索条件

搜索项支持 job_reqid,job_id 等

query_cmd = job_model.Job.select()
expressions = []
if job_reqid is not None:
    expressions.append(peewee.NodeList((job_model.Job.job_reqid, peewee.SQL('='), job_reqid)))

if job_id is not None:
    expressions.append(peewee.NodeList((job_model.Job.job_id, peewee.SQL('='), job_id)))

...

if len(expressions):
    query_cmd = query_cmd.where(*expressions)

# 用于返回分页总页数
record_count = query_cmd.count()

# 判断是否需要分页返回数据
if page_index is None:
    record_list = query_cmd.order_by(job_model.Job.c_time.desc())
else:
    record_list = query_cmd.order_by(job_model.Job.c_time.desc()).paginate(int(page_index), int(page_size))

3.3 查看 ORM 对应的原生 SQL 语句

后缀 .sql() 打印对应原生 sql

.....ORM 语句.sql()

3.4 常见查询算子

.in_(value)

在查找中(与 << ),如 model.<field_name>.in_(<field_value>.split(",")

.not_in(value)

不在查找中。

.is_null(is_null)

为空或不为空。接受布尔参数。

.contains(substr)

子字符串的通配符搜索。

.startswith(prefix)

搜索以开头的值 prefix .

.endswith(suffix)

搜索以结尾的值 suffix .

.between(low, high)

在哪里搜索 low <= value <= high 。

.regexp(exp)

正则表达式匹配(区分大小写)。

.iregexp(exp)

正则表达式匹配(不区分大小写)。

.bin_and(value)

二进制和

.bin_or(value)

二进制或

.concat(other)

使用连接两个字符串或对象 || .

.distinct()

为非重复选择标记列。

.collate(collation)

使用给定的排序规则指定列。

.cast(type)

将列的值强制转换为给定的类型。

3.5 一些有用的拓展

3.5.1 模型转换成字典

除了在查询的时候使用 model.dicts 以外,还可以使用 model_to_dict(model) 这个函数。

>>> user = User.create(username='meetbill')
>>> model_to_dict(user)
{'id': 1, 'username': 'meetbill'}

3.5.2 从数据库生成模型

可以使用 pwiz 工具从已有的数据库产生 peewee 的模型文件

python -m pwiz -e postgresql charles_blog > blog_models.py

eg.
python -m pwiz -e mysql -u root -H  <ip> -p <port> --password {database} > {database}.py

直接拉取最新的 peewee 项目库,在 peewee 项目库中执行产生 peewee 模型文件即可

3.5.3 创建自己的 Field

peewee 中创建自己的 Field, 主要通过继承 Field 或其子类来完成,

  • 如果有 mysql 中对应的字段,则将其赋值给 db_field 即可。这里对 set, enum 并没有找到通用的字段定义,但是对具体的业务可以进行如 GenderField 这样的个性化定制。

  • 如果没有,则使用其父类的 db_field 字段,并定义 db_value 和 python_value 两个方法来完成与数据库中数据类型之间的转化

# 时间戳字段
class TimeStampField(Field):
    db_field = 'timestamp'

class SmallIntegerField(IntegerField):
    db_field = 'smallint'

class PasswordField(FixedCharField):

    def __init__(self, *args, **kwargs):
        self.max_length =64
        super(PasswordField, self).__init__(max_length=self.max_length, *args, **kwargs)

    def db_value(self, value):
        return encrypt(value)

    def python_value(self, value):
        return encrypt(value)

# 性别字段
class GenderField(Field):
    db_field = 'enum("f", "m")'

使用

class Base(Model):
    class Meta(object):
        database = db

class Person(Base):
    name = CharField(max_length=20, default='haha')
    intro = TextField(default='')
    birth = DateTimeField(default=datetime.now())
    Married = BooleanField(default=False)
    height = FloatField(default=0)
    wight = DoubleField(default=0)
    salary = DecimalField(default=0)
    Save = BigIntegerField(default=0)
    family = SmallIntegerField(default=0)
    age = IntegerField(default=0)
    username = CharField(max_length=20)
    password = PasswordField(default='')
    ctime = TimeStampField()
    today = DateField(default=datetime.date(datetime.today()))
    now = TimeField(default=datetime.today())
    secret = BlobField(default='')
    gender = GenderField()

4 实践

4.1 user

ImportError: No module named dateutil //faker 使用

pip  install python-dateutil
from xlib.db.peewee import *
from faker import Factory
from datetime import datetime
import xlib.db

# Create an instance of a Database
mysql_config_url="mysql+pool://root:123456@127.0.0.1:3306/test?max_connections=300&stale_timeout=300"
db = xlib.db.connect(url=mysql_config_url)


# Define a model class
class User(Model):
    # If none of the fields are initialized with primary_key=True,
    # an auto-incrementing primary key will automatically be created and named 'id'.
    id = PrimaryKeyField()
    email = CharField(index=True, max_length=64)
    username = CharField(unique=True, max_length=32)
    password = CharField(null=True, max_length=64)
    createTime = DateTimeField(column_name="create_time", default=datetime.now)
    class Meta(object):
        database = db
        table_name = 'tb_user'
        # If Models without a Primary Key
        # primary_key = False

    def __str__(self):
        return "User(id:{} email:{} username:{} password:{} createTime: {})".format(self.id, self.email, self.username, self.password, self.createTime)


db.connect()
db.drop_tables([User])
db.create_tables([User])

""" CREATE """
print("-------------CREATE")

# 创建 User 对象
user = User.create(email="meetbill@163.com", username="meetbill", password="meet")
# 保存 User
user.save()

# 创建 faker 工厂对象
faker = Factory.create()
# 利用 faker 创建多个 User 对象
fake_users = [{
    'username': faker.name(),
    'password': faker.word(),
    'email': faker.email(),
} for i in range(5)]
# 批量插入
User.insert_many(fake_users).execute()

""" RETRIEVE/GET/FIND """
print("-------------RETRIEVE/GET/FIND")

user = User.select().where(User.id != 1).get()
print(user)
# User(id:2 email:bcalderon@hotmail.com username:Victoria Sullivan password:off createTime: 2019-08-15 23:25:59)

user = User.select().where(User.username.contains("meet")).get()
print(user)
# User(id:1 email:meetbill@163.com username:meetbill password:meet createTime: 2019-08-15 23:25:59)

count = User.select().filter(User.id >= 3).count()
print(count)
# 4

users = User.select().order_by(User.email)
for u in users:
    print(u)
"""
User(id:2 email:bcalderon@hotmail.com username:Victoria Sullivan password:off createTime: 2019-08-15 23:25:59)
User(id:6 email:evanscatherine@johnson.com username:Tracy Santiago password:you createTime: 2019-08-15 23:25:59)
User(id:1 email:meetbill@163.com username:meetbill password:meet createTime: 2019-08-15 23:25:59)
User(id:5 email:nking@yahoo.com username:Marissa Mckay password:last createTime: 2019-08-15 23:25:59)
User(id:4 email:thopkins@powers-booth.biz username:Brian Wise password:country createTime: 2019-08-15 23:25:59)
User(id:3 email:xaviercastillo@robinson.com username:Victoria Turner password:him createTime: 2019-08-15 23:25:59)
"""

""" UPDATE """
print("-------------UPDATE")

effect_count = User.update({User.username: "lisi", User.email: "ls@163.com"}).where(User.id == 1).execute()
print(effect_count)
# 1

""" DELETE """
print("-------------DELETE")

effect_count = User().delete_by_id(6)
print(effect_count)
# 1

effect_count = User.delete().where(User.id >= 4).execute()
print(effect_count)
# 2

结果:

-------------CREATE
-------------RETRIEVE/GET/FIND
User(id:2 email:bcalderon@hotmail.com username:Victoria Sullivan password:off createTime: 2019-08-15 23:25:59)
User(id:1 email:meetbill@163.com username:meetbill password:meet createTime: 2019-08-15 23:25:59)
4
User(id:2 email:bcalderon@hotmail.com username:Victoria Sullivan password:off createTime: 2019-08-15 23:25:59)
User(id:6 email:evanscatherine@johnson.com username:Tracy Santiago password:you createTime: 2019-08-15 23:25:59)
User(id:1 email:meetbill@163.com username:meetbill password:meet createTime: 2019-08-15 23:25:59)
User(id:5 email:nking@yahoo.com username:Marissa Mckay password:last createTime: 2019-08-15 23:25:59)
User(id:4 email:thopkins@powers-booth.biz username:Brian Wise password:country createTime: 2019-08-15 23:25:59)
User(id:3 email:xaviercastillo@robinson.com username:Victoria Turner password:him createTime: 2019-08-15 23:25:59)
-------------UPDATE
1
-------------DELETE
1
2

4.2 School

# -*- coding:utf-8 -*-

#导入模块
import peewee
import datetime
#建立链接
#connect = peewee.MySQLDatabase(
#    database = 'first_database', #数据库名字
#    host = 'localhost',  #数据库地址
#    user = 'root',  #数据库用户
#    passwd = '123'  #对应用户密码
#    )


#建立链接
connect = peewee.SqliteDatabase("test.db") #运行该程序后就能在当前目录下创建“test.db”数据库

#创建表
    #类名必须大写
    #peewee 创建数据库的时候,默认会添加主键 id
    #peewee 创建数据库字段默认不可为空
class School(peewee.Model):
    name = peewee.CharField(max_length = 32) # 相当于在数据库中定义了 name char(32)
    address = peewee.CharField(max_length = 32) # 相当于在数据库定义了 address char(32)
    age = peewee.IntegerField() # age int
    birthday = peewee.DateTimeField() #日期

    #将表和数据库连接
    class Meta(object):
        database = connect

if __name__ =="__main__":
    # peewee 的增删改查
    #注意:peewee 最主要的是作增删改,查一般都用 sql 语句,查逻辑复杂 ORM 模型适应不了,
    #     后面在介绍查的部分时会给出用 sql 语句查询的例子

#-----------------------------------------------------------------------------
    #增
        # 第一种方法,常用
    T = School() #实例化表的类型
    T.name = 'lishi'
    T.age = 40
    T.address = 'shayang'
    T.birthday = datetime.datetime.now() #通过时间模块输入当前时间值
    T.save()   #提交
        #第二种方法
    T = School().insert(
        name = 'shayang',
        age = 40,
        birthday = datetime.datetime.now(),
        address = 'jinmen'
    )
    T.execute()  #执行
#-------------------------------------------------------------------------------
    #删
    T =School.delete().where(School.id == 1)
    T.execute()  #执行指令
#-------------------------------------------------------------------------------

    #改
        #第一种方法,不常用
    T =School.update(name = "beifang").where(School.id == 5)
    T.execute()
        #第二种方法,常用
    T = School().get(id = 4)  #实例化表的类型
    T.name = 'mingzhu'
    T.save()

#-------------------------------------------------------------------------------

    #查
        #查所有
            #第一种方法
    T_list = School.select()
    #print(T_list)
    for T in T_list:
        #print(T)
        print(T.name,T.age,T.address,T.birthday) #输出的第一个 u' 代表 unicode 编码

            #第二种方法,按照给定的标准按顺序排列
    T_list = School.select().order_by(School.age)
    for T in T_list:
        print(T.name,T.age,T.address,T.birthday)

        #查多条,给定一个筛选条件
    T_list = School.select().where(School.age == 50)
    for T in T_list:
        print(T.name,T.age,T.address,T.birthday)

        #查一条
    print "---------------------------"
    T= School.get(id=3)
    print(T.id,T.name,T.age,T.address,T.birthday)

#综合使用举例:
    #综合举例
    T_list = School.select().where(School.name == "lishi",School.age == 40)
    for T in T_list:
        print(T.name)  #找不出则为空

#一般查,使用 SQL 语句,下面是用的 Mysql 语句,在 sqlite 里面无用
    sql = "select * from school where name = \'lishi\' or age = 40"
    help(School.raw(sql))

4.3 fn.FIND_IN_SET

以【星桥】列表为例, 搜索 job_tags 使用FIND_IN_SET 函数(使用 fn 功能),搜索 is_valid 使用 peewee.NodeList 进行拼接

from xlib.taskflow import model
from xlib.db import shortcuts
from xlib.db import peewee

job_model = model.Job

query_cmd = job_model.select()
expressions = []
expressions.append(peewee.NodeList((job_model.is_valid, peewee.SQL('='), True)))
expressions.append(peewee.fn.FIND_IN_SET("flag=pega",job_model.job_tags))
query_cmd = query_cmd.where(*expressions)
record_list = query_cmd

print query_cmd.sql()

for record in record_list:
    record_dict = shortcuts.model_to_dict(record)
    print record_dict

SQL

('SELECT `t1`.`job_id`, `t1`.`job_client`, `t1`.`job_namespace`, `t1`.`job_reqid`, `t1`.`job_name`, '
    '`t1`.`job_status`, `t1`.`job_type`, `t1`.`job_tags`, `t1`.`ret_stat`, `t1`.`ret_data`, '
    '`t1`.`job_cost`, `t1`.`job_extra`, `t1`.`job_timeout`, `t1`.`job_lock`, `t1`.`exe_id`, '
    '`t1`.`operator`, `t1`.`is_valid`, `t1`.`c_time`, `t1`.`s_time`, `t1`.`e_time` '
'FROM `workflow_job` AS `t1` '
'WHERE (`t1`.`is_valid` = %s AND FIND_IN_SET(%s, `t1`.`job_tags`))', [True, 'flag=pega'])

4.4 fn.SUM

查询统计总量:

SELECT SUM("t1"."nums") AS "total" FROM "step_collect_infos" AS "t1"

使用的 peewee 的话,使用方法是:

result = StepCollectInfos.select(fn.SUM(StepCollectInfos.nums).alias('total'))

说明,: 结果返回的是一个ModelSelect

示例:

result = StepCollectInfos.select(fn.SUM(StepCollectInfos.nums).alias('total'))
print(result.sql())
print(type(result))
print(result.dicts()[0].get('total'))
for sds in result.dicts():
    print(sds)

结果:

('SELECT SUM("t1"."nums") AS "total" FROM "step_collect_infos" AS "t1"', [])
<class 'peewee.ModelSelect'>
11633
{'total': 11633}

建议使用 result.dicts()[0]

使用 sum 的话,result 有且仅有 1 个记录
# 有结果
result.dicts()[0]: {'total': 11633}
# 没有结果
result.dicts()[0]: {'total': None}

4.5 JOIN.LEFT_OUTER

class BaseModel(Model):
    class Meta:
        database = db

class Location(BaseModel):
    location_key = CharField(primary_key=True)
    lat = FloatField(null = False)
    lon = FloatField(null = False)


class Household(BaseModel):
    name = CharField(null=True)
    location_id = CharField(null=True)
query = (HouseHold
         .select(HouseHold, Location)
         .join(Location, on=(HouseHold.location_id == Location.location_key),
               attr='location_obj', join_type=JOIN.LEFT_OUTER, src=HouseHold))
for house in query:
    # if there was a match, get the location obj or None.
    location_obj = getattr(house, 'location_obj', None)
    # the location_id is still present.
    print(house.location_id, location_obj)

可以通过 alias 将 table 自定义名

unit_model = model_scs.CacheInstance
cluster_model = model_scs.CacheCluster.alias("cluster")

# select
query_cmd = unit_model.select(unit_model)

# left join
query_cmd = query_cmd.join(cluster_model, on=(unit_model.cluster_id == cluster_model.id),
                           attr='cluster_obj', join_type=peewee.JOIN.LEFT_OUTER, src=unit_model)
# where
expressions = []
expressions.append(peewee.NodeList((unit_model.floating_ip, peewee.SQL('='), ip)))
expressions.append(peewee.NodeList((cluster_model.status, peewee.SQL('!='), 10)))
query_cmd = query_cmd.where(*expressions)

record_list = query_cmd
record_dict_list = []
for record in record_list:
    record_dict = shortcuts.model_to_dict(record)
    record_dict["cache_instance_type_string"] = cache_instance_type_map[record_dict["cache_instance_type"]]
    record_dict_list.append(record_dict)
  • 连表操作: query_cmd = query_cmd.join(cluster_model, on=(unit_model.cluster_id == cluster_model.id), attr='cluster_obj', join_type=peewee.JOIN.LEFT_OUTER)

  • 获取连表的 field:

(1) query_cmd = unit_model.select(unit_model, cluster_model)

(2) cluster_obj = getattr(record, 'cluster_obj', None)

执行两次 peewee.JOIN.LEFT_OUTER 连表操作
---- query cmd
query_cmd = unit_model.select(unit_model, cluster_model, user_model)
query_cmd = query_cmd.join(cluster_model, on=(unit_model.cluster_id == cluster_model.id),attr='cluster_obj', join_type=peewee.JOIN.LEFT_OUTER)
query_cmd = query_cmd.join(user_model, on=(user_model.id == cluster_model.user_id),attr='user_obj', join_type=peewee.JOIN.LEFT_OUTER)
---- 获取结果(user_obj 需要从 cluster_obj 中获取)
cluster_obj = getattr(record, 'cluster_obj', None)
user_obj = getattr(cluster_obj, 'user_obj', None)

4.6 count

qry=Contacts.select().where(Contacts.City=='Nasik').count()
print (qry)

4.7 limit

one_ticket = Ticket.select().limit(1)

4.8 time 范围

如:

DateTimeField(column_name="c_time", default=datetime.now)

job_model.c_time < "2022-07-25 00:00:33"

4.9 输出某个字段的最新记录

使用子查询和 join

from peewee import *
from models import YourModel  # 替换为你的模型

# 创建一个子查询,获取每个 user_id 的最大 create_time
subquery = (YourModel
            .select(
                YourModel.user_id,  # 替换为你的去重字段
                fn.MAX(YourModel.create_time).alias('max_time')
            )
            .group_by(YourModel.user_id)
            .alias('subquery'))

# 主查询,通过 JOIN 获取完整记录
query = (YourModel
         .select()
         .join(subquery, on=(
             (YourModel.user_id == subquery.c.user_id) &
             (YourModel.create_time == subquery.c.max_time)
         )))

# 执行查询并输出结果
for record in query:
    print(record.id, record.user_id, record.create_time)

在上面的代码中,subquery.c.max_time 是子查询中计算的最大时间列。确保在 JOIN 条件中使用的是 max_time,而不是 create_time。

Peewee 的实现原理可以结合

pwiz 传送门:

https://github.com/coleifer/peewee/releases/tag/3.9.6
道生一,一生二,二生三,三生万物
详情
https://github.com/coleifer/peewee/blob/master/pwiz.py
How to do left outer join with PeeWee and no ForeignKey?Stack Overflow
Logo