视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
python实现rest请求api示例
2020-11-27 14:30:19 责编:小采
文档

该代码参考新浪python api,适用于各个开源api请求

代码如下:


# -*- coding: utf-8 -*-
import collections
import gzip
import urllib
import urllib2
from urlparse import urlparse

try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO

try:
import json
except ImportError:
import simplejson as json


# Author tag of this module.
__author__ = 'myth'


# Internal codes for the HTTP verb; _http_call compares its ``method``
# argument against these to decide between a GET and a POST request.
_HTTP_GET = 1
_HTTP_POST = 2
# Timeout in seconds, used by _http_call when its ``_timeout`` argument is falsy.
TIMEOUT = 45
# Response format names mapped to integer codes.
# NOTE(review): in the visible source this is only referenced by the
# commented-out ``callback_type`` helper.
RETURN_TYPE = {"json": 0, "xml": 1, "html": 2, "text": 3}
# Upper-case verb name -> internal verb code (looked up by _Executable.__call__).
_METHOD_MAP = {'GET': _HTTP_GET, 'POST': _HTTP_POST}


class APIError(StandardError):
    """
    Error describing a failure reported by the API in a JSON message.

    The instance keeps the ``error_code``, the ``error`` text and the
    ``request`` it was built with; the ``error`` text is also the
    exception message.
    """

    def __init__(self, error_code, error, request):
        StandardError.__init__(self, error)
        self.error_code = error_code
        self.error = error
        self.request = request

    def __str__(self):
        details = (self.error_code, self.error, self.request)
        return 'APIError: %s: %s, request: %s' % details


# def callback_type(return_type='json'):
#
# default_type = "json"
# default_value = RETURN_TYPE.get(default_type)
# if return_type:
# if isinstance(return_type, (str, unicode)):
# default_value = RETURN_TYPE.get(return_type.lower(), default_value)
# return default_value


def _format_params(_pk=None, encode=None, **kw):
"""

:param kw:
:type kw:
:param encode:
:type encode:
:return:
:rtype:
"""

__pk = '%s[%%s]' % _pk if _pk else '%s'
encode = encode if encode else urllib.quote

for k, v in kw.iteritems():
_k = __pk % k
if isinstance(v, basestring):
qv = v.encode('utf-8') if isinstance(v, unicode) else v
_v = encode(qv)
yield _k, _v
elif isinstance(v, collections.Iterable):
if isinstance(v, dict):
for _ck, _cv in _format_params(_pk=_k, encode=encode, **v):
yield _ck, _cv
else:
for i in v:
qv = i.encode('utf-8') if isinstance(i, unicode) else str(i)
_v = encode(qv)
yield _k, _v
else:
qv = str(v)
_v = encode(qv)
yield _k, _v


def encode_params(**kw):
    """
    Build a url-encoded query string from keyword arguments.

    The optional ``_urlencode`` keyword is removed from the parameters and
    used as the quoting function (default ``urllib.quote``). The resulting
    ``key=value`` pairs are sorted by key (pairs sharing a key keep their
    relative order) and joined with ``&``.

    >>> encode_params(a=1, b='R&D')
    'a=1&b=R%26D'
    >>> encode_params(a=u'\u4e2d\u6587', b=['A', 'B', 123])
    'a=%E4%B8%AD%E6%96%87&b=A&b=B&b=123'

    >>> encode_params(**{
    ...     'a1': {'aa1': 1, 'aa2': {'aaa1': 11}},
    ...     'b1': [1, 2, 3, 4],
    ...     'c1': {'cc1': 'C', 'cc2': ['Q', 1, '@'], 'cc3': {'ccc1': ['s', 2]}}
    ... })
    'a1[aa1]=1&a1[aa2][aaa1]=11&b1=1&b1=2&b1=3&b1=4&c1[cc1]=C&c1[cc2]=Q&c1[cc2]=1&c1[cc2]=%40&c1[cc3][ccc1]=s&c1[cc3][ccc1]=2'
    """
    quote_func = kw.pop('_urlencode', urllib.quote)
    pairs = [
        '%s=%s' % (key, value)
        for key, value in _format_params(_pk=None, encode=quote_func, **kw)
    ]
    # list.sort is stable, so repeated keys keep the order they were produced in.
    pairs.sort(key=lambda item: item.split("=")[0])
    return '&'.join(pairs)


def _read_body(obj):
using_gzip = obj.headers.get('Content-Encoding', '') == 'gzip'
body = obj.read()
if using_gzip:
gzipper = gzip.GzipFile(fileobj=StringIO(body))
fcontent = gzipper.read()
gzipper.close()
return fcontent
return body


