Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Migrated to Confluence 4.0

It's quite tricky...

This will enable you to use JCA driven XA transactions for message delivery, outside a J2EE container. It will also allow outbound JMS messages to fall into those transactions.

This has only been tested with ActiveMQ using JOTM as the transaction manager.

Code Block
public class JcaJmsConnection {

    // Activations registered by startUpMessageListener, keyed by the activation number handed back to the caller.
    private Map<Integer, ActivationInfo> activationMap = new HashMap<Integer, ActivationInfo>();
    // Source of activation numbers; startUpMessageListener draws the next number from it.
    private AtomicInteger activationCounter = new AtomicInteger();

    // Caller-supplied label; the three-argument constructor passes "Unknown".
    private final String nameForDisplayPurposes;
    // Transaction manager handed to the connection manager and to every message endpoint factory.
    private final TransactionManager transactionManager;
    // Stored by the constructor; not referenced anywhere else in the visible code.
    private ErrorHandler errorHandler;
    // Broker URL applied to every ActiveMQResourceAdapter this class creates.
    private String brokerUrl;
    // Connection factory built once by the constructor; getConnection() delegates to it.
    private ConnectionFactory connectionFactory;

    /**
     * Creates a connection whose display name is "Unknown".
     *
     * Delegates to the four-argument constructor.
     *
     * @param transactionManager transaction manager used for XA transactions
     * @param errorHandler error handler stored on this instance
     * @param brokerUrl URL of the broker to connect to
     * @throws Exception if the managed connection factory cannot be created
     */
    public JcaJmsConnection(TransactionManager transactionManager, ErrorHandler errorHandler, String brokerUrl) throws Exception {
        this("Unknown", transactionManager, errorHandler, brokerUrl);
    }

    public JcaJmsConnection(String nameForDisplayPurposes, TransactionManager transactionManager, ErrorHandler errorHandler, String brokerUrl) throws Exception {
        this.nameForDisplayPurposes = nameForDisplayPurposes;
        this.transactionManager = transactionManager;
        this.errorHandler = errorHandler;
        this.brokerUrl = brokerUrl;
        this.connectionFactory = createManagedConnectionFactory();
    }

    /**
     * Creates a new JMS connection from the managed connection factory built in the constructor.
     *
     * @return a newly created connection
     * @throws JMSException if the factory fails to create the connection
     */
    public Connection getConnection() throws JMSException {
        Connection connection = connectionFactory.createConnection();
        return connection;
    }

    /**
     * Builds the JMS connection factory used by getConnection().
     *
     * An ActiveMQ managed connection factory is pointed at brokerUrl through its own
     * resource adapter, and is combined with a pooling connection manager that is bound
     * to this instance's transaction manager.
     *
     * @return the connection factory produced by the managed connection factory
     * @throws Exception if the connection manager or the connection factory cannot be created
     */
    private ConnectionFactory createManagedConnectionFactory() throws Exception {
        // A plain adapter instance, configured the same way as in startUpMessageListener.
        // The previous anonymous subclass with an initializer block created an extra class
        // that kept a hidden reference to this JcaJmsConnection.
        ActiveMQResourceAdapter resourceAdapter = new ActiveMQResourceAdapter();
        resourceAdapter.setServerUrl(brokerUrl);

        ActiveMQManagedConnectionFactory managedConnectionFactory = new ActiveMQManagedConnectionFactory();
        managedConnectionFactory.setResourceAdapter(resourceAdapter);

        ConnectionManagerFactoryBean connectionManagerFactory = new ConnectionManagerFactoryBean();
        connectionManagerFactory.setTransactionManager(transactionManager);
        connectionManagerFactory.setPooling(true);
        // NOTE(review): the meaning of the two flags is defined by XATransactions; confirm before changing them.
        connectionManagerFactory.setTransactionSupport(new XATransactions(false, true));

        // The factory bean is used outside a Spring container here, so its init callback is invoked by hand.
        connectionManagerFactory.afterPropertiesSet();
        ConnectionManager connectionManager = (ConnectionManager) connectionManagerFactory.getObject();

        return (ConnectionFactory) managedConnectionFactory.createConnectionFactory(connectionManager);
    }

    /**
     * Deactivates a message listener that was started with startUpMessageListener.
     *
     * The activation is forgotten as soon as it is shut down, so calling this method a
     * second time with the same number fails instead of deactivating the endpoint again.
     *
     * @param activationNumber the activation number returned by startUpMessageListener
     * @throws Bug if no activation is registered under the given number
     */
    public void shutdownMessageListener(int activationNumber) {
        // remove() rather than get(): otherwise finished activations pile up in the map
        // and the same endpoint could be deactivated more than once.
        ActivationInfo activationInfo = activationMap.remove(activationNumber);
        if (activationInfo == null) {
            throw new Bug("Can't find an activation with id " + activationNumber);
        }
        // NOTE(review): only the endpoint is deactivated; the resource adapter started for this
        // activation is not stopped here - confirm whether that is intended.
        activationInfo.resourceAdapter.endpointDeactivation(activationInfo.messageEndpointFactory, activationInfo.activationSpec);
    }

