阅读 458

Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-fourth-fallback/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Hystrix 命令执行(四)之失败回退逻辑

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 执行命令整体流程如下图:

FROM 《【翻译】Hystrix文档-实现原理》「流程图」

另外,#handleXXXX() 方法,整体代码比较类似,最终都是调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。


推荐 Spring Cloud 书籍

2. handleFallback

《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「4. #executeCommandAndObserve(...)」 中,#executeCommandAndObserve(...)第 82 行 onErrorResumeNext(handleFallback) 代码,通过调用 Observable#onErrorResumeNext(...) 方法,实现【执行命令 Observable】执行异常时,返回【回退逻辑 Observable】,执行失败回退逻辑。

FROM 《ReactiveX文档中文翻译》「onErrorResumeNext」
onErrorResumeNext 方法返回一个镜像原有 Observable 行为的新 Observable ,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会开始镜像另一个,备用的 Observable 。

handleFallback 变量,代码如下 :

  1: final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
  2:     @Override
  3:     public Observable<R> call(Throwable t) {
  4:         // 标记尝试成功
  5:         circuitBreaker.markNonSuccess();
  6:         // 标记 executionResult 执行异常
  7:         Exception e = getExceptionFromThrowable(t);
  8:         executionResult = executionResult.setExecutionException(e);
  9:         // 返回 【回退逻辑 Observable】
 10:         if (e instanceof RejectedExecutionException) { // 线程池提交任务拒绝异常
 11:             return handleThreadPoolRejectionViaFallback(e);
 12:         } else if (t instanceof HystrixTimeoutException) { // 执行命令超时异常
 13:             return handleTimeoutViaFallback();
 14:         } else if (t instanceof HystrixBadRequestException) { // TODO 【2014】【HystrixBadRequestException】
 15:             return handleBadRequestByEmittingError(e);
 16:         } else {
 17:             /*
 18:              * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
 19:              */
 20:             if (e instanceof HystrixBadRequestException) { // TODO 【2014】【HystrixBadRequestException】
 21:                 eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
 22:                 return Observable.error(e);
 23:             }
 24: 
 25:             return handleFailureViaFallback(e);
 26:         }
 27:     }
 28: };
复制代码

3. #handleShortCircuitViaFallback()

#handleShortCircuitViaFallback() 方法,short-circuit ,处理链路处于熔断的回退逻辑,在 此处 被调用,代码如下 :

  1: private Observable<R> handleShortCircuitViaFallback() {
  2:     // TODO 【2011】【Hystrix 事件机制】
  3:     // record that we are returning a short-circuited fallback
  4:     eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
  5:     // 标记 executionResult 执行异常
  6:     // short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
  7:     Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
  8:     executionResult = executionResult.setExecutionException(shortCircuitException);
  9:     try {
 10:         // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
 11:         return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
 12:                 "short-circuited", shortCircuitException);
 13:     } catch (Exception e) {
 14:         return Observable.error(e);
 15:     }
 16: }
复制代码
  • 第 4 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 7 至 8 行 :标记 executionResult 执行异常
  • 第 11 至 12 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。
  • 第 14 行 :返回【异常 Observable】。

4. #handleSemaphoreRejectionViaFallback()

#handleSemaphoreRejectionViaFallback() 方法,semaphore-rejection ,处理信号量获得失败的回退逻辑,在 此处 被调用,代码如下 :

  1: private Observable<R> handleSemaphoreRejectionViaFallback() {
  2:     // 标记 executionResult 执行异常
  3:     Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
  4:     executionResult = executionResult.setExecutionException(semaphoreRejectionException);
  5:     // TODO 【2011】【Hystrix 事件机制】
  6:     eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
  7:     logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
  8:     // retrieve a fallback or throw an exception if no fallback available
  9:     // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
 10:     return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
 11:             "could not acquire a semaphore for execution", semaphoreRejectionException);
 12: }
复制代码
  • 第 3 至 4 行 :标记 executionResult 执行异常
  • 第 6 至 7 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 10 至 11 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。

