| Method from org.apache.activemq.broker.TransportConnection Detail: |
/**
 * Sends a single command over the underlying transport.
 * The connection is flagged as a marked candidate for the duration of the
 * write and the flag is cleared again once the write returns or fails.
 *
 * @param command the command to send one-way on the transport
 * @throws IOException if the transport fails to send the command
 */
protected void dispatch(Command command) throws IOException {
    setMarkedCandidate(true);
    try {
        transport.oneway(command);
    } finally {
        // Always clear the marker, whether or not the write succeeded.
        setMarkedCandidate(false);
    }
}
|
/**
 * Hands a command over for delivery to the remote peer.
 * <ul>
 * <li>While the connection is stopping, nothing is sent; message
 * dispatches still get their post-processing and transmit callback run.</li>
 * <li>Without a task runner the command is dispatched synchronously.</li>
 * <li>Otherwise the command is appended to the dispatch queue and the
 * task runner is woken up.</li>
 * </ul>
 *
 * @param message the command to dispatch
 */
public void dispatchAsync(Command message) {
    if (stopping.get()) {
        if (message.isMessageDispatch()) {
            MessageDispatch dispatched = (MessageDispatch) message;
            Runnable callback = dispatched.getTransmitCallback();
            broker.postProcessDispatch(dispatched);
            if (callback != null) {
                callback.run();
            }
        }
        return;
    }

    if (taskRunner == null) {
        dispatchSync(message);
        return;
    }

    synchronized (dispatchQueue) {
        dispatchQueue.add(message);
    }
    try {
        taskRunner.wakeup();
    } catch (InterruptedException interrupted) {
        // Preserve the interrupt status for the caller.
        Thread.currentThread().interrupt();
    }
}
|
/**
 * Dispatches a command on the calling thread. An I/O failure is not
 * propagated; it is handed to {@code serviceExceptionAsync} instead.
 *
 * @param message the command to dispatch
 */
public void dispatchSync(Command message) {
    try {
        processDispatch(message);
    } catch (IOException ioFailure) {
        serviceExceptionAsync(ioFailure);
    }
}
|
/**
 * Records the current time in {@code timeStamp}, unless a time stamp has
 * already been recorded (a value of zero means "not marked").
 */
public void doMark() {
    if (timeStamp != 0) {
        return;
    }
    timeStamp = System.currentTimeMillis();
}
Marks the connection so that the next sweep can determine whether it is collectable |
/**
 * Performs the actual shutdown of this connection. In order it:
 * notifies the connector, stops the master broker and duplex bridge (if
 * any), stops the transport, shuts down the task runner, runs the
 * callbacks of every still-queued message dispatch, and finally removes
 * the logical connections and broker info from the broker unless the
 * broker itself is already stopped.
 * <p>
 * Failures of the individual steps are logged and do not prevent the
 * remaining steps from running.
 *
 * @throws Exception if notifying the connector or a later unguarded step fails
 * @throws InterruptedException declared for callers; see the task runner shutdown
 */
protected void doStop() throws Exception, InterruptedException {
    LOG.debug("Stopping connection: " + transport.getRemoteAddress());
    connector.onStopped(this);
    try {
        synchronized (this) {
            if (masterBroker != null) {
                masterBroker.stop();
            }
            if (duplexBridge != null) {
                duplexBridge.stop();
            }
        }
    } catch (Exception ignore) {
        LOG.trace("Exception caught stopping", ignore);
    }
    try {
        transport.stop();
        LOG.debug("Stopped transport: " + transport.getRemoteAddress());
    } catch (Exception e) {
        LOG.debug("Could not stop transport: " + e, e);
    }
    if (taskRunner != null) {
        taskRunner.shutdown(1);
    }
    active = false;
    // Run the MessageDispatch callbacks so that message references get
    // cleaned up.
    synchronized (dispatchQueue) {
        for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
            Command command = iter.next();
            if (command.isMessageDispatch()) {
                MessageDispatch md = (MessageDispatch) command;
                Runnable sub = md.getTransmitCallback();
                broker.postProcessDispatch(md);
                if (sub != null) {
                    sub.run();
                }
            }
        }
        dispatchQueue.clear();
    }
    // Remove all logical connections associated with this connection
    // from the broker.
    if (!broker.isStopped()) {
        List<TransportConnectionState> connectionStates = listConnectionStates();
        for (TransportConnectionState cs : connectionStates) {
            cs.getContext().getStopping().set(true);
            try {
                LOG.debug("Cleaning up connection resources: " + getRemoteAddress());
                processRemoveConnection(cs.getInfo().getConnectionId(), 0L);
            } catch (Throwable e) {
                // Keep cleaning up the remaining connection states, but
                // report the failure through the logger rather than stderr.
                LOG.warn("Failed to clean up connection resources: " + getRemoteAddress(), e);
            }
        }
        if (brokerInfo != null) {
            broker.removeBroker(this, brokerInfo);
        }
    }
    LOG.debug("Connection Stopped: " + getRemoteAddress());
}
|
/**
 * Returns an identifier for this connection taken from the first
 * registered connection state: its client id when one is set, otherwise
 * the string form of its connection id.
 *
 * @return the identifier, or {@code null} when no connection state is registered
 */
public String getConnectionId() {
    Iterator<TransportConnectionState> states = listConnectionStates().iterator();
    if (!states.hasNext()) {
        return null;
    }
    // Only the first state is ever consulted.
    TransportConnectionState first = states.next();
    String clientId = first.getInfo().getClientId();
    if (clientId != null) {
        return clientId;
    }
    return first.getInfo().getConnectionId().toString();
}
|
/**
 * @return the connector held by this connection
 */
public Connector getConnector() {
    return this.connector;
}
|
/**
 * Reads the size of the dispatch queue while holding the queue's lock.
 *
 * @return the number of commands currently waiting in the dispatch queue
 */
public int getDispatchQueueSize() {
    final int pending;
    synchronized (dispatchQueue) {
        pending = dispatchQueue.size();
    }
    return pending;
}
Returns the number of messages to be dispatched to this connection |
/**
 * @return the message authorization policy configured on this connection
 *         (as last set through the corresponding setter)
 */
public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
    return this.messageAuthorizationPolicy;
}
|
/**
 * @return the current value of the {@code protocolVersion} holder
 */
