聊聊Elasticsearch RestClient的RequestLogger

614 阅读2分钟

本文主要研究一下Elasticsearch RestClient的RequestLogger

RequestLogger

elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RequestLogger.java

/**
 * Static helpers that log the outcome of requests sent through the REST client.
 * <p>
 * Two loggers are involved: the caller-supplied {@code logger}, which receives a one-line
 * summary at debug level (and response warnings at warn level), and the dedicated
 * {@code "tracer"} logger, which at trace level receives the request rendered as a curl
 * command followed by the response rendered as {@code #}-prefixed lines.
 */
final class RequestLogger {

    /** Dedicated logger that receives the curl-style request/response dump at trace level. */
    private static final Log tracer = LogFactory.getLog("tracer");

    /** Not instantiable: the class only exposes static helpers. */
    private RequestLogger() {
    }

    /**
     * Logs a request that yielded a response.
     *
     * @param logger       logger that receives the debug summary and any {@code Warning} headers
     * @param request      the request that was sent
     * @param host         the host the request was sent to
     * @param httpResponse the response that came back; its entity may be replaced by a buffered
     *                     copy when tracing is enabled (see {@link #buildTraceResponse})
     */
    static void logResponse(Log logger, HttpUriRequest request, HttpHost host, HttpResponse httpResponse) {
        if (logger.isDebugEnabled()) {
            logger.debug("request [" + request.getMethod() + " " + host + getUri(request.getRequestLine()) +
                    "] returned [" + httpResponse.getStatusLine() + "]");
        }
        if (logger.isWarnEnabled()) {
            Header[] warnings = httpResponse.getHeaders("Warning");
            if (warnings != null && warnings.length > 0) {
                logger.warn(buildWarningMessage(request, host, warnings));
            }
        }
        if (tracer.isTraceEnabled()) {
            // Tracing is best effort: a failure to render either side is itself traced and
            // replaced by an empty string so that the other side is still emitted.
            String requestLine;
            try {
                requestLine = buildTraceRequest(request, host);
            } catch (IOException e) {
                requestLine = "";
                tracer.trace("error while reading request for trace purposes", e);
            }
            String responseLine;
            try {
                responseLine = buildTraceResponse(httpResponse);
            } catch (IOException e) {
                responseLine = "";
                tracer.trace("error while reading response for trace purposes", e);
            }
            tracer.trace(requestLine + '\n' + responseLine);
        }
    }

    /**
     * Logs a request that failed.
     *
     * @param logger  logger that receives the debug summary together with the failure
     * @param request the request that was sent
     * @param node    the node the request was sent to; only its host is used
     * @param e       the failure that caused the request to fail
     */
    static void logFailedRequest(Log logger, HttpUriRequest request, Node node, Exception e) {
        if (logger.isDebugEnabled()) {
            logger.debug("request [" + request.getMethod() + " " + node.getHost() + getUri(request.getRequestLine()) + "] failed", e);
        }
        if (tracer.isTraceEnabled()) {
            String traceRequest;
            try {
                traceRequest = buildTraceRequest(request, node.getHost());
            } catch (IOException e1) {
                // Report the error raised while reading the request body (e1), not the
                // original request failure (e), which was already logged above.
                tracer.trace("error while reading request for trace purposes", e1);
                traceRequest = "";
            }
            tracer.trace(traceRequest);
        }
    }

    /**
     * Builds the warn-level message that lists every {@code Warning} header value of a response.
     *
     * @param request  the request that was sent
     * @param host     the host the request was sent to
     * @param warnings the {@code Warning} headers of the response
     * @return a message of the form {@code request [METHOD hostURI] returned N warnings: [w1],[w2]}
     */
    static String buildWarningMessage(HttpUriRequest request, HttpHost host, Header[] warnings) {
        StringBuilder message = new StringBuilder("request [").append(request.getMethod()).append(" ").append(host)
                .append(getUri(request.getRequestLine())).append("] returned ").append(warnings.length).append(" warnings: ");
        for (int i = 0; i < warnings.length; i++) {
            if (i > 0) {
                message.append(",");
            }
            message.append("[").append(warnings[i].getValue()).append("]");
        }
        return message.toString();
    }

    /**
     * Creates curl output for the given request.
     * <p>
     * When the request carries a non-repeatable entity, the entity is replaced on the request by
     * a {@link BufferedHttpEntity} wrapping it before the body is read here.
     *
     * @param request the request to render
     * @param host    the host the request is sent to
     * @return the curl command line, including {@code -d '<body>'} when the request has an entity
     * @throws IOException if the request entity cannot be read
     */
    static String buildTraceRequest(HttpUriRequest request, HttpHost host) throws IOException {
        String requestLine = "curl -iX " + request.getMethod() + " '" + host + getUri(request.getRequestLine()) + "'";
        if (request instanceof HttpEntityEnclosingRequest) {
            HttpEntityEnclosingRequest enclosingRequest = (HttpEntityEnclosingRequest) request;
            HttpEntity entity = enclosingRequest.getEntity();
            if (entity != null) {
                if (entity.isRepeatable() == false) {
                    entity = new BufferedHttpEntity(entity);
                    enclosingRequest.setEntity(entity);
                }
                requestLine += " -d '" + EntityUtils.toString(entity, StandardCharsets.UTF_8) + "'";
            }
        }
        return requestLine;
    }

