聊聊skywalking的register-receiver-plugin

256 阅读3分钟

本文主要研究一下skywalking的register-receiver-plugin

RegisterModuleProvider

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/RegisterModuleProvider.java

public class RegisterModuleProvider extends ModuleProvider {

    @Override public String name() {
        return "default";
    }

    @Override public Class<? extends ModuleDefine> module() {
        return RegisterModule.class;
    }

    @Override public ModuleConfig createConfigBeanIfAbsent() {
        return null;
    }

    @Override public void prepare() {
    }

    @Override public void start() {
        GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
        grpcHandlerRegister.addHandler(new ApplicationRegisterHandler(getManager()));
        grpcHandlerRegister.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
        grpcHandlerRegister.addHandler(new ServiceNameDiscoveryHandler(getManager()));
        grpcHandlerRegister.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));

        // v2
        grpcHandlerRegister.addHandler(new RegisterServiceHandler(getManager()));
        grpcHandlerRegister.addHandler(new ServiceInstancePingServiceHandler(getManager()));

        JettyHandlerRegister jettyHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(JettyHandlerRegister.class);
        jettyHandlerRegister.addHandler(new ApplicationRegisterServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new InstanceDiscoveryServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new InstanceHeartBeatServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new NetworkAddressRegisterServletHandler(getManager()));
        jettyHandlerRegister.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
    }

    @Override public void notifyAfterCompleted() {

    }

    @Override public String[] requiredModules() {
        return new String[] {CoreModule.NAME, SharingServerModule.NAME};
    }
}
  • RegisterModuleProvider继承了ModuleProvider,其start方法获取grpcHandlerRegister,并注册了ApplicationRegisterHandler、InstanceDiscoveryServiceHandler、ServiceNameDiscoveryHandler、NetworkAddressRegisterServiceHandler、RegisterServiceHandler、ServiceInstancePingServiceHandler;同时也获取jettyHandlerRegister,并注册了ApplicationRegisterServletHandler、InstanceDiscoveryServletHandler、InstanceHeartBeatServletHandler、NetworkAddressRegisterServletHandler、ServiceNameDiscoveryServiceHandler

ApplicationRegisterServletHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/ApplicationRegisterServletHandler.java

public class ApplicationRegisterServletHandler extends JettyJsonHandler {

    private static final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class);

    private final IServiceInventoryRegister serviceInventoryRegister;
    private Gson gson = new Gson();
    private static final String APPLICATION_CODE = "c";
    private static final String APPLICATION_ID = "i";

    public ApplicationRegisterServletHandler(ModuleManager moduleManager) {
        serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
    }

    @Override public String pathSpec() {
        return "/application/register";
    }

    @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
        throw new UnsupportedOperationException();
    }

    @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
        JsonArray responseArray = new JsonArray();
        try {
            JsonArray applicationCodes = gson.fromJson(req.getReader(), JsonArray.class);
            for (int i = 0; i < applicationCodes.size(); i++) {
                String applicationCode = applicationCodes.get(i).getAsString();
                int applicationId = serviceInventoryRegister.getOrCreate(applicationCode, null);
                JsonObject mapping = new JsonObject();
                mapping.addProperty(APPLICATION_CODE, applicationCode);
                mapping.addProperty(APPLICATION_ID, applicationId);
                responseArray.add(mapping);
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
        return responseArray;
    }
}
  • ApplicationRegisterServletHandler继承了JettyJsonHandler,其doPost方法获取applicationCode,然后执行serviceInventoryRegister.getOrCreate(applicationCode, null)

InstanceDiscoveryServletHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceDiscoveryServletHandler.java

public class InstanceDiscoveryServletHandler extends JettyJsonHandler {

