Vert.x的eventbus源码解读

1,925 阅读7分钟

1.概述

这篇文章主要就是对vertx通信的一个源码分析。首先分析本地的实现,因为集群通信大部分基于本地实现。后面会介绍vertx集群的实现。

2.单进程eventbus源码分析

实例的创建 在初始化vertx实例的时候会对其进行初始化。 如果是集群方式,通过ClusteredEventBus创建,否则通过new EventBusImpl对象创建。

if (options.getEventBusOptions().isClustered()) {
  this.clusterManager = getClusterManager(options);
  this.eventBus = new ClusteredEventBus(this, options, clusterManager);
} else {
  this.clusterManager = null;
  this.eventBus = new EventBusImpl(this);
}

创建成功之后调用start方法启动,并且通过volatile进行内存可见性,以及通过syn枷锁。保证只会被启动一次。这里只是设置了当前状态。

public synchronized void start(Handler<AsyncResult<Void>> completionHandler) {
  if (started) {
    throw new IllegalStateException("Already started");
  }
  started = true;
  completionHandler.handle(Future.succeededFuture());
}

consumer方法

一般情况下,我们绑定的时候会调用这个方法。对于一个addr,会找到对应的handler执行逻辑处理。

public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) {
  Objects.requireNonNull(handler, "handler");
  MessageConsumer<T> consumer = consumer(address);
  consumer.handler(handler);
  return consumer;
}
public <T> MessageConsumer<T> consumer(String address) {
  checkStarted();
  Objects.requireNonNull(address, "address");
  return new HandlerRegistration<>(vertx, metrics, this, address, null, false, null, -1);
}

首先checkStarted会判断是否启动。然后创建HandlerRegistration对象。其实就是一个MessageConsumer,根据名字知道是用来消费消息的。

ok,我们来看HandlerRegistration创建的时候会做哪些事情。其实就是初始化对应的成员变量。然后判断是否有超时时间,如果有设置一个超时返回失败的调用。后面介绍这个。

public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address,
                           String repliedAddress, boolean localOnly,
                           Handler<AsyncResult<Message<T>>> asyncResultHandler, long timeout) {
  this.vertx = vertx;
  this.metrics = metrics;
  this.eventBus = eventBus;
  this.address = address;
  this.repliedAddress = repliedAddress;
  this.localOnly = localOnly;
  this.asyncResultHandler = asyncResultHandler;
  if (timeout != -1) {
    timeoutID = vertx.setTimer(timeout, tid -> {
      if (metrics != null) {
        metrics.replyFailure(address, ReplyFailure.TIMEOUT);
      }
      sendAsyncResultFailure(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress));
    });
  }
}

调用完这个方法之后,我们需要调用handler方法绑定对应的实现。我们看对应的handler方法。双重枷锁保证registered只会被创建一次。我们细节看一下addRegistration方法。

@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
  if (h != null) {
    synchronized (this) {
      handler = h;
      if (registered == null) {
        registered = eventBus.addRegistration(address, this, repliedAddress != null, localOnly);
      }
    }
    return this;
  }
  this.unregister();
  return this;
}

addRegistration方法先调用addLocalRegistration,然后将结果设置到对应的result中。

  1. 这个方法先会获取一个context,其实在vertx中每个handler都会有一个context中。这个方法判断当前线程时候是一个vertxThread。如果是说明在verticle中,直接返回,如果不是说明不在verticle中,然后判断当前线程是否为netty线程,如果是直接返回这个线程的ctx。否则通过create选取一个eventloop创建一个ctx。
  2. 将创建的contex设置到MC(MessageComsumer)中。
  3. 创建一个HandlerHolder,其实碰到holder就能明白。持有者的意思。他会将我们的MC保存在这里,并且保存一些附加的信息。
  4. 完事之后将holder加入到ConcurrentCyclicSequence中。这是一个并发的循环序列。增加的时候通过copy的方式。每次都是在原来的基础上new的。
  5. 通过map的merge方法。其实就是如果addr存在。执行我们的逻辑操作,就是将对象add到原本的handlers。很实用的操作。
  6. 添加close的hook
private <T> LocalRegistrationResult<T> addLocalRegistration(String address, HandlerRegistration<T> registration,
                                                         boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(address, "address");
  Context context = vertx.getOrCreateContext();
  registration.setHandlerContext(context);
  HandlerHolder<T> holder = new HandlerHolder<>(registration, replyHandler, localOnly, context);

  ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
  ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.merge(
    address,
    handlers,
    (old, prev) -> old.add(prev.first()));
  if (context.deploymentID() != null) {
    HandlerEntry entry = new HandlerEntry<>(address, registration);
    context.addCloseHook(entry);
  }

  boolean newAddress = handlers == actualHandlers;
  return new LocalRegistrationResult<>(holder, newAddress);
}

