视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
抽取oracle数据到mysql数据库的实现过程
2020-11-09 21:17:43 责编:小采
文档


在oracle数据库迁移至mysql数据库,除了oracle数据库模型移到mysql外,还一个重要环节就是要将oracle数据库的数据移到mysql数据库,本人尝试用过多款数据迁移程序,性能都不是很好的,于是自己动手写一个针对于oracle数据库数据迁移到mysql数据程序,其具体过程如下:

1、要抽取mysql表、字段及过滤条件的配制文件imp_data.sql

2、建立一个目录ETL_DIR

3、运行oracle数据库程序P_ETL_ORA_DATA,生成各表的csv数据文件,同时也生成一个导入mysql的脚本文件imp_data.sql

4、导入mysql数据,文件内容如下

load data infile "alarm_hist_inc.csv" into table alarm_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "button_authority.csv" into table button_authority fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "c3_sms_hist_inc.csv" into table c3_sms_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "datapermisson.csv" into table datapermisson fields terminated by "," enclosed by "^" lines terminated by "\r\n";

附:数据库脚本P_ETL_ORA_DATA

CREATE OR REPLACE PROCEDURE P_ETL_ORA_DATA
(
 P_ORA_DIR VARCHAR2,
 P_DATA_PATH VARCHAR2
) IS
 TYPE T_REC IS RECORD(
 TBN VARCHAR2(40),
 WHR VARCHAR2(4000));
 TYPE T_TABS IS TABLE OF T_REC;
 V_TABS T_TABS := T_TABS();
 V_ETL_DIR VARCHAR2(40) := P_ORA_DIR;
 V_LOAD_FILE UTL_FILE.FILE_TYPE;
 PROCEDURE ETL_DATA
 (
 P_SQL_STMT VARCHAR2,
 P_DATA_PATH VARCHAR2,
 P_TB_NAME VARCHAR2
 ) IS
 BEGIN
 DECLARE
 V_VAR_COL VARCHAR2(32767);
 V_NUM_COL NUMBER;
 V_DATE_COL DATE;
 V_TMZ TIMESTAMP;
 V_COLS NUMBER;
 V_COLS_DESC DBMS_SQL.DESC_TAB;
 V_ROW_STR VARCHAR2(32767);
 V_COL_STR VARCHAR2(32767);
 V_SQL_ID NUMBER;
 V_SQL_REF SYS_REFCURSOR;
 V_EXP_FILE UTL_FILE.FILE_TYPE;
 V_DATA_PATH VARCHAR2(200);
 BEGIN
 V_DATA_PATH := P_DATA_PATH;
 IF REGEXP_SUBSTR(V_DATA_PATH, '\\$') IS NULL
 THEN
 V_DATA_PATH := V_DATA_PATH || '\';
 END IF;
 V_DATA_PATH := REPLACE(V_DATA_PATH, '\', '\\');
 OPEN V_SQL_REF FOR P_SQL_STMT;
 V_SQL_ID := DBMS_SQL.TO_CURSOR_NUMBER(V_SQL_REF);
 DBMS_SQL.DESCRIBE_COLUMNS(V_SQL_ID, V_COLS, V_COLS_DESC);
 FOR I IN V_COLS_DESC.FIRST .. V_COLS_DESC.LAST
 LOOP
 CASE
 WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
 DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_VAR_COL, 32767);
 WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
 DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_NUM_COL);
 WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
 DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_DATE_COL);
 WHEN V_COLS_DESC(I).COL_TYPE = 180 THEN
 DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_TMZ);
 END CASE;
 END LOOP;
 DECLARE
 V_FLUSH_OVER PLS_INTEGER := 1;
 V_FILE_OVER PLS_INTEGER := 1;
 V_FILE_NO PLS_INTEGER := 1;
 V_FILE_NAME VARCHAR2(200);
 V_LINE VARCHAR2(400);
 BEGIN
 WHILE DBMS_SQL.FETCH_ROWS(V_SQL_ID) > 0
 LOOP
 IF V_FILE_OVER = 1
 THEN
 V_FILE_NAME := P_TB_NAME || '_' || V_FILE_NO || '.csv';
 V_EXP_FILE := UTL_FILE.FOPEN(V_ETL_DIR, V_FILE_NAME, OPEN_MODE => 'w', MAX_LINESIZE => 32767);
 END IF;
 V_ROW_STR := '';
 FOR I IN 1 .. V_COLS
 LOOP
 V_COL_STR := '\N';
 BEGIN
 CASE
 WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
 DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_VAR_COL);
 IF V_VAR_COL IS NOT NULL
 THEN
 V_COL_STR := '^' || V_VAR_COL || '^';
 END IF;
 WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
 DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_NUM_COL);
 IF V_NUM_COL IS NOT NULL
 THEN
 V_COL_STR := V_NUM_COL;
 END IF;
 WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
 DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_DATE_COL);
 IF V_DATE_COL IS NOT NULL
 THEN
 V_COL_STR := '^' || TO_CHAR(V_DATE_COL, 'yyyy-mm-dd hh24:mi:ss') || '^';
 END IF;
 WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN
 DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_TMZ);
 IF V_TMZ IS NOT NULL
 THEN
 V_COL_STR := '^' || TO_CHAR(V_TMZ, 'yyyy-mm-dd hh24:mi:ss.ff6') || '^';
 END IF;
 END CASE;
 IF I = 1
 THEN
 V_ROW_STR := V_COL_STR;
 ELSE
 V_ROW_STR := V_ROW_STR || ',' || V_COL_STR;
 END IF;
 END;
 END LOOP;
 UTL_FILE.PUT_LINE(V_EXP_FILE, CONVERT(V_ROW_STR, 'UTF8'));
 IF V_FILE_OVER > 200000 /*每200000条记录就产生一个新的文件*/
 THEN
 V_FILE_OVER := 1;
 V_FLUSH_OVER := 1;
 V_FILE_NO := V_FILE_NO + 1;
 UTL_FILE.FCLOSE(V_EXP_FILE);
 V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME || '" into table ' || P_TB_NAME;
 V_LINE := V_LINE || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
 UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);
 UTL_FILE.FFLUSH(V_LOAD_FILE);
 CONTINUE;
 END IF;
 V_FILE_OVER := V_FILE_OVER + 1;
 IF V_FLUSH_OVER > 2000 /*每2000条记录就刷新缓存,写到文件中 */
 THEN
 UTL_FILE.FFLUSH(V_EXP_FILE);
 V_FLUSH_OVER := 1;
 ELSE
 V_FLUSH_OVER := V_FLUSH_OVER + 1;
 END IF;
 END LOOP;
 DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
 IF UTL_FILE.IS_OPEN(V_EXP_FILE)
 THEN
 UTL_FILE.FCLOSE(V_EXP_FILE);
 V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME || '" into table ' || P_TB_NAME;
 V_LINE := V_LINE || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
 UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);
 UTL_FILE.FFLUSH(V_LOAD_FILE);
 END IF;
 END;
 EXCEPTION
 WHEN OTHERS THEN
 IF DBMS_SQL.IS_OPEN(V_SQL_ID)
 THEN
 DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
 END IF;
 IF UTL_FILE.IS_OPEN(V_EXP_FILE)
 THEN
 UTL_FILE.FCLOSE(V_EXP_FILE);
 END IF;
 DBMS_OUTPUT.PUT_LINE(SQLERRM);
 DBMS_OUTPUT.PUT_LINE(P_SQL_STMT);
 END;
 END;