    private static final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class);

    private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
    private final ServiceInventoryCache serviceInventoryCache;
    private final Gson gson = new Gson();

    private static final String APPLICATION_ID = "ai";
    private static final String AGENT_UUID = "au";
    private static final String REGISTER_TIME = "rt";
    private static final String INSTANCE_ID = "ii";
    private static final String OS_INFO = "oi";

    public InstanceDiscoveryServletHandler(ModuleManager moduleManager) {
        this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
        this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class);
    }

    @Override public String pathSpec() {
        return "/instance/register";
    }

    @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
        throw new UnsupportedOperationException();
    }

    @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
        JsonObject responseJson = new JsonObject();
        try {
            JsonObject instance = gson.fromJson(req.getReader(), JsonObject.class);
            int applicationId = instance.get(APPLICATION_ID).getAsInt();
            String agentUUID = instance.get(AGENT_UUID).getAsString();
            long registerTime = instance.get(REGISTER_TIME).getAsLong();
            JsonObject osInfoJson = instance.get(OS_INFO).getAsJsonObject();

            List<String> ipv4sList = new ArrayList<>();
            JsonArray ipv4s = osInfoJson.get("ipv4s").getAsJsonArray();
            ipv4s.forEach(ipv4 -> ipv4sList.add(ipv4.getAsString()));

            ServiceInventory serviceInventory = serviceInventoryCache.get(applicationId);

            JsonObject instanceProperties = new JsonObject();
            instanceProperties.addProperty(ServiceInstanceInventory.PropertyUtil.HOST_NAME, osInfoJson.get("hostName").getAsString());
            instanceProperties.addProperty(ServiceInstanceInventory.PropertyUtil.OS_NAME, osInfoJson.get("osName").getAsString());
            instanceProperties.addProperty(ServiceInstanceInventory.PropertyUtil.PROCESS_NO, osInfoJson.get("processId").getAsInt() + "");
            instanceProperties.addProperty(ServiceInstanceInventory.PropertyUtil.IPV4S, ServiceInstanceInventory.PropertyUtil.ipv4sSerialize(ipv4sList));

            String instanceName = serviceInventory.getName();
            if (instanceProperties.has(PROCESS_NO)) {
                instanceName += "-pid:" + instanceProperties.get(PROCESS_NO).getAsString();
            }
            if (instanceProperties.has(HOST_NAME)) {
                instanceName += "@" + instanceProperties.get(HOST_NAME).getAsString();
            }

            int instanceId = serviceInstanceInventoryRegister.getOrCreate(applicationId, instanceName, agentUUID, registerTime, instanceProperties);
            responseJson.addProperty(APPLICATION_ID, applicationId);
            responseJson.addProperty(INSTANCE_ID, instanceId);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
        return responseJson;
    }
}
  • InstanceDiscoveryServletHandler继承了JettyJsonHandler,其doPost方法构造instanceProperties然后执行serviceInstanceInventoryRegister.getOrCreate(applicationId, instanceName, agentUUID, registerTime, instanceProperties)

InstanceHeartBeatServletHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java

public class InstanceHeartBeatServletHandler extends JettyJsonHandler {

    private static final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatServletHandler.class);

    private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
    private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
    private final IServiceInventoryRegister serviceInventoryRegister;
    private final Gson gson = new Gson();

    private static final String INSTANCE_ID = "ii";
    private static final String HEARTBEAT_TIME = "ht";

    public InstanceHeartBeatServletHandler(ModuleManager moduleManager) {
        this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class);
        this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
        this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
    }

    @Override public String pathSpec() {
        return "/instance/heartbeat";
    }

    @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
        throw new UnsupportedOperationException();
    }

    @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException, IOException {
        JsonObject responseJson = new JsonObject();
        try {
            JsonObject heartBeat = gson.fromJson(req.getReader(), JsonObject.class);
            int instanceId = heartBeat.get(INSTANCE_ID).getAsInt();
            long heartBeatTime = heartBeat.get(HEARTBEAT_TIME).getAsLong();

            serviceInstanceInventoryRegister.heartbeat(instanceId, heartBeatTime);
            ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(instanceId);
            if (Objects.nonNull(serviceInstanceInventory)) {
                serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
            } else {
                logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
        return responseJson;
    }
}
  • InstanceHeartBeatServletHandler继承了JettyJsonHandler,其doPost方法执行serviceInstanceInventoryRegister.heartbeat(instanceId, heartBeatTime),若serviceInstanceInventory不为null则执行serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime)

NetworkAddressRegisterServletHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java

public class NetworkAddressRegisterServletHandler extends JettyJsonHandler {

    private static final Logger logger = LoggerFactory.getLogger(NetworkAddressRegisterServletHandler.class);

