Home » Server Options » Streams & AQ » Problems on Oracle Streams AQ Examples (Oracle 10.1.0.2 HP-UX)
Problems on Oracle Streams AQ Examples [message #333931] Mon, 14 July 2008 22:33
cowhand
Messages: 1
Registered: July 2008
Junior Member
I am a newbie on Oracle Streams Advanced Queues.
Now I am learning a sample in oracle docs,and here is the URL
http://download.oracle.com/docs/cd/B14117_01/server.101/b10727/capappde.htm#1006084
But at the last step,I only get a new row in emp_del,but there is no row in AQ$STREAMS_QUEUE_TABLE.And it returns "no more messages" after I exec the command "EXEC emp_dq('scott')".
Anyone can help me?
Here is my scripts and parameters.

show parameters 

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
O7_DICTIONARY_ACCESSIBILITY          boolean     FALSE
__shared_pool_size                   big integer 4912M
active_instance_count                integer
aq_tm_processes                      integer     2
archive_lag_target                   integer     0
asm_diskgroups                       string
asm_diskstring                       string
asm_power_limit                      integer     1
audit_file_dest                      string      /oracle/OraHome/rdbms/audit
audit_sys_operations                 boolean     FALSE
audit_trail                          string      NONE

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
background_core_dump                 string      partial
background_dump_dest                 string      /oracle/OraHome/admin/tes
                                                 t/bdump
backup_tape_io_slaves                boolean     FALSE
bitmap_merge_area_size               integer     1048576
blank_trimming                       boolean     FALSE
buffer_pool_keep                     string
buffer_pool_recycle                  string
circuits                             integer
cluster_database                     boolean     FALSE
cluster_database_instances           integer     1

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
cluster_interconnects                string
commit_point_strength                integer     1
compatible                           string      10.1.0.2.0
control_file_record_keep_time        integer     7
control_files                        string      /dev/vg_ora/rlv_control1, /dev
                                                 /vg_ora/rlv_control2, /dev/vg_
                                                 ora/rlv_control3
core_dump_dest                       string      /oracle/OraHome/admin/tes
                                                 t/cdump
cpu_count                            integer     8
create_bitmap_area_size              integer     8388608

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
create_stored_outlines               string
cursor_sharing                       string      EXACT
cursor_space_for_time                boolean     FALSE
db_16k_cache_size                    big integer 0
db_2k_cache_size                     big integer 0
db_32k_cache_size                    big integer 0
db_4k_cache_size                     big integer 0
db_8k_cache_size                     big integer 0
db_block_buffers                     integer     0
db_block_checking                    boolean     FALSE
db_block_checksum                    boolean     TRUE

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
db_block_size                        integer     8192
db_cache_advice                      string      ON
db_cache_size                        big integer 1008M
db_create_file_dest                  string
db_create_online_log_dest_1          string
db_create_online_log_dest_2          string
db_create_online_log_dest_3          string
db_create_online_log_dest_4          string
db_create_online_log_dest_5          string
db_domain                            string
db_file_multiblock_read_count        integer     16

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
db_file_name_convert                 string
db_files                             integer     200
db_flashback_retention_target        integer     1440
db_keep_cache_size                   big integer 32M
db_name                              string      test
db_recovery_file_dest                string
db_recovery_file_dest_size           big integer 0
db_recycle_cache_size                big integer 0
db_unique_name                       string      test
db_writer_processes                  integer     1
dbwr_io_slaves                       integer     0

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
ddl_wait_for_locks                   boolean     FALSE
dg_broker_config_file1               string      /oracle/OraHome/dbs/dr1te
                                                 st.dat
dg_broker_config_file2               string      /oracle/OraHome/dbs/dr2te
                                                 st.dat
dg_broker_start                      boolean     FALSE
disk_asynch_io                       boolean     TRUE
dispatchers                          string      (PROTOCOL=TCP) (SERVICE=tes
                                                 tXDB)
distributed_lock_timeout             integer     60
dml_locks                            integer     2440

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
drs_start                            boolean     FALSE
enqueue_resources                    integer     2660
event                                string
fal_client                           string
fal_server                           string
fast_start_io_target                 integer     0
fast_start_mttr_target               integer     0
fast_start_parallel_rollback         string      LOW
file_mapping                         boolean     FALSE
fileio_network_adapters              string
filesystemio_options                 string      asynch

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
fixed_date                           string
gc_files_to_locks                    string
gcs_server_processes                 integer     0
global_context_pool_size             string
global_names                         boolean     TRUE
hash_area_size                       integer     131072
hi_shared_memory_address             integer     0
hpux_sched_noage                     integer     0
hs_autoregister                      boolean     TRUE
ifile                                file
instance_groups                      string

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
instance_name                        string      test
instance_number                      integer     0
instance_type                        string      RDBMS
java_max_sessionspace_size           integer     0
java_pool_size                       big integer 256M
java_soft_sessionspace_limit         integer     0
job_queue_processes                  integer     10
large_pool_size                      big integer 256M
ldap_directory_access                string      NONE
license_max_sessions                 integer     0
license_max_users                    integer     0

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
license_sessions_warning             integer     0
local_listener                       string
lock_name_space                      string
lock_sga                             boolean     FALSE
log_archive_config                   string
log_archive_dest                     string
log_archive_dest_1                   string
log_archive_dest_10                  string
log_archive_dest_2                   string
log_archive_dest_3                   string
log_archive_dest_4                   string

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
log_archive_dest_5                   string
log_archive_dest_6                   string
log_archive_dest_7                   string
log_archive_dest_8                   string
log_archive_dest_9                   string
log_archive_dest_state_1             string      enable
log_archive_dest_state_10            string      enable
log_archive_dest_state_2             string      enable
log_archive_dest_state_3             string      enable
log_archive_dest_state_4             string      enable
log_archive_dest_state_5             string      enable

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
log_archive_dest_state_6             string      enable
log_archive_dest_state_7             string      enable
log_archive_dest_state_8             string      enable
log_archive_dest_state_9             string      enable
log_archive_duplex_dest              string
log_archive_format                   string      %t_%s_%r.dbf
log_archive_local_first              boolean     TRUE
log_archive_max_processes            integer     2
log_archive_min_succeed_dest         integer     1
log_archive_start                    boolean     FALSE
log_archive_trace                    integer     0

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
log_buffer                           integer     2097152
log_checkpoint_interval              integer     0
log_checkpoint_timeout               integer     1800
log_checkpoints_to_alert             boolean     FALSE
log_file_name_convert                string
logmnr_max_persistent_sessions       integer     1
max_commit_propagation_delay         integer     700
max_dispatchers                      integer
max_dump_file_size                   string      UNLIMITED
max_enabled_roles                    integer     150
max_shared_servers                   integer

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
nls_calendar                         string
nls_comp                             string
nls_currency                         string
nls_date_format                      string
nls_date_language                    string
nls_dual_currency                    string
nls_iso_currency                     string
nls_language                         string      SIMPLIFIED CHINESE
nls_length_semantics                 string      BYTE
nls_nchar_conv_excp                  string      FALSE
nls_numeric_characters               string

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
nls_sort                             string
nls_territory                        string      CHINA
nls_time_format                      string
nls_time_tz_format                   string
nls_timestamp_format                 string
nls_timestamp_tz_format              string
object_cache_max_size_percent        integer     10
object_cache_optimal_size            integer     102400
olap_page_pool_size                  big integer 0
open_cursors                         integer     300
open_links                           integer     4

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
open_links_per_instance              integer     4
optimizer_dynamic_sampling           integer     2
optimizer_features_enable            string      10.1.0
optimizer_index_caching              integer     0
optimizer_index_cost_adj             integer     100
optimizer_mode                       string      ALL_ROWS
os_authent_prefix                    string      ops$
os_roles                             boolean     FALSE
parallel_adaptive_multi_user         boolean     TRUE
parallel_automatic_tuning            boolean     FALSE
parallel_execution_message_size      integer     2152

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
parallel_instance_group              string
parallel_max_servers                 integer     160
parallel_min_percent                 integer     0
parallel_min_servers                 integer     0
parallel_server                      boolean     FALSE
parallel_server_instances            integer     1
parallel_threads_per_cpu             integer     2
pga_aggregate_target                 big integer 1000M
plsql_code_type                      string      INTERPRETED
plsql_compiler_flags                 string      INTERPRETED, NON_DEBUG
plsql_debug                          boolean     FALSE

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
plsql_native_library_dir             string
plsql_native_library_subdir_count    integer     0
plsql_optimize_level                 integer     2
plsql_v2_compatibility               boolean     FALSE
plsql_warnings                       string      DISABLE:ALL
pre_page_sga                         boolean     FALSE
processes                            integer     500
query_rewrite_enabled                string      TRUE
query_rewrite_integrity              string      enforced
rdbms_server_dn                      string
read_only_open_delayed               boolean     FALSE

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
recovery_parallelism                 integer     0
remote_archive_enable                string      true
remote_dependencies_mode             string      TIMESTAMP
remote_listener                      string
remote_login_passwordfile            string      EXCLUSIVE
remote_os_authent                    boolean     FALSE
remote_os_roles                      boolean     FALSE
replication_dependency_tracking      boolean     TRUE
resource_limit                       boolean     FALSE
resource_manager_plan                string
resumable_timeout                    integer     0

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
rollback_segments                    string
serial_reuse                         string      DISABLE
service_names                        string      test
session_cached_cursors               integer     0
session_max_open_files               integer     10
sessions                             integer     555
sga_max_size                         big integer 6512M
sga_target                           big integer 0
shadow_core_dump                     string      partial
shared_memory_address                integer     0
shared_pool_reserved_size            big integer 257530265

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
shared_pool_size                     big integer 4912M
shared_server_sessions               integer
shared_servers                       integer     1
skip_unusable_indexes                boolean     TRUE
smtp_out_server                      string
sort_area_retained_size              integer     0
sort_area_size                       integer     65536
sp_name                              string      test
spfile                               string      /dev/vg_ora/rlv_spfile
sql92_security                       boolean     FALSE
sql_trace                            boolean     FALSE

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
sql_version                          string      NATIVE
sqltune_category                     string      DEFAULT
standby_archive_dest                 string      ?/dbs/arch
standby_file_management              string      MANUAL
star_transformation_enabled          string      FALSE
statistics_level                     string      TYPICAL
streams_pool_size                    big integer 32M
tape_asynch_io                       boolean     TRUE
thread                               integer     0
timed_os_statistics                  integer     0
timed_statistics                     boolean     TRUE

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
trace_enabled                        boolean     TRUE
tracefile_identifier                 string
transactions                         integer     610
transactions_per_rollback_segment    integer     5
undo_management                      string      AUTO
undo_retention                       integer     900
undo_tablespace                      string      UNDOTBS1
use_indirect_data_buffers            boolean     FALSE
user_dump_dest                       string      /oracle/OraHome/admin/tes
                                                 t/udump
utl_file_dir                         string

NAME                                 TYPE        VALUE
------------------------------------ ----------- ------------------------------
workarea_size_policy                 string      AUTO




/************************* BEGINNING OF SCRIPT ******************************

Step 1 Show Output and Spool Results
Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_setup_catapp.out

/*

Step 2 Create the hr.emp_del Table
Connect to cpap.net as the hr user.

*/

CONNECT scott/tiger

/*

Create the hr.emp_del table. The shape of the emp_del table is the same as the employees table, except for one added timestamp column that will record the date when a row is inserted into the emp_del table.

*/
drop table employees;
CREATE TABLE employees( 
  employee_id    NUMBER(6), 
  first_name     VARCHAR2(20), 
  last_name      VARCHAR2(25), 
  email          VARCHAR2(25), 
  phone_number   VARCHAR2(20), 
  hire_date      DATE, 
  job_id         VARCHAR2(10), 
  salary         NUMBER(8,2), 
  commission_pct NUMBER(2,2), 
  manager_id     NUMBER(6), 
  department_id  NUMBER(4));

CREATE UNIQUE INDEX employees_id_pk ON employees (employee_id);

ALTER TABLE employees ADD (CONSTRAINT employees_id_pk PRIMARY KEY (employee_id));

drop table emp_del;
CREATE TABLE emp_del( 
  employee_id    NUMBER(6), 
  first_name     VARCHAR2(20), 
  last_name      VARCHAR2(25), 
  email          VARCHAR2(25), 
  phone_number   VARCHAR2(20), 
  hire_date      DATE, 
  job_id         VARCHAR2(10), 
  salary         NUMBER(8,2), 
  commission_pct NUMBER(2,2), 
  manager_id     NUMBER(6), 
  department_id  NUMBER(4),
  timestamp      DATE);

CREATE UNIQUE INDEX emp_del_id_pk ON emp_del (employee_id);

ALTER TABLE emp_del ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id));