public int getProtocolVersion() {
    return this.protocolVersion.get();
}
|
/**
 * @return the remote address as reported by the underlying transport
 */
public String getRemoteAddress() {
    return this.transport.getRemoteAddress();
}
|
/**
 * @return the statistics object of this connection
 */
public ConnectionStatistics getStatistics() {
    return this.statistics;
}
Returns the statistics for this connection |
/**
 * @return the current value of the {@code active} flag
 */
public boolean isActive() {
    return this.active;
}
|
/**
 * @return the current value of the {@code blocked} flag
 */
public boolean isBlocked() {
    return this.blocked;
}
|
/**
 * @return the current value of the {@code blockedCandidate} flag
 */
public boolean isBlockedCandidate() {
    return this.blockedCandidate;
}
|
/**
 * @return the current value of the {@code connected} flag
 */
public boolean isConnected() {
    return this.connected;
}
|
/**
 * @return the current value of the {@code manageable} flag
 */
public boolean isManageable() {
    return this.manageable;
}
|
/**
 * Reports the {@code markedCandidate} flag.
 * NOTE(review): despite the name this reads {@code markedCandidate}, not
 * {@code blockedCandidate} - confirm that this is intended.
 *
 * @return the current value of the {@code markedCandidate} flag
 */
public boolean isMarkedBlockedCandidate() {
    return this.markedCandidate;
}
|
/**
 * @return the current value of the {@code markedCandidate} flag
 */
public boolean isMarkedCandidate() {
    return this.markedCandidate;
}
|
/**
 * Reads the {@code networkConnection} flag while holding this object's monitor.
 *
 * @return the current value of the {@code networkConnection} flag
 */
public synchronized boolean isNetworkConnection() {
    return this.networkConnection;
}
|
/**
 * Reads the {@code pendingStop} flag while holding this object's monitor.
 *
 * @return the current value of the {@code pendingStop} flag
 */
public synchronized boolean isPendingStop() {
    return this.pendingStop;
}
|
/**
 * @return the current value of the {@code slow} flag
 */
public boolean isSlow() {
    return this.slow;
}
|
/**
 * Reads the {@code starting} flag while holding this object's monitor.
 *
 * @return the current value of the {@code starting} flag
 */
public synchronized boolean isStarting() {
    return this.starting;
}
|
/**
 * Performs one unit of dispatch work: removes the command at the head of
 * the dispatch queue and dispatches it.
 * <p>
 * When the connection is stopping, the first call that notices this
 * sends a {@code ShutdownInfo} (only if no transport exception has been
 * recorded), releases the dispatch-stopped latch, and no further
 * commands are dispatched.
 *
 * @return {@code true} if a command was dispatched by this call;
 *         {@code false} if the queue was empty, dispatching has stopped,
 *         or an I/O error occurred
 */
public boolean iterate() {
    try {
        if (stopping.get()) {
            if (dispatchStopped.compareAndSet(false, true)) {
                if (transportException.get() == null) {
                    try {
                        dispatch(new ShutdownInfo());
                    } catch (Throwable ignore) {
                        // Failure to send the shutdown notice is deliberately ignored.
                    }
                }
                dispatchStoppedLatch.countDown();
            }
            return false;
        }

        if (dispatchStopped.get()) {
            return false;
        }

        final Command next;
        synchronized (dispatchQueue) {
            if (dispatchQueue.isEmpty()) {
                return false;
            }
            next = dispatchQueue.remove(0);
        }
        processDispatch(next);
        return true;
    } catch (IOException ioFailure) {
        if (dispatchStopped.compareAndSet(false, true)) {
            dispatchStoppedLatch.countDown();
        }
        serviceExceptionAsync(ioFailure);
        return false;
    }
}
|
/**
 * Asks the connection state register for all connection states, while
 * holding this object's monitor.
 *
 * @return the list produced by the register
 */
protected synchronized List<TransportConnectionState> listConnectionStates() {
    return this.connectionStateRegister.listConnectionStates();
}
|
/**
 * Looks up a connection state through the register, keyed by the string
 * form of a connection id.
 *
 * @param connectionId the connection id as a string
 * @return whatever the register returns for that id
 */
protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
    return this.connectionStateRegister.lookupConnectionState(connectionId);
}
|
/**
 * Looks up a connection state through the register, keyed by a consumer id.
 *
 * @param id the consumer id
 * @return whatever the register returns for that consumer
 */
protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
    return this.connectionStateRegister.lookupConnectionState(id);
}
|
/**
 * Looks up a connection state through the register, keyed by a producer id.
 *
 * @param id the producer id
 * @return whatever the register returns for that producer
 */
protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
    return this.connectionStateRegister.lookupConnectionState(id);
}
|
/**
 * Looks up a connection state through the register, keyed by a session id.
 *
 * @param id the session id
 * @return whatever the register returns for that session
 */
protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
    return this.connectionStateRegister.lookupConnectionState(id);
}
|
/**
 * Looks up a connection state through the register, keyed by a connection id.
 *
 * @param connectionId the connection id
 * @return whatever the register returns for that connection; callers in
 *         this class treat {@code null} as "not registered"
 */
protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
    return this.connectionStateRegister.lookupConnectionState(connectionId);
}
|
public Response processAddConnection(ConnectionInfo info) throws Exception {
// if the broker service has slave attached, wait for the slave to be
// attached to allow client connection. slave connection is fine
if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
&& connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
ServiceSupport.dispose(transport);
return new ExceptionResponse(new Exception("Master's slave not attached yet."));
}
// Older clients should have been defaulting this field to true.. but
// they were not.
if (wireFormatInfo != null && wireFormatInfo.getVersion() < = 2) {
info.setClientMaster(true);
}
TransportConnectionState state;
// Make sure 2 concurrent connections by the same ID only generate 1
// TransportConnectionState object.
synchronized (brokerConnectionStates) {
state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
if (state == null) {
state = new TransportConnectionState(info, this);
brokerConnectionStates.put(info.getConnectionId(), state);
}
state.incrementReference();
}
// If there are 2 concurrent connections for the same connection id,
// then last one in wins, we need to sync here
// to figure out the winner.
synchronized (state.getConnectionMutex()) {
if (state.getConnection() != this) {
LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
state.getConnection().stop();
LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
+ state.getConnection().getRemoteAddress());
state.setConnection(this);
state.reset(info);
}
}
registerConnectionState(info.getConnectionId(), state);
LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
// Setup the context.
String clientId = info.getClientId();
context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(clientId);
context.setClientMaster(info.isClientMaster());
context.setConnection(this);
context.setConnectionId(info.getConnectionId());
context.setConnector(connector);
context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
context.setNetworkConnection(networkConnection);
context.setFaultTolerant(faultTolerantConnection);
context.setTransactions(new ConcurrentHashMap< TransactionId, Transaction >());
context.setUserName(info.getUserName());
context.setWireFormatInfo(wireFormatInfo);
this.manageable = info.isManageable();
state.setContext(context);
state.setConnection(this);
try {
broker.addConnection(context, info);
} catch (Exception e) {
brokerConnectionStates.remove(info);
LOG.warn("Failed to add Connection", e);
throw e;
}
if (info.isManageable() && broker.isFaultTolerantConfiguration()) {
// send ConnectionCommand
ConnectionControl command = new ConnectionControl();
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
dispatchAsync(command);
}
return null;
}
|
/**
 * Handles a request to add a consumer to an existing session.
 * A consumer whose id is already known to the session is ignored, so a
 * replayed command has no effect.
 *
 * @param info describes the consumer to add
 * @return always {@code null}
 * @throws IllegalStateException if the owning connection or session is not registered
 * @throws Exception if the broker fails to add the consumer
 */
public Response processAddConsumer(ConsumerInfo info) throws Exception {
    SessionId sessionId = info.getConsumerId().getParentId();
    ConnectionId connectionId = sessionId.getParentId();
    TransportConnectionState cs = lookupConnectionState(connectionId);
    if (cs == null) {
        // Fail with a descriptive error instead of a bare NullPointerException.
        throw new IllegalStateException(broker.getBrokerName()
                + " Cannot add a consumer to a connection that had not been registered: " + connectionId);
    }
    SessionState ss = cs.getSessionState(sessionId);
    if (ss == null) {
        throw new IllegalStateException(broker.getBrokerName()
                + " Cannot add a consumer to a session that had not been registered: " + sessionId);
    }
    // Avoid replaying dup commands
    if (!ss.getConsumerIds().contains(info.getConsumerId())) {
        broker.addConsumer(cs.getContext(), info);
        try {
            ss.addConsumer(info);
        } catch (IllegalStateException e) {
            // The session refused the consumer (presumably because it is
            // shutting down - TODO confirm); undo the broker registration.
            broker.removeConsumer(cs.getContext(), info);
        }
    }
    return null;
}
|
/**
 * Handles a request to add a destination: forwards it to the broker and,
 * for temporary destinations, records it on the connection state.
 *
 * @param info describes the destination to add
 * @return always {@code null}
 * @throws Exception if the broker fails to add the destination
 */
public Response processAddDestination(DestinationInfo info) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
    broker.addDestinationInfo(connectionState.getContext(), info);
    boolean temporary = info.getDestination().isTemporary();
    if (temporary) {
        connectionState.addTempDestination(info);
    }
    return null;
}
|
/**
 * Handles a request to add a producer to an existing session.
 * A producer whose id is already known to the session is ignored, so a
 * replayed command has no effect.
 *
 * @param info describes the producer to add
 * @return always {@code null}
 * @throws IllegalStateException if the owning connection or session is not registered
 * @throws Exception if the broker fails to add the producer
 */
public Response processAddProducer(ProducerInfo info) throws Exception {
    SessionId sessionId = info.getProducerId().getParentId();
    ConnectionId connectionId = sessionId.getParentId();
    TransportConnectionState cs = lookupConnectionState(connectionId);
    if (cs == null) {
        // Fail with a descriptive error instead of a bare NullPointerException.
        throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
                + connectionId);
    }
    SessionState ss = cs.getSessionState(sessionId);
    if (ss == null) {
        throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
                + sessionId);
    }
    // Avoid replaying dup commands
    if (!ss.getProducerIds().contains(info.getProducerId())) {
        broker.addProducer(cs.getContext(), info);
        try {
            ss.addProducer(info);
        } catch (IllegalStateException e) {
            // The session refused the producer (presumably because it is
            // shutting down - TODO confirm); undo the broker registration.
            broker.removeProducer(cs.getContext(), info);
        }
    }
    return null;
}
|
/**
 * Handles a request to add a session to an existing connection.
 * A session whose id is already known to the connection state is
 * ignored, so a replayed command has no effect.
 *
 * @param info describes the session to add
 * @return always {@code null}
 * @throws IllegalStateException if the owning connection is not registered
 * @throws Exception if the broker fails to add the session
 */
public Response processAddSession(SessionInfo info) throws Exception {
    ConnectionId connectionId = info.getSessionId().getParentId();
    TransportConnectionState cs = lookupConnectionState(connectionId);
    if (cs == null) {
        // Fail with a descriptive error instead of a bare NullPointerException.
        throw new IllegalStateException("Cannot add a session to a connection that had not been registered: "
                + connectionId);
    }
    // Avoid replaying dup commands
    if (!cs.getSessionIds().contains(info.getSessionId())) {
        broker.addSession(cs.getContext(), info);
        try {
            cs.addSession(info);
        } catch (IllegalStateException e) {
            // Report through the logger rather than stderr, then undo the
            // broker registration.
            LOG.warn("Failed to add session: " + info.getSessionId(), e);
            broker.removeSession(cs.getContext(), info);
        }
    }
    return null;
}
|
/**
 * Handles a request to begin a transaction. A transaction that is
 * already tracked on the connection state is not started again.
 * The resolved context is stored in the {@code context} field.
 *
 * @param info identifies the connection and the transaction
 * @return always {@code null}
 * @throws NullPointerException if no connection state is registered for the connection id
 * @throws Exception if the broker fails to begin the transaction
 */
