Mega Code Archive

 
Categories / Java / J2EE
 

The use of a message listener in the publishsubscribe model The producer publishes several messages, and the consumer reads them asyn

/*  * @(#)AsynchTopicExample.java  1.3 02/05/02  *  * Copyright (c) 2000-2002 Sun Microsystems, Inc. All Rights Reserved.  *   * Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,  * modify and redistribute this software in source and binary code form,  * provided that i) this copyright notice and license appear on all copies of  * the software; and ii) Licensee does not utilize the software in a manner  * which is disparaging to Sun.  *  * This software is provided "AS IS," without a warranty of any kind. ALL  * EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY  * IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR  * NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE  * LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING  * OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS  * LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,  * INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER  * CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF  * OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE  * POSSIBILITY OF SUCH DAMAGES.  *  * This software is not designed or intended for use in on-line control of  * aircraft, air traffic, aircraft navigation or aircraft communications; or in  * the design, construction, operation or maintenance of any nuclear  * facility. Licensee represents and warrants that it will not use or  * redistribute the Software for such purposes.  */ import javax.jms.*; /**  * The AsynchTopicExample class demonstrates the use of a message listener in   * the publish/subscribe model.  The producer publishes several messages, and  * the consumer reads them asynchronously.  * <p>  * The program contains a MultipleProducer class, an AsynchConsumer class  * with a listener class, a main method, and a method that runs the consumer  * and producer threads.  * <p>  * Specify a topic name on the command line when you run the program.  The   * program also uses a queue named "controlQueue", which should be created    * before you run the program.  *  * @author Kim Haase  * @version 1.6, 08/18/00  */ public class AsynchTopicExample {     final String  CONTROL_QUEUE = "controlQueue";     String        topicName = null;     int           exitResult = 0;     /**      * The AsynchConsumer class fetches several messages from a topic       * asynchronously, using a message listener, TextListener.      *      * @author Kim Haase      * @version 1.6, 08/18/00      */     public class AsynchConsumer extends Thread {         /**          * The TextListener class implements the MessageListener interface by           * defining an onMessage method for the AsynchConsumer class.          *          * @author Kim Haase          * @version 1.6, 08/18/00          */         private class TextListener implements MessageListener {             final SampleUtilities.DoneLatch  monitor =                 new SampleUtilities.DoneLatch();             /**              * Casts the message to a TextMessage and displays its text.              * A non-text message is interpreted as the end of the message               * stream, and the message listener sets its monitor state to all               * done processing messages.              *              * @param message  the incoming message              */             public void onMessage(Message message) {                 if (message instanceof TextMessage) {                     TextMessage  msg = (TextMessage) message;                     try {                         System.out.println("CONSUMER THREAD: Reading message: "                                             + msg.getText());                     } catch (JMSException e) {                         System.out.println("Exception in onMessage(): "                                             + e.toString());                     }                 } else {                     monitor.allDone();                 }             }         }         /**          * Runs the thread.          */         public void run() {             ConnectionFactory    connectionFactory = null;             Connection           connection = null;             Session              session = null;             Topic                topic = null;             MessageConsumer      msgConsumer = null;             TextListener         topicListener = null;             try {                 connectionFactory =                      SampleUtilities.getConnectionFactory();                 connection =                      connectionFactory.createConnection();                 session = connection.createSession(false,                      Session.AUTO_ACKNOWLEDGE);                 topic = SampleUtilities.getTopic(topicName, session);             } catch (Exception e) {                 System.out.println("Connection problem: " + e.toString());                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException ee) {}                 }               System.exit(1);             }              /*              * Create consumer.              * Register message listener (TextListener).              * Start message delivery.              * Send synchronize message to producer, then wait till all              * messages have arrived.              * Listener displays the messages obtained.              */             try {                 msgConsumer = session.createConsumer(topic);                 topicListener = new TextListener();                 msgConsumer.setMessageListener(topicListener);                 connection.start();                                  // Let producer know that consumer is ready.                 try {                     SampleUtilities.sendSynchronizeMessage("CONSUMER THREAD: ",                                                             CONTROL_QUEUE);                 } catch (Exception e) {                     System.out.println("Queue probably missing: " + e.toString());                     if (connection != null) {                         try {                             connection.close();                         } catch (JMSException ee) {}                     }                   System.exit(1);               }                 /*                  * Asynchronously process messages.                  * Block until producer issues a control message indicating                  * end of publish stream.                  */                 topicListener.monitor.waitTillDone();             } catch (JMSException e) {                 System.out.println("Exception occurred: " + e.toString());                 exitResult = 1;             } finally {                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException e) {                         exitResult = 1;                     }                 }             }         }           }     /**      * The MultipleProducer class publishes several message to a topic.       *      * @author Kim Haase      * @version 1.6, 08/18/00      */     public class MultipleProducer extends Thread {         /**          * Runs the thread.          */         public void run() {             ConnectionFactory    connectionFactory = null;             Connection           connection = null;             Session              session = null;             Topic                topic = null;             MessageProducer      msgProducer = null;             TextMessage          message = null;             final int            NUMMSGS = 20;             final String         MSG_TEXT = new String("Here is a message");             try {                 connectionFactory =                      SampleUtilities.getConnectionFactory();                 connection =                      connectionFactory.createConnection();                 session = connection.createSession(false,                      Session.AUTO_ACKNOWLEDGE);                 topic = SampleUtilities.getTopic(topicName, session);             } catch (Exception e) {                 System.out.println("Connection problem: " + e.toString());                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException ee) {}                 }               System.exit(1);             }              /*              * After synchronizing with consumer, create producer.              * Create text message.              * Send messages, varying text slightly.              * Send end-of-messages message.              * Finally, close connection.              */             try {                 /*                  * Synchronize with consumer.  Wait for message indicating                   * that consumer is ready to receive messages.                  */                 try {                     SampleUtilities.receiveSynchronizeMessages("PRODUCER THREAD: ",                                                               CONTROL_QUEUE, 1);                 } catch (Exception e) {                     System.out.println("Queue probably missing: " + e.toString());                     if (connection != null) {                         try {                             connection.close();                         } catch (JMSException ee) {}                     }                   System.exit(1);               }                                  msgProducer = session.createProducer(topic);                 message = session.createTextMessage();                 for (int i = 0; i < NUMMSGS; i++) {                     message.setText(MSG_TEXT + " " + (i + 1));                     System.out.println("PRODUCER THREAD: Publishing message: "                          + message.getText());                     msgProducer.send(message);                 }                 // Send a non-text control message indicating end of messages.                 msgProducer.send(session.createMessage());             } catch (JMSException e) {                 System.out.println("Exception occurred: " + e.toString());                 exitResult = 1;             } finally {                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException e) {                         exitResult = 1;                     }                 }             }         }     }          /**      * Instantiates the consumer and producer classes and starts their      * threads.      * Calls the join method to wait for the threads to die.      * <p>      * It is essential to start the consumer before starting the producer.      * In the publish/subscribe model, a consumer can ordinarily receive only       * messages published while it is active.      */     public void run_threads() {         AsynchConsumer   asynchConsumer = new AsynchConsumer();         MultipleProducer  multipleProducer = new MultipleProducer();         multipleProducer.start();         asynchConsumer.start();         try {             asynchConsumer.join();             multipleProducer.join();         } catch (InterruptedException e) {}     }     /**      * Reads the topic name from the command line, then calls the      * run_threads method to execute the program threads.      *      * @param args  the topic used by the example      */     public static void main(String[] args) {         AsynchTopicExample  ate = new AsynchTopicExample();                  if (args.length != 1) {           System.out.println("Usage: java AsynchTopicExample <topic_name>");           System.exit(1);       }         ate.topicName = new String(args[0]);         System.out.println("Topic name is " + ate.topicName);       ate.run_threads();       SampleUtilities.exit(ate.exitResult);     } }          jms.zip( 65 k)