integration_test.rs 7.2 KB


  1. use anyhow::Result;
  2. use common::{run_rathole_client, PING, PONG};
  3. use rand::Rng;
  4. use std::time::Duration;
  5. use tokio::{
  6. io::{AsyncReadExt, AsyncWriteExt},
  7. net::{TcpStream, UdpSocket},
  8. sync::broadcast,
  9. time,
  10. };
  11. use tracing::{debug, info, instrument};
  12. use tracing_subscriber::EnvFilter;
  13. use crate::common::run_rathole_server;
  14. mod common;
  /// Address handed to the local echo server spawned by the `tcp` / `udp` tests.
  const ECHO_SERVER_ADDR: &str = "127.0.0.1:8080";

  /// Address handed to the local ping-pong server spawned by the `tcp` / `udp` tests.
  const PINGPONG_SERVER_ADDR: &str = "127.0.0.1:8081";

  /// Address the echo hitters connect to.
  // NOTE(review): presumably the bind address the rathole server config exposes
  // for the echo service -- confirm against the `tests/for_*/*.toml` files.
  const ECHO_SERVER_ADDR_EXPOSED: &str = "127.0.0.1:2334";

  /// Address the ping-pong hitters connect to.
  // NOTE(review): presumably the bind address the rathole server config exposes
  // for the ping-pong service -- confirm against the `tests/for_*/*.toml` files.
  const PINGPONG_SERVER_ADDR_EXPOSED: &str = "127.0.0.1:2335";

  /// Total number of concurrent hitters in the load phase of `test`
  /// (half of them echo, half of them ping-pong).
  const HITTER_NUM: usize = 4;
  /// Protocol used by the hitters; `echo_hitter` and `pingpong_hitter`
  /// dispatch on it to pick the TCP or UDP implementation.
  #[derive(Debug, Clone, Copy)]
  enum Type {
      /// Talk to the services over TCP streams.
      Tcp,
      /// Talk to the services over UDP datagrams.
      Udp,
  }
  25. fn init() {
  26. let level = "info";
  27. let _ = tracing_subscriber::fmt()
  28. .with_env_filter(
  29. EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::from(level)),
  30. )
  31. .try_init();
  32. }
  33. #[tokio::test]
  34. async fn tcp() -> Result<()> {
  35. init();
  36. // Spawn a echo server
  37. tokio::spawn(async move {
  38. if let Err(e) = common::tcp::echo_server(ECHO_SERVER_ADDR).await {
  39. panic!("Failed to run the echo server for testing: {:?}", e);
  40. }
  41. });
  42. // Spawn a pingpong server
  43. tokio::spawn(async move {
  44. if let Err(e) = common::tcp::pingpong_server(PINGPONG_SERVER_ADDR).await {
  45. panic!("Failed to run the pingpong server for testing: {:?}", e);
  46. }
  47. });
  48. test("tests/for_tcp/tcp_transport.toml", Type::Tcp).await?;
  49. // FIXME: Self-signed certificate on Mac requires mannual interference. Disable CI for now
  50. #[cfg(not(target_os = "macos"))]
  51. test("tests/for_tcp/tls_transport.toml", Type::Tcp).await?;
  52. test("tests/for_tcp/noise_transport.toml", Type::Tcp).await?;
  53. Ok(())
  54. }
  55. #[tokio::test]
  56. async fn udp() -> Result<()> {
  57. init();
  58. // Spawn a echo server
  59. tokio::spawn(async move {
  60. if let Err(e) = common::udp::echo_server(ECHO_SERVER_ADDR).await {
  61. panic!("Failed to run the echo server for testing: {:?}", e);
  62. }
  63. });
  64. // Spawn a pingpong server
  65. tokio::spawn(async move {
  66. if let Err(e) = common::udp::pingpong_server(PINGPONG_SERVER_ADDR).await {
  67. panic!("Failed to run the pingpong server for testing: {:?}", e);
  68. }
  69. });
  70. test("tests/for_udp/tcp_transport.toml", Type::Udp).await?;
  71. // See above
  72. #[cfg(not(target_os = "macos"))]
  73. test("tests/for_udp/tls_transport.toml", Type::Udp).await?;
  74. test("tests/for_udp/noise_transport.toml", Type::Udp).await?;
  75. Ok(())
  76. }
  77. #[instrument]
  78. async fn test(config_path: &'static str, t: Type) -> Result<()> {
  79. let (client_shutdown_tx, client_shutdown_rx) = broadcast::channel(1);
  80. let (server_shutdown_tx, server_shutdown_rx) = broadcast::channel(1);
  81. // Start the client
  82. info!("start the client");
  83. let client = tokio::spawn(async move {
  84. run_rathole_client(config_path, client_shutdown_rx)
  85. .await
  86. .unwrap();
  87. });
  88. // Sleep for 1 second. Expect the client keep retrying to reach the server
  89. time::sleep(Duration::from_secs(1)).await;
  90. // Start the server
  91. info!("start the server");
  92. let server = tokio::spawn(async move {
  93. run_rathole_server(config_path, server_shutdown_rx)
  94. .await
  95. .unwrap();
  96. });
  97. time::sleep(Duration::from_millis(2500)).await; // Wait for the client to retry
  98. info!("echo");
  99. echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap();
  100. info!("pingpong");
  101. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED, t)
  102. .await
  103. .unwrap();
  104. // Simulate the client crash and restart
  105. info!("shutdown the client");
  106. client_shutdown_tx.send(true)?;
  107. let _ = tokio::join!(client);
  108. info!("restart the client");
  109. let client_shutdown_rx = client_shutdown_tx.subscribe();
  110. let client = tokio::spawn(async move {
  111. run_rathole_client(config_path, client_shutdown_rx)
  112. .await
  113. .unwrap();
  114. });
  115. time::sleep(Duration::from_secs(1)).await; // Wait for the client to start
  116. info!("echo");
  117. echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap();
  118. info!("pingpong");
  119. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED, t)
  120. .await
  121. .unwrap();
  122. // Simulate the server crash and restart
  123. info!("shutdown the server");
  124. server_shutdown_tx.send(true)?;
  125. let _ = tokio::join!(server);
  126. info!("restart the server");
  127. let server_shutdown_rx = server_shutdown_tx.subscribe();
  128. let server = tokio::spawn(async move {
  129. run_rathole_server(config_path, server_shutdown_rx)
  130. .await
  131. .unwrap();
  132. });
  133. time::sleep(Duration::from_millis(2500)).await; // Wait for the client to retry
  134. // Simulate heavy load
  135. info!("lots of echo and pingpong");
  136. let mut v = Vec::new();
  137. for _ in 0..HITTER_NUM / 2 {
  138. v.push(tokio::spawn(async move {
  139. echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap();
  140. }));
  141. v.push(tokio::spawn(async move {
  142. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED, t)
  143. .await
  144. .unwrap();
  145. }));
  146. }
  147. for h in v {
  148. assert!(tokio::join!(h).0.is_ok());
  149. }
  150. // Shutdown
  151. info!("shutdown the server and the client");
  152. server_shutdown_tx.send(true)?;
  153. client_shutdown_tx.send(true)?;
  154. let _ = tokio::join!(server, client);
  155. Ok(())
  156. }
  157. async fn echo_hitter(addr: &'static str, t: Type) -> Result<()> {
  158. match t {
  159. Type::Tcp => tcp_echo_hitter(addr).await,
  160. Type::Udp => udp_echo_hitter(addr).await,
  161. }
  162. }
  163. async fn pingpong_hitter(addr: &'static str, t: Type) -> Result<()> {
  164. match t {
  165. Type::Tcp => tcp_pingpong_hitter(addr).await,
  166. Type::Udp => udp_pingpong_hitter(addr).await,
  167. }
  168. }
  169. async fn tcp_echo_hitter(addr: &'static str) -> Result<()> {
  170. let mut conn = TcpStream::connect(addr).await?;
  171. let mut wr = [0u8; 1024];
  172. let mut rd = [0u8; 1024];
  173. for _ in 0..100 {
  174. rand::thread_rng().fill(&mut wr);
  175. conn.write_all(&wr).await?;
  176. conn.read_exact(&mut rd).await?;
  177. assert_eq!(wr, rd);
  178. }
  179. Ok(())
  180. }
  181. async fn udp_echo_hitter(addr: &'static str) -> Result<()> {
  182. let conn = UdpSocket::bind("127.0.0.1:0").await?;
  183. conn.connect(addr).await?;
  184. let mut wr = [0u8; 128];
  185. let mut rd = [0u8; 128];
  186. for _ in 0..3 {
  187. rand::thread_rng().fill(&mut wr);
  188. conn.send(&wr).await?;
  189. debug!("send");
  190. conn.recv(&mut rd).await?;
  191. debug!("recv");
  192. assert_eq!(wr, rd);
  193. }
  194. Ok(())
  195. }
  196. async fn tcp_pingpong_hitter(addr: &'static str) -> Result<()> {
  197. let mut conn = TcpStream::connect(addr).await?;
  198. let wr = PING.as_bytes();
  199. let mut rd = [0u8; PONG.len()];
  200. for _ in 0..100 {
  201. conn.write_all(wr).await?;
  202. conn.read_exact(&mut rd).await?;
  203. assert_eq!(rd, PONG.as_bytes());
  204. }
  205. Ok(())
  206. }
  207. async fn udp_pingpong_hitter(addr: &'static str) -> Result<()> {
  208. let conn = UdpSocket::bind("127.0.0.1:0").await?;
  209. conn.connect(&addr).await?;
  210. let wr = PING.as_bytes();
  211. let mut rd = [0u8; PONG.len()];
  212. for _ in 0..3 {
  213. conn.send(wr).await?;
  214. debug!("ping");
  215. conn.recv(&mut rd).await?;
  216. debug!("pong");
  217. assert_eq!(rd, PONG.as_bytes());
  218. }
  219. Ok(())
  220. }