class JsonDict(dict):
    """
    A dict whose items can also be read and written as attributes.
    """

    def __setattr__(self, attr, value):
        # Attribute assignment stores an item instead of an instance attribute.
        self[attr] = value

    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails: fall back to items.
        try:
            value = self[attr]
        except KeyError:
            raise AttributeError(r"'JsonDict' object has no attribute '%s'" % attr)
        return value


def _parse_json(s):
    """
    Parse a JSON document, turning every JSON object into a JsonDict.

    :param s: JSON text
    :return: the decoded value; JSON objects (at any depth) are JsonDict
        instances
    """

    def _obj_hook(pairs):
        """
        Convert one decoded JSON object (a dict) into a JsonDict.
        """
        o = JsonDict()
        for k, v in pairs.items():
            try:
                key = str(k)
            except UnicodeEncodeError:
                # Python 2: a non-ASCII unicode key cannot be turned into a
                # byte string implicitly; keep it as unicode instead of
                # failing the whole parse.
                key = k
            o[key] = v
        return o

    return json.loads(s, object_hook=_obj_hook)


def _parse_xml(s):
"""
parse str into xml
"""

raise NotImplementedError()


def _parse_html(s):
"""
parse str into html
"""

raise NotImplementedError()


def _parse_text(s):
"""
parse str into text
"""

raise NotImplementedError()


def _http_call(the_url, method, return_type="json", request_type=None, request_suffix=None, headers=None, _timeout=30, **kwargs):
    """
    Send one HTTP request and return the parsed response.

    :param the_url: request address
    :param method: request method, ``_HTTP_GET`` or ``_HTTP_POST``
    :param return_type: name of the response parser; it is resolved to the
        module-level function ``_parse_<return_type>`` and falls back to
        ``_parse_json`` when no such callable exists
    :param request_type: ``'json'`` serialises the parameters with
        ``json.dumps`` and sets a JSON ``Content-Type`` header; any other
        value url-encodes them
    :param request_suffix: suffix appended to the address after a dot,
        e.g. ``jsp`` or ``net``
    :param headers: extra request headers; the mapping passed in is copied
        and never modified
    :param _timeout: timeout in seconds; a falsy value selects ``TIMEOUT``
    :param kwargs: request parameters
    :return: whatever the selected parser returns. When the server answers
        with an HTTP error whose body the parser can handle, the parsed
        error body is returned instead of raising.
    :raises urllib2.HTTPError: when the server answers with an HTTP error
        and its body cannot be read or parsed
    """
    http_url = "%s.%s" % (the_url, request_suffix) if request_suffix else the_url

    # Work on a private copy: the Content-Type set below must leak neither
    # into the caller's dict nor into later calls.
    headers = dict(headers) if headers else {}

    if request_type == 'json':
        headers['Content-Type'] = 'application/json'
        params = json.dumps(kwargs)
    else:
        params = encode_params(**kwargs)

    if method == _HTTP_GET:
        # GET carries the serialised parameters in the query string.
        http_url = '%s?%s' % (http_url, params)
        http_body = None
    else:
        http_body = params
    print(http_url)

    req = urllib2.Request(http_url, data=http_body, headers=headers)

    callback = globals().get('_parse_{0}'.format(return_type))
    if not callable(callback):
        print("return '%s' unable to resolve" % return_type)
        callback = _parse_json

    try:
        resp = urllib2.urlopen(req, timeout=_timeout if _timeout else TIMEOUT)
        try:
            return callback(_read_body(resp))
        finally:
            resp.close()
    except urllib2.HTTPError as e:
        # Error responses may still carry a parsable body; prefer returning it.
        try:
            return callback(_read_body(e))
        except Exception:
            # The error body is unusable: surface the original HTTP error.
            raise e


class HttpObject(object):
    """
    Proxy that issues requests for one client with one fixed HTTP method.

    Calling the object requests the client's ``api_url`` itself. Reading any
    other attribute returns a function that requests
    ``<api_url>/<attribute>``, where every ``__`` in the attribute name is
    turned into ``/``.
    """

    def __init__(self, client, method):
        self.client = client
        self.method = method

    def __call__(self, **kw):
        return _http_call(self.client.api_url, self.method, **kw)

    def __getattr__(self, attr):

        def wrap(**kw):
            # Resolved at call time so the client's current api_url is used.
            the_url = self.client.api_url
            if attr:
                the_url = '%s/%s' % (the_url, attr.replace('__', '/'))
            return _http_call(the_url, self.method, **kw)

        return wrap


