【问题标题】:Implementation of Infinispan cache in Wildfly 9 AS (considering clustering)在 Wildfly 9 AS 中实现 Infinispan 缓存(考虑集群)
【发布时间】:2023-03-17 15:55:02
【问题描述】:

情况:

我有一张包含数千条记录的清算表。它们被分成例如包。 500 条记录。然后每个数据包通过消息驱动 Beans 发送到 AS。 AS根据每条记录的内容(如货币、validStart、validEnd)计算出一个key,并需要将该key(连同内容的组合)存储在数据库中。

请求:

为了避免重复,我需要一个集中的“工具”来计算密钥并存储它们,从而通过缓存这些密钥和记录来减少与数据库的通信。

现在我尝试为每个包处理线程使用在实用程序类实现中访问的本地 Infinispan 缓存。这导致了这样一个事实,即多个包计算了相同的密钥,因此在数据库中插入了重复项。或者有时我遇到了死锁。

我尝试通过静态变量实现“锁定”以在数据库插入期间阻止对缓存的访问,但没有成功。 下一个尝试是使用复制的、分别分布的 Infinispan 缓存。这并没有改变 AS 行为的结果。

我的最后一个想法是实现为 bean 管理的单例会话 bean,以在插入数据库期间获取事务锁。

AS 目前以单机模式运行,但近期会迁移到集群中,因此首选高可用性解决方案。

恢复中:

在创建(键、值)对以避免重复时锁定 Infinispan 缓存访问的正确方法是什么?

更新:

@cruftex:我的请求是:我有一组(键,值)对,应该被缓存。如果应该插入新记录,则对其应用算法并计算密钥。然后检查缓存是否存在键,并将值附加到新记录中。但如果 Value 不存在,则应创建并存储在数据库中。

缓存需要使用 Infinispan 来实现,因为 AS 应该运行在集群中。存在用于创建密钥的算法。也在数据库中插入值(通过 JDBC 或实体)。但我有一个问题,即使用消息驱动 Bean(因此在 AS 中使用多线程)相同的(键、值)对是在不同的线程中计算的,因此每个线程都试图在数据库中插入值(我想避免!)。

@戴夫:

public class Cache {
     private static final Logger log = Logger.getLogger(Cache.class);
     private final Cache<Key, FullValueViewer> fullCache;
     private HomeCache homes;        // wraps EntityManager
     private final Session session;

     public Cache(Session session, EmbeddedCacheManager cacheContainer, HomeCache homes) {
         this.session = session;
         this.homes = homes;
         fullCache = cacheContainer.getCache(Const.CACHE_CONDCOMBI);
     } 

     public Long getId(FullValueViewer viewerWithoutId) {
         Long result = null;

         final Key key = new Key(viewerWithoutId);
         FullValueViewer view = fullCache.get(key);

         if(view == null) {
             view = checkDatabase(viewerWithoutId);
             if(view != null) {
                 fullCache.put(key, view);
             }
         }

         if(view == null) {
             view = createValue(viewerWithoutId);

             // 1. Try
             fullCache.put(key, view);

             // 2. Try
             //      if(!fullCache.containsKey(key)) {
             //           fullCache.put(key, view);
             //       } else {
             //           try {
             //               homes.condCombi().remove(view.idnr);
             //           } catch (Exception e) {
             //               log.error("remove", e);
             //           }
             //       }

             // 3. Try
             //       synchronized(fullCache) {
             //           view = createValue(viewerWithoutId);
             //           fullCache.put(key, view);
             //       }
         }
         result = view.idnr;
         return result;
     }

     private FullValueViewer checkDatabase(FullValueViewer newView) {
         FullValueViewer result = null;
         try {
             CondCombiBean bean = homes.condCombi().findByTypeAndKeys(_parameters_);
             result = bean.getAsView();
         } catch (FinderException e) {
         }
         return result;
     }

     private FullValueViewer createValue(FullValueViewer newView) {
         FullValueViewer result = null;
         try {
             CondCombiBean bean = homes.condCombi().create(session.subpk);
             bean.setFromView(newView);
             result = bean.getAsView();
         } catch (Exception e) {
             log.error("createValue", e);
         }
         return result;
     }

     private class Key {

         private final FullValueViewer view;

         public Key(FullValueViewer v) {
            this.view = v;
         }

         @Override
         public int hashCode() {
             _omitted_
         }

         @Override
         public boolean equals(Object obj) {
             _omitted_
         }
     }
 }

我尝试使用 Wildfly 的缓存配置:

<cache-container name="server" default-cache="default" module="org.wildfly.clustering.server">
   <local-cache name="default">
      <transaction mode="BATCH"/>
   </local-cache>
</cache-container>

<cache-container name="server" default-cache="default" module="org.wildfly.clustering.server">
   <transport lock-timeout="60000"/>
   <distributed-cache name="default" mode="ASYNC"/>