public Response processBeginTransaction(TransactionInfo info) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
    context = null;
    if (connectionState == null) {
        throw new NullPointerException("Context is null");
    }
    context = connectionState.getContext();

    // Avoid replaying dup commands
    boolean alreadyStarted = connectionState.getTransactionState(info.getTransactionId()) != null;
    if (!alreadyStarted) {
        connectionState.addTransactionState(info.getTransactionId());
        broker.beginTransaction(context, info.getTransactionId());
    }
    return null;
}
|
/**
 * Handles the broker info sent by a remote broker.
 * <ul>
 * <li>For a slave broker: starts a {@code MasterBroker} unless the slave
 * is passive, and tells the broker service that the slave connection is
 * established.</li>
 * <li>For a duplex network connection: creates and starts a duplex
 * network bridge and returns immediately. If creating the bridge fails,
 * the error is logged and processing continues below.</li>
 * </ul>
 * In all remaining cases the info is stored, the remote broker is
 * registered with the local broker and every connection context is
 * flagged as a network connection.
 *
 * @param info the broker info received from the peer
 * @return always {@code null}
 */
public Response processBrokerInfo(BrokerInfo info) {
    if (info.isSlaveBroker()) {
        BrokerService brokerService = connector.getBrokerService();
        // Do we only support passive slaves - or does the slave want to be
        // passive ?
        boolean passive = brokerService.isPassiveSlave() || info.isPassiveSlave();
        if (!passive) {
            // stream messages from this broker (the master) to
            // the slave
            MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
            masterBroker = new MasterBroker(parent, transport);
            masterBroker.startProcessing();
        }
        String mode = passive ? "Passive" : "Active";
        LOG.info(mode + " Slave Broker " + info.getBrokerName() + " is attached");
        brokerService.slaveConnectionEstablished();
    } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
        // so this TransportConnection is the rear end of a network bridge
        // We have been requested to create a two way pipe ...
        try {
            Properties networkProperties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
            Map<String, String> bridgeSettings = createMap(networkProperties);
            NetworkBridgeConfiguration bridgeConfig = new NetworkBridgeConfiguration();
            IntrospectionSupport.setProperties(bridgeConfig, bridgeSettings, "");
            bridgeConfig.setBrokerName(broker.getBrokerName());

            URI vmUri = broker.getVmConnectorURI();
            HashMap<String, String> query = new HashMap<String, String>(URISupport.parseParamters(vmUri));
            query.put("network", "true");
            query.put("async", "false");
            vmUri = URISupport.createURIWithQuery(vmUri, URISupport.createQueryString(query));

            Transport localTransport = TransportFactory.connect(vmUri);
            Transport remoteBridgeTransport = new ResponseCorrelator(transport);
            duplexBridge = NetworkBridgeFactory.createBridge(bridgeConfig, localTransport, remoteBridgeTransport);
            duplexBridge.setBrokerService(broker.getBrokerService());
            // now turn duplex off this side
            info.setDuplexConnection(false);
            duplexBridge.setCreatedByDuplex(true);
            duplexBridge.duplexStart(this, brokerInfo, info);
            LOG.info("Created Duplex Bridge back to " + info.getBrokerName());
            return null;
        } catch (Exception e) {
            LOG.error("Creating duplex network bridge", e);
        }
    }

    // We only expect to get one broker info command per connection
    if (this.brokerInfo != null) {
        LOG.warn("Unexpected extra broker info command received: " + info);
    }
    this.brokerInfo = info;
    broker.addBroker(this, info);
    networkConnection = true;
    for (TransportConnectionState connectionState : listConnectionStates()) {
        connectionState.getContext().setNetworkConnection(true);
    }
    return null;
}
|
/**
 * Commits a transaction in one phase: drops the transaction state from
 * the connection state and asks the broker to commit with the one-phase
 * flag set to {@code true}. The resolved context is stored in the
 * {@code context} field.
 *
 * @param info identifies the connection and the transaction
 * @return always {@code null}
 * @throws Exception if the broker fails to commit
 */
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
    this.context = connectionState.getContext();
    connectionState.removeTransactionState(info.getTransactionId());
    this.broker.commitTransaction(this.context, info.getTransactionId(), true);
    return null;
}
|
/**
 * Commits a transaction in two-phase mode: drops the transaction state
 * from the connection state and asks the broker to commit with the
 * one-phase flag set to {@code false}. The resolved context is stored in
 * the {@code context} field.
 *
 * @param info identifies the connection and the transaction
 * @return always {@code null}
 * @throws Exception if the broker fails to commit
 */
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
    this.context = connectionState.getContext();
    connectionState.removeTransactionState(info.getTransactionId());
    this.broker.commitTransaction(this.context, info.getTransactionId(), false);
    return null;
}
|
/**
 * Copies the fault-tolerant flag from the given control command into
 * this connection. A {@code null} command is ignored.
 *
 * @param control the control command, may be {@code null}
 * @return always {@code null}
 * @throws Exception declared by the command visitor contract
 */
public Response processConnectionControl(ConnectionControl control) throws Exception {
    if (control == null) {
        return null;
    }
    faultTolerantConnection = control.isFaultTolerant();
    return null;
}
|
/**
 * No-op handler: connection error commands are not acted upon here.
 *
 * @param error the received command (unused)
 * @return always {@code null}
 */
public Response processConnectionError(ConnectionError error) throws Exception {
return null;
}
|
/**
 * No-op handler: consumer control commands are not acted upon here.
 *
 * @param control the received command (unused)
 * @return always {@code null}
 */
public Response processConsumerControl(ConsumerControl control) throws Exception {
return null;
}
|
/**
 * Handles a generic control command. The only command acted upon is
 * {@code "shutdown"}, which terminates the JVM with exit status 0.
 * NOTE(review): this lets a command received on the connection exit the
 * whole process - confirm that callers are trusted.
 *
 * @param command the control command
 * @return {@code null} (the method does not return at all for "shutdown")
 * @throws Exception declared by the command visitor contract
 */
public Response processControlCommand(ControlCommand command) throws Exception {
    String requested = command.getCommand();
    if ("shutdown".equals(requested)) {
        System.exit(0);
    }
    return null;
}
|
/**
 * Dispatches one command to the transport unless the connection is
 * stopping. For message dispatches the broker's pre-processing hook runs
 * before the send; the post-processing hook and the transmit callback
 * always run afterwards, even if the send was skipped or failed.
 *
 * @param command the command to dispatch
 * @throws IOException if sending the command fails
 */
