目录
本文概览:Hystrix实现了服务弹性的三种模式。
- 在接口超时/失败达到一定阈值,触发断路器Hysgtrix的熔断策略,此时断路器会处于断路状态立刻返回失败,或者若配置了备份策略,会使用执行备份策略来代替返回失败。
- Hystrix是通过线程池管理服务调用,所以还介绍了隔离线程池(舱壁模式)和如何支持上下文传递。
1 Hysrix介绍
1.1 服务弹性
服务弹性,主要功能就是指在下游服务出现错误或者超时问题时,通过让客户端快速失败,达到不消耗当前服务的数据库连接和线程池之类资源,从而保护当前服务不被下游服务拖垮。目前常用的策略有:
- 客户端均衡模式。通过Netflix的Ribbon来实现
- 断路器模式。不再调用下游服务,直接返回错误。
- 后备模式。不再调用下游服务,调用备用策略。
- 舱壁模式。通过线程池来访问下游服务,每一个下游服务对应一个线程池。
1.2 Hystrix引入
Hystrix是一种服务弹性的实现,它实现了上面的断路器模式、后备模式、舱壁模式。
Hystrix并不具有限流的功能,Hystrix关注的是容错处理,在接口错误或者超时时触发断路器熔断,快速返回错误,还可以支持备份来代替返回失败;在熔断发生后,还支持检查服务是否恢复从而使断路器恢复通路状态。
Hytrix是通过线程池管理调用外部资源的,可以把一个线程池当做一个外部服务。
2 Hystrix与SpingCloud整合
2.1 maven配置
1、配置
1 2 3 4 5 6 |
<!-- hystrix beg --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency> <!-- hystrix end --> |
2、启动类添加@EnableCircuitBreaker,表示spring boot中使用hytrix
1 2 3 4 5 6 7 8 |
@SpringBootApplication(scanBasePackages = {"com.wuzhonghu.template.springboottemplate"}) @EnableDiscoveryClient @EnableCircuitBreaker public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } } |
2.2 断路器的流程和三种模式
如下图,对应三个阶段
- 第一阶段: 正常
- 第二阶段: 发现超时或者失败时,标识失败
- 第三阶段:达到阈值之后,触发了熔断器策略。标识服务不可以用,熔断器启动断开,此时如果B对C的访问,短路器会给B立刻返回失败,不会再调用C。同时会进行间断尝试,判断服务是否恢复。
Hystricx存在三种模式,舱壁模式和后备模式两种模式都需要在断路器模式上进行的完善。
- 断路器模式,设置超时或者失败等熔断策略。
- 后备策略模式:在第二阶段或者第三阶段失败时,如果存在后备策略,都会执行后备策略。
- 舱壁模式,保证一个服务独享一个线程池。
综上,Hystrix断路器的功能如下:
(1)熔断触发前,超时或者失败发生时:
- 存在后备策略,执行后备策略
- 不存后备策略,抛异常处理
(2)在熔断触发之后,可以立刻返回失败,保护下游失败。
- 存在后备策略,执行后备策略
- 不存后备策略,抛异常处理
(3)在熔断器触发之后,还能定期检查服务是否正常,将服务恢复正常。
2.2 断路器模式
通过@HystrixCommand设置超时未1s,此时超过1s之后,请求就会返回抛异常处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
@Service public class HytrixService { /** * 测试断路器模式(circuit breaker) */ @HystrixCommand( commandProperties = { @HystrixProperty(name ="execution.isolation.thread.timeoutInMilliseconds",value = "1000") } ) public void testCircuitBreaker() { // 模拟调用第三方超时3s try { Thread.sleep(1500); } catch (Exception e) {} } } |
在如下Controller中进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
@Controller public class HytrixController { @Autowired private HytrixService hytrixService; @RequestMapping(value = "/testCircuitBreaker") @ResponseBody public ResponseDemo testCircuitBreaker() { long starTime = System.currentTimeMillis(); long endTime; try { // 模拟调用外部服务耗时1.5s。设置超时未1s hytrixService.testCircuitBreaker(); } catch (Exception e) { LOGGER.error("超时:", e.getMessage()); } finally { endTime = System.currentTimeMillis() - starTime; } ResponseDemo demo = new ResponseDemo(); demo.setMessage("time" + endTime); demo.setResultCode("000"); return demo; } |
返回结果
1 2 3 4 |
{ "resultCode": "000", "message": "time1007" } |
这里有个问题就是:
这里实现操作就是超过1s之后,就抛异常处理,这种方式并没有减少对下游服务的调用,还是会不断的调用下游服务,而不是彻底断开,直接由Hystricx返回错误。并没有体现出“快速失败”?
可以参考上面的断路器的图,此时错误属于第二阶段错误,还没有触发断路器模式,只有达到断路器触发短路条件之后,进入第三个阶段,此时断路器会立刻断开第三方服务,请求不会发到第三方了,断路器直接返回错误,即“快速失败”。
2.3 后备模式
使用后备模式要点
- 需要在@HystrixCommand注解中添加一个名为fallbackMethod的属性
- 后备方法必须跟使用了@HysrixCommand原始方法在同一个类中。
- 后备方法必须是public类型
- 后备方法参数必须和原方法一致
2.3.1 调用失败场景下的后备策略
调用使用了@HystrixCommand的controller代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
### TestController #### @Autowired private FallBackService fallBackService; @RequestMapping(value = "/testFallback") @ResponseBody public ResponseDemo testFallback() { long starTime = System.currentTimeMillis(); long endTime; String result = ""; try { // 测试 后备模式 result = fallBackService.testFallBack(); } catch (Exception e) { result = "timeout"; LOGGER.error("超时:{}", e.getMessage()); } finally { endTime = System.currentTimeMillis() - starTime; } ResponseDemo demo = new ResponseDemo(); demo.setMessage("result=" + result); demo.setResultCode("time=" + endTime); return demo; } |
具有后备模式 的函数如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Service public class FallBackService { @HystrixCommand( fallbackMethod = "fallBackTwo" ) public String testFallBack(String param) { // 模拟调用第三方调用失败 throw new RuntimeException("rest call error"); } public String fallBackTwo(String param) { return "excute fallBack two"; } } |
结果为
2.3.2 调用超时情况下后备策略
Controller的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@RequestMapping(value = "/testFallback") @ResponseBody public ResponseDemo testFallback() { long starTime = System.currentTimeMillis(); long endTime; String result = ""; try { // 测试 后备模式 result = fallBackService.testFallBackTimeOut(""); } catch (Exception e) { result = "timeout"; LOGGER.error("超时:{}", e.getMessage()); } finally { endTime = System.currentTimeMillis() - starTime; } ResponseDemo demo = new ResponseDemo(); demo.setMessage("result=" + result); demo.setResultCode("time=" + endTime); return demo; } |
耗时备用策略为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
/** * 测试后备模式(fallback) */ @HystrixCommand( fallbackMethod = "fallBackTwoTimeOut", commandProperties = { @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "2000") } ) public String testFallBackTimeOut(String param) { // 模拟调用第三方超时3s try { Thread.sleep(3000); } catch (Exception e) { } finally { return "testFallBack"; } } public String fallBackTwoTimeOut(String param) { return "excute fallBack two"; } |
执行结果为,在超过2s之后,就调用了后备策略。
2.4 舱壁模式
Hytrix是通过线程池管理调用外部资源的,默认情况下所有服务调用都公用一个线程池,如下图
舱壁模式功能就是:我们可以通过为各个服务分别指定线程池,如下图
代码如下,通过threadPoolKey和threadPoolProperties来指定:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
/** * 舱壁模式,指定线程池 */ @HystrixCommand( threadPoolKey = "fallTimeOutPool", threadPoolProperties = { @HystrixProperty(name = "coreSize", value = "30"), @HystrixProperty(name = "maxQueueSize", value = "10")} } ) public String testBulkHead(String param) { // 模拟调用第三方超时3s try { Thread.sleep(3000); } catch (Exception e) { } finally { return "testFallBack"; } } |
2.5 常见错误:
1、method wasn’t found
解决方法,需要查看后备函数:
- 返回类型需要是public类型
- 参数必须和原方法一致。
- 定义的后备函数名字和fallbackMethond属性中配置的要一样,包括大小写都要一致。
3 配置断路器触发策略
执行策略流程图如下:
Hystrix遇到一个超时/失败请求,此时启动一个10s的窗口,后续的请求会进行如下判断:
(1)查看失败次数是否超过最小调用次数
- 如果没有超过,则放行请求。
- 如果超过最小请求数,继续下面逻辑
(2)判断失败率是否超过一个阈值,这里错误是指超时和失败两种。
- 如果没有超过,则放行
- 如果超过错误阈值,则继续下面逻辑
(3)熔断器断开
- 请求会直接返回失败。
- 会开一个5s的窗口,每隔5s调用一次请求,如果成功,表示下游服务恢复,否则继续保持断路器断开状态。
涉及到属性有五个:
- circuitBreaker.requestVolumeThreshold,在统计时间窗口中,请求最小次数
- circuitBreaker.errorThresholdPercentage,熔断断开后,间隔7s,就尝试访问一次服务,查看服务是否已经恢复
- circuitBreaker.sleepWindowInMilliseconds,熔断断开后,间隔7s,就尝试访问一次服务,查看服务是否已经恢复
- metrics.rollingStats.timeInMilliseconds,在 时间窗口,从监听到第一次失败开始计时。
- metrics.rollingStats.numBuckets,在时间窗口中,收集统计信息的次数。在15s的窗口中每隔3s就收集一次,共5次。
举例如下:在10s内,请求次数超过4次,失败占比超过50%时,就触发熔断,熔断之后会每隔7s进行重试服务,如果服务恢复,则恢复断路器,否则保持断路状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
/** * 测试后备模式(fallback) */ @HystrixCommand( fallbackMethod = "fallBackTwoTimeOut", threadPoolKey = "circuitBreakerPool", commandProperties = { @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000"), // 请求最小次数 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "4"), // 失败率阈值 @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50"), // 熔断断开后,间隔7s,就尝试访问一次服务,查看服务是否已经恢复 @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "7000"), // 时间窗口,从监听到第一次失败开始计时 @HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "15000"), // 在时间窗口中,收集统计信息的次数。在15s的窗口中每隔3s就收集一次,共5次。 @HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "5") } ) public String testFallBackTimeOut(String param) { // 模拟调用第三方超时3s try { Thread.sleep(2000); } catch (Exception e) { } finally { return "testFallBack"; } } |
4 Hystrix上下文传递
Hytrix是通过线程池管理调用外部资源的,一个外部服务可以对应一个独立线程池。对于线程池存在传递ThreadLocal的问题。
4.1 ThreadLocal与线程池
在线程池中没有办法获取主线程的threadLocal的信息,如下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public class TaskExecutor { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 300, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); public void testMultiThread() { RequestContext context = new RequestContext(); context.setLogId("23q34214"); RequestContextHolder.setContext(context); executor.execute(new Runnable() { public void run() { System.out.println("logId:" + RequestContextHolder.getContext().getLogId()); } }); } public static void main(String[] args) { TaskExecutor executor = new TaskExecutor(); executor.testMultiThread(); } } |
执行结果为:
1 |
logId:null |
为了获取线程池的信息,可以定义如下一个Runnable类,在每一次线程执行时,重新初始化上下文。如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public class TransmitRunnable implements Runnable { private RequestContext requestContext; private Runnable runnable; public TransmitRunnable(RequestContext requestContext, Runnable runnable) { this.requestContext = requestContext; this.runnable = runnable; } @Override public void run() { RequestContextHolder.setContext(requestContext); runnable.run(); } } |
上面使用线程池的函数修改为使用这个新建的TransmitRunnable,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public void testMultiThread() { RequestContext context = new RequestContext(); context.setLogId("23q34214"); RequestContextHolder.setContext(context); executor.execute(new TransmitRunnable(RequestContextHolder.getContext(),new Runnable() { public void run() { System.out.println("logId:" + RequestContextHolder.getContext().getLogId()); } })); } |
执行结果如下,已经获取到logID了
1 |
logId:23q34214 |
4.2 Hystrix支持上下文传递
4.2.1 问题复现
在Controller中初始化上下文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
########################### ### HytrixController.java #### ########################### @RequestMapping(value = "/testContext") @ResponseBody public ResponseDemo testContext() { try { RequestContext context = new RequestContext(); context.setLogId("23q34214"); RequestContextHolder.setContext(context); // 模拟调用外部服务耗时1.5s。设置超时未1s hytrixService.testContext(); } catch (Exception e) { LOGGER.error("超时:", e.getMessage()); } ResponseDemo demo = new ResponseDemo(); demo.setMessage("success" ); demo.setResultCode("000"); return demo; } |
在使用Hystrix的函数中使用上下文,发现无法获取上下文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
########################### ### HytrixService.java #### ########################### /** * 测试断路器模式(circuit breaker) */ @HystrixCommand( commandProperties = { @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000") } ) public void testContext() { // 打印上下文信息 LOGGER.info("上下文信息 logId:", RequestContextHolder.getContext().getLogId()); // 模拟调用第三方超时1.5s try { Thread.sleep(1500); } catch (Exception e) { } } |
打印日志发现,logId没有获取到
1 |
[hystrix-HytrixService-1] INFO c.w.t.s.service.HytrixService - 上下文信息 logId:null |
4.2.2 问题解决
需要三个步骤:
- 定义一个Hystrix并发策略类
- 定义一个Callable类
- 配置spring cloud以使用自定义的Hystrix并发策略类
1、自定义Hystrix并发策略类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
public class ThreadLocalAwareStrategy extends HystrixConcurrencyStrategy { private HystrixConcurrencyStrategy existingConcurrencyStrategy; // SpringCloud已经定义了一个并发类,这里是将已存在的并发类实例传入到这个构造器中 public ThreadLocalAwareStrategy(HystrixConcurrencyStrategy existingConcurrencyStrategy) { this.existingConcurrencyStrategy = existingConcurrencyStrategy; } @Override public <T> Callable<T> wrapCallable(Callable<T> callable) { return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.wrapCallable( new TransmitCallable<T>(RequestContextHolder.getContext(), callable)) : super.wrapCallable( new TransmitCallable<T>(RequestContextHolder.getContext(), callable)); } // ================================================================== // 下面的几个方法都是判断existingConcurrencyStrategy存在和不存在两个情况 // ================================================================== @Override public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) { return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize) : super.getBlockingQueue(maxQueueSize); } @Override public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) { return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.getRequestVariable(rv) : super.getRequestVariable(rv); } @Override public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue) : super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.getThreadPool(threadPoolKey, threadPoolProperties) : super.getThreadPool(threadPoolKey, threadPoolProperties); } } |
2、定义一个新的callable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public class TransmitCallable<V> implements Callable<V> { private RequestContext requestContext; private final Callable<V> runnable; public TransmitCallable(RequestContext requestContext, Callable<V> runnable) { this.requestContext = requestContext; this.runnable = runnable; } @Override public V call() throws Exception { RequestContextHolder.setContext(requestContext); return runnable.call(); } } |
3、配置新的并发策略类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
@Configuration public class ThreadLocalCofiugration { @Autowired(required = false) private HystrixConcurrencyStrategy hystrixConcurrencyStrategy; @PostConstruct public void init() { // 获取当前插件 HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher(); HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy(); HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook(); // 重置Hystrix插件 HystrixPlugins.reset(); // 使用新创建的并发策略 HystrixPlugins.getInstance().registerConcurrencyStrategy( new ThreadLocalAwareStrategy(hystrixConcurrencyStrategy)); // 用之前的插件覆盖 HystrixPlugins.getInstance().registerEventNotifier(eventNotifier); HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher); HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy); HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook); } } |
4、执行结果,发现已经可以获取到logId了
1 |
[hystrix-HytrixService-1] INFO c.w.t.s.service.HytrixService - 上下文信息 logId:23q34214 |
附1 @HystrixCommand属性
属性名称 |
属性功能 |
子属性 |
描述 |
fallbackMethod |
后备方法 |
— |
值为后备方法名字 |
threadPoolKey |
指定线程池 |
— |
线程池名字 |
threadPoolProperties |
设置线程池属性 |
core |
|
queueSize |
阻塞队列大小 |
||
commandProperty |
超时 |
execution.isolation.thread. timeoutInMilliseconds |
耗时时间 |
断路器熔断策略 |
circuitBreaker.requestVolumeThreshold |
||
circuitBreaker.errorThresholdPercentage |
|||
circuitBreaker.sleepWindowInMilliseconds |
|||
metrics.rollingStats.timeInMilliseconds |
|||
metrics.rollingStats.numBuckets |
附2 需要注解点
1、在使用不带属性的@HystrixCommand注解时,会默认把所有请求第三方服务放置在同一个线程池下面,这样可能导致系统性能问题
附3 相关概念
1、服务降级
服务是集群的,服务降级降级体现就是个别服务不可用。
2、服务重启之后,服务的hytrix信息是否还可以保存?
在服务重启前通过hytrix对下游一个服务进行了后备模式处理,此时服务重启,那么重启之后,对下游服务是否还会进行后备莫模式处理?
参考资料
官网介绍:https://github.com/Netflix/Hystrix/wiki
官网 how it works :https://github.com/Netflix/Hystrix/wiki/How-it-Works
官网 how to use :https://github.com/Netflix/Hystrix/wiki/How-To-Use