1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.activemq;
18
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.concurrent.atomic.AtomicLong;
22
23 import javax.jms.Destination;
24 import javax.jms.IllegalStateException;
25 import javax.jms.InvalidDestinationException;
26 import javax.jms.JMSException;
27 import javax.jms.Message;
28
29 import org.apache.activemq.command.ActiveMQDestination;
30 import org.apache.activemq.command.ProducerAck;
31 import org.apache.activemq.command.ProducerId;
32 import org.apache.activemq.command.ProducerInfo;
33 import org.apache.activemq.management.JMSProducerStatsImpl;
34 import org.apache.activemq.management.StatsCapable;
35 import org.apache.activemq.management.StatsImpl;
36 import org.apache.activemq.usage.MemoryUsage;
37 import org.apache.activemq.util.IntrospectionSupport;
38
39 /**
40 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
41 * destination. A <CODE>MessageProducer</CODE> object is created by passing a
42 * <CODE>Destination</CODE> object to a message-producer creation method
43 * supplied by a session.
44 * <P>
45 * <CODE>MessageProducer</CODE> is the parent interface for all message
46 * producers.
47 * <P>
48 * A client also has the option of creating a message producer without supplying
49 * a destination. In this case, a destination must be provided with every send
50 * operation. A typical use for this kind of message producer is to send replies
51 * to requests using the request's <CODE>JMSReplyTo</CODE> destination.
52 * <P>
53 * A client can specify a default delivery mode, priority, and time to live for
54 * messages sent by a message producer. It can also specify the delivery mode,
55 * priority, and time to live for an individual message.
56 * <P>
57 * A client can specify a time-to-live value in milliseconds for each message it
58 * sends. This value defines a message expiration time that is the sum of the
59 * message's time-to-live and the GMT when it is sent (for transacted sends,
60 * this is the time the client sends the message, not the time the transaction
61 * is committed).
62 * <P>
63 * A JMS provider should do its best to expire messages accurately; however, the
64 * JMS API does not define the accuracy provided.
65 *
66 * @version $Revision: 1.14 $
67 * @see javax.jms.TopicPublisher
68 * @see javax.jms.QueueSender
69 * @see javax.jms.Session#createProducer
70 */
71 public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
72
73 protected ProducerInfo info;
74 protected boolean closed;
75
76 private JMSProducerStatsImpl stats;
77 private AtomicLong messageSequence;
78 private long startTime;
79 private MessageTransformer transformer;
80 private MemoryUsage producerWindow;
81
82 protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
83 super(session);
84 this.info = new ProducerInfo(producerId);
85 this.info.setWindowSize(session.connection.getProducerWindowSize());
86 if (destination != null && destination.getOptions() != null) {
87 Map<String, String> options = new HashMap<String, String>(destination.getOptions());
88 IntrospectionSupport.setProperties(this.info, options, "producer.");
89 }
90 this.info.setDestination(destination);
91
92 // Enable producer window flow control if protocol > 3 and the window
93 // size > 0
94 if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
95 producerWindow = new MemoryUsage("Producer Window: " + producerId);
96 producerWindow.setLimit(this.info.getWindowSize());
97 producerWindow.start();
98 }
99
100 this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
101 this.defaultPriority = Message.DEFAULT_PRIORITY;
102 this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
103 this.startTime = System.currentTimeMillis();
104 this.messageSequence = new AtomicLong(0);
105 this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
106 this.session.addProducer(this);
107 this.session.asyncSendPacket(info);
108 this.setSendTimeout(sendTimeout);
109 setTransformer(session.getTransformer());
110 }
111
112 public StatsImpl getStats() {
113 return stats;
114 }
115
116 public JMSProducerStatsImpl getProducerStats() {
117 return stats;
118 }
119
120 /**
121 * Gets the destination associated with this <CODE>MessageProducer</CODE>.
122 *
123 * @return this producer's <CODE>Destination/ <CODE>
124 * @throws JMSException if the JMS provider fails to close the producer due to
125 * some internal error.
126 * @since 1.1
127 */
128 public Destination getDestination() throws JMSException {
129 checkClosed();
130 return this.info.getDestination();
131 }
132
133 /**
134 * Closes the message producer.
135 * <P>
136 * Since a provider may allocate some resources on behalf of a <CODE>
137 * MessageProducer</CODE>
138 * outside the Java virtual machine, clients should close them when they are
139 * not needed. Relying on garbage collection to eventually reclaim these
140 * resources may not be timely enough.
141 *
142 * @throws JMSException if the JMS provider fails to close the producer due
143 * to some internal error.
144 */
145 public void close() throws JMSException {
146 if (!closed) {
147 dispose();
148 this.session.asyncSendPacket(info.createRemoveCommand());
149 }
150 }
151
152 public void dispose() {
153 if (!closed) {
154 this.session.removeProducer(this);
155 if (producerWindow != null) {
156 producerWindow.stop();
157 }
158 closed = true;
159 }
160 }
161
162 /**
163 * Check if the instance of this producer has been closed.
164 *
165 * @throws IllegalStateException
166 */
167 protected void checkClosed() throws IllegalStateException {
168 if (closed) {
169 throw new IllegalStateException("The producer is closed");
170 }
171 }
172
173 /**
174 * Sends a message to a destination for an unidentified message producer,
175 * specifying delivery mode, priority and time to live.
176 * <P>
177 * Typically, a message producer is assigned a destination at creation time;
178 * however, the JMS API also supports unidentified message producers, which
179 * require that the destination be supplied every time a message is sent.
180 *
181 * @param destination the destination to send this message to
182 * @param message the message to send
183 * @param deliveryMode the delivery mode to use
184 * @param priority the priority for this message
185 * @param timeToLive the message's lifetime (in milliseconds)
186 * @throws JMSException if the JMS provider fails to send the message due to
187 * some internal error.
188 * @throws UnsupportedOperationException if an invalid destination is
189 * specified.
190 * @throws InvalidDestinationException if a client uses this method with an
191 * invalid destination.
192 * @see javax.jms.Session#createProducer
193 * @since 1.1
194 */
195 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
196 checkClosed();
197 if (destination == null) {
198 if (info.getDestination() == null) {
199 throw new UnsupportedOperationException("A destination must be specified.");
200 }
201 throw new InvalidDestinationException("Don't understand null destinations");
202 }
203
204 ActiveMQDestination dest;
205 if (destination == info.getDestination()) {
206 dest = (ActiveMQDestination)destination;
207 } else if (info.getDestination() == null) {
208 dest = ActiveMQDestination.transform(destination);
209 } else {
210 throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
211 }
212 if (dest == null) {
213 throw new JMSException("No destination specified");
214 }
215
216 if (transformer != null) {
217 Message transformedMessage = transformer.producerTransform(session, this, message);
218 if (transformedMessage != null) {
219 message = transformedMessage;
220 }
221 }
222
223 if (producerWindow != null) {
224 try {
225 producerWindow.waitForSpace();
226 } catch (InterruptedException e) {
227 throw new JMSException("Send aborted due to thread interrupt.");
228 }
229 }
230
231 this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
232
233 stats.onMessage();
234 }
235
236 public MessageTransformer getTransformer() {
237 return transformer;
238 }
239
240 /**
241 * Sets the transformer used to transform messages before they are sent on
242 * to the JMS bus
243 */
244 public void setTransformer(MessageTransformer transformer) {
245 this.transformer = transformer;
246 }
247
248 /**
249 * @return the time in milli second when this object was created.
250 */
251 protected long getStartTime() {
252 return this.startTime;
253 }
254
255 /**
256 * @return Returns the messageSequence.
257 */
258 protected long getMessageSequence() {
259 return messageSequence.incrementAndGet();
260 }
261
262 /**
263 * @param messageSequence The messageSequence to set.
264 */
265 protected void setMessageSequence(AtomicLong messageSequence) {
266 this.messageSequence = messageSequence;
267 }
268
269 /**
270 * @return Returns the info.
271 */
272 protected ProducerInfo getProducerInfo() {
273 return this.info != null ? this.info : null;
274 }
275
276 /**
277 * @param info The info to set
278 */
279 protected void setProducerInfo(ProducerInfo info) {
280 this.info = info;
281 }
282
283 public String toString() {
284 return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }";
285 }
286
287 public void onProducerAck(ProducerAck pa) {
288 if (this.producerWindow != null) {
289 this.producerWindow.decreaseUsage(pa.getSize());
290 }
291 }
292
293 }