1.概述
这篇文章主要就是对vertx通信的一个源码分析。首先分析本地的实现,因为集群通信大部分基于本地实现。后面会介绍vertx集群的实现。
2.单进程eventbus源码分析
实例的创建 在初始化vertx实例的时候会对其进行初始化。 如果是集群方式,通过ClusteredEventBus创建,否则通过new EventBusImpl对象创建。
if (options.getEventBusOptions().isClustered()) {
this.clusterManager = getClusterManager(options);
this.eventBus = new ClusteredEventBus(this, options, clusterManager);
} else {
this.clusterManager = null;
this.eventBus = new EventBusImpl(this);
}
创建成功之后调用start方法启动,并且通过volatile进行内存可见性,以及通过syn枷锁。保证只会被启动一次。这里只是设置了当前状态。
public synchronized void start(Handler<AsyncResult<Void>> completionHandler) {
if (started) {
throw new IllegalStateException("Already started");
}
started = true;
completionHandler.handle(Future.succeededFuture());
}
consumer方法
一般情况下,我们绑定的时候会调用这个方法。对于一个addr,会找到对应的handler执行逻辑处理。
public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) {
Objects.requireNonNull(handler, "handler");
MessageConsumer<T> consumer = consumer(address);
consumer.handler(handler);
return consumer;
}
public <T> MessageConsumer<T> consumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new HandlerRegistration<>(vertx, metrics, this, address, null, false, null, -1);
}
首先checkStarted会判断是否启动。然后创建HandlerRegistration对象。其实就是一个MessageConsumer,根据名字知道是用来消费消息的。
ok,我们来看HandlerRegistration创建的时候会做哪些事情。其实就是初始化对应的成员变量。然后判断是否有超时时间,如果有设置一个超时返回失败的调用。后面介绍这个。
public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address,
String repliedAddress, boolean localOnly,
Handler<AsyncResult<Message<T>>> asyncResultHandler, long timeout) {
this.vertx = vertx;
this.metrics = metrics;
this.eventBus = eventBus;
this.address = address;
this.repliedAddress = repliedAddress;
this.localOnly = localOnly;
this.asyncResultHandler = asyncResultHandler;
if (timeout != -1) {
timeoutID = vertx.setTimer(timeout, tid -> {
if (metrics != null) {
metrics.replyFailure(address, ReplyFailure.TIMEOUT);
}
sendAsyncResultFailure(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress));
});
}
}
调用完这个方法之后,我们需要调用handler方法绑定对应的实现。我们看对应的handler方法。双重枷锁保证registered只会被创建一次。我们细节看一下addRegistration方法。
@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
if (h != null) {
synchronized (this) {
handler = h;
if (registered == null) {
registered = eventBus.addRegistration(address, this, repliedAddress != null, localOnly);
}
}
return this;
}
this.unregister();
return this;
}
addRegistration方法先调用addLocalRegistration,然后将结果设置到对应的result中。
- 这个方法先会获取一个context,其实在vertx中每个handler都会有一个context中。这个方法判断当前线程时候是一个vertxThread。如果是说明在verticle中,直接返回,如果不是说明不在verticle中,然后判断当前线程是否为netty线程,如果是直接返回这个线程的ctx。否则通过create选取一个eventloop创建一个ctx。
- 将创建的contex设置到MC(MessageComsumer)中。
- 创建一个HandlerHolder,其实碰到holder就能明白。持有者的意思。他会将我们的MC保存在这里,并且保存一些附加的信息。
- 完事之后将holder加入到ConcurrentCyclicSequence中。这是一个并发的循环序列。增加的时候通过copy的方式。每次都是在原来的基础上new的。
- 通过map的merge方法。其实就是如果addr存在。执行我们的逻辑操作,就是将对象add到原本的handlers。很实用的操作。
- 添加close的hook
private <T> LocalRegistrationResult<T> addLocalRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(address, "address");
Context context = vertx.getOrCreateContext();
registration.setHandlerContext(context);
HandlerHolder<T> holder = new HandlerHolder<>(registration, replyHandler, localOnly, context);
ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.merge(
address,
handlers,
(old, prev) -> old.add(prev.first()));
if (context.deploymentID() != null) {
HandlerEntry entry = new HandlerEntry<>(address, registration);
context.addCloseHook(entry);
}
boolean newAddress = handlers == actualHandlers;
return new LocalRegistrationResult<>(holder, newAddress);
}
基本逻辑很简单,最终就是放在一个chm中。
其实取消注册的方法也很简单,就是执行一些回调。并且将队列的消息全部执行。这里并不是在我们定义的handler执行,而是在discard执行。然后调用eventbus的remove方法移除。 其实就是调用chm移除,如果有hook一会将hook移除,这里注意,hook是放在一个set中,并且他重写了HandlerEntry的hashcode和equals方法,如果address和handler一样,就会移除。所以这里直接new一个就可以起到移除的作用。
if (holder.setRemoved() && holder.getContext().deploymentID() != null) {
holder.getContext().removeCloseHook(new HandlerEntry<>(address, holder.getHandler()));
}
对于消息发送的逻辑就更简单了。获取地址然后发送即可。后面我们直接通过集群模式来看。
3.集群eventbus源码分析
对于集群下的eventbus,是需要一个集群管理器的。vertx实现了一个可插拔的集群模式。我们需要实现集群的话,引入支持的集群jar就可以。其默认使用 Hazelcast ,实现了一个管理器HazelcastClusterManager 。众所周知,Hazelcast 可以将多个服务组成一个分布式环境。并且是一个去中心化的方式。它具有master的选举,会选举集群的中一个节点作为master,管理集群节点。集群中的数据采用分布式存储。通过数据冗余保证数据不丢失。其实类似于一个redis分布式数据存储的概念。
我们看一下ClusteredEventBus启动的start方法。
这个方法看似复杂,其实逻辑很简单。
首先会创建HAManager。然后设置节点change的handler。(主要是保证vertical高可用,没什么卵用感觉)后面会调用集群管理器获取对应的map(分布式map),这个过程也是异步的。因此注册了回调。如果成功。将他的结果赋值给本地subs,然后启动tcpserver。监听端口成功执行回调。这个回调主要就是创建nodeinfo和serverId,并且加入到haManager中。所以这个tcp server就保证其他节点的eventbus可以和当前节点通信。也就是用来接收其他client的数据。
public void start(Handler<AsyncResult<Void>> resultHandler) {
// Get the HA manager, it has been constructed but it's not yet initialized
HAManager haManager = vertx.haManager();
setClusterViewChangedHandler(haManager);
clusterManager.<String, ClusterNodeInfo>getAsyncMultiMap(SUBS_MAP_NAME, ar1 -> {
if (ar1.succeeded()) {
subs = ar1.result();
server = vertx.createNetServer(getServerOptions());
server.connectHandler(getServerHandler());
server.listen(...);
} else {
resultHandler.handle(Future.failedFuture(ar1.cause()));
}
});
}
看到这里其实应该觉得很简单了。就是通过集群管理器来维持集群,本地对通过对localeventbus的重构,来完成集群的一些必要操作。如果地址注册,更新subs。以及接收到其他节点数据,需要解析之后在本地执行等等。 集群启动之后本地回启动一个TCPserver(NetServier),用来监听其他节点的事件。如果有事件需要在别的节点执行,会创建一个TCPClient(NetClient)然后在subs获取节点信息,通过负载获取一个节点,建立TCP连接,传递数据。底层网络调用还是Netty。
Consumer方法
集群的eventbus继承自eventbusImpl。创建逻辑也是很简单 我们直接看他的Consumer方法,其实就是eventbusImpl里面的方法,大部分逻辑类似,在addRegistration,他在addLocalRegistration之后,会根据是否为新的newAddress调用addRegistration方法。
protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(registration.getHandler(), "handler");
LocalRegistrationResult<T> result = addLocalRegistration(address, registration, replyHandler, localOnly);
addRegistration(result.newAddress, address, replyHandler, localOnly, registration::setResult);
return result.holder;
}
这个方法在子类ClusteredEventBus中实现了继承。 这个条件:如果是新注册,并且不是自动生成的reply consumer,并且允许支持集群,就会执行if代码。其实这里就是将address以及nodeInfo信息加入到subs中。这个subs其实是有集群管理器提供的,最后实现信息的共享(集群之间)。这里是异步的所以没有后续。
@Override
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !replyHandler && !localOnly) {
// Propagate the information
subs.add(address, nodeInfo, completionHandler);
ownSubs.add(address);
} else {
completionHandler.handle(Future.succeededFuture());
}
}
看一下send的方法
数据会被封装为ClusteredMessage.,其实继承自本地的MessageImpl,然后回调用sendcontext的next方法。这个方法是在发送之前执行一些拦截操作。
@Override
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName, Handler<AsyncResult<Void>> writeHandler) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, this, writeHandler);
return msg;
}
之后会调用 sendOrPub方法,这个方法在本地和集群的eventbus都有实现。但是集群的肯定和本地逻辑不一样。下面我们贴出来集群实现的代码。如果是onlylocal,那么执行本地调用其实就是根据地址选取本地的handler执行。否则通过subs.get方法。这个方法其实首先根据地址获取对应的serverids,这个操作是异步的,然后注册了一个onSubsReceived会调用,最后根据异步结果的执行回调。其实结果就是serverids。
@Override
protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
if (sendContext.options.isLocalOnly()) {
if (metrics != null) {
metrics.messageSent(sendContext.message.address(), !sendContext.message.isSend(), true, false);
}
deliverMessageLocally(sendContext);
} else if (Vertx.currentContext() == null) {
// Guarantees the order when there is no current context
sendNoContext.runOnContext(v -> {
subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
});
} else {
subs.get(sendContext.message.address(), ar -> onSubsReceived(ar, sendContext));
}
}
继续回调onSubsReceived,获取serverIDs,调用sendToSubs
private <T> void onSubsReceived(AsyncResult<ChoosableIterable<ClusterNodeInfo>> asyncResult, OutboundDeliveryContext<T> sendContext) {
if (asyncResult.succeeded()) {
ChoosableIterable<ClusterNodeInfo> serverIDs = asyncResult.result();
if (serverIDs != null && !serverIDs.isEmpty()) {
sendToSubs(serverIDs, sendContext);
} else {
...
}
} else {
...
}
}
}
sendToSubs方法主要就是先判断是send和事pub,然后选出一个serverid,注意不会发送到本节点。最后调用sendRemote发送。
private <T> void sendToSubs(ChoosableIterable<ClusterNodeInfo> subs, OutboundDeliveryContext<T> sendContext) {
String address = sendContext.message.address();
if (sendContext.message.isSend()) {
// Choose one
ClusterNodeInfo ci = subs.choose();
ServerID sid = ci == null ? null : ci.serverID;
if (sid != null && !sid.equals(serverID)) { //We don't send to this node
if (metrics != null) {
metrics.messageSent(address, false, false, true);
}
sendRemote(sid, sendContext.message);
} else {
if (metrics != null) {
metrics.messageSent(address, false, true, false);
}
deliverMessageLocally(sendContext);
}
} else {
// Publish
}
}
发送的代码就不贴了。其实就是从连接池获取对应连接,如果没有创建TCP连接。然后发送消息,发送的时候会对其编码,毕竟协议要对等。最后调用socket的write方法。
消息的接收
在集群启动的时候会创建server,并且会注册事件回调函数getServerHandler。这个函数会对buffer反序列化,然后通过deliverMessageLocally执行对应的方法。实现消息的执行。其实就是本地事件处理的方法。
private Handler<NetSocket> getServerHandler() {
return socket -> {
RecordParser parser = RecordParser.newFixed(4);
Handler<Buffer> handler = new Handler<Buffer>() {
int size = -1;
public void handle(Buffer buff) {
if (size == -1) {
size = buff.getInt(0);
parser.fixedSizeMode(size);
} else {
ClusteredMessage received = new ClusteredMessage();
received.readFromWire(buff, codecManager);
if (metrics != null) {
metrics.messageRead(received.address(), buff.length());
}
parser.fixedSizeMode(4);
size = -1;
if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {
// Just send back pong directly on connection
socket.write(PONG);
} else {
deliverMessageLocally(received);
}
}
}
};
parser.setOutput(handler);
socket.handler(parser);
};
}
对于vert.x的群集,数据为一些地址和节点信息。虽然更新频率很高,但是感觉对数据广播也没有多大的压力。默认Hazelcast,有必要去中心化分布式存储吗?本地缓存所有数据副本也行吧。zk不是挺中用呢么!行为不规范。希望大家可以评论指出问题。