【问题标题】:Need to wait for asynchronous api callback before I return from method in Java在我从 Java 中的方法返回之前需要等待异步 api 回调
【发布时间】:2013-02-02 11:19:10
【问题描述】:
  import java.util.concurrent.CountDownLatch;

  import quickfix.Initiator;


  public class UserSession {
    private final CountDownLatch latch = new CountDownLatch(1);

public String await() {
        try {
            System.out.println("waiting...");
            if (latch.await(5, TimeUnit.SECONDS))
                System.out.println("released!");
            else
                System.out.println("timed out");
            return secret;
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
        return null;
    }

    public void countdown(String s) {
        System.out.println("In countdown: "+s+ ". Latch count: "+latch.getCount());
        secret = s;
        latch.countDown();
        System.out.println("Latch count: "+latch.getCount());
    }
  }


  public class LogonHandler extends AbstractHandler {

    public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) 
        throws IOException, ServletException
        {
            Map<String,String[]> query = request.getParameterMap();

            if (query.containsKey("method")) {
                if (query.get("method")[0].compareTo(method) == 0) {
                    baseRequest.setHandled(true);
                    response.getWriter().println(logon(query));
                }
            }
            else
                baseRequest.setHandled(false);
        }

    private String logon(Map<String,String[]> query) {
        if (query.containsKey("username") && query.containsKey("password") &&           query.containsKey("sendercompid")) {

            app.mapUser(query.get("sendercompid")[0], new   UserSession(query.get("username")[0], query.get("password")[0]));

            SessionID session = new SessionID(new BeginString("FIX.4.4"), new SenderCompID(query.get("sendercompid")[0]), new TargetCompID("PARFX"));

            try {
                ThreadedSocketInitiator tsi = new ThreadedSocketInitiator(app, app.getFileStoreFactory(), settings, app.getLogFactory(), app.getMessageFactory());
                UserSession userSession = new UserSession(query.get("username")[0], query.get("password")[0]);
                userSession.setInitiator(tsi);

                tsi.start();
                return userSession.await();
            } catch (ConfigError e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                return e.toString();
            }
        }
        return "fail";
    }
  }


public class QuickfixjApplication implements Application {
    private Map<String,UserSession> users = new HashMap<String,UserSession>();

    public void mapUser(String s, UserSession u) {
        users.put(s, u);
    }

    public void toAdmin(Message message, SessionID sessionId) {

        try {
            if (message.getHeader().getField(new StringField(MsgType.FIELD)).valueEquals(Logon.MSGTYPE)) {
                UserSession user = users.get(sessionId.getSenderCompID());
                message.setField(new Username(user.getUsername()));
                message.setField(new Password(user.getPassword()));
            }
        } catch (FieldNotFound e) {
            e.printStackTrace();
        }
    }

    public void fromAdmin(Message message, SessionID sessionId)
        throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {

        if (message.getHeader().getField(new StringField(MsgType.FIELD)).valueEquals(Logon.MSGTYPE)) {
            System.out.println(message.toString());
            UserSession user = users.get(sessionId.getSenderCompID());
            user.countdown(message.toString());
        }
    }
}

好的,我已尝试在此处仅包含最少的代码。有三个有趣的类,UserSession 是 Jetty 处理程序和 QuickFix/j 应用程序之间的内部粘合剂。

LogonHandler 接收 HTTP 登录请求并尝试将用户登录到 QuickFix/j 应用程序会话。

QuickFix/j 正在向 FIX 服务器发送登录消息,此登录请求/响应是异步的。 HTTP 登录请求当然是同步的。因此,我们必须等待 FIX 服务器的回复,然后才能从 HTTP 请求返回。我使用 CountDownLatch 和这个 UserSession 对象来做到这一点。

当我创建 QuickFix/j 会话对象时,我还创建了一个 UserSession 对象并将其添加到地图中(这发生在 LogonHandler 登录方法中)。

QuickFix/j 应用程序对象中有两个回调,toAdmin() 和 fromAdmin()。在 fromAdmin() 中,我检查消息是否是登录响应,如果是,我调用 UserSession 的方法来倒计时锁存器。在调试代码时,我看到 fromAdmin() 方法被命中,在 map 中找到 UserSession 对象并调用 countdown() 方法,latch.getCount() 从 1 变为 0,但 latch.await( ) UserSession await() 中的方法永远不会返回。它总是超时。

【问题讨论】:

    标签: java asynchronous concurrency callback


    【解决方案1】:

    你可以像这样使用CountDownLatch

    public class LogonHandler implements Handler {
        private final CountDownLatch loginLatch = new CountDownLatch (1);
    
        private boolean callbackResults;
    
        public void serverResponseCallback(boolean result) {
            callbackResults = result;
            loginLatch.countDown ();
        }
    
        public boolean tryLogon(Credentials creds) throws InterruptedException {
            SomeServer server = new SomeServer(address);
            server.tryLogon (creds.getName (), creds.getPass ());
            loginLatch.await ();
            return callbackResults;
        }
    }
    

    如果您想将等待时间限制为例如 5 秒,则不要使用 loginLatch.await (),而是使用以下内容:

    if (loginLatch.await (5L, TimeUnit.SECONDS))
        return callbackResults;
    else
        return false; // Timeout exceeded
    

    【讨论】:

    • 谢谢米哈伊尔,我试过了。 loginLatch.count() 变为 0,但 await() 永远不会释放。
    • @shaz CountDownLatch 应该可以工作。可能错误出现在代码的其他位置。你能在这里用CountDownLatch 发布你的代码吗?
    • @shaz 你的代码看起来不错。一旦你说你的闩锁归零,而await() 没有解除阻塞,你有可能在UserSession 对象的一个​​实例上调用countdown,而在另一个实例上调用await(),即在克隆实例上?
    • 所以你的意思是我把它放在 HashMap 中?
    • 等待...java.util.concurrent.CountDownLatch@98350a[Count = 1] 2013 年 2 月 18 日上午 11:45:14 quickfix.mina.SessionConnector startSessionTimer 信息:SessionTimer 于 2013 年 2 月 18 日开始上午 11:45:14 quickfix.mina.initiator.InitiatorIoHandler sessionCreated INFO:为 FIX.4.4:test9044->XXX 创建了 MINA 会话:local=/172.16.34.167:60198,类 org.apache.mina.transport.socket.nio .SocketSessionImpl, remote=/172.17.22.52:8020 in onLogon 方法...倒计时:FIX.4.4:test9044->XXX。闩锁计数:1 java.util.concurrent.CountDownLatch@bb2bc3[Count = 1] 闩锁计数:0 超时
    猜你喜欢
    • 2015-06-13
    • 2014-11-26
    • 1970-01-01
    • 1970-01-01
    • 2019-08-03
    • 1970-01-01
    • 2013-02-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多