ETL调度系统

CREATE OR REPLACE PACKAGE PKG_ETL_CTL IS
  -- 调用主程序,调用存储过程
  PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER);

  -- 创建新的时间周期,运行数据周期作业及作业流状态处理
  PROCEDURE SP_FLOW_RUN_DEAL;

  -- 创建新的时间周期
  PROCEDURE SP_FLOW_CREATE_NEW_PERIOD;

  -- 运行数据周期内的作业流及作业
  PROCEDURE SP_FLOW_RUN_NEW_PERIOD;

  -- 同步作业流运行状态
  PROCEDURE SP_FLOW_RUN_STATUS;

  -- 失败作业流及作业重置为未处理
  PROCEDURE SP_FLOW_RUN_ERROR_RESET;

  -- 作业状态更新
  PROCEDURE SP_JOB_RUN_STATUS
  (
    I_JOB_ID NUMBER
   ,I_ORG_ID VARCHAR2
   ,I_JOB_RUN_STATUS VARCHAR2
   ,I_JOB_RUN_INFO VARCHAR2
  );

  -- 作业流日志处理
  PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER);

  -- 作业日志处理
  PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER);

  PROCEDURE SP_INSERT_MONITOR_SMS
  (
    O_RESULT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/
   ,O_RESULT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/
  );

  PROCEDURE SP_SEND_MONITOR_SMS;

  -- 获取作业流前置依赖
  FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2;

  -- 获取作业前置依赖
  FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2;

  -- 获取作业流下属作业运行状态
  FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2;

  -- 获取下个数据日期
  FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE;

  -- 获取上级作业流运行状态
  FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2;

  -- 获取上级作业流数据开始时间
  FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE;

  -- 获取上级作业流数据结束时间
  FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE;

  -- 获取周期代码
  FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2;