</cache-container>

【问题讨论】:

  • 请发布您实际编写/尝试过的内容; Stack Overflow 在这里帮助您解决编码问题,而不是为您编写所有内容:)
  • 不清楚你要什么。您想知道如何生成密钥吗?在缓存中存储一​​些东西?或者使用缓存生成密钥(为什么?)?应该如何从数据中导出key?如果您想生成唯一的密钥,例如查看 UUID 算法,您不需要任何通信。

标签: java caching cluster-computing wildfly infinispan


【解决方案1】:

我只会回答简历问题:

你不能锁定整个缓存;那不会扩展。最好的方法是使用cache.putIfAbsent(key, value) 操作,如果条目已经存在则生成不同的键(或使用列表作为值并使用条件cache.replace(key, oldValue, newValue) 替换它)。

如果你想真正禁止写入某个键,你可以使用带有悲观锁定策略的事务缓存,并发出cache.getAdvancedCache().lock(key)。请注意,没有解锁:当事务通过事务管理器提交/回滚时,所有​​锁都会被释放。

【讨论】:

  • 我在使用 putIfAbsent 时遇到的问题是,我需要进行数据库插入以检索主键(并将其附加到值),然后才允许我将值放入缓存中.替换也是禁止的,因为这意味着我创建了一个副本。
  • 因此,如果您从 DB 获取(唯一)密钥,则不能重复。
  • 该表包含一个主自动增量键和两个不可为空的列,每个组合都应该是唯一的(就像没有唯一约束的组合键)。即使我添加了唯一约束,我也不会插入重复项,但这会使通过解析错误代码等来进行 SQL 异常处理(对我来说似乎是错误和丑陋的)
  • 那你是用这两列在缓存中形成一个key吗?因为如果只是一个值,就无法锁定缓存来禁止任何其他包含相同值的条目。
【解决方案2】:

您不能生成自己的密钥并同时使用它来检测重复项。

每个数据行要么保证只到达一次,要么需要包含来自生成它的外部系统的唯一标识符。

如果数据中有一个唯一标识符,如果一切都出错了,并且其中没有 id,则只是所有属性连接在一起,那么您需要使用它来检查重复项。

现在您可以直接使用该唯一标识符,或生成自己的内部标识符。如果你做后者,你需要从外部 id 到内部 id 的翻译。

如果有重复到达,则需要在生成内部id时根据外部id加锁,然后记录你分配的内部id。

要生成唯一的长值序列,在集群中,您可以使用缓存的 CAS 操作。例如这样的:

@NotThreadSafe
class KeyGeneratorForOneThread {

  final String KEY = "keySequenceForXyRecords";
  final int INTERVAL = 100;
  Cache<String,Long> cache = ...;
  long nextKey = 0;
  long upperBound = -1;

  void requestNewInterval() {
    do {
      nextKey = cache.get(KEY);
      upperBound = nextKey + INTERVAL;
    } while (!cache.replace(KEY, nextKey, upperBound));
  } 

  long generateKey() {
    if (nextKey >= upperBound) {
     requestNewInterval();
    }
    return nextKey++;
  }
}

每个线程都有自己的密钥生成器,无需协调即可生成 100 个密钥。

您可能需要单独的缓存:

  • 通过外部 id 锁定
  • 从外部到内部 id 查找
  • 序列号,注意其实不是缓存,因为重启后必须知道最后一个号
  • 数据的内部 ID

【讨论】:

    【解决方案3】:

    我们找到了一个适用于我们的案例的解决方案,并且可能对其他人有帮助:

    我们有两个主要组件,一个缓存类和一个单例 bean。
    缓存包含当前存在于数据库中的所有记录的副本和大量逻辑。
    单例 bean 可以访问 infinispan-cache 并用于创建新记录。

    最初,缓存从单例 bean 中获取 infinispan-cache 的副本。然后,如果我们在缓存中搜索一条记录,我们首先应用一种哈希方法,该方法为该记录计算一个 unqiue 键。使用此键,我们可以确定是否需要将记录添加到数据库中。 如果是这样,那么缓存使用带有 @Lock(WRITE) 注释的创建方法调用单例 bean。 create 方法首先检查该值是否包含在 infinispan-cache 中,如果没有,则创建一条新记录。

    使用这种方法我们可以保证,即使缓存在多个线程中使用,并且每个线程都发送请求在数据库中创建相同的记录,创建过程被锁定并且不会继续执行所有后续请求,因为该值是已在之前的请求中创建。

    【讨论】:

      猜你喜欢
      • 2016-06-08
      • 2013-04-02
      • 2012-10-30
      • 2017-09-07
      • 2013-06-23
      • 2015-12-13
      • 2018-05-25
      • 2021-02-18
      • 2017-03-26
      相关资源
      最近更新 更多