Re: dynamically created cursor doesn't work for parallel pipelined functions

From: Frank Bergemann <FBergemann_at_web.de>
Date: Sat, 4 Dec 2010 06:48:05 -0800 (PST)
Message-ID: <e99c92a2-ed2b-4e21-ac4f-174f65bb911b_at_j25g2000yqa.googlegroups.com>




a solution to start with - but i have to get around some dependencies:

drop table parallel_test;

drop type MyDoit;

drop type BaseDoit;

CREATE TABLE parallel_test (
  id NUMBER(10),
  description VARCHAR2(50)
);

BEGIN
  FOR i IN 1 .. 50000 LOOP
    INSERT INTO parallel_test (id, description)     VALUES (i, 'Description or ' || i);
  END LOOP;
  COMMIT;
END;
/

create or replace type BaseDoit as object (

	id number,
	member procedure doit(
		p_sids in out nocopy ton,
		p_counts in out nocopy ton)

) not final;
/

create or replace type body BaseDoit as

    member procedure doit(

		p_sids in out nocopy ton,
		p_counts in out nocopy ton)
	is
	begin
		dbms_output.put_line('BaseDoit.doit() invoked');
	end;

end;
/
  • Define a strongly typed REF CURSOR type.

CREATE OR REPLACE PACKAGE parallel_ptf_api AS

  TYPE t_parallel_test_row IS RECORD (
    id1 NUMBER(10),
    desc1 VARCHAR2(50),
    id2 NUMBER(10),
    desc2 VARCHAR2(50),
    sid NUMBER
  );

  TYPE t_parallel_test_tab IS TABLE OF t_parallel_test_row;

  TYPE t_parallel_test_ref_cursor IS REF CURSOR RETURN t_parallel_test_row;

  FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)     RETURN t_parallel_test_tab PIPELINED     PARALLEL_ENABLE(PARTITION p_cursor BY any);

END parallel_ptf_api;
/

SHOW ERRORS CREATE OR REPLACE PACKAGE BODY parallel_ptf_api AS

  FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)     RETURN t_parallel_test_tab PIPELINED     PARALLEL_ENABLE(PARTITION p_cursor BY any)   IS
    l_row t_parallel_test_row;
  BEGIN
    LOOP

      FETCH p_cursor
      INTO  l_row;
      EXIT WHEN p_cursor%NOTFOUND;

	  select userenv('SID') into l_row.sid from dual;

      PIPE ROW (l_row);

    END LOOP;
    RETURN;
  END test_ptf;

END parallel_ptf_api;
/

SHOW ERRORS PROMPT
PROMPT Serial Execution

PROMPT ================

SELECT sid, count(*)
FROM TABLE(parallel_ptf_api.test_ptf(CURSOR(SELECT t1.id, t1.description, t2.id, t2.description, null
                                              FROM   parallel_test t1,
parallel_test t2
												where t1.id = t2.id
                                             )
                                      )
            ) t2

GROUP BY sid;

PROMPT
PROMPT Parallel Execution

PROMPT ==================

SELECT sid, count(*)
FROM TABLE(parallel_ptf_api.test_ptf(CURSOR(SELECT /*+ parallel(t1,5) */ t1.id, t1.description, t2.id, t2.description, null
                                              FROM   parallel_test t1,
parallel_test t2
												where t1.id = t2.id
                                             )
                                       )
            ) t2

GROUP BY sid;

PROMPT
PROMPT Parallel Execution 2

PROMPT ==================

set serveroutput on;

declare

	v_sids ton := ton();
	v_counts ton := ton();
--	v_cur parallel_ptf_api.t_parallel_test_ref_cursor;
	v_cur sys_refcursor;

	procedure OpenCursor(p_refCursor out sys_refcursor)
	is
	begin
		open p_refCursor  for 'SELECT /*+ parallel(t1,5) */ t1.id,
t1.description, t2.id, t2.description, null
                                              FROM   parallel_test t1,
parallel_test t2
												where t1.id = t2.id';
	end;

begin

        OpenCursor(v_cur);

	SELECT sid, count(*) bulk collect into v_sids, v_counts
	FROM   TABLE(parallel_ptf_api.test_ptf(v_cur)) t2
	GROUP BY sid;

	for i in v_sids.FIRST.. v_sids.LAST loop
		dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
	end loop;

end;
/

PROMPT
PROMPT Parallel Execution 3

PROMPT ==================

set serveroutput on;

declare

	procedure CreateMyDoit
	is
		cmd varchar2(4096 char);
	begin
		cmd := 'create or replace type MyDoit under BaseDoit ( ' ||
					' overriding member procedure doit( ' ||
					'			p_sids in out nocopy ton,  ' ||
					'			p_counts in out nocopy ton) ' ||
					' )';
		execute immediate cmd;

		cmd := 'create or replace type body MyDoit as ' ||
					'	overriding member procedure doit( ' ||
					'		p_sids in out nocopy ton, ' ||
					'		p_counts in out nocopy ton) ' ||
					'	is ' ||
					'	begin ' ||
					'		dbms_output.put_line(''MyDoit.doit() invoked''); ' ||

					'		SELECT sid, count(*) bulk collect into p_sids, p_counts ' ||
					'		FROM   TABLE(parallel_ptf_api.test_ptf(CURSOR( ' ||
					'							SELECT /*+ parallel(t1,5) */ t1.id, t1.description,
t2.id, t2.description, null ' ||
					'							FROM   parallel_test t1, parallel_test t2 ' ||
					'							where t1.id = t2.id ' ||
                    '                      ))) ' ||
					'		GROUP BY sid; ' ||


					'	end; ' ||
					' end; ';
		execute immediate cmd;

	end;

begin

        CreateMyDoit;

end;
/

declare

	v_sids ton := ton();
	v_counts ton := ton();

	instance BaseDoit;

begin

        instance := MyDoit(1);

        instance.doit(v_sids, v_counts);

	for i in v_sids.FIRST.. v_sids.LAST loop
		dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
	end loop;

end;
/



The output is:

SQL> _at_test

Table dropped.

Type dropped.

Type dropped.

Table created.

PL/SQL procedure successfully completed.

Type created.

Type body created.

Package created.

No errors.

Package body created.

No errors.

Serial Execution


       SID COUNT(*)
---------- ----------

       649 50000

Parallel Execution


       SID COUNT(*)
---------- ----------

       457	10124
       390	10012
       555	 9970
       389	 9924
       603	 9970


Parallel Execution 2



649, 50000

PL/SQL procedure successfully completed.

Parallel Execution 3


PL/SQL procedure successfully completed.

MyDoit.doit() invoked

468, 9970
483, 9924
389, 10012
368, 10124
341, 9970

PL/SQL procedure successfully completed.

SQL> Received on Sat Dec 04 2010 - 08:48:05 CST

Original text of this message