Bläddra i källkod

test: add the integration test

Yujia Qiao 4 år sedan
förälder
incheckning
6d0bf7e7e8
4 ändrade filer med 266 tillägg och 0 borttagningar
  1. 75 0
      tests/common/mod.rs
  2. 139 0
      tests/integration_test.rs
  3. 23 0
      tests/tcp_transport.toml
  4. 29 0
      tests/tls_transport.toml

+ 75 - 0
tests/common/mod.rs

@@ -0,0 +1,75 @@
+use std::path::PathBuf;
+
+use anyhow::Result;
+use tokio::{
+    io::{self, AsyncReadExt, AsyncWriteExt},
+    net::{TcpListener, TcpStream, ToSocketAddrs},
+    sync::broadcast,
+};
+
+pub const PING: &str = "ping";
+pub const PONG: &str = "pong";
+
+pub async fn run_rathole_server(
+    config_path: &str,
+    shutdown_rx: broadcast::Receiver<bool>,
+) -> Result<()> {
+    let cli = rathole::Cli {
+        config_path: PathBuf::from(config_path),
+        server: true,
+        client: false,
+    };
+    rathole::run(&cli, shutdown_rx).await
+}
+
+pub async fn run_rathole_client(
+    config_path: &str,
+    shutdown_rx: broadcast::Receiver<bool>,
+) -> Result<()> {
+    let cli = rathole::Cli {
+        config_path: PathBuf::from(config_path),
+        server: false,
+        client: true,
+    };
+    rathole::run(&cli, shutdown_rx).await
+}
+
+pub async fn echo_server<A: ToSocketAddrs>(addr: A) -> Result<()> {
+    let l = TcpListener::bind(addr).await?;
+
+    loop {
+        let (conn, _addr) = l.accept().await?;
+        tokio::spawn(async move {
+            let _ = echo(conn).await;
+        });
+    }
+}
+
+pub async fn pingpong_server<A: ToSocketAddrs>(addr: A) -> Result<()> {
+    let l = TcpListener::bind(addr).await?;
+
+    loop {
+        let (conn, _addr) = l.accept().await?;
+        tokio::spawn(async move {
+            let _ = pingpong(conn).await;
+        });
+    }
+}
+
+async fn echo(conn: TcpStream) -> Result<()> {
+    let (mut rd, mut wr) = conn.into_split();
+    io::copy(&mut rd, &mut wr).await?;
+
+    Ok(())
+}
+
+async fn pingpong(mut conn: TcpStream) -> Result<()> {
+    let mut buf = [0u8; PING.len()];
+
+    while conn.read_exact(&mut buf).await? != 0 {
+        assert_eq!(buf, PING.as_bytes());
+        conn.write_all(PONG.as_bytes()).await?;
+    }
+
+    Ok(())
+}

+ 139 - 0
tests/integration_test.rs

