Coherence - How to develop a custom push replication publisher

Posted by cosmin.tudor(at)oracle.com on Oracle Blogs See other posts from Oracle Blogs or by cosmin.tudor(at)oracle.com
Published on Thu, 13 Jan 2011 08:25:34 +0000 Indexed on 2011/01/13 10:56 UTC
Read the original article Hit count: 690

Filed under: Error when finding categories...

CoherencePushReplicationDB.zip

In the example bellow I'm describing a way of developing a custom push replication publisher that publishes data to a database via JDBC. This example can be easily changed to publish data to other receivers (JMS,...) by performing changes to step 2 and small changes to step 3, steps that are presented bellow. I've used Eclipse as the development tool.

To develop a custom push replication publishers we will need to go through 6 steps:

  • Step 1: Create a custom publisher scheme class

  • Step 2: Create a custom publisher class that should define what the publisher is doing.

  • Step 3: Create a class data is performing the actions (publish to JMS, DB, etc ) for the custom publisher.

  • Step 4: Register the new publisher against a ContentHandler.

  • Step 5: Add the new custom publisher in the cache configuration file.

  • Step 6: Add the custom publisher scheme class to the POF configuration file.


All these steps are detailed bellow.


The coherence project is attached and conclusions are presented at the end.


Step 1: In the Coherence Eclipse project create a class called CustomPublisherScheme that should implement com.oracle.coherence.patterns.pushreplication.publishers.AbstractPublisherScheme. In this class define the elements of the custom-publisher-scheme element.

For instance for a CustomPublisherScheme that looks like that:


<sync:publisher>

<sync:publisher-name>Active2-JDBC-Publisher</sync:publisher-name>

<sync:publisher-scheme>

<sync:custom-publisher-scheme>

<sync:jdbc-string>jdbc:oracle:thin:@machine-name:1521:XE</sync:jdbc-string>

<sync:username>hr</sync:username>

<sync:password>hr</sync:password>

</sync:custom-publisher-scheme>

</sync:publisher-scheme>

</sync:publisher>


the code is:


package com.oracle.coherence;


import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;


import com.oracle.coherence.patterns.pushreplication.Publisher;

import com.oracle.coherence.configuration.Configurable;

import com.oracle.coherence.configuration.Mandatory;

import com.oracle.coherence.configuration.Property;

import com.oracle.coherence.configuration.parameters.ParameterScope;

import com.oracle.coherence.environment.Environment;

import com.tangosol.io.pof.PofReader;

import com.tangosol.io.pof.PofWriter;

import com.tangosol.util.ExternalizableHelper;


@Configurable

public class CustomPublisherScheme

extends

com.oracle.coherence.patterns.pushreplication.publishers.AbstractPublisherScheme

{


/**

*

*/

private static final long serialVersionUID = 1L;

private String jdbcString;

private String username;

private String password;


public String getJdbcString()

{

return this.jdbcString;

}


@Property("jdbc-string")

@Mandatory

public void setJdbcString(String jdbcString)

{

this.jdbcString = jdbcString;

}


public String getUsername()

{

return username;

}


@Property("username")

@Mandatory

public void setUsername(String username)

{

this.username = username;

}


public String getPassword()

{

return password;

}


@Property("password")

@Mandatory

public void setPassword(String password)

{

this.password = password;

}


public Publisher realize(Environment environment, ClassLoader classLoader,

ParameterScope parameterScope)

{


return new CustomPublisher(getJdbcString(), getUsername(),

getPassword());

}


public void readExternal(DataInput in) throws IOException

{

super.readExternal(in);

this.jdbcString = ExternalizableHelper.readSafeUTF(in);

this.username = ExternalizableHelper.readSafeUTF(in);

this.password = ExternalizableHelper.readSafeUTF(in);

}


public void writeExternal(DataOutput out) throws IOException

{

super.writeExternal(out);

ExternalizableHelper.writeSafeUTF(out, this.jdbcString);

ExternalizableHelper.writeSafeUTF(out, this.username);

ExternalizableHelper.writeSafeUTF(out, this.password);

}


public void readExternal(PofReader reader) throws IOException

{

super.readExternal(reader);

this.jdbcString = reader.readString(100);

this.username = reader.readString(101);

this.password = reader.readString(102);

}


public void writeExternal(PofWriter writer) throws IOException

{

super.writeExternal(writer);

writer.writeString(100, this.jdbcString);

writer.writeString(101, this.username);

writer.writeString(102, this.password);

}


}


