integration_test.rs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. use anyhow::Result;
  2. use common::{run_rathole_client, PING, PONG};
  3. use rand::Rng;
  4. use std::time::Duration;
  5. use tokio::{
  6. io::{AsyncReadExt, AsyncWriteExt},
  7. net::TcpStream,
  8. sync::broadcast,
  9. time,
  10. };
  11. use crate::common::run_rathole_server;
  12. mod common;
  13. const ECHO_SERVER_ADDR: &str = "localhost:8080";
  14. const PINGPONG_SERVER_ADDR: &str = "localhost:8081";
  15. const ECHO_SERVER_ADDR_EXPOSED: &str = "localhost:2334";
  16. const PINGPONG_SERVER_ADDR_EXPOSED: &str = "localhost:2335";
  17. const HITTER_NUM: usize = 4;
  18. #[tokio::test]
  19. async fn main() -> Result<()> {
  20. // Spawn a echo server
  21. tokio::spawn(async move {
  22. if let Err(e) = common::echo_server(ECHO_SERVER_ADDR).await {
  23. panic!("Failed to run the echo server for testing: {:?}", e);
  24. }
  25. });
  26. // Spawn a pingpong server
  27. tokio::spawn(async move {
  28. if let Err(e) = common::pingpong_server(PINGPONG_SERVER_ADDR).await {
  29. panic!("Failed to run the pingpong server for testing: {:?}", e);
  30. }
  31. });
  32. test("tests/tcp_transport.toml").await?;
  33. test("tests/tls_transport.toml").await?;
  34. Ok(())
  35. }
  36. async fn test(config_path: &'static str) -> Result<()> {
  37. let (client_shutdown_tx, client_shutdown_rx) = broadcast::channel(1);
  38. let (server_shutdown_tx, server_shutdown_rx) = broadcast::channel(1);
  39. // Start the client
  40. tokio::spawn(async move {
  41. run_rathole_client(&config_path, client_shutdown_rx)
  42. .await
  43. .unwrap();
  44. });
  45. // Sleep for 1 second. Expect the client keep retrying to reach the server
  46. time::sleep(Duration::from_secs(1)).await;
  47. // Start the server
  48. tokio::spawn(async move {
  49. run_rathole_server(&config_path, server_shutdown_rx)
  50. .await
  51. .unwrap();
  52. });
  53. time::sleep(Duration::from_secs(1)).await; // Wait for the client to retry
  54. echo_hitter(ECHO_SERVER_ADDR_EXPOSED).await.unwrap();
  55. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED).await.unwrap();
  56. // Simulate the client crash and restart
  57. client_shutdown_tx.send(true)?;
  58. time::sleep(Duration::from_millis(500)).await;
  59. let client_shutdown_rx = client_shutdown_tx.subscribe();
  60. tokio::spawn(async move {
  61. run_rathole_client(&config_path, client_shutdown_rx)
  62. .await
  63. .unwrap();
  64. });
  65. echo_hitter(ECHO_SERVER_ADDR_EXPOSED).await.unwrap();
  66. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED).await.unwrap();
  67. // Simulate the server crash and restart
  68. server_shutdown_tx.send(true)?;
  69. time::sleep(Duration::from_millis(500)).await;
  70. let server_shutdown_rx = server_shutdown_tx.subscribe();
  71. tokio::spawn(async move {
  72. run_rathole_server(&config_path, server_shutdown_rx)
  73. .await
  74. .unwrap();
  75. });
  76. time::sleep(Duration::from_secs(1)).await; // Wait for the client to retry
  77. // Simulate heavy load
  78. for _ in 0..HITTER_NUM / 2 {
  79. tokio::spawn(async move {
  80. echo_hitter(ECHO_SERVER_ADDR_EXPOSED).await.unwrap();
  81. });
  82. tokio::spawn(async move {
  83. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED).await.unwrap();
  84. });
  85. }
  86. // Shutdown
  87. server_shutdown_tx.send(true)?;
  88. client_shutdown_tx.send(true)?;
  89. Ok(())
  90. }
  91. async fn echo_hitter(addr: &str) -> Result<()> {
  92. let mut conn = TcpStream::connect(addr).await?;
  93. let mut wr = [0u8; 1024];
  94. let mut rd = [0u8; 1024];
  95. for _ in 0..100 {
  96. rand::thread_rng().fill(&mut wr);
  97. conn.write_all(&wr).await?;
  98. conn.read_exact(&mut rd).await?;
  99. assert_eq!(wr, rd);
  100. }
  101. Ok(())
  102. }
  103. async fn pingpong_hitter(addr: &str) -> Result<()> {
  104. let mut conn = TcpStream::connect(addr).await?;
  105. let wr = PING.as_bytes();
  106. let mut rd = [0u8; PONG.len()];
  107. for _ in 0..100 {
  108. conn.write_all(wr).await?;
  109. conn.read_exact(&mut rd).await?;
  110. assert_eq!(rd, PONG.as_bytes());
  111. }
  112. Ok(())
  113. }