Seata源码(十五)TM注册

202 阅读3分钟

banner窄.png

铿然架构  |  作者  /  铿然一叶 这是铿然架构的第 105 篇原创文章

码字不易,先赞后看!!!

1. 概念

1.1 TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

1.2 TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

2. 客户端源码分析

2.1 TM注册请求类结构

类结构如下:

image.png

RM注册基本和RM注册一样,比RM要简单很多,只有一条路径,从第标号3之后的流程完全一样。下面只看差异部分。

2.2 TMClient初始化

TMClient.java

    public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
        TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
        tmNettyRemotingClient.init();
    }

2.3 TmNettyRemotingClient初始化

没什么好说的,要说有就是采用同样的方式在调用父类构造器时将getPoolKeyFunction函数传入了NettyClientChannelManager的构造参数,已在后面可以调用该概述构造RM请求消息:

AbstractNettyRemotingClient.java

    public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
                                       ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
        super(messageExecutor);
        this.transactionRole = transactionRole;
        clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
        clientBootstrap.setChannelHandlers(new ClientHandler());
        // 这里的getPoolKeyFunction方法是其子类TmNettyRemotingClient的方法
        clientChannelManager = new NettyClientChannelManager(
            new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
    }

2.4 构造请求消息

构造RegisterTMRequest对象:

TmNettyRemotingClient.java

    protected Function<String, NettyPoolKey> getPoolKeyFunction() {
        return severAddress -> {
            RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup, getExtraData());
            return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);
        };
    }

ExtraData数据:

TmNettyRemotingClient.java

   private String getExtraData() {
        String ip = NetUtil.getLocalIp();
        String timestamp = String.valueOf(System.currentTimeMillis());
        String digestSource;
        if (StringUtils.isEmpty(ip)) {
            digestSource = transactionServiceGroup + ",127.0.0.1," + timestamp;
        } else {
            digestSource = transactionServiceGroup + "," + ip + "," + timestamp;
        }
        String digest = signer.sign(digestSource, secretKey);
        StringBuilder sb = new StringBuilder();
        sb.append(RegisterTMRequest.UDATA_AK).append(EXTRA_DATA_KV_CHAR).append(accessKey).append(EXTRA_DATA_SPLIT_CHAR);
        sb.append(RegisterTMRequest.UDATA_DIGEST).append(EXTRA_DATA_KV_CHAR).append(digest).append(EXTRA_DATA_SPLIT_CHAR);
        sb.append(RegisterTMRequest.UDATA_TIMESTAMP).append(EXTRA_DATA_KV_CHAR).append(timestamp).append(EXTRA_DATA_SPLIT_CHAR);
        return sb.toString();
    }

2.5 注册成功处理

成功处理入口:

TmNettyRemotingClient.java

    public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
                                     AbstractMessage requestMessage) {
        RegisterTMRequest registerTMRequest = (RegisterTMRequest) requestMessage;
        RegisterTMResponse registerTMResponse = (RegisterTMResponse) response;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel);
        }
        getClientChannelManager().registerChannel(serverAddress, channel);
    }

缓存server地址和channel的映射关系:

NettyClientChannelManager.java

    void registerChannel(final String serverAddress, final Channel channel) {
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null && channelToServer.isActive()) {
            return;
        }
        channels.put(serverAddress, channel);
    }

2.6 注册失败处理

打印日志,抛出异常:

TmNettyRemotingClient.java

    public void onRegisterMsgFail(String serverAddress, Channel channel, Object response,
                                  AbstractMessage requestMessage) {
        RegisterTMRequest registerTMRequest = (RegisterTMRequest) requestMessage;
        RegisterTMResponse registerTMResponse = (RegisterTMResponse) response;
        String errMsg = String.format(
                "register TM failed. client version: %s,server version: %s, errorMsg: %s, " + "channel: %s", registerTMRequest.getVersion(), registerTMResponse.getVersion(), registerTMResponse.getMsg(), channel);
        throw new FrameworkException(errMsg);
    }

3. 服务端源码分析

类结构如下:

image.png

其类结构和RM注册服务端也基本完全一样,只是RegTmProcessor替换了RegRmProcessor类,另外注册chanel的逻辑有区别:

ChannelManager.java

    public static void registerTMChannel(RegisterTMRequest request, Channel channel)
        throws IncompatibleVersionException {
        Version.checkVersion(request.getVersion());
        RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
            request.getApplicationId(),
            request.getTransactionServiceGroup(),
            null, channel);
        rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
        String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
            + ChannelUtil.getClientIpFromChannel(channel);
        ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = CollectionUtils.computeIfAbsent(TM_CHANNELS,
            clientIdentified, key -> new ConcurrentHashMap<>());
        rpcContext.holdInClientChannels(clientIdentifiedMap);
    }

end.


其他阅读:

萌新快速成长之路
如何编写软件设计文档
JAVA编程思想(一)通过依赖注入增加扩展性
JAVA编程思想(二)如何面向接口编程
JAVA编程思想(三)去掉别扭的if,自注册策略模式优雅满足开闭原则
JAVA编程思想(四)Builder模式经典范式以及和工厂模式如何选?
Java编程思想(七)使用组合和继承的场景
JAVA基础(一)简单、透彻理解内部类和静态内部类
JAVA基础(二)内存优化-使用Java引用做缓存
JAVA基础(三)ClassLoader实现热加载
JAVA基础(四)枚举(enum)和常量定义,工厂类使用对比
JAVA基础(五)函数式接口-复用,解耦之利刃