【发布时间】:2012-03-20 15:11:07
【问题描述】:
大家好!
我使用 netty 3.1 构建了一个套接字调度服务器,它将套接字数据传输到另一个套接字服务器,所以我在第一条消息到达时在 netty 服务器处理程序中创建一个客户端连接,并等待连接完成,当下一个 messageRecv 事件到达时,我只是将缓冲区从服务器通道传输到客户端通道。但是我发现在使用 future.await*() 操作时在处理程序中是禁止的。如果我不使用 await(),因为 connectFuture 是同步的,有可能在下一条消息到达时,但连接未完成。我不知道如何处理这个问题。
如何确保客户端连接在下一个 messageRecv 事件到达之前完成?
现在,我只是做一个锁来同步两个代码,就像这样:
/**
* server handler
*/
public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
private static Logger _logger = LoggerFactory.getLogger(cn.szboc.dispatch.server.netty.ServerChannelHandler.class);
public ServerChannelHandler(ProxyClientFactory clientFactory) {
this.clientFactory = clientFactory;
}
/** factory connect another server */
private ProxyClientFactory clientFactory;
/** anotherchannel */
private Channel innerChannel;
private ChannelFuture connectFuture;
private ReentrantLock connectLock = new ReentrantLock();
private Condition notComplete = connectLock.newCondition();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
final ChannelBuffer buffer = ((ChannelBuffer) e.getMessage()).copy();
final Channel outChannel = ctx.getChannel();
// first connect
if (connectFuture == null) {
final ClientChannelHandler cch = new ClientChannelHandler(ctx.getChannel());
ProxyClient client = clientFactory.retrieveClient();
connectFuture = client.getConnectChannelFuture();
connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
connectLock.lock();
try {
if (future.isSuccess()) {
innerChannel = future.getChannel();
innerChannel.getPipeline().addLast("clientchannelhandler", cch);
innerChannel.write(buffer);
} else {
Channels.fireExceptionCaught(outChannel, future.getCause());
}
} finally {
notComplete.signal();
connectLock.unlock();
}
}
});
} else {
connectLock.lock();
try {
if (!connectFuture.isDone()) {
if (!notComplete.await(500, TimeUnit.MILLISECONDS)) {
throw new Exception("");
}
}
if (connectFuture.isSuccess()) {
if(innerChannel == null){
if (!notComplete.await(500, TimeUnit.MILLISECONDS)) {
throw new Exception("");
}
}
innerChannel.write(buffer);
} else {
_logger.error("");
}
} finally {
connectLock.unlock();
}
}
}
【问题讨论】:
标签: netty