首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >源代码:跨数据库通用SQL语法解析与标注拆解

源代码:跨数据库通用SQL语法解析与标注拆解

原创
作者头像
用户12032828
发布2026-05-29 18:53:46
发布2026-05-29 18:53:46
470
举报

### 背景:

1、 如今关系型数据库种类繁多,虽都使用SQL语言进行操作,但各数据库之间的SQL依然存在一些语法差异

2、 企业的ETL作业大量使用SQL脚本,脚本的运行往往存在依赖关系,而脚本间依赖关系往往需要人工识别

3、 管理或运维人员在分析SQL脚本时,只能用文本编辑器打开整个SQL脚本文件,在杂乱的屎山代码中分析问题

4、 一个SQL脚本文件是最小的运行单位,且一般只能按照从上倒下的顺序,逐个运行脚本中的代码段

### 问题:

1、 能否对不同关系型数据库的SQL语法进行兼容解析?

2、 能否通过对SQL脚本的解析,自动获得脚本间的依赖关系?

3、 分析SQL脚本时,能否先将脚本拆解成代码块,然后层级化、图形化展示,提升代码可读性?

4、 能否对某些SQL大脚本进行自动化步骤分析,拆解成若干个可“并行运行”的执行步骤,提升大脚本的执行效率?

5、 扩展思考:若SQL大脚本的上游依赖,只有个别变动,能否只选择执行大脚本中的某个“分支”步骤?

### 方案:

为了解决以上问题,可以使用ZGLanguage对各关系型数据库的SQL进行语法配置,配置文件为 MARK_SQLS.syn 。

目前已经完成对几款主流数据库(Hive、Greenplum、DWS、Oracle、Mysql、Hana)的主要SQL语法的配置,不仅可识别“增删改查”等基础语法,还可以识别“判断、循环、游标、赋值”等语法。

在支持跨数据库SQL语法解析的同时,还提供了SQL代码拆解标注功能,通过此功能,用户可以对任意SQL脚本进行拆解分析。

这里提供了一个SQL脚本拆解分析的案例,使用Python语言编写(split_etl.py),通过对ZGLanguage标注后的结果拆解分析,得到一个Json结果,说明如下:

代码语言:txt
复制
{
  SQL_FILE_NAME  : SQL脚本文件名
  
, DEPEND_TABLES        : SQL脚本依赖表清单
, CREATE_TABLES        : SQL脚本创建表清单
, UPDATE_INSERT_TABLES : SQL脚本更改表清单
  
, SQL_SPLIT_INFO : [ SQL脚本里每个SQL代码段拆解信息 :
                     { SQL_SEQ        : 序号   : 1,2,3 ...
                     , SQL_TYPE       : 类型   : __CREATE_TABLE_SELECT__, __INSERT_TABLE_SELECT__, __DELETE_TABLE__, __IF__ ....
                     , SQL_CODE       : 完整代码
                     , LVL_STRUCT     : 代码的层级结构
                     , LVL_CODE       : 层级结构的代码
                     , TAR_TAB        : 目标表 
                     , SRC_TAB        : 来源表
                     }
                     ,
                     {
                     .................
                     }
                   ]

, SQL_DEPEND_LVL  : SQL脚本代码段步骤(序号)依赖层级
}

参数使用建议:

1、 使用 DEPEND_TABLES、 CREATE_TABLES、 UPDATE_INSERT_TABLES,可以建立起各SQL脚本(包括存储过程)之间的依赖关系,辅助实现调度工具依赖自动化配置

2、 使用 SQL_SPLIT_INFO,可以对SQL脚本进行层级化、图形化展示,提升代码可读性

3、 使用 SQL_DEPEND_LVL,可以辅助实现大脚本中的: a、代码段并行; b、选择“分支”步骤运行功能

### 案例代码如下 :

# 假设存储过程代码(proc_test.prc):

代码语言:txt
复制
CREATE OR REPLACE PROCEDURE PROC_F_CWWS_LOAN
(
    P_AS_OF_DATE IN  DATE default date'20200101',
    RET_FLG      OUT VARCHAR2,
    RET_MSG      OUT VARCHAR2
) IS
/******************************************************************************
功能描述:xxxx业务数据ETL处理
源    表:
目 标 表:MA_F_LOAN
备    注:
******************************************************************************/

  -- 声明变量并初始化
  V_COUNT NUMBER := 0;
  V_PROC_NAME VARCHAR2(200) := 'PROC_F_CWWS_LOAN';
  V_PROC_DESC VARCHAR2(100) := 'xxxx业务数据ETL处理';
  V_P_FREQ VARCHAR2(4) := '';