/*

Step 3 Set Up Users at cpap.net
Connect to cpap.net as SYS user.

*/
 
CONNECT / AS SYSDBA

/*


Create the Streams administrator named strmadmin and grant this user the necessary privileges. These privileges enable the user to manage queues, execute subprograms in packages related to Streams, create rule sets, create rules, and monitor the Streams environment by querying data dictionary views and queue tables. You may choose a different name for this user.

In this example, the Streams administrator will be the apply user for the apply process and must be able to apply changes to the hr.emp_del table. Therefore, the Streams administrator is granted ALL privileges on this table.


--------------------------------------------------------------------------------
Note: 
To ensure security, use a password other than strmadminpw for the Streams administrator. 
The SELECT_CATALOG_ROLE is not required for the Streams administrator. It is granted in this example so that the Streams administrator can monitor the environment easily. 
If you plan to use the Streams tool in the Oracle Enterprise Manager Console, then grant the Streams administrator SELECT ANY DICTIONARY privilege, in addition to the privileges shown in this step. 
The ACCEPT command must appear on a single line in the script.

--------------------------------------------------------------------------------
 

See Also: 
"Configuring a Streams Administrator"
 

*/
drop user strmadmin cascade;

GRANT CONNECT, RESOURCE, DBA, SELECT_CATALOG_ROLE 
  TO strmadmin IDENTIFIED BY strmadminpw;

