Friday, 24 June 2016

Oracle Streams - Troubleshooting

Transactions, LCRs (logical change records) are core part of Oracle Streams replication, they are set-up in a queue, shipped to destination, dequeued and applied. Thus there could be several possibilities that a transaction may end up in a conflict (in bidirectional replication), not be applied due to ORA- or other errors. Thus I will today discuss a few things about working with errors and replication:

To find out errors raised by APPLY process below query can be used on database where apply process is running:

ALTER SESSION SET NLS_DATE_FORMAT='DD-MON-YY HH24:MI:SS';
SET PAGES 1000 LINES 150
COLUMN APPLY_NAME HEADING 'Apply|Process|Name' FORMAT A10
COLUMN SOURCE_DB HEADING 'Source|Database' FORMAT A10
COLUMN LOCAL_TRANSACTION_ID HEADING 'Local|Transaction|ID' FORMAT A11
COLUMN ERROR_NUMBER HEADING 'Error Number' FORMAT 99999999
COLUMN ERROR_MESSAGE HEADING 'Error Message' FORMAT A50
COLUMN MESSAGE_COUNT HEADING 'Messages in|Error|Transaction' FORMAT 99999999
SELECT APPLY_NAME,
       substr(SOURCE_DATABASE,1,4) SOURCE_DB,
       MESSAGE_NUMBER,
       LOCAL_TRANSACTION_ID,
       ERROR_NUMBER,
       ERROR_MESSAGE,
       MESSAGE_COUNT
  FROM DBA_APPLY_ERROR;


This would give an output like below:

Apply                                Local                                                                       Messages in
Process    Source                    Transaction                                                                       Error
Name       Database   MESSAGE_NUMBER ID          Error Number Error Message                                      Transaction
---------- ---------- -------------- ----------- ------------ -------------------------------------------------- -----------
APPLY_PRD  PROD                  2 10.12.18013          439 ORA-00439: feature not enabled: Basic Compression       352987


Using information from above query we can obtain LCR (logical change record) for transaction, however as a prerequisite we would need below two procedures created under Stream Administrator user:

$ sqlplus streamadm/password

PRINT_ANY procedure

SQL>CREATE OR REPLACE PROCEDURE print_any(data IN ANYDATA) IS
  tn  VARCHAR2(61);
  str VARCHAR2(4000);
  chr VARCHAR2(1000);
  num NUMBER;
  dat DATE;
  rw  RAW(4000);
  res NUMBER;
BEGIN
  IF data IS NULL THEN
    DBMS_OUTPUT.PUT_LINE('NULL value');
    RETURN;
  END IF;
  tn := data.GETTYPENAME();
  IF tn = 'SYS.VARCHAR2' THEN
    res := data.GETVARCHAR2(str);
    DBMS_OUTPUT.PUT_LINE(SUBSTR(str,0,253));
  ELSIF tn = 'SYS.CHAR' then
    res := data.GETCHAR(chr);
    DBMS_OUTPUT.PUT_LINE(SUBSTR(chr,0,253));
  ELSIF tn = 'SYS.VARCHAR' THEN
    res := data.GETVARCHAR(chr);
    DBMS_OUTPUT.PUT_LINE(chr);
  ELSIF tn = 'SYS.NUMBER' THEN
    res := data.GETNUMBER(num);
    DBMS_OUTPUT.PUT_LINE(num);
  ELSIF tn = 'SYS.DATE' THEN
    res := data.GETDATE(dat);
    DBMS_OUTPUT.PUT_LINE(dat);
  ELSIF tn = 'SYS.RAW' THEN
    -- res := data.GETRAW(rw);
    -- DBMS_OUTPUT.PUT_LINE(SUBSTR(DBMS_LOB.SUBSTR(rw),0,253));
    DBMS_OUTPUT.PUT_LINE('BLOB Value');
  ELSIF tn = 'SYS.BLOB' THEN
    DBMS_OUTPUT.PUT_LINE('BLOB Found');
  ELSE
    DBMS_OUTPUT.PUT_LINE('typename is ' || tn);
  END IF;
END print_any;
/



PRINT_LCR procedure

SQL> CREATE OR REPLACE PROCEDURE print_lcr(lcr IN ANYDATA) IS
  typenm    VARCHAR2(61);
  ddllcr    SYS.LCR$_DDL_RECORD;
  proclcr   SYS.LCR$_PROCEDURE_RECORD;
  rowlcr    SYS.LCR$_ROW_RECORD;
  res       NUMBER;
  newlist   SYS.LCR$_ROW_LIST;
  oldlist   SYS.LCR$_ROW_LIST;
  ddl_text  CLOB;
  ext_attr  ANYDATA;
BEGIN
  typenm := lcr.GETTYPENAME();
  DBMS_OUTPUT.PUT_LINE('type name: ' || typenm);
  IF (typenm = 'SYS.LCR$_DDL_RECORD') THEN
    res := lcr.GETOBJECT(ddllcr);
    DBMS_OUTPUT.PUT_LINE('source database: ' ||
                         ddllcr.GET_SOURCE_DATABASE_NAME);
    DBMS_OUTPUT.PUT_LINE('owner: ' || ddllcr.GET_OBJECT_OWNER);
    DBMS_OUTPUT.PUT_LINE('object: ' || ddllcr.GET_OBJECT_NAME);
    DBMS_OUTPUT.PUT_LINE('is tag null: ' || ddllcr.IS_NULL_TAG);
    DBMS_LOB.CREATETEMPORARY(ddl_text, TRUE);
    ddllcr.GET_DDL_TEXT(ddl_text);
    DBMS_OUTPUT.PUT_LINE('ddl: ' || ddl_text);   
    -- Print extra attributes in DDL LCR
    ext_attr := ddllcr.GET_EXTRA_ATTRIBUTE('serial#');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('serial#: ' || ext_attr.ACCESSNUMBER());
      END IF;
    ext_attr := ddllcr.GET_EXTRA_ATTRIBUTE('session#');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('session#: ' || ext_attr.ACCESSNUMBER());
      END IF;
    ext_attr := ddllcr.GET_EXTRA_ATTRIBUTE('thread#');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('thread#: ' || ext_attr.ACCESSNUMBER());
      END IF;  
    ext_attr := ddllcr.GET_EXTRA_ATTRIBUTE('tx_name');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('transaction name: ' || ext_attr.ACCESSVARCHAR2());
      END IF;
    ext_attr := ddllcr.GET_EXTRA_ATTRIBUTE('username');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('username: ' || ext_attr.ACCESSVARCHAR2());
      END IF;     
    DBMS_LOB.FREETEMPORARY(ddl_text);
  ELSIF (typenm = 'SYS.LCR$_ROW_RECORD') THEN
    res := lcr.GETOBJECT(rowlcr);
    DBMS_OUTPUT.PUT_LINE('source database: ' ||
                         rowlcr.GET_SOURCE_DATABASE_NAME);
    DBMS_OUTPUT.PUT_LINE('owner: ' || rowlcr.GET_OBJECT_OWNER);
    DBMS_OUTPUT.PUT_LINE('object: ' || rowlcr.GET_OBJECT_NAME);
    DBMS_OUTPUT.PUT_LINE('is tag null: ' || rowlcr.IS_NULL_TAG);
    DBMS_OUTPUT.PUT_LINE('command_type: ' || rowlcr.GET_COMMAND_TYPE);
    oldlist := rowlcr.GET_VALUES('old');
    FOR i IN 1..oldlist.COUNT LOOP
      IF oldlist(i) IS NOT NULL THEN
        DBMS_OUTPUT.PUT_LINE('old(' || i || '): ' || oldlist(i).column_name);
        print_any(oldlist(i).data);
      END IF;
    END LOOP;
    newlist := rowlcr.GET_VALUES('new', 'n');
    FOR i in 1..newlist.count LOOP
      IF newlist(i) IS NOT NULL THEN
        DBMS_OUTPUT.PUT_LINE('new(' || i || '): ' || newlist(i).column_name);
        print_any(newlist(i).data);
      END IF;
    END LOOP;
    -- Print extra attributes in row LCR
    ext_attr := rowlcr.GET_EXTRA_ATTRIBUTE('row_id');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('row_id: ' || ext_attr.ACCESSUROWID());
      END IF;
    ext_attr := rowlcr.GET_EXTRA_ATTRIBUTE('serial#');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('serial#: ' || ext_attr.ACCESSNUMBER());
      END IF;
    ext_attr := rowlcr.GET_EXTRA_ATTRIBUTE('session#');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('session#: ' || ext_attr.ACCESSNUMBER());
      END IF;
    ext_attr := rowlcr.GET_EXTRA_ATTRIBUTE('thread#');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('thread#: ' || ext_attr.ACCESSNUMBER());
      END IF;  
    ext_attr := rowlcr.GET_EXTRA_ATTRIBUTE('tx_name');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('transaction name: ' || ext_attr.ACCESSVARCHAR2());
      END IF;
    ext_attr := rowlcr.GET_EXTRA_ATTRIBUTE('username');
      IF (ext_attr IS NOT NULL) THEN
        DBMS_OUTPUT.PUT_LINE('username: ' || ext_attr.ACCESSVARCHAR2());
      END IF;         
  ELSE
    DBMS_OUTPUT.PUT_LINE('Non-LCR Message with type ' || typenm);
  END IF;