Step 2: Define what the CustomPublisher should basically do by creating a new java class called CustomPublisher that implements com.oracle.coherence.patterns.pushreplication.Publisher


package com.oracle.coherence;


import com.oracle.coherence.patterns.pushreplication.EntryOperation;

import com.oracle.coherence.patterns.pushreplication.Publisher;

import com.oracle.coherence.patterns.pushreplication.exceptions.PublisherNotReadyException;

import java.io.BufferedWriter;

import java.util.Iterator;


public class CustomPublisher implements Publisher

{

private String jdbcString;

private String username;

private String password;

private transient BufferedWriter bufferedWriter;


public CustomPublisher()

{

}


public CustomPublisher(String jdbcString, String username, String password)

{

this.jdbcString = jdbcString;

this.username = username;

this.password = password;

this.bufferedWriter = null;

}


public String getJdbcString()

{

return this.jdbcString;

}


public String getUsername()

{

return username;

}


public String getPassword()

{

return password;

}


public void publishBatch(String cacheName, String publisherName,

Iterator<EntryOperation> entryOperations)

{

DatabasePersistence databasePersistence = new DatabasePersistence(

jdbcString, username, password);

while (entryOperations.hasNext())

{

EntryOperation entryOperation = (EntryOperation) entryOperations

.next();

databasePersistence.databasePersist(entryOperation);

}

}


public void start(String cacheName, String publisherName)

throws PublisherNotReadyException

{

System.err

.printf("Started: Custom JDBC Publisher for Cache %s with Publisher %s\n",

new Object[] { cacheName, publisherName });


}


public void stop(String cacheName, String publisherName)

{

System.err

.printf("Stopped: Custom JDBC Publisher for Cache %s with Publisher %s\n",

new Object[] { cacheName, publisherName });

}

}


In the publishBatch method from above we inform the publisher that he is supposed to persist data to a database:


DatabasePersistence databasePersistence = new DatabasePersistence(

jdbcString, username, password);

while (entryOperations.hasNext())

{

EntryOperation entryOperation = (EntryOperation) entryOperations

.next();

databasePersistence.databasePersist(entryOperation);

}


Step 3: The class that deals with the persistence is a very basic one that uses JDBC to perform inserts/updates against a database.


package com.oracle.coherence;


import com.oracle.coherence.patterns.pushreplication.EntryOperation;

import java.sql.*;

import java.text.SimpleDateFormat;


import com.oracle.coherence.Order;


public class DatabasePersistence