ALTER USER strmadmin DEFAULT TABLESPACE streams_tbs
                     QUOTA UNLIMITED ON streams_tbs;


/*

This example executes a subprogram in a Streams packages within a stored procedure. Specifically, the emp_dq procedure created in Step 9 runs the DEQUEUE procedure in the DBMS_STREAMS_MESSAGING package. Therefore, the Streams administrator must be granted EXECUTE privilege explicitly on the package. In this case, EXECUTE privilege cannot be granted through a role. The GRANT_ADMIN_PRIVILEGE procedure grants EXECUTE on all Streams packages, as well as other privileges relevant to Streams.

*/

BEGIN
  DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
    grantee          => 'strmadmin',    
    grant_privileges => true);
END;
/


/*

Grant the Streams administrator all privileges on the emp_del table because the Streams administrator will be the apply user and must be able to insert records into this table. Alternatively, you may alter the apply process to specify that hr is the apply user.

*/

GRANT ALL ON scott.emp_del TO STRMADMIN;

/*

Step 4 Create the SYS.AnyData Queue at cpap.net
Connect to cpap.net as the strmadmin user.

*/

CONNECT strmadmin/strmadminpw

/*


Run the SET_UP_QUEUE procedure to create a queue named streams_queue at cpap.net. This queue will function is a SYS.AnyData queue that will stage the captured changes that will be dequeued by an apply process and the user-enqueued changes that will be dequeued by a dequeue procedure.

Running the SET_UP_QUEUE procedure performs the following actions:

Creates a queue table named streams_queue_table. This queue table is owned by the Streams administrator (strmadmin) and uses the default storage of this user. 
Creates a queue named streams_queue owned by the Streams administrator (strmadmin). 
Starts the queue.
*/

BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table  => 'strmadmin.streams_queue_table',
    queue_name   => 'strmadmin.streams_queue');
END;
/

/*

Step 5 Check the Spool Results
Check the streams_setup_catapp.out spool file to ensure that all actions finished successfully after this script is completed.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/





/************************* BEGINNING OF SCRIPT ******************************

Step 1 Show Output and Spool Results
Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.

*/

SET ECHO ON
SPOOL streams_config_capapp.out

/*

Step 2 Specify Supplemental Logging at test
Connect to test as SYS user.

*/
 
CONNECT / AS SYSDBA

/*


Supplemental logging places additional information in the redo log for changes made to tables. The apply process needs this extra information to perform certain operations, such as unique row identification.

The following statement specifies an unconditional supplemental log group for the primary key column in the scott.employees table.

See Also: 
"Supplemental Logging in a Streams Environment" 
"Specifying Supplemental Logging at a Source Database"
 

*/

ALTER TABLE scott.employees ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;

/*

Step 3 Configure the Capture Process at test
Connect to test as the strmadmin user.

*/
 
CONNECT strmadmin/strmadminpw

/*


Configure the capture process to capture DML changes to the scott.employees table at test. This step creates the capture process and adds a rule to its positive rule set that instructs the capture process to capture DML changes to this table.

*/

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'scott.employees',   
    streams_type   => 'capture',
    streams_name   => 'capture_emp',
    queue_name     => 'strmadmin.streams_queue',
    source_database=> 'test',
    include_dml    =>  true,
    include_ddl    =>  false,
    inclusion_rule =>  true);
