integration_test.rs 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. use anyhow::{Ok, Result};
  2. use common::{run_rathole_client, PING, PONG};
  3. use rand::Rng;
  4. use std::time::Duration;
  5. use tokio::{
  6. io::{AsyncReadExt, AsyncWriteExt},
  7. net::{TcpStream, UdpSocket},
  8. sync::broadcast,
  9. time,
  10. };
  11. use tracing::{debug, info, instrument};
  12. use tracing_subscriber::EnvFilter;
  13. use crate::common::run_rathole_server;
  14. mod common;
/// Address passed to the local echo server that the `tcp` and `udp` tests spawn.
const ECHO_SERVER_ADDR: &str = "127.0.0.1:8080";
/// Address passed to the local ping-pong server that the `tcp` and `udp` tests spawn.
const PINGPONG_SERVER_ADDR: &str = "127.0.0.1:8081";
/// Address the echo hitters connect to.
// NOTE(review): presumably the tunnel endpoint that the TOML configs map onto
// `ECHO_SERVER_ADDR` — confirm against the config files.
const ECHO_SERVER_ADDR_EXPOSED: &str = "127.0.0.1:2334";
/// Address the ping-pong hitters connect to.
// NOTE(review): presumably the tunnel endpoint that the TOML configs map onto
// `PINGPONG_SERVER_ADDR` — confirm against the config files.
const PINGPONG_SERVER_ADDR_EXPOSED: &str = "127.0.0.1:2335";
/// Number of hitter tasks spawned in the load phase of `test`: the loop runs
/// `HITTER_NUM / 2` times and spawns one echo and one ping-pong hitter per pass.
const HITTER_NUM: usize = 4;
  20. #[derive(Clone, Copy, Debug)]
  21. enum Type {
  22. Tcp,
  23. Udp,
  24. }
  25. fn init() {
  26. let level = "info";
  27. let _ = tracing_subscriber::fmt()
  28. .with_env_filter(
  29. EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::from(level)),
  30. )
  31. .try_init();
  32. }
  33. #[tokio::test]
  34. async fn tcp() -> Result<()> {
  35. init();
  36. // Spawn a echo server
  37. tokio::spawn(async move {
  38. if let Err(e) = common::tcp::echo_server(ECHO_SERVER_ADDR).await {
  39. panic!("Failed to run the echo server for testing: {:?}", e);
  40. }
  41. });
  42. // Spawn a pingpong server
  43. tokio::spawn(async move {
  44. if let Err(e) = common::tcp::pingpong_server(PINGPONG_SERVER_ADDR).await {
  45. panic!("Failed to run the pingpong server for testing: {:?}", e);
  46. }
  47. });
  48. test("tests/for_tcp/tcp_transport.toml", Type::Tcp).await?;
  49. #[cfg(any(
  50. // FIXME: Self-signed certificate on macOS nativetls requires manual interference.
  51. all(target_os = "macos", feature = "rustls"),
  52. // On other OS accept run with either
  53. all(not(target_os = "macos"), any(feature = "native-tls", feature = "rustls")),
  54. ))]
  55. test("tests/for_tcp/tls_transport.toml", Type::Tcp).await?;
  56. #[cfg(feature = "noise")]
  57. test("tests/for_tcp/noise_transport.toml", Type::Tcp).await?;
  58. #[cfg(any(feature = "websocket-native-tls", feature = "websocket-rustls"))]
  59. test("tests/for_tcp/websocket_transport.toml", Type::Tcp).await?;
  60. #[cfg(not(target_os = "macos"))]
  61. #[cfg(any(feature = "websocket-native-tls", feature = "websocket-rustls"))]
  62. test("tests/for_tcp/websocket_tls_transport.toml", Type::Tcp).await?;
  63. Ok(())
  64. }
  65. #[tokio::test]
  66. async fn udp() -> Result<()> {
  67. init();
  68. // Spawn a echo server
  69. tokio::spawn(async move {
  70. if let Err(e) = common::udp::echo_server(ECHO_SERVER_ADDR).await {
  71. panic!("Failed to run the echo server for testing: {:?}", e);
  72. }
  73. });
  74. // Spawn a pingpong server
  75. tokio::spawn(async move {
  76. if let Err(e) = common::udp::pingpong_server(PINGPONG_SERVER_ADDR).await {
  77. panic!("Failed to run the pingpong server for testing: {:?}", e);
  78. }
  79. });
  80. test("tests/for_udp/tcp_transport.toml", Type::Udp).await?;
  81. #[cfg(any(
  82. // FIXME: Self-signed certificate on macOS nativetls requires manual interference.
  83. all(target_os = "macos", feature = "rustls"),
  84. // On other OS accept run with either
  85. all(not(target_os = "macos"), any(feature = "native-tls", feature = "rustls")),
  86. ))]
  87. test("tests/for_udp/tls_transport.toml", Type::Udp).await?;
  88. #[cfg(feature = "noise")]
  89. test("tests/for_udp/noise_transport.toml", Type::Udp).await?;
  90. #[cfg(any(feature = "websocket-native-tls", feature = "websocket-rustls"))]
  91. test("tests/for_udp/websocket_transport.toml", Type::Udp).await?;
  92. #[cfg(not(target_os = "macos"))]
  93. #[cfg(any(feature = "websocket-native-tls", feature = "websocket-rustls"))]
  94. test("tests/for_udp/websocket_tls_transport.toml", Type::Udp).await?;
  95. Ok(())
  96. }
  97. #[instrument]
  98. async fn test(config_path: &'static str, t: Type) -> Result<()> {
  99. if cfg!(not(all(feature = "client", feature = "server"))) {
  100. // Skip the test if the client or the server is not enabled
  101. return Ok(());
  102. }
  103. let (client_shutdown_tx, client_shutdown_rx) = broadcast::channel(1);
  104. let (server_shutdown_tx, server_shutdown_rx) = broadcast::channel(1);
  105. // Start the client
  106. info!("start the client");
  107. let client = tokio::spawn(async move {
  108. run_rathole_client(config_path, client_shutdown_rx)
  109. .await
  110. .unwrap();
  111. });
  112. // Sleep for 1 second. Expect the client keep retrying to reach the server
  113. time::sleep(Duration::from_secs(1)).await;
  114. // Start the server
  115. info!("start the server");
  116. let server = tokio::spawn(async move {
  117. run_rathole_server(config_path, server_shutdown_rx)
  118. .await
  119. .unwrap();
  120. });
  121. time::sleep(Duration::from_millis(2500)).await; // Wait for the client to retry
  122. info!("echo");
  123. echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap();
  124. info!("pingpong");
  125. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED, t)
  126. .await
  127. .unwrap();
  128. // Simulate the client crash and restart
  129. info!("shutdown the client");
  130. client_shutdown_tx.send(true)?;
  131. let _ = tokio::join!(client);
  132. info!("restart the client");
  133. let client_shutdown_rx = client_shutdown_tx.subscribe();
  134. let client = tokio::spawn(async move {
  135. run_rathole_client(config_path, client_shutdown_rx)
  136. .await
  137. .unwrap();
  138. });
  139. time::sleep(Duration::from_secs(1)).await; // Wait for the client to start
  140. info!("echo");
  141. echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap();
  142. info!("pingpong");
  143. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED, t)
  144. .await
  145. .unwrap();
  146. // Simulate the server crash and restart
  147. info!("shutdown the server");
  148. server_shutdown_tx.send(true)?;
  149. let _ = tokio::join!(server);
  150. info!("restart the server");
  151. let server_shutdown_rx = server_shutdown_tx.subscribe();
  152. let server = tokio::spawn(async move {
  153. run_rathole_server(config_path, server_shutdown_rx)
  154. .await
  155. .unwrap();
  156. });
  157. time::sleep(Duration::from_millis(2500)).await; // Wait for the client to retry
  158. // Simulate heavy load
  159. info!("lots of echo and pingpong");
  160. let mut v = Vec::new();
  161. for _ in 0..HITTER_NUM / 2 {
  162. v.push(tokio::spawn(async move {
  163. echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap();
  164. }));
  165. v.push(tokio::spawn(async move {
  166. pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED, t)
  167. .await
  168. .unwrap();
  169. }));
  170. }
  171. for h in v {
  172. assert!(tokio::join!(h).0.is_ok());
  173. }
  174. // Shutdown
  175. info!("shutdown the server and the client");
  176. server_shutdown_tx.send(true)?;
  177. client_shutdown_tx.send(true)?;
  178. let _ = tokio::join!(server, client);
  179. Ok(())
  180. }
  181. async fn echo_hitter(addr: &'static str, t: Type) -> Result<()> {
  182. match t {
  183. Type::Tcp => tcp_echo_hitter(addr).await,
  184. Type::Udp => udp_echo_hitter(addr).await,
  185. }
  186. }
  187. async fn pingpong_hitter(addr: &'static str, t: Type) -> Result<()> {
  188. match t {
  189. Type::Tcp => tcp_pingpong_hitter(addr).await,
  190. Type::Udp => udp_pingpong_hitter(addr).await,
  191. }
  192. }
  193. async fn tcp_echo_hitter(addr: &'static str) -> Result<()> {
  194. let mut conn = TcpStream::connect(addr).await?;
  195. let mut wr = [0u8; 1024];
  196. let mut rd = [0u8; 1024];
  197. for _ in 0..100 {
  198. rand::thread_rng().fill(&mut wr);
  199. conn.write_all(&wr).await?;
  200. conn.read_exact(&mut rd).await?;
  201. assert_eq!(wr, rd);
  202. }
  203. Ok(())
  204. }
  205. async fn udp_echo_hitter(addr: &'static str) -> Result<()> {
  206. let conn = UdpSocket::bind("127.0.0.1:0").await?;
  207. conn.connect(addr).await?;
  208. let mut wr = [0u8; 128];
  209. let mut rd = [0u8; 128];
  210. for _ in 0..3 {
  211. rand::thread_rng().fill(&mut wr);
  212. conn.send(&wr).await?;
  213. debug!("send");
  214. conn.recv(&mut rd).await?;
  215. debug!("recv");
  216. assert_eq!(wr, rd);
  217. }
  218. Ok(())
  219. }
  220. async fn tcp_pingpong_hitter(addr: &'static str) -> Result<()> {
  221. let mut conn = TcpStream::connect(addr).await?;
  222. let wr = PING.as_bytes();
  223. let mut rd = [0u8; PONG.len()];
  224. for _ in 0..100 {
  225. conn.write_all(wr).await?;
  226. conn.read_exact(&mut rd).await?;
  227. assert_eq!(rd, PONG.as_bytes());
  228. }
  229. Ok(())
  230. }
  231. async fn udp_pingpong_hitter(addr: &'static str) -> Result<()> {
  232. let conn = UdpSocket::bind("127.0.0.1:0").await?;
  233. conn.connect(&addr).await?;
  234. let wr = PING.as_bytes();
  235. let mut rd = [0u8; PONG.len()];
  236. for _ in 0..3 {
  237. conn.send(wr).await?;
  238. debug!("ping");
  239. conn.recv(&mut rd).await?;
  240. debug!("pong");
  241. assert_eq!(rd, PONG.as_bytes());
  242. }
  243. Ok(())
  244. }