聊聊skywalking的cluster-nacos-plugin

214 阅读1分钟

本文主要研究一下skywalking的cluster-nacos-plugin

ClusterModuleNacosConfig

skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/ClusterModuleNacosConfig.java

public class ClusterModuleNacosConfig extends ModuleConfig {
    @Setter @Getter private String serviceName;
    @Setter @Getter private String hostPort;
    @Setter @Getter private String namespace = "public";
}
  • ClusterModuleNacosConfig定义了serviceName、hostPort、namespace属性

ClusterModuleNacosProvider

skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/ClusterModuleNacosProvider.java

public class ClusterModuleNacosProvider extends ModuleProvider {

    private final ClusterModuleNacosConfig config;
    private NamingService namingService;

    public ClusterModuleNacosProvider() {
        super();
        this.config = new ClusterModuleNacosConfig();
    }

    @Override
    public String name() {
        return "nacos";
    }

    @Override
    public Class<? extends ModuleDefine> module() {
        return ClusterModule.class;
    }

    @Override
    public ModuleConfig createConfigBeanIfAbsent() {
        return config;
    }

    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        try {
            Properties properties = new Properties();
            properties.put(PropertyKeyConst.SERVER_ADDR, config.getHostPort());
            properties.put(PropertyKeyConst.NAMESPACE, config.getNamespace());
            namingService = NamingFactory.createNamingService(properties);
        } catch (Exception e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
        NacosCoordinator coordinator = new NacosCoordinator(namingService, config);
        this.registerServiceImplementation(ClusterRegister.class, coordinator);
        this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
    }

    @Override
    public void start() throws ServiceNotProvidedException {

    }

    @Override
    public void notifyAfterCompleted() throws ServiceNotProvidedException {

    }

    @Override
    public String[] requiredModules() {
        return new String[]{CoreModule.NAME};
    }
}
  • ClusterModuleNacosProvider继承了ModuleProvider,其prepare方法创建NamingService及NacosCoordinator,然后将NacosCoordinator注册为ClusterRegister及ClusterNodesQuery的实现

NacosCoordinator

skywalking-6.6.0/oap-server/server-cluster-plugin/cluster-nacos-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/nacos/NacosCoordinator.java

public class NacosCoordinator implements ClusterRegister, ClusterNodesQuery {

    private final NamingService namingService;
    private final ClusterModuleNacosConfig config;
    private volatile Address selfAddress;

    public NacosCoordinator(NamingService namingService, ClusterModuleNacosConfig config) {
        this.namingService = namingService;
        this.config = config;
    }

    @Override
    public List<RemoteInstance> queryRemoteNodes() {
        List<RemoteInstance> result = new ArrayList<>();
        try {
            List<Instance> instances = namingService.selectInstances(config.getServiceName(), true);
            if (CollectionUtils.isNotEmpty(instances)) {
                instances.forEach(instance -> {
                    Address address = new Address(instance.getIp(), instance.getPort(), false);
                    if (address.equals(selfAddress)) {
                        address.setSelf(true);
                    }
                    result.add(new RemoteInstance(address));
                });
            }
        } catch (NacosException e) {
            throw new ServiceQueryException(e.getErrMsg());
        }
        return result;
    }

    @Override
    public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
        String host = remoteInstance.getAddress().getHost();
        int port = remoteInstance.getAddress().getPort();
        try {
            namingService.registerInstance(config.getServiceName(), host, port);
        } catch (Exception e) {
            throw new ServiceRegisterException(e.getMessage());
        }
        this.selfAddress = remoteInstance.getAddress();
        TelemetryRelatedContext.INSTANCE.setId(selfAddress.toString());
    }
}
  • NacosCoordinator实现了ClusterRegister、ClusterNodesQuery接口,其queryRemoteNodes方法通过namingService.selectInstances(config.getServiceName(), true)获取instances;其registerRemote方法则通过namingService.registerInstance(config.getServiceName(), host, port)进行注册

小结

ClusterModuleNacosProvider继承了ModuleProvider,其prepare方法创建NamingService及NacosCoordinator,然后将NacosCoordinator注册为ClusterRegister及ClusterNodesQuery的实现

doc