【问题标题】:How to create a service with multiple threads which all use the same Hibernate session?如何创建具有多个线程的服务,这些线程都使用相同的 Hibernate 会话?
【发布时间】:2013-08-27 17:35:32
【问题描述】:

我的目标是解析一个大的 XML 文件,并根据 XML 数据将对象持久保存到数据库中,并快速完成。该操作需要是事务性的,以便在解析 XML 时出现问题或无法验证创建的对象时可以回滚。

我正在使用Grails Executor plugin 线程化操作。问题是我在服务中创建的每个线程都有自己的事务和会话。如果我创建 4 个线程并且 1 个失败,则未失败的 3 个会话可能已经刷新,或者它们将来可能会刷新。

我在想是否可以告诉每个线程使用“当前”的 Hibernate 会话,这可能会解决我的问题。我的另一个想法是,我可以阻止所有会话刷新,直到知道所有会话都已完成且没有错误。不幸的是,我不知道如何做这两件事。

还有一个额外的问题。有许多这样的 XML 文件需要解析,而且还有许多将在未来创建。其中许多 XML 文件包含的数据在解析时会创建一个与解析前一个 XML 文件时已经创建的对象相同的对象。在这种情况下,我需要引用现有对象。我为每个类添加了一个瞬态isUnique 变量来解决这个问题。使用 Grails unique 约束不起作用,因为它没有考虑 hasMany 关系,正如我在问题 here 中所概述的那样。

下面的例子与实物相比非常简单。我正在解析的 XML 文件包含具有许多属性的深层嵌套元素。

想象以下领域类:

class Foo {
    String ver

    Set<Bar> bars
    Set<Baz> bazs
    static hasMany = [bars: Bar, bazs: Baz]

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]

    static constraints = {
        ver(nullable: false)
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}

class Bar {
    String name

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]

    static constraints = {
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}


class Baz {
    String name

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]

    static constraints = {
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}

这是我的Util.groovy 类,位于我的src/groovy 文件夹中。此类包含我用来确定域类的实例是否唯一和/或检索已经存在的相等实例的方法:

import org.hibernate.Hibernate

class Util {
    /**
     * Gets the first instance of the domain class of the object provided that
     * is equal to the object provided.
     *
     * @param obj
     * @return the first instance of obj's domain class that is equal to obj
     */
    static def getFirstDuplicate(def obj) {
        def objClass = Hibernate.getClass(obj)
        objClass.getAll().find{it == obj}
    }

    /**
     * Determines if an object is unique in its domain class
     *
     * @param obj
     * @return true if obj is unique, otherwise false
     */
    static def isUnique(def obj) {
        getFirstDuplicate(obj) == null
    }

    /**
     * Validates all of an object's constraints except those contained in the
     * provided blacklist, then saves the object if it is valid.
     *
     * @param obj
     * @return the validated object, saved if valid
     */
    static def validateWithBlacklistAndSave(def obj, def blacklist = null) {
        def propertiesToValidate = obj.domainClass.constraints.keySet().collectMany{!blacklist?.contains(it)?  [it] : []}
        if(obj.validate(propertiesToValidate)) {
            obj.save(validate: false)
        }
        obj
    }
}

想象一下 XML 文件“A”与此类似:

    <foo ver="1.0">
        <!-- Start bar section -->
        <bar name="bar_1"/>
        <bar name="bar_2"/>
        <bar name="bar_3"/>
        ...
        <bar name="bar_5000"/>

        <!-- Start baz section -->
        <baz name="baz_1"/>
        <baz name="baz_2"/>
        <baz name="baz_3"/>
        ...
        <baz name="baz_100000"/>
    </foo>

并且想象 XML 文件“B”与此类似(与 XML 文件“A”相同,除了添加了一个新的bar 和一个新的baz)。在 XML 文件“A”之后解析 XML 文件“B”时,应创建三个新对象 1.) A Barname = bar_5001 2.) A Bazname = baz_100001, 3.) A Foo使用ver = 2.0barsbazs 的列表等于显示的内容,重用从XML 文件A 的导入中已经存在的BarBaz 的实例:

    <foo ver="2.0">
        <!-- Start bar section -->
        <bar name="bar_1"/>
        <bar name="bar_2"/>
        <bar name="bar_3"/>
        ...
        <bar name="bar_5000"/>
        <bar name="bar_5001"/>

        <!-- Start baz section -->
        <baz name="baz_1"/>
        <baz name="baz_2"/>
        <baz name="baz_3"/>
        ...
        <baz name="baz_100000"/>
        <baz name="baz_100001"/>
    </foo>

