A look at Artemis's FederatedQueue


This article takes a look at Artemis's FederatedQueue.

FederatedQueue

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java

public class FederatedQueue extends FederatedAbstract implements ActiveMQServerConsumerPlugin, Serializable {

   private static final Logger logger = Logger.getLogger(FederatedQueue.class);

   private final Set<Matcher> includes;
   private final Set<Matcher> excludes;
   private final Filter metaDataFilter;
   private final int priorityAdjustment;

   private final FederationQueuePolicyConfiguration config;

   public FederatedQueue(Federation federation, FederationQueuePolicyConfiguration config, ActiveMQServer server, FederationUpstream federationUpstream) throws ActiveMQException {
      super(federation, server, federationUpstream);
      Objects.requireNonNull(config.getName());
      this.config = config;
      this.priorityAdjustment = federationUpstream.getPriorityAdjustment() + (config.getPriorityAdjustment() == null ? -1 : config.getPriorityAdjustment());
      String metaDataFilterString = config.isIncludeFederated() ? null : "hyphenated_props:" + FederatedQueueConsumer.FEDERATION_NAME +  " IS NOT NULL";
      metaDataFilter = FilterImpl.createFilter(metaDataFilterString);
      if (config.getIncludes().isEmpty()) {
         includes = Collections.emptySet();
      } else {
         includes = new HashSet<>(config.getIncludes().size());
         for (FederationQueuePolicyConfiguration.Matcher include : config.getIncludes()) {
            includes.add(new Matcher(include, wildcardConfiguration));
         }
      }

      if (config.getExcludes().isEmpty()) {
         excludes = Collections.emptySet();
      } else {
         excludes = new HashSet<>(config.getExcludes().size());
         for (FederationQueuePolicyConfiguration.Matcher exclude : config.getExcludes()) {
            excludes.add(new Matcher(exclude, wildcardConfiguration));
         }
      }
   }

   @Override
   public void start() {
      super.start();
      server.getPostOffice()
            .getAllBindings()
            .values()
            .stream()
            .filter(b -> b instanceof QueueBinding)
            .map(b -> (QueueBinding) b)
            .forEach(b -> conditionalCreateRemoteConsumer(b.getQueue()));
   }

   /**
    * After a consumer has been created
    *
    * @param consumer the created consumer
    */
   @Override
   public synchronized void afterCreateConsumer(ServerConsumer consumer) {
      conditionalCreateRemoteConsumer(consumer);
   }

   public FederationQueuePolicyConfiguration getConfig() {
      return config;
   }

   private void conditionalCreateRemoteConsumer(ServerConsumer  consumer) {
      if (server.hasBrokerFederationPlugins()) {
         final AtomicBoolean conditionalCreate = new AtomicBoolean(true);
         try {
            server.callBrokerFederationPlugins(plugin -> {
               conditionalCreate.set(conditionalCreate.get() && plugin.federatedQueueConditionalCreateConsumer(consumer));
            });
         } catch (ActiveMQException t) {
            ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedQueueConditionalCreateConsumer");
            throw new IllegalStateException(t.getMessage(), t.getCause());
         }
         if (!conditionalCreate.get()) {
            return;
         }
      }
      createRemoteConsumer(consumer);
   }

   private void conditionalCreateRemoteConsumer(Queue queue) {
      queue.getConsumers()
            .stream()
            .filter(consumer -> consumer instanceof ServerConsumer)
            .map(c -> (ServerConsumer) c).forEach(this::conditionalCreateRemoteConsumer);
   }

   private void createRemoteConsumer(ServerConsumer consumer) {

      //We check the session meta data to see if its a federation session, if so by default we ignore these.
      //To not ignore these, set include-federated to true, which will mean no meta data filter.
      ServerSession serverSession = server.getSessionByID(consumer.getSessionID());
      if (metaDataFilter != null && serverSession != null && metaDataFilter.match(serverSession.getMetaData())) {
         return;
      }
      if (match(consumer)) {
         FederatedConsumerKey key = getKey(consumer);
         Transformer transformer = getTransformer(config.getTransformerRef());
         Transformer fqqnTransformer = message -> message == null ? null : message.setAddress(key.getFqqn());
         createRemoteConsumer(key, mergeTransformers(fqqnTransformer, transformer), null);
      }
   }

   //......
}
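  • FederatedQueue extends FederatedAbstract. Its start method walks every QueueBinding in the post office and calls conditionalCreateRemoteConsumer for each ServerConsumer on the bound queue. conditionalCreateRemoteConsumer first gives the broker federation plugins a chance to veto the consumer; the private createRemoteConsumer then skips consumers that belong to a federation session (unless include-federated is true) and, if match(consumer) passes, calls the parent class's createRemoteConsumer to create the remoteQueueConsumer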
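
The plugin check in conditionalCreateRemoteConsumer can be read as an AND over all plugins. The following is a minimal sketch of that folding pattern using only the JDK; ConsumerVeto, shouldCreate and the Predicate-based "plugins" are hypothetical stand-ins, not Artemis classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Hypothetical stand-in for the plugin veto; not part of Artemis.
class ConsumerVeto {

    // Returns true only if every plugin approves the consumer.
    static boolean shouldCreate(List<Predicate<String>> plugins, String consumer) {
        final AtomicBoolean conditionalCreate = new AtomicBoolean(true);
        for (Predicate<String> plugin : plugins) {
            // && short-circuits: once one plugin has vetoed,
            // the remaining plugins are no longer consulted.
            conditionalCreate.set(conditionalCreate.get() && plugin.test(consumer));
        }
        return conditionalCreate.get();
    }

    public static void main(String[] args) {
        List<String> consulted = new ArrayList<>();
        List<Predicate<String>> plugins = new ArrayList<>();
        plugins.add(c -> { consulted.add("first"); return true; });
        plugins.add(c -> { consulted.add("second"); return false; });
        plugins.add(c -> { consulted.add("third"); return true; });

        System.out.println(shouldCreate(plugins, "consumer-1")); // false
        System.out.println(consulted);                           // [first, second]
    }
}
```

One consequence of writing the fold with && is that a plugin registered after a vetoing plugin never sees the consumer.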

FederatedAbstract

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java

public abstract class FederatedAbstract implements ActiveMQServerBasePlugin {

   //......

   public synchronized void createRemoteConsumer(FederatedConsumerKey key, Transformer transformer, ClientSessionCallback callback) {
      if (started) {
         FederatedQueueConsumer remoteQueueConsumer = remoteQueueConsumers.get(key);
         if (remoteQueueConsumer == null) {
            if (server.hasBrokerFederationPlugins()) {
               try {
                  server.callBrokerFederationPlugins(plugin -> plugin.beforeCreateFederatedQueueConsumer(key));
               } catch (ActiveMQException t) {
                  ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeCreateFederatedQueueConsumer");
                  throw new IllegalStateException(t.getMessage(), t.getCause());
               }
            }
            remoteQueueConsumer = new FederatedQueueConsumerImpl(federation, server, transformer, key, upstream, callback);
            remoteQueueConsumer.start();
            remoteQueueConsumers.put(key, remoteQueueConsumer);

            if (server.hasBrokerFederationPlugins()) {
               try {
                  final FederatedQueueConsumer finalConsumer = remoteQueueConsumer;
                  server.callBrokerFederationPlugins(plugin -> plugin.afterCreateFederatedQueueConsumer(finalConsumer));
               } catch (ActiveMQException t) {
                  ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterCreateFederatedQueueConsumer");
                  throw new IllegalStateException(t.getMessage(), t.getCause());
               }
            }
         }
         remoteQueueConsumer.incrementCount();
      }
   }

   //......
}
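  • FederatedAbstract's createRemoteConsumer only acts once the component has been started. If remoteQueueConsumers holds no FederatedQueueConsumer for the key, it creates a FederatedQueueConsumerImpl, calls its start method and stores it in the map, invoking the before/after plugin callbacks around the creation; in either case it then increments the consumer's reference count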
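
The get-or-create plus incrementCount logic amounts to a reference-counted registry: several local consumers with the same key share one remote consumer. Below is a minimal JDK-only sketch of that idea. RemoteConsumerRegistry, acquire and release are hypothetical names, and the release side is an assumption, since the removal code is elided in the listing above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted registry; not the Artemis implementation.
class RemoteConsumerRegistry {

    private final Map<String, AtomicInteger> remoteConsumers = new HashMap<>();
    private int created;

    // Creates the entry on first use, then bumps its reference count.
    synchronized int acquire(String key) {
        AtomicInteger count = remoteConsumers.get(key);
        if (count == null) {
            count = new AtomicInteger();
            remoteConsumers.put(key, count);
            created++; // a real implementation would start the remote consumer here
        }
        return count.incrementAndGet();
    }

    // Assumed counterpart: drop the entry when the last reference goes away.
    synchronized int release(String key) {
        AtomicInteger count = remoteConsumers.get(key);
        if (count == null) {
            return 0;
        }
        int remaining = count.decrementAndGet();
        if (remaining <= 0) {
            remoteConsumers.remove(key);
        }
        return remaining;
    }

    synchronized int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        RemoteConsumerRegistry registry = new RemoteConsumerRegistry();
        System.out.println(registry.acquire("queueA")); // 1
        System.out.println(registry.acquire("queueA")); // 2
        System.out.println(registry.createdCount());    // 1
    }
}
```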

FederatedQueueConsumerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java

public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {

   private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class);
   private final ActiveMQServer server;
   private final Federation federation;
   private final FederatedConsumerKey key;
   private final Transformer transformer;
   private final FederationUpstream upstream;
   private final AtomicInteger count = new AtomicInteger();
   private final ScheduledExecutorService scheduledExecutorService;
   private final int intialConnectDelayMultiplier = 2;
   private final int intialConnectDelayMax = 30;
   private final ClientSessionCallback clientSessionCallback;

   private ClientSessionFactoryInternal clientSessionFactory;
   private ClientSession clientSession;
   private ClientConsumer clientConsumer;

   public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) {
      this.federation = federation;
      this.server = server;
      this.key = key;
      this.transformer = transformer;
      this.upstream = upstream;
      this.scheduledExecutorService = server.getScheduledPool();
      this.clientSessionCallback = clientSessionCallback;
   }

   @Override
   public FederationUpstream getFederationUpstream() {
      return upstream;
   }

   @Override
   public Federation getFederation() {
      return federation;
   }

   @Override
   public FederatedConsumerKey getKey() {
      return key;
   }

   @Override
   public ClientSession getClientSession() {
      return clientSession;
   }

   @Override
   public int incrementCount() {
      return count.incrementAndGet();
   }

   @Override
   public int decrementCount() {
      return count.decrementAndGet();
   }

   @Override
   public void start() {
      scheduleConnect(0);
   }

   private void scheduleConnect(int delay) {
      scheduledExecutorService.schedule(() -> {
         try {
            connect();
         } catch (Exception e) {
            scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
         }
      }, delay, TimeUnit.SECONDS);
   }

   private void connect() throws Exception {
      try {
         if (clientConsumer == null) {
            synchronized (this) {
               this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
               this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
               this.clientSession.addFailureListener(this);
               this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
               this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
               this.clientSession.start();
               if (clientSessionCallback != null) {
                  clientSessionCallback.callback(clientSession);
               }
               if (clientSession.queueQuery(key.getQueueName()).isExists()) {
                  this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
                  this.clientConsumer.setMessageHandler(this);
               } else {
                  throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
               }
            }
         }
      } catch (Exception e) {
         try {
            if (clientSessionFactory != null) {
               clientSessionFactory.cleanup();
            }
            disconnect();
         } catch (ActiveMQException ignored) {
         }
         throw e;
      }
   }

   @Override
   public void close() {
      scheduleDisconnect(0);
   }

   private void scheduleDisconnect(int delay) {
      scheduledExecutorService.schedule(() -> {
         try {
            disconnect();
         } catch (Exception ignored) {
         }
      }, delay, TimeUnit.SECONDS);
   }

   private void disconnect() throws ActiveMQException {
      if (clientConsumer != null) {
         clientConsumer.close();
      }
      if (clientSession != null) {
         clientSession.close();
      }
      clientConsumer = null;
      clientSession = null;

      if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() ||
          clientSessionFactory.numSessions() == 0)) {
         clientSessionFactory.close();
         clientSessionFactory = null;
      }
   }

   @Override
   public void onMessage(ClientMessage clientMessage) {
      try {
         if (server.hasBrokerFederationPlugins()) {
            try {
               server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage));
            } catch (ActiveMQException t) {
               ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeFederatedQueueConsumerMessageHandled");
               throw new IllegalStateException(t.getMessage(), t.getCause());
            }
         }

         Message message = transformer == null ? clientMessage : transformer.transform(clientMessage);
         if (message != null) {
            server.getPostOffice().route(message, true);
         }
         clientMessage.acknowledge();

         if (server.hasBrokerFederationPlugins()) {
            try {
               server.callBrokerFederationPlugins(plugin -> plugin.afterFederatedQueueConsumerMessageHandled(this, clientMessage));
            } catch (ActiveMQException t) {
               ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterFederatedQueueConsumerMessageHandled");
               throw new IllegalStateException(t.getMessage(), t.getCause());
            }
         }
      } catch (Exception e) {
         try {
            clientSession.rollback();
         } catch (ActiveMQException e1) {
         }
      }
   }

   @Override
   public void connectionFailed(ActiveMQException exception, boolean failedOver) {
      connectionFailed(exception, failedOver, null);
   }

   @Override
   public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
      try {
         clientSessionFactory.cleanup();
         clientSessionFactory.close();
         clientConsumer = null;
         clientSession = null;
         clientSessionFactory = null;
      } catch (Throwable dontCare) {
      }
      start();
   }

   @Override
   public void beforeReconnect(ActiveMQException exception) {
   }

   public interface ClientSessionCallback {
      void callback(ClientSession clientSession) throws ActiveMQException;
   }
}
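  • FederatedQueueConsumerImpl's start method calls scheduleConnect with a delay of 0. scheduleConnect uses scheduledExecutorService to schedule connect (the delay is in seconds); if connect throws, it computes a new delay via FederatedQueueConsumer.getNextDelay (multiplier 2, maximum 30) and calls scheduleConnect again. Its close method calls scheduleDisconnect with a delay of 0, which in turn schedules disconnect. connect obtains the clientSessionFactory from the upstream connection, creates a clientSession, tags it with the federation metadata and starts it, then, provided the queue exists on the remote broker, creates a clientConsumer and registers itself as its messageHandler (onMessage essentially runs server.getPostOffice().route(message, true) followed by clientMessage.acknowledge(), and rolls the session back if anything throws). disconnect closes the clientConsumer and the clientSession, and closes the clientSessionFactory when the connection is not shared or no sessions remain on it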
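
To get a feel for the retry schedule, here is a small sketch of a capped exponential backoff with the same parameters (multiplier 2, maximum 30 seconds). The body of FederatedQueueConsumer.getNextDelay is not shown in this article, so the nextDelay method below is an assumption about its behaviour (a first retry after 1 second, then doubling up to the cap), and ReconnectBackoff is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a capped exponential backoff; the real
// getNextDelay is not shown in the article and may differ.
class ReconnectBackoff {

    // Assumption: a delay of 0 is followed by 1 second, after that the
    // delay is multiplied and capped at max.
    static int nextDelay(int delay, int multiplier, int max) {
        if (delay == 0) {
            return 1;
        }
        return Math.min(delay * multiplier, max);
    }

    // Lists the delays (in seconds) used by the first `attempts` connect attempts.
    static List<Integer> schedule(int attempts, int multiplier, int max) {
        List<Integer> delays = new ArrayList<>();
        int delay = 0; // start() schedules the first attempt immediately
        for (int i = 0; i < attempts; i++) {
            delays.add(delay);
            delay = nextDelay(delay, multiplier, max);
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(8, 2, 30)); // [0, 1, 2, 4, 8, 16, 30, 30]
    }
}
```

Under these assumptions a consumer that keeps failing settles at one connect attempt every 30 seconds.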

ClientSessionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

   //......

   public ClientSessionImpl start() throws ActiveMQException {
      checkClosed();

      if (!started) {
         for (ClientConsumerInternal clientConsumerInternal : cloneConsumers()) {
            clientConsumerInternal.start();
         }

         sessionContext.sessionStart();

         started = true;
      }

      return this;
   }

   //......
}
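  • ClientSessionImpl's start method, on its first call, runs clientConsumerInternal.start() for each consumer and then sessionContext.sessionStart(); later calls are no-ops because of the started flag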
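
The ordering and the started guard can be illustrated with a tiny JDK-only sketch. SessionStartSketch and its string "events" are hypothetical and only mimic the control flow of the method above; nothing here talks to a broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an idempotent start; not an Artemis class.
class SessionStartSketch {

    private final List<String> consumers = new ArrayList<>();
    private final List<String> events = new ArrayList<>();
    private boolean started;

    void addConsumer(String name) {
        consumers.add(name);
    }

    SessionStartSketch start() {
        if (!started) {
            // Iterate over a copy, as cloneConsumers() does in the listing above.
            for (String consumer : new ArrayList<>(consumers)) {
                events.add("start " + consumer);
            }
            // Only after the local consumers are running is the server told to start.
            events.add("send SESS_START");
            started = true;
        }
        return this;
    }

    List<String> events() {
        return events;
    }

    public static void main(String[] args) {
        SessionStartSketch session = new SessionStartSketch();
        session.addConsumer("c1");
        session.start().start(); // the second call does nothing
        System.out.println(session.events()); // [start c1, send SESS_START]
    }
}
```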

ClientConsumerImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

   //......

   public synchronized void start() {
      stopped = false;

      requeueExecutors();
   }

   private void requeueExecutors() {
      for (int i = 0; i < buffer.size(); i++) {
         queueExecutor();
      }
   }

   private void queueExecutor() {
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Adding Runner on Executor for delivery");
      }

      sessionExecutor.execute(runner);
   }

   private class Runner implements Runnable {

      @Override
      public void run() {
         try {
            callOnMessage();
         } catch (Exception e) {
            ActiveMQClientLogger.LOGGER.onMessageError(e);

            lastException = e;
         }
      }
   }

   private void callOnMessage() throws Exception {
      if (closing || stopped) {
         return;
      }

      session.workDone();

      // We pull the message from the buffer from inside the Runnable so we can ensure priority
      // ordering. If we just added a Runnable with the message to the executor immediately as we get it
      // we could not do that

      ClientMessageInternal message;

      // Must store handler in local variable since might get set to null
      // otherwise while this is executing and give NPE when calling onMessage
      MessageHandler theHandler = handler;

      if (theHandler != null) {
         if (rateLimiter != null) {
            rateLimiter.limit();
         }

         failedOver = false;

         synchronized (this) {
            message = buffer.poll();
         }

         if (message != null) {
            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
               //Ignore, this could be a relic from a previous receiveImmediate();
               return;
            }

            boolean expired = message.isExpired();

            flowControlBeforeConsumption(message);

            if (!expired) {
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Calling handler.onMessage");
               }
               final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                  @Override
                  public ClassLoader run() {
                     ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();

                     Thread.currentThread().setContextClassLoader(contextClassLoader);

                     return originalLoader;
                  }
               });

               onMessageThread = Thread.currentThread();
               try {
                  theHandler.onMessage(message);
               } finally {
                  try {
                     AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        @Override
                        public Object run() {
                           Thread.currentThread().setContextClassLoader(originalLoader);
                           return null;
                        }
                     });
                  } catch (Exception e) {
                     ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                  }

                  onMessageThread = null;
               }

               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Handler.onMessage done");
               }

               if (message.isLargeMessage()) {
                  message.discardBody();
               }
            } else {
               session.expire(this, message);
            }

            // If slow consumer, we need to send 1 credit to make sure we get another message
            if (clientWindowSize == 0) {
               startSlowConsumer();
            }
         }
      }
   }

   //......
}   
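  • ClientConsumerImpl's start method clears the stopped flag and submits the Runner to sessionExecutor once per message already in the buffer. Runner.run calls callOnMessage, which takes the next message via buffer.poll() and, unless the message has expired, passes it to theHandler.onMessage(message)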
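
The comment in callOnMessage explains why the message is polled inside the Runnable: the task does not carry a message, it just takes whatever currently has the highest priority. The sketch below reproduces that pattern with a PriorityQueue and a single-threaded executor. BufferedDelivery, Msg and demo are hypothetical names, and the real class uses its own buffer type and also handles flow control, expiry and class loaders, all of which are left out here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of "one runner per buffered message"; not Artemis code.
class BufferedDelivery {

    static final class Msg {
        final int priority;
        final String body;
        Msg(int priority, String body) {
            this.priority = priority;
            this.body = body;
        }
    }

    // Highest priority first.
    private final PriorityQueue<Msg> buffer =
        new PriorityQueue<>((a, b) -> Integer.compare(b.priority, a.priority));
    private final ExecutorService sessionExecutor;
    private final Consumer<String> handler;
    private boolean stopped = true;

    BufferedDelivery(ExecutorService sessionExecutor, Consumer<String> handler) {
        this.sessionExecutor = sessionExecutor;
        this.handler = handler;
    }

    synchronized void offer(Msg message) {
        buffer.add(message);
        if (!stopped) {
            sessionExecutor.execute(this::callOnMessage);
        }
    }

    // Queue one runner per message that arrived while stopped.
    synchronized void start() {
        stopped = false;
        for (int i = 0; i < buffer.size(); i++) {
            sessionExecutor.execute(this::callOnMessage);
        }
    }

    private void callOnMessage() {
        Msg message;
        synchronized (this) {
            if (stopped) {
                return;
            }
            message = buffer.poll(); // polled here, so priority order is kept
        }
        if (message != null) {
            handler.accept(message.body);
        }
    }

    static List<String> demo() {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BufferedDelivery delivery = new BufferedDelivery(executor, seen::add);
        delivery.offer(new Msg(1, "low"));
        delivery.offer(new Msg(9, "high"));
        delivery.offer(new Msg(4, "mid"));
        delivery.start();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [high, mid, low]
    }
}
```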

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

   //......

   public void sessionStart() throws ActiveMQException {
      sessionChannel.send(new PacketImpl(PacketImpl.SESS_START));
   }

   //......
}
  • ActiveMQSessionContext's sessionStart method sends a PacketImpl.SESS_START packet over sessionChannel

Summary

FederatedQueueConsumerImpl's start method calls scheduleConnect with a delay of 0; scheduleConnect uses scheduledExecutorService to schedule connect and, when connect throws, computes a new delay and calls scheduleConnect again. Its close method calls scheduleDisconnect with a delay of 0, which schedules disconnect. connect creates a clientSession from the upstream's clientSessionFactory and starts it, then creates a clientConsumer and sets its messageHandler; disconnect closes the clientConsumer and the clientSession.