@@ -0,0 +1,139 @@
+use anyhow::Result;
+use common::{run_rathole_client, PING, PONG};
+use rand::Rng;
+use std::time::Duration;
+use tokio::{
+    io::{AsyncReadExt, AsyncWriteExt},
+    net::TcpStream,
+    sync::broadcast,
+    time,
+};
+
+use crate::common::run_rathole_server;
+
+mod common;
+
+const ECHO_SERVER_ADDR: &str = "localhost:8080";
+const PINGPONG_SERVER_ADDR: &str = "localhost:8081";
+const ECHO_SERVER_ADDR_EXPOSED: &str = "localhost:2334";
+const PINGPONG_SERVER_ADDR_EXPOSED: &str = "localhost:2335";
+const HITTER_NUM: usize = 4;
+
+#[tokio::test]
+async fn main() -> Result<()> {
+    // Spawn a echo server
+    tokio::spawn(async move {
+        if let Err(e) = common::echo_server(ECHO_SERVER_ADDR).await {
+            panic!("Failed to run the echo server for testing: {:?}", e);
+        }
+    });
+
+    // Spawn a pingpong server
+    tokio::spawn(async move {
+        if let Err(e) = common::pingpong_server(PINGPONG_SERVER_ADDR).await {
+            panic!("Failed to run the pingpong server for testing: {:?}", e);
+        }
+    });
+
+    test("tests/tcp_transport.toml").await?;
+    test("tests/tls_transport.toml").await?;
+
+    Ok(())
+}
+
+async fn test(config_path: &'static str) -> Result<()> {
+    let (client_shutdown_tx, client_shutdown_rx) = broadcast::channel(1);
+    let (server_shutdown_tx, server_shutdown_rx) = broadcast::channel(1);
+
+    // Start the client
+    tokio::spawn(async move {
+        run_rathole_client(&config_path, client_shutdown_rx)
+            .await
+            .unwrap();
+    });
+
+    // Sleep for 1 second. Expect the client keep retrying to reach the server
+    time::sleep(Duration::from_secs(1)).await;
+
+    // Start the server
+    tokio::spawn(async move {
+        run_rathole_server(&config_path, server_shutdown_rx)
+            .await
+            .unwrap();
+    });
+    time::sleep(Duration::from_secs(1)).await; // Wait for the client to retry
+
+    echo_hitter(ECHO_SERVER_ADDR_EXPOSED).await.unwrap();
+    pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED).await.unwrap();
+
+    // Simulate the client crash and restart
+    client_shutdown_tx.send(true)?;
+    time::sleep(Duration::from_millis(500)).await;
+    let client_shutdown_rx = client_shutdown_tx.subscribe();
+    tokio::spawn(async move {
+        run_rathole_client(&config_path, client_shutdown_rx)
+            .await
+            .unwrap();
+    });
+
+    echo_hitter(ECHO_SERVER_ADDR_EXPOSED).await.unwrap();
+    pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED).await.unwrap();
+
+    // Simulate the server crash and restart
+    server_shutdown_tx.send(true)?;
+    time::sleep(Duration::from_millis(500)).await;
+    let server_shutdown_rx = server_shutdown_tx.subscribe();
+    tokio::spawn(async move {
+        run_rathole_server(&config_path, server_shutdown_rx)
+            .await
+            .unwrap();
+    });
+    time::sleep(Duration::from_secs(1)).await; // Wait for the client to retry
+
+    // Simulate heavy load
+    for _ in 0..HITTER_NUM / 2 {
+        tokio::spawn(async move {
+            echo_hitter(ECHO_SERVER_ADDR_EXPOSED).await.unwrap();
+        });
+
+        tokio::spawn(async move {
+            pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED).await.unwrap();
+        });
+    }
+
+    // Shutdown
+    server_shutdown_tx.send(true)?;
+    client_shutdown_tx.send(true)?;
+
+    Ok(())
+}
+
+async fn echo_hitter(addr: &str) -> Result<()> {
+    let mut conn = TcpStream::connect(addr).await?;
+
+    let mut wr = [0u8; 1024];
+    let mut rd = [0u8; 1024];
+    for _ in 0..100 {
+        rand::thread_rng().fill(&mut wr);
+        conn.write_all(&wr).await?;
+        conn.read_exact(&mut rd).await?;
+        assert_eq!(wr, rd);
+    }
+
+    Ok(())
+}
+
+async fn pingpong_hitter(addr: &str) -> Result<()> {
+    let mut conn = TcpStream::connect(addr).await?;
+
+    let wr = PING.as_bytes();
+    let mut rd = [0u8; PONG.len()];
+
+    for _ in 0..100 {
+        conn.write_all(wr).await?;
+        conn.read_exact(&mut rd).await?;
+        assert_eq!(rd, PONG.as_bytes());
+    }
+
+    Ok(())
+}

+ 23 - 0
tests/tcp_transport.toml

@@ -0,0 +1,23 @@
+[client]
+remote_addr = "localhost:2333" 
+default_token = "default_token_if_not_specify" 
+
+[client.transport]
+type = "tcp" 
+
+[client.services.echo] 
+local_addr = "localhost:8080" 
+[client.services.pingpong] 
+local_addr = "localhost:8081" 
+
+[server]
+bind_addr = "0.0.0.0:2333" 
+default_token = "default_token_if_not_specify" 
+
+[server.transport]
+type = "tcp" 
+
+[server.services.echo] 
+bind_addr = "0.0.0.0:2334" 
+[server.services.pingpong] 
+bind_addr = "0.0.0.0:2335" 

+ 29 - 0
tests/tls_transport.toml

@@ -0,0 +1,29 @@
+[client]
+remote_addr = "localhost:2333" 
+default_token = "default_token_if_not_specify" 
+
+[client.transport]
+type = "tls" 
+[client.transport.tls]
+trusted_root = "examples/tls/ca-cert.pem"
+hostname = "0.0.0.0"
+
+[client.services.echo] 
+local_addr = "localhost:8080" 
+[client.services.pingpong] 
+local_addr = "localhost:8081" 
+
+[server]
+bind_addr = "0.0.0.0:2333" 
+default_token = "default_token_if_not_specify" 
+
+[server.transport]
+type = "tls" 
+[server.transport.tls]
+pkcs12 = "examples/tls/identity.pfx"
+pkcs12_password = "1234"
+
+[server.services.echo] 
+bind_addr = "0.0.0.0:2334" 
+[server.services.pingpong] 
+bind_addr = "0.0.0.0:2335"