视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
通过Loadtable命令将数据文件加载到SybaseIQ数据库里面的Python
2020-11-09 15:13:23 责编:小采
文档


CREATE TABLE poc_app.sys_ftp_cfg ( ftp_id varchar(100) NOT NULL, --话单文件名标记 ftp_cycle_id varchar(1) NOT NULL, --话单文件名周期 ftp_stage_filepath varchar(255) NOT NULL, --话单处理后路径 ftp_stage_filereg varchar(100) NOT NULL, --话单

CREATE TABLE poc_app.sys_ftp_cfg
(
ftp_id varchar(100) NOT NULL, --话单文件名标记
ftp_cycle_id varchar(1) NOT NULL, --话单文件名周期
ftp_stage_filepath varchar(255) NOT NULL, --话单处理后路径
ftp_stage_filereg varchar(100) NOT NULL, --话单处理后名称格式
stage_schema varchar(100) NOT NULL, --schema名称
table_name varchar(100) NOT NULL, --表名
delimiter_type_id varchar(10) NOT NULL --分隔符
);

insert into poc_app.sys_ftp_cfg
values('jiang_test_d','D','/home/sybase/day','jiang_test_[YYYYMMDD].dat','poc_app','jiang_test','|');

#!/usr/bin/python

#-*- encoding: utf-8 -*-
####################################################################################
# name: SybaseIQ_LoadData.py
# describe: 通过Load table命令将数据文件加载到Sybase IQ数据库里面
####################################################################################
import os
import pyodbc
import string
import sys
from subprocess import Popen,PIPE
import ConfigParser
reload(sys)
sys.setdefaultencoding('utf8')

'''
将数据文件加载到Sybase IQ数据库里面
'''
class SybaseIQLoad:
debug = 0
def __init__(self,dbinfo):
self.UID = dbinfo[1]
self.PWD = dbinfo[2]
odbcinfo = 'DSN=%s;UID=%s;PWD=%s'%(dbinfo[0],dbinfo[1],dbinfo[2])
self.cnxn = pyodbc.connect(odbcinfo,autocommit=True,ansi=True)
self.cursor = self.cnxn.cursor()

def __del__(self):
if self.cursor:
self.cursor.close()
if self.cnxn:
self.cnxn.close()

def _printinfo(self,msg):
print "%s"%(msg)
print "\n"

def _GetStageName(self,ftp_stage_filereg,ftp_cycle_id,cur_static_time):
if ftp_cycle_id.lower() == 'h':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYYMMDDHH]',cur_static_time[0:10])
if ftp_cycle_id.lower() == 'd':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYYMMDD]',cur_static_time[0:8])
if ftp_cycle_id.lower() == 'w':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYY_WW]',cur_static_time[0:7])
if ftp_cycle_id.lower() == 'm':
ftp_stage_filename = ftp_stage_filereg.replace('[YYYYMM]',cur_static_time[0:6])
return ftp_stage_filename

def _getLoadInfo(self,ftp_id):
sql = '''
select
ftp_cycle_id
,ftp_stage_filepath
,ftp_stage_filereg
,stage_schema
,delimiter_type_id
,table_name
from jiang.sys_ftp_cfg
where ftp_id = '%s'
''' %(ftp_id)
self.cursor.execute(sql.strip())
row = self.cursor.fetchone()
return row

def _getSybIQServInfo(self):
# 保存SybaseIQ的主机和端口号
sybservinfo = []

# ODBC配置文件绝对路径
unixodbc_file = "/etc/unixODBC/odbc.ini"
config = ConfigParser.ConfigParser()
config.read(unixodbc_file)
# 获取SybaseIQ的IP地址
ServerIP = config.get("SybaseIQDSN", "Server")
# 获取SybaseIQ的端口号
Port = config.get("SybaseIQDSN", "Port")

# 保存获取的IP地址和端口号
sybservinfo.append(ServerIP)
sybservinfo.append(Port)

return sybservinfo

def loaddata(self,ftp_id,cur_static_time):
#取文件加载相关配置信息
row = self._getLoadInfo(ftp_id)

ftp_cycle_id = row[0]
ftp_stage_filepath = row[1]
ftp_stage_filereg = row[2]
stage_schema = row[3]
delimiter_type_id = row[4]
table_name = row[5]

# 获取指定日期的文件名
ftp_stage_filename = self._GetStageName(ftp_stage_filereg,ftp_cycle_id,cur_static_time)

# 获取清洗后文件的绝对路径
ftp_stage_absolute_filename = os.path.join(ftp_stage_filepath,ftp_stage_filename)

# 对清洗后的文件再进行处理
#ftp_stage_absolute_filename_final = ftp_stage_absolute_filename + '*'

# 获取SybaseIQ的主机IP地址和端口号
sybaseiq_ipport = self._getSybIQServInfo()

# 获取表的所有字段
table_columns = '''
select column_name
from syscolumn a
join systable b
on a.table_id = b.table_id
where b.table_name = '%s' ># /tmp/table_name.log
'''%(table_name)
load_sql='''dbisql -c "uid=%s;pwd=%s" -Host %s -port %s -nogui "%s"'''%(self.UID,self.PWD,sybaseiq_ipport[0],sybaseiq_ipport[1],table_columns)
os.system(load_sql)

# 处理生成的表字段文件
columns_sql = '''
cat /tmp/table_name.log | sed "s/'//g" | awk '{printf "%s,",$0}'| sed 's/,$//g'
'''
result = Popen(columns_sql,shell=True,stdout=PIPE,stderr=PIPE)
right_info = result.stdout.read().strip('\xef|\xbb|\xbf')
err_info = result.stderr.read()

loadsql = '''
load table %s.cpms_area_user
(
%s
)
USING FILE '%s'
FORMAT ASCII
ESCAPES OFF
QUOTES OFF
NOTIFY 1000000
DELIMITED BY '%s'
WITH CHECKPOINT ON;
COMMIT;
'''%(stage_schema, right_info, ftp_stage_absolute_filename, delimiter_type_id)

try:
iserr = 0
print "*************Begin to execute load table command...*************\n"
if self.debug == 1:
self._printinfo(loadsql.strip())
#self.cursor.execute(loadsql.strip())
loadsql='''dbisql -c "uid=%s;pwd=%s" -Host %s -port %s -nogui "%s"'''%(self.UID,self.PWD,sybaseiq_ipport[0],sybaseiq_ipport[1],loadsql)
os.system(loadsql)
print "\n*************End to execute load table command...*************"
print "**************************Successful**************************"
except Exception,err:
iserr = 1
print "Return value %s,Error %s" % (iserr,err)

return iserr
#Main
def main():
# 检查传入参数个数
if len(sys.argv) < 6 :
print 'usage: python SybaseIQ_LoadData.py SybaseDSN username password ftp_id cur_static_time\n'
sys.exit(1)

# 定义连接Sybase IQ的信息
dbinfo = []
#dbinfo.append('SybaseIQDSN')
#dbinfo.append('jiang')
#dbinfo.append('jiang')
dbinfo.append(sys.argv[1])
dbinfo.append(sys.argv[2])
dbinfo.append(sys.argv[3])

ftp_id = sys.argv[4]
cur_static_time = sys.argv[5]

SIQ = SybaseIQLoad(dbinfo)
ret = SIQ.loaddata(ftp_id,cur_static_time)
return ret
if __name__ == '__main__':
sys.exit(main())



下载本文
显示全文
专题