【问题标题】:How to enrich payload and insert objects into MongoDB using Mule如何使用 Mule 丰富有效负载并将对象插入 MongoDB
【发布时间】:2014-05-26 12:51:48
【问题描述】:

我的 Mule 流程中有一个消息丰富器,如下所示:

<set-variable doc:name="Variable" value="#[payload['MeterUID']]" variableName="#['theKey']"/>
    <enricher target="#[payload]" doc:name="Message Enricher">
<mongo:find-objects-using-query-map config-ref="Mongo_DB1" collection="meterentity" doc:name="Mongo DB">
    <mongo:query-attributes>
                <mongo:query-attribute key="_id">#[theKey]</mongo:query-attribute>
    </mongo:query-attributes>
    <mongo:fields>
        <mongo:field>IpAddress</mongo:field>
        <mongo:field>LastSetTime</mongo:field>
        <mongo:field>LastReadGsmData</mongo:field>
    </mongo:fields>
</mongo:find-objects-using-query-map>

这是消息丰富器从我的入站端点收到的有效负载:

{TimeStamp=2013-12-16 08:48:33,270, MeterUID=4B414D000000011613CF, SignalStrengthIndication=15, CellID=4938, LocationAreaCode=280, MobileCountryCode=238, MobileNetworkCode=1}

在 Mongo-collection to json 中,我的有效载荷如下所示:

[ { "_id" : "4B414D000000011613CC" , "IpAddress" : "10.12.189.12" , "LastSetTime" : { "$date" : "2014-03-11T14:40:36.987Z"} , "LastReadGsmData" : { "$date" : "2014-03-11T14:40:47.253Z"}}]

这对我来说似乎是错误的。首先,我的第一个有效载荷去哪儿了? 其次,当流尝试插入我的 mongodb 时,我得到了这个错误:

Exception stack is:
1. BasicBSONList can only work with numeric keys, not: [_id] (java.lang.IllegalArgumentException)
  org.bson.types.BasicBSONList:161 (null)
2. Failed to invoke insertObject. Message payload is of type: String 

我怎样才能做到这一点? 我对骡子还是很陌生,所以我希望你们能在这里帮助我。

我的整个流程:

    <?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:mongo="http://www.mulesoft.org/schema/mule/mongo" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:data-mapper="http://www.mulesoft.org/schema/mule/ee/data-mapper" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.4.1"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/data-mapper http://www.mulesoft.org/schema/mule/ee/data-mapper/current/mule-data-mapper.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/mongo http://www.mulesoft.org/schema/mule/mongo/2.0/mule-mongo.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">
    <data-mapper:config name="csv_to_xml_7" transformationGraphPath="csv_to_xml_7.grf" doc:name="csv_to_xml_7"/>
    <data-mapper:config name="csv_to_xml_8" transformationGraphPath="csv_to_xml_8.grf" doc:name="csv_to_xml_8"/>
    <mongo:config name="Mongo_DB" username="$[admin]" doc:name="Mongo DB">
        <mongo:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </mongo:config>
    <mongo:config name="Mongo_DB1" username="$[admin]" doc:name="Mongo DB">
        <mongo:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </mongo:config>
    <data-mapper:config name="xml_to_json_2" transformationGraphPath="xml_to_json_2.grf" doc:name="xml_to_json_2"/>
    <data-mapper:config name="xml_to_json_3" transformationGraphPath="xml_to_json_3.grf" doc:name="xml_to_json_3"/>
    <mongo:config name="Mongo_DB2" username="$[admin]" doc:name="Mongo DB">
        <mongo:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </mongo:config>
    <flow name="p2pcontrollerloganalyserFlow1" doc:name="p2pcontrollerloganalyserFlow1">
        <file:inbound-endpoint path="C:\Users\Simon\Desktop\CSVFile" responseTimeout="10000" doc:name="File" moveToDirectory="C:\Users\Simon\Desktop\CSVPros" />
        <byte-array-to-string-transformer doc:name="Byte Array to String"/>
        <choice doc:name="Choice">
            <when expression="#[message.outboundProperties.originalFilename=='gsmdata.log']">
                <data-mapper:transform config-ref="csv_to_xml_7" doc:name="GSMV1 to xml"/>
                <splitter expression="#[xpath('//Root')]" doc:name="Splitter"/>
                <mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
                <data-mapper:transform config-ref="xml_to_json_2" doc:name="XML To JSON"/>
                <byte-array-to-string-transformer doc:name="Byte Array to String"/>
                <vm:outbound-endpoint exchange-pattern="one-way" path="json1" doc:name="VM"/>
            </when>
            <when expression="#[message.outboundProperties.originalFilename=='gsmdatav2.log']">
                <data-mapper:transform config-ref="csv_to_xml_8" doc:name="GSMV2 to XML"/>
                <splitter expression="#[xpath('//Root')]" doc:name="Splitter"/>
                <mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
                <data-mapper:transform config-ref="xml_to_json_3" doc:name="XML To JSON"/>
                <byte-array-to-string-transformer doc:name="Byte Array to String"/>
                <vm:outbound-endpoint exchange-pattern="one-way" path="json2" doc:name="VM"/>
            </when>
            <otherwise>
                <file:outbound-endpoint path="C:\Users\Simon\Desktop\CSVFile" responseTimeout="10000" doc:name="Failed GSM"/>
            </otherwise>
        </choice>
    </flow>
    <flow name="json1persistent" doc:name="json1persistent">
        <vm:inbound-endpoint exchange-pattern="one-way" path="json1" doc:name="VM"/>
        <json:json-to-object-transformer returnClass="java.lang.Object" doc:name="JSON to Object"/>
        <set-variable doc:name="Variable" value="#[payload['MeterUID']]" variableName="#['theKey']"/>
        <enricher target="#[payload]" doc:name="Message Enricher">
    <mongo:find-objects-using-query-map config-ref="Mongo_DB1" collection="meterentity" doc:name="Mongo DB">
        <mongo:query-attributes>
                    <mongo:query-attribute key="_id">#[theKey]</mongo:query-attribute>
        </mongo:query-attributes>
        <mongo:fields>
            <mongo:field>IpAddress</mongo:field>
            <mongo:field>LastSetTime</mongo:field>
            <mongo:field>LastReadGsmData</mongo:field>
        </mongo:fields>
    </mongo:find-objects-using-query-map>
