integration_test.rs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. use anyhow::Result;
  2. use common::{run_rathole_client, PING, PONG};
  3. use rand::Rng;
  4. use std::time::Duration;
  5. use tokio::{
  6. io::{AsyncReadExt, AsyncWriteExt},
  7. net::{TcpStream, UdpSocket},
  8. sync::broadcast,
  9. time,
  10. };
  11. use tracing::{debug, info, instrument};
  12. use tracing_subscriber::EnvFilter;
  13. use crate::common::run_rathole_server;
  14. mod common;
  15. const ECHO_SERVER_ADDR: &str = "0.0.0.0:8080";
  16. const PINGPONG_SERVER_ADDR: &str = "0.0.0.0:8081";
  17. const ECHO_SERVER_ADDR_EXPOSED: &str = "0.0.0.0:2334";
  18. const PINGPONG_SERVER_ADDR_EXPOSED: &str = "0.0.0.0:2335";
  19. const HITTER_NUM: usize = 4;
  20. #[derive(Clone, Copy, Debug)]
  21. enum Type {
  22. Tcp,
  23. Udp,
  24. }
  25. fn init() {
  26. let level = "info";
  27. let _ = tracing_subscriber::fmt()
  28. .with_env_filter(
  29. EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::from(level)),
  30. )
  31. .try_init();
  32. }
  33. #[tokio::test]
  34. async fn tcp() -> Result<()> {
  35. init();
  36. // Spawn a echo server
  37. tokio::spawn(async move {
  38. if let Err(e) = common::tcp::echo_server(ECHO_SERVER_ADDR).await {
  39. panic!("Failed to run the echo server for testing: {:?}", e);
  40. }
  41. });
  42. // Spawn a pingpong server
  43. tokio::spawn(async move {
  44. if let Err(e) = common::tcp::pingpong_server(PINGPONG_SERVER_ADDR).await {
  45. panic!("Failed to run the pingpong server for testing: {:?}", e);
  46. }
  47. });
  48. test("tests/for_tcp/tcp_transport.toml", Type::Tcp).await?;
  49. test("tests/for_tcp/tls_transport.toml", Type::Tcp).await?;
  50. Ok(())
  51. }
  52. #[tokio::test]
  53. async fn udp() -> Result<()> {
  54. init();
  55. // Spawn a echo server
  56. tokio::spawn(async move {
  57. if let Err(e) = common::udp::echo_server(ECHO_SERVER_ADDR).await {
  58. panic!("Failed to run the echo server for testing: {:?}", e);
  59. }
  60. });
  61. // Spawn a pingpong server
  62. tokio::spawn(async move {
  63. if let Err(e) = common::udp::pingpong_server(PINGPONG_SERVER_ADDR).await {
  64. panic!("Failed to run the pingpong server for testing: {:?}", e);
  65. }
  66. });
  67. test("tests/for_udp/tcp_transport.toml", Type::Udp).await?;
  68. test("tests/for_udp/tls_transport.toml", Type::Udp).await?;
  69. Ok(())
  70. }
  71. #[instrument]
  72. async fn test(config_path: &'static str, t: Type) -> Result<()> {
  73. let (client_shutdown_tx, client_shutdown_rx) = broadcast::channel(1);
  74. let (server_shutdown_tx, server_shutdown_rx) = broadcast::channel(1);
  75. // Start the client
  76. info!("start the client");
  77. tokio::spawn(async move {
  78. run_rathole_client(&config_path, client_shutdown_rx)
  79. .await
  80. .unwrap();
  81. });
  82. // Sleep for 1 second. Expect the client keep retrying to reach the server
  83. time::sleep(Duration::from_secs(1)).await;
  84. // Start the server
  85. info!("start the server");
  86. tokio::spawn(async move {
  87. run_rathole_server(&config_path, server_shutdown_rx)
  88. .await
  89. .unwrap();
  90. });
  91. time::sleep(Duration::from_secs(1)).await; // Wait for the client to retry
  92. info!("echo");
  93. echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap();
  94. info!("pingpong");
  95. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED, t)
  96. .await
  97. .unwrap();
  98. // Simulate the client crash and restart
  99. info!("shutdown the client");
  100. client_shutdown_tx.send(true)?;
  101. time::sleep(Duration::from_millis(500)).await;
  102. info!("restart the client");
  103. let client_shutdown_rx = client_shutdown_tx.subscribe();
  104. let client = tokio::spawn(async move {
  105. run_rathole_client(&config_path, client_shutdown_rx)
  106. .await
  107. .unwrap();
  108. });
  109. time::sleep(Duration::from_secs(1)).await; // Wait for the client to start
  110. info!("echo");
  111. echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap();
  112. info!("pingpong");
  113. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED, t)
  114. .await
  115. .unwrap();
  116. // Simulate the server crash and restart
  117. info!("shutdown the server");
  118. server_shutdown_tx.send(true)?;
  119. time::sleep(Duration::from_millis(500)).await;
  120. info!("restart the server");
  121. let server_shutdown_rx = server_shutdown_tx.subscribe();
  122. let server = tokio::spawn(async move {
  123. run_rathole_server(&config_path, server_shutdown_rx)
  124. .await
  125. .unwrap();
  126. });
  127. time::sleep(Duration::from_secs(1)).await; // Wait for the client to retry
  128. // Simulate heavy load
  129. info!("lots of echo and pingpong");
  130. for _ in 0..HITTER_NUM / 2 {
  131. tokio::spawn(async move {
  132. echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap();
  133. });
  134. tokio::spawn(async move {
  135. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED, t)
  136. .await
  137. .unwrap();
  138. });
  139. }
  140. // Shutdown
  141. info!("shutdown the server and the client");
  142. server_shutdown_tx.send(true)?;
  143. client_shutdown_tx.send(true)?;
  144. let _ = tokio::join!(server, client);
  145. Ok(())
  146. }
  147. async fn echo_hitter(addr: &'static str, t: Type) -> Result<()> {
  148. match t {
  149. Type::Tcp => tcp_echo_hitter(addr).await,
  150. Type::Udp => udp_echo_hitter(addr).await,
  151. }
  152. }
  153. async fn pingpong_hitter(addr: &'static str, t: Type) -> Result<()> {
  154. match t {
  155. Type::Tcp => tcp_pingpong_hitter(addr).await,
  156. Type::Udp => udp_pingpong_hitter(addr).await,
  157. }
  158. }
  159. async fn tcp_echo_hitter(addr: &'static str) -> Result<()> {
  160. let mut conn = TcpStream::connect(addr).await?;
  161. let mut wr = [0u8; 1024];
  162. let mut rd = [0u8; 1024];
  163. for _ in 0..100 {
  164. rand::thread_rng().fill(&mut wr);
  165. conn.write_all(&wr).await?;
  166. conn.read_exact(&mut rd).await?;
  167. assert_eq!(wr, rd);
  168. }
  169. Ok(())
  170. }
  171. async fn udp_echo_hitter(addr: &'static str) -> Result<()> {
  172. let conn = UdpSocket::bind("0.0.0.0:0").await?;
  173. conn.connect(addr).await?;
  174. let mut wr = [0u8; 128];
  175. let mut rd = [0u8; 128];
  176. for _ in 0..3 {
  177. rand::thread_rng().fill(&mut wr);
  178. conn.send(&wr).await?;
  179. debug!("send");
  180. conn.recv(&mut rd).await?;
  181. debug!("recv");
  182. assert_eq!(wr, rd);
  183. }
  184. Ok(())
  185. }
  186. async fn tcp_pingpong_hitter(addr: &'static str) -> Result<()> {
  187. let mut conn = TcpStream::connect(addr).await?;
  188. let wr = PING.as_bytes();
  189. let mut rd = [0u8; PONG.len()];
  190. for _ in 0..100 {
  191. conn.write_all(wr).await?;
  192. conn.read_exact(&mut rd).await?;
  193. assert_eq!(rd, PONG.as_bytes());
  194. }
  195. Ok(())
  196. }
  197. async fn udp_pingpong_hitter(addr: &'static str) -> Result<()> {
  198. let conn = UdpSocket::bind("0.0.0.0:0").await?;
  199. conn.connect(&addr).await?;
  200. let wr = PING.as_bytes();
  201. let mut rd = [0u8; PONG.len()];
  202. for _ in 0..3 {
  203. conn.send(wr).await?;
  204. debug!("ping");
  205. conn.recv(&mut rd).await?;
  206. debug!("pong");
  207. assert_eq!(rd, PONG.as_bytes());
  208. }
  209. Ok(())
  210. }