阅读 118

sofa-registry源码分析(Publisher注册与发布)

1.概述

最近在学习jvm层面的东西,但是为了不让博客断更,给大家业余时间分析一下sofa-registry的源码。价值就是学习一下蚂蚁同学的代码,并且熟悉一下现有微服务注册中心实现机制。我们的关注点可能侧重于他的ha以及海量存储方面 。有一说一:看源码归看源码,但不建议大家自己去撸中间件。还是多了解业务。技术方面游刃有余就可。

2.介绍

众所周知,看源码之前我们首先需要对框架有初步的认识。比如实现架构,功能,使用方式等。

架构

图片摘自https://www.sofastack.tech/blog/sofa-registry-introduction/

这幅图是整个框架的几个模块。

  • Client其实就是框架的API,业务服务需要依赖Client的jar使用服务。这其实就是微服务的痛点,对业务有侵入。未来service mesh可能会取代这些东西,但目前不太成熟。
  • SessionServer(session)其实就是接受客户端的服务发布和订阅请求。然后将数据交给DataServer,在数据变化的时候通知消费方。
  • DataServer,提供存储数据和通知数据变化的能力,通过多副本保证高可用,通过一致性哈希保证数据分布均匀。因此DataServer具有很强的伸缩性。
  • MetaServer 维护了DataServer和SessionServer的列表。在节点变化时通知其他节点。MetaServer集群通过raft进行主从选举和数据复制。

为什么要有session?

其实如果客户端数量不大,根本没有必要一层session代理。但是如果客户端数量过大, 并且没有这层session,就会导致客户端与dataserver建立大量的长链接。意味着每台 DataServer 承载的连接数会随 Client 数量的增长而增长。因为一个Client订阅的数据可能分布在不同的dataserver上。所以对dataserver扩容也不能避免这个问题。其实在阿里分布式缓存Tair中就有这个痛点。

3.源码分析

这里我看一张类图,可以看到继承结构很清晰,都依赖Register是因为都需要进行注册。

我们先来看一下服务发布的demo,这也是我们源码分析的入口。

@Component
@Slf4j
public class RegisterService {
   @PostConstruct
   public void init() {
      RegistryClientConfig config = DefaultRegistryClientConfigBuilder
            .start()
            .setRegistryEndpoint("localhost")
            .setRegistryEndpointPort(port).build();
      DefaultRegistryClient registryClient = new DefaultRegistryClient(config);
      registryClient.init();
      // 构造发布者注册表
      String dataId = "com.alipay.test.demo.service:1.0@DEFAULT";
      PublisherRegistration registration = new PublisherRegistration(dataId);
      registration.setAppName("test_appName");
      registration.setGroup("group");
      // 将注册表注册进客户端并发布数据
      registryClient.register(registration, "ip:port?name=lzy");
   }
}
复制代码

1.通过Builder模式初始化一份配置

2.创建注册客户端,并初始化。

3.创建发布的元数据。这个根据业务而定。业务的唯一标识。

4.调用registryClient.register方法注册发布。

DefaultRegistryClient#init

这个方法主要是一系列初始化操作。比如事件处理器。rpc通信以及协议处理器的初始化,这里先不多说。后面遇到会详细。rpc主要是用的是sofa的bolt。这里我们不做关注。

同步session节点信息

从demo中可以看到,上面配置了RegistryEndpoint。其实就是用来同步session节点信息的。

在服务初始化的过程中会启动SyncServerListThread同步节点信息。执行下面run方法。默认10s执行一次。为什么先sleep执行,是因为启动的时候会主动调用一次,避免冷启动。

public void run() {
   int retryInterval;
   while (true) {
      try {
         retryInterval = Math.max(MIN_RETRY_INTERVAL,
               config.getSyncConfigRetryInterval());
         Thread.sleep(retryInterval);


         syncServerList();
      } catch (Throwable e) {
         LOGGER.error("[serverManager] sync server list task error", e);
      }
   }
}
复制代码

主要执行逻辑的就是syncServerList方法。其实方法很简单,就是通过http方法。从session服务获取节点信息。我比较好奇的时候,如果RegistryEndpoint不可用的时候,这个方法该怎么执行。

Publisher的register方法

