【问题标题】:Rewriting looping blocking code to SwiftNIO style non-blocking code将循环阻塞代码重写为 SwiftNIO 风格的非阻塞代码
【发布时间】:2020-05-15 11:37:36
【问题描述】:

我正在开发一个从网络读取数据的驱动程序。它不知道响应中有多少,除了当它尝试读取并取回 0 字节时,它就完成了。所以我的阻塞 Swift 代码看起来很天真:

func readAllBlocking() -> [Byte] {

  var buffer: [Byte] = []
  var fullBuffer: [Byte] = []

  repeat {
    buffer = read() // synchronous, blocking
    fullBuffer.append(buffer)
  } while buffer.count > 0

  return fullBuffer
}

如何将其重写为一个将继续运行直到读取整个结果的承诺?在试图将我的大脑包裹起来之后,我仍然被困在这里:

func readAllNonBlocking() -> EventLoopFuture<[Byte]> {

  ///...?
}

我应该补充一点,我可以将 read() 重写为,而不是返回 [Byte],而是返回 EventLoopF​​uture

【问题讨论】:

    标签: swift promise future swift-nio


    【解决方案1】:

    一般来说,同步编程中的循环被转换为递归,以获得与使用期货的异步编程相同的效果(以及在函数式编程中)。

    所以你的函数可能看起来像这样:

    func readAllNonBlocking(on eventLoop: EventLoop) -> EventLoopFuture<[Byte]> {
        // The accumulated chunks
        var accumulatedChunks: [Byte] = []
    
        // The promise that will hold the overall result
        let promise = eventLoop.makePromise(of: [Byte].self)
    
        // We turn the loop into recursion:
        func loop() {
            // First, we call `read` to read in the next chunk and hop
            // over to `eventLoop` so we can safely write to `accumulatedChunks`
            // without a lock.
            read().hop(to: eventLoop).map { nextChunk in
                // Next, we just append the chunk to the accumulation
                accumulatedChunks.append(contentsOf: nextChunk)
                guard nextChunk.count > 0 else {
                    promise.succeed(accumulatedChunks)
                    return
                }
                // and if it wasn't empty, we loop again.
                loop()
            }.cascadeFailure(to: promise) // if anything goes wrong, we fail the whole thing.
        }
    
        loop() // Let's kick everything off.
    
        return promise.futureResult
    }
    

    不过,我想补充两点:

    首先,您在上面实现的是简单地读取所有内容,直到您看到 EOF,如果该软件暴露在互联网上,您绝对应该添加一个最大字节数限制在内存中。

    其次,SwiftNIO 是一个事件驱动系统,因此如果您使用 SwiftNIO 读取这些字节,程序实际上看起来会略有不同。如果您对在 SwiftNIO 中简单地累积所有字节直到 EOF 的样子感兴趣,那就是:

    struct AccumulateUntilEOF: ByteToMessageDecoder {
        typealias InboundOut = ByteBuffer
    
        func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
            // `decode` will be called if new data is coming in.
            // We simply return `.needMoreData` because always need more data because our message end is EOF.
            // ByteToMessageHandler will automatically accumulate everything for us because we tell it that we need more
            // data to decode a message.
            return .needMoreData
        }
    
        func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
            // `decodeLast` will be called if NIO knows that this is the _last_ time a decode function is called. Usually,
            // this is because of EOF or an error.
            if seenEOF {
                // This is what we've been waiting for, `buffer` should contain all bytes, let's fire them through
                // the pipeline.
                context.fireChannelRead(self.wrapInboundOut(buffer))
            } else {
                // Odd, something else happened, probably an error or we were just removed from the pipeline. `buffer`
                // will now contain what we received so far but maybe we should just drop it on the floor.
            }
            buffer.clear()
            return .needMoreData
        }
    }
    

    如果你想用 SwiftNIO 制作一个完整的程序,这里有一个例子,它是一个接受所有数据的服务器,直到它看到 EOF,然后实际上只是写回接收到的字节数:)。当然,在现实世界中,您永远不会保留所有接收到的字节来计算它们(您可以只添加每个单独的部分),但我想它可以作为示例。

    import NIO
    
    let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    defer {
        try! group.syncShutdownGracefully()
    }
    
    struct AccumulateUntilEOF: ByteToMessageDecoder {
        typealias InboundOut = ByteBuffer
    
        func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
            // `decode` will be called if new data is coming in.
            // We simply return `.needMoreData` because always need more data because our message end is EOF.
            // ByteToMessageHandler will automatically accumulate everything for us because we tell it that we need more
            // data to decode a message.
            return .needMoreData
        }
    
        func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
            // `decodeLast` will be called if NIO knows that this is the _last_ time a decode function is called. Usually,
            // this is because of EOF or an error.
            if seenEOF {
                // This is what we've been waiting for, `buffer` should contain all bytes, let's fire them through
                // the pipeline.
                context.fireChannelRead(self.wrapInboundOut(buffer))
            } else {
                // Odd, something else happened, probably an error or we were just removed from the pipeline. `buffer`
                // will now contain what we received so far but maybe we should just drop it on the floor.
            }
            buffer.clear()
            return .needMoreData
        }
    }
    
    // Just an example "business logic" handler. It will wait for one message
    // and just write back the length.
    final class SendBackLengthOfFirstInput: ChannelInboundHandler {
        typealias InboundIn = ByteBuffer
        typealias OutboundOut = ByteBuffer
    
        func channelRead(context: ChannelHandlerContext, data: NIOAny) {
            // Once we receive the message, we allocate a response buffer and just write the length of the received
            // message in there. We then also close the channel.
            let allData = self.unwrapInboundIn(data)
            var response = context.channel.allocator.buffer(capacity: 10)
            response.writeString("\(allData.readableBytes)\n")
            context.writeAndFlush(self.wrapOutboundOut(response)).flatMap {
                context.close(mode: .output)
            }.whenSuccess {
                context.close(promise: nil)
            }
        }
    
        func errorCaught(context: ChannelHandlerContext, error: Error) {
            print("ERROR: \(error)")
            context.channel.close(promise: nil)
        }
    }
    
    let server = try ServerBootstrap(group: group)
        // Allow us to reuse the port after the process quits.
        .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
        // We should allow half-closure because we want to write back after having received an EOF on the input
        .childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
        // Our program consists of two parts:
        .childChannelInitializer { channel in
            channel.pipeline.addHandlers([
                // 1: The accumulate everything until EOF handler
                ByteToMessageHandler(AccumulateUntilEOF(),
                                     // We want 1 MB of buffering max. If you remove this parameter, it'll also
                                     // buffer indefinitely.
                                     maximumBufferSize: 1024 * 1024),
                // 2: Our "business logic"
                SendBackLengthOfFirstInput()
            ])
        }
        // Let's bind port 9999
        .bind(to: SocketAddress(ipAddress: "127.0.0.1", port: 9999))
        .wait()
    
    // This will never return.
    try server.closeFuture.wait()
    

    演示:

    $ echo -n "hello world" | nc localhost 9999
    11
    

    【讨论】:

    • 非常感谢您清晰而有启发性的回答,Johannes。 :-) 函数中的函数确实比我的大多数递归尝试更干净,而且 hop(to:) 对我来说是新的。我认为在递归中我必须将承诺链接在一起,但你已经在这里非常干净地解决了这个问题 - 太棒了。 :-) 当互联网暴露时,你当然是正确的。
    • Theo,我正在研究的 Swift 的 Neo4j 驱动程序,它给我带来了这个问题,正在直接使用 SwiftNIO,所以我正在尽我所能从示例中学习并按顺序设计将其转换为完全非阻塞。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-18
    • 1970-01-01
    • 2015-11-25
    • 2023-03-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多