【问题标题】:How to retrieve new incoming data by polling and feed it back如何通过轮询检索新的传入数据并将其反馈
【发布时间】:2012-12-23 16:49:30
【问题描述】:

这里的方法读取具有唯一ID且序列号不断增加的数据库,因为我是java初学者,我是否知道如何实现这种重复轮询并每次检查新传入的消息。

public void run() {
int seqId = 0;
    while(true) {
        List<KpiMessage> list = null;
        try {
            list = fullPoll(seqId);
            if (!list.isEmpty()) {
                seqId = list.get(0).getSequence();
                incomingMessages.addAll(list);
                System.out.println("waiting 3 seconds");
                System.out.println("new incoming message");
                Thread.sleep(3000);

            }
        } catch (Exception e1) {
            e1.printStackTrace();
        }


    }
}

//Method which defines polling of the database and also count the number of Queries
public List<KpiMessage> fullPoll(int lastSeq) throws Exception {
    Statement st = dbConnection.createStatement();
    ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION =  804 and SEQ >" + lastSeq + "order by SEQ DESC");     
    List<KpiMessage> pojoCol = new ArrayList<KpiMessage>();
    while (rs.next()) {
        KpiMessage filedClass = convertRecordsetToPojo(rs);
        pojoCol.add(filedClass);
    }
    for (KpiMessage pojoClass : pojoCol) {
        System.out.print(" " + pojoClass.getSequence());
        System.out.print(" " + pojoClass.getTableName());
        System.out.print(" " + pojoClass.getAction());
        System.out.print(" " + pojoClass.getKeyInfo1());
        System.out.print(" " + pojoClass.getKeyInfo2());
        System.out.println(" " + pojoClass.getEntryTime());
    }
    //          return seqId;           
    return pojoCol;
}  

我的目标是从数据库中轮询表并检查是否有新的传入消息,我可以从表中的 Header 字段 SequenceID 中找到该消息,该字段是唯一的,并且不断增加新条目。现在我的问题是

1.假设我第一次轮询后,它会读取所有条目并使线程休眠 6 秒,同时如何获取新传入的数据并再次轮询?

2.还有如何在第二次轮询时添加新数据并将新数据传递给另一个类。

【问题讨论】:

  • 请格式化您的代码!!!
  • 请删除无用的cmets!

标签: java oop object methods long-polling


【解决方案1】:

轮询器每 6 秒调用一次 fullPoll 并将 lastSeq 参数传递给它。最初 lastSeq = 0。当 Poller 获取结果列表时,它将 lastSeq 替换为最大 SEQ 值。 fullPoll 只检索 SEQ > lastSeq 的记录。

void run() throws Exception {
    int seqId = 0;
    while(true) {
          List<KpiMessage> list = fullPoll(seqId);
          if (!list.isEmpty()) {
            seqId = list.get(0).getSequene();
          }
          Thread.sleep(6000); 
    }
}

public List<KAMessage> fullPoll(int lastSeq) throws Exception {
       ...
       ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION =  804 and SEQ > " + lastSeq + " order by SEQ 
      DESC");     
      .. 
}

【讨论】:

  • 感谢您尝试了很多,但返回类型和传递参数仍然有错误...您能解决这个问题...gist.github.com/9bab6f82c83ff6fb9c3a
  • 如何获取下一个最大序列存在问题,请参阅我的更新。
  • 谢谢我已经执行了它,但是现在,如果它没有重复执行 fullPoll() 而是只等待新的传入数据,我该如何并行实现这一点...我已经更新了代码以上请给我一个解决方案...
【解决方案2】:

这里有一些你可以用来开始工作的代码。我尝试使用观察者模式使其非常灵活;这样您就可以将多个“消息处理器”连接到同一个轮询器:

public class MessageRetriever implements Runnable {
    private int lastID;
    private List<MessageListener> listeners;

   ...

    public void addMessageListener(MessageListener listener) {
        this.listeners.add(listener)
    }

    public void removeMessageListener(MessageListener listener) {
        this.listeners.remove(listener)
    }

    public void run() {
        //code to repeat the polling process given some time interval    
    }  

    private void pollMessages() {
        if (this.lastID == 0)
            this.fullPoll()
        else
            this.partialPoll()           
    }

    private void fullPoll() {
        //your full poll code

        //assuming they are ordered by ID and it haves the ID field - you should
        //replace this code according to your structure
        this.lastID = pojoCol.get(pojoCol.length() - 1).getID() 

        this.fireInitialMessagesSignal(pojoCol)
    }

    private void fireInitialMessagesSignal(List<KAMessage> messages) {
        for (MessageListener listener : this.listeners)
            listener.initialMessages(messages)
    }

    private void partialPoll() {
        //code to retrieve messages *past* the lastID. You could do this by 
        //adding an extra condition on your where sentence e.g 
        //select * from msg_new_to_bde where ACTION = 804 AND SEQ > lastID order by SEQ DESC
        //the same as before
        this.lastID = pojoCol.get(pojoCol.length() - 1).getID() 

        this.fireNewMessagesListener(pojoCol)
    }

    private void fireNewMessagesListener(List<KAMessage> messages) {
        for (MessageListener listener : this.listeners)
            listener.newMessages(messages)
    }

}

还有界面

public interface MessageListener {
    public void initialMessages(List<KAMessage> messages);
    public void newMessages(List<KAMessage> messages)
}

基本上,使用这种方法,检索器是一个可运行的(可以在它自己的线程上执行)并负责整个过程:进行初始轮询并在给定的时间间隔内继续进行“部分”轮询。

不同的事件触发不同的信号,将受影响的消息发送给注册的侦听器,然后这些侦听器根据需要处理消息。

【讨论】:

  • 非常感谢,我整天都在想...我会运行它并回到你身边...
  • @Babu 没有问题。确保此模型符合您的需求,这可能对您需要做的事情来说太过分了,在这种情况下,Evgeniy Dorofeev 解决方案可能更适合。
  • @arsemax 我能够第一次进行轮询并获得新消息,但我被困在这里如何将它传递给另一个类并再次反馈给轮询......我的代码间隔后无法重复轮询...请检查我更新的代码...
猜你喜欢
  • 2023-01-04
  • 1970-01-01
  • 2020-02-06
  • 2020-05-08
  • 1970-01-01
  • 1970-01-01
  • 2017-06-18
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多