【问题标题】:Spring-data-jpa: Thread-safe unique insertSpring-data-jpa:线程安全的唯一插入
【发布时间】:2018-07-03 09:22:34
【问题描述】:

我正在尝试使用parallelStream() 模拟分布式应用程序,在数据库上写入,其中条目组合应该是唯一的。但是,我尝试了 @Transactional@Lock 的几个选项,但似乎都没有。

这是代码的一部分,应该可以清楚地说明问题:

AtomicDbService:

@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.SERIALIZABLE)
public TestEntity atomicInsert(TestEntity testEntity) {
    TestEntityParent testEntityParent = testEntityParentRepository
            .findByStringTwo(testEntity.getTestEntityParent().getStringTwo())
            .orElseGet(() -> testEntityParentRepository.save(TestEntityParent.builder()
                    .stringTwo(testEntity.getTestEntityParent().getStringTwo())
                    .build()));

    return testEnityRepository.findByStringAndTestEntityParentStringTwo(
            testEntity.getString(), testEntity.getTestEntityParent().getStringTwo()
    ).orElseGet(() -> testEnityRepository
            .save(
                    TestEntity.builder()
                            .string(testEntity.getString())
                            .testEntityParent(testEntityParent)
                            .build()
            )
    );
}

测试:

@Test
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.SERIALIZABLE)
public void testOperationsParallelStream() {

    List<Integer> list = IntStream.range(0, 3).boxed().collect(Collectors.toList()); 
    list.parallelStream().forEach(lala -> atomicDbService.atomicInsert(testEntity));

    System.out.println(testEnityRepository.findAll());

}

作为输出我得到例如:

[TestEntity(id=4, string=test, testEntityParent=TestEntityParent(id=3, stringTwo=testTwo)), TestEntity(id=5, string=test, testEntityParent=TestEntityParent(id=1, stringTwo=testTwo))]

但实际上应该只有一个结果。更多的线程当然会导致异常。

【问题讨论】:

  • 您没有说明您使用的数据库。该隔离模式是否有效(或如何工作)因数据库而异。
  • 目前我使用嵌入式 H2 进行测试 (runtime('com.h2database:h2')。在生产中我们目前使用 MySQL,但将来可能会切换。
  • 您似乎希望在数据库级别而不是在应用程序级别保持数据一致性,因此这不是线程安全的。当您有多个应用程序(微服务)实例正在运行时,线程安全不适用。在数据库级别,您需要应用某种形式的锁定:要么是悲观的——你需要在事务开始时锁定一行或一个表——要么是乐观的——你会定义一些会导致事务回滚的约束如果他们被违反了。
  • 感谢您的评论。这正是我想要做的。这里我没提,但是save方法注释为悲观写。然而这并没有改变任何东西,据我所知,isolation.SERIALIZABLE 应该做同样的事情吗?

标签: java spring concurrency spring-data-jpa thread-safety


【解决方案1】:

@Transactional 注释不会提供任何应用程序级别的线程安全。您看到的是线程安全问题。使用您使用orElseGet 后跟save 创建的UPSERT 模式,您将需要在应用程序中进行线程级保护。数据库对这种模式一无所知,因为您在不同的事务中创建不同的行。大概是这样的:

@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.SERIALIZABLE)
    public TestEntity atomicInsert(TestEntity testEntity) {

synchronized(TestEntity.class) {
        TestEntityParent testEntityParent = testEntityParentRepository.findByStringTwo(testEntity.getTestEntityParent().getStringTwo())
                .orElseGet(() -> testEntityParentRepository.save(TestEntityParent.builder()
                        .stringTwo(testEntity.getTestEntityParent().getStringTwo())
                        .build()));

        return testEnityRepository.findByStringAndTestEntityParentStringTwo(
                testEntity.getString(), testEntity.getTestEntityParent().getStringTwo()
        ).orElseGet(() -> testEnityRepository
                .save(
                        TestEntity.builder()
                                .string(testEntity.getString())
                                .testEntityParent(testEntityParent)
                                .build()
                )
        );
    }
}

【讨论】:

  • 这是否意味着,如果说服务而不是两个线程可以做到这一点,我不会有这个问题?
  • 好的,我试过了,它适用于 4 个线程,但是更多的线程再次导致双倍的条目。你能不能给个简短的提示,后面发生了什么?它是否试图强制并行线程的序列化执行?
  • 你确定你的测试数据吗?在您发布的回复中,TestEntity 4 和 5 具有不同的父 ID。根据您那里的逻辑,这似乎应该是正确的。您仍然需要同步,以便父级只创建一次。
  • 如果有两个服务进行调用,您仍然会遇到问题。他们很可能在不同的线程中。尤其是在正常情况下从控制器调用中调用服务时。
  • 嗯,我当然不是第一个尝试创建使用 spring 插入数据库的分布式应用程序的人。所以我希望有某种标准解决方案。
猜你喜欢
  • 1970-01-01
  • 2013-04-04
  • 2019-10-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-08-06
  • 1970-01-01
  • 2018-01-23
相关资源
最近更新 更多