聊聊skywalking的sharding-sphere-plugin

770 阅读2分钟

本文主要研究一下skywalking的sharding-sphere-plugin

skywalking-plugin.def

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/sharding-sphere-4.x-plugin/src/main/resources/skywalking-plugin.def

sharding-sphere-4.x=org.apache.skywalking.apm.plugin.shardingsphere.v4.define.ProxyRootInvokeInstrumentation
sharding-sphere-4.x=org.apache.skywalking.apm.plugin.shardingsphere.v4.define.JDBCRootInvokeInstrumentation
sharding-sphere-4.x=org.apache.skywalking.apm.plugin.shardingsphere.v4.define.ParseInstrumentation
sharding-sphere-4.x=org.apache.skywalking.apm.plugin.shardingsphere.v4.define.ExecuteInstrumentation
  • skywalking的sharding-sphere-plugin提供了ProxyRootInvokeInstrumentation、JDBCRootInvokeInstrumentation、ParseInstrumentation、ExecuteInstrumentation这几个增强

ProxyRootInvokeInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/sharding-sphere-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/shardingsphere/v4/define/ProxyRootInvokeInstrumentation.java

public class ProxyRootInvokeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
    
    private static final String ENHANCE_CLASS = "org.apache.shardingsphere.shardingproxy.frontend.command.CommandExecutorTask";
    
    private static final String PROXY_ROOT_INVOKE_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.shardingsphere.v4.ProxyRootInvokeInterceptor";
    
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("run");
                }
                
                @Override
                public String getMethodsInterceptor() {
                    return PROXY_ROOT_INVOKE_INTERCEPTOR_CLASS;
                }
                
                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }
    
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }
    
    @Override
    protected ClassMatch enhanceClass() {
        return NameMatch.byName(ENHANCE_CLASS);
    }
}
  • ProxyRootInvokeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.shardingsphere.v4.ProxyRootInvokeInterceptor增强org.apache.shardingsphere.shardingproxy.frontend.command.CommandExecutorTask的run方法

ProxyRootInvokeInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/sharding-sphere-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/shardingsphere/v4/ProxyRootInvokeInterceptor.java

public class ProxyRootInvokeInterceptor implements InstanceMethodsAroundInterceptor {
    
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) {
        ContextManager.createLocalSpan("/ShardingSphere/ProxyRootInvoke/").setComponent(ComponentsDefine.SHARDING_SPHERE);
        ShardingExecuteDataMap.getDataMap().put(Constant.CONTEXT_SNAPSHOT, ContextManager.capture());
    }
    
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {
        ContextManager.stopSpan();
        return ret;
    }
    
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().errorOccurred().log(t);
    }
}
  • ProxyRootInvokeInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法执行ContextManager.createLocalSpan,并将ContextManager.capture()放入到ShardingExecuteDataMap.getDataMap()中;其afterMethod执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

JDBCRootInvokeInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/sharding-sphere-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/shardingsphere/v4/define/JDBCRootInvokeInstrumentation.java

public class JDBCRootInvokeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
    
    private static final String ENHANCE_CLASS = "org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor";
    
    private static final String JDBC_ROOT_INVOKE_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.shardingsphere.v4.JDBCRootInvokeInterceptor";
    
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("executeCallback");
                }
                
                @Override
                public String getMethodsInterceptor() {
                    return JDBC_ROOT_INVOKE_INTERCEPTOR_CLASS;
                }
                
                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }
    
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }
    
    @Override
    protected ClassMatch enhanceClass() {
        return NameMatch.byName(ENHANCE_CLASS);
    }
}
  • JDBCRootInvokeInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.shardingsphere.v4.JDBCRootInvokeInterceptor增强了org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor的executeCallback方法

JDBCRootInvokeInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/sharding-sphere-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/shardingsphere/v4/JDBCRootInvokeInterceptor.java

