Mega Code Archive

 
Categories / Java / J2EE
 

A message will not be acknowledged until processing of it is complete

/*  * @(#)AckEquivExample.java  1.3 02/05/02  *   * Copyright (c) 2000-2002 Sun Microsystems, Inc. All Rights Reserved.  *   * Sun grants you ("Licensee") a non-exclusive, royalty free, license to use,  * modify and redistribute this software in source and binary code form,  * provided that i) this copyright notice and license appear on all copies of  * the software; and ii) Licensee does not utilize the software in a manner  * which is disparaging to Sun.  *  * This software is provided "AS IS," without a warranty of any kind. ALL  * EXPRESS OR IMPLIED CONDITIONS, REPRESENTATIONS AND WARRANTIES, INCLUDING ANY  * IMPLIED WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR  * NON-INFRINGEMENT, ARE HEREBY EXCLUDED. SUN AND ITS LICENSORS SHALL NOT BE  * LIABLE FOR ANY DAMAGES SUFFERED BY LICENSEE AS A RESULT OF USING, MODIFYING  * OR DISTRIBUTING THE SOFTWARE OR ITS DERIVATIVES. IN NO EVENT WILL SUN OR ITS  * LICENSORS BE LIABLE FOR ANY LOST REVENUE, PROFIT OR DATA, OR FOR DIRECT,  * INDIRECT, SPECIAL, CONSEQUENTIAL, INCIDENTAL OR PUNITIVE DAMAGES, HOWEVER  * CAUSED AND REGARDLESS OF THE THEORY OF LIABILITY, ARISING OUT OF THE USE OF  * OR INABILITY TO USE SOFTWARE, EVEN IF SUN HAS BEEN ADVISED OF THE  * POSSIBILITY OF SUCH DAMAGES.  *  * This software is not designed or intended for use in on-line control of  * aircraft, air traffic, aircraft navigation or aircraft communications; or in  * the design, construction, operation or maintenance of any nuclear  * facility. Licensee represents and warrants that it will not use or  * redistribute the Software for such purposes.  */ import javax.jms.*; /**  * The AckEquivExample class shows how the following two scenarios both ensure  * that a message will not be acknowledged until processing of it is complete:  * <ul>  * <li> Using an asynchronous consumer (message listener) in an   * AUTO_ACKNOWLEDGE session  * <li> Using a synchronous consumer in a CLIENT_ACKNOWLEDGE session  * </ul>  * <p>  * With a message listener, the automatic acknowledgment happens when the  * onMessage method returns -- that is, after message processing has finished.  * <p>  * With a synchronous receive, the client acknowledges the message after  * processing is complete.  (If you use AUTO_ACKNOWLEDGE with a synchronous  * receive, the acknowledgement happens immediately after the receive call; if  * any subsequent processing steps fail, the message cannot be redelivered.)  * <p>  * The program contains a SynchProducer class, a SynchConsumer class, an   * AsynchSubscriber class with a TextListener class, a MultiplePublisher class,  * a main method, and a method that runs the other classes' threads.  * <p>  * Specify a queue name and a topic name on the command line when you run the   * program.  The program also uses a queue named "controlQueue", which should be  * created before you run the program.  *  * @author Kim Haase  * @version 1.6, 08/18/00  */ public class AckEquivExample {     final String  CONTROL_QUEUE = "controlQueue";     String        queueName = null;     String        topicName = null;     int           exitResult = 0;     /**      * The SynchProducer class creates a session in CLIENT_ACKNOWLEDGE mode and      * sends a message.      *      * @author Kim Haase      * @version 1.6, 08/18/00      */     public class SynchProducer extends Thread {                  /**          * Runs the thread.          */         public void run() {             ConnectionFactory    connectionFactory = null;             Connection           connection = null;             Session              session = null;             Queue                queue = null;             MessageProducer      msgProducer = null;             final String         MSG_TEXT =                                      new String("Here is a client-acknowledge message");             TextMessage          message = null;             try {                 connectionFactory =                      SampleUtilities.getConnectionFactory();                 connection =                      connectionFactory.createConnection();                 session = connection.createSession(false,                      Session.CLIENT_ACKNOWLEDGE);                 queue = SampleUtilities.getQueue(queueName, session);             } catch (Exception e) {                 System.out.println("Connection problem: " + e.toString());                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException ee) {}                 }                 System.exit(1);             }             /*              * Create client-acknowledge producer.              * Create and send message.              */             try {                 System.out.println("PRODUCER: Created client-acknowledge session");                 msgProducer = session.createProducer(queue);                 message = session.createTextMessage();                 message.setText(MSG_TEXT);                 System.out.println("PRODUCER: Sending message: "                      + message.getText());                 msgProducer.send(message);             } catch (JMSException e) {                 System.out.println("Exception occurred: " + e.toString());                 exitResult = 1;             } finally {                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException e) {                         exitResult = 1;                     }                 }             }         }     }          /**      * The SynchConsumer class creates a session in CLIENT_ACKNOWLEDGE mode and      * receives the message sent by the SynchProducer class.      *      * @author Kim Haase      * @version 1.6, 08/18/00      */     public class SynchConsumer extends Thread {         /**          * Runs the thread.          */       public void run() {             ConnectionFactory    connectionFactory = null;             Connection           connection = null;             Session              session = null;             Queue                queue = null;             MessageConsumer      msgConsumer = null;             TextMessage          message = null;             try {                 connectionFactory =                      SampleUtilities.getConnectionFactory();                 connection =                      connectionFactory.createConnection();                 session = connection.createSession(false,                      Session.CLIENT_ACKNOWLEDGE);                 queue = SampleUtilities.getQueue(queueName, session);             } catch (Exception e) {                 System.out.println("Connection problem: " + e.toString());                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException ee) {}                 }                 System.exit(1);             }                   /*              * Create client-acknowledge consumer.              * Receive message and process it.              * Acknowledge message.              */              try {                 System.out.println("CONSUMER: Created client-acknowledge session");                 msgConsumer = session.createConsumer(queue);                 connection.start();                 message = (TextMessage) msgConsumer.receive();                 System.out.println("CONSUMER: Processing message: "                      + message.getText());                 System.out.println("CONSUMER: Now I'll acknowledge the message");                 message.acknowledge();             } catch (JMSException e) {                 System.out.println("Exception occurred: " + e.toString());                 exitResult = 1;             } finally {                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException e) {                         exitResult = 1;                     }                 }             }         }             }          /**      * The AsynchSubscriber class creates a session in AUTO_ACKNOWLEDGE mode      * and fetches several messages from a topic asynchronously, using a       * message listener, TextListener.      * <p>      * Each message is acknowledged after the onMessage method completes.      *      * @author Kim Haase      * @version 1.6, 08/18/00      */     public class AsynchSubscriber extends Thread {         /**          * The TextListener class implements the MessageListener interface by           * defining an onMessage method for the AsynchSubscriber class.          *          * @author Kim Haase          * @version 1.6, 08/18/00          */         private class TextListener implements MessageListener {             final SampleUtilities.DoneLatch  monitor =                 new SampleUtilities.DoneLatch();             /**              * Casts the message to a TextMessage and displays its text.              * A non-text message is interpreted as the end of the message               * stream, and the message listener sets its monitor state to all               * done processing messages.              *              * @param message  the incoming message              */             public void onMessage(Message message) {                 if (message instanceof TextMessage) {                     TextMessage  msg = (TextMessage) message;                                          try {                         System.out.println("CONSUMER: Processing message: "                                             + msg.getText());                     } catch (JMSException e) {                         System.out.println("Exception in onMessage(): "                                             + e.toString());                     }                  } else {                     monitor.allDone();                 }             }         }         /**          * Runs the thread.          */         public void run() {             ConnectionFactory    connectionFactory = null;             Connection           connection = null;             Session              session = null;             Topic                topic = null;             TopicSubscriber      topicSubscriber = null;             TextListener         topicListener = null;             try {                 connectionFactory =                      SampleUtilities.getConnectionFactory();                 connection =                      connectionFactory.createConnection();                 connection.setClientID("AckEquivExample");                 session = connection.createSession(false,                      Session.AUTO_ACKNOWLEDGE);                 System.out.println("CONSUMER: Created auto-acknowledge session");                 topic = SampleUtilities.getTopic(topicName, session);             } catch (Exception e) {                 System.out.println("Connection problem: " + e.toString());                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException ee) {}                 }               System.exit(1);             }                          /*              * Create auto-acknowledge subscriber.              * Register message listener (TextListener).              * Start message delivery.              * Send synchronize message to publisher, then wait till all              *   messages have arrived.              * Listener displays the messages obtained.              */             try {                 topicSubscriber = session.createDurableSubscriber(topic,                      "AckEquivExampleSubscription");                 topicListener = new TextListener();                 topicSubscriber.setMessageListener(topicListener);                 connection.start();                 // Let publisher know that subscriber is ready.                 try {                     SampleUtilities.sendSynchronizeMessage("CONSUMER: ",                                                              CONTROL_QUEUE);                 } catch (Exception e) {                     System.out.println("Queue probably missing: " + e.toString());                     if (connection != null) {                         try {                             connection.close();                         } catch (JMSException ee) {}                     }                   System.exit(1);               }                 /*                  * Asynchronously process messages.                  * Block until publisher issues a control message indicating                  * end of publish stream.                  */                 topicListener.monitor.waitTillDone();                 topicSubscriber.close();                 session.unsubscribe("AckEquivExampleSubscription");             } catch (JMSException e) {                 System.out.println("Exception occurred: " + e.toString());                 exitResult = 1;             } finally {                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException e) {                         exitResult = 1;                     }                 }             }         }           }     /**      * The MultiplePublisher class creates a session in AUTO_ACKNOWLEDGE mode      * and publishes three messages to a topic.       *      * @author Kim Haase      * @version 1.6, 08/18/00      */     public class MultiplePublisher extends Thread {                  /**          * Runs the thread.          */         public void run() {             ConnectionFactory    connectionFactory = null;             Connection           connection = null;             Session              session = null;             Topic                topic = null;             MessageProducer      topicPublisher = null;             TextMessage          message = null;             final int            NUMMSGS = 3;             final String         MSG_TEXT =                                       new String("Here is an auto-acknowledge message");             try {                 connectionFactory =                      SampleUtilities.getConnectionFactory();                 connection =                      connectionFactory.createConnection();                 session = connection.createSession(false,                      Session.AUTO_ACKNOWLEDGE);                 System.out.println("PRODUCER: Created auto-acknowledge session");                 topic = SampleUtilities.getTopic(topicName, session);             } catch (Exception e) {                 System.out.println("Connection problem: " + e.toString());                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException ee) {}                 }               System.exit(1);             }             /*              * After synchronizing with subscriber, create publisher.              * Send 3 messages, varying text slightly.              * Send end-of-messages message.              */             try {                 /*                  * Synchronize with subscriber.  Wait for message indicating                   * that subscriber is ready to receive messages.                  */                 try {                     SampleUtilities.receiveSynchronizeMessages("PRODUCER: ",                                                                CONTROL_QUEUE, 1);                 } catch (Exception e) {                     System.out.println("Queue probably missing: " + e.toString());                     if (connection != null) {                         try {                             connection.close();                         } catch (JMSException ee) {}                     }                   System.exit(1);               }                 topicPublisher = session.createProducer(topic);                 message = session.createTextMessage();                 for (int i = 0; i < NUMMSGS; i++) {                     message.setText(MSG_TEXT + " " + (i + 1));                     System.out.println("PRODUCER: Publishing message: "                          + message.getText());                     topicPublisher.send(message);                 }                                  // Send a non-text control message indicating end of messages.                  topicPublisher.send(session.createMessage());             } catch (JMSException e) {                 System.out.println("Exception occurred: " + e.toString());                 exitResult = 1;             } finally {                 if (connection != null) {                     try {                         connection.close();                     } catch (JMSException e) {                         exitResult = 1;                     }                 }             }         }     }          /**      * Instantiates the producer, consumer, subscriber, and publisher classes and       * starts their threads.      * Calls the join method to wait for the threads to die.      */     public void run_threads() {         SynchProducer        synchProducer = new SynchProducer();         SynchConsumer        synchConsumer = new SynchConsumer();         AsynchSubscriber     asynchSubscriber = new AsynchSubscriber();         MultiplePublisher    multiplePublisher = new MultiplePublisher();                  synchProducer.start();         synchConsumer.start();         try {             synchProducer.join();             synchConsumer.join();         } catch (InterruptedException e) {}         asynchSubscriber.start();         multiplePublisher.start();         try {             asynchSubscriber.join();             multiplePublisher.join();         } catch (InterruptedException e) {}     }     /**      * Reads the queue and topic names from the command line, then calls the      * run_threads method to execute the program threads.      *      * @param args  the topic used by the example      */     public static void main(String[] args) {         AckEquivExample  aee = new AckEquivExample();                  if (args.length != 2) {           System.out.println("Usage: java AckEquivExample <queue_name> <topic_name>");           System.exit(1);       }         aee.queueName = new String(args[0]);         aee.topicName = new String(args[1]);         System.out.println("Queue name is " + aee.queueName);         System.out.println("Topic name is " + aee.topicName);       aee.run_threads();       SampleUtilities.exit(aee.exitResult);     } }          jms.zip( 65 k)