</enricher>
        <mongo:mongo-collection-to-json doc:name="Mongo DB"/>
        <mongo:insert-object config-ref="Mongo_DB2" collection="GSMdata" doc:name="Mongo DB"/>
        <!-- <foreach collection="#[payload]" doc:name="For Each">
            <mongo:insert-object config-ref="Mongo_DB" collection="GSMdata" doc:name="Mongo DB"/>
        </foreach> --> 
        <!--  <enricher doc:name="Message Enricher">
            <mongo:get-file-content config-ref="Mongo_DB" query-ref="#[payload['MeterUID']]" doc:name="Find Meter"/>
        </enricher>-->
    </flow>
</mule>

我想读取一个 CSV 文件,将其拆分为更小的消息,用来自 MongoDB 的数据丰富每条消息,最后将每条消息插入另一个 MongoDB。

【问题讨论】:

    标签: json mongodb mule


    【解决方案1】:

    首先,我的第一个有效载荷去哪儿了?

    enricher target="#[payload]" 表示您将当前有效负载设置为浓缩器返回的任何内容。您确实希望目标是其他东西,例如变量:target="#[variable:myVar]"

    第二,当流尝试插入我的 mongodb

    您没有在此处共享任何相关配置,但我猜您现在正在将查询的 Mongo 对象列表发送到需要单个 Map 对象的某个组件。请参考您的第一个问题。

    如果您的目标是将查询的 Mongo 对象中的字段添加到您当前的有效负载中,您应该使用变量作为丰富目标,就像我上面的示例一样。然后,您可以稍后组合有效负载和包含 Mongo 对象的丰富变量。

    此外,如果您仅从 Mongo 获取单个对象,则可以通过在丰富器组件中使用 source="#[payload.toArray()[0]]" 之类的内容从 Mongo 列表中提取它。

    由于您的有效负载和 myVar 变量现在都是 Java 地图,您可以将它们与普通 Java 结合起来:

    <expression-component doc:name="Expression">payload.putAll(myVar)</expression-component>
    

    如果想去掉 Mongo 返回的 _id 字段,可以用

    <expression-component doc:name="Expression">payload.remove('_id')</expression-component>
    

    编辑:现在您的有效负载是一个地图,您可以将其插入到 Mongo 中

    <mongo:insert-object-from-map config-ref="Mongo_DB2" collection="GSMdata">
       <mongo:element-attributes ref="#[payload]"/> 
    </mongo:insert-object-from-map>
    

    【讨论】:

    • 用插入部分更新了我的答案。浏览我对enricher源/目标属性的说明,然后将enricher后面的两个组件替换为上述两个表达式组件和Map insert。
    • 我收到一个错误,因为在我的 Json-to-Object 之后,由于某种原因,payload 是一个linkedhashmap。为什么 ?以及如何解决?
    • 如果这与正确配置浓缩器组件无关,请创建一个新问题。此外,始终包含您的完整堆栈跟踪以查找错误,并且在这种情况下还使用 Logger 打印出您的 json,因为我们无法猜测您的 DataMapper 组件正在生成什么样的数据。更好的是,如果您可以将问题限制在流程中的特定阶段,并使用特定输入和尽可能少的组件来重现它。
    猜你喜欢
    • 2016-07-12
    • 1970-01-01
    • 2014-03-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多