Skip navigation.

Pas Apicella

Syndicate content
Information on Pivotal -> Product Family - GemFire, SQLFire, tc Server, RabbitMQ, Pivotal HD, Greenplum DBPas Apicellahttp://www.blogger.com/profile/09389663166398991762noreply@blogger.comBlogger228125
Updated: 8 hours 29 min ago

Creating a multi threaded insert client for SQLFire 1.1

Wed, 2013-06-12 18:02
In this example below we show how to create a multi threaded insert client to insert 100,000 records into SQLFire table. In this example below the table is partitioned with synchronous persistence turned on.The distributed system includes one locator and 5 data members.

1. Create Table as shown below
  
drop diskstore store1;

CREATE DISKSTORE STORE1;

drop table person;

create table person 
(id int primary key,
 name varchar(40))
PARTITION BY COLUMN (id)
REDUNDANCY 1
PERSISTENT 'STORE1' SYNCHRONOUS;
2. Multi Threaded Insert client Code.
  
package pivotal.au.fe.sqlfire.insert;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MultiThreadInsert 
{
 private String url = "jdbc:sqlfire://127.0.0.1:1527/";
 private final int RECORDS = 100000;
 private final int COMMIT_POINT = 10000;
 private static final int nThreads = 4;
 
 public MultiThreadInsert() 
 {
 }
 
 private Connection getConnection() throws SQLException
 {
  Connection conn = null;
  conn = DriverManager.getConnection(url);
  return conn; 
 }
 
 @SuppressWarnings("unchecked")
 public void start() throws InterruptedException, SQLException 
 {
  Connection conn = getConnection();
  
        final ExecutorService executorService = Executors.newFixedThreadPool(nThreads);

        ArrayList list = new ArrayList();
        for (int i = 0; i < nThreads; i++) {
            list.add(new RunData(conn, i+1));
        }
        long start = System.currentTimeMillis();
        
        List<Future<?>> tasks = executorService.invokeAll(list, 5, TimeUnit.MINUTES);
        
        for(Future<?> f : tasks){
         try {
    f.get();
   } catch (ExecutionException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
        }
        
     long end = System.currentTimeMillis() - start;
     
     float elapsedTimeSec = end/1000F;

        System.out.println(String.format("Elapsed time in seconds %f", elapsedTimeSec));
        
        conn.close();
     executorService.shutdown();
        System.exit(0);
 }
 
 private class RunData implements Callable 
 {
     int counter = 0;
        int increment;
        Connection conn;
        
        private RunData(Connection conn, int increment) 
        {
            this.increment = increment;
            this.conn = conn;
        }

        public void run() 
        {
      PreparedStatement stmt = null;
      String sql = "insert into person values (?, ?)";
      int counter = 0;

            int dataSize = RECORDS / nThreads;
            System.out.printf("Start: %d  End: %d \n",(dataSize * (increment - 1)), (dataSize * increment));
      try 
      {
       stmt = conn.prepareStatement(sql);
       
       for (int i = (dataSize * (increment - 1)); i < (dataSize * increment); i++)
       {
        counter = counter + 1;
        stmt.setInt(1, i);
        stmt.setString(2, "Person" + i);
        stmt.addBatch();
        
        if (counter % COMMIT_POINT == 0)
        {
         stmt.executeBatch();
         conn.commit();
        }
       }
       
       /* there might be more records so call stmt.executeBatch() prior to commit here */
       stmt.executeBatch();
       conn.commit();
       System.out.printf("Number of records submitted %d.\n", counter);
       
                 
      } 
   catch (SQLException e) 
   {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
   finally
   {
    if (stmt != null)
    {
     try 
     {
      stmt.close();
     } 
     catch (SQLException e) 
     {
      // TODO Auto-generated catch block
      e.printStackTrace();
     }
    }
   }      
        }
        
        public Object call() throws Exception 
        {
            run();
            return counter;
        }
  
 }

 /**
  * @param args
  * @throws InterruptedException 
  * @throws SQLException 
  */
 public static void main(String[] args) throws InterruptedException, SQLException 
 {
  // TODO Auto-generated method stub
  MultiThreadInsert test = new MultiThreadInsert();
  test.start();
 }
}
3. Output when run as follows.

Note: This was run on my MAC laptop which had 5 cache servers running on it. This would perform much better if I had 5 physical machines for each of the SQLFire cache server members.

Start: 0  End: 25000
Start: 75000  End: 100000
Start: 50000  End: 75000
Start: 25000  End: 50000
Number of records submitted 25000.
Number of records submitted 25000.
Number of records submitted 25000.
Number of records submitted 25000.
Elapsed time in seconds 4.409000


http://feeds.feedburner.com/TheBlasFromPas
Categories: Fusion Middleware

Naming Members in vFabric SQLFire

Tue, 2013-05-14 04:12
I find it useful to give a member a meaningful  name. In SQLFire you could simply give each member a name by adding a property "name" as follows to the sqlfire.properties file for the member.

sqlfire.properties

# sqlfire.properties for data store or accessor member
license-serial-number=XXXXXXXXXXX
name=server1

Note: The same can be done with GemFire as well.

Then when the system is up the ID for each system member includes the given name as shown below.

  
sqlf> select substr(id, 1 , 35) as "Member" from sys.members;
Member                             
-----------------------------------
172.16.62.1(server2:38971)<v2>:4265
172.16.62.1(server1:38970)<v1>:1660
127.0.0.1(38744):29535             

3 rows selected
http://feeds.feedburner.com/TheBlasFromPas
Categories: Fusion Middleware

JMX access to vFabric SQLFire

Wed, 2013-05-01 18:31
With the release of vFabric SQLFire 11 we can now start a JMX manager with the locator itself. To do that we add the following to the sqlfire.properties file of the locator itself.

jmx-manager=true
jmx-manager-start=true
jmx-manager-ssl=false
jmx-manager-http-port=8083

Then with the locator started we can verify we have it running on the default port of 1099 as shown below.

[Thu May 02 09:45:49 papicella@:~/sqlfire/vFabric_SQLFire_11_b40332/pasdemos/agent-test/locator ] $ netstat -an | grep 1099
tcp4       0      0  127.0.0.1.1099         127.0.0.1.64803        ESTABLISHED
tcp4       0      0  127.0.0.1.1099         127.0.0.1.64801        ESTABLISHED
tcp4       0      0  127.0.0.1.64801        127.0.0.1.1099         ESTABLISHED
tcp4       0      0  127.0.0.1.64803        127.0.0.1.1099         ESTABLISHED
tcp4       0      0  127.0.0.1.1099         127.0.0.1.64799        ESTABLISHED
tcp4       0      0  127.0.0.1.64799        127.0.0.1.1099         ESTABLISHED
tcp46      0      0  *.1099                 *.*                    LISTEN    

Finally start jconsole and connect using a service URL as follows

Format:

service:jmx:rmi://{hotname}/jndi/rmi://{hostname}:1099/jmxrmi

Example:

service:jmx:rmi://Pas-Apicellas-MacBook-Pro.local/jndi/rmi://Pas-Apicellas-MacBook-Pro.local:1099/jmxrmi

Once connected you can browse the MBean as shown in the image below.



More Information

http://pubs.vmware.com/vfabric53/index.jsp?topic=/com.vmware.vfabric.sqlfire.1.1/manage_guide/jmx/jmx_intro.htmlhttp://feeds.feedburner.com/TheBlasFromPas
Categories: Fusion Middleware

Explain Plan in vFabric SQLFire improved

Tue, 2013-04-30 19:42
With the recently released vFabric SQLFire 11 version the query execution plan is much easier to read then previously. An example below.

  
[Wed May 01 11:32:11 papicella@:~/sqlfire/vFabric_SQLFire_11_b40332/pasdemos/sqlfire ] $ sqlf
sqlf version 10.4
sqlf> connect peer 'bind-address=localhost;mcast-port=12333;host-data=false' as peerClient;
sqlf> explain select * from emp where deptno = 20;
MEMBER_PLAN                                                                                                                     
--------------------------------------------------------------------------------------------------------------------------------
ORIGINATOR 192.168.14.167(73118)<v6>:61492 BEGIN TIME 2013-05-01 11:32:39.735 END TIME 2013-05-01 11:32:39.777
DISTRIBUTION to &
Slowest Member Plan:
member 192.168.14.167(72048)<v1>:42223 begin_execution 2013-05-01 11:32:39.74 end_execution 2013-05-01 11:&
Fastest Member Plan:
member 192.168.14.167(72048)<v1>:42223 begin_execution 2013-05-01 11:32:39.74 end_execution 2013-05-01 11:&

3 rows selected
sqlf> select STMT_ID, STMT_TEXT from SYS.STATEMENTPLANS;
STMT_ID                             |STMT_TEXT                                                                                                                       
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
00000001-ffff-ffff-ffff-000400000016| select * from emp where deptno = <?>                                                                                           

1 row selected
sqlf> explain '00000001-ffff-ffff-ffff-000400000016';
stmt_id 00000001-ffff-ffff-ffff-000400000016 SQL_stmt select * from emp where deptno = <?> begin_execution 2013-05-01 11:32:39.735 end_execution 2013-05-01 11:32:39.777
QUERY-SCATTER  execute_time 0.0 ms
  QUERY-SEND 
    RESULT-RECEIVE 
      SEQUENTIAL-ITERATION (0.38%) execute_time 0.136 ms returned_rows 5 no_opens 1
        RESULT-HOLDER  returned_rows 5 no_opens 1
          DISTRIBUTION-END (99.61%) execute_time 35.073 ms returned_rows 5
member 192.168.14.167(72048)<v1>:42223 begin_execution 2013-05-01 11:32:39.74 end_execution 2013-05-01 11:32:39.774
QUERY-RECEIVE 
  RESULT-SEND 
    RESULT-HOLDER  returned_rows 5 no_opens 1
      ROWIDSCAN (1.71%) execute_time 0.148 ms returned_rows 5 no_opens 1 node_details EMP : 
        CONSTRAINTSCAN (98.28%) execute_time 8.482 ms returned_rows 5 no_opens 1 scan_qualifiers None scanned_object APP.6__EMP__DEPTNO:base-table:APP.EMP scan_type  node_details WHERE : ((DEPTNO = CONSTANT:20) and true) 
http://feeds.feedburner.com/TheBlasFromPas
Categories: Fusion Middleware

Handling DML Events Synchronously with vFabric SQLFire

Sun, 2013-04-14 18:30
SQLFire provides synchronous cache plug-in mechanisms to handle cache events. This example is a synchronous listener. A listener enables you to receive after-event notifications of changes to a table (insert, update and delete). Any number of listeners can be defined for the same table. Listener callbacks are called synchronously, so they will cause the DML operation to block if the callback blocks.

CommandTableEventCallBackListenerImpl.java
  
package pivotal.au.demo.poc.listener;

import java.sql.ResultSet;
import java.sql.SQLException;
import pivotal.au.demo.poc.domain.Command;
import pivotal.au.demo.poc.executor.ExecutorCommand;
import pivotal.au.demo.poc.executor.ExecutorFactory;

import com.vmware.sqlfire.callbacks.Event;
import com.vmware.sqlfire.callbacks.Event.Type;
import com.vmware.sqlfire.callbacks.EventCallback;

public class CommandTableEventCallBackListenerImpl implements EventCallback
{ 
 public void close() throws SQLException 
 {
 }

 public void init(String configuration) throws SQLException 
 {  
  System.out.println("configuration = " + configuration);
 
  System.out.println("CommandTableEventCallBackListenerImpl.init");
  
 }

 public void onEvent(Event event) throws SQLException 
 {
  if (event.getType() == Type.AFTER_INSERT)
  {
   ResultSet rset = event.getNewRowsAsResultSet();
   Command cmd = 
     new Command(rset.getInt(1), 
        rset.getString(2),
        rset.getString(3),
        rset.getString(4),
        rset.getString(5));
   
   System.out.println("Table[" + event.getTableName() + "] Command = " + cmd.toString());
   handleEvent(cmd); 
  }
  else
  {
   System.out.println("Not processing event " + event.getType().toString());
  }
  
 }
 
 private void handleEvent (Command cmd)
 {
  System.out.println("Handling event for Command with id = " + cmd.getId());
  
  ExecutorCommand execCommand = null;
  
  if (cmd.getType().equalsIgnoreCase("OS"))
  {
   execCommand = ExecutorFactory.getOSExecutorImpl();
   execCommand.runCommand(cmd.getCommand(), null);
  }
  else
  {
   // expecting to execute SQL so check if firing on sqlfire or greenplum at this stage
   execCommand = ExecutorFactory.getSQLExecutorImpl();
   if (cmd.getExecuteOnGreenplum().equalsIgnoreCase("Y"))
   {
    execCommand.runCommand(cmd.getCommand(), "GP");
   }
   
   if (cmd.getExecuteOnSqlfire().equalsIgnoreCase("Y"))
   {
    execCommand.runCommand(cmd.getCommand(), "SQLFIRE");
   }
  }

 }

}

Attach Listener to a table.
  
CREATE TABLE command_table 
(ID INT generated always as identity NOT NULL, 
 EXECUTE_ON_SQLFIRE VARCHAR(1) default 'N',
 EXECUTE_ON_GREENPLUM VARCHAR(1) default 'Y',
 command_type varchar(10),
 COMMAND VARCHAR(200) not null
 )
SERVER GROUPS (MYGROUP);

call sys.ADD_LISTENER('CommandTableEventCallBackListenerImpl', 'apples', 'command_table', 'pivotal.au.demo.poc.listener.CommandTableEventCallBackListenerImpl', '', 'MYGROUP');


More Information

http://pubs.vmware.com/vfabricNoSuite/index.jsp?topic=/com.vmware.vfabric.sqlfire.1.1/caching_database/cache-plug-ins.htmlhttp://feeds.feedburner.com/TheBlasFromPas
Categories: Fusion Middleware

ADO .NET c# Connection Pooling with vFabric SQLFire

Thu, 2013-04-11 05:29
Whether it's accessing a database using JAVA or in this case c# I always want to use a connection pool and in this example I show and simple way to do this with a c# ADO .NET client accessing SQLFire.

1. Add a reference to your Visual Studio project in the VMware.Data.SQLFire.dll. This DLL is installed in the  

vFabric_SQLFire_11_bNNNNN\adonet\lib directory.


2. Reference the driver namespace in each source file where you want to use SQLFire components. For example, include this directive with all other references required in your application:

using VMware.Data.SQLFire;

3.  Create a c# console application as follows
  
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using VMware.Data.SQLFire;
using System.Data;

namespace SQLFireDemo
{
    class QueryDemo
    {
        private string sqlfHost = "192.168.1.4";
        private int sqlfPort = 1527;

        public QueryDemo()
        {
        }

        private string GetConnectionString()
        {
            return string.Format(@"server={0}:{1}", sqlfHost, sqlfPort);
        }

        public void run()
        {
            using (SQLFClientConnection conn = new SQLFClientConnection(GetConnectionString()))
            {
                conn.Open();
                SQLFCommand command = new SQLFCommand
                      (string.Format("SELECT * FROM dept"), conn);
                SQLFDataReader reader = command.ExecuteReader();

                try
                {
                    StringBuilder row = new StringBuilder();
                    while (reader.Read())
                    {
                        Console.WriteLine(string.Format("Dept[deptno={0}, dname={1}]",
                                          reader.GetString(0),
                                          reader.GetString(1)));
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine(e);
                }
                finally
                {
                    reader.Close();
                }
            }

        }

        static void Main(string[] args)
        {
            QueryDemo test = new QueryDemo();
            test.run();
        }
    }
}

Output as follows

Dept[deptno=10, dname=ACCOUNTING]
Dept[deptno=20, dname=RESEARCH]
Dept[deptno=30, dname=SALES]
Dept[deptno=40, dname=OPERATIONS]
Dept[deptno=50, dname=MARKETING]
Dept[deptno=60, dname=DEV]
Dept[deptno=70, dname=SUPPORT]
Press any key to continue . . .http://feeds.feedburner.com/TheBlasFromPas
Categories: Fusion Middleware

vFabric GemFire and the Native Client World using c#

Mon, 2013-04-08 05:17
Recently I had to step out of my comfort zone and learn how to create a c# client to access a GemFire 7 distributed system for a demo to a customer. OIf course there was more to it then just that but this outlines what you need to do to connect as a c# client to GemFire. I was using the following here.
  • Visual Studio 2012
  • GemFire 32 bit Naive Client
1. Install GemFire Native client 32 bit or 64 bit depending on your OS. It can be downloaded from the following location.

https://my.vmware.com/web/vmware/info/slug/application_platform/vmware_vfabric_gemfire/7_0

2. Once installed setup an ENV variable as shown below pointing to the location of the native client install.

C:\Windows\system32>echo %GFCPP%
C:\vFabric_NativeClient_32bit_7010

3. In your Visual Studio 2012 Project / Solution add a reference to GemFire DLL as shown below.




4. In Visual Studio 2012 create a cache.xml as shown below. This client cache is going to use a locator to connect to a cache server instance for the client itself.

xml/cache.xml
  
<?xml version="1.0"?>
<!DOCTYPE client-cache PUBLIC
    "-//GemStone Systems, Inc.//GemFire Declarative Caching 7.0//EN"
    "http://www.gemstone.com/dtd/cache7_0.dtd">

<client-cache>
  <pool name="client" subscription-enabled="true">
    <locator host="172.16.62.1" port="10334" />
  </pool>

  <region name="CommandRegion">
    <region-attributes refid="PROXY" pool-name="client">
    </region-attributes>
  </region>
  
  <region name="changeTrackingRegion">
    <region-attributes data-policy="normal" pool-name="client">
    </region-attributes>
  </region>
</client-cache>

5. Create 2 c# classes as shown below.

GemFireClient.cs
  
using GemStone.GemFire.Cache.Generic;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace pivotal.au.company.poc
{
    class GemFireClient
    {
        private static bool isStarted = false;
        private static GemFireClient instance = new GemFireClient();
        private string configFileLocation = "xml/cache.xml";
        private Properties<string, string> properties = Properties<string, string>.Create<string, string>();
        private CacheFactory cacheFactory;
        private Cache cache;
        IRegion<string, string> ctrRegion;

        private GemFireClient()
        {
            Console.WriteLine("Reading properties file xml/cache.xml...");
            string clientCacheXml = getCacheConfigLocation(configFileLocation);
            properties.Insert("cache-xml-file", clientCacheXml);
            Serializable.RegisterPdxSerializer(new ReflectionBasedAutoSerializer());

            cacheFactory = CacheFactory.CreateCacheFactory(properties);
            cache = cacheFactory.Create();
  
            ctrRegion = cache.GetRegion<string, string>("changeTrackingRegion");
            ctrRegion.GetSubscriptionService().RegisterRegex("."); 
            
            Console.WriteLine("ctrRegion size = " + ctrRegion.Count);
        }

        public static GemFireClient getInstance()
        {
            return instance;
        }

        public void closeClientCache()
        {
            cache.Close();
            Console.WriteLine("Client Cache closed...");
        }

        public Cache getCache()
        {
            return cache;
        }

        private static string getCacheConfigLocation(string cacheXml)
        {
            var directoryName = Path.GetDirectoryName(System.Reflection.Assembly.GetExecutingAssembly().Location);
            if (File.Exists(System.Environment.GetEnvironmentVariable("COMPANY_CONFIG") + "/" + cacheXml) == true)
            {
                return System.Environment.GetEnvironmentVariable("COMPANY_CONFIG") + "/" + cacheXml;
            }
            else if (File.Exists(Path.Combine(directoryName, cacheXml)) == true)
            {
                return Path.Combine(directoryName, cacheXml);
            }
            else
            {
                throw new SystemException("Unable to find /" + cacheXml);
            }
        }
    }
}

GemFireTest.cs
  
using pivotal.au.company.poc.domain;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using GemStone.GemFire.Cache.Generic;

namespace pivotal.au.company.poc
{
    class GemFireTest
    {
        GemFireClient gfClient;

        public void doInsert()
        {
            gfClient = GemFireClient.getInstance();

            // get command region
            IRegion<string, Command> commandRegion = gfClient.getCache().GetRegion<string, Command>("CommandRegion");

            // insert a Command Object into the region
            Command command = new Command();
            command.eventType = "INSERT";
            command.tableName = "Holiday";
            command.tableKey = "1";
            command.sequence = 31;
            command.payload = new Dictionary<object, object>()
                { 
                  {"Id", "1"},
               {"name", "apples"},
                  {"createdate", "10-10-2009"}
                };

            Console.WriteLine(command.ToString());

            commandRegion[command.tableKey] = command;

        }

        public void queryCommandRegion()
        {
            gfClient = GemFireClient.getInstance();

            Console.WriteLine("about to query commandRegion");

            QueryService<string, Command> queryService = gfClient.getCache().GetQueryService<string, Command>();
            Query<Command> qry = queryService.NewQuery("SELECT * FROM /CommandRegion");
            ISelectResults<Command> results = qry.Execute();
            SelectResultsIterator<Command> iter = results.GetIterator();
            while (iter.MoveNext())
            {
                Console.WriteLine(iter.Current.ToString());
            }

        }

        public void closeCache()
        {
            gfClient.closeClientCache();
        }

        public void run()
        {
            GemFireTest test = new GemFireTest();
            test.doInsert();
            test.queryCommandRegion();
            test.closeCache();
        }

    }
}

Output omitted but this should give you the general idea.

http://feeds.feedburner.com/TheBlasFromPas
Categories: Fusion Middleware

Implementing an AsyncEventListener for Write-Behind Cache Event Handling

Mon, 2013-03-04 03:45
As part of GemFire 70 release they have introduced as AsyncEventListener for write behind capability which is more or less very similar to the Gateway Listener in GemFire 6.x

An AsyncEventListener receives callbacks for events that change region data. You can use an AsyncEventListener implementation as a write-behind cache event handler to synchronize region updates with a database.

It documented as follows.

http://pubs.vmware.com/vfabricNoSuite/index.jsp?topic=/com.vmware.vfabric.gemfire.7.0/developing/events/implementing_write_behind_event_handler.html

So how would my cache.xml file for a member look like here.
  
<?xml version="1.0"?>
<!DOCTYPE cache PUBLIC
    "-//GemStone Systems, Inc.//GemFire Declarative Caching 7.0//EN"
    "http://www.gemstone.com/dtd/cache7_0.dtd">

<cache>
    <async-event-queue id="GreenplumQueue" parallel="true" batch-size="500">
       <async-event-listener>
             <class-name>vmware.pivotal.example.listener.GreenplumGatewayListener</class-name>
       </async-event-listener>      
    </async-event-queue>
    <cache-server port="40001" notify-by-subscription="true"/>
    <region name="greenplumRegion">
      <region-attributes refid="PARTITION_REDUNDANT" async-event-queue-ids="GreenplumQueue"/>
   </region>
</cache>

Finally the code to write an AsyncEventListener would be as follows.
  
package vmware.pivotal.example.listener;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import vmware.pivotal.example.dao.jdbcbatch.JdbcBatch;
import vmware.pivotal.example.dao.jdbcbatch.JdbcBatchDAO;

import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;

public class GreenplumGatewayListener implements AsyncEventListener, Declarable
{

 private Logger logger = Logger.getLogger(this.getClass().getSimpleName());
 private ApplicationContext context;
 private static final String BEAN_NAME = "jdbcBatchDAOImpl";
 private JdbcBatchDAO jdbcBatchDAO;
 
 public GreenplumGatewayListener()
 {
     context = new ClassPathXmlApplicationContext("application-context.xml");
     jdbcBatchDAO = (JdbcBatchDAO) context.getBean(BEAN_NAME);  
     logger.log (Level.INFO, "GreenplumGatewayListener started...");
 }
 
 @Override
 public boolean processEvents(@SuppressWarnings("rawtypes") List<AsyncEvent> list) 
 {
     logger.log (Level.INFO, String.format("Size of List<GatewayEvent> = %s", list.size()));
     List<JdbcBatch> newEntries = new ArrayList<JdbcBatch>();
     
     List<JdbcBatch> updatedEntries = new ArrayList<JdbcBatch>();
     List<String> destroyedEntries = new ArrayList<String>();
     int possibleDulicates = 0;
     
     for (@SuppressWarnings("rawtypes") AsyncEvent ge: list)
     {
       
       if (ge.getPossibleDuplicate())
        possibleDulicates++;
        
       if ( ge.getOperation().equals(Operation.UPDATE)) 
       {
      updatedEntries.add((JdbcBatch) ge.getDeserializedValue());
       }
       else if ( ge.getOperation().equals(Operation.CREATE))
       {
         newEntries.add((JdbcBatch) ge.getDeserializedValue());
       }
       else if ( ge.getOperation().equals(Operation.DESTROY))
       {
      destroyedEntries.add(ge.getKey().toString());
       }
      
     }
     
     if (newEntries.size() > 0)
     {
      jdbcBatchDAO.storeInsertBatch(newEntries); 
     }
     
     if (updatedEntries.size() > 0)
     {
      jdbcBatchDAO.storeUpdateBatch(updatedEntries);
     }
     
     if (destroyedEntries.size() > 0)
     {
      jdbcBatchDAO.storeDeleteBatch(destroyedEntries);
     }
     
     logger.log (Level.INFO, 
           String.format("New Entries = [%s], Updated Entries = [%s], Destroyed Entries = [%s], Possible Duplicates = [%s]", 
                   newEntries.size(), 
                   updatedEntries.size(), 
                   destroyedEntries.size(), 
                   possibleDulicates));
     
     return true;
 }


 public void init(Properties arg0) {
  // TODO Auto-generated method stub
  
 }

 public void close() {
  // TODO Auto-generated method stub
  
 }

}
http://feeds.feedburner.com/TheBlasFromPas
Categories: Fusion Middleware