【问题标题】:How to update Hazelcast map frequently如何经常更新 Hazelcast 地图
【发布时间】:2013-08-29 12:03:23
【问题描述】:

我正在使用 Hazelcast 2.0.1 频繁更新数据(大约 2 分钟),其中包括先删除然后从数据库加载数据。然而,在某个地方,其中一个线程持有一个键上的锁,这会阻止删除操作并引发异常 (java.util.ConcurrentModificationException: Another thread holds a lock for the key: abc@gmail.com)。请帮助我更新 hazelcast 中的地图。

我在下面给出我的代码

DeltaParallelizer

def customerDetails = dataOperations.getDistributedStore(DataStructures.customer_project.name()).keySet()
ExecutorService service = Hazelcast.getExecutorService()

def result
try{
    customerDetails?.each{customerEmail->
        log.info String.format('Creating delta task for customer:%s',customerEmail)
        def dTask = new DistributedTask(new EagerDeltaTask(customerEmail))
        service.submit(dTask);
    }
    customerDetails?.each {customerEmail ->
        log.info String.format('Creating task customer aggregation for %s',customerEmail)
        def task = new DistributedTask(new EagerCustomerAggregationTask(customerEmail))
        service.submit(task)
    }
}
catch(Exception e){
    e.printStackTrace()
}

EagerDeltaTask

class EagerDeltaTask implements Callable,Serializable {
    private final def emailId
    EagerDeltaTask(email){
        emailId = email
    }
    @Override
    public Object call() throws Exception {
        log.info(String.format("Eagerly computing delta for %s",emailId))       
        def dataOperations = new DataOperator()
        def tx = Hazelcast.getTransaction()
        tx.begin()
        try{
            deleteAll(dataOperations)
            loadAll(dataOperations)
            tx.commit()
        }
        catch(Exception e){
            tx.rollback()
            log.error(String.format('Delta computation is screwed while loading data for the project:%s',emailId),e)
        }       
    }

    private void deleteAll(dataOperations){
        log.info String.format('Deleting entries for customer %s',emailId)      
        def projects = dataOperations.getDistributedStore(DataStructures.customer_project.name()).get(emailId)
        projects?.each{project->
            log.info String.format('Deleting entries for project %s',project[DataConstants.PROJECT_NUM.name()])
            def srs = dataOperations.srs(project[DataConstants.PROJECT_NUM.name()])?.collect{it[DataConstants.SR_NUM.name()]}
            def activitiesStore = dataOperations.getDistributedStore(DataStructures.sr_activities.name())
            srs?.each{sr ->
                activitiesStore.remove(sr)
            }
            dataOperations.getDistributedStore(DataStructures.project_sr_aggregation.name()).remove(project[DataConstants.PROJECT_NUM.name()])
        }       
        dataOperations.getDistributedStore(DataStructures.customer_project.name()).remove(emailId)
    }

    private void loadAll(dataOperations){
        log.info(String.format('Loading entries for customer %s',emailId))
        def projects = dataOperations.projects(emailId)
        projects?.each{project->
            log.info String.format('Loading entries for project %s',project[DataConstants.PROJECT_NUM.name()])
            def srs = dataOperations.srs(project[DataConstants.PROJECT_NUM.name()])
            srs?.each{sr->
                dataOperations.activities(sr[DataConstants.SR_NUM.name()])
            }
        }       
    }   
}

数据运算符

class DataOperator {
def getDistributedStore(String name){
    Hazelcast.getMap(name)
}
}

我在 deleteAll srs 中遇到异常,因此删除了一些地图内容,并且仅为内容被删除的地图加载了新数据,而地图的其余部分具有旧数据。所以我没有在我的 Hazelcast 地图中获得更新的数据。请就如何在我的 Hazelcast 地图中获取更新数据提出您的看法。

这个Hazelcast.getTransaction 客户端也用于此目的吗?

注意:客户可以有多个project_num,1个project_num也可以被多个客户共享 1 project_num 可以有多个 SR_NUM

【问题讨论】:

    标签: java groovy hazelcast


    【解决方案1】:

    我使用了Hazelcast eviction policy,它解决了我的问题。我使用了<time-to-live-seconds>300</time-to-live-seconds>,它每 5 分钟清除一次地图内容,当 UI 对任何地图发出任何请求时,它会从加载器重新加载该地图内容。

    以下是 Hazelcast 地图配置之一

    ...
    <map name="customer_project" >
        <map-store enabled="true">
            <class-name>com.abc.arena.datagrid.loader.CustomerProjectData</class-name>
        </map-store>
        <time-to-live-seconds>300</time-to-live-seconds>
    </map>
    ...
    

    CustomerProjectData loader 类只是将数据从 DB 加载到 map 中。所以现在我不再需要DeltaParallelizerEagerDeltaTask

    也欢迎不同的方法:)

    【讨论】:

      猜你喜欢
      • 2017-08-03
      • 1970-01-01
      • 2015-09-11
      • 1970-01-01
      • 2016-06-03
      • 2020-05-14
      • 2018-01-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多