END PKG_ETL_CTL;
/
CREATE OR REPLACE PACKAGE BODY PKG_ETL_CTL IS
  /*******************************************************************
    程序名   :SP_EXEC_PROC
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 调用主程序,调用存储过程
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER) IS
    VAR_PRO_NAME        VARCHAR2(100);
    VAR_DATA_START_TIME VARCHAR2(20);
    VAR_DATA_END_TIME   VARCHAR2(20);
    VAR_SQL             VARCHAR2(4000);
    VAR_PARAMS          VARCHAR2(1000);
    VAR_ORG_ID          VARCHAR2(10);
    VAR_JOB_RUN_DESC    VARCHAR2(100);
    VAR_JOB_ERR_DESC    VARCHAR2(100);
  
  BEGIN
    -- 获取作业正在运行描述
    SELECT T.ETL_PARA_VAL INTO VAR_JOB_RUN_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_RUN_DESC';
  
    -- 获取作业运行失败描述
    SELECT T.ETL_PARA_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_ERR_DESC';
  
    -- 获取作业所调用的存储过程,数据开始时间,数据结束时间
    SELECT T.ETL_JOB_PROC
          ,TO_CHAR(A.ETL_DATA_START_TIME, 'YYYYMMDDHH24MISS')
          ,TO_CHAR(A.ETL_DATA_END_TIME, 'YYYYMMDDHH24MISS')
      INTO VAR_PRO_NAME
          ,VAR_DATA_START_TIME
          ,VAR_DATA_END_TIME
      FROM ETL_CTL_JOB_INFO T
          ,ETL_JOB_RUN_STS A
     WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
       AND T.ETL_JOB_ID = I_JOB_ID;
  
    -- 获取作业全部参数拼接到一起
    FOR LOOP_PARAM IN (SELECT T.ETL_PARA_NAME
                             ,DECODE(T.ETL_PARA_TYPE
                                    ,2
                                    ,'TO_DATE(' || T.ETL_PARA_VAL || ',''YYYYMMDDHH24MISS'')' -- 日期类型参数转化成日期格式
                                    ,T.ETL_PARA_VAL) ETL_PARA_VAL
                             ,T.ETL_PARA_TYPE
                         FROM ETL_JOB_PARA T
                        WHERE T.ETL_JOB_ID = I_JOB_ID) LOOP
      -- 获取机构号
      IF LOOP_PARAM.ETL_PARA_NAME = 'I_ORG_ID'
      THEN
        VAR_ORG_ID := LOOP_PARAM.ETL_PARA_VAL;
      END IF;
      -- 参数拼接
      VAR_PARAMS := VAR_PARAMS || LOOP_PARAM.ETL_PARA_NAME || ' => ' || LOOP_PARAM.ETL_PARA_VAL || ',';
    END LOOP;
  
    -- 参数加上输出参数(存储过程运行结果和运行信息)
    VAR_PARAMS := UPPER(VAR_PARAMS) || 'O_RESULT_FLAG => LO_RESULT_FLAG,O_RESULT_MSG => LO_RESULT_MSG';
    -- 参数替换为变量
    VAR_PARAMS := REPLACE(VAR_PARAMS, '#$I_DATA_START_TIME#', VAR_DATA_START_TIME);
  
    VAR_PARAMS := REPLACE(VAR_PARAMS, '#$I_DATA_END_TIME#', VAR_DATA_END_TIME);
  
    -- 拼接存储过程进行调用
    VAR_SQL := 'DECLARE  LO_RESULT_FLAG VARCHAR2(10);LO_RESULT_MSG VARCHAR2(300);BEGIN ' || VAR_PRO_NAME || '(' ||
               VAR_PARAMS || ');PKG_ETL_CTL.SP_JOB_RUN_STATUS(' || I_JOB_ID || ',''' || VAR_ORG_ID ||
               ''', LO_RESULT_FLAG, LO_RESULT_MSG);END;';
  
    -- 修改作业状态为正在运行
    SP_JOB_RUN_STATUS(I_JOB_ID, VAR_ORG_ID, '1', VAR_JOB_RUN_DESC);
    -- 运行存储过程
    EXECUTE IMMEDIATE VAR_SQL;
  
  EXCEPTION
    WHEN OTHERS THEN
      -- 运行出错,返回错误信息
      SP_JOB_RUN_STATUS(I_JOB_ID, VAR_ORG_ID, '2', SQLERRM);
  END;

  /*******************************************************************
    程序名   :SP_FLOW_RUN_DEAL
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 创建新的时间周期,运行数据周期作业及作业流状态处理
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_FLOW_RUN_DEAL IS
  BEGIN
  
    -- 创建新的时间周期
    SP_FLOW_CREATE_NEW_PERIOD;
    -- 运行时间周期内的作业
    SP_FLOW_RUN_NEW_PERIOD;
    -- 更新作业流状态
    SP_FLOW_RUN_STATUS;
    -- 失败任务重置
    SP_FLOW_RUN_ERROR_RESET;
  END;

  /*******************************************************************
    程序名   :SP_FLOW_CREATE_NEW_PERIOD
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 生成新的数据周期运行新周期的数据
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_FLOW_CREATE_NEW_PERIOD IS
    DTE_DATA_NEXT_TIME  DATE;
    DTE_DATA_START_TIME DATE;
    DTE_DATA_END_TIME   DATE;
  BEGIN
  
    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID
                            ,A.ETL_NEXT_EXPIRY_TIME
                        FROM ETL_CTL_JOB_FLOW T
                            ,ETL_FLOW_RUN_STS A
                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                         AND T.ETL_FLOW_LEVEL = 1
                         AND T.ETL_FLOW_STATUS = 1) LOOP
      DTE_DATA_NEXT_TIME := FN_GET_NEXT_DATA_TIME(LOOP_FLOW.ETL_FLOW_ID);
      IF DTE_DATA_NEXT_TIME <= LOOP_FLOW.ETL_NEXT_EXPIRY_TIME
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_NEXT_DATA_TIME = DTE_DATA_NEXT_TIME
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      END IF;
    END LOOP;
  
    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID
                            ,T.ETL_DATA_SUCC_TIME
                            ,T.ETL_DATA_START_TIME
                            ,T.ETL_DATA_END_TIME
                            ,T.ETL_NEXT_DATA_TIME
                            ,T.ETL_FLOW_RUN_STATUS
                            ,A.ETL_CYC_CODE
                            ,A.ETL_FLOW_LEVEL
                            ,A.ETL_FLOW_STATUS
                        FROM ETL_FLOW_RUN_STS T
                            ,ETL_CTL_JOB_FLOW A
                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                         AND A.ETL_FLOW_STATUS = 1
                       ORDER BY A.ETL_FLOW_LEVEL) LOOP
      IF LOOP_FLOW.ETL_FLOW_LEVEL = 1
         AND LOOP_FLOW.ETL_FLOW_STATUS = 1
         AND LOOP_FLOW.ETL_FLOW_RUN_STATUS = 9
         AND LOOP_FLOW.ETL_DATA_SUCC_TIME < LOOP_FLOW.ETL_NEXT_DATA_TIME
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_DATA_START_TIME = CASE
                                         WHEN LOOP_FLOW.ETL_CYC_CODE = '01' THEN
                                          T.ETL_DATA_SUCC_TIME + 1
                                         WHEN LOOP_FLOW.ETL_CYC_CODE = '02' THEN
                                          T.ETL_DATA_SUCC_TIME + 1 / 24 / 60 / 60
                                         WHEN LOOP_FLOW.ETL_CYC_CODE = '03' THEN
                                          T.ETL_DATA_SUCC_TIME + 1
                                       END
              ,T.ETL_DATA_END_TIME = T.ETL_NEXT_DATA_TIME
              ,T.ETL_START_TIME = NULL
              ,T.ETL_END_TIME = NULL
              ,T.ETL_FLOW_RUN_STATUS = 0
              ,T.ETL_RESET_TIME = 0
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1
            AND LOOP_FLOW.ETL_FLOW_STATUS = 1
            AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 0
      THEN
        DTE_DATA_START_TIME := FN_GET_SUPER_DATA_START_TIME(LOOP_FLOW.ETL_FLOW_ID);
        DTE_DATA_END_TIME   := FN_GET_SUPER_DATA_END_TIME(LOOP_FLOW.ETL_FLOW_ID);
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
              ,T.ETL_DATA_END_TIME = DTE_DATA_END_TIME
              ,T.ETL_START_TIME = NULL
              ,T.ETL_END_TIME = NULL
              ,T.ETL_FLOW_RUN_STATUS = 0
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      END IF;
    END LOOP;
  
    FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID
                           ,T.ETL_DATA_START_TIME FLOW_DATA_START_TIME
                           ,T.ETL_DATA_END_TIME FLOW_DATA_END_TIME
                           ,T.ETL_FLOW_RUN_STATUS
                           ,A.ETL_DATA_START_TIME JOB_DATA_START_TIME
                           ,A.ETL_DATA_END_TIME JOB_DATA_END_TIME
                           ,B.ETL_JOB_STATUS
                       FROM ETL_FLOW_RUN_STS T
                           ,ETL_JOB_RUN_STS A
                           ,ETL_CTL_JOB_INFO B
                      WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID
                        AND A.ETL_JOB_ID = B.ETL_JOB_ID
                        AND B.ETL_JOB_STATUS = 1) LOOP
      IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 0
         AND LOOP_JOB.ETL_JOB_STATUS = 1
      THEN
        UPDATE ETL_JOB_RUN_STS T
           SET T.ETL_DATA_START_TIME = LOOP_JOB.FLOW_DATA_START_TIME
              ,T.ETL_DATA_END_TIME = LOOP_JOB.FLOW_DATA_END_TIME
              ,T.ETL_START_TIME = NULL
              ,T.ETL_END_TIME = NULL
              ,T.ETL_JOB_RUN_STATUS = 0
              ,T.ETL_SESSION_ID = NULL
              ,T.ETL_LOG_DESC = NULL
         WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID;
      END IF;
    END LOOP;
    COMMIT;
  END;

  /*******************************************************************
    程序名   :SP_FLOW_RUN_NEW_PERIOD
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 运行新周期的数据
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_FLOW_RUN_NEW_PERIOD IS
  
  BEGIN
    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID
                            ,T.ETL_FLOW_RUN_STATUS
                            ,A.ETL_FLOW_LEVEL
                            ,A.ETL_FLOW_STATUS
                        FROM ETL_FLOW_RUN_STS T
                            ,ETL_CTL_JOB_FLOW A
                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                         AND A.ETL_FLOW_STATUS = 1
                       ORDER BY A.ETL_FLOW_LEVEL) LOOP
      IF LOOP_FLOW.ETL_FLOW_LEVEL = 1
         AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0, 3)
         AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_START_TIME = SYSDATE
              ,T.ETL_END_TIME = NULL
              ,T.ETL_FLOW_RUN_STATUS = 1
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1
            AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 1
            AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0, 3)
            AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_START_TIME = SYSDATE
              ,T.ETL_END_TIME = NULL
              ,T.ETL_FLOW_RUN_STATUS = 1
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      END IF;
    END LOOP;
  
  END;

  /*******************************************************************
    程序名   :SP_FLOW_RUN_STATUS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 作业流状态处理
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_FLOW_RUN_STATUS IS
    -- VAR_JOB_NO_SESSION    VARCHAR2(100); -- 数据库进程不存在
    VAR_JOB_OVERTIME_DESC VARCHAR2(100); -- 作业运行超时描述
    INT_JOB_OVERTIME      NUMBER; -- 作业超时告警时间
    INT_LAST_RUNTIME      NUMBER; -- 作业上次运行时间
    -- INT_JOB_DEAD_TIME     NUMBER; -- 调度作业数据库进程不存在运行判定时间
  BEGIN
    -- 获取数据库进程不存在描述
    /*SELECT T.ETL_PARA_VAL
     INTO VAR_JOB_NO_SESSION
     FROM ETL_CTL_PARA T
    WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_NO_SESSION_DESC';*/
  
    -- 获取作业运行超时告警时间
    SELECT T.ETL_PARA_VAL INTO INT_JOB_OVERTIME FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_OVERTIME';
  
    -- 调度作业数据库进程不存在运行判定时间
    /*SELECT T.ETL_PARA_VAL
     INTO INT_JOB_DEAD_TIME
     FROM ETL_CTL_PARA T
    WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_DEAD_TIME';*/
  
    -- 获取作业运行超时描述
    SELECT T.ETL_PARA_VAL
      INTO VAR_JOB_OVERTIME_DESC
      FROM ETL_CTL_PARA T
     WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_OVERTIME_DESC';
  
    FOR LOOP_JOB IN (SELECT T.ETL_JOB_ID
                           ,A.ETL_OVERTIME_REM_WAY
                            -- ,T.ETL_SESSION_ID
                           ,(SYSDATE - T.ETL_START_TIME) RUNTIME
                       FROM ETL_JOB_RUN_STS T
                           ,ETL_CTL_JOB_INFO A
                      WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
                        AND T.ETL_JOB_RUN_STATUS = 1) LOOP
      -- 作业为正在运行,但数据库进程已经不存在(10分钟)的作业置为运行失败
      /*IF FN_GET_SESSION_STATUS(LOOP_JOB.ETL_SESSION_ID) = 0
         AND LOOP_JOB.RUNTIME * 24 * 60 >= INT_JOB_DEAD_TIME
      THEN
        SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID, '', 2, VAR_JOB_NO_SESSION);
        -- 超时提醒方式为超过上次运行时间
      ELS*/
      IF LOOP_JOB.ETL_OVERTIME_REM_WAY = 1
         AND INT_JOB_OVERTIME > 0
      THEN
        BEGIN
          SELECT RUNTIME
            INTO INT_LAST_RUNTIME
            FROM (SELECT T.ETL_LOGID
                        ,T.ETL_JOB_ID
                        ,(T.ETL_END_TIME - T.ETL_START_TIME) * 24 * 60 RUNTIME
                        ,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM
                    FROM ETL_JOB_RUN_LOG T
                   WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID
                     AND T.ETL_JOB_RUN_STATUS = 9)
           WHERE ROW_NUM = 1;
          IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME
          THEN
            -- 修改作业状态为运行超时
            SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID, '', 4, VAR_JOB_OVERTIME_DESC);
          END IF;
        EXCEPTION
          WHEN OTHERS THEN
            NULL;
        END;
        -- 超时提醒方式为超过前三次的平均值
      ELSIF LOOP_JOB.ETL_OVERTIME_REM_WAY = 2
            AND INT_JOB_OVERTIME > 0
      THEN
        BEGIN
          SELECT AVG(RUNTIME)
            INTO INT_LAST_RUNTIME
            FROM (SELECT T.ETL_LOGID
                        ,T.ETL_JOB_ID
                        ,(T.ETL_END_TIME - T.ETL_START_TIME) * 24 * 60 RUNTIME
                        ,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM
                    FROM ETL_JOB_RUN_LOG T
                   WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID
                     AND T.ETL_JOB_RUN_STATUS = 9)
           WHERE ROW_NUM <= 3;
          IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME
          THEN
            -- 修改作业状态为运行超时
            SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID, '', 4, VAR_JOB_OVERTIME_DESC);
          END IF;
        EXCEPTION
          WHEN OTHERS THEN
            NULL;
        END;
      END IF;
    END LOOP;
  
    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID
                            ,T.ETL_DATA_END_TIME
                        FROM ETL_FLOW_RUN_STS T
                            ,ETL_CTL_JOB_FLOW A
                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                         AND T.ETL_FLOW_RUN_STATUS = 1
                       ORDER BY A.ETL_FLOW_LEVEL DESC) LOOP
      -- 下属作业运行成功将作业流状态置为成功
      IF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 9
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_DATA_SUCC_TIME = LOOP_FLOW.ETL_DATA_END_TIME
              ,T.ETL_END_TIME = SYSDATE
              ,T.ETL_FLOW_RUN_STATUS = 9
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
        SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID);
        -- 下属作业运行失败将作业流置为失败
      ELSIF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 2
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET T.ETL_END_TIME = SYSDATE
              ,T.ETL_FLOW_RUN_STATUS = 2
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
        SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID);
      END IF;
    END LOOP;
  END;

  /*******************************************************************
    程序名   :SP_FLOW_RUN_ERROR_RESET
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 失败任务重置,等待重新运行
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_FLOW_RUN_ERROR_RESET IS
    VAR_MAX_RESET_TIME NUMBER;
  BEGIN
    -- 获取最大任务重置次数
    SELECT T.ETL_PARA_VAL
      INTO VAR_MAX_RESET_TIME
      FROM ETL_CTL_PARA T
     WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_MAX_RESET_TIME';
  
    FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID
                            ,T.ETL_FLOW_RUN_STATUS
                            ,A.ETL_FLOW_LEVEL
                            ,T.ETL_RESET_TIME
                        FROM ETL_FLOW_RUN_STS T
                            ,ETL_CTL_JOB_FLOW A
                       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                         AND A.ETL_FLOW_STATUS = 1
                         AND T.ETL_FLOW_RUN_STATUS = 2
                       ORDER BY A.ETL_FLOW_LEVEL) LOOP
      IF LOOP_FLOW.ETL_FLOW_LEVEL = 1
         AND (VAR_MAX_RESET_TIME = 0 OR LOOP_FLOW.ETL_RESET_TIME < VAR_MAX_RESET_TIME)
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET /*T.ETL_START_TIME = NULL
               ,*/ T.ETL_END_TIME = NULL
              ,T.ETL_FLOW_RUN_STATUS = 3
              ,T.ETL_RESET_TIME = T.ETL_RESET_TIME + 1
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1
            AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 3
      THEN
        UPDATE ETL_FLOW_RUN_STS T
           SET /*T.ETL_START_TIME = NULL
               ,*/ T.ETL_END_TIME = NULL
              ,T.ETL_FLOW_RUN_STATUS = 3
         WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
        COMMIT;
      END IF;
    END LOOP;
  
    FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID
                           ,T.ETL_FLOW_RUN_STATUS
                           ,A.ETL_JOB_RUN_STATUS
                       FROM ETL_FLOW_RUN_STS T
                           ,ETL_JOB_RUN_STS A
                           ,ETL_CTL_JOB_INFO B
                      WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID
                        AND A.ETL_JOB_ID = B.ETL_JOB_ID
                        AND B.ETL_JOB_STATUS = 1) LOOP
      -- 将运行失败的作业置为重新运行,等待重新运行
      IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 3
         AND LOOP_JOB.ETL_JOB_RUN_STATUS = 2
      THEN
        UPDATE ETL_JOB_RUN_STS T
           SET /*T.ETL_START_TIME = NULL
               ,*/ T.ETL_END_TIME = NULL
              ,T.ETL_JOB_RUN_STATUS = 3
              ,T.ETL_SESSION_ID = NULL
              ,T.ETL_LOG_DESC = NULL
         WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID;
      END IF;
    END LOOP;
    COMMIT;
  END;

  /*******************************************************************
    程序名   :SP_JOB_RUN_STATUS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 作业运行状态处理
    修改人   : zhuyh
    修改时间 :2013/9/30
    修改原因 : 进程ID由记录数据库进程改为记录操作系统进程
  *******************************************************************/
  PROCEDURE SP_JOB_RUN_STATUS
  (
    I_JOB_ID NUMBER
   ,I_ORG_ID VARCHAR2
   ,I_JOB_RUN_STATUS VARCHAR2
   ,I_JOB_RUN_INFO VARCHAR2
  ) IS
    -- VAR_SESSION_ID    VARCHAR2(10);
    DTE_DATA_END_TIME DATE;
    VAR_JOB_SUCC_DESC VARCHAR2(100);
    VAR_JOB_ERR_DESC  VARCHAR2(100);
  
  BEGIN
    -- 获取作业运行成功描述
    SELECT T.ETL_PARA_VAL
      INTO VAR_JOB_SUCC_DESC
      FROM ETL_CTL_PARA T
     WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_SUCC_DESC';
  
    -- 获取作业运行失败描述
    SELECT T.ETL_PARA_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_ERR_DESC';
  
    SELECT ETL_DATA_END_TIME INTO DTE_DATA_END_TIME FROM ETL_JOB_RUN_STS T WHERE T.ETL_JOB_ID = I_JOB_ID;
  
    -- 正在运行
    IF I_JOB_RUN_STATUS = 1
    THEN
      -- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录操作系统进程
      -- 获取数据库进程
      -- SELECT SYS_CONTEXT('USERENV', 'SID') INTO VAR_SESSION_ID FROM DUAL;
      UPDATE ETL_JOB_RUN_STS T
         SET /*T.ETL_START_TIME = SYSDATE
             ,*/ T.ETL_DATA_ORG_ID = I_ORG_ID
            ,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS
             -- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录操作系统进程
             -- ,T.ETL_SESSION_ID = VAR_SESSION_ID
            ,T.ETL_LOG_DESC = I_JOB_RUN_INFO
       WHERE T.ETL_JOB_ID = I_JOB_ID;
      COMMIT;
      -- 运行失败
    ELSIF I_JOB_RUN_STATUS = 2
    THEN
      UPDATE ETL_JOB_RUN_STS T
         SET T.ETL_END_TIME = SYSDATE
            ,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS
            ,T.ETL_LOG_DESC = VAR_JOB_ERR_DESC || I_JOB_RUN_INFO
       WHERE T.ETL_JOB_ID = I_JOB_ID;
      COMMIT;
      -- 写失败日志
      SP_ETL_JOB_LOG_INFO(I_JOB_ID);
      -- 运行成功
    ELSIF I_JOB_RUN_STATUS = 9
    THEN
      UPDATE ETL_JOB_RUN_STS T
         SET T.ETL_DATA_SUCC_TIME = DTE_DATA_END_TIME
            ,T.ETL_END_TIME = SYSDATE
            ,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS
            ,T.ETL_LOG_DESC = VAR_JOB_SUCC_DESC || I_JOB_RUN_INFO
       WHERE T.ETL_JOB_ID = I_JOB_ID;
      COMMIT;
      -- 写失败日志
      SP_ETL_JOB_LOG_INFO(I_JOB_ID);
    END IF;
  END;

  /*******************************************************************
    程序名   :SP_ETL_FLOW_LOG_INFO
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 记录作业流运行日志
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER) IS
    DTE_DATA_START_TIME DATE;
    DTE_DATA_END_TIME   DATE;
    INT_COUNT           NUMBER;
    VAR_ETL_LOGID       VARCHAR2(30); -- 日志序号在每个数据周期内排序
  BEGIN
    SELECT T.ETL_DATA_START_TIME
          ,T.ETL_DATA_END_TIME
      INTO DTE_DATA_START_TIME
          ,DTE_DATA_END_TIME
      FROM ETL_FLOW_RUN_STS T
     WHERE T.ETL_FLOW_ID = I_FLOW_ID;
  
    -- 检查该数据周期有没有运行过
    SELECT COUNT(*)
      INTO INT_COUNT
      FROM ETL_FLOW_RUN_LOG T
     WHERE T.ETL_FLOW_ID = I_FLOW_ID
       AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
       AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
  
    -- 未运行过的使用数据开始时间从新编号
    IF INT_COUNT = 0
    THEN
      VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME, 'YYYYMMDDHH24MISS') ||
                       TO_CHAR(DTE_DATA_END_TIME, 'YYYYMMDDHH24MISS') || '01';
    ELSE
      -- 运行过的用最大编号加1
      SELECT MAX(ETL_LOGID)
        INTO VAR_ETL_LOGID
        FROM ETL_FLOW_RUN_LOG T
       WHERE T.ETL_FLOW_ID = I_FLOW_ID
         AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
         AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
    
      VAR_ETL_LOGID := VAR_ETL_LOGID + 1;
    END IF;
  
    -- 记日志表
    INSERT INTO ETL_FLOW_RUN_LOG
      (ETL_LOGID
      ,ETL_FLOW_ID
      ,ETL_DATA_START_TIME
      ,ETL_DATA_END_TIME
      ,ETL_START_TIME
      ,ETL_END_TIME
      ,ETL_FLOW_RUN_STATUS
      ,ETL_LOG_DESC)
      SELECT VAR_ETL_LOGID
            ,T.ETL_FLOW_ID
            ,T.ETL_DATA_START_TIME
            ,T.ETL_DATA_END_TIME
            ,T.ETL_START_TIME
            ,T.ETL_END_TIME
            ,T.ETL_FLOW_RUN_STATUS
            ,T.ETL_LOG_DESC
        FROM ETL_FLOW_RUN_STS T
       WHERE T.ETL_FLOW_ID = I_FLOW_ID;
    COMMIT;
  END;

  /*******************************************************************
    程序名   :SP_ETL_JOB_LOG_INFO
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 记录作业运行日志
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER) IS
    DTE_DATA_START_TIME DATE;
    DTE_DATA_END_TIME   DATE;
    INT_COUNT           NUMBER;
    VAR_ETL_LOGID       VARCHAR2(30); -- 日志序号在每个数据周期内排序
  BEGIN
    SELECT T.ETL_DATA_START_TIME
          ,T.ETL_DATA_END_TIME
      INTO DTE_DATA_START_TIME
          ,DTE_DATA_END_TIME
      FROM ETL_JOB_RUN_STS T
     WHERE T.ETL_JOB_ID = I_JOB_ID;
  
    -- 检测该数据周期任务有没有运行过
    SELECT COUNT(*)
      INTO INT_COUNT
      FROM ETL_JOB_RUN_LOG T
     WHERE T.ETL_JOB_ID = I_JOB_ID
       AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
       AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
  
    -- 未运行过的作业使用数据结束时间重新编号
    IF INT_COUNT = 0
    THEN
      VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME, 'YYYYMMDDHH24MISS') ||
                       TO_CHAR(DTE_DATA_END_TIME, 'YYYYMMDDHH24MISS') || '01';
    ELSE
      -- 运行过的作业用最大编号加1
      SELECT MAX(ETL_LOGID)
        INTO VAR_ETL_LOGID
        FROM ETL_JOB_RUN_LOG T
       WHERE T.ETL_JOB_ID = I_JOB_ID
         AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
         AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
    
      VAR_ETL_LOGID := VAR_ETL_LOGID + 1;
    END IF;
  
    -- 记日志表
    INSERT INTO ETL_JOB_RUN_LOG
      (ETL_LOGID
      ,ETL_JOB_ID
      ,ETL_DATA_START_TIME
      ,ETL_DATA_END_TIME
      ,ETL_DATA_ORG_ID
      ,ETL_START_TIME
      ,ETL_END_TIME
      ,ETL_JOB_RUN_STATUS
      ,ETL_LOG_DESC)
      SELECT VAR_ETL_LOGID
            ,T.ETL_JOB_ID
            ,T.ETL_DATA_START_TIME
            ,T.ETL_DATA_END_TIME
            ,T.ETL_DATA_ORG_ID
            ,T.ETL_START_TIME
            ,T.ETL_END_TIME
            ,T.ETL_JOB_RUN_STATUS
            ,T.ETL_LOG_DESC
        FROM ETL_JOB_RUN_STS T
       WHERE T.ETL_JOB_ID = I_JOB_ID;
    COMMIT;
  END;

  /*******************************************************************
    程序名   :SP_INSERT_JOB_FAIL_SMS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 生成失败作业短信
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_INSERT_MONITOR_SMS
  (
    O_RESULT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/
   ,O_RESULT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/
  ) IS
    V_DTE_RUN_BEGIN_DT DATE; /*程序每一步骤运行开始*/
    V_DTE_RUN_END_DT   DATE; /*程序每一步骤运行结束时间*/
    V_INT_STEP         NUMBER := 0; /*程序执行步骤*/
    /*步骤描述信息*/
    V_VAR_STEP_DESC VARCHAR2(1000);
    /*步骤所执行的DML类型*/
    V_VAR_STEP_DML_TYPE VARCHAR2(10);
    /*受影响行数*/
    V_INT_ROW_CNT INTEGER := 0;
    /*过程名称*/
    V_VAR_PROC_NAME VARCHAR2(70) := 'PKG_SEND_SMS.SP_INSERT_SMS';
  
  BEGIN
  
    V_INT_STEP          := V_INT_STEP + 1; /*第一步骤*/
    V_VAR_STEP_DESC     := V_INT_STEP || '.0:作业运行失败发送短信给运营人员 ';
    V_VAR_STEP_DML_TYPE := 'INSERT'; /*操作类型*/
  
    /*DML开始运行时间*/
    V_DTE_RUN_BEGIN_DT := SYSDATE;
  
    /*执行相应的SQL语句*/
    FOR LOOP_JOB IN (SELECT C.MOBLIE_PHONE
                           ,T.ETL_LOGID
                           ,A.ETL_JOB_ID
                           ,A.ETL_JOB_NAME
                           ,A.ETL_JOB_DESC
                           ,T.ETL_DATA_START_TIME
                           ,T.ETL_DATA_END_TIME
                           ,T.ETL_DATA_ORG_ID
                           ,T.ETL_LOG_DESC
                       FROM ETL_JOB_RUN_LOG T
                           ,ETL_CTL_JOB_INFO A
                           ,ETL_SEND_SMS_LIST C
                      WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
                        AND T.ETL_JOB_RUN_STATUS = 2
                        AND T.ETL_SEND_FLAG = 0
                        AND C.SEND_MONITOR_SMS = 1) LOOP
      INSERT INTO ETL_SEND_SMS_LOG
        (ETL_LOGID
        ,ETL_JOB_ID
        ,MOBLIE_PHONE
        ,SMS_CONTENT
        ,SMS_LEVEL
        ,SEND_TIME
        ,ETL_DATE)
      VALUES
        (LOOP_JOB.ETL_LOGID
        ,LOOP_JOB.ETL_JOB_ID
        ,LOOP_JOB.MOBLIE_PHONE
        ,'作业ID号[' || LOOP_JOB.ETL_JOB_ID || '] ,作业名称[' || LOOP_JOB.ETL_JOB_NAME || '],作业描述[' || LOOP_JOB.ETL_JOB_DESC ||
         '],数据周期[' || TO_CHAR(LOOP_JOB.ETL_DATA_START_TIME, 'YYYYMMDDHH24MISS') || '-' ||
         TO_CHAR(LOOP_JOB.ETL_DATA_END_TIME, 'YYYYMMDDHH24MISS') || '],机构号[' || LOOP_JOB.ETL_DATA_ORG_ID || '],运行描述[' ||
         LOOP_JOB.ETL_LOG_DESC || ']'
        ,1
        ,TRUNC(SYSDATE)
        ,SYSDATE);
      /*获取受影响行数*/
      V_INT_ROW_CNT := V_INT_ROW_CNT + 1;
      UPDATE ETL_JOB_RUN_LOG T
         SET T.ETL_SEND_FLAG = 1
       WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID
         AND T.ETL_LOGID = LOOP_JOB.ETL_LOGID;
      COMMIT; ---提交DML操作
    END LOOP;
    /*DML运行结束时间*/
    V_DTE_RUN_END_DT := SYSDATE;
  
    /*记录成功的日志信息*/
    IF V_INT_ROW_CNT > 0
    THEN
      PKG_PUBLIC.SP_ETL_LOAD_DML_LOG(V_INT_STEP
                                    ,SYSDATE /*数据开始日期*/
                                    ,SYSDATE /*数据结束日期*/
                                    ,1 /*机构代码*/
                                    ,V_VAR_PROC_NAME /*存储过程名称*/
                                    ,V_VAR_STEP_DESC /*操作步骤描述*/
                                    ,V_VAR_STEP_DML_TYPE /*操作类型*/
                                    ,V_INT_ROW_CNT /*受影响行数*/
                                    ,1 /*执行结果*/
                                    ,V_DTE_RUN_BEGIN_DT /*运行开始时间*/
                                    ,V_DTE_RUN_END_DT /*运行结束时间*/
                                    ,'' /*运行结果详细信息*/);
    END IF;
    /*整个过程执行成功*/
    O_RESULT_FLAG := 9;
    /*整个过程运行结果描述信息*/
    O_RESULT_MSG := '';
  
    /*异常处理部分*/
  EXCEPTION
    WHEN OTHERS THEN
      ---回滚DML操作
      ROLLBACK;
      O_RESULT_FLAG := 2; ----失败
      O_RESULT_MSG  := SQLERRM;
      ---记录异常日志信息
      PKG_PUBLIC.SP_ETL_LOAD_DML_LOG(V_INT_STEP
                                    ,SYSDATE /*数据开始日期*/
                                    ,SYSDATE /*数据结束日期*/
                                    ,1 /*机构代码*/
                                    ,V_VAR_PROC_NAME /*存储过程名称*/
                                    ,V_VAR_STEP_DESC /*操作步骤描述*/
                                    ,V_VAR_STEP_DML_TYPE /*操作步骤类型*/
                                    ,V_INT_ROW_CNT /*返回的受影响行数*/
                                    ,0 /*运行结果 0 失败; 1 成功*/
                                    ,V_DTE_RUN_BEGIN_DT /*运行开始时间*/
                                    ,V_DTE_RUN_END_DT /*运行结束时间*/
                                    ,SQLERRM /*运行结果详细信息*/);
  END;

  /*******************************************************************
    程序名   :SP_SEND_JOB_FAIL_SMS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 发送失败作业短信
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  PROCEDURE SP_SEND_MONITOR_SMS IS
    V_VAR_RESULT_FLAG   CHAR(1);
    V_VAR_RESULT_MSG    CHAR(300);
    V_VAR_RETURN_STATUS INT;
  
    CURSOR C_DATA IS
      SELECT T.ETL_LOGID
            ,T.ETL_JOB_ID
            ,T.MOBLIE_PHONE
            ,T.SMS_CONTENT
            ,T.SMS_LEVEL
            ,T.SEND_TIME
        FROM ETL_SEND_SMS_LOG T
       WHERE T.SEND_STATUS = 0;
    V_USER_CODE VARCHAR2(100) DEFAULT 'MIS';
    V_PASSWORD  VARCHAR2(100) DEFAULT 'MIS#2013';
  BEGIN
  
    SP_INSERT_MONITOR_SMS(V_VAR_RESULT_FLAG, V_VAR_RESULT_MSG);
  
    IF V_VAR_RESULT_FLAG = 9
    THEN
    
      FOR CC_DATA IN C_DATA LOOP
      
        PKG_SMS_INTERFACE.SEND_SMS(V_USER_CODE
                                  ,V_PASSWORD
                                  ,CC_DATA.MOBLIE_PHONE
                                  ,CC_DATA.SMS_CONTENT
                                  ,CC_DATA.SMS_LEVEL
                                  ,CC_DATA.SEND_TIME
                                  ,V_VAR_RETURN_STATUS);
        --发送成功,更新标志
        IF V_VAR_RETURN_STATUS > 0
        THEN
          UPDATE ETL_SEND_SMS_LOG T
             SET T.SEND_STATUS = '1'
                ,T.RECEIVE_STATUS = V_VAR_RETURN_STATUS
           WHERE T.ETL_LOGID = CC_DATA.ETL_LOGID
             AND T.ETL_JOB_ID = CC_DATA.ETL_JOB_ID
             AND T.MOBLIE_PHONE = CC_DATA.MOBLIE_PHONE;
        END IF;
      END LOOP;
    END IF;
    COMMIT;
  END;

  /*******************************************************************
    程序名   :FN_GET_FLOW_DEPEND
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取作业流前置依赖
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS
    VAR_DEP_STS VARCHAR2(100) := 1;
  
  BEGIN
    FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME
                           ,NVL(B.ETL_DATA_SUCC_TIME, DATE '1900-1-1') ETL_DEP_SUCC_TIME
                           ,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_FLOW_ID) ETL_CYC_CODE
                           ,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_DEPD_FLOW_ID) DEP_CYC_CODE
                       FROM ETL_CTL_FLOW_DEPD T
                           ,ETL_FLOW_RUN_STS A
                           ,ETL_FLOW_RUN_STS B
                      WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
                        AND T.ETL_DEPD_FLOW_ID = B.ETL_FLOW_ID
                        AND T.ETL_FLOW_ID = I_FLOW_ID) LOOP
      -- 数据结束时间大于所依赖的数据成功时间
      IF (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.ETL_CYC_CODE = LOOP_DEP.DEP_CYC_CODE)
         OR (LOOP_DEP.ETL_DATA_END_TIME >= TRUNC(LOOP_DEP.ETL_DEP_SUCC_TIME) AND LOOP_DEP.ETL_CYC_CODE <> '02' AND
         LOOP_DEP.DEP_CYC_CODE = '02')
         OR (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.DEP_CYC_CODE <> '02')
      THEN
        VAR_DEP_STS := 0;
        RETURN VAR_DEP_STS;
      END IF;
    END LOOP;
    RETURN VAR_DEP_STS;
  EXCEPTION
    WHEN OTHERS THEN
      RETURN NULL;
  END;

  /*******************************************************************
    程序名   :FN_GET_JOB_DEPEND
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取作业前置依赖
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2 IS
    VAR_DEP_STS VARCHAR2(100) := 1;
  
  BEGIN
    FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME
                           ,NVL(B.ETL_DATA_SUCC_TIME, DATE '1900-1-1') ETL_DEP_SUCC_TIME
                       FROM ETL_CTL_JOB_DEPD T
                           ,ETL_JOB_RUN_STS A
                           ,ETL_JOB_RUN_STS B
                      WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
                        AND T.ETL_DEPD_JOB_ID = B.ETL_JOB_ID
                        AND T.ETL_JOB_ID = I_JOB_ID) LOOP
      -- 数据结束时间大于所依赖的数据成功时间
      IF LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME
      THEN
        VAR_DEP_STS := 0;
        RETURN VAR_DEP_STS;
      END IF;
    END LOOP;
    RETURN VAR_DEP_STS;
  END;

  /*******************************************************************
    程序名   :FN_GET_FLOW_RUN_STATUS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取作业流运行状态
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS
    VAR_FLOW_RUN_STATUS VARCHAR2(100) := 1;
    VAR_CHILD_FLAG      CHAR(1);
    INT_STS_NOT_9       NUMBER;
    INT_STS_0_3         NUMBER;
    INT_STS_1_4         NUMBER;
    INT_STS_2           NUMBER;
  BEGIN
    SELECT T.ETL_CHILD_FLAG INTO VAR_CHILD_FLAG FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID;
  
    -- 不成功的数量
    IF VAR_CHILD_FLAG = 0
    THEN
      SELECT COUNT(*)
        INTO INT_STS_NOT_9
        FROM ETL_FLOW_RUN_STS T
            ,ETL_CTL_JOB_FLOW A
       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
         AND A.ETL_FLOW_STATUS = 1
         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
         AND ETL_FLOW_RUN_STATUS <> 9;
    
      -- 正在运行的数量
      SELECT COUNT(*)
        INTO INT_STS_1_4
        FROM ETL_FLOW_RUN_STS T
            ,ETL_CTL_JOB_FLOW A
       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
         AND A.ETL_FLOW_STATUS = 1
         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
         AND ETL_FLOW_RUN_STATUS IN (1, 4);
    
      -- 满足条件但未运行的数量
      SELECT COUNT(*)
        INTO INT_STS_0_3
        FROM ETL_FLOW_RUN_STS T
            ,ETL_CTL_JOB_FLOW A
       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
         AND A.ETL_FLOW_STATUS = 1
         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
         AND ETL_FLOW_RUN_STATUS IN (0, 3)
         AND FN_GET_FLOW_DEPEND(T.ETL_FLOW_ID) = 1;
    
      -- 运行失败的数量
      SELECT COUNT(*)
        INTO INT_STS_2
        FROM ETL_FLOW_RUN_STS T
            ,ETL_CTL_JOB_FLOW A
       WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
         AND A.ETL_FLOW_STATUS = 1
         AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
         AND ETL_FLOW_RUN_STATUS = 2;
    ELSIF VAR_CHILD_FLAG = 1
    THEN
      SELECT COUNT(*)
        INTO INT_STS_NOT_9
        FROM ETL_JOB_RUN_STS T
            ,ETL_CTL_JOB_INFO A
       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
         AND A.ETL_JOB_STATUS = 1
         AND A.ETL_FLOW_ID = I_FLOW_ID
         AND ETL_JOB_RUN_STATUS <> 9;
    
      -- 正在运行的数量
      SELECT COUNT(*)
        INTO INT_STS_1_4
        FROM ETL_JOB_RUN_STS T
            ,ETL_CTL_JOB_INFO A
       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
         AND A.ETL_JOB_STATUS = 1
         AND A.ETL_FLOW_ID = I_FLOW_ID
         AND ETL_JOB_RUN_STATUS IN (1, 4);
    
      -- 满足条件但未运行的数量
      SELECT COUNT(*)
        INTO INT_STS_0_3
        FROM ETL_JOB_RUN_STS T
            ,ETL_CTL_JOB_INFO A
       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
         AND A.ETL_JOB_STATUS = 1
         AND A.ETL_FLOW_ID = I_FLOW_ID
         AND ETL_JOB_RUN_STATUS IN (0, 3)
         AND FN_GET_FLOW_DEPEND(T.ETL_JOB_ID) = 1;
    
      -- 运行失败的数量
      SELECT COUNT(*)
        INTO INT_STS_2
        FROM ETL_JOB_RUN_STS T
            ,ETL_CTL_JOB_INFO A
       WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
         AND A.ETL_JOB_STATUS = 1
         AND A.ETL_FLOW_ID = I_FLOW_ID
         AND ETL_JOB_RUN_STATUS = 2;
    END IF;
  
    -- 不成功的数量为0,则全部成功
    IF INT_STS_NOT_9 = 0
    THEN
      VAR_FLOW_RUN_STATUS := 9;
      -- 不成功的不为0,正在运行的为0,运行失败的数量大于0
    ELSIF INT_STS_1_4 = 0
          AND INT_STS_0_3 = 0
          AND INT_STS_2 > 0
    THEN
      VAR_FLOW_RUN_STATUS := 2;
    END IF;
  
    RETURN VAR_FLOW_RUN_STATUS;
  END;

  /*******************************************************************
    程序名   :FN_GET_NEXT_DATA_TIME
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取下个数据时间
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE IS
    VAR_CTL_CYC_CODE   VARCHAR2(2);
    VAR_SQL            VARCHAR2(1000);
    VAR_FREQ_TIME      VARCHAR2(100);
    VAR_SRC_DB         VARCHAR2(100);
    DTE_SRC_SYS_TIME   DATE;
    DTE_NEXT_DATA_TIME DATE;
    DTE_DATA_SUCC_TIME DATE;
  
  BEGIN
    -- 获取运行周期、源数据库,下次运行时间、数据成功时间
    SELECT T.ETL_CYC_CODE
          ,T.ETL_SRC_DB
          ,NVL(A.ETL_NEXT_DATA_TIME, A.ETL_DATA_END_TIME)
          ,NVL(A.ETL_DATA_SUCC_TIME, A.ETL_DATA_END_TIME)
      INTO VAR_CTL_CYC_CODE
          ,VAR_SRC_DB
          ,DTE_NEXT_DATA_TIME
          ,DTE_DATA_SUCC_TIME
      FROM ETL_CTL_JOB_FLOW T
          ,ETL_FLOW_RUN_STS A
     WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
       AND T.ETL_FLOW_ID = I_FLOW_ID;
  
    -- 非本数据库
    VAR_SRC_DB := VAR_SRC_DB || 'dual';
  
    -- 每天运行的作业流
    IF VAR_CTL_CYC_CODE = '01'
    THEN
      VAR_FREQ_TIME := 1;
      VAR_SQL       := 'SELECT TRUNC(SYSDATE) FROM ' || VAR_SRC_DB;
      EXECUTE IMMEDIATE VAR_SQL
        INTO DTE_SRC_SYS_TIME;
      DTE_NEXT_DATA_TIME := DTE_DATA_SUCC_TIME + VAR_FREQ_TIME;
      IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME
      THEN
        RETURN DTE_NEXT_DATA_TIME;
      ELSE
        RETURN DTE_DATA_SUCC_TIME;
      END IF;
      -- 每十分钟运行的作业流
    ELSIF VAR_CTL_CYC_CODE = '02'
    THEN
      VAR_FREQ_TIME := 1 / 24 / 6;
      VAR_SQL       := 'SELECT SYSDATE - 1 / 24 / 6 FROM ' || VAR_SRC_DB;
      EXECUTE IMMEDIATE VAR_SQL
        INTO DTE_SRC_SYS_TIME;
    
      WHILE DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME LOOP
        DTE_NEXT_DATA_TIME := DTE_NEXT_DATA_TIME + VAR_FREQ_TIME;
      END LOOP;
      RETURN DTE_NEXT_DATA_TIME;
    
    ELSIF VAR_CTL_CYC_CODE = '03'
    THEN
      VAR_FREQ_TIME := 1;
      VAR_SQL       := 'SELECT TRUNC(SYSDATE) FROM ' || VAR_SRC_DB;
      EXECUTE IMMEDIATE VAR_SQL
        INTO DTE_SRC_SYS_TIME;
      DTE_NEXT_DATA_TIME := ADD_MONTHS(DTE_DATA_SUCC_TIME, VAR_FREQ_TIME);
      IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME
      THEN
        RETURN DTE_NEXT_DATA_TIME;
      ELSE
        RETURN DTE_DATA_SUCC_TIME;
      END IF;
    END IF;
  END;

  /*******************************************************************
    程序名   :FN_GET_SUPER_FLOW_RUN_STATUS
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取上级作业流运行状态
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS
    VAR_SUPER_FLOW_RUN_STATUS VARCHAR2(10);
  BEGIN
    SELECT ETL_FLOW_RUN_STATUS
      INTO VAR_SUPER_FLOW_RUN_STATUS
      FROM ETL_CTL_JOB_FLOW T
          ,ETL_FLOW_RUN_STS A
     WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID
       AND T.ETL_FLOW_ID = I_FLOW_ID;
    RETURN VAR_SUPER_FLOW_RUN_STATUS;
  END;

  /*******************************************************************
    程序名   :FN_GET_SUPER_DATA_START_TIME
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取上级作业流数据开始时间
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE IS
    VAR_SUPER_FLOW_DATA_START_TIME DATE;
  BEGIN
    SELECT A.ETL_DATA_START_TIME
      INTO VAR_SUPER_FLOW_DATA_START_TIME
      FROM ETL_CTL_JOB_FLOW T
          ,ETL_FLOW_RUN_STS A
     WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID
       AND T.ETL_FLOW_ID = I_FLOW_ID;
    RETURN VAR_SUPER_FLOW_DATA_START_TIME;
  END;

  /*******************************************************************
    程序名   :FN_GET_SUPER_DATA_END_TIME
    创建人   : zhuyh
    创建时间 : 2013/8/20
    功能描述 : 获取上级作业流数据结束时间
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE IS
    VAR_SUPER_FLOW_DATA_END_TIME DATE;
  BEGIN
    SELECT A.ETL_DATA_END_TIME
      INTO VAR_SUPER_FLOW_DATA_END_TIME
      FROM ETL_CTL_JOB_FLOW T
          ,ETL_FLOW_RUN_STS A
     WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID
       AND T.ETL_FLOW_ID = I_FLOW_ID;
    RETURN VAR_SUPER_FLOW_DATA_END_TIME;
  END;

  /*******************************************************************
    程序名   :FN_GET_CYC_CODE
    创建人   : zhuyh
    创建时间 : 2013/8/26
    功能描述 : 获取周期代码
    修改人   :
    修改时间 :
    修改原因 :
  *******************************************************************/
  FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2 IS
    VAR_CYC_CODE VARCHAR2(100);
  BEGIN
    SELECT T.ETL_CYC_CODE INTO VAR_CYC_CODE FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID;
    RETURN VAR_CYC_CODE;
  END;

END PKG_ETL_CTL;
/

编程技巧