{

public static String INSERT_OPERATION = "INSERT";

public static String UPDATE_OPERATION = "UPDATE";

public Connection dbConnection;


public DatabasePersistence(String jdbcString, String username,

String password)

{

this.dbConnection = createConnection(jdbcString, username, password);

}


public Connection createConnection(String jdbcString, String username,

String password)

{

Connection connection = null;

System.err.println("Connecting to: " + jdbcString + " Username: "

+ username + " Password: " + password);

try

{

// Load the JDBC driver

String driverName = "oracle.jdbc.driver.OracleDriver";

Class.forName(driverName);

// Create a connection to the database

connection = DriverManager.getConnection(jdbcString, username,

password);

System.err.println("Connected to:" + jdbcString + " Username: "

+ username + " Password: " + password);

} catch (ClassNotFoundException e)

{

e.printStackTrace();

}

// driver

catch (SQLException e)

{

e.printStackTrace();

}


return connection;


}


public void databasePersist(EntryOperation entryOperation)

{

if (entryOperation.getOperation().toString()

.equalsIgnoreCase(INSERT_OPERATION))

{

insert(((Order) entryOperation.getPublishableEntry().getValue()));

} else if (entryOperation.getOperation().toString()

.equalsIgnoreCase(UPDATE_OPERATION))

{

update(((Order) entryOperation.getPublishableEntry().getValue()));

}

}


public void update(Order order)

{

String update = "UPDATE Orders set QUANTITY= '"

+ order.getQuantity()

+ "', AMOUNT='"

+ order.getAmount()

+ "', ORD_DATE= '"

+ (new SimpleDateFormat("dd-MMM-yyyy")).format(order

.getOrdDate()) + "' WHERE SYMBOL='" + order.getSymbol()

+ "'";

System.err.println("UPDATE = " + update);

try

{

Statement stmt = getDbConnection().createStatement();

stmt.execute(update);

stmt.close();

} catch (SQLException ex)

{

System.err.println("SQLException: " + ex.getMessage());

}

}


public void insert(Order order)

{


String insert = "insert into Orders values('"

+ order.getSymbol()

+ "',"

+ order.getQuantity()

+ ","

+ order.getAmount()

+ ",'"

+ (new SimpleDateFormat("dd-MMM-yyyy")).format(order

.getOrdDate()) + "')";

System.err.println("INSERT = " + insert);

try

{

Statement stmt = getDbConnection().createStatement();

stmt.execute(insert);

stmt.close();

} catch (SQLException ex)

{

System.err.println("SQLException: " + ex.getMessage());

}


}


public Connection getDbConnection()

{

return dbConnection;

}


public void setDbConnection(Connection dbConnection)

{

this.dbConnection = dbConnection;

}


}


Step 4: Now we need to register our publisher against a ContentHandler. In order to achieve that we need to create in our eclipse project a new class called CustomPushReplicationNamespaceContentHandler that should extend the com.oracle.coherence.patterns.pushreplication.configuration.PushReplicationNamespaceContentHandler. In the constructor of the new class we define a new handler for our custom publisher.


package com.oracle.coherence;


import com.oracle.coherence.configuration.Configurator;

import com.oracle.coherence.environment.extensible.ConfigurationContext;

import com.oracle.coherence.environment.extensible.ConfigurationException;

import com.oracle.coherence.environment.extensible.ElementContentHandler;

import com.oracle.coherence.patterns.pushreplication.PublisherScheme;

import com.oracle.coherence.environment.extensible.QualifiedName;

import com.oracle.coherence.patterns.pushreplication.configuration.PushReplicationNamespaceContentHandler;

import com.tangosol.run.xml.XmlElement;



public class CustomPushReplicationNamespaceContentHandler extends PushReplicationNamespaceContentHandler

{


public CustomPushReplicationNamespaceContentHandler()

{

super();

registerContentHandler("custom-publisher-scheme", new ElementContentHandler()

{

public Object onElement(ConfigurationContext context, QualifiedName qualifiedName, XmlElement xmlElement)

throws ConfigurationException

{

PublisherScheme publisherScheme = new CustomPublisherScheme();


Configurator.configure(publisherScheme, context, qualifiedName, xmlElement);


return publisherScheme;

}

});

}


}


Step 5: Now we should define our CustomPublisher in the cache configuration file according to the following documentation.


<cache-config

xmlns:sync="class:com.oracle.coherence.CustomPushReplicationNamespaceContentHandler"

xmlns:cr="class:com.oracle.coherence.environment.extensible.namespaces.InstanceNamespaceContentHandler">


<caching-schemes>

<sync:provider pof-enabled="false">

<sync:coherence-provider />

</sync:provider>


<caching-scheme-mapping>

<cache-mapping>

<cache-name>publishing-cache</cache-name>

<scheme-name>distributed-scheme-with-publishing-cachestore</scheme-name>