BEGIN
 BEGIN
 EXECUTE IMMEDIATE 'create table mysql_etl_tbs(tn varchar2(40),cn varchar2(40),ci number) ';
 EXCEPTION
 WHEN OTHERS THEN
 NULL;
 END;
 EXECUTE IMMEDIATE 'truncate table mysql_etl_tbs';
 DECLARE
 V_CI PLS_INTEGER;
 V_CN VARCHAR2(40);
 V_ETL_COLS VARCHAR2(32767);
 V_TBN VARCHAR2(30);
 V_ETL_CFG VARCHAR2(32767);
 V_CNF_FILE UTL_FILE.FILE_TYPE;
 V_FROM_POS PLS_INTEGER;
 BEGIN
 V_CNF_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'ETL_TABS.CNF', 'r', 32767);
 LOOP
 UTL_FILE.GET_LINE(V_CNF_FILE, V_ETL_CFG, 32767);
 V_FROM_POS := REGEXP_INSTR(V_ETL_CFG, 'from', 1, 1, 0, 'i');
 V_ETL_COLS := SUBSTR(V_ETL_CFG, 1, V_FROM_POS - 1);
 V_ETL_COLS := REGEXP_SUBSTR(V_ETL_COLS, '(select)(.+)', 1, 1, 'i', 2);
 V_TBN := REGEXP_SUBSTR(V_ETL_CFG, '(\s+from\s+)(\w+)(\s*)', 1, 1, 'i', 2);
 V_TBN := UPPER(V_TBN);
 V_TABS.EXTEND();
 V_TABS(V_TABS.LAST).TBN := V_TBN;
 V_TABS(V_TABS.LAST).WHR := REGEXP_SUBSTR(V_ETL_CFG, '\s+where .+', 1, 1, 'i');
 V_CI := 1;
 LOOP
 V_CN := REGEXP_SUBSTR(V_ETL_COLS, '\S+', 1, V_CI);
 EXIT WHEN V_CN IS NULL;
 V_CN := UPPER(V_CN);
 EXECUTE IMMEDIATE 'insert into mysql_etl_tbs(tn,cn,ci) values(:1,:2,:3)'
 USING V_TBN, V_CN, V_CI;
 COMMIT;
 V_CI := V_CI + 1;
 END LOOP;
 END LOOP;
 EXCEPTION
 WHEN UTL_FILE.INVALID_PATH THEN
 DBMS_OUTPUT.PUT_LINE('指定的目录:ETL_DIR"' || '"无效!');
 RETURN;
 WHEN UTL_FILE.INVALID_FILENAME THEN
 DBMS_OUTPUT.PUT_LINE('指定的文件:" ETL_TABS.CNF' || '"无效!');
 RETURN;
 WHEN NO_DATA_FOUND THEN
 UTL_FILE.FCLOSE(V_CNF_FILE);
 WHEN OTHERS THEN
 DBMS_OUTPUT.PUT_LINE(SQLERRM);
 RETURN;
 END;
 DECLARE
 V_CUR_MATCH SYS_REFCURSOR;
 V_SQL_SMT VARCHAR2(32767);
 V_TN VARCHAR2(40);
 V_CN VARCHAR2(40);
 V_CI PLS_INTEGER;
 V_COLUMN_NAME VARCHAR2(40);
 V_ETL_COLS VARCHAR2(32767);
 V_LINE VARCHAR2(4000);
 V_TBN VARCHAR2(40);
 BEGIN
 V_LOAD_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'load_data.sql', OPEN_MODE => 'w', MAX_LINESIZE => 32767);
 FOR T_IX IN V_TABS.FIRST .. V_TABS.LAST
 LOOP
 V_SQL_SMT := 'select tn,cn,column_name,ci from ( select * from mysql_etl_tbs where tn='':tbn:'' ) l left join user_tab_columns r on l.tn = r.table_name and l.cn= r.column_name order by ci';
 V_TBN := V_TABS(T_IX).TBN;
 V_SQL_SMT := REPLACE(V_SQL_SMT, ':tbn:', V_TBN);
 V_ETL_COLS := NULL;
 OPEN V_CUR_MATCH FOR V_SQL_SMT;
 LOOP
 FETCH V_CUR_MATCH
 INTO V_TN, V_CN, V_COLUMN_NAME, V_CI;
 EXIT WHEN V_CUR_MATCH%NOTFOUND;
 IF V_CI > 1
 THEN
 V_ETL_COLS := V_ETL_COLS || ' , ';
 END IF;
 IF V_COLUMN_NAME IS NULL
 THEN
 V_ETL_COLS := V_ETL_COLS || ' cast(null as number) ' || V_CN;
 ELSE
 V_ETL_COLS := V_ETL_COLS || V_CN;
 END IF;
 END LOOP;
 CLOSE V_CUR_MATCH;
 V_TBN := LOWER(V_TBN);
 V_SQL_SMT := 'select ' || V_ETL_COLS || ' from ' || V_TBN || V_TABS(T_IX).WHR;
 ETL_DATA(V_SQL_SMT, P_DATA_PATH, V_TBN);
 END LOOP;
 IF UTL_FILE.IS_OPEN(V_LOAD_FILE)
 THEN
 UTL_FILE.FCLOSE(V_LOAD_FILE);
 END IF;
 END;
END P_ETL_ORA_DATA;

总结

下载本文
显示全文
专题