protected void processDispatch(Command command) throws IOException {
    final MessageDispatch messageDispatch =
            command.isMessageDispatch() ? (MessageDispatch) command : null;
    final boolean isMessage = messageDispatch != null;
    try {
        if (!stopping.get()) {
            if (isMessage) {
                broker.preProcessDispatch(messageDispatch);
            }
            dispatch(command);
        }
    } finally {
        if (isMessage) {
            Runnable callback = messageDispatch.getTransmitCallback();
            broker.postProcessDispatch(messageDispatch);
            if (callback != null) {
                callback.run();
            }
        }
    }
}
|
/**
 * No-op handler for end-of-transaction commands.
 *
 * @param info the received command (unused)
 * @return always {@code null}
 */
public Response processEndTransaction(TransactionInfo info) throws Exception {
// Nothing to do here. The client sends this packet only to make sure
// it is in sync with the server, because the commit command could
// come from a different connection.
return null;
}
|
/**
 * No-op handler: flush commands are not acted upon here.
 *
 * @param command the received command (unused)
 * @return always {@code null}
 */
public Response processFlush(FlushCommand command) throws Exception {
return null;
}
|
/**
 * Asks the broker to forget the given transaction. The resolved context
 * is stored in the {@code context} field.
 *
 * @param info identifies the connection and the transaction
 * @return always {@code null}
 * @throws Exception if the broker fails to forget the transaction
 */
public Response processForgetTransaction(TransactionInfo info) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
    this.context = connectionState.getContext();
    this.broker.forgetTransaction(this.context, info.getTransactionId());
    return null;
}
|
/**
 * No-op handler: keep-alive commands are not acted upon here.
 *
 * @param info the received command (unused)
 * @return always {@code null}
 */
public Response processKeepAlive(KeepAliveInfo info) throws Exception {
return null;
}
|
/**
 * Passes a message sent by a producer on to the broker, using the
 * producer/broker exchange that belongs to the message's producer id.
 *
 * @param messageSend the message to send
 * @return always {@code null}
 * @throws Exception if the exchange lookup or the broker send fails
 */
public Response processMessage(Message messageSend) throws Exception {
    ProducerBrokerExchange exchange = getProducerBrokerExchange(messageSend.getProducerId());
    broker.send(exchange, messageSend);
    return null;
}
|
/**
 * Passes a message acknowledgement on to the broker, using the
 * consumer/broker exchange that belongs to the ack's consumer id.
 *
 * @param ack the acknowledgement
 * @return always {@code null}
 * @throws Exception if the exchange lookup or the broker acknowledge fails
 */
public Response processMessageAck(MessageAck ack) throws Exception {
    ConsumerId consumerId = ack.getConsumerId();
    ConsumerBrokerExchange exchange = getConsumerBrokerExchange(consumerId);
    broker.acknowledge(exchange, ack);
    return null;
}
|
/**
 * No-op handler: incoming message dispatch commands are not acted upon here.
 *
 * @param dispatch the received command (unused)
 * @return always {@code null}
 */
public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
return null;
}
|
/**
 * Forwards a message dispatch notification to the broker.
 *
 * @param notification the notification to forward
 * @return always {@code null}
 * @throws Exception if the broker fails to process the notification
 */
public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
    this.broker.processDispatchNotification(notification);
    return null;
}
|
/**
 * Forwards a message pull request to the broker, using the context of
 * the connection state that owns the pulling consumer.
 *
 * @param pull the pull request
 * @return the response produced by the broker
 * @throws Exception if the broker fails to handle the pull
 */
public Response processMessagePull(MessagePull pull) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(pull.getConsumerId());
    return broker.messagePull(connectionState.getContext(), pull);
}
|
/**
 * Prepares a transaction. The broker is asked to prepare only once per
 * transaction; the result is remembered on the transaction state and
 * returned again for repeated requests. The resolved context is stored
 * in the {@code context} field.
 *
 * @param info identifies the connection and the transaction
 * @return an {@code IntegerResponse} carrying the prepare result
 * @throws NullPointerException if no connection state is registered for the connection id
 * @throws IllegalStateException if the transaction was never started
 * @throws Exception if the broker fails to prepare the transaction
 */
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
    context = null;
    if (connectionState == null) {
        throw new NullPointerException("Context is null");
    }
    context = connectionState.getContext();

    TransactionState txState = connectionState.getTransactionState(info.getTransactionId());
    if (txState == null) {
        throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
                + info.getTransactionId());
    }

    // Avoid dups.
    if (txState.isPrepared()) {
        return new IntegerResponse(txState.getPreparedResult());
    }
    txState.setPrepared(true);
    int outcome = broker.prepareTransaction(context, info.getTransactionId());
    txState.setPreparedResult(outcome);
    return new IntegerResponse(outcome);
}
|
/**
 * No-op handler for producer acknowledgements.
 *
 * @param ack the received command (unused)
 * @return always {@code null}
 */
public Response processProducerAck(ProducerAck ack) throws Exception {
// A broker should not receive ProducerAck messages, so there is nothing to do.
return null;
}
|
/**
 * Returns the transactions the broker reports as prepared for this
 * connection's context. The resolved context is stored in the
 * {@code context} field.
 *
 * @param info identifies the connection
 * @return a {@code DataArrayResponse} wrapping the prepared transaction ids
 * @throws Exception if the broker lookup fails
 */
public Response processRecoverTransactions(TransactionInfo info) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
    this.context = connectionState.getContext();
    return new DataArrayResponse(this.broker.getPreparedTransactions(this.context));
}
|
/**
 * Removes a logical connection. The connection state is shut down
 * first, then its sessions and temporary destinations are removed, the
 * connection is removed from the broker, and finally the state is
 * unregistered. The shared entry in {@code brokerConnectionStates} is
 * dropped when the last reference goes away. Failures of the individual
 * removal steps are logged and do not abort the remaining steps.
 * Unknown connection ids are ignored.
 *
 * @param id the id of the connection to remove
 * @param lastDeliveredSequenceId passed on when removing the sessions
 * @return always {@code null}
 * @throws InterruptedException declared for callers
 */