END;
/

/*

Step 4 Set the Instantiation SCN for the scott.employees Table
Because this example captures and applies changes in a single database, no instantiation is necessary. However, the apply process at the test database still must be instructed to apply changes that were made to the scott.employees table after a certain system change number (SCN).

This example uses the GET_SYSTEM_CHANGE_NUMBER function in the DBMS_FLASHBACK package to obtain the current SCN for the database. This SCN is used to run the SET_TABLE_INSTANTIATION_SCN procedure in the DBMS_APPLY_ADM package.

The SET_TABLE_INSTANTIATION_SCN procedure controls which LCRs for a table are ignored by an apply process and which LCRs for a table are applied by an apply process. If the commit SCN of an LCR for a table from a source database is less than or equal to the instantiation SCN for that table at a destination database, then the apply process at the destination database discards the LCR. Otherwise, the apply process applies the LCR. In this example, the test database is both the source database and the destination database.

The apply process will apply transactions to the scott.employees table with SCNs that were committed after SCN obtained in this step.


--------------------------------------------------------------------------------
Note: 
The scott.employees table must also be prepared for instantiation. This preparation was done automatically when the the capture process was configured with a rule to capture DML changes to the scott.employees table in Step 3.

--------------------------------------------------------------------------------
 

*/

DECLARE
  iscn  NUMBER;         -- Variable to hold instantiation SCN value
BEGIN
  iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
    source_object_name    => 'scott.employees',
    source_database_name  => 'test',
    instantiation_scn     => iscn);
END;
/

/*

Step 5 Create the DML Handler Procedure
This step creates the emp_dml_handler procedure. This procedure will be the DML handler for DELETE changes to the scott.employees table. It converts any row LCR containing a DELETE command type into an INSERT row LCR and then inserts the converted row LCR into the scott.emp_del table by executing the row LCR.

*/

CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN SYS.AnyData) IS
  lcr          SYS.LCR$_ROW_RECORD;
  rc           PLS_INTEGER;
  command      VARCHAR2(10);
  old_values   SYS.LCR$_ROW_LIST;
BEGIN    
  -- Access the LCR
  rc := in_any.GETOBJECT(lcr);
  -- Get the object command type
  command := lcr.GET_COMMAND_TYPE();
  -- Check for DELETE command on the scott.employees table
  IF command = 'DELETE' THEN
    -- Set the command_type in the row LCR to INSERT
    lcr.SET_COMMAND_TYPE('INSERT');
    -- Set the object_name in the row LCR to EMP_DEL
    lcr.SET_OBJECT_NAME('EMP_DEL');
    -- Get the old values in the row LCR
    old_values := lcr.GET_VALUES('old');
    -- Set the old values in the row LCR to the new values in the row LCR
    lcr.SET_VALUES('new', old_values);
    -- Set the old values in the row LCR to NULL
    lcr.SET_VALUES('old', NULL);
    -- Add a SYSDATE value for the timestamp column
    lcr.ADD_COLUMN('new', 'TIMESTAMP', SYS.AnyData.ConvertDate(SYSDATE));
    -- Apply the row LCR as an INSERT into the scott.emp_del table
    lcr.EXECUTE(true);
  END IF;
END;
/

/*

Step 6 Set the DML Handler for the scott.employees Table
Set the DML handler for the scott.employees table to the procedure created in Step 5. Notice that the DML handler must be set separately for each possible operation on the table: INSERT, UPDATE, and DELETE.


*/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'scott.employees',
    object_type         => 'TABLE',
    operation_name      => 'INSERT',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'scott.employees',
    object_type         => 'TABLE',
    operation_name      => 'UPDATE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'scott.employees',
    object_type         => 'TABLE',
    operation_name      => 'DELETE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL,
    apply_name          => NULL);
END;
/

/*

Step 7 Create Messaging Client for the Queue
Create a messaging client that can be used by an application to dequeue the re-enqueued events. A messaging client must be specified before the events can be re-enqueued into the queue.

*/

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'scott.employees',   
    streams_type   => 'dequeue',
    streams_name   => 'scott',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    =>  true,
    include_ddl    =>  false,
    source_database => 'test',
    inclusion_rule =>  true);