还有一个类似这样的服务:

class BigXmlFileUploadService {

    // Pass in a 20MB XML file
    def upload(def xml) {
        String rslt = null
        def xsd = Util.getDefsXsd()
        if(Util.validateXmlWithXsd(xml, xsd)) { // Validate the structure of the XML file
            def fooXml = new XmlParser().parseText(xml.getText()) // Parse the XML

            def bars = callAsync { // Make a thread for creating the Bar objects
                def bars = []
                for(barXml in fooXml.bar) { // Loop through each bar XML element inside the foo XML element
                    def bar = new Bar( // Create a new Bar object
                        name: barXml.attribute("name")
                    )
                    bar = retrieveExistingOrSave(bar) // If an instance of Bar that is equal to this one already exists then use it
                    bars.add(bar) // Add the new Bar object to the list of Bars
                }
                bars // Return the list of Bars
            }

            def bazs = callAsync { // Make a thread for creating the Baz objects
                def bazs = []
                for(bazXml in fooXml.baz) { // Loop through each baz XML element inside the foo XML element
                    def baz = new Baz( // Create a new Baz object
                        name: bazXml.attribute("name")
                    )
                    baz = retrieveExistingOrSave(baz) // If an instance of Baz that is equal to this one already exists then use it
                    bazs.add(baz) // Add the new Baz object to the list of Bazs
                }
                bazs // Return the list of Bazs
            }

            bars = bars.get() // Wait for thread then call Future.get() to get list of Bars
            bazs = bazs.get() // Wait for thread then call Future.get() to get list of Bazs

            def foo = new Foo( // Create a new Foo object with the list of Bars and Bazs
                ver: fooXml.attribute("ver")
                bars: bars
                bazs: bazs
            ).save()

            rslt = "Successfully uploaded ${xml.getName()}!"
        } else {
            rslt = "File failed XSD validation!"
        }
        rslt
    }

    private def retrieveExistingOrSave(def obj, def existingObjCache) {
        def dup = Util.getFirstDuplicate(obj)
        obj = dup ?: Util.validateWithBlacklistAndSave(obj, ["isUnique"])
        if(obj.errors.allErrors) {
            log.error "${obj} has errors ${obj.errors}"
            throw new RuntimeException() // Force transaction to rollback
        }
        obj
    }
}

所以问题是我如何让我的服务的 upload 方法中发生的所有事情都像在单个会话中发生的那样运行,这样如果任何一个部分发生故障,发生的所有事情都可以回滚?