public synchronized Response processRemoveConnection(ConnectionId id,
        long lastDeliveredSequenceId) throws InterruptedException {
    LOG.debug("remove connection id: " + id);
    TransportConnectionState cs = lookupConnectionState(id);
    if (cs == null) {
        return null;
    }

    // Don't allow things to be added to the connection state while we
    // are shutting down.
    cs.shutdown();

    // Cascade the connection stop to the sessions.
    Iterator<?> sessions = cs.getSessionIds().iterator();
    while (sessions.hasNext()) {
        SessionId sessionId = (SessionId) sessions.next();
        try {
            processRemoveSession(sessionId, lastDeliveredSequenceId);
        } catch (Throwable e) {
            SERVICELOG.warn("Failed to remove session " + sessionId, e);
        }
    }

    // Cascade the connection stop to temp destinations.
    Iterator<?> tempDestinations = cs.getTempDesinations().iterator();
    while (tempDestinations.hasNext()) {
        DestinationInfo di = (DestinationInfo) tempDestinations.next();
        try {
            broker.removeDestination(cs.getContext(), di.getDestination(), 0);
        } catch (Throwable e) {
            SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
        }
        tempDestinations.remove();
    }

    try {
        broker.removeConnection(cs.getContext(), cs.getInfo(), null);
    } catch (Throwable e) {
        SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e);
    }

    TransportConnectionState unregistered = unregisterConnectionState(id);
    if (unregistered != null) {
        synchronized (brokerConnectionStates) {
            // If we are the last reference, we should remove the state
            // from the broker.
            if (unregistered.decrementReference() == 0) {
                brokerConnectionStates.remove(id);
            }
        }
    }
    return null;
}
|
/**
 * Removes a consumer: detaches it from its session state, records the
 * last delivered sequence id on its info, removes it from the broker and
 * drops its consumer/broker exchange.
 *
 * @param id the id of the consumer to remove
 * @param lastDeliveredSequenceId stored on the consumer info before removal
 * @return always {@code null}
 * @throws IllegalStateException if the connection, session or consumer is not registered
 * @throws Exception if the broker fails to remove the consumer
 */
public Response processRemoveConsumer(ConsumerId id,
        long lastDeliveredSequenceId) throws Exception {
    SessionId sessionId = id.getParentId();
    ConnectionId connectionId = sessionId.getParentId();

    TransportConnectionState connectionState = lookupConnectionState(connectionId);
    if (connectionState == null) {
        throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
                + connectionId);
    }

    SessionState sessionState = connectionState.getSessionState(sessionId);
    if (sessionState == null) {
        throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
                + sessionId);
    }

    ConsumerState removed = sessionState.removeConsumer(id);
    if (removed == null) {
        throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
    }

    removed.getInfo().setLastDeliveredSequenceId(lastDeliveredSequenceId);
    broker.removeConsumer(connectionState.getContext(), removed.getInfo());
    removeConsumerBrokerExchange(id);
    return null;
}
|
/**
 * Handles a request to remove a destination: forwards it to the broker
 * and, for temporary destinations, drops it from the connection state.
 *
 * @param info describes the destination to remove
 * @return always {@code null}
 * @throws Exception if the broker fails to remove the destination
 */
public Response processRemoveDestination(DestinationInfo info) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
    broker.removeDestinationInfo(connectionState.getContext(), info);
    boolean temporary = info.getDestination().isTemporary();
    if (temporary) {
        connectionState.removeTempDestination(info.getDestination());
    }
    return null;
}
|
/**
 * Removes a producer: detaches it from its session state, drops its
 * producer/broker exchange and removes it from the broker.
 *
 * @param id the id of the producer to remove
 * @return always {@code null}
 * @throws IllegalStateException if the connection, session or producer is not registered
 * @throws Exception if the broker fails to remove the producer
 */
public Response processRemoveProducer(ProducerId id) throws Exception {
    SessionId sessionId = id.getParentId();
    ConnectionId connectionId = sessionId.getParentId();
    TransportConnectionState cs = lookupConnectionState(connectionId);
    if (cs == null) {
        // Mirror the consumer/session removal paths: fail with a
        // descriptive error instead of a bare NullPointerException.
        throw new IllegalStateException("Cannot remove a producer from a connection that had not been registered: "
                + connectionId);
    }
    SessionState ss = cs.getSessionState(sessionId);
    if (ss == null) {
        throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
                + sessionId);
    }
    ProducerState ps = ss.removeProducer(id);
    if (ps == null) {
        throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
    }
    removeProducerBrokerExchange(id);
    broker.removeProducer(cs.getContext(), ps.getInfo());
    return null;
}
|
/**
 * Removes a session. The session state is shut down first, then all of
 * its consumers and producers are removed (failures are logged and do
 * not stop the cascade), and finally the session is dropped from the
 * connection state and from the broker.
 *
 * @param id the id of the session to remove
 * @param lastDeliveredSequenceId passed on when removing the consumers
 * @return always {@code null}
 * @throws IllegalStateException if the connection or session is not registered
 * @throws Exception if the broker fails to remove the session
 */
public Response processRemoveSession(SessionId id,
        long lastDeliveredSequenceId) throws Exception {
    ConnectionId connectionId = id.getParentId();
    TransportConnectionState connectionState = lookupConnectionState(connectionId);
    if (connectionState == null) {
        throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
    }
    SessionState session = connectionState.getSessionState(id);
    if (session == null) {
        throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
    }

    // Don't let new consumers or producers get added while we are closing
    // this down.
    session.shutdown();

    // Cascade the connection stop to the consumers and producers.
    Iterator<?> consumers = session.getConsumerIds().iterator();
    while (consumers.hasNext()) {
        ConsumerId consumerId = (ConsumerId) consumers.next();
        try {
            processRemoveConsumer(consumerId, lastDeliveredSequenceId);
        } catch (Throwable e) {
            LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
        }
    }
    Iterator<?> producers = session.getProducerIds().iterator();
    while (producers.hasNext()) {
        ProducerId producerId = (ProducerId) producers.next();
        try {
            processRemoveProducer(producerId);
        } catch (Throwable e) {
            LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
        }
    }

    connectionState.removeSession(id);
    broker.removeSession(connectionState.getContext(), session.getInfo());
    return null;
}
|
/**
 * Forwards a remove-subscription request to the broker, using the
 * context of the connection named in the request.
 *
 * @param info describes the subscription to remove
 * @return always {@code null}
 * @throws Exception if the broker fails to remove the subscription
 */