BEGIN

  --设置会话日期格式
  EXECUTE IMMEDIATE ' ALTER SESSION SET NLS_DATE_FORMAT = ''YYYY-MM-DD''';
  
  --查询参数表中,该程序对应的频率值
  SELECT P_FREQ
    INTO V_P_FREQ
    FROM ETL_PROC_STATUS_DEF
   WHERE PROC_NAME = V_PROC_NAME;
   
  --判断是调度频率
  IF P_AS_OF_DATE = FUNC_GET_FREQ_DAYS(P_AS_OF_DATE, V_P_FREQ) THEN
    --调用分区维护程序

    ETL.ETL_ADD_PARTITION('MA_F_LOAN', P_AS_OF_DATE, 'ETL');

    --删除取上下次支付日临时表
    DELETE TMP_XD_LAST_PAYDATE;
    DELETE TMP_XD_NEXT_PAYDATE;
    COMMIT;

    --从还款计划表中取每笔账户最近一次小于等于数据日期还款日,作为上次还款日
    INSERT INTO ETL.TMP_XD_LAST_PAYDATE
      (OBJECTNO, LAST_PAYDATE)
      SELECT OBJECTNO, LAST_PAYDATE
        FROM (SELECT T.OBJECTNO,
                     MAX(TO_DATE(PAYDATE, 'YYYY-MM-DD')) LAST_PAYDATE
                FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T
               WHERE T.AS_OF_DATE = P_AS_OF_DATE
                 AND T.SEQID <> '999'
                 AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') < P_AS_OF_DATE
               GROUP BY T.OBJECTNO);


    --从还款计划表中取每笔账户最近一次大于数据日期还款日,作为下次还款日
    INSERT INTO ETL.TMP_XD_NEXT_PAYDATE
      (OBJECTNO, NEXT_PAYDATE)
      SELECT OBJECTNO, NEXT_PAYDATE
        FROM (SELECT T.OBJECTNO,
                     MIN(TO_DATE(PAYDATE, 'YYYY-MM-DD')) NEXT_PAYDATE
                FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T
               WHERE T.AS_OF_DATE = P_AS_OF_DATE
                 AND T.SEQID <> '999'
                 AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') >= P_AS_OF_DATE
               GROUP BY T.OBJECTNO);
               
    COMMIT;

    MERGE INTO ETL.MA_F_LOAN A
    USING (SELECT /*+PARALLEL(8)*/
            T.ACCOUNT_NUMBER, T.GL_ACCOUNT_ID, T.INT_GL_ACCOUNT_ID
             FROM ETL.MA_F_LOAN T
            INNER JOIN ETL.MA_D_GL_SUBJECT T1
               ON T.INT_GL_ACCOUNT_ID = T1.SUBJECT_NO3
              AND T1.SUBJECT_NAME3 LIKE '%已减值%'
              AND T1.AS_OF_DATE = P_AS_OF_DATE
            WHERE T.AS_OF_DATE = P_AS_OF_DATE
              AND T.ACCOUNT_NUMBER IN
                  (SELECT ACCOUNT_NUMBER
                     FROM (SELECT /*+PARALLEL(8)*/
                            T2.ACCOUNT_NUMBER, COUNT(1)
                             FROM ETL.MA_F_LOAN T2
                            WHERE T2.AS_OF_DATE = P_AS_OF_DATE
                            GROUP BY T2.ACCOUNT_NUMBER
                           HAVING COUNT(1) > 1))) B
    ON (A.ACCOUNT_NUMBER = B.ACCOUNT_NUMBER AND A.AS_OF_DATE = P_AS_OF_DATE AND A.GL_ACCOUNT_ID = B.GL_ACCOUNT_ID AND A.INT_GL_ACCOUNT_ID = B.INT_GL_ACCOUNT_ID)
    WHEN MATCHED THEN
      UPDATE SET A.CUR_BOOK_BAL = 0, A.OVERDUE_BAL = 0;

    --更新逾期xxxx上下次重订价日及重订价频率为起息日、到期日
    UPDATE MA_F_LOAN A
       SET LAST_REPRICE_DATE  = A.ORIGINATION_DATE,
           NEXT_REPRICE_DATE  = A.MATURITY_DATE,
           REPRICE_FREQ       = A.ORG_TERM,
           REPRICE_FREQ_MULT  = A.ORG_TERM_MULT,
           ADJUSTABLE_TYPE_CD = 0
     WHERE A.MATURITY_DATE <= P_AS_OF_DATE
       AND A.CUR_BOOK_BAL <> 0
         ;


  END IF;
  
  INSERT INTO M_RUNLOG VALUES (SYSDATE, V_PROC_NAME, 'it is 10');
  COMMIT;
  
EXCEPTION

  WHEN OTHERS THEN
    --写入异常日志
    call ETL.PROC_ETL_LOG(P_AS_OF_DATE,V_PROC_NAME,V_PROC_DESC,V_COUNT,-1,SQLCODE,SQLERRM);
    RET_MSG := SQLCODE || ':' || SQLERRM;

END;
/

# 执行标注命令: ZGLanguage -e SPLIT_ETL/MARK_SQLS.syn -t proc_test.prc -o mark_sql.zgl > log.log

得到标注结果 mark_sql.zgl 内容如下 :

代码语言:txt
复制
__CREATE_PROCEDURE_HEAD__{:::}||PROC_F_CWWS_LOAN{:::}
    CREATE OR REPLACE PROCEDURE PROC_F_CWWS_LOAN
    ( 
        P_AS_OF_DATE IN DATE default date '20200101' , 
        RET_FLG OUT VARCHAR2 , 
        RET_MSG OUT VARCHAR2 
    ) IS
      V_COUNT NUMBER := 0 ;
      V_PROC_NAME VARCHAR2(200) := 'PROC_F_CWWS_LOAN' ;
      V_PROC_DESC VARCHAR2(100) := 'xxxx业务数据ETL处理' ;
      V_P_FREQ VARCHAR2(4) := '' ;
    BEGIN 
{;;;}

__EXECUTE__{:::}
    EXECUTE IMMEDIATE ' ALTER SESSION SET NLS_DATE_FORMAT = ' 'YYYY-MM-DD' ''
{;;;}

