【问题标题】:Race condition when dynamically adding websocket handlers动态添加 websocket 处理程序时的竞争条件
【发布时间】:2021-05-04 05:13:42
【问题描述】:

我正在用 netty 编写一个 websocket,我的代码中似乎有一个竞争条件:

我有一个构建管道的通道初始化器,其中包含:

  ch.pipeline().addLast(new HttpServerCodec())
  ch.pipeline().addLast(new HttpObjectAggregator(65536))
  ch.pipeline().addLast(new MyServer())

MyServer 的工作原理如下:

  • 如果它收到一个 websocket 升级请求,它会尝试对该请求进行身份验证
  • 如果失败,则返回错误请求
  • 如果成功,它会尝试:
    • 按照我的自定义逻辑处理程序添加 websocket 处理程序并完成握手并建立 webscoket 连接
    • 使用以下代码完成:
      awareLogger.debug(log"upgrading to websocket")(logContext)
      ctx.pipeline()
         .addLast(new WebSocketServerProtocolHandler(route, true))
         .addLast(new WebSocketFrameAggregator(65536))
         .addLast(new MyWebsocketLogic(logContext))
      ctx.fireChannelRead(httpRequest)
      val _ = awareLogger.debug(log"upgraded to websocket")(logContext)

它正在尝试fireChannelRead(httpRequest),希望WebSocketServerProtocolHandler能够拦截它并完成握手。

我的问题是 - httpRequest 有时似乎一直传播到 MyWebsocketLogic 处理程序并且无法建立连接和握手。

我做错了什么吗?这几乎就像我在代码中遇到了某种竞争条件。

【问题讨论】:

  • 这段代码看起来非常好......我认为你应该尝试使用调试器来看看发生了什么
  • @NormanMaurer 感谢您的回答。管道修改是在与分配给通道的线程不同的线程中完成的(整个块在不同的线程池中执行)......你认为这会导致任何麻烦吗?
  • 为了更加清晰,修改管道的整个代码块将在同一个线程池上执行,但在分配给通道线程的不同线程池上执行......而不仅仅是不同的线程完全不同的线程池
  • 我想这可以解释它。您能否尝试将其卸载到ChannelEventLoopchannel.eventLoop().execute(...).
  • 会试试的,谢谢

标签: scala websocket netty


【解决方案1】:

问题是:

      awareLogger.debug(log"upgrading to websocket")(logContext)
      ctx.pipeline()
         .addLast(new WebSocketServerProtocolHandler(route, true))
         .addLast(new WebSocketFrameAggregator(65536))
         .addLast(new MyWebsocketLogic(logContext))
      ctx.fireChannelRead(httpRequest)
      val _ = awareLogger.debug(log"upgraded to websocket")(logContext)

在与分配给给定ctx 的线程不同的线程中调用。

我可以通过应用上面 Norman 的建议来解决这个问题,即将管道修改切换到 channelEventLoop,意思是:

      ctx.channel().eventLoop().execute { () =>
        val _ = ctx
          .pipeline()
          .addLast(new WebSocketServerProtocolHandler(route, true))
          .addLast(new WebSocketFrameAggregator(65536))
          .addLast(buildWebsocketHandler(logContext, connectionHandler))
        val _ = ctx.fireChannelRead(msg)
      }

这似乎运作良好

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-02-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-01-26
    相关资源
    最近更新 更多