A Look at Artemis's confirmationWindowEnabled

This article examines how confirmationWindowEnabled works in Artemis.

confirmationWindowEnabled

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java

public class ClientProducerImpl implements ClientProducerInternal {

   //......

   public void send(SimpleString address1,
                    Message message,
                    SendAcknowledgementHandler handler) throws ActiveMQException {
      checkClosed();
      boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled();
      if (confirmationWindowEnabled) {
         doSend(address1, message, handler);
      } else {
         doSend(address1, message, null);
         if (handler != null) {
            if (logger.isDebugEnabled()) {
               logger.debug("Handler was used on producing messages towards address " + (address1 == null ? null : address1.toString()) + " however there is no confirmationWindowEnabled");
            }

            if (!confirmationNotSetLogged) {
               // will log thisonly once
               ActiveMQClientLogger.LOGGER.confirmationNotSet();
            }

            // if there is no confirmation enabled, we will at least call the handler after the sent is done
            session.scheduleConfirmation(handler, message);
         }
      }
   }

   //......
}
  • The send method of ClientProducerImpl calls doSend(address1, message, handler) when confirmationWindowEnabled is true; otherwise it calls doSend(address1, message, null) and, if handler is not null, calls session.scheduleConfirmation(handler, message).
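
The branch in send can be modelled in isolation. The following is a minimal sketch, not Artemis code: SendDispatchSketch, Handler, and the log list are hypothetical stand-ins, a String replaces Message, and the actual network send is replaced by appending to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the branching in ClientProducerImpl.send; not the Artemis API.
class SendDispatchSketch {

    // Simplified stand-in for SendAcknowledgementHandler.
    interface Handler {
        void sendAcknowledged(String message);
    }

    // Records what doSend was called with, in place of a real send.
    final List<String> log = new ArrayList<>();
    final boolean confirmationWindowEnabled;

    SendDispatchSketch(boolean confirmationWindowEnabled) {
        this.confirmationWindowEnabled = confirmationWindowEnabled;
    }

    void send(String message, Handler handler) {
        if (confirmationWindowEnabled) {
            // The handler travels with the packet and fires when a confirmation arrives.
            doSend(message, handler);
        } else {
            // No confirmation will arrive, so the handler is dropped from the send
            // and invoked locally after the send call returns.
            doSend(message, null);
            if (handler != null) {
                // Artemis hands this to an executor; the sketch calls it inline.
                handler.sendAcknowledged(message);
            }
        }
    }

    private void doSend(String message, Handler handler) {
        log.add(message + ":handler=" + (handler != null));
    }
}
```

With the window disabled, the handler still fires, but only as a local "send was issued" callback rather than as proof that the server received the message.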

doSend

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java

public class ClientProducerImpl implements ClientProducerInternal {

   //......

   private void doSend(SimpleString sendingAddress,
                       final Message msgToSend,
                       final SendAcknowledgementHandler handler) throws ActiveMQException {
      if (sendingAddress == null) {
         sendingAddress = this.address;
      }
      session.startCall();

      try {
         // In case we received message from another protocol, we first need to convert it to core as the ClientProducer only understands core
         ICoreMessage msg = msgToSend.toCore();

         ClientProducerCredits theCredits;

         boolean isLarge;
         // a note about the second check on the writerIndexSize,
         // If it's a server's message, it means this is being done through the bridge or some special consumer on the
         // server's on which case we can't' convert the message into large at the servers
         if (sessionContext.supportsLargeMessage() && (getBodyInputStream(msg) != null || msg.isLargeMessage() ||
            msg.getBodyBuffer().writerIndex() > minLargeMessageSize)) {
            isLarge = true;
         } else {
            isLarge = false;
         }

         if (!isLarge) {
            session.setAddress(msg, sendingAddress);
         } else {
            msg.setAddress(sendingAddress);
         }

         // Anonymous
         theCredits = session.getCredits(sendingAddress, true);

         if (rateLimiter != null) {
            // Rate flow control

            rateLimiter.limit();
         }

         if (groupID != null) {
            msg.putStringProperty(Message.HDR_GROUP_ID, groupID);
         }

         final boolean sendBlockingConfig = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
         // if Handler != null, we will send non blocking
         final boolean sendBlocking = sendBlockingConfig && handler == null;

         session.workDone();

         if (isLarge) {
            largeMessageSend(sendBlocking, msg, theCredits, handler);
         } else {
            sendRegularMessage(sendingAddress, msg, sendBlocking, theCredits, handler);
         }
      } finally {
         session.endCall();
      }
   }

   private void sendRegularMessage(final SimpleString sendingAddress,
                                   final ICoreMessage msgI,
                                   final boolean sendBlocking,
                                   final ClientProducerCredits theCredits,
                                   final SendAcknowledgementHandler handler) throws ActiveMQException {
      // This will block if credits are not available

      // Note, that for a large message, the encode size only includes the properties + headers
      // Not the continuations, but this is ok since we are only interested in limiting the amount of
      // data in *memory* and continuations go straight to the disk

      logger.tracef("sendRegularMessage::%s, Blocking=%s", msgI, sendBlocking);

      int creditSize = sessionContext.getCreditsOnSendingFull(msgI);

      theCredits.acquireCredits(creditSize);

      sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
   }

   //......

}
  • doSend ends by calling either largeMessageSend or sendRegularMessage; sendRegularMessage in turn ends by calling sessionContext.sendFullMessage(msgI, sendBlocking, handler, address).
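
One detail in doSend is easy to miss: passing a handler always turns the send into a non-blocking one, whatever the blockOnDurableSend and blockOnNonDurableSend settings say. A minimal sketch of that rule, using a hypothetical helper class SendBlockingSketch that is not part of Artemis:

```java
// Hypothetical helper mirroring how doSend derives sendBlocking; not the Artemis API.
class SendBlockingSketch {

    static boolean sendBlocking(boolean durable,
                                boolean blockOnDurableSend,
                                boolean blockOnNonDurableSend,
                                boolean hasHandler) {
        // Pick the configured blocking mode for this kind of message.
        boolean config = durable ? blockOnDurableSend : blockOnNonDurableSend;
        // A handler means the caller wants an asynchronous acknowledgement,
        // so the send never blocks when one is supplied.
        return config && !hasHandler;
    }
}
```

For example, sendBlocking(true, true, false, true) is false: the message is durable and durable sends are configured to block, yet the presence of a handler overrides that.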

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

   //......

   public void sendFullMessage(ICoreMessage msgI,
                               boolean sendBlocking,
                               SendAcknowledgementHandler handler,
                               SimpleString defaultAddress) throws ActiveMQException {
      final SessionSendMessage packet;
      if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
         packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
      } else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
         packet = new SessionSendMessage(msgI, sendBlocking, handler);
      } else {
         boolean responseRequired = confirmationWindow != -1 || sendBlocking;
         packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
      }
      if (sendBlocking) {
         sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
      } else {
         sessionChannel.sendBatched(packet);
      }
   }

   //......
}
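  • sendFullMessage creates a SessionSendMessage (or its SessionSendMessage_1X or SessionSendMessage_V2 variant, depending on the connection version) and passes the SendAcknowledgementHandler to it through the constructor.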
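
The packet selection and the responseRequired flag in sendFullMessage can be sketched as two small functions. PacketSelectionSketch is a hypothetical class for illustration; the returned strings are only the class names from the excerpt, not real packet objects.

```java
// Hypothetical model of the decisions in sendFullMessage; not the Artemis API.
class PacketSelectionSketch {

    // Which packet class is built for a given connection version.
    static String packetFor(boolean beforeAddressChange, boolean beforeAsyncResponseChange) {
        if (beforeAddressChange) {
            return "SessionSendMessage_1X";
        }
        if (beforeAsyncResponseChange) {
            return "SessionSendMessage";
        }
        return "SessionSendMessage_V2";
    }

    // Only the V2 path computes this flag; the older paths pass sendBlocking directly.
    static boolean responseRequired(int confirmationWindow, boolean sendBlocking) {
        // -1 is the value the excerpt treats as "no confirmation window".
        return confirmationWindow != -1 || sendBlocking;
    }
}
```

So on the V2 path a non-blocking send still asks for a response whenever a confirmation window is configured.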

SessionSendMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java

public class SessionSendMessage extends MessagePacket {

   protected boolean requiresResponse;

   private final transient SendAcknowledgementHandler handler;

   /** This will be using the CoreMessage because it is meant for the core-protocol */
   protected SessionSendMessage(final byte id,
                             final ICoreMessage message,
                             final boolean requiresResponse,
                             final SendAcknowledgementHandler handler) {
      super(id, message);
      this.handler = handler;
      this.requiresResponse = requiresResponse;
   }

   protected SessionSendMessage(final byte id,
                                final CoreMessage message) {
      super(id, message);
      this.handler = null;
   }

   /** This will be using the CoreMessage because it is meant for the core-protocol */
   public SessionSendMessage(final ICoreMessage message,
                             final boolean requiresResponse,
                             final SendAcknowledgementHandler handler) {
      super(SESS_SEND, message);
      this.handler = handler;
      this.requiresResponse = requiresResponse;
   }

   public SessionSendMessage(final CoreMessage message) {
      super(SESS_SEND, message);
      this.handler = null;
   }

   // Public --------------------------------------------------------

   @Override
   public boolean isRequiresResponse() {
      return requiresResponse;
   }

   public SendAcknowledgementHandler getHandler() {
      return handler;
   }

   //......
}
  • SessionSendMessage extends MessagePacket; its getHandler method returns the SendAcknowledgementHandler.

scheduleConfirmation

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

   //......

   public void scheduleConfirmation(final SendAcknowledgementHandler handler, final Message message) {
      executor.execute(new Runnable() {
         @Override
         public void run() {
            handler.sendAcknowledged(message);
         }
      });
   }

   //......
}

  • scheduleConfirmation submits a Runnable to the executor; its run method calls handler.sendAcknowledged(message).
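
The effect of scheduleConfirmation is that the handler runs on an executor thread rather than on the thread that called send. A minimal sketch of that behaviour, assuming a single-thread executor and a hypothetical ScheduleConfirmationSketch class (the real session's executor is supplied elsewhere and is not shown in the excerpt):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of ClientSessionImpl.scheduleConfirmation; not the Artemis API.
class ScheduleConfirmationSketch {

    // Simplified stand-in for SendAcknowledgementHandler.
    interface Handler {
        void sendAcknowledged(String message);
    }

    // Assumption: a single-thread executor stands in for the session's executor.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void scheduleConfirmation(Handler handler, String message) {
        executor.execute(() -> handler.sendAcknowledged(message));
    }

    void shutdown() {
        executor.shutdown();
    }

    // Returns true if the handler ran, and ran on a thread other than the caller's.
    static boolean runsOffCallerThread() {
        ScheduleConfirmationSketch session = new ScheduleConfirmationSketch();
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        session.scheduleConfirmation(m -> {
            ranOn.set(Thread.currentThread());
            done.countDown();
        }, "m1");
        try {
            boolean completed = done.await(5, TimeUnit.SECONDS);
            return completed && ranOn.get() != Thread.currentThread();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            session.shutdown();
        }
    }
}
```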

SendAcknowledgementHandler

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/SendAcknowledgementHandler.java

public interface SendAcknowledgementHandler {

   /**
    * Notifies the client that a message sent asynchronously has been received by the server.
    *
    * @param message message sent asynchronously
    */
   void sendAcknowledged(Message message);

   default void sendFailed(Message message, Exception e) {
      /**
       * By default ignore failures to preserve compatibility with existing implementations.
       * If the message makes it to the broker and a failure occurs sendAcknowledge() will
       * still be invoked just like it always was.
       */
   }

}
  • The SendAcknowledgementHandler interface defines the sendAcknowledged and sendFailed methods; sendFailed has a default no-op implementation.
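
Because sendFailed is a default method, a handler written as a lambda silently ignores failures, while a class can override both callbacks. The sketch below uses a local copy of the interface shape (SendAckHandler, with String standing in for Message) so that it compiles without Artemis on the classpath; HandlerSketch and CountingHandler are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of implementing the handler; not the Artemis API.
class HandlerSketch {

    // Local copy of the interface shape, with String standing in for Message.
    interface SendAckHandler {
        void sendAcknowledged(String message);

        default void sendFailed(String message, Exception e) {
            // Ignored by default, as in the original interface.
        }
    }

    // Overrides both callbacks and counts how often each one fires.
    static class CountingHandler implements SendAckHandler {
        final AtomicInteger acked = new AtomicInteger();
        final AtomicInteger failed = new AtomicInteger();

        @Override
        public void sendAcknowledged(String message) {
            acked.incrementAndGet();
        }

        @Override
        public void sendFailed(String message, Exception e) {
            failed.incrementAndGet();
        }
    }
}
```

A handler that needs to react to send failures therefore has to be a class (or anonymous class) that overrides sendFailed explicitly.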

ServerSessionPacketHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java

public class ServerSessionPacketHandler implements ChannelHandler {

   //......

   private void sendResponse(final Packet confirmPacket,
                             final Packet response,
                             final boolean flush,
                             final boolean closeChannel) {
      if (logger.isTraceEnabled()) {
         logger.trace("ServerSessionPacketHandler::scheduling response::" + response);
      }

      storageManager.afterCompleteOperations(new IOCallback() {
         @Override
         public void onError(final int errorCode, final String errorMessage) {
            ActiveMQServerLogger.LOGGER.errorProcessingIOCallback(errorCode, errorMessage);

            Packet exceptionPacket = convertToExceptionPacket(confirmPacket, ActiveMQExceptionType.createException(errorCode, errorMessage));
            doConfirmAndResponse(confirmPacket, exceptionPacket, flush, closeChannel);

            if (logger.isTraceEnabled()) {
               logger.trace("ServerSessionPacketHandler::exception response sent::" + exceptionPacket);
            }

         }

         @Override
         public void done() {
            if (logger.isTraceEnabled()) {
               logger.trace("ServerSessionPacketHandler::regular response sent::" + response);
            }

            doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
         }
      });
   }

   private void doConfirmAndResponse(final Packet confirmPacket,
                                     final Packet response,
                                     final boolean flush,
                                     final boolean closeChannel) {
      // don't confirm if the response is an exception
      if (confirmPacket != null && (response == null || (response != null && response.getType() != PacketImpl.EXCEPTION))) {
         channel.confirm(confirmPacket);

         if (flush) {
            channel.flushConfirmations();
         }
      }

      if (response != null) {
         channel.send(response);
      }

      if (closeChannel) {
         channel.close();
      }
   }

   //......
}   
  • The sendResponse method of ServerSessionPacketHandler registers an IOCallback through storageManager.afterCompleteOperations; both onError and done call doConfirmAndResponse. doConfirmAndResponse calls channel.confirm(confirmPacket) only when confirmPacket is not null and the response is not an exception.
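
The confirm condition in doConfirmAndResponse reduces to a small predicate. This is a sketch with hypothetical names (ConfirmDecisionSketch, ResponseType); a null response type stands for "no response packet".

```java
// Hypothetical model of the confirm condition in doConfirmAndResponse; not the Artemis API.
class ConfirmDecisionSketch {

    // Simplified stand-in for the packet type of the response.
    enum ResponseType { NORMAL, EXCEPTION }

    // responseType == null models the case where there is no response packet at all.
    static boolean shouldConfirm(boolean hasConfirmPacket, ResponseType responseType) {
        // Do not confirm if the response is an exception.
        return hasConfirmPacket && (responseType == null || responseType != ResponseType.EXCEPTION);
    }
}
```

This is why the onError path, which builds an exception packet, sends the response but never confirms the original packet.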

confirm

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java

public final class ChannelImpl implements Channel {

   //......

   public void confirm(final Packet packet) {
      if (resendCache != null && packet.isRequiresConfirmations()) {
         lastConfirmedCommandID.incrementAndGet();

         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID);
         }

         receivedBytes += packet.getPacketSize();

         if (receivedBytes >= confWindowSize) {
            receivedBytes = 0;

            final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID.get());

            confirmed.setChannelID(id);

            doWrite(confirmed);
         }
      }
   }

   //......
}
  • The confirm method of ChannelImpl writes a PacketsConfirmedMessage once the accumulated receivedBytes reaches confWindowSize.
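
The byte-counting behaviour of confirm means confirmations are batched: one PacketsConfirmedMessage can cover several packets. The sketch below models that with a hypothetical ConfirmWindowSketch class; it assumes the command ID counter starts at -1 so that the first packet gets ID 0, and it records the IDs that would be written instead of writing packets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the window accounting in ChannelImpl.confirm; not the Artemis API.
class ConfirmWindowSketch {

    private final int confWindowSize;
    private int receivedBytes;
    // Assumption: starts at -1 so the first confirmed packet has command ID 0.
    private int lastConfirmedCommandID = -1;

    // Command IDs carried by each confirmation that would be written to the wire.
    final List<Integer> written = new ArrayList<>();

    ConfirmWindowSketch(int confWindowSize) {
        this.confWindowSize = confWindowSize;
    }

    void confirm(int packetSize) {
        lastConfirmedCommandID++;
        receivedBytes += packetSize;
        if (receivedBytes >= confWindowSize) {
            // Window filled: reset the counter and emit one confirmation
            // covering every packet up to lastConfirmedCommandID.
            receivedBytes = 0;
            written.add(lastConfirmedCommandID);
        }
    }
}
```

With a 100-byte window and six 40-byte packets, only the third and sixth packets trigger a write, so written ends up as [2, 5].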

PacketsConfirmedMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/PacketsConfirmedMessage.java

public class PacketsConfirmedMessage extends PacketImpl {

   private int commandID;

   public PacketsConfirmedMessage(final int commandID) {
      super(PACKETS_CONFIRMED);

      this.commandID = commandID;
   }

   public PacketsConfirmedMessage() {
      super(PACKETS_CONFIRMED);
   }

   //......
}
  • PacketsConfirmedMessage extends PacketImpl; its type is PACKETS_CONFIRMED.

handlePacket

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java

public final class ChannelImpl implements Channel {

   //......

   public void handlePacket(final Packet packet) {
      if (packet.getType() == PacketImpl.PACKETS_CONFIRMED) {
         if (resendCache != null) {
            final PacketsConfirmedMessage msg = (PacketsConfirmedMessage) packet;

            clearUpTo(msg.getCommandID());
         }

         if (!connection.isClient() && handler != null) {
            handler.handlePacket(packet);
         }

         return;
      } else {
         if (packet.isResponse()) {
            confirm(packet);

            handleAsyncResponse(packet);
            lock.lock();

            try {
               response = packet;
               sendCondition.signal();
            } finally {
               lock.unlock();
            }
         } else if (handler != null) {
            handler.handlePacket(packet);
         }
      }
   }

   private void clearUpTo(final int lastReceivedCommandID) {
      final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;

      if (logger.isTraceEnabled()) {
         logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID + " first commandID=" + firstStoredCommandID + " number to clear " + numberToClear);
      }

      for (int i = 0; i < numberToClear; i++) {
         final Packet packet = resendCache.poll();

         if (packet == null) {
            ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID, firstStoredCommandID);
            firstStoredCommandID = lastReceivedCommandID + 1;
            return;
         }

         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID=" + connection.getID() + " ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler);
         }
         if (commandConfirmationHandler != null) {
            commandConfirmationHandler.commandConfirmed(packet);
         }
         if (responseAsyncCache != null) {
            responseAsyncCache.handleResponse(packet);
         }
      }

      firstStoredCommandID += numberToClear;
   }

   //......
}   
  • The handlePacket method of ChannelImpl calls clearUpTo when the packet type is PacketImpl.PACKETS_CONFIRMED and resendCache is not null. For each packet it removes from the resend cache, clearUpTo calls commandConfirmationHandler.commandConfirmed(packet) if commandConfirmationHandler is not null, and responseAsyncCache.handleResponse(packet) if responseAsyncCache is not null; both ultimately call responseHandler.handleResponse(packet, response).
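
The arithmetic in clearUpTo is easier to follow with concrete numbers. The following sketch keeps only the queue and the command ID bookkeeping; ResendCacheSketch is a hypothetical class, strings stand in for packets, and the confirmed list replaces the handler callbacks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the resend cache handling in ChannelImpl.clearUpTo; not the Artemis API.
class ResendCacheSketch {

    private final Queue<String> resendCache = new ArrayDeque<>();
    private int firstStoredCommandID = 0;

    // Packets handed to the confirmation callback, in order.
    final List<String> confirmed = new ArrayList<>();

    // Stores a sent packet until the other side confirms it.
    void cache(String packet) {
        resendCache.add(packet);
    }

    void clearUpTo(int lastReceivedCommandID) {
        int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
        for (int i = 0; i < numberToClear; i++) {
            String packet = resendCache.poll();
            if (packet == null) {
                // Cache ran dry early: resynchronise the ID and stop.
                firstStoredCommandID = lastReceivedCommandID + 1;
                return;
            }
            confirmed.add(packet);
        }
        firstStoredCommandID += numberToClear;
    }

    int firstStoredCommandID() {
        return firstStoredCommandID;
    }
}
```

After caching five packets, clearUpTo(2) confirms the first three and moves firstStoredCommandID to 3; a later clearUpTo(4) confirms the remaining two.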

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

   //......

   private final CommandConfirmationHandler commandConfirmationHandler = new CommandConfirmationHandler() {
      @Override
      public void commandConfirmed(Packet packet) {
         responseHandler.handleResponse(packet, null);
      }
   };

   private final ResponseHandler responseHandler = new ResponseHandler() {
      @Override
      public void handleResponse(Packet packet, Packet response) {
         final ActiveMQException activeMQException;
         if (response != null && response.getType() == PacketImpl.EXCEPTION) {
            ActiveMQExceptionMessage exceptionResponseMessage = (ActiveMQExceptionMessage) response;
            activeMQException = exceptionResponseMessage.getException();
         } else {
            activeMQException = null;
         }

         if (packet.getType() == PacketImpl.SESS_SEND) {
            SessionSendMessage ssm = (SessionSendMessage) packet;
            callSendAck(ssm.getHandler(), ssm.getMessage(), activeMQException);
         } else if (packet.getType() == PacketImpl.SESS_SEND_CONTINUATION) {
            SessionSendContinuationMessage scm = (SessionSendContinuationMessage) packet;
            if (!scm.isContinues()) {
               callSendAck(scm.getHandler(), scm.getMessage(), activeMQException);
            }
         }
      }

      private void callSendAck(SendAcknowledgementHandler handler, final Message message, final Exception exception) {
         if (handler != null) {
            if (exception == null) {
               handler.sendAcknowledged(message);
            } else {
               handler.sendFailed(message, exception);
            }
         } else if (sendAckHandler != null) {
            if (exception == null) {
               sendAckHandler.sendAcknowledged(message);
            } else {
               sendAckHandler.sendFailed(message, exception);
            }
         }
      }
   };

   //......
}
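  • The commandConfirmed method of CommandConfirmationHandler calls handleResponse on the ResponseHandler; handleResponse calls callSendAck, which invokes sendAcknowledged or sendFailed on the SendAcknowledgementHandler obtained from ssm.getHandler(), falling back to the session-level sendAckHandler when the packet carries none.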
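
The precedence in callSendAck (per-send handler first, session-level handler as a fallback) can be sketched as follows. CallSendAckSketch and its Handler interface are hypothetical stand-ins with String in place of Message; the logic is a condensed equivalent of the nested if/else in the excerpt.

```java
// Hypothetical model of the handler precedence in callSendAck; not the Artemis API.
class CallSendAckSketch {

    // Simplified stand-in for SendAcknowledgementHandler.
    interface Handler {
        void sendAcknowledged(String message);

        default void sendFailed(String message, Exception e) {
            // Ignored by default.
        }
    }

    static void callSendAck(Handler perSend, Handler sessionLevel, String message, Exception exception) {
        // The handler passed to send wins; the session-level one is only a fallback.
        Handler target = perSend != null ? perSend : sessionLevel;
        if (target == null) {
            return;
        }
        if (exception == null) {
            target.sendAcknowledged(message);
        } else {
            target.sendFailed(message, exception);
        }
    }
}
```

Only one of the two handlers is ever called for a given message, so a session-level handler does not see acknowledgements for sends that supplied their own handler.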

Summary

  • The send method of ClientProducerImpl calls doSend(address1, message, handler) when confirmationWindowEnabled is true; otherwise it calls doSend(address1, message, null) and, if handler is not null, calls session.scheduleConfirmation(handler, message).
  • doSend ends by calling either largeMessageSend or sendRegularMessage; sendRegularMessage in turn ends by calling sessionContext.sendFullMessage(msgI, sendBlocking, handler, address). sendFullMessage creates a SessionSendMessage and passes the SendAcknowledgementHandler to it through the constructor.
  • The sendResponse method of ServerSessionPacketHandler registers an IOCallback through storageManager.afterCompleteOperations; both onError and done call doConfirmAndResponse, which calls channel.confirm(confirmPacket) when the response is not an exception. The confirm method of ChannelImpl writes a PacketsConfirmedMessage, which extends PacketImpl and has the type PACKETS_CONFIRMED.
  • The handlePacket method of ChannelImpl calls clearUpTo when the packet type is PacketImpl.PACKETS_CONFIRMED and resendCache is not null. clearUpTo ultimately calls responseHandler.handleResponse(packet, response); handleResponse calls callSendAck, which invokes sendAcknowledged or sendFailed on the SendAcknowledgementHandler obtained from ssm.getHandler().
