A few years back, “we” had a batch job which produced a bunch of flat files. These files were then shipped to a customer. No problem so far. But as the database grew, so did the output files, especially since this customer wanted everything in each table every time. The time it took to produce these flat files became a real issue…

One solution in this case is to parallelize the file-writing process: each process writes one part of the data, and at the end the parts are merged together with a shell script. I think my original inspiration for solving this problem was this article: http://www.oracle-developer.net/display.php?id=429
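The merge step itself can be a plain concatenation. Below is a minimal shell sketch, assuming the per-slave files live in one directory and share a common prefix with a `.txt` suffix. The function name `merge_files` and its arguments are hypothetical and not part of the original job.

```shell
#!/bin/sh
# Hypothetical merge step: concatenate all per-slave files that match
# <dir>/<prefix>_*.txt into one output file.
# Usage: merge_files <dir> <prefix> <output-file>
merge_files() {
  dir=$1
  prefix=$2
  out=$3
  : > "$out"                      # create/truncate the output file
  for f in "$dir"/"$prefix"_*.txt; do
    [ -e "$f" ] || continue       # glob matched nothing
    [ "$f" = "$out" ] && continue # skip the output file if it matches the pattern (same path spelling)
    cat "$f" >> "$out"
  done
}

# Example: merge_files /tmp/test testfile /tmp/test/testfile_full.dat
```

Since Oracle distributes the rows among the slaves arbitrarily, the merged file is not in any particular row order; if the customer needs a specific order, that has to be handled separately. Choosing an output name that does not match the input pattern (as in the example) avoids any chance of appending the output to itself.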

Moving on to the solution…

If we want to parallelize the file writing, each slave process must write to a file with a unique name. We might also want to log the filenames and the number of rows written to each file.

First off, let’s create a table with some dummy data:


CREATE TABLE mytable AS
SELECT 'data1' || ROWNUM AS col1,
'data2' || ROWNUM AS col2,
'data3' || ROWNUM AS col3,
'data4' || ROWNUM AS col4
FROM dual
CONNECT BY ROWNUM <= 1000000;

Create a directory object pointing to where you want your files (the path must exist on the database server); in my case:

CREATE OR REPLACE DIRECTORY MYDIR AS '/tmp/test';

Then create an object type that will act as the container for the file attributes, plus a collection type of it that we can work with in PL/SQL and return from the function:

CREATE OR REPLACE TYPE file_ot AS OBJECT 
(filename VARCHAR2(128),
 number_of_rows   NUMBER,
 session_id       NUMBER);

CREATE OR REPLACE TYPE file_ott 
AS TABLE OF file_ot;

The next step is to create a function that takes an SQL statement (as a ref cursor) as an argument, writes its result to a file in the desired location, and returns information about that file. Its signature looks like this:


FUNCTION write_file
        (p_refsql IN SYS_REFCURSOR,
         p_filename IN VARCHAR2,
         p_db_catalog IN VARCHAR2
        ) RETURN file_ott
     PIPELINED
     PARALLEL_ENABLE (PARTITION p_refsql BY ANY)

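This is a parallel-enabled pipelined table function. PARALLEL_ENABLE (PARTITION p_refsql BY ANY) means that the rows of the input ref cursor are split among the parallel slave processes, and ANY means that Oracle decides how to distribute them, i.e. the workload is partitioned arbitrarily between the slaves. Each slave then runs its own instance of the function on its share of the rows.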

Moving on to the actual code of the function

CREATE OR REPLACE FUNCTION write_file 
		    (p_refsql    IN SYS_REFCURSOR,
                     p_filename  IN VARCHAR2,
                     p_db_catalog IN VARCHAR2
                    ) RETURN file_ott
                      PIPELINED
                      PARALLEL_ENABLE (PARTITION p_refsql BY ANY) IS
      
      TYPE row_ntt IS TABLE OF VARCHAR2(32767);
      lv_rows    row_ntt;
      lv_file    UTL_FILE.FILE_TYPE;
      lv_buffer  VARCHAR2(32767);
      lv_sid     NUMBER;
      lv_name    VARCHAR2(128);
      lv_lines   PLS_INTEGER := 0;
      
      c_eol     CONSTANT VARCHAR2(1) := chr(10); -- line terminator; depends on the target platform
      c_eollen  CONSTANT PLS_INTEGER := LENGTH(c_eol);
      c_maxline CONSTANT PLS_INTEGER := 30000;
      
   BEGIN
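      -- The SID of this (slave) session makes the filename unique
      SELECT sid INTO lv_sid FROM v$mystat WHERE ROWNUM = 1;
      lv_name := p_filename || '_' || TO_CHAR(lv_sid) || '.txt';
      -- 4th argument is max_linesize; 32767 is the largest allowed value
      lv_file := UTL_FILE.FOPEN(p_db_catalog, lv_name, 'w', 32767);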
 
      LOOP
        FETCH p_refsql BULK COLLECT INTO lv_rows LIMIT 1000;
 
         FOR i IN 1 .. lv_rows.COUNT LOOP
 
            IF LENGTH(lv_buffer) + c_eollen + LENGTH(lv_rows(i)) <= c_maxline 
            THEN
               lv_buffer := lv_buffer || c_eol || lv_rows(i);
            ELSE
               IF lv_buffer IS NOT NULL THEN
                  UTL_FILE.PUT_LINE(lv_file, lv_buffer);
              END IF;
               lv_buffer := lv_rows(i);
            END IF;
 
         END LOOP;
 
         lv_lines := lv_lines + lv_rows.COUNT;
 
         EXIT WHEN p_refsql%NOTFOUND;
      END LOOP;
      CLOSE p_refsql;
 
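      -- Flush the last buffered chunk (nothing to write if the cursor was empty)
      IF lv_buffer IS NOT NULL THEN
         UTL_FILE.PUT_LINE(lv_file, lv_buffer);
      END IF;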
      UTL_FILE.FCLOSE(lv_file);
 
      PIPE ROW (file_ot(lv_name, lv_lines, lv_sid));
      RETURN;
  
   END write_file;

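Each time the function is called (once per slave process), it loops through its share of the ref cursor and unloads that data to a file. Rows are collected into a buffer of up to c_maxline characters before each UTL_FILE.PUT_LINE call, which reduces the number of write calls. Finally, it returns the filename, the number of rows unloaded and the session identifier to the caller.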

The next step is to create a calling procedure. You can, of course, also just run the SQL standalone.


CREATE OR REPLACE PROCEDURE MYPROC
                           (p_filename IN VARCHAR2,
                            p_db_catalog IN VARCHAR2,
                            p_degree IN number)
IS
 lt_file_info file_ott ;
 
 BEGIN

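  -- Force parallel query in this session with the requested degree
  EXECUTE IMMEDIATE 'ALTER SESSION FORCE PARALLEL QUERY PARALLEL ' || TO_CHAR(p_degree);

  -- Collect what each slave returned: actual filename, row count and SID
  SELECT file_ot(nt.filename, nt.number_of_rows, nt.session_id)
         BULK COLLECT INTO lt_file_info
    FROM TABLE(
           write_file(
             CURSOR(
               SELECT col1 || '|' ||
                      col2 || '|' ||
                      col3 || '|' ||
                      col4 AS csv
               FROM   mytable),
             p_filename, p_db_catalog
           )) nt;

  -- Return the session to its default parallel query setting
  EXECUTE IMMEDIATE 'ALTER SESSION ENABLE PARALLEL QUERY';
END MYPROC;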

The trick for producing and controlling the number of slave processes is the ALTER SESSION statement*. In my case I didn’t want to run everything in parallel, which is why there is the in-parameter p_degree: you have to look at each table and determine whether it would benefit from being unloaded in parallel. I had one procedure per table in a package. The file attributes are collected in lt_file_info and could, for example, be saved to a log table.

*ALTER SESSION is one way; a PARALLEL hint is a second way; the underlying object’s parallel degree is a third.

Time to call the procedure


BEGIN   
 MYPROC('testfile','MYDIR',2); 
END;

Looking at the result on the server, two files have been produced, one per slave, and each filename contains the session identifier:

/tmp/test
25193316 testfile_325.txt
22362268 testfile_222.txt

more testfile_222.txt 
data1238569|data2238569|data3238569|data4238569
data1238570|data2238570|data3238570|data4238570
data1238571|data2238571|data3238571|data4238571

So each slave produced a file with a unique name. Using this technique we cut the unloading time from approximately 8 hours to 2 hours. In general, the degree of parallelism was set to 4.
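To sanity-check an unload, one option is to compare the line count of each file with the number_of_rows values collected in lt_file_info. Below is a minimal shell sketch, assuming one record per line (no embedded newlines in the data) and the `<prefix>_<sid>.txt` naming used above; `count_rows` is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical check: print the number of lines in each <dir>/<prefix>_*.txt
# file and the grand total, for comparison with number_of_rows.
# Assumes one record per line (no embedded newlines in the data).
# Usage: count_rows <dir> <prefix>
count_rows() {
  dir=$1
  prefix=$2
  total=0
  for f in "$dir"/"$prefix"_*.txt; do
    [ -e "$f" ] || continue           # glob matched nothing
    n=$(wc -l < "$f" | tr -d ' ')     # strip padding some wc versions add
    echo "$(basename "$f") $n"
    total=$((total + n))
  done
  echo "total $total"
}

# Example: count_rows /tmp/test testfile
```

For the run above, the total should match the 1,000,000 rows in MYTABLE.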

Over&Out