from xlib import db
from xlib.util import config_util
def model_bind_for_region(req, model, db_url_name="url_scs"):
"""
model bind
Returns:
bool
"""
db_url = config_util.get_config(req, "db_x1", db_url_name)
db_obj = db.connect(db_url)
return model.bind(db_obj)
from xlib import retstat
from handlers.x1 import model_scs
from xlib.middleware import funcattr
from xlib.httpgateway import Request
from handlers.x1 import lib_model
@funcattr.api
def test_model_for_region(req, region=None):
"""
test
"""
assert isinstance(req, Request)
if region:
req.idc = region
cachecluster_model = model_scs.CacheCluster.alias("app")
print cachecluster_model._meta.database.get_tables()
data = lib_model.model_bind_for_region(req, cachecluster_model, db_url_name="url_scs")
print data
print cachecluster_model._meta.database.get_tables()
return retstat.OK
from datetime import datetime
from xlib.db.peewee import CharField
from xlib.db.peewee import IntegerField
from xlib.db.peewee import PrimaryKeyField
from xlib.db.peewee import DateTimeField
import xlib.db
# Define a model class
class Xxx(xlib.db.BaseModel):
# If none of the fields are initialized with primary_key=True,
# an auto-incrementing primary key will automatically be created and named 'id'.
ip = CharField(index=True, max_length=64)
port = IntegerField()
redis_id = PrimaryKeyField()
history_exception = IntegerField(default=1)
c_time = DateTimeField(column_name="c_time", default=datetime.now)
u_time = DateTimeField(column_name="u_time", default=datetime.now)
if __name__ == "__main__":
xlib.db.my_databases["default"].connect()
xlib.db.my_databases["default"].drop_tables([Xxx])
xlib.db.my_databases["default"].create_tables([Xxx])
class Message(Model):
user = ForeignKeyField(User, backref='messages')
body = TextField()
send_date = DateTimeField()
print(some_message.user.username)
for message in some_user.messages:
print(message.body)
class Message(Model):
context = TextField()
read_count = IntegerField(default=0)
class Message(Model):
context = TextField()
timestamp = DateTimeField(default=datetime.datetime.now)
class Message(Model):
context = TextField()
timestamp = DateTimeField(constraints=[SQL('DEFAULT CURRENT_TIMESTAMP')])
tweets = Tweet.select()
for tweet in tweets:
# Instead of "tweet.user", we will just get the raw ID value stored
# in the column.
print(tweet.user_id, tweet.message)
class Message(Model):
from_user = ForeignKeyField(User)
to_user = ForeignKeyField(User, backref='received_messages')
text = TextField()
for message in some_user.message_set:
# We are iterating over all Messages whose from_user is some_user.
print(message)
for message in some_user.received_messages:
# We are iterating over all Messages whose to_user is some_user
print(message)
from peewee import *
# 连接数据库
database = MySQLDatabase('test', user='root', host='localhost', port=3306)
# 定义 Person
class Person(Model):
name = CharField()
birthday = DateField()
is_relative = BooleanField()
class Meta(object):
database = database
# The usual way:
db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})
# Specify the details at run-time:
db = SqliteDatabase(None)
...
db.init(db_filename, pragmas={'journal_mode': 'wal'})
# Or use a placeholder:
db = DatabaseProxy()
...
db.initialize(SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'}))
database_proxy = DatabaseProxy() # Create a proxy for our db.
class BaseModel(Model):
class Meta(object):
database = database_proxy # Use proxy for our DB.
class User(BaseModel):
username = CharField()
# Based on configuration, use a different database.
if app.config['DEBUG']:
database = SqliteDatabase('local.db')
elif app.config['TESTING']:
database = SqliteDatabase(':memory:')
else:
database = PostgresqlDatabase('mega_production_db')
# Configure our proxy to use the db we specified in config.
database_proxy.initialize(database)
# 第一种方法插入单条数据
# 不会返回插入的自增 pk,而是成功返回 1,失败返回 0;
# 直接创建示例,然后使用 save() 就添加了一条新数据
p = Person(name='liuchungui', birthday='1990-12-20', is_relative=True)
p.save()
# 第二种方法插入单条数据
# 返回值是一个 Person 对象
p = Person.create(name='liuchungui', birthday='1990-12-20', is_relative=True)
# 第三种方法插入单条数据
# 返回值是整型,值为 id
p = Person.insert(name='liuchungui', birthday='1990-12-20', is_relative=True).execute()
#-------------------------------------------------------------------
# (1) create 方法内部调用 save 方法
# (2) save 方法内部调用的 insert 方法
#-------------------------------------------------------------------
SELECT SUM("t1"."nums") AS "total" FROM "step_collect_infos" AS "t1"
result = StepCollectInfos.select(fn.SUM(StepCollectInfos.nums).alias('total'))
result = StepCollectInfos.select(fn.SUM(StepCollectInfos.nums).alias('total'))
print(result.sql())
print(type(result))
print(result.dicts()[0].get('total'))
for sds in result.dicts():
print(sds)
('SELECT SUM("t1"."nums") AS "total" FROM "step_collect_infos" AS "t1"', [])
<class 'peewee.ModelSelect'>
11633
{'total': 11633}
class BaseModel(Model):
class Meta:
database = db
class Location(BaseModel):
location_key = CharField(primary_key=True)
lat = FloatField(null = False)
lon = FloatField(null = False)
class Household(BaseModel):
name = CharField(null=True)
location_id = CharField(null=True)
query = (HouseHold
.select(HouseHold, Location)
.join(Location, on=(HouseHold.location_id == Location.location_key),
attr='location_obj', join_type=JOIN.LEFT_OUTER, src=HouseHold))
for house in query:
# if there was a match, get the location obj or None.
location_obj = getattr(house, 'location_obj', None)
# the location_id is still present.
print(house.location_id, location_obj)
unit_model = model_scs.CacheInstance
cluster_model = model_scs.CacheCluster.alias("cluster")
# select(需要将涉及的 model 都放进来)
query_cmd = unit_model.select(unit_model,cluster_model)
# left join
query_cmd = query_cmd.join(cluster_model, on=(unit_model.cluster_id == cluster_model.id),
attr='cluster_obj', join_type=peewee.JOIN.LEFT_OUTER, src=unit_model)
# where
expressions = []
expressions.append(peewee.NodeList((unit_model.floating_ip, peewee.SQL('='), ip)))
expressions.append(peewee.NodeList((cluster_model.status, peewee.SQL('!='), 10)))
query_cmd = query_cmd.where(*expressions)
record_list = query_cmd
record_dict_list = []
for record in record_list:
record_dict = shortcuts.model_to_dict(record)
record_dict["cache_instance_type_string"] = cache_instance_type_map[record_dict["cache_instance_type"]]
record_dict_list.append(record_dict)