class APIClient(object):
    """
    Entry point for calling a REST API through attribute access.

    Usage example. Suppose the API address is
        http://api.open.zbjdev.com/kuaiyinserv/kuaiyin/billaddress
    the request method is GET, the parameters are
        user_id       the user's UID
        is_all        whether to query all data: 0 = default mailing
                      address, 1 = all mailing addresses
        access_token  platform authentication token
    and the response format is json.

    Then it is used like this:
        domain = "api.open.zbjdev.com"
        # for an https request set is_https to True
        client = APIClient(domain)
        data = {"user_id": "14035462", "is_all": 1, "access_token": "XXXXXXXXXX"}
        # for a POST request use post instead of get
        result = client.kuaiyinserv.kuaiyin.billaddress.get(return_type="json", **data)
        # equivalent to
        # result = client.kuaiyinserv__kuaiyin__billaddress__get(return_type="json", **data)
        # result = client.kuaiyinserv__kuaiyin__billaddress(return_type="json", **data)
    """

    def __init__(self, domain, is_https=False):
        """
        :param domain: host (optionally with a path), with or without a
            leading ``http://`` / ``https://``
        :param is_https: force the ``https`` scheme, whatever ``domain`` says
        """
        http = "http"
        if domain.startswith(("http://", "https://")):
            # Split on the first separator only: a "://" further down the
            # address (e.g. inside a query string) must not break unpacking.
            http, domain = domain.split("://", 1)
        if is_https:
            http = "https"

        self.api_url = ('%s://%s' % (http, domain)).rstrip("/")
        self.get = HttpObject(self, _HTTP_GET)
        self.post = HttpObject(self, _HTTP_POST)

    def __getattr__(self, attr):
        # Names without "__" start a dotted path (client.a.b.get(...)).
        if '__' not in attr:
            return _Callable(self, attr)

        # "a__b__post" / "a__b__get" / "a__b": the optional suffix picks the
        # method (GET by default); the remainder is the path.
        method = self.get
        if attr.endswith("__post"):
            attr = attr[:-6]
            method = self.post
        elif attr.endswith("__get"):
            attr = attr[:-5]

        if attr.startswith('__'):
            attr = attr[2:]
        return getattr(method, attr)


class _Executable(object):

def __init__(self, client, method, path):
self._client = client
self._method = method
self._path = path

def __call__(self, **kw):
method = _METHOD_MAP[self._method]
return _http_call('%s/%s' % (self._client.api_url, self._path), method, **kw)

def __str__(self):
return '_Executable (%s %s)' % (self._method, self._path)

__repr__ = __str__


class _Callable(object):

def __init__(self, client, name):
self._client = client
self._name = name

def __getattr__(self, attr):
if attr == 'get':
return _Executable(self._client, 'GET', self._name)
if attr == 'post':
return _Executable(self._client, 'POST', self._name)
name = '%s/%s' % (self._name, attr)
return _Callable(self._client, name)

def __str__(self):
return '_Callable (%s)' % self._name

__repr__ = __str__


def test_logistics():
    """
    Manual smoke test: query the kuaidi100 API over the network and print
    the reply and its ``message`` field in three equivalent ways.
    """
    # For an https request is_https can be set to True; here the scheme is
    # already part of the address.
    client = APIClient("https://api.kuaidi100.com/api")

    query = dict(id="45f2d1f2sds", com="yunda", nu="1500066330925")
    result = client.__get(_timeout=2, **query)

    print(result)
    # Item access, dict.get and attribute access all reach the same value.
    print(result["message"])
    print(result.get("message"))
    print(result.message)


if __name__ == '__main__':

    # test_logistics()

    # Demo 1: POST a JSON body to <domain>/api/zhubajie/fz/info.
    domain = "kuaiyin.zhubajie.com"
    data = {
        "data": {
            "id": 1,
            "pid": 3,
            "name": u'中的阿斯顿',
        },
        "sign": 'asdfdsdsfsdf',
    }

    client = APIClient(domain)
    # headers = {'Content-Type': 'application/json'}
    headers = {"host": domain}

    result = client.api.zhubajie.fz.info.post(return_type="json", request_type="json", headers=headers, **data)
    print(result)
    print(result['data']['msg'])

    # Demo 2: a path segment containing "-" is not a valid identifier, so it
    # is reached with getattr instead of dotted access.
    c = APIClient('task.6.zbj.cn')
    delcache = getattr(c.api, 'kuaiyin-action-delcache')
    payload = {
        "sign": "",
        "data": {
            "product": "pvc",
            "category": "card",
            "req_type": "pack_list",
        },
    }
    r = delcache.post(request_type="json", headers={}, **payload)
    print(r)
    print(r['data']['msg'])

下载本文
显示全文
专题