【问题标题】:What is the correct way of managing transactionality in RxJava Services?在 RxJava 服务中管理事务性的正确方法是什么?
【发布时间】:2016-03-18 23:19:47
【问题描述】:

我最近开始尝试使用 RxJava,并看到一位 Netflix 工程师的演示文稿,其中建议将我们的业务 API 迁移到 Observable API,例如:

public interface VideoService {
    Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic);
    Observable<VideoBasicInfo> getVideoBasicInfo(Integer videoId);
    Observable<VideoRating> getVideoRating(Integer videoId);
}

但是,我还没有找到解释如何在此服务中管理事务性的任何地方。一开始我只是用@Transactional注释了我的服务实现@

@Service
@Transactional
public class VideoServiceImpl implements VideoService{

    @Autowired
    private VideoBasicInfoRepository basicInfoRepo;
    @Autowired
    private VideoRatingRepository ratingRepo;

    public Observable<VideoBasicInfo> createVideoBasicInfo(VideoBasicInfo videoBasic){
        return Observable.create( s -> {
            s.onNext(basicInfoRepo.save(videBasic));
        });
    }

我们想要的是,Object.create lambda (s -&gt; { // This code }) 中的所有代码的执行都发生在一个事务中。 然而,实际发生的是:

  1. createVideoBasicInfo() 的调用以事务方式执行,返回冷可观察对象。
  2. save() 作为原子事务执行。

显然这是有道理的,因为 Spring 代理适用于 serviceImpl 方法。我已经想到了一些方法来做我真正期望的事情,比如开始一个程序化事务:

return Observable.create( s -> {
    VideoBasicInfo savedBasic = transactionTemplate.execute( status -> {
        VideoBasicInfo basicInfo = basicInfoRepo.save(videoBasicInfo);
        return basicInfo;
    });
    s.onNext(savedBasic);
});

在使用响应式 API 时,这是管理事务的推荐方式吗?

【问题讨论】:

  • 你的存储库是什么?它是 Spring Data CrudRepository 的实现吗? CrudRepository 有一个标记为事务性的“保存”方法
  • 更不用说一旦你使用了 Observables,一个 api 用户可以很容易地订阅或观察不同的线程,并且 Sping 事务和许多其他组件依赖于每个线程的行为,使用线程局部变量。将 RxJava 与复杂的 Spring 事务一起使用可能会很复杂,但现在 Spring 5 支持 Spring Reactor,我也想了解它应该如何工作。

标签: spring reactive-programming rx-java rx-android netflix


【解决方案1】:

Spring Data JpaRepository 方法签名已经被标记为@Transactional,所以如果你只使用一个,那么你不需要做任何特别的事情:

public interface PersonRepository extends JpaRepository<Person, Integer> {
}

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
public class PersonRepositoryTest {
    private PersonRepository personRepository;

    @Autowired
    public void setPersonRepository(PersonRepository PersonRepository) {
        this.personRepository = PersonRepository;
    }

    @Test
    public void testReactiveSavePerson() {
        Person person = new Person("Jane", "Doe");
        assertNull(person.getId()); //null before save

        //save person
        Observable.create(s -> {
            s.onNext(personRepository.save(person));
        }).subscribe();

        //fetch from DB
        Person fetchedPerson = personRepository.findOne(person.getId());

        //should not be null
        assertNotNull(fetchedPerson);

        //should equal
        assertEquals(person.getId(), fetchedPerson.getId());
        assertEquals(person.getFirstName(), fetchedPerson.getFirstName());
    }
}

如果您需要将多个存储库合并到一个事务中,您可以使用类似下面的类:

@Component()
public class ObservableTxFactory {
    public final <T> Observable<T> create(Observable.OnSubscribe<T> f) {
        return new ObservableTx<>(this, f);
    }

    @Transactional
    public void call(Observable.OnSubscribe onSubscribe, Subscriber subscriber) {
        onSubscribe.call(subscriber);
    }

    private static class ObservableTx<T> extends Observable<T> {

        public ObservableTx(ObservableTxFactory observableTxFactory, OnSubscribe<T> f) {
            super(new OnSubscribeDecorator<>(observableTxFactory, f));
        }
    }

    private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> {

        private final ObservableTxFactory observableTxFactory;
        private final Observable.OnSubscribe<T> onSubscribe;

        OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, final Observable.OnSubscribe<T> s) {
            this.onSubscribe = s;
            this.observableTxFactory = observableTxFactory;
        }

        @Override
        public void call(Subscriber<? super T> subscriber) {
            observableTxFactory.call(onSubscribe, subscriber);
        }
    }
}

还需要定义工厂 bean:

@Bean
ObservableTxFactory observableTxFactory() {
    return new ObservableTxFactory();
}

服务:

@Service
public class PersonService {
    @Autowired
    PersonRepository personRepository;
    @Autowired
    ObservableTxFactory observableTxFactory;

    public Observable<Person> createPerson(String firstName, String lastName) {
        return observableTxFactory.create(s -> {
            Person p = new Person(firstName, lastName);
            s.onNext(personRepository.save(p));
        });
    }
}

测试:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {RepositoryConfiguration.class})
public class PersonServiceTest {
    @Autowired
    PersonRepository personRepository;
    @Autowired
    ObservableTxFactory observableTxFactory;

