Friday, July 25, 2014

What you know about the ETL process is wrong

The ETL process 
The title you have just read is deliberately provocative. Of course, not everything you know is false. My intention is to look at things from another point of view: take nothing for granted, and read some of the axioms typical of the Data Warehouse world in a critical way.
I will try to provide a different view of reality by questioning the individual letters of the ETL paradigm. It is therefore necessary to investigate the meaning of the ETL process in more detail.
We can find many definitions of the ETL process. In general, it is an expression that refers to the process of Extraction, Transformation and Loading of input data into a synthesis system (Data Warehouse, Data Mart, ...) used by the end users.
This is a very general definition, which does not help us understand the work we face. A simple diagram can help to understand the process.



The data in the structures of the operational systems (OLTP) are extracted, transformed and loaded into the Data Warehouse structures.
In recent years, another definition of the loading process has also become established. Its difference is the inversion of the "L" and the "T", that is, the transformation phase is executed AFTER the loading phase.
This trend is related to the need to load increasingly large amounts of data, and to the ability of the ETL tools to handle them. Many transformations performed "on the fly", i.e. in memory or with the help of temporary tables, can be problematic.
It is less problematic to load the data file as it is into a staging table, and then apply the transformations to it, as in the sketch below.
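As a minimal sketch of this idea (all table and column names here are hypothetical, not part of any real system), the file is first loaded unchanged into a staging table, and the transformation runs afterwards as plain SQL inside the database:

-- Hypothetical example: stg_tmov_ext is assumed to be an external table
-- mapped 1:1 on the input data file, stg_tmov is the staging table.
insert into stg_tmov
select * from stg_tmov_ext;
commit;

-- The transformation is applied only after the raw data are safely stored.
insert into dwh_tmov (op_id, op_date, amount)
select op_id,
       to_date(op_date_txt, 'yyyymmdd'),
       to_number(amount_txt)
  from stg_tmov;
commit;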

The ambiguity of the ETL and ELT processes
In the context of my considerations on the Micro ETL Foundation, the ELT approach is more in line with its philosophy. There must be a close relationship between the input data file and the staging table: this relationship must be 1:1 and the flow must be as complete as possible.
Although the ELT approach is better, this does not mean that it is the correct one. On the Internet you can find various articles and comments on the pros and cons of the two approaches.
In my view, however, the reality is different. The problem is not deciding whether to apply the transformations before or after loading; the problem is that both processes need to be revised. This is because, if we look carefully:

  1. The extraction step does not exist.
  2. The configuration and acquisition step is missing.
  3. The transformation phase is not convenient.
  4. It is not clear how to do the loading, or where to do it.

Thus, although we can continue to speak generically of ETL (or ELT), because it is an acronym that has been universally known for years, we must be aware that the name is misleading if you want to set a baseline with the three phases in a project Gantt chart, with the associated estimates.
Let us then justify the four previous points.

1 - The extraction step does not exist
Is there a real extraction activity in charge of the DWH development team? I think not. In most cases, the feeding systems are external systems that reside on mainframes, perhaps with different operating systems and different database programming languages.
The extraction phase of the data, the "E" of Extract, is always in charge of the feeding system, which knows how to produce the flow. The Data Warehouse team must instead deal with two activities.

  • The activity of Acquisition or Transfer, namely the placement and storage of the input data files in well-defined folders on the DWH server, following a pre-established naming convention.
  • The analysis of the contents of the data file, that is, of what the feeding system must produce. This, if we are lucky. Otherwise, because generating new data files costs money, you will have to reuse or integrate already existing data files.

The relationship with the external systems based on the transfer of data files is used by most Data Warehouse projects. CDC (Change Data Capture) situations are not so frequent, and in any case they do not cover the whole loading phase.
There are also rare cases in which the DWH team builds the extraction statements and runs them directly, using a database link.
This should not be done, for security reasons, for performance reasons (who knows the indexing structure of the external systems?) and for reasons of responsibility (if the data are not loaded, whose problem is it?).
And also for scalability reasons. In times of budget cuts, it is increasingly common for the "IT people" to change the transactional systems or part of the source systems.
Having a stable source configuration, to which the external systems must adapt, is definitely a choice that preserves stability.

2 - The configuration and acquisition step is missing
The first step to be taken into account (and it is not a simple one) is the definition of the data files and their configuration in the metadata tables. It is the feeding system that provides us with the definitions, using Word documents, Excel sheets, PDFs or other formats.
We must also give the data file a unique, non-numeric identifier, valid across all feeding systems. It is important that the name be unique.
If we have a data file of financial operations, let's call it, for example, TMOV. If we have multiple data files, such as daily, monthly and quarterly ones, let's call them DTMOV, MTMOV, QTMOV. If two systems provide the daily financial operations, let's call them XDTMOV and YDTMOV to distinguish them; in any case we must always have a unique name as a reference. On it we will build a primary key.
In this phase, we will have to configure all the characteristics of the data files, not only their columnar structure.
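As a purely illustrative sketch (the table name and columns are hypothetical, not the real MEF metadata), such a configuration could be kept in a table like the following, with the unique data-file name as primary key:

-- Hypothetical metadata table for the data-file configuration.
create table demo_file_cft
(
  file_cod         varchar2(30)  not null,  -- unique data-file name (e.g. XDTMOV)
  feeding_sys_cod  varchar2(30)  not null,  -- feeding system that produces the file
  frequency_cod    varchar2(10),            -- D (daily), M (monthly), Q (quarterly) ...
  field_sep_txt    varchar2(5),             -- field separator used in the file
  folder_txt       varchar2(200),           -- landing folder on the DWH server
  constraint demo_file_cft_pk primary key (file_cod)
);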

3 - The transformation phase is not convenient
Let us now analyze the letter "T", that is, the "Transform" component of the process. My opinion is that we should not talk about transformation, but about enrichment of the data.
To transform the data means to make them different from the original ones: as a consequence, controlling the data becomes more difficult.
We must always be able to demonstrate that the data we have received as input are identical to what we have loaded into the Data Warehouse. Immediately after the deployment into production, we will certainly have to answer several check requests.
If the original data have been transformed, we will have to spend a lot of time restoring the original data files (perhaps already archived on tape) and redoing the tests. If we preserve the original data and enrich them with the result of the transformation, we will be able to respond more efficiently and more quickly. So my suggestions are:

  • Keep the original data in the Staging Area tables (and, if possible, even afterwards).
  • Do not modify the existing data; instead, add the columns that will contain the transformation result. Enrichment is the right word: I perform the enrichment by transforming or aggregating different data according to the requirements.
  • Implement the enrichment step not as a staging phase, but as a post-staging phase, i.e. only at the end of the whole loading of the Staging Area. This is because the enrichment often involves the use of data from other staging tables. To avoid implementing precedence rules or supervision of arrivals, it is certainly preferable to wait for the completion of the entire staging process. A sketch of the idea follows this list.
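A minimal sketch of the idea, with hypothetical table and column names: the original value is never overwritten, the enriched value lives in an additional column, and it is computed only once the whole Staging Area has been loaded.

-- Hypothetical staging table: amount_txt holds the original value from the file,
-- amount_num is the extra column reserved for the enrichment result.
alter table stg_tmov add (amount_num number);

-- Post-staging enrichment step: executed only after all staging tables are loaded,
-- so it may also read from other staging tables if the requirements need it.
update stg_tmov
   set amount_num = to_number(replace(amount_txt, ',', '.'));
commit;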

4 - How and where to load
The loading phase is very generic, since it does not say where to load the data. We should decide where to load immediately, because this choice determines which of the two fundamental approaches in the Data Warehouse field will be adopted.
Many years have passed, but this choice continues to divide the international community: the Inmon approach or the Kimball approach?
Do we want a comprehensive architecture with an ODS (Operational Data Store) that retains the detail data and a dimensional architecture for the synthesis data, or do we prefer a single dimensional structure for both? Everyone can decide according to their own experience, timing and budget.
However, regardless of the method used, the first structure to be loaded is surely the Staging Area, which initially receives the input data files. The Staging Area is a very vast topic; here are just a few suggestions.
The loading of the staging tables should be as simple as possible: a single direct insertion, possibly filtered by some logical structure, from the data file into the final table. Some small "syntactic" transformations can be done, but they must concern formatting, not semantics.
The loading must always be preceded by the cleaning of the staging table. Do not load into a staging table multiple data files of the same type (several days, for example) that for some reason have not been loaded and have accumulated. If you can, always process them one at a time.
If necessary, you can aggregate them, by hand or with an automatic mechanism, into a single data file. Do not forget that we have to perform very accurate controls on these flows.
Even a trivial control on the congruence between the number of rows loaded and those present in the data file becomes much more difficult if the staging table contains the rows of several input streams. A minimal sketch of these rules follows.
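Here is a minimal sketch of these rules, with hypothetical names: clean the staging table, load one data file with a single direct insert, then run a trivial row-count congruence check.

truncate table stg_dtmov;            -- always clean the staging table first

insert into stg_dtmov                -- a single direct insertion, no semantic logic
select * from stg_dtmov_ext;         -- external table mapped on the daily data file
commit;

-- Trivial congruence control between rows loaded and rows expected.
declare
  v_loaded   number;
  v_expected number := 125000;       -- assumed to come from a control record or a file count
begin
  select count(*) into v_loaded from stg_dtmov;
  if v_loaded <> v_expected then
    raise_application_error(-20001, 'Row count mismatch on stg_dtmov');
  end if;
end;
/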

Conclusion
So, in conclusion, keep in mind that in practice ETL hides a different acronym, which can be summarized as CALEL:

  • Configuration
  • Acquisition
  • Load (Staging Area)
  • Enrichment
  • Load (Data Warehouse)

But, as CALEL is just horrible, we can continue to call it the ETL process. We can represent all this graphically in this way:



Tuesday, July 22, 2014

Techniques to control the processing units in the ETL process

Introduction
In the loading of a Data Warehouse it is important to have full control of the processing units that compose it. Each processing unit must be carefully monitored, both in the detection of errors that may occur and in the analysis of the execution times.
In this article, which uses the messaging techniques described in the slideshare [http://www.slideshare.net/jackbim/recipe-7-of-data-warehouse-a-messaging-system-for-oracle-dwh-1], I enrich the Micro ETL Foundation by building a control system for the PL/SQL elaboration units.
Using the MEF messaging system, we have seen how to produce messages that are then stored in a log table. In the demonstration tests, we saw how a suitable sequence of messages and a correct setting of the package variables provided an idea of the processing flow and of the delay between one message and the next.
The setting of those tests could be called an "excess of zeal". In fact, the purpose of the messaging system was much less ambitious: it only wanted to provide generic information at any moment of the ETL process, simply by calling a procedure.
It is now time to take the next step and implement an agile control system for the processing units. The goal is always the same: it must be simple and non-invasive. This means that it can be plugged into existing DWH systems (and not only) gradually, without changing the elaboration flow. There is no doubt, however, that if you have already set up your ETL process in a modular way, the application of the techniques that will be described will be simpler and more natural.
I will use concepts and definitions already given in the messaging system. Now let us concentrate on the concept of modularity, which is fundamental to the control system.

Modularity and sequencing
The concept of modularity is the basis of the techniques that will be presented. As we know, a complex system (and the ETL process of a Data Warehouse can without doubt be defined as very complex) can be managed and understood only if we break its overall complexity into less complex components. In other words: you cannot have one big main program that contains thousands of lines of code. This is the first point.
The second point is sequentiality. We must try to think, and you can almost always do so, that each component of the process is connected to the next, and that their sequential execution leads to the final loading of the Data Warehouse. Please note, I am not saying that parallelism is not possible, but identifying which components are completely independent of each other (so that they can run in parallel) is not an easy task, without forgetting all the problems of their synchronization.
Moreover, parallelism also requires a specific hardware structure and specific Oracle settings. And the performance improvement, I speak from my own experience, is not so certain. I therefore suggest trying to apply parallelism to the objects rather than to the processes. Usually, the dimension tables may be loaded in parallel (if there are no logical connections between them), but why complicate our lives if we can reason in a simple sequential way?
Recall that simplicity is a pillar of the Micro ETL Foundation. So the advice is: modularity and sequencing. Do not forget that the ETL process is physiologically sequential in its basic steps. You cannot load a level-2 Data Mart before loading the level-1 Data Mart. And the level-1 Data Mart cannot be loaded until you first load the dimensions, which in turn cannot be loaded if you have not first loaded the Staging Area tables, and so on.
The figure below shows the concepts of modularity and sequencing applied to a hypothetical schedule S1. On the left we have the "logical" components of the ETL process, i.e. not code, but names configured in a table. On the right we have the "physical" components, i.e. the real programming code.



Requirements
The main requirement, as already mentioned, is to have control of each processing unit (unit) that constitutes the ETL process. Having control means that, for each unit, at any time, I need to know:

• when it started and when it ended;
• how long its execution took;
• whether it finished successfully or had some problems (exception);
• if an exception occurred, which error it was;
• what the consequences are in case of error.

In order to meet those requirements, it is not necessary to use very complex structures. In accordance with the MEF philosophy, I will use just a configuration table (MEF_UNIT_CFT), which allows me to enter the main characteristics of the units that make up the loading job, and a logging table (MEF_UNIT_LOT), which allows me to see the log of the executions.
The most important information in the configuration table, along with some context information, is the continuity flag, which allows me to decide whether, in the event of an error, the error is critical (i.e. it must abort the job that called the unit) or not critical (i.e. it must allow the next unit to run).
To be able to log the execution, you will have to "surround" the unit call with a procedure call that logs the beginning and a procedure call that logs the end.
In addition, the error situations should be treated in a uniform manner by a single procedure, which logs the error and applies the consequences according to the continuity flag.
Controlling the behavior of a unit means, first of all, understanding its life cycle in the global context of the job to which it belongs. To that end, we will use the theory of the finite-state machine or, if we want to be a bit more modern, a simplified version of the state diagrams of the Unified Modelling Language.

The state diagram
As stated previously, in order to perform its function the MEF control system must surround the execution of every elaboration unit with a procedure call (p_start) that registers its beginning and a procedure call (p_end) that registers its end. In practice, the program code (we will see it clearly in the test case) must contain calls like:

p_start
<unit call x>
p_end
p_start
<unit call y>
p_end
...

This seems quite simple, but to complicate the situation there may be exceptions during the execution. These exceptions should be handled by a procedure call (p_exc) that is always present inside the unit. A state diagram is the most useful tool to understand the unit life cycle.


The figure shows the various states and the possible state changes of the units inside a block of executions. Everything shown graphically has been translated into PL/SQL.
Because each unit must be preceded by a procedure call that logs the beginning of the execution, the p_start procedure places the unit in the "Running" state and sets the return code to an obviously unknown value ("?"). It is important to note that after p_start only the call of the unit must be present, and no other procedure.
The unit can conclude its run in two different ways: it finishes without any problems, i.e. without any Oracle error, or it fails. In the first case the unit will call the p_end procedure, which leads to the "Done" terminal state with return code = "OK". In the second case, depending on the setting of the continuity flag in the configuration table, it can behave in two different ways.
Either the unit ends in a definitive way, thus preventing the execution of any other unit: it switches to the "Aborted" state and sets the return code to "NOT OK".
Or the unit ends with a warning: it switches to the "Done" state, but with "OK (Warning)" as return code. This does not prevent the execution of the next unit; in fact, in this state it is again possible to call the p_start procedure, which will switch the next unit to the "Running" state.
In this case the p_end procedure will do nothing, leaving the unit in its final state. Each state change that is not present in the diagram will always produce an error message. This ensures that you have not, out of distraction, failed to follow the correct sequence of calls (e.g. you have forgotten the p_end procedure, or it is called more than once, and so on).

The exception management
Since the most complex state change is related to the error situations, let us briefly explore exception management (of which we have already seen some examples in the messaging techniques).
In the PL/SQL language, an error situation that occurs while the program is running is called an "exception". The error condition can be generated by the program itself (e.g. a division by zero) or forced by the logic of the program (e.g. an amount that does not exceed a certain threshold).
In the latter case, the exception is explicitly raised using the RAISE_APPLICATION_ERROR statement. Regardless of the cause of the error, officially identified as an internal exception or a user-defined exception, the Oracle PL/SQL engine transfers control of the program to the exception handler of the running module (or PL/SQL block), in practice the code after the EXCEPTION keyword.
Obviously, if the EXCEPTION keyword is not present, the program ends immediately because it finds no indication of how to handle the error.
Let us now analyze error propagation. If there are several nested procedures, an error that is not handled by the innermost procedure propagates to the caller procedure, and so on up to the main program. If the error is handled, the caller procedure continues its regular work (unless the exception handler contains the RAISE keyword or there is a software error inside the exception handler itself).
To clarify the positioning of the exception call, let us anticipate the structure of the program code, using the next figure.
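Anticipating that structure in a minimal sketch (demo_unit and demo_job are hypothetical names; the MEF_UNIT procedures are the ones defined later in this article, and the unit must also be configured in MEF_UNIT_CFT): inside each unit the exception handler only calls p_exc, while the caller surrounds the unit call with p_start and p_end.

-- Hypothetical unit: its only ETL-infrastructure code is the exception handler.
create or replace procedure demo_unit as
   v_module_cod varchar2(60) := 'demo_unit';
begin
   null;  -- the business code of the unit goes here
exception
   when others then mef_unit.p_exc(v_module_cod, sqlerrm);
end;
/

-- Hypothetical caller: every unit call is surrounded by p_start and p_end.
begin
   mef_unit.p_start('s1', 'demo_job', 'demo_unit');
   demo_unit;
   mef_unit.p_end;
end;
/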


Design
The design of the unit control system is composed of two tables and a sequence. The MEF_UNIT_CFT table is the configuration table of the processing units. The MEF_UNIT_LOT table is the one that keeps the log of the executions of the units. The MEF_UNIT_LOT_SEQ sequence serves to give a sequence number to each log line.

drop table mef_unit_cft cascade constraints purge;
create table mef_unit_cft
(
  sched_cod        varchar2(30)                 not null,
  job_cod          varchar2(60)                 not null,
  unit_cod         varchar2(61)                 not null,
  sort_cnt         number                       ,
  continue_flg     number                       ,
  unit_active_flg  number                       ,
  job_active_flg   number                      
)
;

drop table mef_unit_lot cascade constraints purge;
create table mef_unit_lot
(
  seq_num          number          not null,
  day_cod          number          not null,
  sched_cod        varchar2(30)    not null,
  job_cod          varchar2(60)    not null,
  unit_cod         varchar2(61)    not null,
  exec_cnt         number          not null,
  status_cod       varchar2(60)    not null,
  return_cod       varchar2(30)    not null,
  ss_num           number          not null,
  mi_num           number          not null,
  hh_num           number          not null,
  elapsed_txt      varchar2(4000)  not null,
  errmsg_txt       varchar2(4000),
  stamp_dts        date            not null
)
;

alter table mef_unit_cft add (
  constraint mef_unit_cft_pk1
 primary key
 (job_cod, unit_cod));   

                                   
drop sequence mef_unit_lot_seq;

create sequence mef_unit_lot_seq nocache;  

Let us now see these objects in detail.

The MEF_UNIT_LOT_SEQ sequence
It is more functional than the time stamp for sorting the table. Because the units sometimes begin and end within fractions of a second of each other, the time stamp might not be sufficiently discriminating.

The MEF_UNIT_CFT table
This table contains the configuration information of the processing units. In it you configure schedules, jobs and units. For the purposes of the control system of the elaboration units, jobs and schedules are used in a static way, as parameters of the procedure calls.
Their management will be explained, in the future, in the control system of the jobs. Likewise, the SORT_CNT, UNIT_ACTIVE_FLG and JOB_ACTIVE_FLG fields do not have an immediate use, and we do not have to set them.

  • SCHED_COD: Identifies the schedule to which the job belongs. It is a logical entity, in the sense that we have to think of it as the identifier of a list of jobs.
  • JOB_COD: Identifier of the job. It is a logical entity, in the sense that we have to think of it as the identifier of a list of elaboration units.
  • UNIT_COD: Identifier of the processing unit within the job. We can think of it as an Oracle packaged procedure.
  • SORT_CNT: Counter of the unit inside the job.
  • CONTINUE_FLG: Continuity flag. If set to 1, it means that in the event of an error the next unit can continue; if set to 0, it means that the job must abort because the error is blocking.
  • UNIT_ACTIVE_FLG: Flag indicating whether the unit is active.
  • JOB_ACTIVE_FLG: Flag indicating whether the job is active.

The MEF_UNIT_LOT table
This table stores all the information relating to the executions of the processing units. Its structure is very similar to that of the messaging system as regards the timing information, but in addition it also retains the unit status and the final outcome of its execution.

  • SEQ_NUM: Sequential number of the line, obtained from the Oracle sequence.
  • DAY_COD: Day of the execution in the YYYYMMDD (year, month, day) format.
  • SCHED_COD: identifier of the schedule to which the job belongs.
  • JOB_COD: identifier of the job.
  • UNIT_COD: Identifier of the processing unit within the job.
  • EXEC_CNT: Identifier of the job execution. Every job execution should be tagged with a number, in turn extracted from an Oracle sequence.
  • STATUS_COD: State of the unit.
  • RETURN_COD: return code of the execution of the unit.
  • SS_NUM: Number of seconds consumed by the processing unit. This information, together with the two that follow, is a summable statistic.
  • MI_NUM: Number of minutes consumed by the processing unit.
  • HH_NUM: Number of hours consumed by the processing unit.
  • ELAPSED_TXT: Execution time in the HH24MISS format.
  • ERRMSG_TXT: Error Message.
  • STAMP_DTS: Time stamp.

The MEF_UNIT package
This package is the core of the control system. 

 /*******************************************************************************
*   Package Specification
*******************************************************************************/
CREATE OR REPLACE package mef_unit is
type pt_work_rec is record (
   status_num number,
   lot_row mef_unit_lot%rowtype,
   cft_row mef_unit_cft%rowtype,
   fail_list_txt varchar2(4000),
   fail_unit_cnt number,
   errmsg_txt varchar2(4000),
   continue_flg number
);
pv_work_rec pt_work_rec;
function f_concat(p_curr varchar2,p_add varchar2, p_size number,p_sep varchar2)
   return varchar2;
procedure p_start(
   p_sched_cod varchar2
   ,p_job_cod varchar2
   ,p_unit_cod varchar2);
procedure p_end;
procedure p_exc(p_unit_cod varchar2,p_errmsg_txt varchar2);
procedure p_ins_unit_lot(p_row in out mef_unit_lot%rowtype);
procedure p_upd_unit_lot(p_row in out mef_unit_lot%rowtype);
procedure p_get_cft(p_sched_cod varchar2,p_job_cod varchar2
   ,p_unit_cod varchar2,p_cft_row in out mef_unit_cft%rowtype);
end;
/
sho errors

/*******************************************************************************
*   Package Body
*******************************************************************************/
CREATE OR REPLACE package body mef_unit as
pv_pkg varchar2(30) := 'mef_unit.';
pv_error EXCEPTION; 
pragma exception_init (pv_error, -20058);

function f_concat(p_curr varchar2,p_add varchar2, p_size number,p_sep varchar2)
   return varchar2 as
   v_module_cod varchar2(61) := pv_pkg||'f_concat';
v_out varchar2(4000);
begin
   if (p_curr is null) then
      v_out := substr(p_add,1,p_size);
   else
      if (length(p_curr)+length(p_add)+1 > p_size) then
         v_out := substr(p_curr||'...',1,p_size);
      else
         v_out := p_curr||p_sep||p_add;
      end if;
   end if;
   return v_out;
exception
   when pv_error then raise;
   when others then mef.p_rae(sqlerrm,v_module_cod);   
end;

procedure p_ins_unit_lot(p_row in out mef_unit_lot%rowtype) is
   pragma autonomous_transaction;
   v_module_cod varchar2(61) := pv_pkg||'p_ins_unit_lot';
begin
   insert into mef_unit_lot values p_row;
   commit;
exception
   when pv_error then raise;
   when others then mef.p_rae(sqlerrm,v_module_cod);
end;

procedure p_upd_unit_lot(p_row in out mef_unit_lot%rowtype) is
   pragma autonomous_transaction;
   v_module_cod varchar2(61) := pv_pkg||'p_upd_unit_lot';
begin
   mef.delta_time(p_row.stamp_dts,sysdate,p_row.ss_num
   ,p_row.mi_num,p_row.hh_num,p_row.elapsed_txt);
   update mef_unit_lot set
       exec_cnt      = p_row.exec_cnt,
       status_cod    = p_row.status_cod,
       return_cod    = p_row.return_cod,
       ss_num        = p_row.ss_num,
       mi_num        = p_row.mi_num,
       hh_num        = p_row.hh_num,
       elapsed_txt   = p_row.elapsed_txt,
       errmsg_txt    = p_row.errmsg_txt,
       stamp_dts     = p_row.stamp_dts
      where seq_num = p_row.seq_num;
      commit;
   exception
      when pv_error then raise;
      when others then mef.p_rae(sqlerrm,v_module_cod);
end;

procedure p_get_cft(p_sched_cod varchar2,p_job_cod varchar2
   ,p_unit_cod varchar2,p_cft_row in out mef_unit_cft%rowtype) is
   v_module_cod varchar2(61) := pv_pkg||'p_get_cft';
begin  
     select * into p_cft_row
     from mef_unit_cft
     where job_cod=p_job_cod
     and unit_cod=p_unit_cod
     --and sched_cod=p_sched_cod
     ;
exception
   when pv_error then raise;
   when others then mef.p_rae(sqlerrm,v_module_cod,mef.f_str(
   'Sched %1 job %2 unit %3 not found in mef_unit_cft (check case sensitive)',
   p_sched_cod,p_job_cod,p_unit_cod));
end; 
procedure p_init_unit (
   p_sched_cod varchar2
   ,p_job_cod varchar2
   ,p_unit_cod varchar2) is
   v_module_cod varchar2(61) := pv_pkg||'p_init_unit';
begin
   mef.pv_job_cod := p_job_cod;
   mef.pv_sched_cod := p_sched_cod;
   mef.pv_unit_cod := p_unit_cod;
  p_get_cft(
      p_sched_cod,p_job_cod,p_unit_cod,pv_work_rec.cft_row);     
      pv_work_rec.lot_row.seq_num := mef.f_get_seq_val(
      'mef_unit_lot_seq','nextval');
   pv_work_rec.lot_row.job_cod := p_job_cod;
   pv_work_rec.lot_row.sched_cod := p_sched_cod;
   pv_work_rec.lot_row.unit_cod := p_unit_cod;
   pv_work_rec.lot_row.stamp_dts := sysdate;
   pv_work_rec.lot_row.day_cod := to_char(sysdate,'yyyymmdd'); 
   pv_work_rec.lot_row.errmsg_txt := null;
   pv_work_rec.lot_row.ss_num := 0;
   pv_work_rec.lot_row.mi_num := 0;
   pv_work_rec.lot_row.hh_num := 0;
   pv_work_rec.lot_row.elapsed_txt := '?';
   pv_work_rec.lot_row.exec_cnt := nvl(mef.pv_exec_cnt,0); 
   pv_work_rec.continue_flg := 0;
exception
      when pv_error then raise;
      when others then mef.p_rae(sqlerrm,v_module_cod); 
end;

procedure p_exc_continue (p_unit_cod varchar2,p_errmsg_txt varchar2) is
   v_module_cod  varchar2(61) := pv_pkg||'p_exc_continue';
begin
   pv_work_rec.lot_row.status_cod := 'Done';
   pv_work_rec.lot_row.return_cod := 'OK (Warning)';
   pv_work_rec.lot_row.errmsg_txt := p_errmsg_txt;
   pv_work_rec.errmsg_txt :=
   f_concat(pv_work_rec.errmsg_txt,p_errmsg_txt,4000,mef.cr); 
   pv_work_rec.status_num := 3;
  p_upd_unit_lot(pv_work_rec.lot_row);  
exception
   when pv_error then raise;
   when others then mef.p_rae(sqlerrm,v_module_cod);
end;

procedure p_exc_abort (p_unit_cod varchar2,p_errmsg_txt varchar2) is
   v_module_cod  varchar2(61) := pv_pkg||'p_exc_abort';
begin
   pv_work_rec.lot_row.status_cod := 'Aborted';
   pv_work_rec.lot_row.return_cod := 'NOT OK';
   pv_work_rec.lot_row.errmsg_txt := p_errmsg_txt;
   pv_work_rec.errmsg_txt := p_errmsg_txt; 
  p_upd_unit_lot(pv_work_rec.lot_row);
   pv_work_rec.status_num := 4;
   raise_application_error (-20058, 'Module '||p_unit_cod||' aborted !');             
exception
   when pv_error then raise;
   when others then mef.p_rae(sqlerrm,v_module_cod);
end;

/*******************************************************************************
*   Entry points
*******************************************************************************/
procedure p_start (
   p_sched_cod varchar2
   ,p_job_cod varchar2
   ,p_unit_cod varchar2) is
   v_module_cod varchar2(61) := pv_pkg||'p_start';
   v_str   varchar2 (4000);
begin
   if (nvl(pv_work_rec.status_num,0) in (0,2,3)) then
      p_init_unit(p_sched_cod,p_job_cod,p_unit_cod);
      pv_work_rec.lot_row.status_cod := 'Running';
      pv_work_rec.lot_row.return_cod := '?';    
      p_ins_unit_lot(pv_work_rec.lot_row);
      pv_work_rec.status_num := 1;       
   else
      raise_application_error (-20058, 'Not allowed');
   end if;
exception
      when pv_error then raise;
      when others then mef.p_rae(sqlerrm,v_module_cod);
end;

procedure p_end is
   v_module_cod  varchar2(61) := pv_pkg||'p_end';
begin
   if (pv_work_rec.status_num = 1) then
      pv_work_rec.lot_row.status_cod := 'Done';
      pv_work_rec.lot_row.return_cod := 'OK';
     p_upd_unit_lot(pv_work_rec.lot_row);
      pv_work_rec.status_num := 2;       
   elsif (pv_work_rec.status_num = 3) then
      null;    
   else  
      raise_application_error (-20058, 'Not allowed');
   end if;
exception
      when pv_error then raise;
      when others then mef.p_rae(sqlerrm,v_module_cod);
end;

procedure p_exc(p_unit_cod varchar2,p_errmsg_txt varchar2) is
   v_module_cod  varchar2(61) := pv_pkg||'p_exc';
begin
   mef.p_send(p_unit_cod,'%1 (continue = %2)'
   ,p_errmsg_txt,pv_work_rec.cft_row.continue_flg);      
   if (nvl(pv_work_rec.status_num,0) <> 1) then
      mef.p_send(p_unit_cod,'Not allowed'); 
      raise_application_error(-20058,p_errmsg_txt);
   else
      pv_work_rec.fail_unit_cnt := nvl(pv_work_rec.fail_unit_cnt,0) + 1;
      pv_work_rec.fail_list_txt :=
      f_concat(
      pv_work_rec.fail_list_txt,p_unit_cod,2000,mef.cr);
      if (pv_work_rec.cft_row.continue_flg = 1) then
         p_exc_continue(pv_work_rec.lot_row.unit_cod,p_errmsg_txt);
      else
         p_exc_abort(pv_work_rec.lot_row.unit_cod,p_errmsg_txt);
      end if;
   end if; 
end;
end;
/
sho errors
I will give a brief description of the main procedures.

p_start
The task of the p_start procedure is to record the start of the processing unit. In practice, the code implements the logic present in the state-change diagram. The initial test checks the current state to see whether the p_start procedure is permitted at this time. If we are not in state 0 (i.e. at the first elaboration unit of the job) or in state 2/3 (that is, after the correct end, or the end with warning, of the previous unit), an abort is generated that prevents the process from continuing.
If we are in the correct state, the initialization procedure of the unit is called, and the status variable and the return code variable are set. All this information is then stored in the MEF_UNIT_LOT table. Finally, the current status of the unit is changed.

p_exc
The exception management procedure begins immediately by recording this anomalous situation in the MEF_MSG_LOT table. At this point there is the test on the state, which, obviously, can only be that of a running unit (i.e. state 1).
Two variables are then updated, the use of which we will see in the future, that preserve the history of the units in error.
The fail_unit_cnt variable is simply a counter of the units that have had problems.
The fail_list_txt variable concatenates, using a carriage return, the names of these units.
The next test, based on the continuity flag, invokes the corresponding management procedure. Let us see them.

p_exc_continue
This procedure has the task of recording the error situation, but it must not block the processing flow. It sets the state and the return code of the unit as specified by the state diagram: it passes to the final state 3, i.e. ended with warning, and updates the MEF_UNIT_LOT table.

p_exc_abort
This procedure has the task of terminating the processing flow. Like the previous procedure, it sets the ending state and the return code and updates the MEF_UNIT_LOT table, but with the RAISE_APPLICATION_ERROR it ends the running job in a definitive way.

p_init_unit
This procedure has the sole task of initializing all the global variables, extracting the information from the MEF_UNIT_CFT table based on the parameters received as input.

The UNIT_TEST package
We are now able to start our tests. To this end, we will build the UNIT_TEST package, which contains a small number of processing units. Its code is only for demonstration. Note that each processing unit handles exceptions according to the standard described above, i.e. using the statement:

when others then mef_unit.p_exc (v_module_cod, sqlerrm);

create or replace package unit_test is
procedure module1;
procedure module2;
procedure module3;
procedure module_a;
procedure module_w;
procedure module_w2;
end;
/
sho errors

create or replace package body unit_test is
pv_pkg varchar2(30) := 'unit_test.';

procedure module1 as
   v_module_cod varchar2(60) := pv_pkg||'module1';
   v_num number;
begin
   select count(*) into v_num from all_objects;
exception
   when others then mef_unit.p_exc(v_module_cod,sqlerrm);   
end;

procedure module2 as
   v_module_cod varchar2(60) := pv_pkg||'module2';
   v_num number;
begin
   select count(*) into v_num from all_objects;
exception
   when others then mef_unit.p_exc(v_module_cod,sqlerrm);   
end;
procedure module3 as
   v_module_cod varchar2(60) := pv_pkg||'module3';
   v_num number;
begin
   select count(*) into v_num from all_objects;
exception
   when others then mef_unit.p_exc(v_module_cod,sqlerrm);   
end;

procedure module_a as
   v_module_cod varchar2(60) := pv_pkg||'module_a';
   --v_gg date;
begin
    execute immediate 'delete pippo';
    --v_gg := to_date('20100140','yyyymmdd');
exception
   when others then mef_unit.p_exc(v_module_cod,sqlerrm);   
end;

procedure module_w as
   v_module_cod varchar2(60) := pv_pkg||'module_w';
   v_num number(1);
begin
    select count(*) into v_num from all_objects;
exception
   when others then mef_unit.p_exc(v_module_cod,sqlerrm);   
end;

procedure module_w2 as
   v_module_cod varchar2(60) := pv_pkg||'module_w2';
   v_gg date;
begin
    v_gg := to_date('20100140','yyyymmdd');
exception
   when others then mef_unit.p_exc(v_module_cod,sqlerrm);   
end;
end;
/
sho errors

Let us see the functionality of these modules:
  • MODULE1, MODULE2, MODULE3: procedures that finish without problems. They store, in a local variable, the number of rows of an Oracle system view. This select was chosen because it takes a little time, and it allows us to check the correctness of the timing information in the MEF_UNIT_LOT table.
  • MODULE_W, MODULE_W2: these procedures contain instructions that force an error which, thanks to the continuity flag, is non-blocking. In MODULE_W the count of ALL_OBJECTS does not fit into the NUMBER(1) local variable; in MODULE_W2 an invalid date string is passed to TO_DATE.
  • MODULE_A: this procedure ends in failure, because it tries to delete from a table (pippo) that does not exist. As the continuity flag says, the error is blocking. Remember, in your own tests, to exit and re-enter SQL*Plus between one test and the next, in order to reset the package variables. In addition, you must pay close attention to the names of the units: if you execute the p_start of unit X and then run unit Y, the elaboration log will not be reliable.

Installation
Remember that all the code of the unit control system can be downloaded from MEF_01:
https://drive.google.com/folderview?id=0B2dQ0EtjqAOTN3I1MU9JQmpOUEE&usp=sharing
Before you use it, you must install the base of the Micro ETL Foundation, that is, the messaging system. This is the link to MEF_00:
https://drive.google.com/folderview?id=0B2dQ0EtjqAOTaU5WNmc5MkVnVFE&usp=sharing
Its installation is explained on slideshare:
http://www.slideshare.net/jackbim/recipe-7-of-data-warehouse-a-messaging-system-for-oracle-dwh-2

spool mef_unit_install

@mef_unit_ddl.sql
@mef_unit_pkg.sql
@unit_test_pkg.sql

spool off

Regarding the unit control system, proceed as follows. Log into SQL*Plus with the user you created/configured for the messaging system, then run:

SQL> @mef_unit_install.sql

You do not need to do anything else. We're ready for the test phase.

Test1 (everything goes well)
We enter SQL*Plus and launch the unit_test_run1.sql script. The script is very simple.

delete mef_unit_cft where job_cod='job1';

insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job1', 'unit_test.module1', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job1', 'unit_test.module2', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job1', 'unit_test.module3', 0);
commit;

begin
   mef.pv_exec_cnt := 1;
   mef_unit.p_start('s1','job1','unit_test.module1');
   unit_test.module1;
   mef_unit.p_end;
 
   mef_unit.p_start('s1','job1','unit_test.module2');
   unit_test.module2;
   mef_unit.p_end;
 
   mef_unit.p_start('s1','job1','unit_test.module3');
   unit_test.module3;
   mef_unit.p_end;
end;
/
sho errors
exit;

First of all, it configures the units of the test job by inserting one row per unit into the MEF_UNIT_CFT table. It then runs the three units, which will surely end with a positive outcome (it only takes about 30 seconds). As you can see, each unit is delimited by the p_start/p_end pair.
Now we can verify the result by looking at the contents of the MEF_UNIT_LOT table. You can enter SQL*Plus and run a select on the table, but for ease of viewing I will show the (reduced) result in graphical format.
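If you prefer the plain SQL*Plus output, an illustrative query of this kind (with a reduced column list) shows the same information:

select seq_num, unit_cod, status_cod, return_cod, elapsed_txt, errmsg_txt
  from mef_unit_lot
 where job_cod = 'job1'
 order by seq_num;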



Test2 (non-blocking errors)
In this second test we show the behavior of the control system in case of non-blocking errors. We go into SQL*Plus and launch the unit_test_run2.sql script.

delete mef_unit_cft where job_cod='job2';

insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job2', 'unit_test.module1', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job2', 'unit_test.module_w', 1);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
 values('s1', 'job2', 'unit_test.module_w2', 1);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job2', 'unit_test.module3', 0);
commit;

begin
   mef.pv_exec_cnt := 2;  
   mef_unit.p_start('s1','job2','unit_test.module1');
   unit_test.module1;
   mef_unit.p_end;
  
   mef_unit.p_start('s1','job2','unit_test.module_w');
   unit_test.module_w;
   mef_unit.p_end;
  
   mef_unit.p_start('s1','job2','unit_test.module_w2');
   unit_test.module_w2;
   mef_unit.p_end;  
  
   mef_unit.p_start('s1','job2','unit_test.module3');
   unit_test.module3;
   mef_unit.p_end;
end;
/
exit;

The script is similar to the previous one; only the calls and the setting of the continuity flag of the units change. The final result clearly shows the errors encountered at run time by job2 and the fact that, since those errors are not blocking, module_w2 and module3 were still able to run to completion.



Test3 (fatal error)
In this third test we show the behavior of the control system in case of a fatal error. We go into SQL*Plus and launch the unit_test_run3.sql script.

delete MEF_UNIT_CFT where JOB_COD='job3';

insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job3', 'unit_test.module1', 0);       
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job3', 'unit_test.module_a', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job3', 'unit_test.module2', 0);
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values('s1', 'job3', 'unit_test.module3', 0);  
commit;

begin
   mef.pv_exec_cnt := 3;  
   mef_unit.p_start('s1','job3','unit_test.module1');
   unit_test.module1;
   mef_unit.p_end;
  
   mef_unit.p_start('s1','job3','unit_test.module_a');
   unit_test.module_a;
   mef_unit.p_end;
  
   mef_unit.p_start('s1','job3','unit_test.module2');
   unit_test.module2;
   mef_unit.p_end;  
  
   mef_unit.p_start('s1','job3','unit_test.module3');
   unit_test.module3;
   mef_unit.p_end;
end;
/
exit;

The final result clearly shows how module_a of job3, having continue_flg = 0 configured, prevents the execution of the subsequent module2 and module3 units. Obviously, these exception situations were also automatically inserted into the message log table.



Conclusions
Like the messaging system, the control system of the processing units is simple and very useful for answering all those questions that we are inevitably asked in case of problems with the loading process of the Data Warehouse. Its simplicity lies in the fact that only three steps are needed in already existing code:

1. Configure the unit.
2. Insert the p_start and p_end procedure calls.
3. Replace the exception handler with the p_exc call (see the sketch below).
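As a compact sketch of the three steps applied to a hypothetical pre-existing unit (the names nightly_job and etl_pkg.load_customers are invented for the example; inside the procedure, the existing WHEN OTHERS handler is assumed to have been replaced with a call to mef_unit.p_exc):

-- Step 1: configure the unit.
insert into mef_unit_cft(sched_cod, job_cod, unit_cod, continue_flg)
values ('s1', 'nightly_job', 'etl_pkg.load_customers', 0);
commit;

-- Step 2: surround the existing call with p_start and p_end
-- (step 3, the p_exc call, lives inside the unit's exception handler).
begin
   mef_unit.p_start('s1', 'nightly_job', 'etl_pkg.load_customers');
   etl_pkg.load_customers;
   mef_unit.p_end;
end;
/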

Soon we will also see some scheduling techniques. That is, based on the configuration table described here, we will be able to launch a loading job without having to insert the p_start and p_end calls ourselves.
It will all be automatic and dynamic. We will not have a separate main program for each job. And we will obtain what I theorized in the past in an article of mine that appeared, years ago, on Data Mart Review.
[The Infrastructural Data Warehouse]
(unfortunately the site no longer exists and has been incorporated into the Information Management site).
We will have implemented a method that, in my opinion, is essential for an ETL process: a clear separation between the infrastructure code and the business code of a Data Warehouse.