【问题标题】:Netflix Hystrix - HystrixObservableCommand asynchronous runNetflix Hystrix - HystrixObservableCommand 异步运行
【发布时间】:2017-01-10 12:40:22
【问题描述】:

我有一些基本项目,它对某些外部资源有四次调用,在当前版本中同步运行。我想要实现的是将调用包装到HystrixObservableCommand 中,然后异步调用它。

根据我的阅读,在HystrixObservableCommand 对象上调用.observe() 后,应该立即异步调用包装的逻辑。但是我做错了,因为它是同步工作的。

在示例代码中,输出为Void,因为我对输出不感兴趣(目前)。这也是为什么我没有将 Observable 分配给任何对象,只是称为constructor.observe()

@Component
public class LoggerProducer {

    private static final Logger LOGGER = Logger.getLogger(LoggerProducer.class);

    @Autowired
    SimpMessagingTemplate template;

    private void push(Iterable<Message> messages, String topic) throws Exception {
        template.convertAndSend("/messages/"+topic, messages);
    }

    public void splitAndPush(Iterable<Message> messages) {

        Map<MessageTypeEnum, List<Message>> groupByMessageType = StreamSupport.stream(messages.spliterator(), true)
                .collect(Collectors.groupingBy(Message::getType));

        //should be async - it's not 
        new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.INFO),
                MessageTypeEnum.INFO.toString().toLowerCase()).observe();
        new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.WARN),
                MessageTypeEnum.WARN.toString().toLowerCase()).observe();
        new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.ERROR),
                MessageTypeEnum.ERROR.toString().toLowerCase()).observe();

    }

    class CommandPushToBrowser extends HystrixObservableCommand<Void> {

        private Iterable<Message> messages;
        private String messageTypeName;

        public CommandPushToBrowser(Iterable<Message> messages, String messageTypeName) {
            super(HystrixCommandGroupKey.Factory.asKey("Messages"));
            this.messageTypeName = messageTypeName;
            this.messages = messages;
        }

        @Override
        protected Observable<Void> construct() {
            return Observable.create(new Observable.OnSubscribe<Void>() {

                @Override
                public void call(Subscriber<? super Void> observer) {
                    try {
                        for (int i = 0 ; i < 50 ; i ++ ) {
                            LOGGER.info("Count: " + i + " messageType " + messageTypeName);
                        }
                        if (null != messages) {
                            push(messages, messageTypeName);
                            LOGGER.info("Message type: " + messageTypeName + " pushed: " + messages);
                        }
                        if (!observer.isUnsubscribed()) {
                            observer.onCompleted();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        observer.onError(e);
                    }
                }
            });
        }
    }
}

那里有一些纯“测试”代码片段,当我试图找出问题时,忽略逻辑,主要关注的是让它与.observe() 异步运行。我知道我可以通过标准HystrixCommand 实现这一目标,但这不是目标。

希望有人帮助:) 问候,

【问题讨论】:

    标签: java asynchronous spring-boot netflix hystrix


    【解决方案1】:

    找到答案:

    "Observables 不会自动添加并发。如果你在建模 同步的,用 Observable 阻塞执行,那么它们将 同步执行。

    您可以通过使用 订阅(Schedulers.io())。这是一个简单的包装示例 使用 Observable 阻塞调用: https://speakerdeck.com/benjchristensen/applying-reactive-programming-with-rxjava-at-goto-chicago-2015?slide=33

    但是,如果你要包装阻塞调用,你应该坚持 使用 HystrixCommand 因为它就是为它而构建的 默认在单独的线程中运行所有内容。使用 HystrixCommand.observe() 会给你并发的,异步的 您正在寻找的作品。

    HystrixObservableCommand 用于包裹异步, 不需要额外线程的非阻塞 Observables。”

    -- Ben Christensen - Netflix 边缘工程

    来源:https://groups.google.com/forum/#!topic/hystrixoss/g7ZLIudE8Rs

    【讨论】:

      猜你喜欢
      • 2023-03-29
      • 2017-07-04
      • 2014-05-20
      • 2017-09-10
      • 2017-10-18
      • 2018-05-27
      • 2017-01-02
      • 2016-03-25
      • 1970-01-01
      相关资源
      最近更新 更多