基本逻辑很简单,最终就是放在一个chm中。

其实取消注册的方法也很简单,就是执行一些回调。并且将队列的消息全部执行。这里并不是在我们定义的handler执行,而是在discard执行。然后调用eventbus的remove方法移除。 其实就是调用chm移除,如果有hook一会将hook移除,这里注意,hook是放在一个set中,并且他重写了HandlerEntry的hashcode和equals方法,如果address和handler一样,就会移除。所以这里直接new一个就可以起到移除的作用。

if (holder.setRemoved() && holder.getContext().deploymentID() != null) {
  holder.getContext().removeCloseHook(new HandlerEntry<>(address, holder.getHandler()));
}

对于消息发送的逻辑就更简单了。获取地址然后发送即可。后面我们直接通过集群模式来看。

3.集群eventbus源码分析

对于集群下的eventbus,是需要一个集群管理器的。vertx实现了一个可插拔的集群模式。我们需要实现集群的话,引入支持的集群jar就可以。其默认使用 Hazelcast ,实现了一个管理器HazelcastClusterManager 。众所周知,Hazelcast 可以将多个服务组成一个分布式环境。并且是一个去中心化的方式。它具有master的选举,会选举集群的中一个节点作为master,管理集群节点。集群中的数据采用分布式存储。通过数据冗余保证数据不丢失。其实类似于一个redis分布式数据存储的概念。

我们看一下ClusteredEventBus启动的start方法。

这个方法看似复杂,其实逻辑很简单。

首先会创建HAManager。然后设置节点change的handler。(主要是保证vertical高可用,没什么卵用感觉)后面会调用集群管理器获取对应的map(分布式map),这个过程也是异步的。因此注册了回调。如果成功。将他的结果赋值给本地subs,然后启动tcpserver。监听端口成功执行回调。这个回调主要就是创建nodeinfo和serverId,并且加入到haManager中。所以这个tcp server就保证其他节点的eventbus可以和当前节点通信。也就是用来接收其他client的数据。

public void start(Handler<AsyncResult<Void>> resultHandler) {
  // Get the HA manager, it has been constructed but it's not yet initialized
  HAManager haManager = vertx.haManager();
  setClusterViewChangedHandler(haManager);
  clusterManager.<String, ClusterNodeInfo>getAsyncMultiMap(SUBS_MAP_NAME, ar1 -> {
    if (ar1.succeeded()) {
      subs = ar1.result();
      server = vertx.createNetServer(getServerOptions());
      server.connectHandler(getServerHandler());
      server.listen(...);
    } else {
      resultHandler.handle(Future.failedFuture(ar1.cause()));
    }
  });
}

看到这里其实应该觉得很简单了。就是通过集群管理器来维持集群,本地对通过对localeventbus的重构,来完成集群的一些必要操作。如果地址注册,更新subs。以及接收到其他节点数据,需要解析之后在本地执行等等。 集群启动之后本地回启动一个TCPserver(NetServier),用来监听其他节点的事件。如果有事件需要在别的节点执行,会创建一个TCPClient(NetClient)然后在subs获取节点信息,通过负载获取一个节点,建立TCP连接,传递数据。底层网络调用还是Netty。

Consumer方法

集群的eventbus继承自eventbusImpl。创建逻辑也是很简单 我们直接看他的Consumer方法,其实就是eventbusImpl里面的方法,大部分逻辑类似,在addRegistration,他在addLocalRegistration之后,会根据是否为新的newAddress调用addRegistration方法。

protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration,
                                   boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(registration.getHandler(), "handler");
  LocalRegistrationResult<T> result = addLocalRegistration(address, registration, replyHandler, localOnly);
  addRegistration(result.newAddress, address, replyHandler, localOnly, registration::setResult);
  return result.holder;
}

这个方法在子类ClusteredEventBus中实现了继承。 这个条件:如果是新注册,并且不是自动生成的reply consumer,并且允许支持集群,就会执行if代码。其实这里就是将address以及nodeInfo信息加入到subs中。这个subs其实是有集群管理器提供的,最后实现信息的共享(集群之间)。这里是异步的所以没有后续。

@Override
protected <T> void addRegistration(boolean newAddress, String address,
                                   boolean replyHandler, boolean localOnly,
                                   Handler<AsyncResult<Void>> completionHandler) {
  if (newAddress && subs != null && !replyHandler && !localOnly) {
    // Propagate the information
    subs.add(address, nodeInfo, completionHandler);
    ownSubs.add(address);
  } else {
    completionHandler.handle(Future.succeededFuture());
  }
}