5. #handleThreadPoolRejectionViaFallback()

#handleThreadPoolRejectionViaFallback() 方法,thread-pool-rejection ,处理线程池提交任务拒绝的回退逻辑,在 此处 被调用,代码如下:

  1: private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
  2:     // TODO 【2011】【Hystrix 事件机制】
  3:     eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
  4:     // TODO 【2002】【metrics】
  5:     threadPool.markThreadRejection();
  6:     // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
  7:     // use a fallback instead (or throw exception if not implemented)
  8:     return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION,
  9:             "could not be queued for execution", underlying);
 10: }
复制代码
  • 第 3 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 5 行 :TODO 【2002】【metrics】
  • 第 8 至 9 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。

6. #handleTimeoutViaFallback()

#handleTimeoutViaFallback() 方法,execution-timeout ,处理命令执行超时的回退逻辑,在 此处 被调用,代码如下:

  1: private Observable<R> handleTimeoutViaFallback() {
  2:     // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
  3:     return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT,
  4:             "timed-out", new TimeoutException());
  5: }
复制代码

7. #handleFailureViaFallback()

#handleFailureViaFallback() 方法,execution-failure ,处理命令执行异常的回退逻辑,在 此处 被调用,代码如下:

  1: private Observable<R> handleFailureViaFallback(Exception underlying) {
  2:     // TODO 【2011】【Hystrix 事件机制】
  3:     /**
  4:      * All other error handling
  5:      */
  6:     logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);
  7: 
  8:     // report failure
  9:     eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
 10: 
 11:     // 标记 executionResult 异常 TODO 【2007】【executionResult】用途 为啥不是执行异常
 12:     // record the exception
 13:     executionResult = executionResult.setException(underlying);
 14:     // 获得 【回退逻辑 Observable】 或者 【异常 Observable】
 15:     return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
 16: }
复制代码
  • 第 2 至 9 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 13 行 :标记 executionResult 异常
  • 第 15 行 :调用 #getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(...)」 详细解析。

8. #getFallbackOrThrowException(...)

#getFallbackOrThrowException() 方法,获得【回退逻辑 Observable】或者【异常 Observable】,代码如下 :

  1: private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
  2:     // 记录 HystrixRequestContext
  3:     final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
  4:     // 标记 executionResult 添加( 记录 )事件
  5:     long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
  6:     // record the executionResult
  7:     // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
  8:     executionResult = executionResult.addEvent((int) latency, eventType);
  9: 
 10:     if (isUnrecoverable(originalException)) { // 无法恢复的异常
 11:         logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);
 12: 
 13:         // TODO 【2003】【HOOK】
 14:         /* executionHook for all errors */
 15:         Exception e = wrapWithOnErrorHook(failureType, originalException);
 16:         // 返回 【异常 Observable】
 17:         return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
 18:     } else {
 19:         if (isRecoverableError(originalException)) { // 可恢复的异常
 20:             logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
 21:         }
 22: 
 23:         if (properties.fallbackEnabled().get()) {
 24:             /* fallback behavior is permitted so attempt */
 25: 
 26:             // 设置 HystrixRequestContext 的 Action
 27:             final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
 28:                 @Override
 29:                 public void call(Notification<? super R> rNotification) {
 30:                     setRequestContextIfNeeded(requestContext);
 31:                 }
 32:             };
 33: 
 34:             // TODO 【2007】【executionResult】用途
 35:             final Action1<R> markFallbackEmit = new Action1<R>() {
 36:                 @Override
 37:                 public void call(R r) {
 38:                     if (shouldOutputOnNextEvents()) {
 39:                         executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
 40:                         eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
 41:                     }
 42:                 }
 43:             };
 44: 
 45:             // TODO 【2007】【executionResult】用途
 46:             final Action0 markFallbackCompleted = new Action0() {
 47:                 @Override
 48:                 public void call() {
 49:                     long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
 50:                     eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
 51:                     executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
 52:                 }
 53:             };
 54: 
 55:             // 处理异常 的 Func
 56:             final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
 57:                 @Override
 58:                 public Observable<R> call(Throwable t) {
 59:                     // TODO 【2003】【HOOK】
 60:                     /* executionHook for all errors */
 61:                     Exception e = wrapWithOnErrorHook(failureType, originalException);
 62:                     // 获得 Exception
 63:                     Exception fe = getExceptionFromThrowable(t);
 64: 
 65:                     long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
 66:                     Exception toEmit;
 67: 
 68:                     if (fe instanceof UnsupportedOperationException) {
 69:                         // TODO 【2011】【Hystrix 事件机制】
 70:                         logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
 71:                         eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
 72:                         // 标记 executionResult 添加( 记录 )事件 HystrixEventType.FALLBACK_MISSING
 73:                         executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);
 74: 
 75:                         // 创建 HystrixRuntimeException
 76:                         toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe);
 77:                     } else {
 78:                         // TODO 【2011】【Hystrix 事件机制】
 79:                         logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
 80:                         eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
 81:                         // 标记 executionResult 添加( 记录 )事件 HystrixEventType.FALLBACK_FAILURE
 82:                         executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);
 83: 
 84:                         // 创建 HystrixRuntimeException
 85:                         toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe);
 86:                     }
 87: 
 88:                     // NOTE: we're suppressing fallback exception here
 89:                     if (shouldNotBeWrapped(originalException)) {
 90:                         return Observable.error(e);
 91:                     }
 92: 
 93:                     return Observable.error(toEmit);
 94:                 }
 95:             };
 96: 
 97:             // 获得 TryableSemaphore
 98:             final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
 99: 