<autostart>true</autostart>

<sync:publisher>

<sync:publisher-name>Active2 Publisher</sync:publisher-name>

<sync:publisher-scheme>

<sync:remote-cluster-publisher-scheme>

<sync:remote-invocation-service-name>remote-site1</sync:remote-invocation-service-name>

<sync:remote-publisher-scheme>

<sync:local-cache-publisher-scheme>

<sync:target-cache-name>publishing-cache</sync:target-cache-name>

</sync:local-cache-publisher-scheme>

</sync:remote-publisher-scheme>

<sync:autostart>true</sync:autostart>

</sync:remote-cluster-publisher-scheme>

</sync:publisher-scheme>

</sync:publisher>

<sync:publisher>

<sync:publisher-name>Active2-Output-Publisher</sync:publisher-name>

<sync:publisher-scheme>

<sync:stderr-publisher-scheme>

<sync:autostart>true</sync:autostart>

<sync:publish-original-value>true</sync:publish-original-value>

</sync:stderr-publisher-scheme>

</sync:publisher-scheme>

</sync:publisher>

<sync:publisher>

<sync:publisher-name>Active2-JDBC-Publisher</sync:publisher-name>

<sync:publisher-scheme>

<sync:custom-publisher-scheme>

<sync:jdbc-string>jdbc:oracle:thin:@machine_name:1521:XE</sync:jdbc-string>

<sync:username>hr</sync:username>

<sync:password>hr</sync:password>

</sync:custom-publisher-scheme>

</sync:publisher-scheme>

</sync:publisher>

</cache-mapping>

</caching-scheme-mapping>

<!-- The following scheme is required for each remote-site when using a

RemoteInvocationPublisher -->

<remote-invocation-scheme>

<service-name>remote-site1</service-name>

<initiator-config>

<tcp-initiator>

<remote-addresses>

<socket-address>

<address>localhost</address>

<port>20001</port>

</socket-address>

</remote-addresses>

<connect-timeout>2s</connect-timeout>

</tcp-initiator>

<outgoing-message-handler>

<request-timeout>5s</request-timeout>

</outgoing-message-handler>

</initiator-config>

</remote-invocation-scheme>

<!-- END: com.oracle.coherence.patterns.pushreplication -->


<proxy-scheme>

<service-name>ExtendTcpProxyService</service-name>

<acceptor-config>

<tcp-acceptor>

<local-address>

<address>localhost</address>

<port>20002</port>

</local-address>

</tcp-acceptor>

</acceptor-config>

<autostart>true</autostart>

</proxy-scheme>

</caching-schemes>

</cache-config>


As you can see in the red-marked text from above I've:
       - set new Namespace Content Handler
       - define the new custom publisher that should work together with other publishers like: stderr and remote publishers in our case.


Step 6: Add the com.oracle.coherence.CustomPublisherScheme to your custom-pof-config file:


<pof-config>

<user-type-list>

<!-- Built in types -->

<include>coherence-pof-config.xml</include>

<include>coherence-common-pof-config.xml</include>

<include>coherence-messagingpattern-pof-config.xml</include>

<include>coherence-pushreplicationpattern-pof-config.xml</include>

<!-- Application types -->

<user-type>

<type-id>1901</type-id>

<class-name>com.oracle.coherence.Order</class-name>

<serializer>

<class-name>com.oracle.coherence.OrderSerializer</class-name>

</serializer>

</user-type>

<user-type>

<type-id>1902</type-id>

<class-name>com.oracle.coherence.CustomPublisherScheme</class-name>

</user-type>

</user-type-list>

</pof-config>


CONCLUSIONS

This approach allows for publishers to publish data to almost any other receiver (database, JMS, MQ, ...). The only thing that needs to be changed is the DatabasePersistence.java class that should be adapted to the chosen receiver. Only minor changes are needed for the rest of the code (to publishBatch method from CustomPublisher class).


© Oracle Blogs or respective owner