Re: dynamically created cursor doesn't work for parallel pipelined functions
Date: Sat, 4 Dec 2010 06:48:05 -0800 (PST)
Message-ID: <e99c92a2-ed2b-4e21-ac4f-174f65bb911b_at_j25g2000yqa.googlegroups.com>
Here is a solution to start with, but I still have to get around some dependencies:
-- Remove objects left over from a previous run of this script.
-- MyDoit is dropped before BaseDoit because it is created further down
-- as a subtype of BaseDoit.
DROP TABLE parallel_test;
DROP TYPE MyDoit;
DROP TYPE BaseDoit;
-- Test table; every query in this script self-joins it on id.
CREATE TABLE parallel_test (
    id          NUMBER(10),
    description VARCHAR2(50)
);
-- Populate parallel_test with 50,000 rows (id = 1 .. 50000).
-- One set-based INSERT ... SELECT replaces the former row-by-row loop, so
-- the rows are generated in a single SQL statement instead of 50,000
-- separate inserts. The anonymous block wrapper is kept so that SQL*Plus
-- still reports "PL/SQL procedure successfully completed."
BEGIN
    INSERT INTO parallel_test (id, description)
    SELECT LEVEL,
           'Description or ' || LEVEL
    FROM dual
    CONNECT BY LEVEL <= 50000;  -- row generator: LEVEL runs 1 .. 50000

    COMMIT;
END;
/
-- Base object type: one numeric attribute and a member procedure doit.
-- Declared NOT FINAL so that subtypes can be created under it.
-- NOTE(review): the collection type "ton" is not created by this script;
-- presumably a nested table or varray of NUMBER -- confirm it exists.
CREATE OR REPLACE TYPE BaseDoit AS OBJECT (
    id NUMBER,
    MEMBER PROCEDURE doit (
        p_sids   IN OUT NOCOPY ton,
        p_counts IN OUT NOCOPY ton
    )
) NOT FINAL;
/
CREATE OR REPLACE TYPE BODY BaseDoit AS
    -- Default implementation: only announces that it was called and leaves
    -- both collection arguments untouched.
    MEMBER PROCEDURE doit (
        p_sids   IN OUT NOCOPY ton,
        p_counts IN OUT NOCOPY ton
    ) IS
    BEGIN
        DBMS_OUTPUT.PUT_LINE('BaseDoit.doit() invoked');
    END;
END;
/
-- Define a strongly typed REF CURSOR type.
CREATE OR REPLACE PACKAGE parallel_ptf_api AS
    -- Shape of one row, both for the input cursor and for the piped output:
    -- two (id, description) pairs plus the SID filled in by test_ptf.
    TYPE t_parallel_test_row IS RECORD (
        id1   NUMBER(10),
        desc1 VARCHAR2(50),
        id2   NUMBER(10),
        desc2 VARCHAR2(50),
        sid   NUMBER
    );

    -- Collection type returned by the pipelined function.
    TYPE t_parallel_test_tab IS TABLE OF t_parallel_test_row;

    -- Strongly typed cursor whose rows match t_parallel_test_row.
    TYPE t_parallel_test_ref_cursor IS REF CURSOR
        RETURN t_parallel_test_row;

    -- Pipelined table function that may be run in parallel, with the rows
    -- of p_cursor distributed among the slaves in any order (BY ANY).
    FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
        RETURN t_parallel_test_tab
        PIPELINED
        PARALLEL_ENABLE (PARTITION p_cursor BY ANY);
END parallel_ptf_api;
/
SHOW ERRORS
CREATE OR REPLACE PACKAGE BODY parallel_ptf_api AS
    -- Pipes every row fetched from p_cursor, with its sid field overwritten
    -- by the SID of the session that is executing this function. Grouping
    -- the output by sid therefore shows how many rows each session handled.
    --
    -- p_cursor : input rows shaped like t_parallel_test_row; the value
    --            fetched into the sid field is ignored and replaced.
    -- Returns  : the same rows, piped one at a time.
    FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
        RETURN t_parallel_test_tab PIPELINED
        PARALLEL_ENABLE (PARTITION p_cursor BY ANY)
    IS
        l_row t_parallel_test_row;
        l_sid NUMBER;
    BEGIN
        -- The SID is the same for every row this invocation processes, so
        -- look it up once instead of running a query against dual per row.
        SELECT userenv('SID') INTO l_sid FROM dual;

        LOOP
            FETCH p_cursor INTO l_row;
            EXIT WHEN p_cursor%NOTFOUND;

            -- The fetch overwrites the whole record, so the SID has to be
            -- re-applied after every fetch.
            l_row.sid := l_sid;
            PIPE ROW (l_row);
        END LOOP;

        RETURN;
    END test_ptf;
END parallel_ptf_api;
/
SHOW ERRORS
PROMPT
PROMPT Serial Execution
PROMPT ================
-- Baseline without a parallel hint: count the piped rows per session SID.
-- The trailing NULL is the placeholder for the sid field that test_ptf
-- fills in.
SELECT
    sid,
    COUNT(*)
FROM TABLE(
    parallel_ptf_api.test_ptf(
        CURSOR(
            SELECT
                t1.id,
                t1.description,
                t2.id,
                t2.description,
                NULL
            FROM parallel_test t1,
                 parallel_test t2
            WHERE t1.id = t2.id
        )
    )
) ptf
GROUP BY sid;
PROMPT
PROMPT Parallel Execution
PROMPT ==================
-- Same query as the serial run, but the cursor carries a parallel hint on
-- t1 (degree 5). More than one SID in the result means the function ran in
-- several sessions.
SELECT
    sid,
    COUNT(*)
