视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
使用Python的web.py框架实现类似Django的ORM查询的教程
2020-11-27 14:41:46 责编:小采
文档

Django中的对象查询

Django框架自带了ORM,实现了一些比较强大而且方便的查询功能,这些功能和表无关。比如下面这个例子:

class Question(models.Model):
 question_text = models.CharField(max_length=200)
 pub_date = models.DateTimeField('date published')


>>> Question.objects.all()
>>> Question.objects.get(pk=1)

从例子可以看出,objects.all和objects.get这些功能都不是在class Question中定义的,可能在其父类models.Model中定义,也可能不是。那么我们在web.py中如何实现这样的功能呢?(如果你选择使用SQLAlchemy就不需要自己实现了)。
实现
思路

我们注意到Question.objects.all()这样的调用是直接访问了类属性objects,并调用了objects属性的方法all()。这里objects可能是一个实例,也可能是一个类。我个人认为(我没看过Django的实现)这应该是一个实例,因为实例化的过程可以传递一些表的信息,使得类似all()这样的函数可以工作。经过分析之后,我们可以列出我们需要解决的问题:

  • 需要实现一个模型的父类Model,实际的表可以从这个父类继承以获得自己没有定义的功能。
  • 实际的模型类(比如Question类)定义后,不实例话的情况下就要具备objects.all()这样的查询效果。
  • 从上面的需求可以看出,我们需要在类定义的时候就实现这些功能,而不是等到类实例化的时候再实现这些功能。类定义的时候实现功能?这不就是metaclass(元类)做的事情嘛。因此实现过程大概是下面这样的:
  • 实现一个Model类,其绑定方法和表的增、删、改有关。
  • 修改Model类的元类为ModelMetaClass,该元类定义的过程中为类增加一个objects对象,该对象是一个ModelDefaultManager类的实例,实现了表的查询功能。
  • 代码

    都说不给代码就是耍流氓,我还是给吧。说明下:使用的数据库操作都是web.py的db库中的接口。

     # -*- coding: utf-8 -*-
    
     import web
    
     import config # 自定义的配置类,可以忽略
    
    
     def _connect_to_db():
     return web.database(dbn="sqlite", db=config.dbname)
    
    
     def init_db():
     db = _connect_to_db()
     for statement in config.sql_statements:
     db.query(statement)
    
    
     class ModelError(Exception):
     """Exception raised by all models.
    
     Attributes:
     msg: Error message.
     """
    
     def __init__(self, msg=""):
     self.msg = msg
    
     def __str__(self):
     return "ModelError: %s" % self.msg
    
    
     class ModelDefaultManager(object):
     """ModelManager implements query functions against a model.
    
     Attributes:
     cls: The class to be managed.
     """
    
     def __init__(self, cls):
     self.cls = cls
     self._table_name = cls.__name__.lower()
    
     def all(self):
     db = _connect_to_db()
     results = db.select(self._table_name)
     return [self.cls(x) for x in results]
    
     def get(self, query_vars, where):
     results = self.filter(query_vars, where, limit=1)
     if len(results) > 0:
     return results[0]
     else:
     return None
    
     def filter(self, query_vars, where, limit=None):
     db = _connect_to_db()
     try:
     results = db.select(self._table_name, vars=query_vars, where=where,
     limit=limit)
     except (Exception) as e:
     raise ModelError(str(e))
    
     return [self.cls(x) for x in results]
    
    
     class ModelMetaClass(type):
    
     def __new__(cls, classname, bases, attrs):
     new_class = super(ModelMetaClass, cls).__new__(cls, classname,
     bases, attrs)
     objects = ModelDefaultManager(new_class)
     setattr(new_class, "objects", objects)
    
     return new_class
    
    
     class Model(object):
     """Parent class of all models.
     """
    
     __metaclass__ = ModelMetaClass
    
     def __init__(self):
     pass
    
     def _table_name(self):
     return self.__class__.__name__.lower()
    
     def insert(self, **kargs):
     db = _connect_to_db()
     try:
     with db.transaction():
     db.insert(self._table_name(), **kargs)
     except (Exception) as e:
     raise ModelError(str(e))
    
     def delete(self, where, using=None, vars=None):
     db = _connect_to_db()
     try:
     with db.transaction():
     db.delete(self._table_name(), where, vars=vars)
     except (Exception) as e:
     raise ModelError(str(e))
    
     def save(self, where, vars=None, **kargs):
     db = _connect_to_db()
     try:
     with db.transaction():
     db.update(self._table_name(), where, vars, **kargs)
     except (Exception) as e:
     raise ModelError(str(e))
    
    

    使用

    首先定义表对应的类:

    class Users(Model):
     ...
    
    

    使用就和Django的方式一样:

    >>> user_list = Users.objects.all()
    
    

    下载本文
    显示全文
    专题