This will enable you to use JCA driven XA transactions for message delivery, outside a J2EE container. It will also allow outbound JMS messages to fall into those transactions.
This has only been tested with ActiveMQ using JOTM as the transaction manager.
public class JcaJmsConnection {
private Map<Integer, ActivationInfo> activationMap = new HashMap<Integer, ActivationInfo>();
private AtomicInteger activationCounter = new AtomicInteger();
private final String nameForDisplayPurposes;
private final TransactionManager transactionManager;
private ErrorHandler errorHandler;
private String brokerUrl;
private ConnectionFactory connectionFactory;
public JcaJmsConnection(TransactionManager transactionManager, ErrorHandler errorHandler, String brokerUrl) throws Exception {
this("Unknown", transactionManager, errorHandler, brokerUrl);
}
public JcaJmsConnection(String nameForDisplayPurposes, TransactionManager transactionManager, ErrorHandler errorHandler, String brokerUrl) throws Exception {
this.nameForDisplayPurposes = nameForDisplayPurposes;
this.transactionManager = transactionManager;
this.errorHandler = errorHandler;
this.brokerUrl = brokerUrl;
this.connectionFactory = createManagedConnectionFactory();
}
public Connection getConnection() throws JMSException {
return connectionFactory.createConnection();
}
private ConnectionFactory createManagedConnectionFactory() throws Exception {
ActiveMQManagedConnectionFactory managedConnectionFactory = new ActiveMQManagedConnectionFactory();
managedConnectionFactory.setResourceAdapter(new ActiveMQResourceAdapter() {
{
setServerUrl(brokerUrl);
}
});
ConnectionManagerFactoryBean connectionManagerFactory = new ConnectionManagerFactoryBean();
connectionManagerFactory.setTransactionManager(transactionManager);
connectionManagerFactory.setPooling(true);
connectionManagerFactory.setTransactionSupport(new XATransactions(false, true));
connectionManagerFactory.afterPropertiesSet();
ConnectionManager connectionManager = (ConnectionManager) connectionManagerFactory.getObject();
return (ConnectionFactory) managedConnectionFactory.createConnectionFactory(connectionManager);
}
/**
* Give this method the activation number you got from setartupMessageListener
* @param activationNumber
*/
public void shutdownMessageListener(int activationNumber) {
ActivationInfo activationInfo = activationMap.get(activationNumber);
if ( activationInfo == null ) {
throw new Bug("Can't find an activation with id " + activationNumber);
}
activationInfo.resourceAdapter.endpointDeactivation(activationInfo.messageEndpointFactory, activationInfo.activationSpec);
}
/**
* This only works with JOTM.
*/
public int startUpMessageListener(String queueName, MessageListener messageListener, int workerThreadCount) throws Exception {
return startUpMessageListener(((Current) transactionManager).getXATerminator(), queueName, messageListener, workerThreadCount) ;
}
/**
* Start up multithreaded messagelistener, return activationnumber that you will need to shut it down again.
*/
public int startUpMessageListener(XATerminator xaTerminator, String queueName, MessageListener messageListener, int workerThreadCount) throws Exception {
ExecutorService threadPool = Executors.newFixedThreadPool(workerThreadCount);
// The Geronimo code does not actually use the XAWork interface when used like this, so we don't need to create it.
GeronimoWorkManager workManager = new GeronimoWorkManager(threadPool, threadPool, threadPool, null);
BootstrapContext bootstrapContext = new GeronimoBootstrapContext(workManager, xaTerminator);
ActiveMQResourceAdapter resourceAdapter = new ActiveMQResourceAdapter();
resourceAdapter.setServerUrl(brokerUrl);
resourceAdapter.start(bootstrapContext);
ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
activationSpec.setDestination(queueName);
activationSpec.setDestinationType("javax.jms.Queue");
activationSpec.setResourceAdapter(resourceAdapter);
activationSpec.setInitialRedeliveryDelay(100);
activationSpec.setMaximumRedeliveries(2);
//TODO Jamesr Allow message selectors.
// activationSpec.setMessageSelector(messsageSelector);
MessageEndpointFactory messageEndpointFactory = new NamingEndpointFactory(transactionManager, messageListener);
resourceAdapter.endpointActivation(messageEndpointFactory, activationSpec);
ActivationInfo activationInfo = new ActivationInfo(resourceAdapter, activationSpec, messageEndpointFactory);
int activationNumber = activationCounter.incrementAndGet();
activationMap.put(activationNumber, activationInfo);
return activationNumber;
}
public static class ActivationInfo {
public MessageEndpointFactory messageEndpointFactory;
public ActivationSpec activationSpec;
public ResourceAdapter resourceAdapter;
public ActivationInfo(ResourceAdapter resourceAdapter, ActivationSpec activationSpec, MessageEndpointFactory messageEndpointFactory) {
this.messageEndpointFactory = messageEndpointFactory;
this.activationSpec = activationSpec;
this.resourceAdapter = resourceAdapter;
}
}
public static class NamingEndpointFactory implements MessageEndpointFactory {
private final TransactionManager transactionManager;
private MessageListener messageListener;
public NamingEndpointFactory(TransactionManager transactionManager, MessageListener messageListener) {
this.transactionManager = transactionManager;
this.messageListener = messageListener;
}
public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException {
NamedXAResource r = new WrapperNamedXAResource(xaResource, "hello");
return new XAEndpoint(messageListener, r, transactionManager);
}
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
}
|