看一下send的方法

数据会被封装为ClusteredMessage.,其实继承自本地的MessageImpl,然后回调用sendcontext的next方法。这个方法是在发送之前执行一些拦截操作。

@Override
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName, Handler<AsyncResult<Void>> writeHandler) {
  Objects.requireNonNull(address, "no null address accepted");
  MessageCodec codec = codecManager.lookupCodec(body, codecName);
  @SuppressWarnings("unchecked")
  ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, this, writeHandler);
  return msg;
}

之后会调用 sendOrPub方法,这个方法在本地和集群的eventbus都有实现。但是集群的肯定和本地逻辑不一样。下面我们贴出来集群实现的代码。如果是onlylocal,那么执行本地调用其实就是根据地址选取本地的handler执行。否则通过subs.get方法。这个方法其实首先根据地址获取对应的serverids,这个操作是异步的,然后注册了一个onSubsReceived会调用,最后根据异步结果的执行回调。其实结果就是serverids。

@Override
protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
  if (sendContext.options.isLocalOnly()) {
    if (metrics != null) {
      metrics.messageSent(sendContext.message.address(), !sendContext.message.isSend(), true, false);
    }
    deliverMessageLocally(sendContext);
  } else if (Vertx.currentContext() == null) {
    // Guarantees the order when there is no current context
    sendNoContext.runOnContext(v -> {
      subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
    });
  } else {
    subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
  }
}

继续回调onSubsReceived,获取serverIDs,调用sendToSubs

private <T> void onSubsReceived(AsyncResult<ChoosableIterable<ClusterNodeInfo>> asyncResult, OutboundDeliveryContext<T> sendContext) {
  if (asyncResult.succeeded()) {
    ChoosableIterable<ClusterNodeInfo> serverIDs = asyncResult.result();
    if (serverIDs != null && !serverIDs.isEmpty()) {
      sendToSubs(serverIDs, sendContext);
    } else {
         ...
    }
  } else {
        ...
    }
  }
}

sendToSubs方法主要就是先判断是send和事pub,然后选出一个serverid,注意不会发送到本节点。最后调用sendRemote发送。

private <T> void sendToSubs(ChoosableIterable<ClusterNodeInfo> subs, OutboundDeliveryContext<T> sendContext) {
  String address = sendContext.message.address();
  if (sendContext.message.isSend()) {
    // Choose one
    ClusterNodeInfo ci = subs.choose();
    ServerID sid = ci == null ? null : ci.serverID;
    if (sid != null && !sid.equals(serverID)) {  //We don't send to this node
      if (metrics != null) {
        metrics.messageSent(address, false, false, true);
      }
      sendRemote(sid, sendContext.message);
    } else {
      if (metrics != null) {
        metrics.messageSent(address, false, true, false);
      }
      deliverMessageLocally(sendContext);
    }
  } else {
    // Publish
  }
}

发送的代码就不贴了。其实就是从连接池获取对应连接,如果没有创建TCP连接。然后发送消息,发送的时候会对其编码,毕竟协议要对等。最后调用socket的write方法。

消息的接收

在集群启动的时候会创建server,并且会注册事件回调函数getServerHandler。这个函数会对buffer反序列化,然后通过deliverMessageLocally执行对应的方法。实现消息的执行。其实就是本地事件处理的方法。

private Handler<NetSocket> getServerHandler() {
  return socket -> {
    RecordParser parser = RecordParser.newFixed(4);
    Handler<Buffer> handler = new Handler<Buffer>() {
      int size = -1;
      public void handle(Buffer buff) {
        if (size == -1) {
          size = buff.getInt(0);
          parser.fixedSizeMode(size);
        } else {
          ClusteredMessage received = new ClusteredMessage();
          received.readFromWire(buff, codecManager);
          if (metrics != null) {
            metrics.messageRead(received.address(), buff.length());
          }
          parser.fixedSizeMode(4);
          size = -1;
          if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {
            // Just send back pong directly on connection
            socket.write(PONG);
          } else {
            deliverMessageLocally(received);
          }
        }
      }
    };
    parser.setOutput(handler);
    socket.handler(parser);
  };
}

对于vert.x的群集,数据为一些地址和节点信息。虽然更新频率很高,但是感觉对数据广播也没有多大的压力。默认Hazelcast,有必要去中心化分布式存储吗?本地缓存所有数据副本也行吧。zk不是挺中用呢么!行为不规范。希望大家可以评论指出问题。