    @Test
    public void testPersonService() {
        final PersonService service = new PersonService();
        service.personRepository = personRepository;
        service.observableTxFactory = observableTxFactory;

        final Observable<Person> personObservable = service.createPerson("John", "Doe");
        personObservable.subscribe();

        //fetch from DB
        final Person fetchedPerson = StreamSupport.stream(personRepository.findAll().spliterator(), false)
                .filter(p -> p.getFirstName().equals("John") && p.getLastName().equals("Doe"))
                .findFirst()
                .get();

        //should not be null
        assertNotNull(fetchedPerson);
    }

}

显示代理的屏幕截图:

【讨论】:

  • 嗨,John,它们是 Spring Data JpaRepositositories。但是我想要实现的是服务层的事务性,也就是说在同一个事务中调用多个 DAO。可以使用 transactionTemplate 来完成,但我想知道是否有替代方法
  • JpaRepository 是 CrudRepository 的子类;在这个例子中它的效果是一样的。所以您希望多个存储库参与同一个事务?这不是您的问题中所述的问题......您写道:“我们想要的是 basicInfoRepo.save() 的执行发生在事务中。”
  • 我更新了代码来展示服务实现和测试
  • 嗨,约翰,我刚刚测试了您的解决方案,但它不起作用。请注意,ObservableTx.create 实际上调用了Observable.create,后者调用了return new Observable&lt;T&gt;(hook.onCreate(f));,因此永远不会创建新的 ObservableTx。此外,尽管@Transactional ObservableTx.call 很可能被调用,但它不是事务性的,因为它不是 Spring bean。它来自构造函数调用。
  • 我添加了 public final static &lt;T&gt; ObservableTx&lt;T&gt; createTx(OnSubscribe&lt;T&gt; f) { return new ObservableTx&lt;T&gt;(hook.onCreate(f)); } 并验证 call() 不会以事务方式执行。
【解决方案2】:

我想借鉴John Scattergood 的出色回答。我的典型用法是使用Observable.fromCallable(),所以我正在寻找一种方法来做到这一点,而不是实现Observable.OnSubscribe,所以我调整了他的技术,这样你就可以通过传入Callable来使用它

工厂类:

@Component
public class ObservableTxFactory {
    public final <T> Observable.OnSubscribe<T> createFromCallable(Callable<? extends T> resultFactory) {
        return new OnSubscribeDecorator<>(this, resultFactory);
    }

    @SuppressWarnings("unchecked")
    @Transactional
    public <T> void call(Callable<? extends T> resultFactory, Subscriber subscriber) {
        final SingleDelayedProducer<T> singleDelayedProducer = new SingleDelayedProducer<>(subscriber);

        subscriber.setProducer(singleDelayedProducer);

        try {
            singleDelayedProducer.setValue(resultFactory.call());
        } catch (Throwable t) {
            Exceptions.throwOrReport(t, subscriber);
        }
    }

    private static class OnSubscribeDecorator<T> implements Observable.OnSubscribe<T> {

        private final ObservableTxFactory observableTxFactory;
        private final Callable<? extends T> resultFactory;

        OnSubscribeDecorator(final ObservableTxFactory observableTxFactory, Callable<? extends T> resultFactory) {
            this.resultFactory = resultFactory;
            this.observableTxFactory = observableTxFactory;
        }

        @Override
        public void call(Subscriber<? super T> subscriber) {
            observableTxFactory.call(resultFactory, subscriber);
        }
    }
}

原代码:

Observable.fromCallable(() -> fooRepository.findOne(fooID));

新代码:

Observable.create(observableTxFactory.createFromCallable(() -> fooRepository.findOne(fooID)));

确保你添加 @Transactional 的方法是 public 否则 Spring AOP 将无法提供建议

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-06
    • 2021-07-02
    相关资源
    最近更新 更多