FROM TABLE(
    parallel_ptf_api.test_ptf(
        CURSOR(
            SELECT /*+ parallel(t1,5) */
                t1.id,
                t1.description,
                t2.id,
                t2.description,
                NULL
            FROM parallel_test t1,
                 parallel_test t2
            WHERE t1.id = t2.id
        )
    )
) ptf
GROUP BY sid;
PROMPT
PROMPT Parallel Execution 2
PROMPT ==================
SET SERVEROUTPUT ON;
-- Variant 2: the input cursor is opened from a dynamic SQL string and then
-- handed to test_ptf as a cursor variable. The transcript below shows a
-- single SID for this variant, i.e. the function was not run in parallel.
DECLARE
    v_sids   ton := ton();
    v_counts ton := ton();

    -- A strongly typed cursor variable cannot be opened for a dynamic SQL
    -- string, hence the weak SYS_REFCURSOR instead of:
    -- v_cur parallel_ptf_api.t_parallel_test_ref_cursor;
    v_cur    SYS_REFCURSOR;

    -- Opens p_refCursor for the hinted self-join of parallel_test.
    PROCEDURE OpenCursor (p_refCursor OUT SYS_REFCURSOR) IS
    BEGIN
        OPEN p_refCursor FOR
            'SELECT /*+ parallel(t1,5) */ t1.id, t1.description, t2.id, t2.description, null FROM parallel_test t1, parallel_test t2 where t1.id = t2.id';
    END;
BEGIN
    OpenCursor(v_cur);

    SELECT sid, COUNT(*)
    BULK COLLECT INTO v_sids, v_counts
    FROM TABLE(parallel_ptf_api.test_ptf(v_cur)) t2
    GROUP BY sid;

    -- 1 .. COUNT is safe when the query returns no groups; FIRST .. LAST
    -- would raise ORA-06502 because FIRST and LAST are NULL on an empty
    -- collection.
    FOR i IN 1 .. v_sids.COUNT LOOP
        DBMS_OUTPUT.PUT_LINE(v_sids(i) || ', ' || v_counts(i));
    END LOOP;

    -- Release the cursor opened by OpenCursor.
    IF v_cur%ISOPEN THEN
        CLOSE v_cur;
    END IF;
END;
/
PROMPT
PROMPT Parallel Execution 3
PROMPT ==================
SET SERVEROUTPUT ON;
-- Variant 3: create the subtype MyDoit at run time through dynamic DDL.
-- Its doit override runs the hinted query with a static CURSOR()
-- expression and bulk-collects the per-SID counts into its arguments.
DECLARE
    l_ddl VARCHAR2(4096 CHAR);
BEGIN
    -- Type specification.
    l_ddl := 'create or replace type MyDoit under BaseDoit ( '
          || ' overriding member procedure doit( '
          || ' p_sids in out nocopy ton, '
          || ' p_counts in out nocopy ton) '
          || ' )';
    EXECUTE IMMEDIATE l_ddl;

    -- Type body.
    l_ddl := 'create or replace type body MyDoit as '
          || ' overriding member procedure doit( '
          || ' p_sids in out nocopy ton, '
          || ' p_counts in out nocopy ton) '
          || ' is '
          || ' begin '
          || ' dbms_output.put_line(''MyDoit.doit() invoked''); '
          || ' SELECT sid, count(*) bulk collect into p_sids, p_counts '
          || ' FROM TABLE(parallel_ptf_api.test_ptf(CURSOR( '
          || ' SELECT /*+ parallel(t1,5) */ t1.id, t1.description, t2.id, t2.description, null '
          || ' FROM parallel_test t1, parallel_test t2 '
          || ' where t1.id = t2.id '
          || ' ))) '
          || ' GROUP BY sid; '
          || ' end; '
          || ' end; ';
    EXECUTE IMMEDIATE l_ddl;
END;
/
-- Instantiate the dynamically created subtype through a BaseDoit variable
-- and print the per-SID row counts that its doit override collects.
-- This is a separate block because it references MyDoit statically, so the
-- type must already exist when the block is compiled.
DECLARE
    v_sids   ton := ton();
    v_counts ton := ton();
    instance BaseDoit;
BEGIN
    instance := MyDoit(1);
    instance.doit(v_sids, v_counts);  -- dispatches to the MyDoit override

    -- 1 .. COUNT is safe when nothing was collected; FIRST .. LAST would
    -- raise ORA-06502 because FIRST and LAST are NULL on an empty
    -- collection.
    FOR i IN 1 .. v_sids.COUNT LOOP
        DBMS_OUTPUT.PUT_LINE(v_sids(i) || ', ' || v_counts(i));
    END LOOP;
END;
/
The output is:
SQL> @test
Table dropped.
Type dropped.
Type dropped.
Table created.
PL/SQL procedure successfully completed.
Type created.
Type body created.
Package created.
No errors.
Package body created.
No errors.
Serial Execution
SID COUNT(*)
---------- ----------
649 50000
Parallel Execution
SID COUNT(*)
---------- ----------
457 10124
390 10012
555 9970
389 9924
603 9970
Parallel Execution 2
649, 50000
PL/SQL procedure successfully completed.
Parallel Execution 3
PL/SQL procedure successfully completed.
MyDoit.doit() invoked
468, 9970
483, 9924
389, 10012
368, 10124
341, 9970
PL/SQL procedure successfully completed.
SQL>
Received on Sat Dec 04 2010 - 08:48:05 CST