【发布时间】:2013-08-27 17:35:32
【问题描述】:
我的目标是解析一个大的 XML 文件,并根据 XML 数据将对象持久保存到数据库中,并快速完成。该操作需要是事务性的,以便在解析 XML 时出现问题或无法验证创建的对象时可以回滚。
我正在使用Grails Executor plugin 线程化操作。问题是我在服务中创建的每个线程都有自己的事务和会话。如果我创建 4 个线程并且 1 个失败,则未失败的 3 个会话可能已经刷新,或者它们将来可能会刷新。
我在想是否可以告诉每个线程使用“当前”的 Hibernate 会话,这可能会解决我的问题。我的另一个想法是,我可以阻止所有会话刷新,直到知道所有会话都已完成且没有错误。不幸的是,我不知道如何做这两件事。
还有一个额外的问题。有许多这样的 XML 文件需要解析,而且还有许多将在未来创建。其中许多 XML 文件包含的数据在解析时会创建一个与解析前一个 XML 文件时已经创建的对象相同的对象。在这种情况下,我需要引用现有对象。我为每个类添加了一个瞬态isUnique 变量来解决这个问题。使用 Grails unique 约束不起作用,因为它没有考虑 hasMany 关系,正如我在问题 here 中所概述的那样。
下面的例子与实物相比非常简单。我正在解析的 XML 文件包含具有许多属性的深层嵌套元素。
想象以下领域类:
class Foo {
String ver
Set<Bar> bars
Set<Baz> bazs
static hasMany = [bars: Bar, bazs: Baz]
boolean getIsUnique() {
Util.isUnique(this)
}
static transients = [
'isUnique'
]
static constraints = {
ver(nullable: false)
isUnique(
validator: { val, obj ->
obj.isUnique
}
)
}
}
class Bar {
String name
boolean getIsUnique() {
Util.isUnique(this)
}
static transients = [
'isUnique'
]
static constraints = {
isUnique(
validator: { val, obj ->
obj.isUnique
}
)
}
}
class Baz {
String name
boolean getIsUnique() {
Util.isUnique(this)
}
static transients = [
'isUnique'
]
static constraints = {
isUnique(
validator: { val, obj ->
obj.isUnique
}
)
}
}
这是我的Util.groovy 类,位于我的src/groovy 文件夹中。此类包含我用来确定域类的实例是否唯一和/或检索已经存在的相等实例的方法:
import org.hibernate.Hibernate
class Util {
/**
* Gets the first instance of the domain class of the object provided that
* is equal to the object provided.
*
* @param obj
* @return the first instance of obj's domain class that is equal to obj
*/
static def getFirstDuplicate(def obj) {
def objClass = Hibernate.getClass(obj)
objClass.getAll().find{it == obj}
}
/**
* Determines if an object is unique in its domain class
*
* @param obj
* @return true if obj is unique, otherwise false
*/
static def isUnique(def obj) {
getFirstDuplicate(obj) == null
}
/**
* Validates all of an object's constraints except those contained in the
* provided blacklist, then saves the object if it is valid.
*
* @param obj
* @return the validated object, saved if valid
*/
static def validateWithBlacklistAndSave(def obj, def blacklist = null) {
def propertiesToValidate = obj.domainClass.constraints.keySet().collectMany{!blacklist?.contains(it)? [it] : []}
if(obj.validate(propertiesToValidate)) {
obj.save(validate: false)
}
obj
}
}
想象一下 XML 文件“A”与此类似:
<foo ver="1.0">
<!-- Start bar section -->
<bar name="bar_1"/>
<bar name="bar_2"/>
<bar name="bar_3"/>
...
<bar name="bar_5000"/>
<!-- Start baz section -->
<baz name="baz_1"/>
<baz name="baz_2"/>
<baz name="baz_3"/>
...
<baz name="baz_100000"/>
</foo>
并且想象 XML 文件“B”与此类似(与 XML 文件“A”相同,除了添加了一个新的bar 和一个新的baz)。在 XML 文件“A”之后解析 XML 文件“B”时,应创建三个新对象 1.) A Bar 和 name = bar_5001 2.) A Baz 和 name = baz_100001, 3.) A Foo使用ver = 2.0 和bars 和bazs 的列表等于显示的内容,重用从XML 文件A 的导入中已经存在的Bar 和Baz 的实例:
<foo ver="2.0">
<!-- Start bar section -->
<bar name="bar_1"/>
<bar name="bar_2"/>
<bar name="bar_3"/>
...
<bar name="bar_5000"/>
<bar name="bar_5001"/>
<!-- Start baz section -->
<baz name="baz_1"/>
<baz name="baz_2"/>
<baz name="baz_3"/>
...
<baz name="baz_100000"/>
<baz name="baz_100001"/>
</foo>
还有一个类似这样的服务:
class BigXmlFileUploadService {
// Pass in a 20MB XML file
def upload(def xml) {
String rslt = null
def xsd = Util.getDefsXsd()
if(Util.validateXmlWithXsd(xml, xsd)) { // Validate the structure of the XML file
def fooXml = new XmlParser().parseText(xml.getText()) // Parse the XML
def bars = callAsync { // Make a thread for creating the Bar objects
def bars = []
for(barXml in fooXml.bar) { // Loop through each bar XML element inside the foo XML element
def bar = new Bar( // Create a new Bar object
name: barXml.attribute("name")
)
bar = retrieveExistingOrSave(bar) // If an instance of Bar that is equal to this one already exists then use it
bars.add(bar) // Add the new Bar object to the list of Bars
}
bars // Return the list of Bars
}
def bazs = callAsync { // Make a thread for creating the Baz objects
def bazs = []
for(bazXml in fooXml.baz) { // Loop through each baz XML element inside the foo XML element
def baz = new Baz( // Create a new Baz object
name: bazXml.attribute("name")
)
baz = retrieveExistingOrSave(baz) // If an instance of Baz that is equal to this one already exists then use it
bazs.add(baz) // Add the new Baz object to the list of Bazs
}
bazs // Return the list of Bazs
}
bars = bars.get() // Wait for thread then call Future.get() to get list of Bars
bazs = bazs.get() // Wait for thread then call Future.get() to get list of Bazs
def foo = new Foo( // Create a new Foo object with the list of Bars and Bazs
ver: fooXml.attribute("ver")
bars: bars
bazs: bazs
).save()
rslt = "Successfully uploaded ${xml.getName()}!"
} else {
rslt = "File failed XSD validation!"
}
rslt
}
private def retrieveExistingOrSave(def obj, def existingObjCache) {
def dup = Util.getFirstDuplicate(obj)
obj = dup ?: Util.validateWithBlacklistAndSave(obj, ["isUnique"])
if(obj.errors.allErrors) {
log.error "${obj} has errors ${obj.errors}"
throw new RuntimeException() // Force transaction to rollback
}
obj
}
}
所以问题是我如何让我的服务的 upload 方法中发生的所有事情都像在单个会话中发生的那样运行,这样如果任何一个部分发生故障,发生的所有事情都可以回滚?
【问题讨论】:
标签: multithreading hibernate grails executor