public class JDBCRootInvokeInterceptor implements InstanceMethodsAroundInterceptor {
    
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) {
        ContextManager.createLocalSpan("/ShardingSphere/JDBCRootInvoke/").setComponent(ComponentsDefine.SHARDING_SPHERE);
        ShardingExecuteDataMap.getDataMap().put(Constant.CONTEXT_SNAPSHOT, ContextManager.capture());
    }
    
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {
        ContextManager.stopSpan();
        return ret;
    }
    
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().errorOccurred().log(t);
    }
}
  • JDBCRootInvokeInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法执行ContextManager.createLocalSpan,并将ContextManager.capture()放入到ShardingExecuteDataMap.getDataMap()中;其afterMethod执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

ParseInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/sharding-sphere-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/shardingsphere/v4/define/ParseInstrumentation.java

public class ParseInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
    
    private static final String ENHANCE_CLASS = "org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter";
    
    private static final String EXECUTE_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.shardingsphere.v4.ParseInterceptor";
    
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("parse");
                }
                
                @Override
                public String getMethodsInterceptor() {
                    return EXECUTE_INTERCEPTOR_CLASS;
                }
                
                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }
    
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }
    
    @Override
    protected ClassMatch enhanceClass() {
        return NameMatch.byName(ENHANCE_CLASS);
    }
}
  • ParseInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.shardingsphere.v4.ParseInterceptor增强了org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter的parse方法

ParseInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/sharding-sphere-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/shardingsphere/v4/ParseInterceptor.java

public class ParseInterceptor implements InstanceMethodsAroundInterceptor {
    
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) {
        AbstractSpan span = ContextManager.createLocalSpan("/ShardingSphere/parseSQL/").setComponent(ComponentsDefine.SHARDING_SPHERE);
        Tags.DB_STATEMENT.set(span, (String) allArguments[0]);
    }
    
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {
        ContextManager.stopSpan();
        return ret;
    }
    
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().errorOccurred().log(t);
    }
}
  • ParseInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法执行ContextManager.createLocalSpan并设置DB_STATEMENT;其afterMethod方法执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

ExecuteInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/sharding-sphere-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/shardingsphere/v4/define/ExecuteInstrumentation.java

public class ExecuteInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
    
    private static final String ENHANCE_CLASS = "org.apache.shardingsphere.core.execute.sql.execute.SQLExecuteCallback";
    
    private static final String EXECUTE_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.shardingsphere.v4.ExecuteInterceptor";
    
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("execute0");
                }
                
                @Override
                public String getMethodsInterceptor() {
                    return EXECUTE_INTERCEPTOR_CLASS;
                }
                
                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }
    
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }
    
    @Override
    protected ClassMatch enhanceClass() {
        return NameMatch.byName(ENHANCE_CLASS);
    }
}
  • ExecuteInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.shardingsphere.v4.ExecuteInterceptor增强了org.apache.shardingsphere.core.execute.sql.execute.SQLExecuteCallback的execute0方法

ExecuteInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/sharding-sphere-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/shardingsphere/v4/ExecuteInterceptor.java

public class ExecuteInterceptor implements InstanceMethodsAroundInterceptor {
    
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) {
        ContextManager.createLocalSpan("/ShardingSphere/executeSQL/").setComponent(ComponentsDefine.SHARDING_SPHERE);
        ContextSnapshot contextSnapshot = (ContextSnapshot) ShardingExecuteDataMap.getDataMap().get(Constant.CONTEXT_SNAPSHOT);
        if (null == contextSnapshot) {
            contextSnapshot = (ContextSnapshot) ((Map) allArguments[2]).get(Constant.CONTEXT_SNAPSHOT);
        }
        if (null != contextSnapshot) {
            ContextManager.continued(contextSnapshot);
        }
    }
    
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {
        ContextManager.stopSpan();
        return ret;
    }
    
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().errorOccurred().log(t);
    }
}
  • ExecuteInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod执行ContextManager.createLocalSpan并通过ShardingExecuteDataMap.getDataMap().get(Constant.CONTEXT_SNAPSHOT)获取contextSnapshot,然后执行ContextManager.continued(contextSnapshot);其afterMethod方法执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

小结

skywalking的sharding-sphere-plugin提供了ProxyRootInvokeInstrumentation、JDBCRootInvokeInstrumentation、ParseInstrumentation、ExecuteInstrumentation这几个增强

doc