__SELECT_INTO__{:::}V_P_FREQ{:::}
    SELECT P_FREQ 
      INTO V_P_FREQ
      FROM {###}<srctab>{###}ETL_PROC_STATUS_DEF{###}</srctab>{###}
     WHERE PROC_NAME = V_PROC_NAME 
{;;;}

__IF__{:::}
    IF P_AS_OF_DATE = FUNC_GET_FREQ_DAYS(P_AS_OF_DATE, V_P_FREQ) THEN
{;;;}

__RUN_PROC_FUN__{:::}
    ETL.ETL_ADD_PARTITION( 'MA_F_LOAN' , P_AS_OF_DATE , 'ETL' ) 
{;;;}

__DELETE_TABLE__{:::}||TMP_XD_LAST_PAYDATE{:::}
    DELETE TMP_XD_LAST_PAYDATE 
{;;;}

__DELETE_TABLE__{:::}||TMP_XD_NEXT_PAYDATE{:::}
    DELETE TMP_XD_NEXT_PAYDATE 
{;;;}

__COMMIT__{:::}commit{;;;}

__INSERT_TABLE_SELECT__{:::}ETL||TMP_XD_LAST_PAYDATE{:::}
    INSERT INTO ETL.TMP_XD_LAST_PAYDATE
      (OBJECTNO, LAST_PAYDATE) 
      SELECT OBJECTNO, LAST_PAYDATE
        FROM {###}<subsel>{###} SELECT T.OBJECTNO,
                     MAX(TO_DATE(PAYDATE, 'YYYY-MM-DD')) LAST_PAYDATE
                FROM {###}<srctab>{###}NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE{###}</srctab>{###} T
               WHERE T.AS_OF_DATE = P_AS_OF_DATE
                 AND T.SEQID <> '999'
                 AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') < P_AS_OF_DATE
               GROUP BY T.OBJECTNO {###}</subsel>{###} 
{;;;}

__INSERT_TABLE_SELECT__{:::}ETL||TMP_XD_NEXT_PAYDATE{:::}
    INSERT INTO ETL.TMP_XD_NEXT_PAYDATE
      (OBJECTNO, NEXT_PAYDATE) 
      SELECT OBJECTNO, NEXT_PAYDATE
        FROM {###}<subsel>{###} SELECT T.OBJECTNO,
                     MIN(TO_DATE(PAYDATE, 'YYYY-MM-DD')) NEXT_PAYDATE
                FROM {###}<srctab>{###}NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE{###}</srctab>{###} T
               WHERE T.AS_OF_DATE = P_AS_OF_DATE
                 AND T.SEQID <> '999'
                 AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') >= P_AS_OF_DATE
               GROUP BY T.OBJECTNO {###}</subsel>{###}
{;;;}
               
__COMMIT__{:::}commit{;;;}
    
__MERGE_TABLE__{:::}ETL||MA_F_LOAN{:::}
    MERGE INTO ETL.MA_F_LOAN A
    USING {###}<subsel>{###} SELECT
            T.ACCOUNT_NUMBER, T.GL_ACCOUNT_ID, T.INT_GL_ACCOUNT_ID
             FROM {###}<srctab>{###}ETL.MA_F_LOAN{###}</srctab>{###} T
            INNER JOIN {###}<srctab>{###}ETL.MA_D_GL_SUBJECT{###}</srctab>{###} T1
               ON T.INT_GL_ACCOUNT_ID = T1.SUBJECT_NO3
              AND T1.SUBJECT_NAME3 LIKE '%已减值%'
              AND T1.AS_OF_DATE = P_AS_OF_DATE
            WHERE T.AS_OF_DATE = P_AS_OF_DATE
              AND T.ACCOUNT_NUMBER IN
                  (SELECT ACCOUNT_NUMBER
                     FROM {###}<subsel>{###} SELECT
                            T2.ACCOUNT_NUMBER, COUNT(1)
                             FROM {###}<srctab>{###}ETL.MA_F_LOAN{###}</srctab>{###} T2
                            WHERE T2.AS_OF_DATE = P_AS_OF_DATE
                            GROUP BY T2.ACCOUNT_NUMBER
                           HAVING COUNT(1) > 1 {###}</subsel>{###} ) {###}</subsel>{###} B
    ON (A.ACCOUNT_NUMBER = B.ACCOUNT_NUMBER AND A.AS_OF_DATE = P_AS_OF_DATE AND A.GL_ACCOUNT_ID = B.GL_ACCOUNT_ID AND A.INT_GL_ACCOUNT_ID = B.INT_GL_ACCOUNT_ID )
    WHEN MATCHED THEN
      UPDATE SET A.CUR_BOOK_BAL = 0, A.OVERDUE_BAL = 0 
{;;;}
      
__UPDATE_TABLE__{:::}||MA_F_LOAN{:::}
    UPDATE MA_F_LOAN A
       SET LAST_REPRICE_DATE = A.ORIGINATION_DATE ,
           NEXT_REPRICE_DATE = A.MATURITY_DATE ,
           REPRICE_FREQ = A.ORG_TERM ,
           REPRICE_FREQ_MULT = A.ORG_TERM_MULT ,
           ADJUSTABLE_TYPE_CD = 0
     WHERE A.MATURITY_DATE <= P_AS_OF_DATE
       AND A.CUR_BOOK_BAL <> 0
{;;;}

__END_IF__{:::}END IF{;;;}

__INSERT_INTO_VALUES__{:::}||M_RUNLOG{:::}
    INSERT INTO M_RUNLOG VALUES (SYSDATE, V_PROC_NAME, 'it is 10') 
{;;;}

__COMMIT__{:::}commit{;;;}

__EXCEPTION_WHEN__{:::}EXCEPTION
    WHEN OTHERS THEN{;;;}
      __RUN_PROC_FUN__{:::}call ETL.PROC_ETL_LOG(P_AS_OF_DATE ,V_PROC_NAME ,V_PROC_DESC ,V_COUNT ,-1 ,SQLCODE ,SQLERRM ) {;;;}
      __DECLARE_VAR__{:::}RET_MSG{:::}RET_MSG := SQLCODE || ':' || SQLERRM 
{;;;}

    
__END__{:::}end{;;;}

# 使用 Python 对以上标注代码进行解析,获得Json结果如下 :

代码语言:txt
复制
{'SQL_FILE_NAME': 'proc_test.prc', 

 'DEPEND_TABLES': [{'SCH': 'NYBDP', 'NAME': 'O_CWWS_ACCT_PAYMENT_SCHEDULE'}, 
                   {'SCH': 'ETL', 'NAME': 'MA_D_GL_SUBJECT'}, 
                   {'SCH': 'ETL', 'NAME': 'MA_F_LOAN'}], 
 
 'CREATE_TABLES': [], 
 
 'UPDATE_INSERT_TABLES': [{'SCH': 'ETL', 'NAME': 'TMP_XD_LAST_PAYDATE'}, 
                          {'SCH': 'ETL', 'NAME': 'TMP_XD_NEXT_PAYDATE'}, 
                          {'SCH': 'ETL', 'NAME': 'MA_F_LOAN'}, 
                          {'SCH': '', 'NAME': 'M_RUNLOG'}], 
                          
 'SQL_SPLIT_INFO': [{'SQL_SEQ': 1, 'SQL_TYPE': '__CREATE_PROCEDURE_HEAD__'}, 
                    {'SQL_SEQ': 2, 'SQL_TYPE': '__EXECUTE__'}, 
                    {'SQL_SEQ': 3, 'SQL_TYPE': '__SELECT_INTO__'}, 
                    {'SQL_SEQ': 4, 'SQL_TYPE': '__IF__'}, 
                    {'SQL_SEQ': 5, 'SQL_TYPE': '__RUN_PROC_FUN__'}, 
                    
                    {'SQL_SEQ': 6, 'SQL_TYPE': '__DELETE_TABLE__', 
                     'SQL_CODE': 'DELETE TMP_XD_LAST_PAYDATE', 
                     'LVL_STRUCT': {'TMP_XD_LAST_PAYDATE': []}, 
                     'LVL_CODE': {'TMP_XD_LAST_PAYDATE': 'DELETE TMP_XD_LAST_PAYDATE'}, 
                     'TAR_TAB': [{'SCH': '', 'NAME': 'TMP_XD_LAST_PAYDATE'}], 
                     'SRC_TAB': []}, 
                     
                    {'SQL_SEQ': 7, 'SQL_TYPE': '__DELETE_TABLE__', 
                     'SQL_CODE': 'DELETE TMP_XD_NEXT_PAYDATE', 
                     'LVL_STRUCT': {'TMP_XD_NEXT_PAYDATE': []}, 
                     'LVL_CODE': {'TMP_XD_NEXT_PAYDATE': 'DELETE TMP_XD_NEXT_PAYDATE'}, 
                     'TAR_TAB': [{'SCH': '', 'NAME': 'TMP_XD_NEXT_PAYDATE'}], 
                     'SRC_TAB': []}, 
                     
                    {'SQL_SEQ': 8, 'SQL_TYPE': '__COMMIT__'}, 
                    {'SQL_SEQ': 9, 'SQL_TYPE': '__INSERT_TABLE_SELECT__', 
                     'SQL_CODE': "INSERT INTO ETL.TMP_XD_LAST_PAYDATE\n      (OBJECTNO, LAST_PAYDATE) \n      SELECT OBJECTNO, LAST_PAYDATE\n        FROM ( SELECT T.OBJECTNO,\n                     MAX(TO_DATE(PAYDATE, 'YYYY-MM-DD')) LAST_PAYDATE\n                FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T\n               WHERE T.AS_OF_DATE = P_AS_OF_DATE\n                 AND T.SEQID <> '999'\n                 AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') < P_AS_OF_DATE\n               GROUP BY T.OBJECTNO )", 
                     'LVL_STRUCT': {'__SUB_SELECT_1__': ['NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE'], 'ETL.TMP_XD_LAST_PAYDATE': ['__SUB_SELECT_1__']}, 
                     'LVL_CODE': {'__SUB_SELECT_1__': " SELECT T.OBJECTNO,\n                     MAX(TO_DATE(PAYDATE, 'YYYY-MM-DD')) LAST_PAYDATE\n                FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T\n               WHERE T.AS_OF_DATE = P_AS_OF_DATE\n                 AND T.SEQID <> '999'\n                 AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') < P_AS_OF_DATE\n               GROUP BY T.OBJECTNO ", 'ETL.TMP_XD_LAST_PAYDATE': 'INSERT INTO ETL.TMP_XD_LAST_PAYDATE\n      (OBJECTNO, LAST_PAYDATE) \n      SELECT OBJECTNO, LAST_PAYDATE\n        FROM __SUB_SELECT_1__'}, 
                     'TAR_TAB': [{'SCH': 'ETL', 'NAME': 'TMP_XD_LAST_PAYDATE'}], 
                     'SRC_TAB': [{'SCH': 'NYBDP', 'NAME': 'O_CWWS_ACCT_PAYMENT_SCHEDULE'}]}, 
                    
                    {'SQL_SEQ': 10, 'SQL_TYPE': '__INSERT_TABLE_SELECT__', 
                     'SQL_CODE': "INSERT INTO ETL.TMP_XD_NEXT_PAYDATE\n      (OBJECTNO, NEXT_PAYDATE) \n      SELECT OBJECTNO, NEXT_PAYDATE\n        FROM ( SELECT T.OBJECTNO,\n                     MIN(TO_DATE(PAYDATE, 'YYYY-MM-DD')) NEXT_PAYDATE\n                FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T\n               WHERE T.AS_OF_DATE = P_AS_OF_DATE\n                 AND T.SEQID <> '999'\n                 AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') >= P_AS_OF_DATE\n               GROUP BY T.OBJECTNO )", 
                     'LVL_STRUCT': {'__SUB_SELECT_2__': ['NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE'], 'ETL.TMP_XD_NEXT_PAYDATE': ['__SUB_SELECT_2__']}, 
                     'LVL_CODE': {'__SUB_SELECT_2__': " SELECT T.OBJECTNO,\n                     MIN(TO_DATE(PAYDATE, 'YYYY-MM-DD')) NEXT_PAYDATE\n                FROM NYBDP.O_CWWS_ACCT_PAYMENT_SCHEDULE T\n               WHERE T.AS_OF_DATE = P_AS_OF_DATE\n                 AND T.SEQID <> '999'\n                 AND TO_DATE(T.PAYDATE, 'YYYY-MM-DD') >= P_AS_OF_DATE\n               GROUP BY T.OBJECTNO ", 'ETL.TMP_XD_NEXT_PAYDATE': 'INSERT INTO ETL.TMP_XD_NEXT_PAYDATE\n      (OBJECTNO, NEXT_PAYDATE) \n      SELECT OBJECTNO, NEXT_PAYDATE\n        FROM __SUB_SELECT_2__'}, 
                     'TAR_TAB': [{'SCH': 'ETL', 'NAME': 'TMP_XD_NEXT_PAYDATE'}], 
                     'SRC_TAB': [{'SCH': 'NYBDP', 'NAME': 'O_CWWS_ACCT_PAYMENT_SCHEDULE'}]}, 
                    
                    {'SQL_SEQ': 11, 'SQL_TYPE': '__COMMIT__'}, 
                    {'SQL_SEQ': 12, 'SQL_TYPE': '__MERGE_TABLE__', 
                     'SQL_CODE': "MERGE INTO ETL.MA_F_LOAN A\n    USING ( SELECT\n            T.ACCOUNT_NUMBER, T.GL_ACCOUNT_ID, T.INT_GL_ACCOUNT_ID\n             FROM ETL.MA_F_LOAN T\n            INNER JOIN ETL.MA_D_GL_SUBJECT T1\n               ON T.INT_GL_ACCOUNT_ID = T1.SUBJECT_NO3\n              AND T1.SUBJECT_NAME3 LIKE '%已减值%'\n              AND T1.AS_OF_DATE = P_AS_OF_DATE\n            WHERE T.AS_OF_DATE = P_AS_OF_DATE\n              AND T.ACCOUNT_NUMBER IN\n                  (SELECT ACCOUNT_NUMBER\n                     FROM ( SELECT\n                            T2.ACCOUNT_NUMBER, COUNT(1)\n                             FROM ETL.MA_F_LOAN T2\n                            WHERE T2.AS_OF_DATE = P_AS_OF_DATE\n                            GROUP BY T2.ACCOUNT_NUMBER\n                           HAVING COUNT(1) > 1 ) ) ) B\n    ON (A.ACCOUNT_NUMBER = B.ACCOUNT_NUMBER AND A.AS_OF_DATE = P_AS_OF_DATE AND A.GL_ACCOUNT_ID = B.GL_ACCOUNT_ID AND A.INT_GL_ACCOUNT_ID = B.INT_GL_ACCOUNT_ID )\n    WHEN MATCHED THEN\n      UPDATE SET A.CUR_BOOK_BAL = 0, A.OVERDUE_BAL = 0", 
                     'LVL_STRUCT': {'__SUB_SELECT_3__': ['ETL.MA_F_LOAN'], '__SUB_SELECT_4__': ['ETL.MA_F_LOAN', 'ETL.MA_D_GL_SUBJECT', '__SUB_SELECT_3__'], 'ETL.MA_F_LOAN': ['__SUB_SELECT_4__']}, 
                     'LVL_CODE': {'__SUB_SELECT_3__': ' SELECT\n                            T2.ACCOUNT_NUMBER, COUNT(1)\n                             FROM ETL.MA_F_LOAN T2\n                            WHERE T2.AS_OF_DATE = P_AS_OF_DATE\n                            GROUP BY T2.ACCOUNT_NUMBER\n                           HAVING COUNT(1) > 1 ', '__SUB_SELECT_4__': " SELECT\n            T.ACCOUNT_NUMBER, T.GL_ACCOUNT_ID, T.INT_GL_ACCOUNT_ID\n             FROM ETL.MA_F_LOAN T\n            INNER JOIN ETL.MA_D_GL_SUBJECT T1\n               ON T.INT_GL_ACCOUNT_ID = T1.SUBJECT_NO3\n              AND T1.SUBJECT_NAME3 LIKE '%已减值%'\n              AND T1.AS_OF_DATE = P_AS_OF_DATE\n            WHERE T.AS_OF_DATE = P_AS_OF_DATE\n              AND T.ACCOUNT_NUMBER IN\n                  (SELECT ACCOUNT_NUMBER\n                     FROM __SUB_SELECT_3__ ) ", 'ETL.MA_F_LOAN': 'MERGE INTO ETL.MA_F_LOAN A\n    USING __SUB_SELECT_4__ B\n    ON (A.ACCOUNT_NUMBER = B.ACCOUNT_NUMBER AND A.AS_OF_DATE = P_AS_OF_DATE AND A.GL_ACCOUNT_ID = B.GL_ACCOUNT_ID AND A.INT_GL_ACCOUNT_ID = B.INT_GL_ACCOUNT_ID )\n    WHEN MATCHED THEN\n      UPDATE SET A.CUR_BOOK_BAL = 0, A.OVERDUE_BAL = 0'}, 
                     'TAR_TAB': [{'SCH': 'ETL', 'NAME': 'MA_F_LOAN'}], 
                     'SRC_TAB': [{'SCH': 'ETL', 'NAME': 'MA_D_GL_SUBJECT'}, {'SCH': 'ETL', 'NAME': 'MA_F_LOAN'}]}, 
                    
                    {'SQL_SEQ': 13, 'SQL_TYPE': '__UPDATE_TABLE__', 
                     'SQL_CODE': 'UPDATE MA_F_LOAN A\n       SET LAST_REPRICE_DATE = A.ORIGINATION_DATE ,\n           NEXT_REPRICE_DATE = A.MATURITY_DATE ,\n           REPRICE_FREQ = A.ORG_TERM ,\n           REPRICE_FREQ_MULT = A.ORG_TERM_MULT ,\n           ADJUSTABLE_TYPE_CD = 0\n     WHERE A.MATURITY_DATE <= P_AS_OF_DATE\n       AND A.CUR_BOOK_BAL <> 0', 
                     'LVL_STRUCT': {'MA_F_LOAN': []}, 
                     'LVL_CODE': {'MA_F_LOAN': 'UPDATE MA_F_LOAN A\n       SET LAST_REPRICE_DATE = A.ORIGINATION_DATE ,\n           NEXT_REPRICE_DATE = A.MATURITY_DATE ,\n           REPRICE_FREQ = A.ORG_TERM ,\n           REPRICE_FREQ_MULT = A.ORG_TERM_MULT ,\n           ADJUSTABLE_TYPE_CD = 0\n     WHERE A.MATURITY_DATE <= P_AS_OF_DATE\n       AND A.CUR_BOOK_BAL <> 0'}, 
                     'TAR_TAB': [{'SCH': '', 'NAME': 'MA_F_LOAN'}], 
                     'SRC_TAB': []}, 
                     
                    {'SQL_SEQ': 14, 'SQL_TYPE': '__END_IF__'}, 
                    {'SQL_SEQ': 15, 'SQL_TYPE': '__INSERT_INTO_VALUES__', 'SQL_CODE': "INSERT INTO M_RUNLOG VALUES (SYSDATE, V_PROC_NAME, 'it is 10')", 'LVL_STRUCT': {'M_RUNLOG': []}, 'LVL_CODE': {'M_RUNLOG': "INSERT INTO M_RUNLOG VALUES (SYSDATE, V_PROC_NAME, 'it is 10')"}, 'TAR_TAB': [{'SCH': '', 'NAME': 'M_RUNLOG'}], 'SRC_TAB': []}, {'SQL_SEQ': 16, 'SQL_TYPE': '__COMMIT__'}, {'SQL_SEQ': 17, 'SQL_TYPE': '__EXCEPTION_WHEN__'}, 
                    
                    {'SQL_SEQ': 18, 'SQL_TYPE': '__RUN_PROC_FUN__'}, 
                    {'SQL_SEQ': 19, 'SQL_TYPE': '__DECLARE_VAR__'}, 
                    {'SQL_SEQ': 20, 'SQL_TYPE': '__END__'}],
                    
 'SQL_DEPEND_LVL': [{6: {}, 7: {}, 12: {}, 15: {}}, 
                    {9: {6}, 10: {7}, 13: {12}}
                   ]
}

# 将以上两个步骤(1、执行标注命令;2、解析标注结果)封装成一个Python代码文件(split_etl.py),可得:

代码语言:txt
复制
# -*- coding: utf-8 -*-

import sys
import subprocess
import json


###############################################################

id_new_select = 0 

def make_sql_struct(mark_sql, tartab) :
    
    sql_info_list = mark_sql.split('{###}')
    
    srctab_4_levels = []
    onelvl_srctab = []
    lvl_struct = {}

    mid_sql_list = []
    lvl_code = {}
    
    all_srctab = set()
    
    global id_new_select
    
    while sql_info_list:
        one_node = sql_info_list.pop(0)

        #################################################
        # with select 
        if one_node == '<withas>' :
            srctab_4_levels.append(onelvl_srctab)
            onelvl_srctab = []
            mid_sql_list.append(one_node)
        elif one_node == '</withas>':
            sql_text = ''
            tmp_pop_one = mid_sql_list.pop(-1)
            while tmp_pop_one != '<as/>' : # '<withas>':
                sql_text = tmp_pop_one + sql_text
                tmp_pop_one = mid_sql_list.pop(-1)
                
            withname = mid_sql_list.pop(-1)
            mid_sql_list.pop(-1)  # pass 
            
            lvl_struct[withname] = onelvl_srctab
            onelvl_srctab = srctab_4_levels.pop(-1)
            
            lvl_code[withname] = sql_text

        #################################################
        # sub select
        elif one_node == '<subsel>' :
            srctab_4_levels.append(onelvl_srctab)
            onelvl_srctab = []
            mid_sql_list.append(one_node)
        elif one_node == '</subsel>' :
            id_new_select += 1
            new_sub_select_id = '__SUB_SELECT_%d__' % id_new_select
            
            # 新增子查询ID添加到上层SQL的源表
            srctab_4_levels[-1].append(new_sub_select_id)

            sql_text = ''
            tmp_pop_one = mid_sql_list.pop(-1)
            while tmp_pop_one != '<subsel>':
                sql_text = tmp_pop_one + sql_text
                tmp_pop_one = mid_sql_list.pop(-1)
                
            lvl_struct[new_sub_select_id] = onelvl_srctab
            onelvl_srctab = srctab_4_levels.pop(-1)

            lvl_code[new_sub_select_id] = sql_text
            mid_sql_list[-1] += new_sub_select_id

        #################################################
        # union select
        elif one_node == '<union>' :
            srctab_4_levels.append(onelvl_srctab)
            onelvl_srctab = []
            mid_sql_list.append(one_node)
        elif one_node == '</union>' :
            id_new_select += 1
            new_union_select_id = '__UNION_SELECT_%d__' % id_new_select
            
            # 新增子查询ID添加到上层SQL的源表
            srctab_4_levels[-1].append(new_union_select_id)

            sql_text = ''
            tmp_pop_one = mid_sql_list.pop(-1)
            while tmp_pop_one != '<union>':
                sql_text = tmp_pop_one + sql_text
                tmp_pop_one = mid_sql_list.pop(-1)
                
            lvl_struct[new_union_select_id] = onelvl_srctab
            onelvl_srctab = srctab_4_levels.pop(-1)

            lvl_code[new_union_select_id] = sql_text
            mid_sql_list[-1] += '\nSELECT * FROM ' + new_union_select_id
            
        #################################################
        # sourc table 
        elif one_node == '<srctab>':
            src_tab_name = sql_info_list.pop(0)
            onelvl_srctab.append(src_tab_name)
            sql_info_list.pop(0) # </srctab>
            mid_sql_list[-1] += src_tab_name
            
            all_srctab.add(src_tab_name.upper())
            
        #################################################
        # sourc function 
        elif one_node == '<srcfun>':
            src_fun_name = sql_info_list.pop(0).split('(')[0]
            onelvl_srctab.append(src_fun_name)
            sql_info_list.pop(0) # </srcfun>
            mid_sql_list[-1] += src_fun_name
            
            all_srctab.add(src_fun_name.upper())

        else:
            mid_sql_list.append(one_node)

    lvl_struct[tartab] = onelvl_srctab
    lvl_code[tartab] = ''.join(mid_sql_list)
    
    ###############################################
    
    tartablist = []
    for one in tartab.split(',') :
        schema_tartab = one.split('.')
        # print('schema_tartab=', schema_tartab)
        if len(schema_tartab) == 1 :
            tartablist.append( {'SCH': '', 'NAME': schema_tartab[0]} )
        else : 
            tartablist.append( {'SCH': schema_tartab[0], 'NAME': schema_tartab[-1]} )
        
    srctablist = []
    for one in all_srctab :
        schema_tartab = one.split('.')
        # print('schema_tartab=', schema_tartab)
        if len(schema_tartab) == 1 :
            srctablist.append( {'SCH': '', 'NAME': schema_tartab[0]} )
        else : 
            srctablist.append( {'SCH': schema_tartab[0], 'NAME': schema_tartab[-1]} )
    
    # print('lvl_struct = ', lvl_struct)
    # print('lvl_code = ', lvl_code)
    # print('tartablist = ', tartablist)
    # print('srctablist = ', srctablist)
    
    return {
      'LVL_STRUCT' : lvl_struct
    , 'LVL_CODE' : lvl_code
    , 'TAR_TAB' : tartablist
    , 'SRC_TAB' : srctablist
    }


def split_etl(etl_file) :
    ####################################
    ### 对 SQL 文件进行标注转换
    run_result = subprocess.run(['ZGLanguage', '-e', 'SPLIT_ETL/MARK_SQLS.syn'
                                    , '-t', etl_file
                                    , '-o', 'mark_sql.zgl']
                                , capture_output=True
                                # , stdout=log_file
                                , encoding='utf-8'
                                , text=True
                                )

    with open("run_zgl.log", "w", encoding='utf-8') as log_file:
        log_file.write(run_result.stdout)
        last_log = run_result.stdout[0:500]
        if ' ERROR !!!' in last_log or ' WARNING !!!' in last_log:
            print(run_result.stdout[0:500])
            sys.exit(-1)

    #########################################

    sql_file_info = {
          'SQL_FILE_NAME': etl_file.replace('/', ' ').replace('\\', ' ').split(' ')[-1]
        , 'SQL_SPLIT_INFO': []
        , 'DEPEND_TABLES': []
        , 'CREATE_TABLES': []
        , 'UPDATE_INSERT_TABLES': []
        , 'SQL_DEPEND_LVL': []
    }

    #########################################
    ### get SQL_SPLIT_INFO
    
    sql_seq = 0
    with (open('mark_sql.zgl', 'r', encoding='utf-8') as fs) :
        sql_tran_list = fs.read().split('{;;;}')
        for one_tran_sql in sql_tran_list :
            # print('one_tran_sql=', one_tran_sql)
            if not one_tran_sql :
                continue

            #############################
            # 语句顺序ID
            sql_seq += 1
            one_split_info = {'SQL_SEQ' : sql_seq}

            #############################
            # 语句类型
            tran_info = one_tran_sql.strip().split('{:::}')
            one_split_info['SQL_TYPE'] = tran_info[0]

            #############################

            if tran_info[0] in ( '__CREATE_TABLE_STRUCT__', '__CREATE_TABLE_LIKE__', '__CREATE_TABLE_SELECT__', '__NAME_EQ_SELECT__'
                               , '__INSERT_TABLE_SELECT__', '__INSERT_INTO_VALUES__', '__UPDATE_TABLE__', '__MERGE_TABLE__'
                               , '__DELETE_TABLE__', '__TRUNCATE_TABLE__', '__DROP_TABLE__' 
                               ) :

                schema_tartab = tran_info[1].split('||')
                # print("schema_tartab = ", schema_tartab)
                tartab = ''
                if schema_tartab[0] : 
                    tartab = (schema_tartab[0] +'.'+ schema_tartab[1]).upper()
                else :
                    tartab = schema_tartab[1].upper()

                # 还原语句代码
                sql_code = tran_info[2].replace('{###}<union>{###}','').replace('{###}</union>{###}',''
                                     ).replace('{###}<intersect>{###}','').replace('{###}</intersect>{###}',''
                                     ).replace('{###}<minus>{###}','').replace('{###}</minus>{###}',''
                                     ).replace('{###}<except>{###}','').replace('{###}</except>{###}',''
                                     ).replace('{###}<subsel>{###}','(').replace('{###}</subsel>{###}',')'
                                     ).replace('{###}<withas>{###}','WHIT ').replace('{###}<as/>{###}',' AS (').replace('{###}</withas>{###}',')'
                                     ).replace('{###}<srctab>{###}','').replace('{###}</srctab>{###}',''
                                     ).replace('{###}<srcfun>{###}','').replace('{###}</srcfun>{###}','')

                # print('SQL_CODE :\n', sql_code)
                one_split_info['SQL_CODE'] = sql_code

                # 拆解生成 SQL 代码块信息
                one_split_info.update(make_sql_struct(tran_info[2], tartab))

            elif tran_info[0] == '__JUST_SELECT__' :
                tartab = '__JUST_SELECT_%d__' % id_new_select

                # 还原语句代码
                sql_code = tran_info[1].replace('{###}<union>{###}','').replace('{###}</union>{###}',''
                                     ).replace('{###}<intersect>{###}','').replace('{###}</intersect>{###}',''
                                     ).replace('{###}<minus>{###}','').replace('{###}</minus>{###}',''
                                     ).replace('{###}<except>{###}','').replace('{###}</except>{###}',''
                                     ).replace('{###}<subsel>{###}','(').replace('{###}</subsel>{###}',')'
                                     ).replace('{###}<withas>{###}','WHIT ').replace('{###}<as/>{###}',' AS (').replace('{###}</withas>{###}',')'
                                     ).replace('{###}<srctab>{###}','').replace('{###}</srctab>{###}',''
                                     ).replace('{###}<srcfun>{###}','').replace('{###}</srcfun>{###}','')

                # print('SQL_CODE :\n', sql_code)
                one_split_info['SQL_CODE'] = sql_code

                # 拆解生成 SQL 代码块信息
                one_split_info.update(make_sql_struct(tran_info[1], tartab))

            elif tran_info[0] == '__FROM_INSERT__' : 
                tran_info.pop(0)
                sql_code = tran_info.pop(0)
                
                tartab_list = set()
                while tran_info :
                    schema_tartab = tran_info.pop(0).split('||')
                    
                    tartab = ''
                    if schema_tartab[0] : 
                        tartab = schema_tartab[0] +'.'+ schema_tartab[1]
                    else :
                        tartab = schema_tartab[1]
                    
                    tartab_list.add(tartab.upper())
                    
                    ##################################
                    
                    sql_code += tran_info.pop(0)
                
                # print('sql_code = ', sql_code)
                one_split_info['SQL_CODE'] = sql_code.replace('{###}<union>{###}','').replace('{###}</union>{###}',''
                                     ).replace('{###}<intersect>{###}','').replace('{###}</intersect>{###}',''
                                     ).replace('{###}<minus>{###}','').replace('{###}</minus>{###}',''
                                     ).replace('{###}<except>{###}','').replace('{###}</except>{###}',''
                                     ).replace('{###}<subsel>{###}','(').replace('{###}</subsel>{###}',')'
                                     ).replace('{###}<withas>{###}','WHIT ').replace('{###}<as/>{###}',' AS (').replace('{###}</withas>{###}',')'
                                     ).replace('{###}<srctab>{###}','').replace('{###}</srctab>{###}',''
                                     ).replace('{###}<srcfun>{###}','').replace('{###}</srcfun>{###}','')
                
                tartab = ','.join(tartab_list)
                # print('tartab = ', tartab_list)
                
                # 拆解生成 SQL 代码块信息
                one_split_info.update(make_sql_struct(sql_code, tartab))

            elif tran_info[0] in ('__DROP_PROCEDURE__', '__DROP_FUNCTION__', '__CREATE_PROCEDURE_HEAD__', '__CREATE_FUNCTION_HEAD__'):
                pass

            elif tran_info[0] in ( '__SELECT_INTO__'
                                 , '__DECLARE_MYSQL_CURSOR__', '__DECLARE_ORACLE_CURSOR__', '__FETCH__'
                                 , '__EXECUTE__', '__SET_CONF_VAR__', '__RESET_CONF__', '__DECLARE_VAR__'
                                 , '__IF__', '__CONTROL_IF__', '__CONTROL_CASE_WHEN__', '__END_IF__'
                                 , '__EXCEPTION_WHEN__'
                                 , '__CONTROL_WHEN__', '__CONTROL_ELSE__'
                                 , '__CONTROL_MYSQL_WHILE__', '__CONTROL_ORACLE_WHILE__'
                                 , '__CONTROL_ORACLE_FOR__', '__CONTROL_REPEAT__'
                                 , '__CONTROL_MYSQL_LOOP__', '__CONTROL_ORACLE_LOOP__'
                                 , '__OPEN__', '__CLOSE__'
                                 , '__RUN_PROC_FUN__', '__COMMIT__', '__END__'
                                 , '__ALTER_TABLE__', '__GRANT__', '__ANALYZE__', '__COMMENT_ON__', '__VACUUM__'
                                 ):
                pass

            else :
                print("SQL TYPE [%s] ????" % (tran_info[0]))

            sql_file_info['SQL_SPLIT_INFO'].append(one_split_info)


    #########################################
    ### get DEPEND_TABLES  CREATE_TABLES  UPDATE_INSERT_TABLES 
    
    all_tar_tab_name = set()
    all_src_tab_name = set()

    depend_tables = []
    create_tables = []
    update_insert_tables = []

    for one_sql_info in sql_file_info['SQL_SPLIT_INFO'] :
        # print('SQL_TYPE = ', one_sql_info['SQL_TYPE'])
        if one_sql_info['SQL_TYPE'] not in ('__CREATE_TABLE_STRUCT__', '__CREATE_TABLE_LIKE__', '__CREATE_TABLE_SELECT__', '__NAME_EQ_SELECT__'
                                           ,'__INSERT_TABLE_SELECT__', '__INSERT_INTO_VALUES__', '__FROM_INSERT__'
                                           ,'__UPDATE_TABLE__', '__MERGE_TABLE__'
                                           ,'__JUST_SELECT__') : 
            continue 
        
        for one in one_sql_info['TAR_TAB'] :
            # print('TAR_TAB = ', one['NAME'])
            if one['NAME'] not in all_tar_tab_name :
                all_tar_tab_name.add(one['NAME'])
                
                if one_sql_info['SQL_TYPE'] in ('__CREATE_TABLE_STRUCT__','__CREATE_TABLE_LIKE__'
                                               ,'__CREATE_TABLE_SELECT__','__NAME_EQ_SELECT__') :
                    create_tables.append(one)
                elif one_sql_info['SQL_TYPE'] in ('__INSERT_TABLE_SELECT__', '__INSERT_INTO_VALUES__', '__FROM_INSERT__'
                                                 ,'__UPDATE_TABLE__', '__MERGE_TABLE__') :
                    update_insert_tables.append(one)

        for one in one_sql_info['SRC_TAB'] :
            # print('SRC_TAB = ', one['NAME'])
            # 允许源表和目标表存在相同情况
            # if one['NAME'] not in all_tar_tab_name and one['NAME'] not in all_src_tab_name :  
            if one['NAME'] not in all_src_tab_name : 
                depend_tables.append(one)
                all_src_tab_name.add(one['NAME'])


    # print('depend_tables = ', depend_tables)
    # print('create_tables = ', create_tables)
    # print('update_insert_tables = ', update_insert_tables)
    
    sql_file_info['DEPEND_TABLES'] = depend_tables
    sql_file_info['CREATE_TABLES'] = create_tables
    sql_file_info['UPDATE_INSERT_TABLES'] = update_insert_tables
    
    
    #########################################
    ### get SQL_DEPEND_LVL 
    
    sql_depend_lvl = []
    
    
    # 选择要处理的 SQL 类型序号
    sql_seq_ids = []
    for one_sql_info in sql_file_info['SQL_SPLIT_INFO'] :
        # print('SQL_SEQ = ', one_sql_info['SQL_SEQ'])
        # print('SQL_TYPE = ', one_sql_info['SQL_TYPE'])
        
        # 只处理部分主要类型的SQL,后续需要可以扩展
        if one_sql_info['SQL_TYPE'] in ( '__CREATE_TABLE_STRUCT__', '__CREATE_TABLE_LIKE__', '__CREATE_TABLE_SELECT__', '__NAME_EQ_SELECT__'
                                       , '__INSERT_TABLE_SELECT__', '__INSERT_INTO_VALUES__', '__FROM_INSERT__', '__JUST_SELECT__'
                                       , '__UPDATE_TABLE__', '__MERGE_TABLE__'
                                       , '__DELETE_TABLE__', '__TRUNCATE_TABLE__', '__DROP_TABLE__' ) : 
            sql_seq_ids.append(one_sql_info['SQL_SEQ'] - 1)  # 减一当下标

    # print('sql_seq_ids = ', sql_seq_ids)
    
    ###############################
    
    tab2seq = { tab : 0 for tab in all_src_tab_name }
    # print('tab2seq = ', tab2seq)

    while sql_seq_ids :
        curr_lvl_ids = {}
        next_lvl_ids = []
        
        tab2seq4add = {}
        tab2seq4check = set()

        for seq in sql_seq_ids :
            is_curr_lvl = True 
            depd_seqs = set()
            
            # 检查依赖表是否全满足
            for one in sql_file_info['SQL_SPLIT_INFO'][seq]['SRC_TAB'] :
                srctab = one['NAME']
                
                if srctab not in tab2seq or srctab in tab2seq4add : 
                    # print("srctab = ", srctab)
                    is_curr_lvl = False
                    break
                
                if tab2seq[srctab] != 0 :
                    depd_seqs.add( tab2seq[srctab] )

            if not is_curr_lvl :
                next_lvl_ids.append(seq)
                # 记录每个步骤的目标表
                for one in sql_file_info['SQL_SPLIT_INFO'][seq]['TAR_TAB'] :
                    # print('TAR_TAB = ', one['NAME'])
                    tab2seq4check.add(one['NAME'])

                continue

            # 检查目标表是否冲突
            for one in sql_file_info['SQL_SPLIT_INFO'][seq]['TAR_TAB'] :
                # print('TAR_TAB = ', one['NAME'])
                tartab = one['NAME']
                if tartab not in tab2seq4check :
                    tab2seq4add[tartab] = seq + 1
                    tab2seq4check.add(tartab)
                    
                    if not depd_seqs and tartab in tab2seq:
                        depd_seqs.add( tab2seq[tartab] )
                else :
                    # print('冲突表:', tartab)
                    is_curr_lvl = False
                    break

            if is_curr_lvl :
                curr_lvl_ids[seq+1] = depd_seqs
            else :
                next_lvl_ids.append(seq)

        # print('curr_lvl_ids = ', curr_lvl_ids)
        # print('next_lvl_ids = ', next_lvl_ids)
        
        sql_depend_lvl.append(curr_lvl_ids)
        sql_seq_ids = next_lvl_ids
        tab2seq.update(tab2seq4add)
 
 
    sql_file_info['SQL_DEPEND_LVL'] = sql_depend_lvl

    print('##################################################')
    print(sql_file_info)

    return
    

if __name__ == "__main__" :

    if len(sys.argv) == 1 :
        print('Miss sql file !')
        sys.exit(-1)

    split_etl(sys.argv[1])

# 封装后 Python 代码执行命令:

python split_etl.py proc_test.prc > log.log

# 通用SQL语法解析配置文件 MARK_SQLS.syn 放在附件里。

zglanguage_20260529.zip

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档