视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
MySQL数据库设计之利用Python操作Schema方法详解
2020-11-09 20:29:42 责编:小采
文档


弓在箭要射出之前,低声对箭说道,“你的自由是我的”。Schema如箭,弓似Python,选择Python,是Schema最大的自由。而自由应是一个能使自己变得更好的机会。

Schema是什么?

不管我们做什么应用,只要和用户输入打交道,就有一个原则--永远不要相信用户的输入数据。意味着我们要对用户输入进行严格的验证,web开发时一般输入数据都以JSON形式发送到后端API,API要对输入数据做验证。一般我都是加很多判断,各种if,导致代码很丑陋,能不能有一种方式比较优雅的验证用户数据呢?Schema就派上用场了。

㈠ MySQLdb部分

表结构:

mysql> use sakila; 
mysql> desc actor; 
+-------------+----------------------+------+-----+-------------------+-----------------------------+ 
| Field | Type | Null | Key | Default | Extra | 
+-------------+----------------------+------+-----+-------------------+-----------------------------+ 
| actor_id | smallint(5) unsigned | NO | PRI | NULL | auto_increment | 
| first_name | varchar(45) | NO | | NULL | | 
| last_name | varchar(45) | NO | MUL | NULL | | 
| last_update | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP | 
+-------------+----------------------+------+-----+-------------------+-----------------------------+ 
4 rows in set (0.00 sec) 

数据库连接模块:

[root@DataHacker ~]# cat dbapi.py 
#!/usr/bin/env ipython 
#coding = utf-8 
#Author: linwaterbin@gmail.com 
#Time: 2014-1-29 
 
import MySQLdb as dbapi 
 
USER = 'root' 
PASSWD = 'oracle' 
HOST = '127.0.0.1' 
DB = 'sakila' 
 
conn = dbapi.connect(user=USER,passwd=PASSWD,host=HOST,db=DB) 

1 打印列的元数据

[root@DataHacker ~]# cat QueryColumnMetaData.py 
#!/usr/bin/env ipython 
 
from dbapi import * 
 
cur = conn.cursor() 
statement = """select * from actor limit 1""" 
cur.execute(statement) 
 
print "output column metadata....." 
print 
for record in cur.description: 
 print record 
 
cur.close() 
conn.close() 

1.)调用execute()之后,cursor应当设置其description属性
2.)是个tuple,共7列:列名、类型、显示大小、内部大小、精度、范围以及一个是否接受null值的标记

[root@DataHacker ~]# chmod +x QueryColumnMetaData.py 
[root@DataHacker ~]# ./QueryColumnMetaData.py 
output column metadata..... 
 
('actor_id', 2, 1, 5, 5, 0, 0) 
('first_name', 253, 8, 45, 45, 0, 0) 
('last_name', 253, 7, 45, 45, 0, 0) 
('last_update', 7, 19, 19, 19, 0, 0) 

2 通过列名访问列值

默认情况下,获取方法从数据库作为"行"返回的值是元组

In [1]: from dbapi import * 
In [2]: cur = conn.cursor() 
In [3]: v_sql = "select actor_id,last_name from actor limit 2" 
In [4]: cur.execute(v_sql) 
Out[4]: 2L 
In [5]: results = cur.fetchone() 
In [6]: print results[0] 
58 
In [7]: print results[1] 
AKROYD 

我们能够借助cursorclass属性来作为字典返回

In [2]: import MySQLdb.cursors 
In [3]: import MySQLdb 
In [4]: conn = MySQLdb.connect(user='root',passwd='oracle',host='127.0.0.1',db='sakila',cursorclass=MySQLdb.cursors.DictCursor) 
In [5]: cur = conn.cursor() 
In [6]: v_sql = "select actor_id,last_name from actor limit 2" 
In [7]: cur.execute(v_sql) 
Out[7]: 2L 
In [8]: results = cur.fetchone() 
In [9]: print results['actor_id'] 
58 
In [10]: print results['last_name'] 
AKROYD 

㈡ SQLAlchemy--SQL炼金术师

虽然SQL有国际标准,但遗憾的是,各个数据库厂商对这些标准的解读都不一样,并且都在标准的基础上实现了各自的私有语法。为了隐藏不同SQL“方言”之间到区别,人们开发了诸如SQLAlchemy之类的工具

SQLAlchemy连接模块:

[root@DataHacker Desktop]# cat sa.py 
import sqlalchemy as sa 
engine = sa.create_engine('mysql://root:oracle@127.0.0.1/testdb',pool_recycle=3600) 
metadata = sa.MetaData() 

example 1:表定义

In [3]: t = Table('t',metadata, 
 ...: Column('id',Integer), 
 ...: Column('name',VARCHAR(20)), 
 ...: mysql_engine='InnoDB', 
 ...: mysql_charset='utf8' 
 ...: ) 
 
In [4]: t.create(bind=engine) 

example 2:表删除

有2种方式,其一: 
In [5]: t.drop(bind=engine,checkfirst=True) 
另一种是: 
In [5]: metadata.drop_all(bind=engine,checkfirst=True),其中可以借助tables属性指定要删除的对象 

example 3: 5种约束

