【问题标题】:Spark stream data from IBM MQ来自 IBM MQ 的 Spark 流数据
【发布时间】:2018-07-13 12:16:33
【问题描述】:

我想从 IBM MQ 流式传输数据。我已经尝试过在 Github 上找到的 this code

我能够从队列中流式传输数据,但每次流式传输时,它都会从中获取所有数据。我只想获取推送到队列中的当前数据。我查了很多网站,但没有找到正确的解决方案。

在 Kafka 中,我们有类似 KafkaStreamUtils 的东西来流式传输近乎实时的数据。 IBM MQ 中是否有类似的东西,以便它只流式传输最新数据?

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming ibm-mq


    【解决方案1】:

    您提供的链接中的示例显示它调用以下方法从 IBM MQ 接收:

    CustomMQReciever(String host , int port, String qm, String channel, String qn)
    

    如果您查看 CustomMQReciever here,您会发现它只是浏览队列中的消息。这意味着该消息仍将在队列中,并且下次连接时您将收到相同的消息:

    MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
    

    如果您想从队列中删除消息,您需要调用一个方法来从队列中使用它们,而不是从队列中浏览它们。以下是对 CustomMQReciever.java 的更改示例,应该可以实现您想要的效果:


    initConnection() 下,将上述代码更改为以下代码,使其从队列中删除消息:

    MQMessageConsumer consumer = (MQMessageConsumer) qSession.createConsumer(queue);
    

    摆脱:

    enumeration= browser.getEnumeration();
    

    receive() 下更改以下内容:

    while (!isStopped() && enumeration.hasMoreElements() )
        {
    
        receivedMessage= (JMSMessage) enumeration.nextElement();
        String userInput = convertStreamToString(receivedMessage);
        //System.out.println("Received data :'" + userInput + "'");
        store(userInput);
        }
    

    到这样的事情:

    while (!isStopped() && (receivedMessage = consumer.receiveNoWait()) != null))
        {
        String userInput = convertStreamToString(receivedMessage);
        //System.out.println("Received data :'" + userInput + "'");
        store(userInput);
        }
    

    【讨论】:

    • 在同一个示例中,我们可以从 MQ 读取数据并对其进行处理并在控制台上打印它但是我如何将数据写回 MQ。请提供一些信息...我已尝试在 google 上搜索但未找到任何内容。
    猜你喜欢
    • 1970-01-01
    • 2017-01-17
    • 2021-07-23
    • 1970-01-01
    • 2012-10-01
    • 2016-02-18
    • 1970-01-01
    • 1970-01-01
    • 2018-01-16
    相关资源
    最近更新 更多