    /**
     * Starts a message listener, taking the XATerminator from the transaction manager.
     *
     * This only works with JOTM: the transaction manager is cast to JOTM's Current class
     * in order to obtain the terminator.
     *
     * @param queueName name of the queue to listen on
     * @param messageListener listener that receives the messages
     * @param workerThreadCount number of worker threads used for delivery
     * @return the activation number needed to shut the listener down again
     * @throws Exception if the listener cannot be started
     */
    public int startUpMessageListener(String queueName, MessageListener messageListener, int workerThreadCount) throws Exception {
        Current jotm = (Current) transactionManager;
        XATerminator xaTerminator = jotm.getXATerminator();
        return startUpMessageListener(xaTerminator, queueName, messageListener, workerThreadCount);
    }

    /**
     * Starts a multithreaded message listener on a queue and returns the activation number
     * that is needed to shut it down again.
     *
     * Each call creates its own fixed-size thread pool and its own ActiveMQResourceAdapter.
     * If starting the adapter or activating the endpoint fails, the thread pool is shut
     * down again before the exception is propagated.
     *
     * @param xaTerminator terminator handed to the bootstrap context
     * @param queueName name of the queue to listen on
     * @param messageListener listener that receives the messages
     * @param workerThreadCount size of the fixed thread pool used for delivery
     * @return the activation number to pass to shutdownMessageListener
     * @throws Exception if the resource adapter cannot be started or the endpoint cannot be activated
     */
    public int startUpMessageListener(XATerminator xaTerminator, String queueName, MessageListener messageListener, int workerThreadCount) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(workerThreadCount);
        boolean activated = false;
        try {
            // The Geronimo code does not actually use the XAWork interface when used like this, so we don't need to create it.
            GeronimoWorkManager workManager = new GeronimoWorkManager(threadPool, threadPool, threadPool, null);
            BootstrapContext bootstrapContext = new GeronimoBootstrapContext(workManager, xaTerminator);

            ActiveMQResourceAdapter resourceAdapter = new ActiveMQResourceAdapter();
            resourceAdapter.setServerUrl(brokerUrl);

            resourceAdapter.start(bootstrapContext);

            ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
            activationSpec.setDestination(queueName);
            activationSpec.setDestinationType("javax.jms.Queue");
            activationSpec.setResourceAdapter(resourceAdapter);
            activationSpec.setInitialRedeliveryDelay(100);
            activationSpec.setMaximumRedeliveries(2);

            //TODO Jamesr Allow message selectors.
            // activationSpec.setMessageSelector(messsageSelector);

            MessageEndpointFactory messageEndpointFactory = new NamingEndpointFactory(transactionManager, messageListener);
            resourceAdapter.endpointActivation(messageEndpointFactory, activationSpec);

            ActivationInfo activationInfo = new ActivationInfo(resourceAdapter, activationSpec, messageEndpointFactory);

            // The number is only drawn once activation has succeeded, so failed attempts do not consume one.
            int activationNumber = activationCounter.incrementAndGet();
            activationMap.put(activationNumber, activationInfo);

            activated = true;
            return activationNumber;
        } finally {
            if (!activated) {
                // Nothing was handed back to the caller, so nobody else could ever shut this pool down.
                threadPool.shutdownNow();
            }
        }
    }

    /**
     * Holds the three objects that belong to one endpoint activation: the resource adapter,
     * the activation spec and the message endpoint factory.
     */
    public static class ActivationInfo {
        /** Resource adapter on which the endpoint was activated. */
        public ResourceAdapter resourceAdapter;
        /** Activation spec that was used for the activation. */
        public ActivationSpec activationSpec;
        /** Endpoint factory that was registered with the resource adapter. */
        public MessageEndpointFactory messageEndpointFactory;

        /**
         * @param resourceAdapter resource adapter on which the endpoint was activated
         * @param activationSpec activation spec that was used for the activation
         * @param messageEndpointFactory endpoint factory that was registered with the adapter
         */
        public ActivationInfo(ResourceAdapter resourceAdapter, ActivationSpec activationSpec, MessageEndpointFactory messageEndpointFactory) {
            this.resourceAdapter = resourceAdapter;
            this.activationSpec = activationSpec;
            this.messageEndpointFactory = messageEndpointFactory;
        }
    }

    /**
     * Message endpoint factory that wraps the supplied XAResource in a named resource and
     * returns an XAEndpoint built around a fixed message listener.
     */
    public static class NamingEndpointFactory implements MessageEndpointFactory {

        private final TransactionManager transactionManager;
        private MessageListener messageListener;

        /**
         * @param transactionManager transaction manager passed to every endpoint created
         * @param messageListener listener passed to every endpoint created
         */
        public NamingEndpointFactory(TransactionManager transactionManager, MessageListener messageListener) {
            this.messageListener = messageListener;
            this.transactionManager = transactionManager;
        }

        /**
         * Always reports delivery as transacted, whatever method is asked about.
         */
        public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
            return true;
        }

        /**
         * Creates an endpoint for the given XA resource.
         *
         * @param xaResource the XA resource to wrap; it is given the fixed name "hello"
         * @return a new XAEndpoint for the listener, the named resource and the transaction manager
         */
        public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException {
            NamedXAResource namedResource = new WrapperNamedXAResource(xaResource, "hello");
            MessageEndpoint endpoint = new XAEndpoint(messageListener, namedResource, transactionManager);
            return endpoint;
        }
    }