视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
Python操作Oracle数据库的简单方法和封装类实例
2020-11-27 14:21:40 责编:小采
文档


然而,我的客户端的配置项大概是这样的,

(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=127.0.0.1)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=KGDB)))

我擦,好像长得一样的,类型也都是字符串类型的,试试直接把我文件里的配置项赋值给tns试试。

tns = '(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=127.0.0.1)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=KGDB)))'
conn = cx_Oracle.connect('nicker', '123456', tns)

嗯。成功了~

最后,贴一个基本使用方法全代码

#coding:utf-8
import cx_Oracle
# 创建数据库连接
# cx_Oracle.connect('username','pwd','ora的tns信息')
# oracle数据库的tns信息,从tnsnames.ora中找到plsql可用的配置项,将该配置项直接拷贝过来即可
ora_tns = '(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=127.0.0.1)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=KGDB)))'
conn = cx_Oracle.connect('nicker', '123456', ora_tns)
# 操作游标
cursor = conn.cursor()
# 执行查询
cursor.execute("SELECT * FROM inst_info")
# 获取返回信息
rs = cursor.fetchall()
# 
输出信息 for v in rs: print v #关闭连接,释放资源 cursor.close() conn.close()

观察发现总结很重要,理解也需要

贴上一个自己封装Oracle的类

#coding:utf-8
import cx_Oracle
# 封装的类
class cxOracle:
 '''
 tns的取值tnsnames.ora对应的配置项的值,如:
 tns = '(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=10.16.18.23)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=MYDB)))'
 '''
 def __init__(self ,uname, upwd,tns ):
 self ._uname = uname
 self ._upwd = upwd
 self ._tns = tns
 self ._conn = None
 self ._ReConnect()
 def _ReConnect(self ):
 if not self._conn :
 self ._conn = cx_Oracle.connect (self. _uname, self ._upwd, self._tns)
 else:
 pass
 def __del__(self ):
 if self. _conn:
 self ._conn. close()
 self ._conn = None
 def _NewCursor(self ):
 cur = self. _conn.cursor ()
 if cur:
 return cur
 else:
 print "#Error# Get New Cursor Failed."
 return None
 def _DelCursor(self , cur):
 if cur:
 cur .close()
 # 检查是否允许执行的sql语句
 def _PermitedUpdateSql(self ,sql):
 rt = True
 lrsql = sql. lower()
 sql_elems = [ lrsql.strip ().split()]
 # update和delete最少有四个单词项
 if len( sql_elems) < 4 :
 rt = False
 # 更新删除语句,判断首单词,不带where语句的sql不予执行
 elif sql_elems[0] in [ 'update', 'delete']:
 if 'where' not in sql_elems :
 rt = False
 return rt
 # 导出
结果为文件 def Export(self , sql, file_name, colfg ='||'): rt = self. Query(sql ) if rt: with open( file_name, 'a') as fd: for row in rt: ln_info = '' for col in row: ln_info += str( col) + colfg ln_info += ' ' fd .write( ln_info) # 查询 def Query(self , sql, nStart=0 , nNum=- 1): rt = [] # 获取cursor cur = self. _NewCursor() if not cur: return rt # 查询到列表 cur .execute(sql) if ( nStart==0 ) and (nNum==1 ): rt .append( cur.fetchone ()) else: rs = cur. fetchall() if nNum==- 1: rt .extend( rs[nStart:]) else: rt .extend( rs[nStart:nStart +nNum]) # 释放cursor self ._DelCursor(cur) return rt # 更新 def Exec(self ,sql): # 获取cursor rt = None cur = self. _NewCursor() if not cur: return rt # 判断sql是否允许其执行 if not _PermitedUpdateSql(sql ): return rt # 执行语句 rt = cur. execute(sql ) # 释放cursor self ._DelCursor(cur) return rt # 类使用示例 tns = '(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=10.16.17.46)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=MYDB)))' ora = cxOracle ('nicker', '123456', tns) # 导出结果为文件 rs = ora .Export("SELECT * FROM org", '1.txt') # 查询结果到列表 rs = ora.Query("SELECT * FROM org") print rs # 更新数据 ora.Exec("update org set org_name='NewNameForUpdate' where org_id=123456;")

下载本文
显示全文
专题