A Look at Artemis's ClientProducerCreditManager


This article examines the ClientProducerCreditManager in Artemis.

ClientProducerCreditManager

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManager.java

public interface ClientProducerCreditManager {

   ClientProducerCredits getCredits(SimpleString address, boolean anon, SessionContext context);

   void returnCredits(SimpleString address);

   void receiveCredits(SimpleString address, int credits);

   void receiveFailCredits(SimpleString address, int credits);

   void reset();

   void close();

   int creditsMapSize();

   int unReferencedCreditsSize();

   /** This will determine the flow control as asynchronous,
    *  no actual block should happen instead a callback will be sent whenever blockages change  */
   void setCallback(ClientProducerFlowCallback callback);
}
  • The ClientProducerCreditManager interface defines the methods getCredits, returnCredits, receiveCredits, receiveFailCredits, reset, close, creditsMapSize, unReferencedCreditsSize, and setCallback

ClientProducerCreditManagerImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java

public class ClientProducerCreditManagerImpl implements ClientProducerCreditManager {

   public static final int MAX_UNREFERENCED_CREDITS_CACHE_SIZE = 1000;

   private final Map<SimpleString, ClientProducerCredits> producerCredits = new LinkedHashMap<>();

   private final Map<SimpleString, ClientProducerCredits> unReferencedCredits = new LinkedHashMap<>();

   private final ClientSessionInternal session;

   private int windowSize;

   private ClientProducerFlowCallback callback;

   public ClientProducerCreditManagerImpl(final ClientSessionInternal session, final int windowSize) {
      this.session = session;

      this.windowSize = windowSize;
   }


   /** This will determine the flow control as asynchronous,
    *  no actual block should happen instead a callback will be sent whenever blockages change  */
   @Override
   public void setCallback(ClientProducerFlowCallback callback) {
      this.callback = callback;
   }

   @Override
   public synchronized ClientProducerCredits getCredits(final SimpleString address,
                                                        final boolean anon,
                                                        SessionContext context) {
      if (windowSize == -1) {
         return ClientProducerCreditsNoFlowControl.instance;
      } else {
         boolean needInit = false;
         ClientProducerCredits credits;

         synchronized (this) {
            credits = producerCredits.get(address);

            if (credits == null) {
               // Doesn't need to be fair since session is single threaded
               credits = build(address);
               needInit = true;

               producerCredits.put(address, credits);
            }

            if (!anon) {
               credits.incrementRefCount();

               // Remove from anon credits (if there)
               unReferencedCredits.remove(address);
            } else {
               addToUnReferencedCache(address, credits);
            }
         }

         // The init is done outside of the lock
         // otherwise packages may arrive with flow control
         // while this is still sending requests causing a dead lock
         if (needInit) {
            credits.init(context);
         }

         return credits;
      }
   }

   private ClientProducerCredits build(SimpleString address) {
      if (callback != null) {
         return new AsynchronousProducerCreditsImpl(session, address, windowSize, callback);
      } else {
         return new ClientProducerCreditsImpl(session, address, windowSize);
      }
   }

   @Override
   public synchronized void returnCredits(final SimpleString address) {
      ClientProducerCredits credits = producerCredits.get(address);

      if (credits != null && credits.decrementRefCount() == 0) {
         addToUnReferencedCache(address, credits);
      }
   }

   @Override
   public synchronized void receiveCredits(final SimpleString address, final int credits) {
      ClientProducerCredits cr = producerCredits.get(address);

      if (cr != null) {
         cr.receiveCredits(credits);
      }
   }

   @Override
   public synchronized void receiveFailCredits(final SimpleString address, int credits) {
      ClientProducerCredits cr = producerCredits.get(address);

      if (cr != null) {
         cr.receiveFailCredits(credits);
      }
   }

   @Override
   public synchronized void reset() {
      for (ClientProducerCredits credits : producerCredits.values()) {
         credits.reset();
      }
   }

   @Override
   public synchronized void close() {
      windowSize = -1;

      for (ClientProducerCredits credits : producerCredits.values()) {
         credits.close();
      }

      producerCredits.clear();

      unReferencedCredits.clear();
   }

   @Override
   public synchronized int creditsMapSize() {
      return producerCredits.size();
   }

   @Override
   public synchronized int unReferencedCreditsSize() {
      return unReferencedCredits.size();
   }

   private void addToUnReferencedCache(final SimpleString address, final ClientProducerCredits credits) {
      unReferencedCredits.put(address, credits);

      if (unReferencedCredits.size() > MAX_UNREFERENCED_CREDITS_CACHE_SIZE) {
         // Remove the oldest entry

         Iterator<Map.Entry<SimpleString, ClientProducerCredits>> iter = unReferencedCredits.entrySet().iterator();

         Map.Entry<SimpleString, ClientProducerCredits> oldest = iter.next();

         iter.remove();

         removeEntry(oldest.getKey(), oldest.getValue());
      }
   }

   private void removeEntry(final SimpleString address, final ClientProducerCredits credits) {
      producerCredits.remove(address);

      credits.releaseOutstanding();

      credits.close();
   }

   //......
}
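  • ClientProducerCreditManagerImpl implements the ClientProducerCreditManager interface; its constructor takes the session and windowSize parameters. When windowSize is -1, getCredits returns ClientProducerCreditsNoFlowControl.instance; otherwise it looks up the ClientProducerCredits for the address in producerCredits and creates one through build if none exists. For a non-anonymous caller it increments the refCount and removes the address from unReferencedCredits; for an anonymous caller it calls addToUnReferencedCache. If the instance was newly created, needInit is true and credits.init(context) is executed after the synchronized (this) block
  • returnCredits fetches the ClientProducerCredits from producerCredits and decrements its refCount; if the result is 0 it calls addToUnReferencedCache, which evicts the oldest entry (releaseOutstanding, then close) once the cache exceeds MAX_UNREFERENCED_CREDITS_CACHE_SIZE. receiveCredits fetches the ClientProducerCredits from producerCredits and calls cr.receiveCredits(credits); receiveFailCredits does the same and calls cr.receiveFailCredits(credits)
  • reset iterates over producerCredits.values() and calls reset on each ClientProducerCredits; close sets windowSize to -1, iterates over producerCredits.values() calling close on each ClientProducerCredits, and then clears producerCredits and unReferencedCredits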
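
The bookkeeping described in the bullets above can be illustrated with a minimal, self-contained sketch. It is not the Artemis implementation: CreditCacheSketch and its nested Credits class are hypothetical stand-ins, addresses are plain Strings, and the cache limit is passed in rather than fixed at 1000. It only shows the three ideas visible in the listing: reference counting per address, a LinkedHashMap used as an insertion-ordered cache whose oldest entry is evicted, and initialization performed outside the lock.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the manager's bookkeeping (no network I/O).
class CreditCacheSketch {

   // Hypothetical stand-in for ClientProducerCredits: tracks state only.
   static final class Credits {
      int refCount;
      boolean initialized;
      boolean closed;

      void init() {
         initialized = true;
      }
   }

   private final int maxUnreferenced;
   private final Map<String, Credits> producerCredits = new LinkedHashMap<>();
   private final Map<String, Credits> unReferenced = new LinkedHashMap<>();

   CreditCacheSketch(int maxUnreferenced) {
      this.maxUnreferenced = maxUnreferenced;
   }

   Credits getCredits(String address, boolean anon) {
      boolean needInit = false;
      Credits credits;
      synchronized (this) {
         credits = producerCredits.get(address);
         if (credits == null) {
            credits = new Credits();
            needInit = true;
            producerCredits.put(address, credits);
         }
         if (!anon) {
            credits.refCount++;
            unReferenced.remove(address);   // now referenced by a named producer
         } else {
            addToUnReferencedCache(address, credits);
         }
      }
      // Mirrors the listing: init happens after the lock has been released.
      if (needInit) {
         credits.init();
      }
      return credits;
   }

   synchronized void returnCredits(String address) {
      Credits credits = producerCredits.get(address);
      if (credits != null && --credits.refCount == 0) {
         addToUnReferencedCache(address, credits);
      }
   }

   synchronized int creditsMapSize() {
      return producerCredits.size();
   }

   synchronized int unReferencedCreditsSize() {
      return unReferenced.size();
   }

   // Callers must hold the lock on this object.
   private void addToUnReferencedCache(String address, Credits credits) {
      unReferenced.put(address, credits);
      if (unReferenced.size() > maxUnreferenced) {
         // LinkedHashMap iterates in insertion order, so the first entry is the oldest.
         Iterator<Map.Entry<String, Credits>> iter = unReferenced.entrySet().iterator();
         Map.Entry<String, Credits> oldest = iter.next();
         iter.remove();
         producerCredits.remove(oldest.getKey());
         oldest.getValue().closed = true;
      }
   }

   public static void main(String[] args) {
      CreditCacheSketch manager = new CreditCacheSketch(2);
      manager.getCredits("a", true);
      manager.getCredits("b", true);
      manager.getCredits("c", true);                  // evicts "a", the oldest entry
      System.out.println(manager.creditsMapSize());   // prints 2
   }
}
```

One detail worth noting in this sketch: putting a key that is already present into an insertion-ordered LinkedHashMap does not move it to the end, so an anonymous address that is requested repeatedly keeps its original position in the eviction order.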

ClientProducerCreditsNoFlowControl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditManagerImpl.java

   static class ClientProducerCreditsNoFlowControl implements ClientProducerCredits {

      static ClientProducerCreditsNoFlowControl instance = new ClientProducerCreditsNoFlowControl();

      @Override
      public void acquireCredits(int credits) {
      }

      @Override
      public void receiveCredits(int credits) {
      }

      @Override
      public void receiveFailCredits(int credits) {
      }

      @Override
      public boolean isBlocked() {
         return false;
      }

      @Override
      public void init(SessionContext ctx) {
      }

      @Override
      public void reset() {
      }

      @Override
      public void close() {
      }

      @Override
      public void incrementRefCount() {
      }

      @Override
      public int decrementRefCount() {
         return 1;
      }

      @Override
      public void releaseOutstanding() {
      }

      @Override
      public SimpleString getAddress() {
         return SimpleString.toSimpleString("");
      }
   }
  • ClientProducerCreditsNoFlowControl implements the ClientProducerCredits interface: isBlocked returns false, decrementRefCount returns 1, getAddress returns SimpleString.toSimpleString(""), and all other methods are no-ops
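
ClientProducerCreditsNoFlowControl is an instance of the null object pattern: callers always receive something that satisfies the interface, so they never need to check whether flow control is enabled. The sketch below illustrates that idea with hypothetical types (CreditsSketch, NoFlowControlSketch, WindowedCreditsSketch, CreditsFactorySketch). The interface is trimmed to three methods, and tryAcquire is a non-blocking simplification introduced for this example; it is not how the real ClientProducerCredits implementations behave.

```java
// Hypothetical, trimmed-down interface; the real ClientProducerCredits has more methods.
interface CreditsSketch {
   boolean tryAcquire(int credits);

   void receiveCredits(int credits);

   boolean isBlocked();
}

// Null object: every operation succeeds or does nothing.
final class NoFlowControlSketch implements CreditsSketch {
   static final NoFlowControlSketch INSTANCE = new NoFlowControlSketch();

   private NoFlowControlSketch() {
   }

   @Override
   public boolean tryAcquire(int credits) {
      return true;
   }

   @Override
   public void receiveCredits(int credits) {
   }

   @Override
   public boolean isBlocked() {
      return false;
   }
}

// Assumed, simplified windowed variant used only for contrast.
final class WindowedCreditsSketch implements CreditsSketch {
   private int available;

   WindowedCreditsSketch(int windowSize) {
      this.available = windowSize;
   }

   @Override
   public synchronized boolean tryAcquire(int credits) {
      if (credits > available) {
         return false;
      }
      available -= credits;
      return true;
   }

   @Override
   public synchronized void receiveCredits(int credits) {
      available += credits;
   }

   @Override
   public synchronized boolean isBlocked() {
      return available <= 0;
   }
}

final class CreditsFactorySketch {
   // As in the listing, a window size of -1 means "no flow control".
   static CreditsSketch forWindow(int windowSize) {
      return windowSize == -1 ? NoFlowControlSketch.INSTANCE : new WindowedCreditsSketch(windowSize);
   }

   public static void main(String[] args) {
      CreditsSketch unlimited = forWindow(-1);
      System.out.println(unlimited.tryAcquire(1_000_000));   // prints true
      CreditsSketch windowed = forWindow(10);
      System.out.println(windowed.tryAcquire(11));           // prints false
   }
}
```

The calling code is identical in both cases, which is the point of returning a shared no-op instance instead of null.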

Summary

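The ClientProducerCreditManager interface defines the methods getCredits, returnCredits, receiveCredits, receiveFailCredits, reset, close, creditsMapSize, unReferencedCreditsSize, and setCallback. ClientProducerCreditManagerImpl implements it with a per-address map of ClientProducerCredits plus a bounded cache of unreferenced entries, and returns the no-op ClientProducerCreditsNoFlowControl.instance when windowSize is -1.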
