A Look at carrera's GroovyScriptAction

This article takes a look at carrera's GroovyScriptAction.

Action

DDMQ/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/Action.java

public interface Action {
    enum Status {
        FAIL, CONTINUE, FINISH, ASYNCHRONIZED
    }

    class UnsupportedDataType extends RuntimeException {
    }

    default Status act(UpstreamJob job) {
        Object data = job.getData();
        if (data instanceof byte[]) {
            return act(job, (byte[]) data);
        } else if (data instanceof JSONObject) {
            return act(job, (JSONObject) data);
        } else {
            throw new UnsupportedDataType();
        }
    }

    default Status act(UpstreamJob job, byte[] bytes) {
        throw new UnsupportedDataType();
    }

    default Status act(UpstreamJob job, JSONObject jsonObject) {
        throw new UnsupportedDataType();
    }

    default void shutdown() {
        // DO NOTHING BY DEFAULT
    }

    default void logMetrics() {
        // DO NOTHING BY DEFAULT
    }
}
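  • The Action interface defines the Status enum (FAIL, CONTINUE, FINISH, ASYNCHRONIZED) along with the act, shutdown, and logMetrics methods. The single-argument act inspects job.getData() and dispatches to the byte[] or JSONObject overload; both overloads throw UnsupportedDataType by default, so an implementation only needs to override the one it supports. shutdown and logMetrics do nothing by default.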
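The dispatch described above can be illustrated with a minimal, standard-library-only sketch. It is not the carrera code: Job is a hypothetical stand-in for UpstreamJob, Map<String, Object> stands in for JSONObject, and JsonOnlyAction is an invented example implementation that overrides only the JSON overload.

```java
import java.util.HashMap;
import java.util.Map;

final class ActionDispatchSketch {
    enum Status { FAIL, CONTINUE, FINISH, ASYNCHRONIZED }

    static class UnsupportedDataType extends RuntimeException {
    }

    // Hypothetical stand-in for UpstreamJob: it only carries a payload.
    static class Job {
        private final Object data;

        Job(Object data) {
            this.data = data;
        }

        Object getData() {
            return data;
        }
    }

    // Same shape as the Action interface, with Map standing in for JSONObject.
    interface Action {
        default Status act(Job job) {
            Object data = job.getData();
            if (data instanceof byte[]) {
                return act(job, (byte[]) data);
            } else if (data instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> json = (Map<String, Object>) data;
                return act(job, json);
            } else {
                throw new UnsupportedDataType();
            }
        }

        default Status act(Job job, byte[] bytes) {
            throw new UnsupportedDataType();
        }

        default Status act(Job job, Map<String, Object> json) {
            throw new UnsupportedDataType();
        }
    }

    // Invented example: supports JSON-like payloads only.
    static class JsonOnlyAction implements Action {
        @Override
        public Status act(Job job, Map<String, Object> json) {
            return json.containsKey("id") ? Status.CONTINUE : Status.FINISH;
        }
    }

    public static void main(String[] args) {
        Action action = new JsonOnlyAction();
        Map<String, Object> payload = new HashMap<>();
        payload.put("id", 42);
        System.out.println(action.act(new Job(payload)));
        try {
            action.act(new Job(new byte[0]));
        } catch (UnsupportedDataType e) {
            System.out.println("byte[] payload is not supported by JsonOnlyAction");
        }
    }
}
```

Because the byte[] overload is left at its default, handing this action a byte[] payload ends in UnsupportedDataType, which matches how GroovyScriptAction below overrides only the JSONObject overload.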

GroovyScriptAction

DDMQ/carrera-consumer/src/main/java/com/xiaojukeji/carrera/cproxy/actions/GroovyScriptAction.java

public class GroovyScriptAction implements Action {
    private final static String CARRERA_GROOVY_CONTEXT = "carreraContext";

    @SuppressWarnings("rawtypes")
    private final static LoadingCache<String, Class> cache = CacheBuilder
        .newBuilder()
        .expireAfterAccess(1, TimeUnit.HOURS)
        .build(new CacheLoader<String, Class>() {
            private final AtomicLong al = new AtomicLong(0);

            @Override
            public Class load(String key) throws Exception {
                try (GroovyClassLoader groovyLoader = new GroovyClassLoader()) {
                    GroovyCodeSource gcs = AccessController.doPrivileged((PrivilegedAction<GroovyCodeSource>) () -> new GroovyCodeSource(key, "Script" + al.getAndIncrement() + ".groovy", "/groovy/shell"));
                    Class clazz = groovyLoader.parseClass(gcs, false);
                    return clazz;
                } catch (Throwable e) {
                    LogUtils.logErrorInfo("GroovyScript_error", "[GroovyErr]", e);
                    return null;
                }
            }

        });

    @Override
    public Status act(UpstreamJob job, JSONObject jsonObject) {
        String groovyText = job.getUpstreamTopic().getGroovyScript();
        if (StringUtils.isBlank(groovyText)) {
            return Status.FINISH;
        }

        try {
            @SuppressWarnings("rawtypes")
            Class groovyScript = cache.get(groovyText);
            if (groovyScript == null) {
                MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.INVALID);
                return Status.FINISH;
            }

            jsonObject.put(CARRERA_GROOVY_CONTEXT, new GroovyContext(job));
            Script script = InvokerHelper.createScript(groovyScript, new Binding(jsonObject));
            Object scriptRet = script.run();
            if (scriptRet instanceof Boolean) {
                if ((Boolean) scriptRet) {
                    jsonObject.remove(CARRERA_GROOVY_CONTEXT);
                    return Status.CONTINUE;
                }
            }
        } catch (MissingPropertyException e) {
            LogUtils.logErrorInfo("GroovyScript_error", "missing property exception, jsonObject:{}, job:{}, e.msg:{}",
                    JsonUtils.toJsonString(jsonObject), job.info(), e.getMessageWithoutLocationText());
        } catch (Throwable e) {
            LogUtils.logErrorInfo("GroovyScript_error", "error when running groovy script, job={}, e={}", job, e.getMessage());
        }

        MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.INVALID);
        return Status.FINISH;
    }
}
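  • GroovyScriptAction implements the Action interface. It keeps compiled Groovy classes in a Guava LoadingCache keyed by the script text, with entries expiring one hour after their last access. The CacheLoader's load method creates a GroovyClassLoader and parses the GroovyCodeSource built from that text into a Class; on failure it logs the error and returns null. The act method reads groovyText from job.getUpstreamTopic().getGroovyScript() and returns Status.FINISH right away if it is blank. Otherwise it fetches the Class from the cache, puts a GroovyContext into jsonObject under the carreraContext key, creates a Script via InvokerHelper.createScript(groovyScript, new Binding(jsonObject)), and calls script.run(). Only a Boolean true return value yields Status.CONTINUE; any other result, or an exception, records a ConsumeResult.INVALID metric and returns Status.FINISH. Note that Guava's documented contract forbids a CacheLoader from returning null and has LoadingCache.get throw InvalidCacheLoadException in that case, so a compile failure most likely ends up in the catch (Throwable e) branch rather than in the groovyScript == null check; the resulting status is the same either way.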
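The cache-then-filter flow can be sketched without Groovy or Guava on the classpath. Everything below is an assumption made for illustration: compile is a hypothetical stand-in for Groovy compilation that understands only scripts of the form "field == literal", a ConcurrentHashMap replaces the LoadingCache (so there is no expiry), and Map<String, Object> plays the role of the JSONObject binding. What it does preserve is the decision logic: compile once per distinct script text, run against each message, and continue only on a Boolean true.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

final class ScriptFilterSketch {
    enum Status { CONTINUE, FINISH }

    // Counts how often a script text is actually "compiled".
    static final AtomicLong COMPILE_COUNT = new AtomicLong();

    // Cache keyed by the full script text, as in the original LoadingCache.
    private static final Map<String, Function<Map<String, Object>, Object>> CACHE =
            new ConcurrentHashMap<>();

    // Hypothetical stand-in for Groovy compilation: supports "field == literal" only.
    static Function<Map<String, Object>, Object> compile(String scriptText) {
        COMPILE_COUNT.incrementAndGet();
        String[] parts = scriptText.split("==", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("unsupported script: " + scriptText);
        }
        String field = parts[0].trim();
        String expected = parts[1].trim();
        return binding -> Boolean.valueOf(expected.equals(String.valueOf(binding.get(field))));
    }

    static Status filter(String scriptText, Map<String, Object> message) {
        if (scriptText == null || scriptText.trim().isEmpty()) {
            return Status.FINISH; // mirrors the blank-script branch of act
        }
        try {
            Object result = CACHE.computeIfAbsent(scriptText, ScriptFilterSketch::compile)
                    .apply(message);
            if (result instanceof Boolean && (Boolean) result) {
                return Status.CONTINUE; // only a Boolean true lets the message through
            }
        } catch (RuntimeException e) {
            // Compile or runtime failure: fall through and drop the message.
        }
        return Status.FINISH;
    }
}
```

Calling ScriptFilterSketch.filter("type == order", message) twice with the same script text compiles it once. As with a failed load in Guava, a script that fails to compile is not cached here, so it is retried on every message.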

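Summary

GroovyScriptAction implements the Action interface and acts as a per-topic message filter driven by a Groovy script. Compiled script classes are cached in a Guava LoadingCache keyed by the script text; the CacheLoader's load method creates a GroovyClassLoader and parses the class from a GroovyCodeSource. The act method takes groovyText from job.getUpstreamTopic().getGroovyScript(), looks up the Class in the cache, creates a Script via InvokerHelper.createScript(groovyScript, new Binding(jsonObject)), and runs it. The message continues down the chain (Status.CONTINUE) only when script.run() returns Boolean true; otherwise it is counted as INVALID and the action returns Status.FINISH.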
