聊聊skywalking的metric-exporter

1,433 阅读3分钟

本文主要研究一下skywalking的metric-exporter

metric-exporter.proto

skywalking-6.6.0/oap-server/exporter/src/main/proto/metric-exporter.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.exporter.grpc";


service MetricExportService {
    rpc export (stream ExportMetricValue) returns (ExportResponse) {
    }

    rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
    }
}

message ExportMetricValue {
    string metricName = 1;
    string entityName = 2;
    string entityId = 3;
    ValueType type = 4;
    int64 timeBucket = 5;
    int64 longValue = 6;
    double doubleValue = 7;
}

message SubscriptionsResp {
    repeated string metricNames = 1;
}

enum ValueType {
    LONG = 0;
    DOUBLE = 1;
}

message SubscriptionReq {

}

message ExportResponse {
}
  • metric-exporter.proto定义了MetricExportService服务,它有export、subscription两个rpc方法

GRPCExporterSetting

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java

@Setter
@Getter
public class GRPCExporterSetting extends ModuleConfig {
    private String targetHost;
    private int targetPort;
    private int bufferChannelSize = 20000;
    private int bufferChannelNum = 2;
}
  • GRPCExporterSetting定义了targetHost、targetPort、bufferChannelSize、bufferChannelNum属性

GRPCExporterProvider

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java

public class GRPCExporterProvider extends ModuleProvider {
    private GRPCExporterSetting setting;
    private GRPCExporter exporter;

    @Override public String name() {
        return "grpc";
    }

    @Override public Class<? extends ModuleDefine> module() {
        return ExporterModule.class;
    }

    @Override public ModuleConfig createConfigBeanIfAbsent() {
        setting = new GRPCExporterSetting();
        return setting;
    }

    @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        exporter = new GRPCExporter(setting);
        this.registerServiceImplementation(MetricValuesExportService.class, exporter);
    }

    @Override public void start() throws ServiceNotProvidedException, ModuleStartException {

    }

    @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
        ModuleServiceHolder serviceHolder = getManager().find(CoreModule.NAME).provider();
        exporter.setServiceInventoryCache(serviceHolder.getService(ServiceInventoryCache.class));
        exporter.setServiceInstanceInventoryCache(serviceHolder.getService(ServiceInstanceInventoryCache.class));
        exporter.setEndpointInventoryCache(serviceHolder.getService(EndpointInventoryCache.class));

        exporter.initSubscriptionList();
    }

    @Override public String[] requiredModules() {
        return new String[] {CoreModule.NAME};
    }
}
  • GRPCExporterProvider继承了ModuleProvider,其prepare方法创建GRPCExporter,然后执行registerServiceImplementation;其notifyAfterCompleted方法主要是给exporter设置serviceInventoryCache、serviceInstanceInventoryCache、endpointInventoryCache,然后执行exporter.initSubscriptionList()

MetricFormatter

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java

@Setter
public class MetricFormatter {
    private ServiceInventoryCache serviceInventoryCache;
    private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
    private EndpointInventoryCache endpointInventoryCache;

    protected String getEntityName(MetricsMetaInfo meta) {
        int scope = meta.getScope();
        if (DefaultScopeDefine.inServiceCatalog(scope)) {
            int entityId = Integer.valueOf(meta.getId());
            return serviceInventoryCache.get(entityId).getName();
        } else if (DefaultScopeDefine.inServiceInstanceCatalog(scope)) {
            int entityId = Integer.valueOf(meta.getId());
            return serviceInstanceInventoryCache.get(entityId).getName();
        } else if (DefaultScopeDefine.inEndpointCatalog(scope)) {
            int entityId = Integer.valueOf(meta.getId());
            return endpointInventoryCache.get(entityId).getName();
        } else if (scope == DefaultScopeDefine.ALL) {
            return "";
        } else {
            return null;
        }
    }
}
  • MetricFormatter提供了getEntityName方法,用于从MetricsMetaInfo提取entityName

MetricValuesExportService

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java

public interface MetricValuesExportService extends Service {
    /**
     * This method is sync-mode export, the performance effects the persistence result. Queue mode is highly recommended.
     *
     * @param event value is only accurate when the method invokes. Don't cache it.
     */
    void export(ExportEvent event);
}
  • MetricValuesExportService继承了Service,它定义了export方法

GRPCExporter

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java