Publisher publisher = registrationPublisherMap.get(registration);
if (null != publisher) {
   throwDuplicateException(registration, publisher);
}

publisher = new DefaultPublisher(registration, workerThread, registryClientConfig);
((DefaultPublisher) publisher).setAuthManager(authManager);

Publisher oldPublisher = registrationPublisherMap.putIfAbsent(registration, publisher);
if (null != oldPublisher) {
   throwDuplicateException(registration, oldPublisher);
}
registerCache.addRegister(publisher);
publisher.republish(data);
复制代码

这个方法主要初始化一个DefaultPublisher,registrationPublisherMap保证了只会被初始化一次。

调用registerCache.addRegister加入缓存。通过uuid唯一。这里PublisherRegistration 并没有实现equals和hashcode方法。所以相同的dataId,是可以获取到不同的Publisher(dataInfo和Publisher是1->N)。我们重点关注一下publisher.publisher.republish方法。

DefaultPublisher#republish

这个方法逻辑很简单,其实就是更新发布版本和时间。

然后启动worker发布任务。具体执行下面这个逻辑。这里的this就是这个Publisher,每个Publisher都有一个dataList,一个公用的worker。

this.worker.schedule(new TaskEvent(this));

DefaultPublisher保存的元数据:
private final String REGIST_ID;
private PublisherRegistration registration;
private Worker worker;
private Collection<String> dataList;
private RegistryClientConfig config;
复制代码

Woker其实就是一个线程。schedule方法会将任务丢进队列。在线程没有启动的时候启动线程。至于signal其实就是唤醒当前线程去执行任务,其实就是使用object的wait以及notify。

public void schedule(TaskEvent event) {
   if (inited.compareAndSet(false, true)) {
      this.start();
   }
   requestQueue.put(event);
   signal();
}
复制代码

我们在再来看一下该线程的run方法

Run方法其实就调用了handle。handle通过while(true)进行任务处理;整个模型其实就是一个线程池的概念。这个过队列使用的是TaskQueue。内部存储为chm。一方面是因为对于一个publish或者subscriber,他们同时之会发布一个dataList。所以他们的数据通过RegistId映射起来。并且可以遍历移除处理完的任务。因为任务执行的过程中可能会失败。失败情况下我们是需要重新执行的。

public void handle() {
   //noinspection InfiniteLoopStatement
   while (true) {
      try {
         // check connection status, try to reconnect to the server when connection lose
         client.ensureConnected();
         if (requestQueue.isEmpty()) {
            await(config.getRecheckInterval());
            continue;
         }
         Iterator<TaskEvent> lt = requestQueue.iterator();
         while (lt.hasNext()) {
            client.ensureConnected();
            TaskEvent ev = lt.next();
            lt.remove();
            int sendCount = ev.incSendCount();


            // Resent needs delay when task event is not the first time to send.
            if (sendCount != 0 && ev.delayTime() > 0) {
               continue;
            }
            handleTask(ev);
         }
         // Cleaning completed task, it will take more time when the registration number is large.
         requestQueue.cleanCompletedTasks();
      } catch (Throwable e) {
         LOGGER.error("[send] handle data error!", e);
      }
   }
}
复制代码

1.确保连接

2.dump一份任务数据

3.遍历处理任务。每个任务有对应的sendcount和triggerTime。这里我们可以学习一下,其实就是如果这个任务不是第一次发送的时候,设置一个延迟时间。超时之后再重试。避免服务不可用的频发重试。在这个框架中,超时时间随着sendcount正比例叠加。

处理任务的方法为handleTask

这个方法其实很简单。首先依赖TaskEvent去构建一个SyncTask。SyncTask如下。

private String requestId;
private Object request;
private boolean done;
复制代码

这里我们需要清楚一个概念,上面所说的Publisher都是发布者的一些元数据信息。具体发布数据的时候我们需要从中提取一些信息。比如判断是否需要发布。以及发布的数据。订阅者是如此。

这个方法就是用来构造TaskEvent。这个方法是在DefaultPublisher的父类实现,assembly是一个模版方法,这是我们可以学习的,因为订阅者发布者有相同的点。所以可以通过模版方法实现。

