Jelajahi Sumber

fix: ignore broken data channels (#110)

Yujia Qiao 4 tahun lalu
induk
melakukan
3331f3e7e8
1 mengubah file dengan 18 tambahan dan 9 penghapusan
  1. 18 9
      src/server.rs

+ 18 - 9
src/server.rs

@@ -587,17 +587,26 @@ async fn run_tcp_connection_pool<T: Transport>(
     data_ch_req_tx: mpsc::UnboundedSender<bool>,
     shutdown_rx: broadcast::Receiver<bool>,
 ) -> Result<()> {
-    let mut visitor_rx = tcp_listen_and_send(bind_addr, data_ch_req_tx, shutdown_rx);
-    while let Some(mut visitor) = visitor_rx.recv().await {
-        if let Some(mut ch) = data_ch_rx.recv().await {
-            tokio::spawn(async move {
-                let cmd = bincode::serialize(&DataChannelCmd::StartForwardTcp).unwrap();
+    let mut visitor_rx = tcp_listen_and_send(bind_addr, data_ch_req_tx.clone(), shutdown_rx);
+    let cmd = bincode::serialize(&DataChannelCmd::StartForwardTcp).unwrap();
+
+    'pool: while let Some(mut visitor) = visitor_rx.recv().await {
+        loop {
+            if let Some(mut ch) = data_ch_rx.recv().await {
                 if ch.write_all(&cmd).await.is_ok() {
-                    let _ = copy_bidirectional(&mut ch, &mut visitor).await;
+                    tokio::spawn(async move {
+                        let _ = copy_bidirectional(&mut ch, &mut visitor).await;
+                    });
+                    break;
+                } else {
+                    // Current data channel is broken. Request for a new one
+                    if data_ch_req_tx.send(true).is_err() {
+                        break 'pool;
+                    }
                 }
-            });
-        } else {
-            break;
+            } else {
+                break 'pool;
+            }
         }
     }