|
|
@@ -3,7 +3,7 @@ use common::{run_rathole_client, PING, PONG};
|
|
|
use rand::Rng;
|
|
|
use std::time::Duration;
|
|
|
use tokio::{
|
|
|
- io::{AsyncReadExt, AsyncWriteExt},
|
|
|
+ io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
|
|
|
net::{TcpStream, UdpSocket},
|
|
|
sync::broadcast,
|
|
|
time,
|
|
|
@@ -56,6 +56,8 @@ async fn tcp() -> Result<()> {
|
|
|
|
|
|
test("tests/for_tcp/tcp_transport.toml", Type::Tcp).await?;
|
|
|
|
|
|
+ test_proxy_protocol("tests/for_tcp/tcp_transport_proxy_protocol.toml").await?;
|
|
|
+
|
|
|
#[cfg(any(
|
|
|
// FIXME: Self-signed certificate on macOS nativetls requires manual interference.
|
|
|
all(target_os = "macos", feature = "rustls"),
|
|
|
@@ -304,3 +306,81 @@ async fn udp_pingpong_hitter(addr: &'static str) -> Result<()> {
|
|
|
|
|
|
Ok(())
|
|
|
}
|
|
|
+
|
|
|
+#[instrument]
|
|
|
+async fn test_proxy_protocol(config_path: &'static str) -> Result<()> {
|
|
|
+ if cfg!(not(all(feature = "client", feature = "server"))) {
|
|
|
+ return Ok(());
|
|
|
+ }
|
|
|
+
|
|
|
+ let (client_shutdown_tx, client_shutdown_rx) = broadcast::channel(1);
|
|
|
+ let (server_shutdown_tx, server_shutdown_rx) = broadcast::channel(1);
|
|
|
+
|
|
|
+ info!("start the client");
|
|
|
+ let client = tokio::spawn(async move {
|
|
|
+ run_rathole_client(config_path, client_shutdown_rx)
|
|
|
+ .await
|
|
|
+ .unwrap();
|
|
|
+ });
|
|
|
+
|
|
|
+ time::sleep(Duration::from_secs(1)).await;
|
|
|
+
|
|
|
+ info!("start the server");
|
|
|
+ let server = tokio::spawn(async move {
|
|
|
+ run_rathole_server(config_path, server_shutdown_rx)
|
|
|
+ .await
|
|
|
+ .unwrap();
|
|
|
+ });
|
|
|
+
|
|
|
+ time::sleep(Duration::from_millis(2500)).await;
|
|
|
+
|
|
|
+ info!("echo");
|
|
|
+ tcp_echo_hitter_expect_proxy_protocol(ECHO_SERVER_ADDR_EXPOSED).await?;
|
|
|
+
|
|
|
+ info!("pingpong )");
|
|
|
+ tcp_pingpong_hitter(PINGPONG_SERVER_ADDR_EXPOSED).await?;
|
|
|
+
|
|
|
+ info!("shutdown the server and the client");
|
|
|
+ server_shutdown_tx.send(true)?;
|
|
|
+ client_shutdown_tx.send(true)?;
|
|
|
+
|
|
|
+ let _ = tokio::join!(server, client);
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+}
|
|
|
+
|
|
|
+async fn tcp_echo_hitter_expect_proxy_protocol(addr: &'static str) -> Result<()> {
|
|
|
+ let conn = TcpStream::connect(addr).await?;
|
|
|
+ let local = conn.local_addr()?;
|
|
|
+ let peer = conn.peer_addr()?;
|
|
|
+
|
|
|
+ let (rd, mut wr) = conn.into_split();
|
|
|
+ let mut rd = BufReader::new(rd);
|
|
|
+
|
|
|
+ // Read the echoed PROXY header line first.
|
|
|
+ let mut header = String::new();
|
|
|
+ let n = time::timeout(Duration::from_secs(5), rd.read_line(&mut header)).await??;
|
|
|
+ assert!(n > 0, "expected a proxy protocol header line");
|
|
|
+
|
|
|
+ let expected = format!(
|
|
|
+ "PROXY TCP4 {} {} {} {}\r\n",
|
|
|
+ local.ip(),
|
|
|
+ peer.ip(),
|
|
|
+ local.port(),
|
|
|
+ peer.port()
|
|
|
+ );
|
|
|
+ assert_eq!(header, expected);
|
|
|
+
|
|
|
+ // Now the stream should behave like a normal echo connection.
|
|
|
+ let mut wr_buf = [0u8; 1024];
|
|
|
+ let mut rd_buf = [0u8; 1024];
|
|
|
+
|
|
|
+ for _ in 0..100 {
|
|
|
+ rand::thread_rng().fill(&mut wr_buf);
|
|
|
+ wr.write_all(&wr_buf).await?;
|
|
|
+ rd.read_exact(&mut rd_buf).await?;
|
|
|
+ assert_eq!(wr_buf, rd_buf);
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+}
|