public SyncTask assemblySyncTask() {
   readLock.lock();
   try {
      SyncTask syncTask = new SyncTask();
      syncTask.setRequestId(requestId);
      syncTask.setRequest(assembly());
      syncTask.setDone(isDone());
      return syncTask;
   } finally {
      readLock.unlock();
   }
}
复制代码

模版方法如下图

最后通过bolt发送到session服务。客户端的jar就做了这些。我们接下来看session如何处理。

session架构细节

上面已经介绍过,session是为了减少dataServer的连接压力而设计的。并且他还会对数据缓存。提高访问效率。session是可以动态扩容的。 session的实现其实很简单,一方面是和dataServer进行通信,存储和拉取数据。一方面给客户端提供服务。 所以它主要包括remoting(进行通信、协议接收处理),strategy(对应时间处理策略),task-listener模式处理任务。cache(data缓存)等。

我们直接顺着上面的逻辑看session如何处理客户端注册请求

bolt的rpc,处理协议的handler只需要继承SyncUserProcessor即可。sofa-registry给了两个适配器SyncUserProcessorAdapter、AsyncUserProcessorAdapter。SyncUserProcessorAdapter将实现委托给ChannelHandler。具体大家可以去看代码。为什么这样做,其实还是为了扩展SyncUserProcessor。以及更好的设计代码。

比如业务的协议处理器只需要实现ChannelHandler。最后统一注册就可以了。希望大家可以学到适配器模式带来的好处。

PublisherHandler#reply

收到客户端的注册请求后。会调用这个方法。

public Object reply(Channel channel, Object message) throws RemotingException {

   RegisterResponse result = new RegisterResponse();
   PublisherRegister publisherRegister = (PublisherRegister) message;
   publisherHandlerStrategy.handlePublisherRegister(channel, publisherRegister, result);
   return result;
}
复制代码

从这个设计上看,会调用 publisherHandlerStrategy的方法处理。逻辑如下:

1.封装数据到Publisher

2.注册(取消注册)这个Publisher

SessionRegistry#register

这个方法的作用就是注册发布者和订阅者。

这里将逻辑封装到WrapperInvocation中。这个类的作用就是在执行方法逻辑之前,构造一个调用链。类似于filter。也是一种设计方法。

我们关注和执行逻辑。其实就是通过switch,case对不同的请求类型执行对应的逻辑。这里我们关注PUBLISHER类型。收到客户端的注册请求后。会调用这个方法。

Publisher publisher = (Publisher) storeData;
sessionDataStore.add(publisher);
writeDataAcceptor.accept(new WriteDataRequest() {
   @Override
   public Object getRequestBody() {
      return publisher;
   }
   @Override
   public WriteDataRequestType getRequestType() {
      return WriteDataRequestType.PUBLISHER;
   }
   @Override
   public String getConnectId() {
      return publisher.getSourceAddress().getAddressString() + ValueConstants.CONNECT_ID_SPLIT
            + publisher.getTargetAddress().getAddressString();
   }
   @Override
   public String getDataServerIP() {
      Node dataNode = dataNodeManager.getNode(publisher.getDataInfoId());
      return dataNode.getNodeUrl().getIpAddress();
   }
});
sessionRegistryStrategy.afterPublisherRegister(publisher);
复制代码

1.将改publisher加入sessionDataStore,底层其实就是map,然后进行了并发控制。如果存在记录,则升级版本。

这里说一下为什么要保存这份数据,其实就是为了后续跟DataServer做检查。

2.写数据到dataserver,这里构造了WriteDataRequest。

通过WriteDataAcceptor的accept方法写数据。

public void accept(WriteDataRequest request) {
    String connectId = request.getConnectId();
    WriteDataProcessor writeDataProcessor = writeDataProcessors.computeIfAbsent(connectId,
            key -> new WriteDataProcessor(connectId, taskListenerManager, sessionServerConfig, renewService));

    writeDataProcessor.process(request);
}
复制代码

每个节点都会生成一个WriteDataProcessor对象,最后调用process方法。taskListenerManager主要就是任务监听器。这里使用监听者模式,

public void process(WriteDataRequest request) {
   // record the last update time by pub/unpub
   if (isWriteRequest(request)) {
      refreshUpdateTime(request.getDataServerIP());
   }

   if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) {
      // snapshot has high priority, so handle directly
      doHandle(request);
   } else {
      // If locked, insert the queue;
      //
      if (writeDataLock.get()) {
         addQueue(request);
      } else {
         flushQueue();
         doHandle(request);
      }
   }
}
复制代码