    /**
     * Creates curl output for the given response: the status line, every header and every line
     * of the body, each prefixed with {@code #}.
     * <p>
     * When the response has an entity it is set back on the response; a non-repeatable entity is
     * first wrapped in a {@link BufferedHttpEntity}. The body is decoded with the charset from
     * the entity's content type, falling back to UTF-8.
     *
     * @param httpResponse the response to render
     * @return the rendered response
     * @throws IOException if the response entity cannot be read
     */
    static String buildTraceResponse(HttpResponse httpResponse) throws IOException {
        StringBuilder responseLine = new StringBuilder();
        responseLine.append("# ").append(httpResponse.getStatusLine());
        for (Header header : httpResponse.getAllHeaders()) {
            responseLine.append("\n# ").append(header.getName()).append(": ").append(header.getValue());
        }
        responseLine.append("\n#");
        HttpEntity entity = httpResponse.getEntity();
        if (entity != null) {
            if (entity.isRepeatable() == false) {
                entity = new BufferedHttpEntity(entity);
            }
            httpResponse.setEntity(entity);
            ContentType contentType = ContentType.get(entity);
            Charset charset = StandardCharsets.UTF_8;
            if (contentType != null && contentType.getCharset() != null) {
                charset = contentType.getCharset();
            }
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(entity.getContent(), charset))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    responseLine.append("\n# ").append(line);
                }
            }
        }
        return responseLine.toString();
    }

    /**
     * Returns the URI of the request line, guaranteed to start with {@code /}.
     * An empty URI yields {@code "/"} instead of failing on {@code charAt(0)}.
     */
    private static String getUri(RequestLine requestLine) {
        String uri = requestLine.getUri();
        if (uri.isEmpty() || uri.charAt(0) != '/') {
            return "/" + uri;
        }
        return uri;
    }
}
  • RequestLogger提供了logResponse、logFailedRequest等静态方法:二者都会在debug级别输出请求摘要,并在名为tracer的logger开启trace级别时以curl格式输出请求(logResponse还会输出响应内容以及Warning头)

RestClient

elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RestClient.java

/**
 * Excerpt of the REST client showing the synchronous and asynchronous request paths and the
 * points at which they call {@code RequestLogger}. Members not relevant to that are elided.
 */
public class RestClient implements Closeable {
	//......

    /**
     * Performs the request synchronously, trying the nodes returned by {@code nextNodes()}.
     *
     * @param request the request to perform
     * @return the response produced by {@code performRequest(NodeTuple, InternalRequest, Exception)}
     * @throws IOException propagated from the internal {@code performRequest}
     */
    public Response performRequest(Request request) throws IOException {
        InternalRequest internalRequest = new InternalRequest(request);
        return performRequest(nextNodes(), internalRequest, null);
    }