3 .1 primary key 
下面2种方式都可以,一个是列级,一个是表级 
In [7]: t_pk_col = Table('t_pk_col',metadata,Column('id',Integer,primary_key=True),Column('name',VARCHAR(20))) 
In [8]: t_pk_col.create(bind=engine) 
In [9]: t_pk_tb = Table('t_pk_01',metadata,Column('id',Integer),Column('name',VARCHAR(20)),PrimaryKeyConstraint('id','name',name='prikey')) 
In [10]: t_pk_tb.create(bind=engine) 
3.2 Foreign Key 
In [13]: t_fk = Table('t_fk',metadata,Column('id',Integer,ForeignKey('t_pk.id'))) 
In [14]: t_fk.create(bind=engine) 
In [15]: t_fk_tb = Table('t_fk_tb',metadata,Column('col1',Integer),Column('col2',VARCHAR(10)),ForeignKeyConstraint(['col1','col2'],['t_pk.id','t_pk.name'])) 
In [16]: t_fk_tb.create(bind=engine) 
3.3 unique 
In [17]: t_uni = Table('t_uni',metadata,Column('id',Integer,unique=True)) 
In [18]: t_uni.create(bind=engine) 
In [19]: t_uni_tb = Table('t_uni_tb',metadata,Column('col1',Integer),Column('col2',VARCHAR(10)),UniqueConstraint('col1','col2')) 
In [20]: t_uni_tb.create(bind=engine) 
3.4 check 
 虽然能成功,但MySQL目前尚未支持check约束。这里就不举例了。 
3.5 not null 
In [21]: t_null = Table('t_null',metadata,Column('id',Integer,nullable=False)) 
In [22]: t_null.create(bind=engine) 

4 默认值

分2类:悲观(值由DB Server提供)和乐观(值由SQLAlshemy提供),其中乐观又可分:insert和update

4.1 例子:insert 
In [23]: t_def_inser = Table('t_def_inser',metadata,Column('id',Integer),Column('name',VARCHAR(10),server_default='cc')) 
In [24]: t_def_inser.create(bind=engine) 
3.2 例子:update 
In [25]: t_def_upda = Table('t_def_upda',metadata,Column('id',Integer),Column('name',VARCHAR(10),server_onupdate='DataHacker')) 
In [26]: t_def_upda.create(bind=engine) 
3.3 例子:Passive 
In [27]: t_def_pass = Table('t_def_pass',metadata,Column('id',Integer),Column('name',VARCHAR(10),DefaultClause('cc'))) 
In [28]: t_def_pass.create(bind=engine) 

㈢ 隐藏Schema

数据的安全是否暴露在完全可信任的对象面前,这是任何有安全意识的DBA都不会去冒的风险。比较好的方式是尽可能隐藏Schema结构并验证用户输入的数据完整性,这在一定程度上虽然增加了运维成本,但安全无小事。

这里借助开发一个命令行工具来阐述该问题

需求:隐藏表结构,实现动态查询,并将结果模拟mysql \G输出

版本: 
[root@DataHacker ~]# ./sesc.py --version 
1.0 
查看帮助: 
[root@DataHacker ~]# ./sesc.py -h 
Usage: sesc.py [options] <arg1> <arg2> [<arg3>...] 
Options: 
 --version show program's version number and exit 
 -h, --help show this help message and exit 
 -q TERM assign where predicate 
 -c COL, --column=COL assign query column 
 -t TABLE assign query table 
 -f, --format -f must match up -o 
 -o OUTFILE assign output file 
我们要的效果: 
[root@DataHacker ~]# ./sesc.py -t actor -c last_name -q s% -f -o output.txt 
[root@DataHacker ~]# cat output.txt 
************ 1 row ******************* 
actor_id: 180 
first_name: JEFF 
last_name: SILVERSTONE 
last_update: 2006-02-15 04:34:33 
************ 2 row ******************* 
actor_id: 195 
first_name: JAYNE 
last_name: SILVERSTONE 
last_update: 2006-02-15 04:34:33 
......<此处省略大部分
输出>......

请看代码

#!/usr/bin/env python
import optparse
from dbapi import *

#构造OptionParser实例,配置期望的选项
parser = optparse.OptionParser(usage="%prog [options] <arg1> <arg2> [<arg3>...]",version='1.0',)
#定义命令行选项,用add_option一次增加一个
parser.add_option("-q",action="store",type="string",dest="term",help="assign where predicate")
parser.add_option("-c","--column",action="store",type="string",dest="col",help="assign query column")
parser.add_option("-t",action="store",type="string",dest="table",help="assign query table")
parser.add_option("-f","--format",action="store_true",dest="format",help="-f must match up -o")
parser.add_option("-o",action="store",type="string",dest="outfile",help="assign output file")
#解析命令行
options,args = parser.parse_args()
#把上述dest值赋给我们自定义的变量
table = options.table
column = options.col
term = options.term
format = options.format
#实现动态读查询
statement = "select * from %s where %s like '%s'"%(table,column,term)
cur = conn.cursor()
cur.execute(statement)
results = cur.fetchall()
#模拟 \G 
输出形式 if format is True: columns_query = "describe %s"%(table) cur.execute(columns_query) heards = cur.fetchall() column_list = [] for record in heards: column_list.append(record[0]) output = "" count = 1 for record in results: output = output + "************ %s row ************\n\n"%(count) for field_no in xrange(0, len(column_list)): output = output + column_list[field_no]+ ": " + str(record[field_no]) + "\n" output = output + "\n" count = count + 1 else: output = [] for record in xrange(0,len(results)): output.append(results[record]) output = ''.join(output) #把输出结果定向到指定文件 if options.outfile: outfile = options.outfile with open(outfile,'w') as out: out.write(output) else: print output #关闭游标与连接 conn.close() cur.close()

总结

下载本文
显示全文
专题