如果被锁了直接将请求加入队列。否则执行队列任务,并处理当前请求。只有执行数据快照请求的时候才会上锁。一般情况直接走else请求。

flushQueue逻辑

private void flushQueue() {
   if (RENEW_LOGGER.isDebugEnabled()) {
      RENEW_LOGGER.debug("flushQueue: connectId={}", connectId);
   }
   while (!acceptorQueue.isEmpty()) {
      WriteDataRequest writeDataRequest = acceptorQueue.poll();
      if (writeDataRequest == null) {
         break;
      }
      acceptorQueueSize.decrementAndGet();
      doHandle(writeDataRequest);
   }
}
复制代码

这段代码其实就是通过并发队列(ConcurrentLinkedQueue)以及原子变量来进行并发控制的。原子变量保存队列长度(是否到达最大值),避免内存泄露。因为ConcurrentLinkedQueue的size方法并不靠谱,或者说是时间复杂度比价高。

我们继续来看执行任务的方法。其实就是通过switch不同的任务,去调用不同的方法。这里通过task-listener实现。我们直接看对应Listener。

public void handleEvent(TaskEvent event) {

    SessionTask publishDataTask = new PublishDataTask(dataNodeService);
    publishDataTask.setTaskEvent(event);
    executorManager.getPublishDataExecutor().execute(()-> dataNodeSingleTaskProcessor.process(publishDataTask));
}
复制代码

这里会构建一个PublishDataTask(SessionTask),然后异步调用执行dataNodeSingleTaskProcessor.process。这里为什么要有dataNodeSingleTaskProcessor。而不直接在Listener中调用,其实就是为了解耦合。因为Listener只是监听,然后交给对应的dataNodeSingleTaskProcessor。dataNodeSingleTaskProcessor会构建请求发送到Data服务器。

DataServer如何处理Publisher的注册请求?

dataserver收到数据后,会处理数据,封装成一个DataChangeEvent,然后根据datainfo,hash到对应的eventQueue。

DataChangeEventQueue#handleDatum

这个方法,先将对应的ChangeData加入到CHANGE_QUEUE。这个队列下面会说。 然后会将数据加入缓存。如果缓存没有,直接加入,如果有缓存,那么根据版本判断是否要替换。

DataChangeHandler#start

public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                try {
                    final ChangeData changeData = dataChangeEventQueue.take();
                    notifyExecutor.execute(new ChangeNotifier(changeData, name));
                } catch (Throwable e) {
                    LOGGER.error("[DataChangeHandler][{}] notify scheduler error", name, e);
                }
            }
        });
        LOGGER_START.info("[DataChangeHandler] notify datum in queue:{} success", name);
    }
}
复制代码

上面说过,ChangeData会加入CHANGE_QUEUE。DataChangeHandler启动的时候会去处理eventQueue的CHANGE_QUEUE。每个队列会有一个线程去处理。

ChangeNotifier

这个是我们需要关注的。数据变化的时候dataserver如何处理?

其实逻辑很简单,就是将数据加入缓存。然后执行dataChangeNotifiers的回调。缓存存在chm中。

/**
 * row:     dataCenter
 * column:  dataInfoId
 * value:   datum
 */
protected final Map<String, Map<String, Datum>> DATUM_MAP = new ConcurrentHashMap<>();
复制代码

回调如下:

  • sessionServerNotifier:通知session节点变化
  • backUpNotifier:备份数据
  • snapshotBackUpNotifier:快照备份。
@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers() {
    List<IDataChangeNotifier> list = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    list.add(snapshotBackUpNotifier());
    return list;
}
复制代码

sessionServerNotifier会获取到所有session,然后发送数据变化请求。通知session来拉数据。

总结

整个实现比较简单(可能是框架使用了spring),文章也只是将整个链路串起来了。session的细节没有分析。但是服务注册也不复杂。seesion通过一致哈心选取dataserver,发送服务注册数据。dataserver接收到数据存储,并通过dataChangeNotifiers通知数据变化。所以session和dataserver是推拉结合的。后期会专注dataserver和session的细节。适可而止了。