END print_lcr;
/


These procedures are provided by Oracle Corporation. After they have been created run below procedure to obtain complete text of LCR:

SQL> SET SERVEROUTPUT ON;
DECLARE
   lcr SYS.AnyData;
BEGIN
    lcr := DBMS_APPLY_ADM.GET_ERROR_MESSAGE
                ('<message_number>', '<transaction_id>');
    print_lcr(lcr);
END;
/


source database: PROD.EXAMPLE.COM
owner: SAMPLE
object: CMP4$1820266
is tag null: Y
ddl: create table "SAMPLE".CMP4$1820266 organization heap  tablespace "SAMPLEDATA"  compress for all operations nologging as select /*+ DYNAMIC_SAMPLING(0)
*/ * from "SAMPLE".CMP3$1820266 mytab


The error in my case is because of destination database being Standard Edition which does not supports Basic Compression feature.

For the errors which can be resolved OR in case you would like to clear-out the error from database below procedures can be used:

By specifying transaction_id a particular error message can be deleted:
SQL> exec DBMS_APPLY_ADM.DELETE_ERROR ('<local_transaction_id>');

To delete all the errors:
SQL> exec DBMS_APPLY_ADM.DELETE_ALL_ERRORS ();

Will see more on troubleshooting in later posts! Thanks for reading.

Friday, 10 June 2016

Oracle Streams - Adding a New Schema to Streams Configuration

Today I will be discussing about adding a new schema to existing streams replication configuration. In my current environment I have:

1. A source database: PROD.EXAMPLE.COM
    a. Having CHANGE CAPTURE process: CAPTURE_PROD
    b. An ANYDATA queue: STREAMS_QUEUE
    c. A PROPAGATION process: PROD_TO_RPL

2. A destination database: REPL.EXAMPLE.COM
    a. Having an ANYDATA queue: STREAMS_QUEUE
    b. An APPLY process: APPLY_RPL

To add a new schema, we must first stop replication processes running on source and destination:

Stop Streams Processes

1. Stop CHANGE CAPTURE on source