public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
    broker.removeSubscription(connectionState.getContext(), info);
    return null;
}
|
/**
 * Rolls back a transaction: drops the transaction state from the
 * connection state and asks the broker to roll back. The resolved
 * context is stored in the {@code context} field.
 *
 * @param info identifies the connection and the transaction
 * @return always {@code null}
 * @throws Exception if the broker fails to roll back
 */
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
    TransportConnectionState connectionState = lookupConnectionState(info.getConnectionId());
    this.context = connectionState.getContext();
    connectionState.removeTransactionState(info.getTransactionId());
    this.broker.rollbackTransaction(this.context, info.getTransactionId());
    return null;
}
|
/**
 * Handles a shutdown command from the peer by starting an asynchronous
 * stop of this connection.
 *
 * @param info the received command (unused)
 * @return always {@code null}
 */
public Response processShutdown(ShutdownInfo info) throws Exception {
stopAsync();
return null;
}
|
/**
 * Remembers the wire format info sent by the peer and records its
 * version as this connection's protocol version.
 *
 * @param info the wire format info
 * @return always {@code null}
 * @throws Exception declared by the command visitor contract
 */
public Response processWireFormat(WireFormatInfo info) throws Exception {
    this.wireFormatInfo = info;
    this.protocolVersion.set(info.getVersion());
    return null;
}
|
/**
 * Registers a connection state with the register. If the current
 * register already holds a state and cannot handle more than one, it is
 * first replaced by a {@code MapTransportConnectionStateRegister}
 * initialised from the current one.
 *
 * @param connectionId the id to register the state under
 * @param state the state to register
 * @return whatever the register's {@code registerConnectionState} returns
 */
protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
        TransportConnectionState state) {
    boolean registerSuffices = connectionStateRegister.isEmpty()
            || connectionStateRegister.doesHandleMultipleConnectionStates();
    if (!registerSuffices) {
        // swap implementations
        TransportConnectionStateRegister replacement = new MapTransportConnectionStateRegister();
        replacement.intialize(connectionStateRegister);
        connectionStateRegister = replacement;
    }
    return connectionStateRegister.registerConnectionState(connectionId, state);
}
|
/**
 * Processes one incoming command by letting it visit this connection.
 * <p>
 * If processing throws, the error becomes an {@code ExceptionResponse}
 * when the command requires a response, otherwise it is handed to
 * {@code serviceException}. A required response is always created and
 * correlated with the command id. If the context used during processing
 * asked for the response to be suppressed, {@code null} is returned and
 * the flag is reset; the {@code context} field is cleared in any case.
 *
 * @param command the command to process
 * @return the response to send back, or {@code null} if none is to be sent
 */
public Response service(Command command) {
    final boolean responseRequired = command.isResponseRequired();
    final int commandId = command.getCommandId();
    Response response = null;
    try {
        response = command.visit(this);
    } catch (Throwable e) {
        if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
            String mode = responseRequired ? "sync" : "async";
            SERVICELOG.debug("Error occured while processing " + mode
                    + " command: " + command + ", exception: " + e, e);
        }
        if (!responseRequired) {
            serviceException(e);
        } else {
            response = new ExceptionResponse(e);
        }
    }

    if (responseRequired) {
        if (response == null) {
            response = new Response();
        }
        response.setCorrelationId(commandId);
    }

    // The context may have been flagged so that the response is not
    // sent.
    if (context == null) {
        return response;
    }
    if (context.isDontSendReponse()) {
        context.setDontSendReponse(false);
        response = null;
    }
    context = null;
    return response;
}
|
/**
 * Reacts to an error detected while servicing this connection.
 * <ul>
 * <li>{@code IOException}: treated as a transport failure.</li>
 * <li>{@code BrokerStoppedException}: unless already stopping, the
 * client is sent a {@code ConnectionError} synchronously, the thread
 * waits briefly, and the connection is stopped asynchronously.</li>
 * <li>Anything else: unless stopping or already inside this handler,
 * the error is logged and a {@code ConnectionError} is dispatched
 * asynchronously to the client.</li>
 * </ul>
 *
 * @param e the error to handle
 */
public void serviceException(Throwable e) {
    // are we a transport exception such as not being able to dispatch
    // synchronously to a transport
    if (e instanceof IOException) {
        serviceTransportException((IOException) e);
        return;
    }

    if (e.getClass() == BrokerStoppedException.class) {
        // Handle the case where the broker is stopped
        // But the client is still connected.
        if (stopping.get()) {
            return;
        }
        if (SERVICELOG.isDebugEnabled()) {
            SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection.");
        }
        ConnectionError stoppedNotice = new ConnectionError();
        stoppedNotice.setException(e);
        dispatchSync(stoppedNotice);
        // Wait a little bit to try to get the output buffer to flush
        // the exception notification to the client.
        try {
            Thread.sleep(500);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        // Worst case is we just kill the connection before the
        // notification gets to him.
        stopAsync();
        return;
    }

    if (stopping.get() || inServiceException) {
        return;
    }
    inServiceException = true;
    try {
        SERVICELOG.warn("Async error occurred: " + e, e);
        ConnectionError errorNotice = new ConnectionError();
        errorNotice.setException(e);
        dispatchAsync(errorNotice);
    } finally {
        inServiceException = false;
    }
}
Closes a client's connection due to a detected error. Errors are ignored
if the client is closing or the broker is closing. Otherwise, the connection
error is transmitted to the client before stopping its transport. |
/**
 * Runs {@code serviceException} for the given error on a newly started
 * thread named "Async Exception Handler". Only the first call has an
 * effect; later calls find the {@code asyncException} flag already set
 * and return immediately.
 *
 * @param e the I/O error to handle
 */
public void serviceExceptionAsync(IOException e) {
    if (!asyncException.compareAndSet(false, true)) {
        return;
    }
    Runnable handler = new Runnable() {
        public void run() {
            serviceException(e);
        }
    };
    new Thread(handler, "Async Exception Handler").start();
}
Calls the serviceException method in an async thread. Since handling a
service exception closes a socket, we should not tie up broker threads
since client sockets may hang or cause deadlocks. |
/**
 * Handles a failure of the underlying transport.
 * If the broker service is configured to shut down on slave failure and
 * this connection belongs to a slave broker, the connection and the
 * broker service are stopped. Independently of that, unless the
 * connection is already stopping, the exception is recorded and an
 * asynchronous stop is started.
 *
 * @param e the transport failure
 */
public void serviceTransportException(IOException e) {
    BrokerService brokerService = connector.getBrokerService();
    boolean slaveFailed = brokerService.isShutdownOnSlaveFailure()
            && brokerInfo != null
            && brokerInfo.isSlaveBroker();
    if (slaveFailed) {
        LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
        try {
            doStop();
            brokerService.stop();
        } catch (Exception ex) {
            LOG.warn("Failed to stop the master", ex);
        }
    }

    if (stopping.get()) {
        return;
    }
    transportException.set(e);
    if (TRANSPORTLOG.isDebugEnabled()) {
        TRANSPORTLOG.debug("Transport failed: " + e, e);
    }
    stopAsync();
}
|
/**
 * Sets the {@code active} flag.
 *
 * @param active the new value
 */
public void setActive(boolean active) {
this.active = active;
}
|
/**
 * Sets the {@code blocked} flag.
 *
 * @param blocked the new value
 */
public void setBlocked(boolean blocked) {
this.blocked = blocked;
}
|
/**
 * Sets the {@code blockedCandidate} flag.
 *
 * @param blockedCandidate the new value
 */
public void setBlockedCandidate(boolean blockedCandidate) {
this.blockedCandidate = blockedCandidate;
}
|
/**
 * Sets the {@code connected} flag.
 *
 * @param connected the new value
 */
public void setConnected(boolean connected) {
this.connected = connected;
}
|
/**
 * Sets the {@code markedCandidate} flag. Clearing the flag also resets
 * the mark time stamp to zero and clears {@code blockedCandidate}.
 *
 * @param markedCandidate the new value
 */
public void setMarkedCandidate(boolean markedCandidate) {
    this.markedCandidate = markedCandidate;
    if (markedCandidate) {
        return;
    }
    timeStamp = 0;
    blockedCandidate = false;
}
|
/**
 * Sets the message authorization policy of this connection.
 *
 * @param messageAuthorizationPolicy the policy to use
 */
public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
this.messageAuthorizationPolicy = messageAuthorizationPolicy;
}
|
/**
 * Sets the {@code pendingStop} flag while holding this object's monitor.
 *
 * @param pendingStop the new value
 */