END;
/


/*

Step 8 Configure the Apply Process at test
Create an apply process to apply DML changes to the scott.employees table. Although the DML handler for the apply process causes deleted employees to be inserted into the emp_del table, this rule specifies the employees table, because the row LCRs in the queue contain changes to the employees table, not the emp_del table. When you run the ADD_TABLE_RULES procedure to create the apply process, the out parameter dml_rule_name contains the name of the DML rule created. This rule name is then passed to the SET_ENQUEUE_DESTINATION procedure.

The SET_ENQUEUE_DESTINATION procedure in the DBMS_APPLY_ADM package specifies that any apply process using the DML rule generated by ADD_TABLE_RULES will enqueue events that satisfy this rule into streams_queue. In this case, the DML rule is for row LCRs with DML changes to the scott.employees table. A local queue other than the apply process queue can be specified if appropriate.

*/

DECLARE
    emp_rule_name_dml  VARCHAR2(30);
    emp_rule_name_ddl  VARCHAR2(30);
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'scott.employees',
    streams_type    => 'apply', 
    streams_name    => 'apply_emp',
    queue_name      => 'strmadmin.streams_queue',
    include_dml     =>  true,
    include_ddl     =>  false,
    source_database => 'test',
    dml_rule_name   => emp_rule_name_dml,
    ddl_rule_name   => emp_rule_name_ddl);
  DBMS_APPLY_ADM.SET_ENQUEUE_DESTINATION(
    rule_name               =>  emp_rule_name_dml,
    destination_queue_name  =>  'strmadmin.streams_queue');
END;
/

/*

Step 9 Create a Procedure to Dequeue the Events
The emp_dq procedure created in this step can be used to dequeue the events that are re-enqueued by the apply process. In Step 8, the SET_ENQUEUE_DESTINATION procedure was used to instruct the apply process to enqueue row LCRs containing changes to the scott.employees table into streams_queue. When the emp_dq procedure is executed, it dequeues each row LCR in the queue and displays the type of command in the row LCR, either INSERT, UPDATE, or DELETE. Any information in the row LCRs can be accessed and displayed, not just the command type.

See Also: 
"Displaying Detailed Information About Apply Errors" for more information about displaying information in LCRs
 

*/

CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) AS
  msg            SYS.AnyData;
  row_lcr        SYS.LCR$_ROW_RECORD;
  num_var        pls_integer;
  more_messages  BOOLEAN := true;
  navigation     VARCHAR2(30);
BEGIN
  navigation := 'FIRST MESSAGE';
  WHILE (more_messages) LOOP
    BEGIN
      DBMS_STREAMS_MESSAGING.DEQUEUE(
        queue_name   => 'strmadmin.streams_queue',
        streams_name => consumer,
        payload      => msg,
        navigation   => navigation,
        wait         => DBMS_STREAMS_MESSAGING.NO_WAIT);
      IF msg.GETTYPENAME() = 'SYS.LCR$_ROW_RECORD' THEN
        num_var := msg.GetObject(row_lcr);   
        DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE || ' row LCR dequeued');
      END IF;
      navigation := 'NEXT MESSAGE';
    COMMIT;
    EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN
                navigation := 'NEXT TRANSACTION';
              WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN
                more_messages := false;
                DBMS_OUTPUT.PUT_LINE('No more messages.');
              WHEN OTHERS THEN
                RAISE; 
    END;
  END LOOP;
END;
/

/*

Step 10 Start the Apply Process at test
Set the disable_on_error parameter to n so that the apply process will not be disabled if it encounters an error, and start the apply process at test.

*/

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name  => 'apply_emp', 
    parameter   => 'disable_on_error', 
    value       => 'n');
END;
/
 
BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name  => 'apply_emp');
END;
/

/*

Step 11 Start the Capture Process at test
Start the capture process at test.

*/

BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name  => 'capture_emp');
END;
/

/*

Step 12 Check the Spool Results
Check the streams_config_capapp.out spool file to ensure that all actions finished successfully after this script is completed.

*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/

[Updated on: Mon, 14 July 2008 22:45]

Report message to a moderator

Previous Topic: Add New Destination
Next Topic: Function after dequeing
Goto Forum:
  


Current Time: Sat Jan 11 00:29:15 CST 2025