接上篇:

使用命令开启两个服务提供者

java -jar .\hello-0.0.1-SNAPSHOT.jar --server.port=8081
java -jar .\hello-0.0.1-SNAPSHOT.jar --server.port=8082

运行ribbon-consumer,访问 http://localhost:9000/ribbon-consumer

停掉8081服务,刷新页面,会提示错误

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

改造ribbon-consumer项目

在pom中加入Hystrix

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-hystrix</artifactId>
        </dependency>

在启动类RibbonConsumerApplication 中加入@EnableCircuitBreaker注解

package org.mythsky.ribbonconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@EnableCircuitBreaker
@EnableDiscoveryClient
@SpringBootApplication
public class RibbonConsumerApplication {
    @Bean
    @LoadBalanced
    RestTemplate restTemplate(){
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(RibbonConsumerApplication.class, args);
    }
}
View Code

新增HelloService类,在helloService方法上增加@HystrixCommand

注解来制定回调方法

package org.mythsky.ribbonconsumer.service;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class HelloService {
    @Autowired
    RestTemplate restTemplate;
    @HystrixCommand(fallbackMethod = "helloFallback")
    public String helloService(){
        return restTemplate.getForEntity("http://hello-service/hello",String.class).getBody();
    }

    public String helloFallback(){
        return "error";
    }
}
View Code

修改ConsumerController

package org.mythsky.ribbonconsumer.controller;

import org.mythsky.ribbonconsumer.service.HelloService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
public class ConsumerController {
    @Autowired
    HelloService helloService;
//    RestTemplate restTemplate;
    @RequestMapping(value = "/ribbon-consumer",method = RequestMethod.GET)
    public String helloConsumer(){
//        return restTemplate.getForEntity("http://hello-service/hello",String.class).getBody();
        return helloService.helloService();
    }
}
View Code

同样按上面方法启动8081和8082,打开http://localhost:9000/ribbon-consumer

然后停掉8081,多次刷新页面,会直接到error页

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

继续改造服务提供者hello-service,模拟服务阻塞,修改HelloController

@RequestMapping(value = "/hello",method = RequestMethod.GET)
    public String index() throws Exception {
        ServiceInstance instance=client.getLocalServiceInstance();
        //让处理线程等待几秒钟
        int sleepTime=new Random().nextInt(3000);
        logger.info("sleepTime:"+sleepTime);
        Thread.sleep(sleepTime);

        logger.info("/hello,host:"+instance.getHost()+", service_id:"+instance.getServiceId());
        return "Hello world";
    }
View Code

按上面的流程重新测试,Hystrix默认超时时间为2000ms,多刷新几次就能看到效果。

新建Spring boot 工程hystrix-dashboard,添加pom引用

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-hystrix</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-hystrix-dashboard</artifactId>
        </dependency>
View Code

在入口类添加@EnableHystrixDashboard注解

package org.mythsky.hystrixdashboard;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.hystrix.dashboard.EnableHystrixDashboard;

@EnableHystrixDashboard
@SpringBootApplication
public class HystrixDashboardApplication {

    public static void main(String[] args) {
        SpringApplication.run(HystrixDashboardApplication.class, args);
    }
}
View Code

添加配置

spring.application.name=hystrix-dashboard
server.port=2001
View Code

启动项目,打开浏览器:http://localhost:2001/hystrix

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

修改ribbon-consumer的pom,添加引用

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
View Code

启动ribbon-consumer,可以看到以下节点

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

在hystrix-dashboard界面输入监控地址:http://localhost:9000/hystrix.stream  再点击监控,刷新http://localhost:9000/ribbon-consumer 在dashboard界面即可看到相关信息

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

接下来体验集群监控

新建spring-boot项目turbine,添加pom引用

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-turbine</artifactId>
        </dependency>
View Code

在入口类添加注解

package org.mythsky.tuibine;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.turbine.EnableTurbine;

@EnableTurbine
@EnableDiscoveryClient
@SpringBootApplication
public class TuibineApplication {

    public static void main(String[] args) {
        SpringApplication.run(TuibineApplication.class, args);
    }
}
View Code

添加配置

server.port=8989
management.port=8990

spring.application.name=tuibine

eureka.client.service-url.defaultZone=http://localhost:1111/eureka/

turbine.app-config=ribbon-consumer
turbine.cluster-name-expression="default"
turbine.combine-host-port=true
View Code

启动项目,然后启动两个服务提供者和两个服务消费者

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

在dashboard中对turbine进行监控

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 hystrix源码分析

首先看@HystrixCommand,然后查看一下这个注解的引用,发现了HystrixCommandAspect,这个切面会拦截所有带@HystrixCommand注解的方法

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")

    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }

    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        Object result;
        try {
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
HystrixCommandAspect

默认走了这个方法

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 看下这个方法

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }
View Code

如果是同步方法,执行return castToExecutable(invokable, executionType).execute();

再看看这里的execute,是HystrixExecutable 接口,定义如下

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 找一下接口的实现

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 

HystrixCommand这个类在 com.netflix.hystrix 包中,下面是execute 方法

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

下面是queue 方法

public Future<R> queue() {
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        
        final Future<R> f = new Future<R>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }

                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    /*
                     * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                     * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                     * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
                     * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                     * than that interruption request cannot be taken back.
                     */
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }

                final boolean res = delegate.cancel(interruptOnFutureCancel.get());

                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }

                return res;
            }

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }

            @Override
            public boolean isDone() {
                return delegate.isDone();
            }

            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
            
        };

        /* special handling of error states that throw immediately */
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
                    case COMMAND_EXCEPTION:
                    case TIMEOUT:
                        // we don't throw these types from queue() only from queue().get() as they are execution errors
                        return f;
                    default:
                        // these are errors we throw from queue() as they as rejection type errors
                        throw hre;
                    }
                } else {
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }

        return f;
    }
View Code

看一下这里的toObservable()

final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
            return Observable.never();
        }
        return applyHystrixSemantics(_cmd);
    }
};

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 

 可以看到hystrix底层大量使用了RxJava

写个测试代码

import rx.Observable;
import rx.Subscriber;

public class ObServableTest {
    public static void main(String[] args){
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello RxJava");
                subscriber.onNext("I am tom");
                subscriber.onCompleted();
            }
        });
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("订阅完成");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("订阅出错");
            }

            @Override
            public void onNext(String s) {
                System.out.println("订阅事件:"+s);
            }
        };
        observable.subscribe(subscriber);
    }
}
ObServableTest

运行可以看到

Spring Cloud Hystrix
    





            
Spring Cloud Eureka

 关于RxJava 可以参考:https://www.jianshu.com/p/414f755983f1

相关文章: