【问题标题】:Tokio server getting killed after overheatingTokio 服务器在过热后被杀死
【发布时间】:2021-06-27 16:32:07
【问题描述】:

我正在监听 udp 数据包,一旦第一个数据包到达,我就开始监听更多数据包。

如果收到一个数据包,则服务器将开始全速运行(即使应该有延迟let duration = Duration::from_millis(1300); ?),并最终被杀死

MVE 代码如下:

server.rs

use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::{sync::mpsc, task, time}; // 1.4.0
use std::env;
use std::sync::Arc;

const UDP_HEADER: usize = 8;
const IP_HEADER: usize = 20;
const AG_HEADER: usize = 4;
const MAX_DATA_LENGTH: usize = (64 * 1024 - 1) - UDP_HEADER - IP_HEADER;
const MAX_CHUNK_SIZE: usize = MAX_DATA_LENGTH - AG_HEADER;
const MAX_DATAGRAM_SIZE: usize = 0x10000;
const ADDRESS: &str = "127.0.0.1:8080";
const ADDRESS_CLIENT: &str = "127.0.0.1:8000";

#[tokio::main]
async fn main() {
    server().await;
}

async fn server() {
    eprintln!("Starting the server");
    let addr = env::args().nth(1).unwrap_or_else(|| ADDRESS.to_string());
    let socket = UdpSocket::bind(&addr).await.unwrap();
    let arc = Arc::new(socket);
    let mut buf = [0u8; MAX_DATA_LENGTH];
    let (debounce_tx, mut debounce_rx) = mpsc::channel::<Vec<u8>>(MAX_DATAGRAM_SIZE);

    let _debouncer = task::spawn(async move {
        let mut _packet_ids: Vec<i32> = Vec::new();
        _packet_ids = vec![0; 10];
        let duration = Duration::from_millis(1300);

        loop {
            match time::timeout(duration, debounce_rx.recv()).await {
                Ok(Some(bytes)) => {
                    let id: u8 = bytes.clone()[0];
                    _packet_ids[id as usize] = 1;
                    eprintln!("{} id packet received:{:?}", id, _packet_ids);
                    if _packet_ids.iter().all(|x| x == &1i32) {
                        println!("All packets have been received, stop program ");
                     //   break;
                    }
                }
                Ok(None) => {
                    eprintln!("Done: {:?}", _packet_ids);
                    break;
                }
                Err(_) => {
                    eprintln!("No activity for 1.3sd");
                }
            }
        }
    });
    // Listen for first packet
    let result = arc.clone().recv_from(&mut buf).await;
    match result {
        Ok((len, addr)) => {
            eprintln!("Bytes len: {} from {}", len, addr);
            debounce_tx
                .send(buf.to_vec())
                .await
                .expect("Unable to talk to debounce");
        }
        Err(_) => {
            eprintln!("Couldnt get datagram");
        }
    }
    // listen for other packets
    loop {
        let thread_socket = arc.clone();
        let _server = task::spawn({
            let debounce_tx = debounce_tx.clone();

            async move {
                while let result = thread_socket.recv_from(&mut buf).await {
                    match result {
                        Ok((len, addr)) => {
                            eprintln!("Bytes len: {} from {}", len, addr);
                            debounce_tx
                                .send(buf.to_vec())
                                .await
                                .expect("Unable to talk to debounce");
                        }
                        Err(_) => {
                            eprintln!("Couldnt get datagram");
                        }
                    }
                 }
                 // Prevent deadlocks
                 drop(debounce_tx);
            }
        });
    }
}

client.rs(用于测试目的)

