Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq;
   18   
   19   import java.util.HashMap;
   20   import java.util.Map;
   21   import java.util.concurrent.atomic.AtomicLong;
   22   
   23   import javax.jms.Destination;
   24   import javax.jms.IllegalStateException;
   25   import javax.jms.InvalidDestinationException;
   26   import javax.jms.JMSException;
   27   import javax.jms.Message;
   28   
   29   import org.apache.activemq.command.ActiveMQDestination;
   30   import org.apache.activemq.command.ProducerAck;
   31   import org.apache.activemq.command.ProducerId;
   32   import org.apache.activemq.command.ProducerInfo;
   33   import org.apache.activemq.management.JMSProducerStatsImpl;
   34   import org.apache.activemq.management.StatsCapable;
   35   import org.apache.activemq.management.StatsImpl;
   36   import org.apache.activemq.usage.MemoryUsage;
   37   import org.apache.activemq.util.IntrospectionSupport;
   38   
   39   /**
   40    * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
   41    * destination. A <CODE>MessageProducer</CODE> object is created by passing a
   42    * <CODE>Destination</CODE> object to a message-producer creation method
   43    * supplied by a session.
   44    * <P>
   45    * <CODE>MessageProducer</CODE> is the parent interface for all message
   46    * producers.
   47    * <P>
   48    * A client also has the option of creating a message producer without supplying
   49    * a destination. In this case, a destination must be provided with every send
   50    * operation. A typical use for this kind of message producer is to send replies
   51    * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
   52    * <P>
   53    * A client can specify a default delivery mode, priority, and time to live for
   54    * messages sent by a message producer. It can also specify the delivery mode,
   55    * priority, and time to live for an individual message.
   56    * <P>
   57    * A client can specify a time-to-live value in milliseconds for each message it
   58    * sends. This value defines a message expiration time that is the sum of the
   59    * message's time-to-live and the GMT when it is sent (for transacted sends,
   60    * this is the time the client sends the message, not the time the transaction
   61    * is committed).
   62    * <P>
   63    * A JMS provider should do its best to expire messages accurately; however, the
   64    * JMS API does not define the accuracy provided.
   65    * 
   66    * @version $Revision: 1.14 $
   67    * @see javax.jms.TopicPublisher
   68    * @see javax.jms.QueueSender
   69    * @see javax.jms.Session#createProducer
   70    */
   71   public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
   72   
   73       protected ProducerInfo info;
   74       protected boolean closed;
   75   
   76       private JMSProducerStatsImpl stats;
   77       private AtomicLong messageSequence;
   78       private long startTime;
   79       private MessageTransformer transformer;
   80       private MemoryUsage producerWindow;
   81   
   82       protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
   83           super(session);
   84           this.info = new ProducerInfo(producerId);
   85           this.info.setWindowSize(session.connection.getProducerWindowSize());
   86           if (destination != null && destination.getOptions() != null) {
   87               Map<String, String> options = new HashMap<String, String>(destination.getOptions());
   88               IntrospectionSupport.setProperties(this.info, options, "producer.");
   89           }
   90           this.info.setDestination(destination);
   91   
   92           // Enable producer window flow control if protocol > 3 and the window
   93           // size > 0
   94           if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
   95               producerWindow = new MemoryUsage("Producer Window: " + producerId);
   96               producerWindow.setLimit(this.info.getWindowSize());
   97               producerWindow.start();
   98           }
   99   
  100           this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
  101           this.defaultPriority = Message.DEFAULT_PRIORITY;
  102           this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
  103           this.startTime = System.currentTimeMillis();
  104           this.messageSequence = new AtomicLong(0);
  105           this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
  106           this.session.addProducer(this);
  107           this.session.asyncSendPacket(info);
  108           this.setSendTimeout(sendTimeout);
  109           setTransformer(session.getTransformer());
  110       }
  111   
  112       public StatsImpl getStats() {
  113           return stats;
  114       }
  115   
  116       public JMSProducerStatsImpl getProducerStats() {
  117           return stats;
  118       }
  119   
  120       /**
  121        * Gets the destination associated with this <CODE>MessageProducer</CODE>.
  122        * 
  123        * @return this producer's <CODE>Destination/ <CODE>
  124        * @throws JMSException if the JMS provider fails to close the producer due to
  125        *                      some internal error.
  126        * @since 1.1
  127        */
  128       public Destination getDestination() throws JMSException {
  129           checkClosed();
  130           return this.info.getDestination();
  131       }
  132   
  133       /**
  134        * Closes the message producer.
  135        * <P>
  136        * Since a provider may allocate some resources on behalf of a <CODE>
  137        * MessageProducer</CODE>
  138        * outside the Java virtual machine, clients should close them when they are
  139        * not needed. Relying on garbage collection to eventually reclaim these
  140        * resources may not be timely enough.
  141        * 
  142        * @throws JMSException if the JMS provider fails to close the producer due
  143        *                 to some internal error.
  144        */
  145       public void close() throws JMSException {
  146           if (!closed) {
  147               dispose();
  148               this.session.asyncSendPacket(info.createRemoveCommand());
  149           }
  150       }
  151   
  152       public void dispose() {
  153           if (!closed) {
  154               this.session.removeProducer(this);
  155               if (producerWindow != null) {
  156                   producerWindow.stop();
  157               }
  158               closed = true;
  159           }
  160       }
  161   
  162       /**
  163        * Check if the instance of this producer has been closed.
  164        * 
  165        * @throws IllegalStateException
  166        */
  167       protected void checkClosed() throws IllegalStateException {
  168           if (closed) {
  169               throw new IllegalStateException("The producer is closed");
  170           }
  171       }
  172   
  173       /**
  174        * Sends a message to a destination for an unidentified message producer,
  175        * specifying delivery mode, priority and time to live.
  176        * <P>
  177        * Typically, a message producer is assigned a destination at creation time;
  178        * however, the JMS API also supports unidentified message producers, which
  179        * require that the destination be supplied every time a message is sent.
  180        * 
  181        * @param destination the destination to send this message to
  182        * @param message the message to send
  183        * @param deliveryMode the delivery mode to use
  184        * @param priority the priority for this message
  185        * @param timeToLive the message's lifetime (in milliseconds)
  186        * @throws JMSException if the JMS provider fails to send the message due to
  187        *                 some internal error.
  188        * @throws UnsupportedOperationException if an invalid destination is
  189        *                 specified.
  190        * @throws InvalidDestinationException if a client uses this method with an
  191        *                 invalid destination.
  192        * @see javax.jms.Session#createProducer
  193        * @since 1.1
  194        */
  195       public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
  196           checkClosed();
  197           if (destination == null) {
  198               if (info.getDestination() == null) {
  199                   throw new UnsupportedOperationException("A destination must be specified.");
  200               }
  201               throw new InvalidDestinationException("Don't understand null destinations");
  202           }
  203   
  204           ActiveMQDestination dest;
  205           if (destination == info.getDestination()) {
  206               dest = (ActiveMQDestination)destination;
  207           } else if (info.getDestination() == null) {
  208               dest = ActiveMQDestination.transform(destination);
  209           } else {
  210               throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
  211           }
  212           if (dest == null) {
  213               throw new JMSException("No destination specified");
  214           }
  215   
  216           if (transformer != null) {
  217               Message transformedMessage = transformer.producerTransform(session, this, message);
  218               if (transformedMessage != null) {
  219                   message = transformedMessage;
  220               }
  221           }
  222   
  223           if (producerWindow != null) {
  224               try {
  225                   producerWindow.waitForSpace();
  226               } catch (InterruptedException e) {
  227                   throw new JMSException("Send aborted due to thread interrupt.");
  228               }
  229           }
  230   
  231           this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
  232   
  233           stats.onMessage();
  234       }
  235   
  236       public MessageTransformer getTransformer() {
  237           return transformer;
  238       }
  239   
  240       /**
  241        * Sets the transformer used to transform messages before they are sent on
  242        * to the JMS bus
  243        */
  244       public void setTransformer(MessageTransformer transformer) {
  245           this.transformer = transformer;
  246       }
  247   
  248       /**
  249        * @return the time in milli second when this object was created.
  250        */
  251       protected long getStartTime() {
  252           return this.startTime;
  253       }
  254   
  255       /**
  256        * @return Returns the messageSequence.
  257        */
  258       protected long getMessageSequence() {
  259           return messageSequence.incrementAndGet();
  260       }
  261   
  262       /**
  263        * @param messageSequence The messageSequence to set.
  264        */
  265       protected void setMessageSequence(AtomicLong messageSequence) {
  266           this.messageSequence = messageSequence;
  267       }
  268   
  269       /**
  270        * @return Returns the info.
  271        */
  272       protected ProducerInfo getProducerInfo() {
  273           return this.info != null ? this.info : null;
  274       }
  275   
  276       /**
  277        * @param info The info to set
  278        */
  279       protected void setProducerInfo(ProducerInfo info) {
  280           this.info = info;
  281       }
  282   
  283       public String toString() {
  284           return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
  285       }
  286   
  287       public void onProducerAck(ProducerAck pa) {
  288           if (this.producerWindow != null) {
  289               this.producerWindow.decreaseUsage(pa.getSize());
  290           }
  291       }
  292   
  293   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]