admin管理员组

文章数量:1597474

What is Spring Cloud Netflix?

其官方文档中对自己的定义是:

Spring Cloud Netflix provides Netflix OSS integrations for Spring Boot apps through autoconfiguration and binding to the Spring Environment and other Spring programming model idioms. With a few simple annotations you can quickly enable and configure the common patterns inside your application and build large distributed systems with battle-tested Netflix components. The patterns provided include Service Discovery (Eureka), Circuit Breaker (Hystrix), Intelligent Routing (Zuul) and Client Side Load Balancing (Ribbon)..

Spring Cloud Netflix这个项目对于Spring Boot应用来说,它集成了NetFlix OSS的一些组件,只需通过注解配置和Spring环境的通用简单的使用注解,你可以快速的启用和配置这些久经测试考验的NetFlix的组件于你的应用和用于构建分布式系统中。这些组件包含的功能有服务发现(Eureka),熔断器(Hystrix),智能路由(Zuul)以及客户端的负载均衡器(Ribbon)
简单的来说,Spring Cloud NetFlix这个项目对NetFlix中一些久经考验靠谱的服务发现,熔断,网关,智能路由,以及负载均衡等做了封装,并通过注解的或简单配置的方式提供给Spring Cloud用户用。

What is Hystrix?

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.

简单来说Hystrix就是Netflix开源的一款断路器逻辑的实现,用于调用三方服务发生错误时及时断路防止级联错误

Spring-Cloud-Hystrix

Hystrix作为Springcloud中的断路器,当对于一个服务的调用在metrics.rollingStats.timeInMilliseconds(默认10秒)
这段时间内超过circuitBreaker.requestVolumeThreshold(默认20次)次,其中失败的百分比超过了circuitBreaker.errorThresholdPercentage
(默认50%),则这个调用会被断路,调用将不会被执行而直接返回断路响应。

1. 使用Hystrix

在SpringCloud的基础依赖之上,添加依赖:

对于SpringCloud-Finchley及以上版本,添加依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>

对于Finchley之前的版本是:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>

针对想添加断路器的服务,在对应的服务bean方法上面,添加@HystrixCommand注解:

@Component
public class TestService {
    //一定要是Bean下的方法,并且通过bean调用,断路器才会生效
    @HystrixCommand(fallbackMethod = "defaultReturn")
    public String test(int parameter) throws Exception {
        System.out.println("test is called " + parameter);
        //对于正数返回成功,非正数返回失败
        if (parameter > 0) {
            return "Successful";
        } else {
            throw new Exception("Failed");
        }
    }
    @HystrixCommand(fallbackMethod = "defaultReturn")
    public String test2(int parameter) throws Exception {
        System.out.println("test2 is called " + parameter);
        //对于正数返回成功,非正数返回失败
        if (parameter > 0) {
            return "Successful";
        } else {
            throw new Exception("Failed");
        }
    }
    //失败之后服务降级调用的方法
    public String defaultReturn(int parameter) {
        return "fallback";
    }
}

测试启用断路器:

@SpringBootApplication
@EnableCircuitBreaker
@EnableScheduling
public class Application {

    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);
    }

}
@Service
public class TestCalling {
    @Autowired
    private TestService testService;