    /**
     * Sends the request to the next node of {@code nodeTuple} and recurses onto the following
     * node when the attempt fails and more nodes remain.
     *
     * @param nodeTuple         the nodes still to be tried, plus the auth cache used for the attempt
     * @param request           the request to send
     * @param previousException the failure of the previous attempt, or {@code null} on the first
     *                          attempt; it is passed to {@code addSuppressedException} together
     *                          with the failure of this attempt
     * @return the response when an attempt is not reported as a failure by {@code convertResponse}
     * @throws IOException when the last attempt failed with an {@code IOException}
     */
    private Response performRequest(final NodeTuple<Iterator<Node>> nodeTuple,
                                    final InternalRequest request,
                                    Exception previousException) throws IOException {
        RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
        HttpResponse httpResponse;
        try {
            // no callback is registered (null); the result is obtained by calling get() on the returned value
            httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get();
        } catch(Exception e) {
            // no response was obtained: log the failed request and report the node via onFailure
            RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e);
            onFailure(context.node);
            Exception cause = extractAndWrapCause(e);
            addSuppressedException(previousException, cause);
            if (nodeTuple.nodes.hasNext()) {
                // retry against the next node, carrying this failure along
                return performRequest(nodeTuple, request, cause);
            }
            // no node left: rethrow the cause using its own type where possible
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause);
        }
        // a response was obtained: convertResponse logs it and classifies it
        ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
        if (responseOrResponseException.responseException == null) {
            return responseOrResponseException.response;
        }
        // convertResponse returned (rather than threw) an exception: the attempt may be retried
        addSuppressedException(previousException, responseOrResponseException.responseException);
        if (nodeTuple.nodes.hasNext()) {
            return performRequest(nodeTuple, request, responseOrResponseException.responseException);
        }
        throw responseOrResponseException.responseException;
    }

    /**
     * Logs the response through {@code RequestLogger.logResponse} and classifies it.
     * <ul>
     * <li>Successful status, or a status listed in {@code request.ignoreErrorCodes}: calls
     * {@code onResponse(node)} and returns the response, unless the request's warnings handler
     * asks to fail, in which case a {@code WarningFailureException} is thrown.</li>
     * <li>Status for which {@code isRetryStatus} is true: calls {@code onFailure(node)} and
     * returns the {@code ResponseException} without throwing it, so that the caller can retry.</li>
     * <li>Any other status: calls {@code onResponse(node)} and throws the
     * {@code ResponseException}.</li>
     * </ul>
     *
     * @param request      the request that was sent
     * @param node         the node that answered
     * @param httpResponse the raw response
     * @return the response, or the exception describing a retryable failure
     * @throws IOException declared by the method; the visible body throws
     *                     {@code ResponseException} and {@code WarningFailureException}
     */
    private ResponseOrResponseException convertResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException {
        // logging happens first, whatever the status code turns out to be
        RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
        int statusCode = httpResponse.getStatusLine().getStatusCode();
        Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), httpResponse);
        if (isSuccessfulResponse(statusCode) || request.ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
            onResponse(node);
            if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) {
                throw new WarningFailureException(response);
            }
            return new ResponseOrResponseException(response);
        }
        ResponseException responseException = new ResponseException(response);
        if (isRetryStatus(statusCode)) {
            //mark host dead and retry against next one
            onFailure(node);
            return new ResponseOrResponseException(responseException);
        }
        //mark host alive and don't retry, as the error should be a request problem
        onResponse(node);
        throw responseException;
    }

    /**
     * Asynchronous counterpart of the internal {@code performRequest}: sends the request to the
     * next node of {@code nodeTuple} and reports the outcome to {@code listener} from the
     * callback passed to {@code client.execute}.
     *
     * @param nodeTuple the nodes still to be tried, plus the auth cache used for the attempt
     * @param request   the request to send
     * @param listener  receives the response, the intermediate failures ({@code trackFailure})
     *                  and the final failure ({@code onDefinitiveFailure})
     */
    private void performRequestAsync(final NodeTuple<Iterator<Node>> nodeTuple,
                                     final InternalRequest request,
                                     final FailureTrackingResponseListener listener) {
        final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
        client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse httpResponse) {
                try {
                    // same classification as the synchronous path; this is where the response gets logged
                    ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
                    if (responseOrResponseException.responseException == null) {
                        listener.onSuccess(responseOrResponseException.response);
                    } else {
                        if (nodeTuple.nodes.hasNext()) {
                            // retryable failure and nodes remain: record it and try the next node
                            listener.trackFailure(responseOrResponseException.responseException);
                            performRequestAsync(nodeTuple, request, listener);
                        } else {
                            listener.onDefinitiveFailure(responseOrResponseException.responseException);
                        }
                    }
                } catch(Exception e) {
                    // exceptions thrown by convertResponse (or by the code above) end the request
                    listener.onDefinitiveFailure(e);
                }
            }

            @Override
            public void failed(Exception failure) {
                try {
                    // no response was obtained: log the failed request and report the node via onFailure
                    RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure);
                    onFailure(context.node);
                    if (nodeTuple.nodes.hasNext()) {
                        listener.trackFailure(failure);
                        performRequestAsync(nodeTuple, request, listener);
                    } else {
                        listener.onDefinitiveFailure(failure);
                    }
                } catch(Exception e) {
                    listener.onDefinitiveFailure(e);
                }
            }

            @Override
            public void cancelled() {
                // cancellation is reported as a definitive failure; nothing is logged through RequestLogger here
                listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null));
            }
        });
    }

	//......
}
  • RestClient的performRequest方法在请求执行抛出异常时会调用RequestLogger.logFailedRequest;拿到HTTP响应后(无论状态码是否成功)则调用convertResponse方法,该方法首先通过RequestLogger.logResponse来记录response;performRequestAsync方法则是在completed回调中调用convertResponse方法,在failed回调中调用RequestLogger.logFailedRequest

小结

  • RequestLogger提供了logResponse、logFailedRequest等方法
  • RestClient的performRequest方法在请求执行抛出异常时会调用RequestLogger.logFailedRequest;拿到HTTP响应后(无论状态码是否成功)则调用convertResponse方法,该方法首先通过RequestLogger.logResponse来记录response
  • RestClient的performRequestAsync方法则是在completed回调中调用convertResponse方法,在failed回调中调用RequestLogger.logFailedRequest

doc