100:             // 信号量释放Action
101:             final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
102:             final Action0 singleSemaphoreRelease = new Action0() {
103:                 @Override
104:                 public void call() {
105:                     if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
106:                         fallbackSemaphore.release();
107:                     }
108:                 }
109:             };
110: 
111:             Observable<R> fallbackExecutionChain;
112: 
113:             // acquire a permit
114:             if (fallbackSemaphore.tryAcquire()) {
115:                 try {
116:                     if (isFallbackUserDefined()) {
117:                         executionHook.onFallbackStart(this);
118:                         fallbackExecutionChain = getFallbackObservable();
119:                     } else {
120:                         //same logic as above without the hook invocation
121:                         fallbackExecutionChain = getFallbackObservable();
122:                     }
123:                 } catch (Throwable ex) {
124:                     //If hook or user-fallback throws, then use that as the result of the fallback lookup
125:                     fallbackExecutionChain = Observable.error(ex);
126:                 }
127: 
128:                 // 获得 【回退逻辑 Observable】
129:                 return fallbackExecutionChain
130:                         .doOnEach(setRequestContext)
131:                         .lift(new FallbackHookApplication(_cmd)) // TODO 【2003】【HOOK】
132:                         .lift(new DeprecatedOnFallbackHookApplication(_cmd))
133:                         .doOnNext(markFallbackEmit)
134:                         .doOnCompleted(markFallbackCompleted)
135:                         .onErrorResumeNext(handleFallbackError) //
136:                         .doOnTerminate(singleSemaphoreRelease)
137:                         .doOnUnsubscribe(singleSemaphoreRelease);
138:             } else {
139:                return handleFallbackRejectionByEmittingError();
140:             }
141:         } else {
142:             return handleFallbackDisabledByEmittingError(originalException, failureType, message);
143:         }
144:     }
145: }
复制代码
  • 耐心,这个方法看起来灰常长,也仅限于长,理解成难度很小。

  • 第 3 行 :记录 HystrixRequestContext 。

  • 第 5 至 8 行 :标记 executionResult 添加( 记录 )事件。

  • 第 10 至 17 行 :调用 #isUnrecoverable(Exception) 方法,若异常不可恢复,直接返回【异常 Observable】。点击 链接 查看该方法。

  • 第 19 至 21 行 :调用 #isRecoverableError(Exception) 方法,若异常可恢复,打印 WARN 日志。点击 链接 查看该方法。主要针对 java.lang.Error 情况,打印 #isUnrecoverable(Exception) 排除掉的 Error。

  • 反向】第 141 至 143 行 :当配置 HystrixCommandProperties.fallbackEnabled = false ( 默认值 :true ) ,即失败回退功能关闭,调用 #handleFallbackDisabledByEmittingError() ,返回【异常 Observable】。点击 链接 查看该方法。

  • 反向】第 138 至 140 行 :失败回退信号量( TryableSemaphore )【注意,不是正常执行信号量】使用失败,调用 #handleFallbackRejectionByEmittingError() ,返回【异常 Observable】。点击 链接 查看该方法。

  • 第 23 行 :当配置 HystrixCommandProperties.fallbackEnabled = true ( 默认值 :true ) ,即失败回退功能开启

  • 第 27 至 32 行 :设置 HystrixRequestContext 的 Action ,使用第 3 行记录的 HystrixRequestContext 。

  • 第 35 至 43 行 :TODO 【2007】【executionResult】用途

  • 第 46 至 53 行 :TODO 【2007】【executionResult】用途

  • 第 56 至 95 行 :处理回退逻辑执行发生异常的 Func1 ,返回【异常 Observable】。

    • 第 61 行 :TODO 【2003】【HOOK】
    • 第 63 行 :调用 #getExceptionFromThrowable(Throwable) 方法,获得 Exception 。若 t类型为 Throwable 时,包装成 Exception 。点击 链接 查看该方法代码。
    • 第 68 至 76 行 :当 fe类型为 UnsupportedOperationException 时,使用 e + fe 创建 HystrixRuntimeException 。该异常发生于 HystrixCommand#getFallback() 抽象方法未被覆写。
    • 第 77 至 86 行 :当 fe类型为其他异常时,使用 e + fe 创建 HystrixRuntimeException 。该异常发生于 HystrixCommand#getFallback() 执行发生异常。
    • 第 89 至 91 行 :调用 #shouldNotBeWrapped() 方法,判断 originalException 是 ExceptionNotWrappedByHystrix 的实现时,即要求返回的【异常 Observable】不使用 HystrixRuntimeException 包装。点击 链接 查看该方法代码。
    • 第 93 行 :返回【异常 Observable】,使用 toEmit ( HystrixRuntimeException ) 为异常。
  • 第 98 行 :调用 #getFallbackSemaphore() 方法,获得失败回退信号量( TryableSemaphore )对象,点击 链接 查看该方法代码。TryableSemaphore 在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「3. TryableSemaphore」 有详细解析。

  • 第 100 至 109 行 :信号量释放的 Action。

  • 第 114 至 137 行 :失败回退信号量( TryableSemaphore )使用成功,返回【回退逻辑 Observable】。

    • 【重要】第 116 至 122 行 :调用 #getFallbackObservable() 方法,创建【回退逻辑 Observable】。将子类对 HystrixCommand#getFallback() 抽象方法的执行结果,使用 Observable#just(...) 包装返回。点击 链接 查看该方法的代码。
      • 第 116 行 :调用 #isFallbackUserDefined() 方法,返回命令子类是否实现 HystrixCommand#getFallback() 抽象方法。只有已实现( true ) 的情况下,调用 HOOK TODO 【2003】【HOOK】。
    • 第 129 至 137 行 :获得 【回退逻辑 Observable】。
      • 第 131 行 :// TODO 【2003】【HOOK】
      • 第 135 行 :调用 Observable#onErrorResumeNext(...) 方法,实现【失败回退 Observable】执行异常时,返回【异常 Observable】。

有两个注意点:

  • 当命令执行超时时,失败回退逻辑使用的是 HystrixTimer 的线程池
  • 失败回退逻辑,无超时时间,使用要小心。

666. 彩蛋

比想象中“臭长”的逻辑。

总的来说,逻辑和 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》 是很类似的。

胖友,分享一波朋友圈可好!

关注下面的标签,发现更多相似文章
评论