A few years back, “we” had a batch job that produced a bunch of flat files. These files were then shipped to a customer. No problem so far. But as the database grew, so did the output files, especially since this customer wanted everything in each table every time. The time it took to produce these flat files became a real issue…
One solution in this case is to parallelize the file writing: each process writes one part of the file, and at the end the parts are merged together with a shell script. I think my original inspiration for solving this problem was this article: http://www.oracle-developer.net/display.php?id=429.
Moving on to the solution…
If we want to parallelize the file writing, we must be able to have unique filenames for each slave process. Also, we might want to log the filenames and number of rows produced within each file.
First off let’s create a table with some dummy data
CREATE TABLE mytable AS
SELECT 'data1' || ROWNUM AS col1,
       'data2' || ROWNUM AS col2,
       'data3' || ROWNUM AS col3,
       'data4' || ROWNUM AS col4
  FROM dual
CONNECT BY ROWNUM <= 1000000;
Create a directory where you want your files, in my case
CREATE OR REPLACE DIRECTORY MYDIR AS '/tmp/test';
Then create an object type that we can work with in PL/SQL and that will be the container for the file attributes, plus a collection type of it.
CREATE OR REPLACE TYPE file_ot AS OBJECT (
    filename       VARCHAR2(128),
    number_of_rows NUMBER,
    session_id     NUMBER
);
/
CREATE OR REPLACE TYPE file_ott AS TABLE OF file_ot;
/
The next step is to create a function that takes a ref cursor over an SQL statement as an argument, writes a file to the desired location and returns the file information.
FUNCTION write_file (
    p_refsql     IN SYS_REFCURSOR,
    p_filename   IN VARCHAR2,
    p_db_catalog IN VARCHAR2
) RETURN file_ott
  PIPELINED
  PARALLEL_ENABLE (PARTITION p_refsql BY ANY)
This is a parallel-enabled pipelined table function, so it returns the collection type file_ott. PARALLEL_ENABLE (PARTITION p_refsql BY ANY) means that the rows of the input ref cursor are split between the parallel slave processes, and ANY means “by Oracle”: the workload is distributed arbitrarily between the slaves.
Moving on to the actual code of the function
CREATE OR REPLACE FUNCTION write_file (
    p_refsql     IN SYS_REFCURSOR,
    p_filename   IN VARCHAR2,
    p_db_catalog IN VARCHAR2
) RETURN file_ott
  PIPELINED
  PARALLEL_ENABLE (PARTITION p_refsql BY ANY)
IS
    TYPE row_ntt IS TABLE OF VARCHAR2(32767);
    lv_rows   row_ntt;
    lv_file   UTL_FILE.FILE_TYPE;
    lv_buffer VARCHAR2(32767);
    lv_sid    NUMBER;
    lv_name   VARCHAR2(128);
    lv_lines  PLS_INTEGER := 0;
    c_eol     CONSTANT VARCHAR2(1) := CHR(10); -- depends on the target platform
    c_eollen  CONSTANT PLS_INTEGER := LENGTH(c_eol);
    c_maxline CONSTANT PLS_INTEGER := 30000;
BEGIN
    -- Each slave runs in its own session, so the SID makes the file name unique.
    SELECT sid INTO lv_sid FROM v$mystat WHERE ROWNUM = 1;
    lv_name := p_filename || '_' || TO_CHAR(lv_sid) || '.txt';
    -- The fourth argument is the maximum line size, which must cover c_maxline.
    lv_file := UTL_FILE.FOPEN(p_db_catalog, lv_name, 'w', 32767);
    LOOP
        FETCH p_refsql BULK COLLECT INTO lv_rows LIMIT 1000;
        FOR i IN 1 .. lv_rows.COUNT LOOP
            -- Pack rows into the buffer while they fit. With an empty (NULL)
            -- buffer the test is not TRUE, so the ELSE branch starts a new buffer.
            IF LENGTH(lv_buffer) + c_eollen + LENGTH(lv_rows(i)) <= c_maxline THEN
                lv_buffer := lv_buffer || c_eol || lv_rows(i);
            ELSE
                IF lv_buffer IS NOT NULL THEN
                    UTL_FILE.PUT_LINE(lv_file, lv_buffer);
                END IF;
                lv_buffer := lv_rows(i);
            END IF;
        END LOOP;
        lv_lines := lv_lines + lv_rows.COUNT;
        EXIT WHEN p_refsql%NOTFOUND;
    END LOOP;
    CLOSE p_refsql;
    -- Flush what is left; skip it if this slave received no rows at all.
    IF lv_buffer IS NOT NULL THEN
        UTL_FILE.PUT_LINE(lv_file, lv_buffer);
    END IF;
    UTL_FILE.FCLOSE(lv_file);
    PIPE ROW (file_ot(lv_name, lv_lines, lv_sid));
    RETURN;
END write_file;
/
Each time the function is called, it loops through the ref cursor, unloads the data to a file and returns the filename, the number of rows unloaded and the session identifier to the caller.
The next step is to create a calling procedure. You can of course just run the SQL standalone.
CREATE OR REPLACE PROCEDURE MYPROC (
    p_filename   IN VARCHAR2,
    p_db_catalog IN VARCHAR2,
    p_degree     IN NUMBER
) IS
    lt_file_info file_ott;
BEGIN
    EXECUTE IMMEDIATE 'ALTER SESSION FORCE PARALLEL QUERY PARALLEL ' || p_degree;
    -- Keep the filename returned by each slave (nt.filename), not the base name.
    SELECT file_ot(nt.filename, nt.number_of_rows, nt.session_id)
      BULK COLLECT INTO lt_file_info
      FROM TABLE(
               write_file(
                   CURSOR(SELECT COL1 || '|' || COL2 || '|' || COL3 || '|' || COL4 AS csv
                            FROM MYTABLE),
                   p_filename,
                   p_db_catalog
               )
           ) nt;
END MYPROC;
/
The trick to producing and controlling the number of slave processes is the *ALTER SESSION statement. In my case I didn’t want to run everything in parallel, which is why the procedure has the in-parameter p_degree: you have to look at each table and determine whether it would benefit from being unloaded in parallel. I had one procedure for each table in a package. The file attributes are collected in lt_file_info and could be saved to a log table, for example.
*ALTER SESSION is one way; using a hint is a second, and relying on the underlying object’s parallel degree is a third.
Time to call the procedure
BEGIN
    MYPROC('testfile', 'MYDIR', 2);
END;
/
Looking at the result on the server, two files have been produced. Each filename includes the session identifier.
/tmp/test
25193316 testfile_325.txt
22362268 testfile_222.txt

more testfile_222.txt
data1238569|data2238569|data3238569|data4238569
data1238570|data2238570|data3238570|data4238570
data1238571|data2238571|data3238571|data4238571
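A quick sanity check on the server is to compare the line count of each part file with the number_of_rows value the function returned for it. The following is only a sketch: the helper name count_part_rows is made up for illustration, and it assumes the part files follow the <prefix>_<sid>.txt naming produced by write_file and that no row contains an embedded newline.

```shell
# Hypothetical helper (not from the original post): print "<lines> <file>"
# for every part file named <prefix>_<sid>.txt, followed by the grand total.
count_part_rows() {
    dir=$1
    prefix=$2
    total=0
    for f in "$dir/${prefix}"_*.txt; do
        # If nothing matches, the pattern stays literal; skip it.
        [ -f "$f" ] || continue
        n=$(wc -l < "$f")
        n=$((n + 0))    # normalise the padding some wc implementations add
        echo "$n $(basename "$f")"
        total=$((total + n))
    done
    echo "$total total"
}
```

Called as count_part_rows /tmp/test testfile, the per-file numbers should match what ended up in lt_file_info, and the total should match the row count of the table.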
So each slave produced a file with a unique name. Using this technique we cut the unloading time from approximately 8 hours to 2 hours. In general, the degree of parallelism was set to 4.