Monday 27 June 2011

JMS- Message Broker Apache Active MQ


JMS (Java Messaging Service) API is a java Message Oriented Middleware (MOM) API for communicating between two or more clients. It allows Java components to create, send, receive and read Messages .It allows reliable, loosely coupled, asynchronous communication between various distributed components.

Apache ActiveMQ is an open source message broker which implements the JMS.

Testing ActiveMQ:
-------------------

1.Extract the ActiveMQ
2. Go to the Extracted Folder/bin
3.Run  cmd activemq and on success you should see some thing as in the pic below.



------------------------------------------------------------------------------------------------------------


C:\ActiveMQ5.5\bin>activemq
Java Runtime: Sun Microsystems Inc. 1.6.0_17 C:\Program Files\Java\jdk1.6.0_17\jre
  Heap sizes: current=8896k  free=5824k  max=466048k
    JVM args: -Dcom.sun.management.jmxremote -Xmx512M -Dorg.apache.activemq.UseDedicatedTaskRunner=true -Djava.util.logging.config.file=logging.properties
ACTIVEMQ_HOME: C:\ActiveMQ5.5\bin\..
ACTIVEMQ_BASE: C:\ActiveMQ5.5\bin\..
Loading message broker from: xbean:activemq.xml
 INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@50618d26: startup date [Mon Jun 27 12:32:23 GMT+05:30 2011]; root of context hierarchy
 WARN | destroyApplicationContextOnStop parameter is deprecated, please use shutdown hooks instead
 INFO | PListStore:C:\ActiveMQ5.5\bin\..\data\localhost\tmp_storage started
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\ActiveMQ5.5\bin\..\data\kahadb]
 INFO | KahaDB is version 3
 INFO | Recovering from the journal ...
 INFO | Recovery replayed 1 operations from the journal in 0.015 seconds.
 INFO | ActiveMQ 5.5.0 JMS Message Broker (localhost) is starting
 INFO | For help or more information please see: http://activemq.apache.org/
 INFO | Listening for connections at: tcp://D-MAA-00391437:61616
 INFO | Connector openwire Started
 INFO | ActiveMQ JMS Message Broker (localhost, ID:D-MAA-00391437-2490-1309158145193-0:1) started
 INFO | jetty-7.1.6.v20100715
 INFO | ActiveMQ WebConsole initialized.
 INFO | Initializing Spring FrameworkServlet 'dispatcher'
 INFO | ActiveMQ Console at http://0.0.0.0:8161/admin
 INFO | Initializing Spring root WebApplicationContext
 INFO | OSGi environment not detected.
 INFO | Apache Camel 2.7.0 (CamelContext: camel) is starting
 INFO | JMX enabled. Using ManagedManagementStrategy.
 INFO | Found 5 packages with 16 @Converter classes to load
 INFO | Loaded 152 type converters in 0.719 seconds
 INFO | Connector vm://localhost Started
 INFO | Route: route1 started and consuming from: Endpoint[activemq://example.A]
 INFO | Total 1 routes, of which 1 is started.
 INFO | Apache Camel 2.7.0 (CamelContext: camel) started in 1.562 seconds
 INFO | Camel Console at http://0.0.0.0:8161/camel
 INFO | ActiveMQ Web Demos at http://0.0.0.0:8161/demo
 INFO | RESTful file access application at http://0.0.0.0:8161/fileserver
 INFO | Started SelectChannelConnector@0.0.0.0:8161
----------------------------------------------------------------------------------------------------------- 






Monitoring  ActiveMQ:
----------------------- 
ActiveMQ can be monitored on the web console using http://localhost:8161/admin





Sample Producer and Consumer Code:

Note:Add  activemq-all-x.x.x.jar to the build path of the project and start the activemq before you run the sample ;-).

Producer:
package com.activemq.demo;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 *
 * @author DurgaPrasad.K
 *
 */
public class Producer {
    // URL of the JMS server. DEFAULT_BROKER_URL will just mean
    // that JMS server is on localhost
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the queue we will be sending messages to
    private static String subject = "DEMOQUEUE";

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server and starting it
        ConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // JMS messages are sent and received using a Session. We will
        // create here a non-transactional session object. If you want
        // to use transactions you should set the first parameter to 'true'
        Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // Destination represents here our queue 'DEMOQUEUE' on the
        // JMS server. You don't have to do anything special on the
        // server to create it, it will be created automatically.
        Destination destination = session.createQueue(subject);

        // MessageProducer is used for sending messages (as opposed
        // to MessageConsumer which is used for receiving them)
        MessageProducer producer = session.createProducer(destination);

        // We will send a small text message saying 'ActiveMQ Demo'
        TextMessage message = session.createTextMessage("ActiveMQ Demo");

        // Here we are sending the message!
        producer.send(message);
        System.out.println("Sent message '" + message.getText() + "'");

        connection.close();
    }
}


Consumer : 


package com.activemq.demo;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
    // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the queue we will receive messages from
    private static String subject = "DEMOQUEUE";

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server
        ConnectionFactory connectionFactory
            = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Creating session for seding messages
        Session session = connection.createSession(false,
            Session.AUTO_ACKNOWLEDGE);

        // Getting the queue 'TESTQUEUE'
        Destination destination = session.createQueue(subject);

        // MessageConsumer is used for receiving (consuming) messages
        MessageConsumer consumer = session.createConsumer(destination);

        // Here we receive the message.
        // By default this call is blocking, which means it will wait
        // for a message to arrive on the queue.
        Message message = consumer.receive();

        // There are many types of Message and TextMessage
        // is just one of them. Producer sent us a TextMessage
        // so we must cast to it to get access to its .getText()
        // method.
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Received message '"
                + textMessage.getText() + "'");
        }
        connection.close();
    }
}
 












Cheers !! ;-) 


No comments:

Post a Comment