async fn client() {
    eprintln!("Starting the client");

    let remote_addr: SocketAddr = env::args()
        .nth(2)
        .unwrap_or_else(|| ADDRESS.into()) // cargo run --example udp-client -- 127.0.0.1:8080
        .parse()
        .unwrap();

    // We use port 0 to let the operating system allocate an available port for us.
    let local_addr: SocketAddr = if remote_addr.is_ipv4() {
        ADDRESS_CLIENT // "0.0.0.0:0" //
    } else {
        "[::]:0"
    }
    .parse()
    .unwrap();
    let socket = UdpSocket::bind(ADDRESS_CLIENT).await.unwrap();

    socket.connect(&remote_addr).await.unwrap();

    socket.send(&[0, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[1, 2, 3]).await.expect("Unable to talk to network");
    time::sleep(Duration::from_millis(1200)).await;
    socket.send(&[2, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[3, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[4, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[5, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[6, 2, 3]).await.expect("Unable to talk to network");
    socket.send(&[7, 2, 3]).await.expect("Unable to talk to network");
    time::sleep(Duration::from_millis(1200)).await;
    socket.send(&[8, 2, 3]).await.expect("Unable to talk to network");
    time::sleep(Duration::from_millis(3200)).await;
    socket.send(&[9, 2, 3]).await.expect("Unable to talk to network"); // stop when n1 = 0

    eprintln!("Client done");
}

输出(从客户端发送 10 个数据包,其中第一个字节范围从 0 到 9,注意:我现在只是想让它工作,我知道缓冲区溢出): p>

Starting the server
Bytes len: 3 from 127.0.0.1:8000
0 id packet received:[1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Bytes len: 3 from 127.0.0.1:8000
1 id packet received:[1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
Bytes len: 3 from 127.0.0.1:8000
Bytes len: 3 from 127.0.0.1:8000
Bytes len: 3 from 127.0.0.1:8000
Bytes len: 3 from 127.0.0.1:8000
Bytes len: 3 from 127.0.0.1:8000
Bytes len: 3 from 127.0.0.1:8000
2 id packet received:[1, 1, 1, 0, 0, 0, 0, 0, 0, 0]
3 id packet received:[1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
4 id packet received:[1, 1, 1, 1, 1, 0, 0, 0, 0, 0]
5 id packet received:[1, 1, 1, 1, 1, 1, 0, 0, 0, 0]
6 id packet received:[1, 1, 1, 1, 1, 1, 1, 0, 0, 0]
7 id packet received:[1, 1, 1, 1, 1, 1, 1, 1, 0, 0]
Bytes len: 3 from 127.0.0.1:8000
8 id packet received:[1, 1, 1, 1, 1, 1, 1, 1, 1, 0]
No activity for 1.3sd
No activity for 1.3sd
Bytes len: 3 from 127.0.0.1:8000
9 id packet received:[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
All packets have been received, stop program 
No activity for 1.3sd
No activity for 1.3sd
No activity for 1.3sd
Killed

我不知道如何保持服务器平稳运行而不被杀死,非常感谢任何帮助。

【问题讨论】:

  • 你在循环中调用task::spawn,没有等待任何东西,这意味着它将产生一个后台线程,并立即重复循环,产生另一个后台任务,并在全速。我希望它会继续这样做,直到生成的任务占用所有内存并且进程因此而被杀死。
  • @Frxstream,我完全知道这一点,但我不知道如何纠正它。没有循环,服务器不会保持清醒
  • 如果你删除了 task::spawn() 并且只使用了 .await 在 recv_from() 上的 while 循环怎么办? (您也可以 .await 由 tokio::spawn() 返回的 JoinHandle 以便等到该任务完成后再继续,但我认为您不需要在这里使用 spawn)
  • 如果我删除 task::spawn 不会破坏拥有多线程服务器的目的吗? (只是问)
  • 一旦收到连接,您必须使用 TCP 打开一个新套接字,但在这种情况下,您将在主服务器循环中等待 accept() 并且一旦有新连接收到并创建一个套接字,您可以生成一个新任务来处理该连接。使用 UDP,您只有一个套接字,并且接收到的数据包将排入队列,直到一次读取一个(尽管您可以通过在收到消息后生成同时处理消息,如前所述)。这是操作系统和套接字接口的限制

标签: multithreading networking rust udp tokio


【解决方案1】:

感谢@Frxstrem 和@transistor 的 cmets 解决了这个问题,请参阅代码的更改注释:

 loop {
        let thread_socket = arc.clone();
        let debounce_tx = debounce_tx.clone(); // moved up
       /* let _server = task::spawn({
            async move { */
                if let result = thread_socket.recv_from(&mut buf).await { // previously while
                    match result {
                        Ok((len, addr)) => {
                            eprintln!("Bytes len: {} from {}", len, addr);
                            debounce_tx
                                .send(buf.to_vec())
                                .await
                                .expect("Unable to talk to debounce");
                        }
                        Err(_) => {
                            eprintln!("Couldnt get datagram");
                        }
                    }
                 }
                 // Prevent deadlocks
                 drop(debounce_tx);
         /*   }
        }); */
    }

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-08-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-19
    • 2019-11-17
    相关资源
    最近更新 更多