【问题讨论】:

    标签: multithreading hibernate grails executor


    【解决方案1】:

    你可能无法做你想做的事。

    首先,Hibernate 会话is not thread-safe

    Session 是一个廉价的、非线程安全的对象,应该使用一次然后丢弃:单个请求、对话或单个工作单元。 ...

    其次,我认为并行执行 SQL 查询不会带来太多好处。我查看了PostgreSQL's JDBC driver 的工作原理,所有实际运行查询的方法都是synchronized

    您正在做的最慢的部分可能是 XML 处理,因此我建议将其并行化并在单个线程上执行持久性。您可以创建多个工作人员来读取 XML 并将对象添加到某种队列中。然后让另一个拥有 Session 并在解析对象时保存对象的工作人员。

    您可能还想查看 Hibernate 的 batch processing 文档页面。每次插入后冲洗并不是最快的方法。

    最后,我不知道您的对象是如何映射的,但您可能会在保存 Foo 所有子对象之后遇到问题。将对象添加到foo 的集合将导致Hibernate 为每个对象设置foo_id 引用,您最终会为插入的每个对象生成一个更新查询。您可能想先创建foo,然后在每次插入之前创建baz.setFoo(foo)

    【讨论】:

    • 感谢您的回复。我知道 XML 数据的实际解析不是我花这么长时间的事情。我可以解析整个 20MB 的 XML 文件并创建我想要的所有对象(无需持久化它们)并在一分钟内将它们写回到一个新文件中。当我真正开始持久化对象时,时间消耗就来了。我会查看您链接到的 Hibernate 文档并尝试您的其他一些建议,然后发表评论。
    【解决方案2】:

    可以优化服务以解决一些痛点:

    • 我同意@takteek,解析xml 会很耗时。所以,计划使这部分异步。
    • 您不需要在每次创建子对象时使用flush。请参阅下面的优化。

    服务类看起来像:

    // Pass in a 20MB XML file
    def upload(def xml) {
        String rslt = null
        def xsd = Util.getDefsXsd()
        if (Util.validateXmlWithXsd(xml, xsd)) {
            def fooXml = new XmlParser().parseText(xml.getText())
    
            def foo = new Foo().save(flush: true)
    
            def bars = callAsync {
                saveBars(foo, fooXml)
            }
    
            def bazs = callAsync {
                saveBazs(foo, fooXml)
            }
    
            //Merge the detached instances and check whether the child objects
            //are populated or not. If children are 
            //Can also issue a flush, but we do not need it yet
            //By default domain class is validated as well.
            foo = bars.get().merge() //Future returns foo
            foo = bazs.get().merge() //Future returns foo
    
            //Merge the detached instances and check whether the child objects
            //are populated or not. If children are 
            //absent then rollback the whole transaction
            handleTransaction {
                 if(foo.bars && foo.bazs){
                    foo.save(flush: true)
                } else {
                    //Else block will be reached if any of 
                    //the children is not associated to parent yet
                    //This would happen if there was a problem in 
                    //either of the thread, corresponding
                    //transaction would have rolled back 
                    //in the respective sessions. Hence empty associations.
    
                    //Set transaction roll-back only
                       TransactionAspectSupport
                           .currentTransactionStatus()
                           .setRollbackOnly()
    
                    //Or throw an Exception and 
                    //let handleTransaction handle the rollback
                    throw new Exception("Rolling back transaction")
                }
            }
    
            rslt = "Successfully uploaded ${xml.getName()}!"
        } else {
            rslt = "File failed XSD validation!"
        }
        rslt
    }
    
    def saveBars(Foo foo, fooXml) {
        handleTransaction {
            for (barXml in fooXml.bar) {
                def bar = new Bar(name: barXml.attribute("name"))
                foo.addToBars(bar)
            }
            //Optional I think as session is flushed
            //end of method
            foo.save(flush: true)
        }
    
        foo
    }
    
    def saveBazs(Foo foo, fooXml) {
        handleTransaction {
            for (bazXml in fooXml.baz) {
                def baz = new Baz(name: bazXml.attribute("name"))
                foo.addToBazs(baz)
            }
    
            //Optional I think as session is flushed
            //end of method
            foo.save(flush: true)
        }
    
        foo
    }
    
    def handleTransaction(Closure clos){
        try {
            clos()
        } catch (e) {
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()
        }
    
        if (TransactionAspectSupport.currentTransactionStatus().isRollbackOnly())
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()
    }
    

    【讨论】:

    • 感谢您的回答。我不认为 XML 解析需要很长时间。请参阅我对@takteek 的评论。你提供的代码有帮助,昨晚我醒着时也在想类似的事情。每次创建对象时,我都会调用save(flush: true)。现在我已经删除了对save 的那些调用,事情进展得更快了,但是当我保存我的“更大”对象(像Foo 这样的对象引用了许多其他对象)时我遇到了错误,因为引用的对象有尚未持久化。
    • @ubiquibacon 用您目前所做的修改更新您的问题,然后调试/验证将很容易。 :)
    • 好的。我真的很感谢你的帮助。我会在明天晚上之前发布更新。
    • 我更新了我的问题。我没有包含您建议的模组,因为您使用bars = callAsync{...}bazs = callAsync{...} 创建的线程如果其中一个成功完成然后另一个失败,则无法回滚,但我可能正在使用TransactionAspectSupport(我没有t 完全理解)错误。我可以通过线程获得所需的处理速度,但线程似乎破坏了我需要的事务性。如果有任何失败,我需要能够回滚上传方法中发生的所有事情,并使其就像从未调用过上传方法一样。
    • @ubiquibacon 我已经更新了我的答案,以处理在任何线程未能将子元素添加到 foo 时回滚所有内容的情况。此处检查的主要修改是foo.merge(),它合并了 foo 的分离实例。查看Future.get() 之后的行。我认为这应该注意事务的原子性。
    猜你喜欢
    • 2018-03-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-04-26
    • 2016-08-11
    • 1970-01-01
    相关资源
    最近更新 更多