    private final INetworkAddressInventoryRegister networkAddressInventoryRegister;
    private Gson gson = new Gson();
    private static final String NETWORK_ADDRESS = "n";
    private static final String ADDRESS_ID = "i";

    public NetworkAddressRegisterServletHandler(ModuleManager moduleManager) {
        this.networkAddressInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(INetworkAddressInventoryRegister.class);
    }

    @Override public String pathSpec() {
        return "/networkAddress/register";
    }

    @Override protected JsonElement doGet(HttpServletRequest req) {
        throw new UnsupportedOperationException();
    }

    @Override protected JsonElement doPost(HttpServletRequest req) {
        JsonArray responseArray = new JsonArray();
        try {
            JsonArray networkAddresses = gson.fromJson(req.getReader(), JsonArray.class);
            for (int i = 0; i < networkAddresses.size(); i++) {
                String networkAddress = networkAddresses.get(i).getAsString();

                if (logger.isDebugEnabled()) {
                    logger.debug("network getAddress register, network getAddress: {}", networkAddress);
                }

                int addressId = networkAddressInventoryRegister.getOrCreate(networkAddress, null);
                JsonObject mapping = new JsonObject();
                mapping.addProperty(ADDRESS_ID, addressId);
                mapping.addProperty(NETWORK_ADDRESS, networkAddress);
                responseArray.add(mapping);
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
        return responseArray;
    }
}
  • NetworkAddressRegisterServletHandler继承了JettyJsonHandler,其doPost执行networkAddressInventoryRegister.getOrCreate(networkAddress, null)

ServiceNameDiscoveryServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/ServiceNameDiscoveryServiceHandler.java

public class ServiceNameDiscoveryServiceHandler extends JettyJsonHandler {

    private static final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);

    private final IEndpointInventoryRegister inventoryService;
    private final Gson gson = new Gson();

    private static final String APPLICATION_ID = "ai";
    private static final String SERVICE_NAME = "sn";
    private static final String SRC_SPAN_TYPE = "st";
    private static final String SERVICE_ID = "si";
    private static final String ELEMENT = "el";

    public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
        this.inventoryService = moduleManager.find(CoreModule.NAME).provider().getService(IEndpointInventoryRegister.class);
    }

    @Override public String pathSpec() {
        return "/servicename/discovery";
    }

    @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
        throw new UnsupportedOperationException();
    }

    @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
        JsonArray responseArray = new JsonArray();
        try {
            JsonArray services = gson.fromJson(req.getReader(), JsonArray.class);
            for (JsonElement service : services) {
                int applicationId = service.getAsJsonObject().get(APPLICATION_ID).getAsInt();
                String serviceName = service.getAsJsonObject().get(SERVICE_NAME).getAsString();
                int srcSpanType = service.getAsJsonObject().get(SRC_SPAN_TYPE).getAsInt();

                SpanType spanType = SpanType.forNumber(srcSpanType);
                if (Objects.nonNull(spanType)) {
                    int serviceId = inventoryService.getOrCreate(applicationId, serviceName, DetectPoint.fromSpanType(spanType));
                    if (serviceId != 0) {
                        JsonObject responseJson = new JsonObject();
                        responseJson.addProperty(SERVICE_ID, serviceId);
                        responseJson.add(ELEMENT, service);
                        responseArray.add(responseJson);
                    }
                }
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
        return responseArray;
    }
}
  • ServiceNameDiscoveryServiceHandler继承了JettyJsonHandler,其doPost方法执行inventoryService.getOrCreate(applicationId, serviceName, DetectPoint.fromSpanType(spanType))

小结

RegisterModuleProvider继承了ModuleProvider,其start方法获取grpcHandlerRegister,并注册了ApplicationRegisterHandler、InstanceDiscoveryServiceHandler、ServiceNameDiscoveryHandler、NetworkAddressRegisterServiceHandler、RegisterServiceHandler、ServiceInstancePingServiceHandler;同时也获取jettyHandlerRegister,并注册了ApplicationRegisterServletHandler、InstanceDiscoveryServletHandler、InstanceHeartBeatServletHandler、NetworkAddressRegisterServletHandler、ServiceNameDiscoveryServiceHandler

doc