    @Scheduled(fixedDelay = 400)
    public void call() {
        try {
            //随机正负数
            System.out.println(testService.test(ThreadLocalRandom.current().nextInt(10) - 4));
            System.out.println(testService.test2(ThreadLocalRandom.current().nextInt(10) - 4));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

通过启动程序,我们可以观察到:

  • 只要test方法失败,就会调用defaultReturn返回defaultReturn的返回
  • 当test方法失败在一段时间内超过一定次数之后,断路器就会打开,断路器打开之后,再调用test方法,test方法体被忽略,直接返回defaultReturn的返回
  • 断路器打开一段时间之后,会再次尝试test方法体,如果成功,断路器关闭,否则断路器还是打开状态,再过一段时间之后才会继续尝试

2. HystrixCommand的多线程特性

如果使用ThreadLocal对于被@HystrixCommand注解的方法进行传参,那么可能ThreadLocal无法正常传参,因为这个方法默认是被另一个Hystrix线程池执行的。
这个和Hystrix隔离策略有关,我们可以通过修改这个隔离策略来实现在同一线程执行HystrixCommand,例如上面的:

@HystrixCommand(fallbackMethod = "defaultReturn",
                commandProperties = {
                  @HystrixProperty(name="execution.isolation.strategy", value="SEMAPHORE")
                })

对于Spring Security的SecurityContext,Hystrix还特殊设置了一个配置:hystrix.shareSecurityContext,如果设置为true,会自动将SecurityContext从调用HystrixCommand的线程传入执行HystrixCommand的线程

对于HystrixCommand的并发特性,还有并发策略HystrixConcurrencyStrategy可以配置。Hystrix只支持单一的HystrixConcurrencyStrategy注册,我们可以实现自己的HystrixConcurrencyStrategy并作为Bean载入到上下文之中。
如果想要自己控制Hystrix的线程池,任务队列等等,可以自己覆盖HystrixConcurrencyStrategy,我们先来看看默认的HystrixConcurrencyStrategy实现:

public abstract class HystrixConcurrencyStrategy {

    /**
     * 获取执行HystrixCommand的线程池,根据HystrixThreadPoolProperties,初始化线程池
     * @param threadPoolKey 线程池key,这个由HystrixCommand生成
     * @param threadPoolProperties HystrixCommand对应的配置
     * @return 线程池
     */
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

        final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

        //如果allowMaximumSizeToDivergeFromCoreSize为true,则在设置线程池coreSize的时候会比较下是否大于maximumSize,如果大于就以coreSize设置线程池的core和max大小
        if (allowMaximumSizeToDivergeFromCoreSize) {
            final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
            if (dynamicCoreSize > dynamicMaximumSize) {
                logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                        dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                        dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            } else {
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }

    /**
     * 获取线程工厂,其实就是命名用,而且设置为守护线程,因为这些线程只处理HystrixCommand(而HystrixCommend又是业务主线程调用过来的)并不会自己主动去执行什么任务。
     */
    private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
        //检查是否处于Google AppEngine环境,如果并非在Google AppEngine环境下,则返回普通的线程工厂
        if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
            return new ThreadFactory() {

                private final AtomicInteger threadNumber = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }

            };
        } else {
            return PlatformSpecific.getAppEngineThreadFactory();
        }
    }

    /**
     * 获取线程池需要的阻塞队列
     */
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        //如果最大队列大小小于等于0,则使用同步队列
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<Runnable>();
        } else {
        //否则,用限行阻塞队列
            return new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }
    }

    /**
     * 封装Callable
     * 可以通过封装Callable来对Hystrix执行的任务做一些处理
     */
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return callable;
    }

    /**
     * 封装Callable
     * 可以通过封装Callable来对Hystrix执行的任务做一些处理
     */
    public <T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv) {
        return new HystrixLifecycleForwardingRequestVariable<T>(rv);
    } 
}

3. Actuator监控中的Hystrix

在引入Actuator相关依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

之后,在/actuator/health(Finchley之后的版本,之前的是/health)接口返回中我们也能看到hystrix的健康状态
但是有个前提,就是需要配置
management.endpoint.health.show-details=always来显示详细信息(这个配置默认的是never)

在断路器关闭的时候,示例返回:

{
    "status": "UP",
    "details": 
        "hystrix": {
            "status": "UP"
        }
    }
}

在断路器打开的时候,示例返回:

{
    "status": "UP",
    "details": 
        "hystrix": {
            "status": "CIRCUIT_OPEN",
            "details": {
                "openCircuitBreakers": ["TestService::test"]
            }
        }
    }
}

这部分可以参考代码HystrixHealthIndicator

4. Hystrix监控

4.1. 开启Hystrix监控流

每个HystrixCommand如何监控,当前调用情况如何,怎样看Hystrix调用线程池的使用情况,可以通过Hystrix监控流知晓
同样的,开启Hystrix监控流,需要先添加Actuator依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

然后添加配置management.endpoints.web.exposure.include=hystrix.stream

在上面程序的基础上,就能开启Hystrix监控流

程序启动后,访问http://127.0.0.1:8080/actuator/hystrix.stream(Finchley之后的版本,之前的是/hystrix.stream),可以看到:

(由于Hystrix是在使用时懒加载,所以如果没有对于HystrixCommand的调用,可能不会有数据,但我们的程序里面有定时调用)

ping: 