protected synchronized void setPendingStop(boolean pendingStop) {
this.pendingStop = pendingStop;
}
|
/**
 * Sets the {@code slow} flag.
 *
 * @param slow the new value
 */
public void setSlow(boolean slow) {
this.slow = slow;
}
|
/**
 * Sets the {@code starting} flag while holding this object's monitor.
 *
 * @param starting the new value
 */
protected synchronized void setStarting(boolean starting) {
this.starting = starting;
}
|
/**
 * Starts this connection: creates the dispatch task runner (when a task
 * runner factory is available), starts the transport, marks the
 * connection active, queues the connector's broker info for the peer
 * and notifies the connector.
 * <p>
 * If start-up fails the connection is stopped and the error rethrown.
 * A {@code stop()} requested while starting is deferred and executed
 * once start-up has finished.
 *
 * @throws Exception if any start-up step fails
 */
public void start() throws Exception {
    starting = true;
    try {
        synchronized (this) {
            taskRunner = (taskRunnerFactory == null)
                    ? null
                    : taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
                            + getRemoteAddress());
            transport.start();
            active = true;
            dispatchAsync(connector.getBrokerInfo());
            connector.onStarted(this);
        }
    } catch (Exception startFailure) {
        // Force clean up on an error starting up.
        stop();
        throw startFailure;
    } finally {
        // stop() can be called from within the above block,
        // but we want to be sure start() completes before
        // stop() runs, so queue the stop until right now:
        starting = false;
        if (pendingStop) {
            LOG.debug("Calling the delayed stop()");
            stop();
        }
    }
}
|
/**
 * Stops this connection and waits until the asynchronous stop has
 * completed, logging a message every five seconds while waiting.
 * When called while the connection is still starting, the stop is only
 * recorded as pending and the method returns immediately.
 *
 * @throws Exception if waiting for the stop to complete is interrupted
 */
public void stop() throws Exception {
    synchronized (this) {
        pendingStop = true;
        if (starting) {
            LOG.debug("stop() called in the middle of start(). Delaying...");
            return;
        }
    }
    stopAsync();
    boolean finished = stopped.await(5, TimeUnit.SECONDS);
    while (!finished) {
        LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
        finished = stopped.await(5, TimeUnit.SECONDS);
    }
}
|
/**
 * Initiates the shutdown of this connection without waiting for it.
 * Only the first call has an effect. All connection contexts are
 * flagged as stopping, then {@code doStop()} is submitted to the default
 * task runner factory, where it runs under the service write lock. The
 * {@code stopped} latch is released when the stop has run, or right
 * away if the task could not be submitted.
 */
public void stopAsync() {
    // Only the caller that flips the flag performs the shutdown.
    if (!stopping.compareAndSet(false, true)) {
        return;
    }

    // Let all the connection contexts know we are shutting down
    // so that in progress operations can notice and unblock.
    for (TransportConnectionState connectionState : listConnectionStates()) {
        connectionState.getContext().getStopping().set(true);
    }

    try {
        getDefaultTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                serviceLock.writeLock().lock();
                try {
                    doStop();
                } catch (Throwable stopFailure) {
                    LOG.debug("Error occured while shutting down a connection to '" + transport.getRemoteAddress()
                            + "': ", stopFailure);
                } finally {
                    stopped.countDown();
                    serviceLock.writeLock().unlock();
                }
            }
        });
    } catch (Throwable t) {
        LOG.warn("cannot create async transport stopper thread.. not waiting for stop to complete, reason:", t);
        stopped.countDown();
    }
}
|
/**
 * @return a description of this connection that includes the
 *         transport's remote address
 */
public String toString() {
    String remoteAddress = transport.getRemoteAddress();
    return "Transport Connection to: " + remoteAddress;
}
|
/**
 * Removes the connection state registered under the given id from the
 * register, while holding this object's monitor.
 *
 * @param connectionId the id whose state is to be unregistered
 * @return whatever the register returns; callers in this class treat
 *         {@code null} as "nothing was registered"
 */
protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
    return this.connectionStateRegister.unregisterConnectionState(connectionId);
}
|