$ sqlplus streamadm/*****@PROD.EXAMPLE.COM
SQL> exec DBMS_CAPTURE_ADM.STOP_CAPTURE('CAPTURE_PROD');

2. Stop PROPAGATION on source


$ sqlplus streamadm/*****@PROD.EXAMPLE.COM
SQL> exec DBMS_PROPAGATION_ADM.STOP_PROPAGATION('PROD_TO_RPL');

3. Stop APPLY on destination


$ sqlplus streamadm/*****@REPL.EXAMPLE.COM
SQL> exec DBMS_APPLY_ADM.STOP_APPLY('APPLY_RPL');

Add a Schema and Setup Replication at Source and Destination Database


1. Create User and Assign Required Privileges/Roles etc


$  sqlplus streamadm/*****@PROD.EXAMPLE.COM
SQL> CREATE USER SCOTTY IDENTIFIED BY "welcome" DEFAULT TABLESPACE USERS; SQL> GRANT CONNECT, RESOURCE TO SCOTTY;

$ sqlplus streamadm/*****@REPL.EXAMPLE.COM
SQL> CREATE USER SCOTTY IDENTIFIED BY "welcome" DEFAULT TABLESPACE USERS; SQL> GRANT CONNECT, RESOURCE TO SCOTTY;

2. Prepare Schema for Instantiation at Source Database


$  sqlplus streamadm/*****@PROD.EXAMPLE.COM
SQL> BEGIN 
         DBMS_CAPTURE_ADM.PREPARE_SCHEMA_INSTANTIATION(
         schema_name          => 'SCOTTY',
         supplemental_logging => 'all');
     END;
     /


3. Create PROPAGATION Rule for new schema


$  sqlplus streamadm/*****@PROD.EXAMPLE.COM
SQL> 
BEGIN
DBMS_STREAMS_ADM.ADD_SCHEMA_PROPAGATION_RULES(
   schema_name              => 'SCOTTY',
   streams_name             => 'DBA_TO_RPL',
   source_queue_name        => 'streamadm.streams_queue',
   destination_queue_name   => 'streamadm.streams_queue@REPL.
EXAMPLE.COM',
   include_dml              => TRUE,
   include_ddl              => TRUE,
   include_tagged_lcr       => FALSE,
   source_database          => 'PROD.
EXAMPLE.COM',
   inclusion_rule           => TRUE,
   and_condition            => NULL,
   queue_to_queue           => TRUE);
END;
/


4. Create CHANGE CAPTURE Rule for new schema


$  sqlplus streamadm/*****@PROD.EXAMPLE.COM
SQL>BEGIN
DBMS_STREAMS_ADM.ADD_SCHEMA_RULES(
   schema_name          => 'SCOTTY',
   streams_type         => 'capture',
   streams_name         => 'CAPTURE_DBA',
   queue_name           => 'streamadm.streams_queue',
   include_dml          => TRUE,
   include_ddl          => TRUE,
   include_tagged_lcr   => FALSE,
   source_database      => 'PROD.EXAMPLE.COM',
   inclusion_rule       => TRUE,
   and_condition        => NULL);
END;
/


5. Instantiate Schema at Destination Database


$  sqlplus streamadm/*****@REPL.EXAMPLE.COM
SQL>DECLARE
  iscn  NUMBER;         -- Variable to hold instantiation SCN value
BEGIN
  iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_SCHEMA_INSTANTIATION_SCN@REPL.EXAMPLE.COM(
    source_schema_name    => 'SCOTTY',
    source_database_name  => 'PROD.EXAMPLE.COM',
    instantiation_scn     => iscn,
    recursive             => TRUE);
END;
/


6. Create APPLY Rules for new schema at Destination Database



$  sqlplus streamadm/*****@REPL.EXAMPLE.COM
SQL>BEGIN
DBMS_STREAMS_ADM.ADD_SCHEMA_RULES(
   schema_name          => 'SCOTTY',
   streams_type         => 'apply',
   streams_name         => 'APPLY_RPL',
   queue_name           => 'streamadm.streams_queue',
   include_dml          => TRUE,
   include_ddl          => TRUE,
   include_tagged_lcr   => FALSE,
   source_database      => 'PROD.EXAMPLE.COM',
   inclusion_rule       => TRUE,
   and_condition        => NULL);
END;
/


Start Streams Processes

1. Start CHANGE CAPTURE on source


$ sqlplus streamadm/*****@PROD.EXAMPLE.COM
SQL> exec DBMS_CAPTURE_ADM.START_CAPTURE('CAPTURE_PROD');

2. Start PROPAGATION on source


$ sqlplus streamadm/*****@PROD.EXAMPLE.COM
SQL> exec DBMS_PROPAGATION_ADM.START_PROPAGATION('PROD_TO_RPL');

3. Start APPLY on destination


$ sqlplus streamadm/*****@REPL.EXAMPLE.COM
SQL> exec DBMS_APPLY_ADM.STOP_APPLY('APPLY_RPL');


All Done! Now Test Replication Again..

Friday, 3 June 2016

Configuring Synchronous Capture - Oracle Streams

Synchronous capture (also implicit capture) is an optional Oracle Streams client that captures data manipulation language (DML) changes made to tables. Synchronous capture uses an internal mechanism to capture DML changes to specified tables.

When a DML change it made to a table, it can result in changes to one or more rows in the table. Synchronous capture captures each row change and converts it into a specific message format called a row logical change record (row LCR). After capturing a row LCR, synchronous capture enqueues a message containing the row LCR into a queue. Row LCRs created by synchronous capture always contain values for all the columns in a row, even if some of the columns where not modified by the change.

The need of using Synchronous capture is when:

a) you have fewer number of tables to replicate
b) replicating DML changes is a concern
c) you are using Oracle RDBMS Standard Edition

I will be using Oracle RDBMS 11.2.0.4 for this setup. We currently have:

1. Source database (11.2.0.4 Standard Edition) with schema ORDERS having only one table ORDATA - PROD.EXMPLE.COM
2. ORDERS schema has default tablespace USERS.
3. A newly created destination database on another server, a USERS tablespace has been added - REPL.EXAMPLE.COM
4. Schema Export of ORDERS is imported into REPL database.

Aim: Replicate DML changes in ORDERS.ORDATA table from PROD to REPL.

In my previous posts, I have already discussed about setting up environment and pre-requisite tasks. Please follow steps in Setup Oracle Streams - Datapump Instantiation to setup Stream Administrator user and other prerequisites.

Setup Synchronous Capture Replication


1. Create ANYDATA queue on source and destination databases

BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
   queue_table        =>'prod_to_repl_queue_table',
   queue_name          =>'prod_to_repl');
END;
/

ANYDATA queue stores LCRs (logical change records) captured by synchronous capture process at source database and ships them to destination database's ANYDATA queue.

2. Create APPLY process at destination database

BEGIN
  DBMS_APPLY_ADM.CREATE_APPLY(
    queue_name     => 'streamadm.prod_to_repl',
    apply_name     => 'APPLY_REPL',
    apply_captured => FALSE);
END;
/


As we are using change capture by a synchronous capture the apply_captured parameter is set to FALSE because the apply process applies changes in the persistent queue. The apply_captured parameter should be set to TRUE only when the apply process applies changes captured by a capture process.

APPLY process will be started later, thus do not start it at this point.

3. Add APPLY rules at destination database

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'ORDERS.ORDATA',
    streams_type    => 'apply',
    streams_name    => 'APPLY_REPL',
    queue_name      => 'streamadm.prod_to_repl',
    source_database => 'PROD.EXAMPLE.COM');
END;
/


Apply rules indicates apply process to apply all DML changes that appear in apply queue to specified table name. Repeat this step for as many tables as replication is desired for.

4. Create PROPAGATION on source

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name             => 'ORDERS.ORDATA',
    streams_name           => 'CAPTURE_PROD',
    source_queue_name      => 'streamadm.prod_to_repl',
    destination_queue_name => 'streamadm.prod_to_repl@REPL.EXAMPLE.COM',
    source_database        => 'PROD.EXAMPLE.COM',
    queue_to_queue          => TRUE);
END;
/


The ADD_TABLE_PROPAGATION_RULES procedure creates the propagation and its positive rule set. This procedure also adds a rule to the propagation rule set that instructs it to send DML changes from ORDERS.ORDATA to apply_queue. Repeat this step for as many tables as replication is desired for.

5. Create SYNCHRONOUS CAPTURE process at source database

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'ORDERS.ORDATA',
    streams_type    => 'sync_capture',
    streams_name    => 'CAPTURE_PROD',
    queue_name      => 'streamadm.prod_to_repl');
END; 

/

Executing this procedure:
  • Creates a synchronous capture named sync_capture at the current database. A synchronous capture with the same name must not exist.
  • Enables the synchronous capture. A synchronous capture cannot be disabled.
  • Associates the synchronous capture with an existing queue named prod_to_repl owned by streamadm.
  • Creates a positive rule set for synchronous capture sync_capture. The rule set has a system-generated name.
  • Creates a rule that captures DML changes to the ORDERS.ORDATA table and adds the rule to the positive rule set for the synchronous capture. The rule has a system-generated name.
  • Prepares the ORDERS.ORDATA table for instantiation by running the DBMS_CAPTURE_ADM.PREPARE_SYNC_INSTANTIATION function for the table automatically.
Repeat this step for as many tables as replication is desired for.

6. INSTANTIATE objects at destination

DECLARE
  iscn  NUMBER;

BEGIN
  iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@REPL.EXAMPLE.COM(
    source_object_name      => 'ORDERS.ORDATA',
    source_database_name    => 'PROD.EXAMPLE.COM',
    instantiation_scn       => iscn);
END;
/


An instantiation SCN is the lowest SCN for which an apply process can apply changes to a table. Before the apply process can apply changes to the tables at the destination database, an instantiation SCN must be set for each table.

7. Start APPLY process
Run below procedure to start APPLY process at destination database.

BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name => 'APPLY_REPL');
END;
/


Test out the replication!

References:
Oracle's Data Replication and Integration Guide [https://docs.oracle.com/cd/E11882_01/server.112/e17516/toc.htm]