### 背景:
1、 如今关系型数据库种类繁多,虽都使用SQL语言进行操作,但各数据库之间的SQL依然存在一些语法差异
2、 企业的ETL作业大量使用SQL脚本,脚本的运行往往存在依赖关系,而脚本间依赖关系往往需要人工识别
3、 管理或运维人员在分析SQL脚本时,只能用文本编辑器打开整个SQL脚本文件,在杂乱的屎山代码中分析问题
4、 一个SQL脚本文件是最小的运行单位,且一般只能按照从上到下的顺序,逐个运行脚本中的代码段
### 问题:
1、 能否对不同关系型数据库的SQL语法进行兼容解析?
2、 能否通过对SQL脚本的解析,自动获得脚本间的依赖关系?
3、 分析SQL脚本时,能否先将脚本拆解成代码块,然后层级化、图形化展示,提升代码可读性?
4、 能否对某些SQL大脚本进行自动化步骤分析,拆解成若干个可“并行运行”的执行步骤,提升大脚本的执行效率?
5、 扩展思考:若SQL大脚本的上游依赖,只有个别变动,能否只选择执行大脚本中的某个“分支”步骤?
### 方案:
为了解决以上问题,可以使用ZGLanguage对各关系型数据库的SQL进行语法配置,配置文件为 MARK_SQLS.syn 。
目前已经完成对几款主流数据库(Hive、Greenplum、DWS、Oracle、Mysql、Hana)的主要SQL语法的配置,不仅可识别“增删改查”等基础语法,还可以识别“判断、循环、游标、赋值”等语法。
在支持跨数据库SQL语法解析的同时,还提供了SQL代码拆解标注功能,通过此功能,用户可以对任意SQL脚本进行拆解分析。
这里提供了一个SQL脚本拆解分析的案例,使用Python语言编写(split_etl.py),通过对ZGLanguage标注后的结果拆解分析,得到一个Json结果,说明如下:
{
SQL_FILE_NAME : SQL脚本文件名
, DEPEND_TABLES : SQL脚本依赖表清单
, CREATE_TABLES : SQL脚本创建表清单
, UPDATE_INSERT_TABLES : SQL脚本更改表清单
, SQL_SPLIT_INFO : [ SQL脚本里每个SQL代码段拆解信息 :
{ SQL_SEQ : 序号 : 1,2,3 ...
, SQL_TYPE : 类型 : __CREATE_TABLE_SELECT__, __INSERT_TABLE_SELECT__, __DELETE_TABLE__, __IF__ ....
, SQL_CODE : 完整代码
, LVL_STRUCT : 代码的层级结构
, LVL_CODE : 层级结构的代码
, TAR_TAB : 目标表
, SRC_TAB : 来源表
}
,
{
.................
}
]
, SQL_DEPEND_LVL : SQL脚本代码段步骤(序号)依赖层级
}
参数使用建议:
1、 使用 DEPEND_TABLES、 CREATE_TABLES、 UPDATE_INSERT_TABLES,可以建立起各SQL脚本(包括存储过程)之间的依赖关系,辅助实现调度工具依赖自动化配置
2、 使用 SQL_SPLIT_INFO,可以对SQL脚本进行层级化、图形化展示,提升代码可读性
3、 使用 SQL_DEPEND_LVL,可以辅助实现大脚本中的: a、代码段并行; b、选择“分支”步骤运行功能
### 案例代码如下 :
# 假设存储过程代码(proc_test.prc):
-- Sample ETL stored procedure used as input for the annotation tool.
-- When the data date matches the schedule date, it rebuilds two payment-date
-- temp tables and then adjusts MA_F_LOAN with a MERGE and an UPDATE; it always
-- writes a row to M_RUNLOG afterwards.
CREATE OR REPLACE PROCEDURE PROC_F_CWWS_LOAN
(
-- NOTE(review): Oracle date literals are normally written DATE 'YYYY-MM-DD';
-- confirm that date'20200101' is accepted by the target database.
P_AS_OF_DATE IN DATE default date'20200101',
-- NOTE(review): RET_FLG is declared OUT but is never assigned in this body.
RET_FLG OUT VARCHAR2,
RET_MSG OUT VARCHAR2
) IS
/******************************************************************************
Purpose       : ETL processing of xxxx business data
Source tables :
Target table  : MA_F_LOAN
Remarks       :
******************************************************************************/
-- Declare and initialise local variables
-- (V_COUNT stays 0 for the whole procedure; it is only passed to the error log)
V_COUNT NUMBER := 0;
V_PROC_NAME VARCHAR2(200) := 'PROC_F_CWWS_LOAN';
V_PROC_DESC VARCHAR2(100) := 'xxxx业务数据ETL处理';
V_P_FREQ VARCHAR2(4) := '';
BEGIN
-- Set the session date format
EXECUTE IMMEDIATE ' ALTER SESSION SET NLS_DATE_FORMAT = ''YYYY-MM-DD''';
-- Look up the frequency value configured for this procedure in the parameter table
SELECT P_FREQ
INTO V_P_FREQ
FROM ETL_PROC_STATUS_DEF
WHERE PROC_NAME = V_PROC_NAME;
-- Run the load only when the data date equals the date returned by
-- FUNC_GET_FREQ_DAYS for that frequency
IF P_AS_OF_DATE = FUNC_GET_FREQ_DAYS(P_AS_OF_DATE, V_P_FREQ) THEN
-- Call the partition maintenance routine
ETL.ETL_ADD_PARTITION('MA_F_LOAN', P_AS_OF_DATE, 'ETL');
-- Empty the temp tables that hold the last / next payment dates
-- (intentionally unfiltered: every row is removed)
DELETE TMP_XD_LAST_PAYDATE;
DELETE TMP_XD_NEXT_PAYDATE;
COMMIT;
-- Last payment date: per account, the latest schedule date before the data date.
-- NOTE(review): the original comment said "less than or equal", but the
-- predicate below is a strict "<"; confirm which one is intended.
INSERT INTO ETL.TMP_XD_LAST_PAYDATE
(OBJECTNO, LAST_PAYDATE)
SELECT OBJECTNO, LAST_PAYDATE
FROM (SELECT T.OBJECTNO,
MAX(TO_DATE(PAYDATE, 'YYYY-MM-DD')) LAST_PAYDATE
FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T
WHERE T.AS_OF_DATE = P_AS_OF_DATE
AND T.SEQID <> '999'
AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') < P_AS_OF_DATE
GROUP BY T.OBJECTNO);
-- Next payment date: per account, the earliest schedule date on or after the data date.
-- NOTE(review): the original comment said "greater than", but the predicate
-- below is ">="; confirm which one is intended.
INSERT INTO ETL.TMP_XD_NEXT_PAYDATE
(OBJECTNO, NEXT_PAYDATE)
SELECT OBJECTNO, NEXT_PAYDATE
FROM (SELECT T.OBJECTNO,
MIN(TO_DATE(PAYDATE, 'YYYY-MM-DD')) NEXT_PAYDATE
FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T
WHERE T.AS_OF_DATE = P_AS_OF_DATE
AND T.SEQID <> '999'
AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') >= P_AS_OF_DATE
GROUP BY T.OBJECTNO);
COMMIT;
-- For accounts that appear more than once on the data date, zero the balances
-- of the rows whose interest GL account joins to a subject named like '%已减值%'
MERGE INTO ETL.MA_F_LOAN A
USING (SELECT /*+PARALLEL(8)*/
T.ACCOUNT_NUMBER, T.GL_ACCOUNT_ID, T.INT_GL_ACCOUNT_ID
FROM ETL.MA_F_LOAN T
INNER JOIN ETL.MA_D_GL_SUBJECT T1
ON T.INT_GL_ACCOUNT_ID = T1.SUBJECT_NO3
AND T1.SUBJECT_NAME3 LIKE '%已减值%'
AND T1.AS_OF_DATE = P_AS_OF_DATE
WHERE T.AS_OF_DATE = P_AS_OF_DATE
AND T.ACCOUNT_NUMBER IN
(SELECT ACCOUNT_NUMBER
FROM (SELECT /*+PARALLEL(8)*/
T2.ACCOUNT_NUMBER, COUNT(1)
FROM ETL.MA_F_LOAN T2
WHERE T2.AS_OF_DATE = P_AS_OF_DATE
GROUP BY T2.ACCOUNT_NUMBER
HAVING COUNT(1) > 1))) B
ON (A.ACCOUNT_NUMBER = B.ACCOUNT_NUMBER AND A.AS_OF_DATE = P_AS_OF_DATE AND A.GL_ACCOUNT_ID = B.GL_ACCOUNT_ID AND A.INT_GL_ACCOUNT_ID = B.INT_GL_ACCOUNT_ID)
WHEN MATCHED THEN
UPDATE SET A.CUR_BOOK_BAL = 0, A.OVERDUE_BAL = 0;
-- For matured xxxx with a non-zero balance, reset the last / next reprice dates
-- and the reprice frequency from the origination date, maturity date and original term
UPDATE MA_F_LOAN A
SET LAST_REPRICE_DATE = A.ORIGINATION_DATE,
NEXT_REPRICE_DATE = A.MATURITY_DATE,
REPRICE_FREQ = A.ORG_TERM,
REPRICE_FREQ_MULT = A.ORG_TERM_MULT,
ADJUSTABLE_TYPE_CD = 0
WHERE A.MATURITY_DATE <= P_AS_OF_DATE
AND A.CUR_BOOK_BAL <> 0
;
END IF;
-- Record the run (executed whether or not the IF branch above ran)
INSERT INTO M_RUNLOG VALUES (SYSDATE, V_PROC_NAME, 'it is 10');
COMMIT;
EXCEPTION
WHEN OTHERS THEN
-- Write the exception log
-- NOTE(review): CALL is a SQL statement rather than a PL/SQL one; confirm this
-- line compiles on the target database.
call ETL.PROC_ETL_LOG(P_AS_OF_DATE,V_PROC_NAME,V_PROC_DESC,V_COUNT,-1,SQLCODE,SQLERRM);
-- Hand the error code and message back to the caller
RET_MSG := SQLCODE || ':' || SQLERRM;
END;
/
# 执行标注命令: ZGLanguage -e SPLIT_ETL/MARK_SQLS.syn -t proc_test.prc -o mark_sql.zgl > log.log
得到标注结果 mark_sql.zgl 内容如下 :
__CREATE_PROCEDURE_HEAD__{:::}||PROC_F_CWWS_LOAN{:::}
CREATE OR REPLACE PROCEDURE PROC_F_CWWS_LOAN
(
P_AS_OF_DATE IN DATE default date '20200101' ,
RET_FLG OUT VARCHAR2 ,
RET_MSG OUT VARCHAR2
) IS
V_COUNT NUMBER := 0 ;
V_PROC_NAME VARCHAR2(200) := 'PROC_F_CWWS_LOAN' ;
V_PROC_DESC VARCHAR2(100) := 'xxxx业务数据ETL处理' ;
V_P_FREQ VARCHAR2(4) := '' ;
BEGIN
{;;;}
__EXECUTE__{:::}
EXECUTE IMMEDIATE ' ALTER SESSION SET NLS_DATE_FORMAT = ' 'YYYY-MM-DD' ''
{;;;}
__SELECT_INTO__{:::}V_P_FREQ{:::}
SELECT P_FREQ
INTO V_P_FREQ
FROM {###}<srctab>{###}ETL_PROC_STATUS_DEF{###}</srctab>{###}
WHERE PROC_NAME = V_PROC_NAME
{;;;}
__IF__{:::}
IF P_AS_OF_DATE = FUNC_GET_FREQ_DAYS(P_AS_OF_DATE, V_P_FREQ) THEN
{;;;}
__RUN_PROC_FUN__{:::}
ETL.ETL_ADD_PARTITION( 'MA_F_LOAN' , P_AS_OF_DATE , 'ETL' )
{;;;}
__DELETE_TABLE__{:::}||TMP_XD_LAST_PAYDATE{:::}
DELETE TMP_XD_LAST_PAYDATE
{;;;}
__DELETE_TABLE__{:::}||TMP_XD_NEXT_PAYDATE{:::}
DELETE TMP_XD_NEXT_PAYDATE
{;;;}
__COMMIT__{:::}commit{;;;}
__INSERT_TABLE_SELECT__{:::}ETL||TMP_XD_LAST_PAYDATE{:::}
INSERT INTO ETL.TMP_XD_LAST_PAYDATE
(OBJECTNO, LAST_PAYDATE)
SELECT OBJECTNO, LAST_PAYDATE
FROM {###}<subsel>{###} SELECT T.OBJECTNO,
MAX(TO_DATE(PAYDATE, 'YYYY-MM-DD')) LAST_PAYDATE
FROM {###}<srctab>{###}NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE{###}</srctab>{###} T
WHERE T.AS_OF_DATE = P_AS_OF_DATE
AND T.SEQID <> '999'
AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') < P_AS_OF_DATE
GROUP BY T.OBJECTNO {###}</subsel>{###}
{;;;}
__INSERT_TABLE_SELECT__{:::}ETL||TMP_XD_NEXT_PAYDATE{:::}
INSERT INTO ETL.TMP_XD_NEXT_PAYDATE
(OBJECTNO, NEXT_PAYDATE)
SELECT OBJECTNO, NEXT_PAYDATE
FROM {###}<subsel>{###} SELECT T.OBJECTNO,
MIN(TO_DATE(PAYDATE, 'YYYY-MM-DD')) NEXT_PAYDATE
FROM {###}<srctab>{###}NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE{###}</srctab>{###} T
WHERE T.AS_OF_DATE = P_AS_OF_DATE
AND T.SEQID <> '999'
AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') >= P_AS_OF_DATE
GROUP BY T.OBJECTNO {###}</subsel>{###}
{;;;}
__COMMIT__{:::}commit{;;;}
__MERGE_TABLE__{:::}ETL||MA_F_LOAN{:::}
MERGE INTO ETL.MA_F_LOAN A
USING {###}<subsel>{###} SELECT
T.ACCOUNT_NUMBER, T.GL_ACCOUNT_ID, T.INT_GL_ACCOUNT_ID
FROM {###}<srctab>{###}ETL.MA_F_LOAN{###}</srctab>{###} T
INNER JOIN {###}<srctab>{###}ETL.MA_D_GL_SUBJECT{###}</srctab>{###} T1
ON T.INT_GL_ACCOUNT_ID = T1.SUBJECT_NO3
AND T1.SUBJECT_NAME3 LIKE '%已减值%'
AND T1.AS_OF_DATE = P_AS_OF_DATE
WHERE T.AS_OF_DATE = P_AS_OF_DATE
AND T.ACCOUNT_NUMBER IN
(SELECT ACCOUNT_NUMBER
FROM {###}<subsel>{###} SELECT
T2.ACCOUNT_NUMBER, COUNT(1)
FROM {###}<srctab>{###}ETL.MA_F_LOAN{###}</srctab>{###} T2
WHERE T2.AS_OF_DATE = P_AS_OF_DATE
GROUP BY T2.ACCOUNT_NUMBER
HAVING COUNT(1) > 1 {###}</subsel>{###} ) {###}</subsel>{###} B
ON (A.ACCOUNT_NUMBER = B.ACCOUNT_NUMBER AND A.AS_OF_DATE = P_AS_OF_DATE AND A.GL_ACCOUNT_ID = B.GL_ACCOUNT_ID AND A.INT_GL_ACCOUNT_ID = B.INT_GL_ACCOUNT_ID )
WHEN MATCHED THEN
UPDATE SET A.CUR_BOOK_BAL = 0, A.OVERDUE_BAL = 0
{;;;}
__UPDATE_TABLE__{:::}||MA_F_LOAN{:::}
UPDATE MA_F_LOAN A
SET LAST_REPRICE_DATE = A.ORIGINATION_DATE ,
NEXT_REPRICE_DATE = A.MATURITY_DATE ,
REPRICE_FREQ = A.ORG_TERM ,
REPRICE_FREQ_MULT = A.ORG_TERM_MULT ,
ADJUSTABLE_TYPE_CD = 0
WHERE A.MATURITY_DATE <= P_AS_OF_DATE
AND A.CUR_BOOK_BAL <> 0
{;;;}
__END_IF__{:::}END IF{;;;}
__INSERT_INTO_VALUES__{:::}||M_RUNLOG{:::}
INSERT INTO M_RUNLOG VALUES (SYSDATE, V_PROC_NAME, 'it is 10')
{;;;}
__COMMIT__{:::}commit{;;;}
__EXCEPTION_WHEN__{:::}EXCEPTION
WHEN OTHERS THEN{;;;}
__RUN_PROC_FUN__{:::}call ETL.PROC_ETL_LOG(P_AS_OF_DATE ,V_PROC_NAME ,V_PROC_DESC ,V_COUNT ,-1 ,SQLCODE ,SQLERRM ) {;;;}
__DECLARE_VAR__{:::}RET_MSG{:::}RET_MSG := SQLCODE || ':' || SQLERRM
{;;;}
__END__{:::}end{;;;}
# 使用 Python 对以上标注代码进行解析,获得Json结果如下 :
{'SQL_FILE_NAME': 'proc_test.prc',
'DEPEND_TABLES': [{'SCH': 'NYBDP', 'NAME': 'O_CWWS_ACCT_PAYMENT_SCHEDULE'},
{'SCH': 'ETL', 'NAME': 'MA_D_GL_SUBJECT'},
{'SCH': 'ETL', 'NAME': 'MA_F_LOAN'}],
'CREATE_TABLES': [],
'UPDATE_INSERT_TABLES': [{'SCH': 'ETL', 'NAME': 'TMP_XD_LAST_PAYDATE'},
{'SCH': 'ETL', 'NAME': 'TMP_XD_NEXT_PAYDATE'},
{'SCH': 'ETL', 'NAME': 'MA_F_LOAN'},
{'SCH': '', 'NAME': 'M_RUNLOG'}],
'SQL_SPLIT_INFO': [{'SQL_SEQ': 1, 'SQL_TYPE': '__CREATE_PROCEDURE_HEAD__'},
{'SQL_SEQ': 2, 'SQL_TYPE': '__EXECUTE__'},
{'SQL_SEQ': 3, 'SQL_TYPE': '__SELECT_INTO__'},
{'SQL_SEQ': 4, 'SQL_TYPE': '__IF__'},
{'SQL_SEQ': 5, 'SQL_TYPE': '__RUN_PROC_FUN__'},
{'SQL_SEQ': 6, 'SQL_TYPE': '__DELETE_TABLE__',
'SQL_CODE': 'DELETE TMP_XD_LAST_PAYDATE',
'LVL_STRUCT': {'TMP_XD_LAST_PAYDATE': []},
'LVL_CODE': {'TMP_XD_LAST_PAYDATE': 'DELETE TMP_XD_LAST_PAYDATE'},
'TAR_TAB': [{'SCH': '', 'NAME': 'TMP_XD_LAST_PAYDATE'}],
'SRC_TAB': []},
{'SQL_SEQ': 7, 'SQL_TYPE': '__DELETE_TABLE__',
'SQL_CODE': 'DELETE TMP_XD_NEXT_PAYDATE',
'LVL_STRUCT': {'TMP_XD_NEXT_PAYDATE': []},
'LVL_CODE': {'TMP_XD_NEXT_PAYDATE': 'DELETE TMP_XD_NEXT_PAYDATE'},
'TAR_TAB': [{'SCH': '', 'NAME': 'TMP_XD_NEXT_PAYDATE'}],
'SRC_TAB': []},
{'SQL_SEQ': 8, 'SQL_TYPE': '__COMMIT__'},
{'SQL_SEQ': 9, 'SQL_TYPE': '__INSERT_TABLE_SELECT__',
'SQL_CODE': "INSERT INTO ETL.TMP_XD_LAST_PAYDATE\n (OBJECTNO, LAST_PAYDATE) \n SELECT OBJECTNO, LAST_PAYDATE\n FROM ( SELECT T.OBJECTNO,\n MAX(TO_DATE(PAYDATE, 'YYYY-MM-DD')) LAST_PAYDATE\n FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T\n WHERE T.AS_OF_DATE = P_AS_OF_DATE\n AND T.SEQID <> '999'\n AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') < P_AS_OF_DATE\n GROUP BY T.OBJECTNO )",
'LVL_STRUCT': {'__SUB_SELECT_1__': ['NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE'], 'ETL.TMP_XD_LAST_PAYDATE': ['__SUB_SELECT_1__']},
'LVL_CODE': {'__SUB_SELECT_1__': " SELECT T.OBJECTNO,\n MAX(TO_DATE(PAYDATE, 'YYYY-MM-DD')) LAST_PAYDATE\n FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T\n WHERE T.AS_OF_DATE = P_AS_OF_DATE\n AND T.SEQID <> '999'\n AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') < P_AS_OF_DATE\n GROUP BY T.OBJECTNO ", 'ETL.TMP_XD_LAST_PAYDATE': 'INSERT INTO ETL.TMP_XD_LAST_PAYDATE\n (OBJECTNO, LAST_PAYDATE) \n SELECT OBJECTNO, LAST_PAYDATE\n FROM __SUB_SELECT_1__'},
'TAR_TAB': [{'SCH': 'ETL', 'NAME': 'TMP_XD_LAST_PAYDATE'}],
'SRC_TAB': [{'SCH': 'NYBDP', 'NAME': 'O_CWWS_ACCT_PAYMENT_SCHEDULE'}]},
{'SQL_SEQ': 10, 'SQL_TYPE': '__INSERT_TABLE_SELECT__',
'SQL_CODE': "INSERT INTO ETL.TMP_XD_NEXT_PAYDATE\n (OBJECTNO, NEXT_PAYDATE) \n SELECT OBJECTNO, NEXT_PAYDATE\n FROM ( SELECT T.OBJECTNO,\n MIN(TO_DATE(PAYDATE, 'YYYY-MM-DD')) NEXT_PAYDATE\n FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T\n WHERE T.AS_OF_DATE = P_AS_OF_DATE\n AND T.SEQID <> '999'\n AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') >= P_AS_OF_DATE\n GROUP BY T.OBJECTNO )",
'LVL_STRUCT': {'__SUB_SELECT_2__': ['NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE'], 'ETL.TMP_XD_NEXT_PAYDATE': ['__SUB_SELECT_2__']},
'LVL_CODE': {'__SUB_SELECT_2__': " SELECT T.OBJECTNO,\n MIN(TO_DATE(PAYDATE, 'YYYY-MM-DD')) NEXT_PAYDATE\n FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T\n WHERE T.AS_OF_DATE = P_AS_OF_DATE\n AND T.SEQID <> '999'\n AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') >= P_AS_OF_DATE\n GROUP BY T.OBJECTNO ", 'ETL.TMP_XD_NEXT_PAYDATE': 'INSERT INTO ETL.TMP_XD_NEXT_PAYDATE\n (OBJECTNO, NEXT_PAYDATE) \n SELECT OBJECTNO, NEXT_PAYDATE\n FROM __SUB_SELECT_2__'},
'TAR_TAB': [{'SCH': 'ETL', 'NAME': 'TMP_XD_NEXT_PAYDATE'}],
'SRC_TAB': [{'SCH': 'NYBDP', 'NAME': 'O_CWWS_ACCT_PAYMENT_SCHEDULE'}]},
{'SQL_SEQ': 11, 'SQL_TYPE': '__COMMIT__'},
{'SQL_SEQ': 12, 'SQL_TYPE': '__MERGE_TABLE__',
'SQL_CODE': "MERGE INTO ETL.MA_F_LOAN A\n USING ( SELECT\n T.ACCOUNT_NUMBER, T.GL_ACCOUNT_ID, T.INT_GL_ACCOUNT_ID\n FROM ETL.MA_F_LOAN T\n INNER JOIN ETL.MA_D_GL_SUBJECT T1\n ON T.INT_GL_ACCOUNT_ID = T1.SUBJECT_NO3\n AND T1.SUBJECT_NAME3 LIKE '%已减值%'\n AND T1.AS_OF_DATE = P_AS_OF_DATE\n WHERE T.AS_OF_DATE = P_AS_OF_DATE\n AND T.ACCOUNT_NUMBER IN\n (SELECT ACCOUNT_NUMBER\n FROM ( SELECT\n T2.ACCOUNT_NUMBER, COUNT(1)\n FROM ETL.MA_F_LOAN T2\n WHERE T2.AS_OF_DATE = P_AS_OF_DATE\n GROUP BY T2.ACCOUNT_NUMBER\n HAVING COUNT(1) > 1 ) ) ) B\n ON (A.ACCOUNT_NUMBER = B.ACCOUNT_NUMBER AND A.AS_OF_DATE = P_AS_OF_DATE AND A.GL_ACCOUNT_ID = B.GL_ACCOUNT_ID AND A.INT_GL_ACCOUNT_ID = B.INT_GL_ACCOUNT_ID )\n WHEN MATCHED THEN\n UPDATE SET A.CUR_BOOK_BAL = 0, A.OVERDUE_BAL = 0",
'LVL_STRUCT': {'__SUB_SELECT_3__': ['ETL.MA_F_LOAN'], '__SUB_SELECT_4__': ['ETL.MA_F_LOAN', 'ETL.MA_D_GL_SUBJECT', '__SUB_SELECT_3__'], 'ETL.MA_F_LOAN': ['__SUB_SELECT_4__']},
'LVL_CODE': {'__SUB_SELECT_3__': ' SELECT\n T2.ACCOUNT_NUMBER, COUNT(1)\n FROM ETL.MA_F_LOAN T2\n WHERE T2.AS_OF_DATE = P_AS_OF_DATE\n GROUP BY T2.ACCOUNT_NUMBER\n HAVING COUNT(1) > 1 ', '__SUB_SELECT_4__': " SELECT\n T.ACCOUNT_NUMBER, T.GL_ACCOUNT_ID, T.INT_GL_ACCOUNT_ID\n FROM ETL.MA_F_LOAN T\n INNER JOIN ETL.MA_D_GL_SUBJECT T1\n ON T.INT_GL_ACCOUNT_ID = T1.SUBJECT_NO3\n AND T1.SUBJECT_NAME3 LIKE '%已减值%'\n AND T1.AS_OF_DATE = P_AS_OF_DATE\n WHERE T.AS_OF_DATE = P_AS_OF_DATE\n AND T.ACCOUNT_NUMBER IN\n (SELECT ACCOUNT_NUMBER\n FROM __SUB_SELECT_3__ ) ", 'ETL.MA_F_LOAN': 'MERGE INTO ETL.MA_F_LOAN A\n USING __SUB_SELECT_4__ B\n ON (A.ACCOUNT_NUMBER = B.ACCOUNT_NUMBER AND A.AS_OF_DATE = P_AS_OF_DATE AND A.GL_ACCOUNT_ID = B.GL_ACCOUNT_ID AND A.INT_GL_ACCOUNT_ID = B.INT_GL_ACCOUNT_ID )\n WHEN MATCHED THEN\n UPDATE SET A.CUR_BOOK_BAL = 0, A.OVERDUE_BAL = 0'},
'TAR_TAB': [{'SCH': 'ETL', 'NAME': 'MA_F_LOAN'}],
'SRC_TAB': [{'SCH': 'ETL', 'NAME': 'MA_D_GL_SUBJECT'}, {'SCH': 'ETL', 'NAME': 'MA_F_LOAN'}]},
{'SQL_SEQ': 13, 'SQL_TYPE': '__UPDATE_TABLE__',
'SQL_CODE': 'UPDATE MA_F_LOAN A\n SET LAST_REPRICE_DATE = A.ORIGINATION_DATE ,\n NEXT_REPRICE_DATE = A.MATURITY_DATE ,\n REPRICE_FREQ = A.ORG_TERM ,\n REPRICE_FREQ_MULT = A.ORG_TERM_MULT ,\n ADJUSTABLE_TYPE_CD = 0\n WHERE A.MATURITY_DATE <= P_AS_OF_DATE\n AND A.CUR_BOOK_BAL <> 0',
'LVL_STRUCT': {'MA_F_LOAN': []},
'LVL_CODE': {'MA_F_LOAN': 'UPDATE MA_F_LOAN A\n SET LAST_REPRICE_DATE = A.ORIGINATION_DATE ,\n NEXT_REPRICE_DATE = A.MATURITY_DATE ,\n REPRICE_FREQ = A.ORG_TERM ,\n REPRICE_FREQ_MULT = A.ORG_TERM_MULT ,\n ADJUSTABLE_TYPE_CD = 0\n WHERE A.MATURITY_DATE <= P_AS_OF_DATE\n AND A.CUR_BOOK_BAL <> 0'},
'TAR_TAB': [{'SCH': '', 'NAME': 'MA_F_LOAN'}],
'SRC_TAB': []},
{'SQL_SEQ': 14, 'SQL_TYPE': '__END_IF__'},
{'SQL_SEQ': 15, 'SQL_TYPE': '__INSERT_INTO_VALUES__', 'SQL_CODE': "INSERT INTO M_RUNLOG VALUES (SYSDATE, V_PROC_NAME, 'it is 10')", 'LVL_STRUCT': {'M_RUNLOG': []}, 'LVL_CODE': {'M_RUNLOG': "INSERT INTO M_RUNLOG VALUES (SYSDATE, V_PROC_NAME, 'it is 10')"}, 'TAR_TAB': [{'SCH': '', 'NAME': 'M_RUNLOG'}], 'SRC_TAB': []}, {'SQL_SEQ': 16, 'SQL_TYPE': '__COMMIT__'}, {'SQL_SEQ': 17, 'SQL_TYPE': '__EXCEPTION_WHEN__'},
{'SQL_SEQ': 18, 'SQL_TYPE': '__RUN_PROC_FUN__'},
{'SQL_SEQ': 19, 'SQL_TYPE': '__DECLARE_VAR__'},
{'SQL_SEQ': 20, 'SQL_TYPE': '__END__'}],
'SQL_DEPEND_LVL': [{6: {}, 7: {}, 12: {}, 15: {}},
{9: {6}, 10: {7}, 13: {12}}
]
}
# 将以上两个步骤(1、执行标注命令;2、解析标注结果)封装成一个Python代码文件(split_etl.py),可得:
# -*- coding: utf-8 -*-
import sys
import subprocess
import json
###############################################################
# Running counter shared by every call, so that the generated sub-select and
# union block ids stay unique across all statements of one script.
id_new_select = 0


def _split_table_name(full_name):
    """Turn 'SCHEMA.TABLE' (or a bare 'TABLE') into a {'SCH', 'NAME'} dict.

    With more than two dot-separated parts the first part is reported as the
    schema and the last part as the name.
    """
    parts = full_name.split('.')
    if len(parts) == 1:
        return {'SCH': '', 'NAME': parts[0]}
    return {'SCH': parts[0], 'NAME': parts[-1]}


def make_sql_struct(mark_sql, tartab):
    """Break one annotated SQL statement into nested code blocks.

    ``mark_sql`` is statement text in which structural markers are delimited
    by ``{###}`` (for example ``{###}<subsel>{###} ... {###}</subsel>{###}``).
    Every WITH block, sub-select and union block is cut out of the statement
    and replaced by a placeholder name, which yields a small tree of blocks.

    Args:
        mark_sql: annotated statement text.
        tartab: name used as the key of the outermost block; may hold several
            comma-separated target tables.

    Returns:
        dict with four keys:
          * ``LVL_STRUCT``: block name -> list of the tables / functions /
            nested block ids that the block reads directly.
          * ``LVL_CODE``: block name -> code of that block, with nested
            blocks replaced by their ids.
          * ``TAR_TAB``: list of ``{'SCH', 'NAME'}`` dicts parsed from ``tartab``.
          * ``SRC_TAB``: list of ``{'SCH', 'NAME'}`` dicts for every distinct
            source table / function (upper-cased), in first-seen order.

    Raises:
        IndexError: if the markers are unbalanced.

    Side effects:
        Increments the module-level ``id_new_select`` once per closed
        sub-select or union block.
    """
    global id_new_select

    nodes = mark_sql.split('{###}')
    outer_sources = []   # source lists of the enclosing, still-open blocks
    level_sources = []   # sources read directly by the block being scanned
    code_stack = []      # text pieces and opening markers not yet folded
    lvl_struct = {}
    lvl_code = {}
    # dict used as an ordered set: a plain set would make the SRC_TAB order
    # vary from run to run.
    seen_sources = {}

    def fold_code_back_to(opening_marker):
        # Pop pieces until the opening marker and return them re-joined in
        # their original order; the marker itself is consumed as well.
        text = ''
        piece = code_stack.pop()
        while piece != opening_marker:
            text = piece + text
            piece = code_stack.pop()
        return text

    # Index walk instead of repeated list.pop(0), which is quadratic.
    pos = 0
    while pos < len(nodes):
        node = nodes[pos]
        pos += 1

        if node in ('<withas>', '<subsel>', '<union>'):
            # A nested block starts: park the current source list.
            outer_sources.append(level_sources)
            level_sources = []
            code_stack.append(node)

        elif node == '</withas>':
            body = fold_code_back_to('<as/>')
            with_name = code_stack.pop()
            code_stack.pop()  # the '<withas>' marker itself
            lvl_struct[with_name] = level_sources
            level_sources = outer_sources.pop()
            lvl_code[with_name] = body

        elif node in ('</subsel>', '</union>'):
            id_new_select += 1
            if node == '</subsel>':
                block_id = '__SUB_SELECT_%d__' % id_new_select
                opening_marker = '<subsel>'
                placeholder = block_id
            else:
                block_id = '__UNION_SELECT_%d__' % id_new_select
                opening_marker = '<union>'
                placeholder = '\nSELECT * FROM ' + block_id
            # The new block becomes a source of the enclosing block.
            outer_sources[-1].append(block_id)
            body = fold_code_back_to(opening_marker)
            lvl_struct[block_id] = level_sources
            level_sources = outer_sources.pop()
            lvl_code[block_id] = body
            code_stack[-1] += placeholder

        elif node in ('<srctab>', '<srcfun>'):
            marked_text = nodes[pos]
            pos += 2  # skip the name and its closing marker
            if node == '<srcfun>':
                # Only the function name counts as a source ...
                source_name = marked_text.split('(')[0]
            else:
                source_name = marked_text
            level_sources.append(source_name)
            # ... but the code keeps the complete text, so a function call
            # does not lose its argument list.
            code_stack[-1] += marked_text
            seen_sources[source_name.upper()] = None

        else:
            # Plain text. NOTE(review): markers without a branch above
            # (e.g. '<intersect>', '<minus>', '<except>') also land here and
            # stay in the block code as literal text -- confirm intended.
            code_stack.append(node)

    lvl_struct[tartab] = level_sources
    lvl_code[tartab] = ''.join(code_stack)

    return {
        'LVL_STRUCT': lvl_struct,
        'LVL_CODE': lvl_code,
        'TAR_TAB': [_split_table_name(name) for name in tartab.split(',')],
        'SRC_TAB': [_split_table_name(name) for name in seen_sources],
    }
# Marker -> text used to turn annotated SQL back into plain SQL.
_MARKER_TO_SQL = (
    ('{###}<union>{###}', ''), ('{###}</union>{###}', ''),
    ('{###}<intersect>{###}', ''), ('{###}</intersect>{###}', ''),
    ('{###}<minus>{###}', ''), ('{###}</minus>{###}', ''),
    ('{###}<except>{###}', ''), ('{###}</except>{###}', ''),
    ('{###}<subsel>{###}', '('), ('{###}</subsel>{###}', ')'),
    ('{###}<withas>{###}', 'WITH '), ('{###}<as/>{###}', ' AS ('), ('{###}</withas>{###}', ')'),
    ('{###}<srctab>{###}', ''), ('{###}</srctab>{###}', ''),
    ('{###}<srcfun>{###}', ''), ('{###}</srcfun>{###}', ''),
)

# Statement types whose annotation is "TYPE{:::}schema||table{:::}code".
_SINGLE_TARGET_TYPES = (
    '__CREATE_TABLE_STRUCT__', '__CREATE_TABLE_LIKE__', '__CREATE_TABLE_SELECT__', '__NAME_EQ_SELECT__',
    '__INSERT_TABLE_SELECT__', '__INSERT_INTO_VALUES__', '__UPDATE_TABLE__', '__MERGE_TABLE__',
    '__DELETE_TABLE__', '__TRUNCATE_TABLE__', '__DROP_TABLE__',
)
# Types that are recognised but get no code / table breakdown.
_PLAIN_TYPES = (
    '__DROP_PROCEDURE__', '__DROP_FUNCTION__', '__CREATE_PROCEDURE_HEAD__', '__CREATE_FUNCTION_HEAD__',
    '__SELECT_INTO__',
    '__DECLARE_MYSQL_CURSOR__', '__DECLARE_ORACLE_CURSOR__', '__FETCH__',
    '__EXECUTE__', '__SET_CONF_VAR__', '__RESET_CONF__', '__DECLARE_VAR__',
    '__IF__', '__CONTROL_IF__', '__CONTROL_CASE_WHEN__', '__END_IF__',
    '__EXCEPTION_WHEN__',
    '__CONTROL_WHEN__', '__CONTROL_ELSE__',
    '__CONTROL_MYSQL_WHILE__', '__CONTROL_ORACLE_WHILE__',
    '__CONTROL_ORACLE_FOR__', '__CONTROL_REPEAT__',
    '__CONTROL_MYSQL_LOOP__', '__CONTROL_ORACLE_LOOP__',
    '__OPEN__', '__CLOSE__',
    '__RUN_PROC_FUN__', '__COMMIT__', '__END__',
    '__ALTER_TABLE__', '__GRANT__', '__ANALYZE__', '__COMMENT_ON__', '__VACUUM__',
)
_CREATE_TYPES = ('__CREATE_TABLE_STRUCT__', '__CREATE_TABLE_LIKE__',
                 '__CREATE_TABLE_SELECT__', '__NAME_EQ_SELECT__')
_WRITE_TYPES = ('__INSERT_TABLE_SELECT__', '__INSERT_INTO_VALUES__', '__FROM_INSERT__',
                '__UPDATE_TABLE__', '__MERGE_TABLE__')
# Types that feed DEPEND_TABLES / CREATE_TABLES / UPDATE_INSERT_TABLES.
_TABLE_LIST_TYPES = _CREATE_TYPES + _WRITE_TYPES + ('__JUST_SELECT__',)
# Types that take part in the dependency-level computation.
_DEPEND_LVL_TYPES = _SINGLE_TARGET_TYPES + ('__FROM_INSERT__', '__JUST_SELECT__')


def _restore_sql_code(marked_sql):
    """Replace every structural marker with the SQL text it stands for."""
    for marker, sql_text in _MARKER_TO_SQL:
        marked_sql = marked_sql.replace(marker, sql_text)
    return marked_sql


def _target_name(schema_and_table):
    """Turn the annotation's 'schema||table' field into upper-case 'SCHEMA.TABLE'."""
    parts = schema_and_table.split('||')
    if parts[0]:
        return (parts[0] + '.' + parts[1]).upper()
    return parts[1].upper()


def split_etl(etl_file):
    """Annotate one SQL script with ZGLanguage and summarise its structure.

    Steps:
      1. Run ``ZGLanguage -e SPLIT_ETL/MARK_SQLS.syn -t <etl_file> -o mark_sql.zgl``
         and save its stdout to ``run_zgl.log``. If the first 500 characters
         of that output contain ' ERROR !!!' or ' WARNING !!!', print them and
         exit the process with status -1.
      2. Split ``mark_sql.zgl`` on ``{;;;}`` into statements and build one
         SQL_SPLIT_INFO entry per statement.
      3. Collect the script-wide source / created / written table lists.
      4. Group the table-touching statements into dependency levels.

    Args:
        etl_file: path of the SQL script to analyse.

    Returns:
        The result dict (keys SQL_FILE_NAME, SQL_SPLIT_INFO, DEPEND_TABLES,
        CREATE_TABLES, UPDATE_INSERT_TABLES, SQL_DEPEND_LVL). The same dict
        is also printed to stdout.
    """
    ####################################
    ### 1. annotate the SQL file
    run_result = subprocess.run(
        ['ZGLanguage', '-e', 'SPLIT_ETL/MARK_SQLS.syn', '-t', etl_file, '-o', 'mark_sql.zgl'],
        capture_output=True,
        encoding='utf-8',
        text=True,
    )
    with open("run_zgl.log", "w", encoding='utf-8') as log_file:
        log_file.write(run_result.stdout)
    log_head = run_result.stdout[0:500]
    if ' ERROR !!!' in log_head or ' WARNING !!!' in log_head:
        print(log_head)
        sys.exit(-1)

    sql_file_info = {
        # Split on path separators only, so file names containing spaces survive.
        'SQL_FILE_NAME': etl_file.replace('\\', '/').split('/')[-1]
        , 'SQL_SPLIT_INFO': []
        , 'DEPEND_TABLES': []
        , 'CREATE_TABLES': []
        , 'UPDATE_INSERT_TABLES': []
        , 'SQL_DEPEND_LVL': []
    }
    split_infos = sql_file_info['SQL_SPLIT_INFO']

    ####################################
    ### 2. SQL_SPLIT_INFO
    # Plain `with open(...)`: the parenthesised form needs a recent parser.
    with open('mark_sql.zgl', 'r', encoding='utf-8') as fs:
        marked_statements = fs.read().split('{;;;}')

    sql_seq = 0
    for marked in marked_statements:
        # Skip empty AND whitespace-only segments (e.g. the newline after the
        # final '{;;;}'), which would otherwise yield an entry with SQL_TYPE ''.
        if not marked.strip():
            continue
        sql_seq += 1
        tran_info = marked.strip().split('{:::}')
        sql_type = tran_info[0]
        one_split_info = {'SQL_SEQ': sql_seq, 'SQL_TYPE': sql_type}

        if sql_type in _SINGLE_TARGET_TYPES:
            tartab = _target_name(tran_info[1])
            one_split_info['SQL_CODE'] = _restore_sql_code(tran_info[2])
            one_split_info.update(make_sql_struct(tran_info[2], tartab))
        elif sql_type == '__JUST_SELECT__':
            # A bare SELECT has no target table; give it a synthetic name.
            tartab = '__JUST_SELECT_%d__' % id_new_select
            one_split_info['SQL_CODE'] = _restore_sql_code(tran_info[1])
            one_split_info.update(make_sql_struct(tran_info[1], tartab))
        elif sql_type == '__FROM_INSERT__':
            # Layout: TYPE, code, then (schema||table, code) pairs.
            pieces = tran_info[1:]
            sql_code = pieces.pop(0)
            targets = {}  # ordered set, so the joined target name is stable
            while pieces:
                targets[_target_name(pieces.pop(0))] = None
                sql_code += pieces.pop(0)
            one_split_info['SQL_CODE'] = _restore_sql_code(sql_code)
            one_split_info.update(make_sql_struct(sql_code, ','.join(targets)))
        elif sql_type in _PLAIN_TYPES:
            pass
        else:
            print("SQL TYPE [%s] ????" % (sql_type))
        split_infos.append(one_split_info)

    ####################################
    ### 3. DEPEND_TABLES / CREATE_TABLES / UPDATE_INSERT_TABLES
    # Tables are de-duplicated by bare NAME (schema ignored). A table may be
    # both a source and a target.
    all_tar_tab_name = set()
    all_src_tab_name = set()
    depend_tables = []
    create_tables = []
    update_insert_tables = []
    for info in split_infos:
        if info['SQL_TYPE'] not in _TABLE_LIST_TYPES:
            continue
        for tab in info['TAR_TAB']:
            if tab['NAME'] in all_tar_tab_name:
                continue
            all_tar_tab_name.add(tab['NAME'])
            if info['SQL_TYPE'] in _CREATE_TYPES:
                create_tables.append(tab)
            elif info['SQL_TYPE'] in _WRITE_TYPES:
                update_insert_tables.append(tab)
        for tab in info['SRC_TAB']:
            if tab['NAME'] not in all_src_tab_name:
                depend_tables.append(tab)
                all_src_tab_name.add(tab['NAME'])
    sql_file_info['DEPEND_TABLES'] = depend_tables
    sql_file_info['CREATE_TABLES'] = create_tables
    sql_file_info['UPDATE_INSERT_TABLES'] = update_insert_tables

    ####################################
    ### 4. SQL_DEPEND_LVL
    # Only the main table-touching statement types are scheduled; extend
    # _DEPEND_LVL_TYPES if more are needed. Entries are 0-based indexes.
    pending = [info['SQL_SEQ'] - 1 for info in split_infos
               if info['SQL_TYPE'] in _DEPEND_LVL_TYPES]
    # Table name -> SQL_SEQ of the statement that last wrote it in an earlier
    # level; 0 (or absent) means "not written by this script so far".
    tab2seq = {tab: 0 for tab in all_src_tab_name}
    sql_depend_lvl = []
    while pending:
        curr_lvl_ids = {}
        deferred = []
        newly_written = {}
        # Targets of every statement already looked at in this round, whether
        # it was scheduled or deferred.
        claimed = set()
        for idx in pending:
            info = split_infos[idx]
            targets = [tab['NAME'] for tab in info['TAR_TAB']]
            depd_seqs = set()
            runnable = True
            for tab in info['SRC_TAB']:
                src_name = tab['NAME']
                # A source written by an earlier statement of this round is
                # not ready yet -- even when that writer was itself deferred;
                # otherwise a reader could overtake its producer.
                if src_name in claimed:
                    runnable = False
                    break
                if tab2seq.get(src_name, 0) != 0:
                    depd_seqs.add(tab2seq[src_name])
            # Two statements writing the same table never share a level. The
            # test runs against the OTHER statements' targets, before this
            # statement's own targets are claimed.
            if runnable and any(name in claimed for name in targets):
                runnable = False
            claimed.update(targets)
            if not runnable:
                deferred.append(idx)
                continue
            for name in targets:
                newly_written[name] = idx + 1
                # With no source dependency, depend on the previous writer of
                # the target; 0 is not a real step and is never recorded.
                if not depd_seqs and tab2seq.get(name, 0) != 0:
                    depd_seqs.add(tab2seq[name])
            curr_lvl_ids[idx + 1] = depd_seqs
        # The first pending statement of a round always runs (nothing is
        # claimed before it), so every round makes progress and the loop ends.
        sql_depend_lvl.append(curr_lvl_ids)
        pending = deferred
        tab2seq.update(newly_written)
    sql_file_info['SQL_DEPEND_LVL'] = sql_depend_lvl

    print('##################################################')
    print(sql_file_info)
    return sql_file_info
if __name__ == "__main__":
    # Command-line entry point: the SQL script path is the first argument.
    if len(sys.argv) != 1:
        split_etl(sys.argv[1])
    else:
        print('Miss sql file !')
        sys.exit(-1)
# 封装后 Python 代码执行命令:
python split_etl.py proc_test.prc > log.log
# 通用SQL语法解析配置文件 MARK_SQLS.syn 放在附件里。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。