Răsfoiți Sursa

fix: improve exp backoff for control channels

Yujia Qiao 4 ani în urmă
părinte
comite
3fad4c4350
3 a modificat fișierele cu 16 adăugiri și 5 ștergeri
  1. 13 3
      src/client.rs
  2. 1 0
      src/constants.rs
  3. 2 2
      tests/integration_test.rs

+ 13 - 3
src/client.rs

@@ -17,7 +17,7 @@ use std::sync::Arc;
 use tokio::io::{self, copy_bidirectional, AsyncReadExt, AsyncWriteExt};
 use tokio::net::{TcpStream, UdpSocket};
 use tokio::sync::{broadcast, mpsc, oneshot, RwLock};
-use tokio::time::{self, Duration};
+use tokio::time::{self, Duration, Instant};
 use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
 
 #[cfg(feature = "noise")]
@@ -487,6 +487,8 @@ impl ControlChannelHandle {
         tokio::spawn(
             async move {
                 let mut backoff = run_control_chan_backoff();
+                let mut start = Instant::now();
+
                 while let Err(err) = s
                     .run()
                     .await
@@ -496,12 +498,20 @@ impl ControlChannelHandle {
                         break;
                     }
 
-                    if let Some(duration) = backoff.next_backoff() {
+                    if start.elapsed() > Duration::from_secs(3) {
+                        // The client runs for at least 3 secs and then disconnects
+                        // Retry immediately
+                        backoff.reset();
+                        error!("{:#}. Retry...", err);
+                    } else if let Some(duration) = backoff.next_backoff() {
                         error!("{:#}. Retry in {:?}...", err, duration);
                         time::sleep(duration).await;
                     } else {
-                        error!("{:#}. Break", err);
+                        // Should never reach
+                        panic!("{:#}. Break", err);
                     }
+
+                    start = Instant::now();
                 }
             }
             .instrument(Span::current()),

+ 1 - 0
src/constants.rs

@@ -17,6 +17,7 @@ pub fn listen_backoff() -> ExponentialBackoff {
 
 pub fn run_control_chan_backoff() -> ExponentialBackoff {
     ExponentialBackoff {
+        randomization_factor: 0.1,
         max_elapsed_time: None,
         max_interval: Duration::from_secs(1),
         ..Default::default()

+ 2 - 2
tests/integration_test.rs

@@ -113,7 +113,7 @@ async fn test(config_path: &'static str, t: Type) -> Result<()> {
             .await
             .unwrap();
     });
-    time::sleep(Duration::from_millis(2000)).await; // Wait for the client to retry
+    time::sleep(Duration::from_millis(2500)).await; // Wait for the client to retry
 
     info!("echo");
     echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap();
@@ -155,7 +155,7 @@ async fn test(config_path: &'static str, t: Type) -> Result<()> {
             .await
             .unwrap();
     });
-    time::sleep(Duration::from_millis(2000)).await; // Wait for the client to retry
+    time::sleep(Duration::from_millis(2500)).await; // Wait for the client to retry
 
     // Simulate heavy load
     info!("lots of echo and pingpong");