public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<GRPCExporter.ExportData> {
    private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);

    private GRPCExporterSetting setting;
    private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
    private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
    private final DataCarrier exportBuffer;
    private final Set<String> subscriptionSet;

    public GRPCExporter(GRPCExporterSetting setting) {
        this.setting = setting;
        GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());
        client.connect();
        ManagedChannel channel = client.getChannel();
        exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
        blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
        exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
        exportBuffer.consume(this, 1, 200);
        subscriptionSet = new HashSet<>();
    }

    @Override public void export(ExportEvent event) {
        if (ExportEvent.EventType.TOTAL == event.getType()) {
            Metrics metrics = event.getMetrics();
            if (metrics instanceof WithMetadata) {
                MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta();
                if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {
                    exportBuffer.produce(new ExportData(meta, metrics));
                }
            }
        }
    }

    public void initSubscriptionList() {
        SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build());
        subscription.getMetricNamesList().forEach(subscriptionSet::add);
        logger.debug("Get exporter subscription list, {}", subscriptionSet);
    }

    @Override public void init() {

    }

    @Override public void consume(List<ExportData> data) {
        if (data.size() == 0) {
            return;
        }

        ExportStatus status = new ExportStatus();
        StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS).export(
            new StreamObserver<ExportResponse>() {
                @Override public void onNext(ExportResponse response) {

                }

                @Override public void onError(Throwable throwable) {
                    status.done();
                }

                @Override public void onCompleted() {
                    status.done();
                }
            }
        );
        AtomicInteger exportNum = new AtomicInteger();
        data.forEach(row -> {
            ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();

            Metrics metrics = row.getMetrics();
            if (metrics instanceof LongValueHolder) {
                long value = ((LongValueHolder)metrics).getValue();
                builder.setLongValue(value);
                builder.setType(ValueType.LONG);
            } else if (metrics instanceof IntValueHolder) {
                long value = ((IntValueHolder)metrics).getValue();
                builder.setLongValue(value);
                builder.setType(ValueType.LONG);
            } else if (metrics instanceof DoubleValueHolder) {
                double value = ((DoubleValueHolder)metrics).getValue();
                builder.setDoubleValue(value);
                builder.setType(ValueType.DOUBLE);
            } else {
                return;
            }

            MetricsMetaInfo meta = row.getMeta();
            builder.setMetricName(meta.getMetricsName());
            String entityName = getEntityName(meta);
            if (entityName == null) {
                return;
            }
            builder.setEntityName(entityName);
            builder.setEntityId(meta.getId());

            builder.setTimeBucket(metrics.getTimeBucket());

            streamObserver.onNext(builder.build());
            exportNum.getAndIncrement();
        });

        streamObserver.onCompleted();

        long sleepTime = 0;
        long cycle = 100L;
        /**
         * For memory safe of oap, we must wait for the peer confirmation.
         */
        while (!status.isDone()) {
            try {
                sleepTime += cycle;
                Thread.sleep(cycle);
            } catch (InterruptedException e) {
            }

            if (sleepTime > 2000L) {
                logger.warn("Export {} metrics to {}:{}, wait {} milliseconds.",
                    exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
                cycle = 2000L;
            }
        }

        logger.debug("Exported {} metrics to {}:{} in {} milliseconds.",
            exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
    }

    @Override public void onError(List<ExportData> data, Throwable t) {
        logger.error(t.getMessage(), t);
    }

    @Override public void onExit() {

    }

    @Getter(AccessLevel.PRIVATE)
    public class ExportData {
        private MetricsMetaInfo meta;
        private Metrics metrics;

        public ExportData(MetricsMetaInfo meta, Metrics metrics) {
            this.meta = meta;
            this.metrics = metrics;
        }
    }

    private class ExportStatus {
        private boolean done = false;

        private void done() {
            done = true;
        }

        public boolean isDone() {
            return done;
        }
    }
}
  • GRPCExporter继承了MetricFormatter,实现了MetricValuesExportService、IConsumer接口;其构造器根据GRPCExporterSetting实例化MetricExportServiceGrpc.MetricExportServiceStub以及MetricExportServiceGrpc.MetricExportServiceBlockingStub,并创建DataCarrier,然后注册自身的IConsumer到exportBuffer;其export方法主要是执行exportBuffer.produce(new ExportData(meta, metrics));其consume方法主要是构造ExportMetricValue,然后执行streamObserver.onNext

小结

metric-exporter.proto定义了MetricExportService服务,它有export、subscription两个rpc方法;GRPCExporterProvider继承了ModuleProvider,其prepare方法创建GRPCExporter,然后执行registerServiceImplementation;其notifyAfterCompleted方法主要是给exporter设置serviceInventoryCache、serviceInstanceInventoryCache、endpointInventoryCache,然后执行exporter.initSubscriptionList()

doc