data: {"type":"HystrixCommand","name":"test","group":"TestService","currentTime":1534997465495,"isCircuitBreakerOpen":false,"errorPercentage":60,"errorCount":15,"requestCount":25,"rollingCountBadRequests":0,"rollingCountCollapsedRequests":0,"rollingCountEmit":0,"rollingCountExceptionsThrown":0,"rollingCountFailure":15,"rollingCountFallbackEmit":0,"rollingCountFallbackFailure":0,"rollingCountFallbackMissing":0,"rollingCountFallbackRejection":0,"rollingCountFallbackSuccess":15,"rollingCountResponsesFromCache":0,"rollingCountSemaphoreRejected":0,"rollingCountShortCircuited":0,"rollingCountSuccess":10,"rollingCountThreadPoolRejected":0,"rollingCountTimeout":0,"currentConcurrentExecutionCount":0,"rollingMaxConcurrentExecutionCount":1,"latencyExecute_mean":0,"latencyExecute":{"0":0,"25":0,"50":0,"75":1,"90":1,"95":1,"99":3,"99.5":3,"100":4},"latencyTotal_mean":0,"latencyTotal":{"0":0,"25":0,"50":1,"75":1,"90":1,"95":1,"99":3,"99.5":3,"100":4},"propertyValue_circuitBreakerRequestVolumeThreshold":20,"propertyValue_circuitBreakerSleepWindowInMilliseconds":5000,"propertyValue_circuitBreakerErrorThresholdPercentage":50,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerForceClosed":false,"propertyValue_circuitBreakerEnabled":false,"propertyValue_executionIsolationStrategy":"THREAD","propertyValue_executionIsolationThreadTimeoutInMilliseconds":1000,"propertyValue_executionTimeoutInMilliseconds":1000,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"propertyValue_executionIsolationThreadPoolKeyOverride":null,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"propertyValue_requestCacheEnabled":true,"propertyValue_requestLogEnabled":true,"reportingHosts":1,"threadPool":"TestService"}

data: {"type":"HystrixThreadPool","name":"TestService","currentTime":1534997465495,"currentActiveCount":0,"currentCompletedTaskCount":179,"currentCorePoolSize":10,"currentLargestPoolSize":10,"currentMaximumPoolSize":10,"currentPoolSize":10,"currentQueueSize":0,"currentTaskCount":179,"rollingCountThreadsExecuted":25,"rollingMaxActiveThreads":1,"rollingCountCommandRejections":0,"propertyValue_queueSizeRejectionThreshold":5,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"reportingHosts":1}

ping: 

data: {"type":"HystrixCommand","name":"test","group":"TestService","currentTime":1534997465995,"isCircuitBreakerOpen":false,"errorPercentage":58,"errorCount":14,"requestCount":24,"rollingCountBadRequests":0,"rollingCountCollapsedRequests":0,"rollingCountEmit":0,"rollingCountExceptionsThrown":0,"rollingCountFailure":15,"rollingCountFallbackEmit":0,"rollingCountFallbackFailure":0,"rollingCountFallbackMissing":0,"rollingCountFallbackRejection":0,"rollingCountFallbackSuccess":15,"rollingCountResponsesFromCache":0,"rollingCountSemaphoreRejected":0,"rollingCountShortCircuited":0,"rollingCountSuccess":10,"rollingCountThreadPoolRejected":0,"rollingCountTimeout":0,"currentConcurrentExecutionCount":0,"rollingMaxConcurrentExecutionCount":1,"latencyExecute_mean":0,"latencyExecute":{"0":0,"25":0,"50":0,"75":1,"90":1,"95":1,"99":3,"99.5":3,"100":4},"latencyTotal_mean":0,"latencyTotal":{"0":0,"25":0,"50":1,"75":1,"90":1,"95":1,"99":3,"99.5":3,"100":4},"propertyValue_circuitBreakerRequestVolumeThreshold":20,"propertyValue_circuitBreakerSleepWindowInMilliseconds":5000,"propertyValue_circuitBreakerErrorThresholdPercentage":50,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerForceClosed":false,"propertyValue_circuitBreakerEnabled":false,"propertyValue_executionIsolationStrategy":"THREAD","propertyValue_executionIsolationThreadTimeoutInMilliseconds":1000,"propertyValue_executionTimeoutInMilliseconds":1000,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"propertyValue_executionIsolationThreadPoolKeyOverride":null,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":10,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"propertyValue_requestCacheEnabled":true,"propertyValue_requestLogEnabled":true,"reportingHosts":1,"threadPool":"TestService"}

data: {"type":"HystrixThreadPool","name":"TestService","currentTime":1534997465995,"currentActiveCount":0,"currentCompletedTaskCount":180,"currentCorePoolSize":10,"currentLargestPoolSize":10,"currentMaximumPoolSize":10,"currentPoolSize":10,"currentQueueSize":0,"currentTaskCount":180,"rollingCountThreadsExecuted":25,"rollingMaxActiveThreads":1,"rollingCountCommandRejections":0,"propertyValue_queueSizeRejectionThreshold":5,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"reportingHosts":1}

4.2. Hystrix监控面板

添加相关依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>

之后可以通过@EnableHystrixDashboard注解启动一个Hystrix监控面板:


本文标签: SpringCloudHystrix