| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- use anyhow::Result;
- use common::{run_rathole_client, PING, PONG};
- use rand::Rng;
- use std::time::Duration;
- use tokio::{
- io::{AsyncReadExt, AsyncWriteExt},
- net::TcpStream,
- sync::broadcast,
- time,
- };
- use crate::common::run_rathole_server;
- mod common;
- const ECHO_SERVER_ADDR: &str = "localhost:8080";
- const PINGPONG_SERVER_ADDR: &str = "localhost:8081";
- const ECHO_SERVER_ADDR_EXPOSED: &str = "localhost:2334";
- const PINGPONG_SERVER_ADDR_EXPOSED: &str = "localhost:2335";
- const HITTER_NUM: usize = 4;
- #[tokio::test]
- async fn main() -> Result<()> {
- // Spawn a echo server
- tokio::spawn(async move {
- if let Err(e) = common::echo_server(ECHO_SERVER_ADDR).await {
- panic!("Failed to run the echo server for testing: {:?}", e);
- }
- });
- // Spawn a pingpong server
- tokio::spawn(async move {
- if let Err(e) = common::pingpong_server(PINGPONG_SERVER_ADDR).await {
- panic!("Failed to run the pingpong server for testing: {:?}", e);
- }
- });
- test("tests/tcp_transport.toml").await?;
- test("tests/tls_transport.toml").await?;
- Ok(())
- }
- async fn test(config_path: &'static str) -> Result<()> {
- let (client_shutdown_tx, client_shutdown_rx) = broadcast::channel(1);
- let (server_shutdown_tx, server_shutdown_rx) = broadcast::channel(1);
- // Start the client
- tokio::spawn(async move {
- run_rathole_client(&config_path, client_shutdown_rx)
- .await
- .unwrap();
- });
- // Sleep for 1 second. Expect the client keep retrying to reach the server
- time::sleep(Duration::from_secs(1)).await;
- // Start the server
- tokio::spawn(async move {
- run_rathole_server(&config_path, server_shutdown_rx)
- .await
- .unwrap();
- });
- time::sleep(Duration::from_secs(1)).await; // Wait for the client to retry
- echo_hitter(ECHO_SERVER_ADDR_EXPOSED).await.unwrap();
- pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED).await.unwrap();
- // Simulate the client crash and restart
- client_shutdown_tx.send(true)?;
- time::sleep(Duration::from_millis(500)).await;
- let client_shutdown_rx = client_shutdown_tx.subscribe();
- tokio::spawn(async move {
- run_rathole_client(&config_path, client_shutdown_rx)
- .await
- .unwrap();
- });
- echo_hitter(ECHO_SERVER_ADDR_EXPOSED).await.unwrap();
- pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED).await.unwrap();
- // Simulate the server crash and restart
- server_shutdown_tx.send(true)?;
- time::sleep(Duration::from_millis(500)).await;
- let server_shutdown_rx = server_shutdown_tx.subscribe();
- tokio::spawn(async move {
- run_rathole_server(&config_path, server_shutdown_rx)
- .await
- .unwrap();
- });
- time::sleep(Duration::from_secs(1)).await; // Wait for the client to retry
- // Simulate heavy load
- for _ in 0..HITTER_NUM / 2 {
- tokio::spawn(async move {
- echo_hitter(ECHO_SERVER_ADDR_EXPOSED).await.unwrap();
- });
- tokio::spawn(async move {
- pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED).await.unwrap();
- });
- }
- // Shutdown
- server_shutdown_tx.send(true)?;
- client_shutdown_tx.send(true)?;
- Ok(())
- }
- async fn echo_hitter(addr: &str) -> Result<()> {
- let mut conn = TcpStream::connect(addr).await?;
- let mut wr = [0u8; 1024];
- let mut rd = [0u8; 1024];
- for _ in 0..100 {
- rand::thread_rng().fill(&mut wr);
- conn.write_all(&wr).await?;
- conn.read_exact(&mut rd).await?;
- assert_eq!(wr, rd);
- }
- Ok(())
- }
- async fn pingpong_hitter(addr: &str) -> Result<()> {
- let mut conn = TcpStream::connect(addr).await?;
- let wr = PING.as_bytes();
- let mut rd = [0u8; PONG.len()];
- for _ in 0..100 {
- conn.write_all(wr).await?;
- conn.read_exact(&mut rd).await?;
- assert_eq!(rd, PONG.as_bytes());
- }
- Ok(())
- }
|