Prechádzať zdrojové kódy

feat: support SOCKS5 and HTTP proxy (#135)

* chore: add comments

* feat: support socks5/http proxy

* fix: clippy

* fix: always validate tcp config

* chore: rename directories
Yujia Qiao 3 rokov pred
rodič
commit
1ef7747019
9 zmenil súbory, kde vykonal 152 pridanie a 28 odobranie
  1. 27 0
      Cargo.lock
  2. 3 0
      Cargo.toml
  3. 3 0
      README.md
  4. 15 0
      examples/use_proxy/client.toml
  5. 29 17
      src/config.rs
  6. 61 4
      src/helper.rs
  7. 3 2
      src/transport/mod.rs
  8. 8 3
      src/transport/tcp.rs
  9. 3 2
      src/transport/tls.rs

+ 27 - 0
Cargo.lock

@@ -67,6 +67,29 @@ version = "1.0.54"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7a99269dff3bc004caa411f38845c20303f1e393ca2bd6581576fa3a7f59577d"
 
+[[package]]
+name = "async-http-proxy"
+version = "1.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "29faa5d4d308266048bd7505ba55484315a890102f9345b9ff4b87de64201592"
+dependencies = [
+ "base64",
+ "httparse",
+ "thiserror",
+ "tokio",
+]
+
+[[package]]
+name = "async-socks5"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77f634add2445eb2c1f785642a67ca1073fedd71e73dc3ca69435ef9b9bdedc7"
+dependencies = [
+ "async-trait",
+ "thiserror",
+ "tokio",
+]
+
 [[package]]
 name = "async-stream"
 version = "0.3.2"
@@ -1483,6 +1506,8 @@ name = "rathole"
 version = "0.3.10"
 dependencies = [
  "anyhow",
+ "async-http-proxy",
+ "async-socks5",
  "async-trait",
  "atty",
  "backoff",
@@ -1505,6 +1530,7 @@ dependencies = [
  "toml",
  "tracing",
  "tracing-subscriber 0.2.25",
+ "url",
  "vergen",
 ]
 
@@ -2222,6 +2248,7 @@ dependencies = [
  "idna",
  "matches",
  "percent-encoding",
+ "serde",
 ]
 
 [[package]]

+ 3 - 0
Cargo.toml

@@ -71,6 +71,9 @@ base64 = { version = "0.13", optional = true }
 notify = { version = "5.0.0-pre.13", optional = true }
 console-subscriber = { version = "0.1", optional = true, features = ["parking_lot"] }
 atty = "0.2"
+async-http-proxy = { version = "1.2", features = ["runtime-tokio", "basic-auth"] }
+async-socks5 = "0.5"
+url = { version = "2.2", features = ["serde"] }
 
 [build-dependencies]
 vergen = { version = "6.0", default-features = false, features = ["build", "git", "cargo"] }

+ 3 - 0
README.md

@@ -108,6 +108,9 @@ default_token = "default_token_if_not_specify" # Optional. The default token of
 
 [client.transport] # The whole block is optional. Specify which transport to use
 type = "tcp" # Optional. Possible values: ["tcp", "tls", "noise"]. Default: "tcp"
+
+[client.transport.tcp] # Optional
+proxy = "socks5://user:passwd@127.0.0.1:1080" # Optional. Use the proxy to connect to the server
 nodelay = false # Optional. Determine whether to enable TCP_NODELAY, if applicable, to improve the latency but decrease the bandwidth. Default: false
 keepalive_secs = 10 # Optional. Specify `tcp_keepalive_time` in `tcp(7)`, if applicable. Default: 10 seconds
 keepalive_interval = 5 # Optional. Specify `tcp_keepalive_intvl` in `tcp(7)`, if applicable. Default: 5 seconds

+ 15 - 0
examples/use_proxy/client.toml

@@ -0,0 +1,15 @@
+[client]
+remote_addr = "127.0.0.1:2333"
+default_token = "123"
+
+[client.services.foo1]
+local_addr = "127.0.0.1:80"
+
+[client.transport]
+type = "tcp"
+[client.transport.tcp]
+# `proxy` controls how the client connect to the server
+# Use socks5 proxy at 127.0.0.1, with port 1080, username 'myuser' and password 'mypass'
+proxy = "socks5://myuser:mypass@127.0.0.1:1080"
+# Use http proxy. Similar to socks5 proxy
+# proxy = "http://myuser:mypass@127.0.0.1:8080"

+ 29 - 17
src/config.rs

@@ -5,6 +5,7 @@ use std::fmt::{Debug, Formatter};
 use std::ops::Deref;
 use std::path::Path;
 use tokio::fs;
+use url::Url;
 
 use crate::transport::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_KEEPALIVE_SECS, DEFAULT_NODELAY};
 
@@ -20,7 +21,7 @@ impl Debug for MaskedString {
 }
 
 impl Deref for MaskedString {
-    type Target = String;
+    type Target = str;
     fn deref(&self) -> &Self::Target {
         &self.0
     }
@@ -142,36 +143,38 @@ fn default_keepalive_interval() -> u64 {
     DEFAULT_KEEPALIVE_INTERVAL
 }
 
-#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
 #[serde(deny_unknown_fields)]
-pub struct TransportConfig {
-    #[serde(rename = "type")]
-    pub transport_type: TransportType,
+pub struct TcpConfig {
     #[serde(default = "default_nodelay")]
     pub nodelay: bool,
     #[serde(default = "default_keepalive_secs")]
     pub keepalive_secs: u64,
     #[serde(default = "default_keepalive_interval")]
     pub keepalive_interval: u64,
-    pub tls: Option<TlsConfig>,
-    pub noise: Option<NoiseConfig>,
+    pub proxy: Option<Url>,
 }
 
-impl Default for TransportConfig {
-    fn default() -> TransportConfig {
-        TransportConfig {
-            transport_type: Default::default(),
+impl Default for TcpConfig {
+    fn default() -> Self {
+        Self {
             nodelay: default_nodelay(),
             keepalive_secs: default_keepalive_secs(),
             keepalive_interval: default_keepalive_interval(),
-            tls: None,
-            noise: None,
+            proxy: None,
         }
     }
 }
 
-fn default_transport() -> TransportConfig {
-    Default::default()
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Default)]
+#[serde(deny_unknown_fields)]
+pub struct TransportConfig {
+    #[serde(rename = "type")]
+    pub transport_type: TransportType,
+    #[serde(default)]
+    pub tcp: TcpConfig,
+    pub tls: Option<TlsConfig>,
+    pub noise: Option<NoiseConfig>,
 }
 
 #[derive(Debug, Serialize, Deserialize, Default, PartialEq, Clone)]
@@ -180,7 +183,7 @@ pub struct ClientConfig {
     pub remote_addr: String,
     pub default_token: Option<MaskedString>,
     pub services: HashMap<String, ClientServiceConfig>,
-    #[serde(default = "default_transport")]
+    #[serde(default)]
     pub transport: TransportConfig,
 }
 
@@ -190,7 +193,7 @@ pub struct ServerConfig {
     pub bind_addr: String,
     pub default_token: Option<MaskedString>,
     pub services: HashMap<String, ServerServiceConfig>,
-    #[serde(default = "default_transport")]
+    #[serde(default)]
     pub transport: TransportConfig,
 }
 
@@ -255,6 +258,15 @@ impl Config {
     }
 
     fn validate_transport_config(config: &TransportConfig, is_server: bool) -> Result<()> {
+        config
+            .tcp
+            .proxy
+            .as_ref()
+            .map_or(Ok(()), |u| match u.scheme() {
+                "socks5" => Ok(()),
+                "http" => Ok(()),
+                _ => Err(anyhow!(format!("Unknown proxy scheme: {}", u.scheme()))),
+            })?;
         match config.transport_type {
             TransportType::Tcp => Ok(()),
             TransportType::Tls => {

+ 61 - 4
src/helper.rs

@@ -1,4 +1,5 @@
 use anyhow::{anyhow, Result};
+use async_http_proxy::{http_connect_tokio, http_connect_tokio_with_basic_auth};
 use backoff::{backoff::Backoff, Notify};
 use socket2::{SockRef, TcpKeepalive};
 use std::{future::Future, net::SocketAddr, time::Duration};
@@ -7,6 +8,7 @@ use tokio::{
     sync::broadcast,
 };
 use tracing::trace;
+use url::Url;
 
 // Tokio hesitates to expose this option...So we have to do it on our own :(
 // The good news is that using socket2 it can be easily done, without losing portability.
@@ -38,12 +40,21 @@ pub fn feature_not_compile(feature: &str) -> ! {
     )
 }
 
-/// Create a UDP socket and connect to `addr`
-pub async fn udp_connect<A: ToSocketAddrs>(addr: A) -> Result<UdpSocket> {
-    let addr = lookup_host(addr)
+async fn to_socket_addr<A: ToSocketAddrs>(addr: A) -> Result<SocketAddr> {
+    lookup_host(addr)
         .await?
         .next()
-        .ok_or(anyhow!("Failed to lookup the host"))?;
+        .ok_or(anyhow!("Failed to lookup the host"))
+}
+
+pub fn host_port_pair(s: &str) -> Result<(&str, u16)> {
+    let semi = s.rfind(':').expect("missing semicolon");
+    Ok((&s[..semi], s[semi + 1..].parse()?))
+}
+
+/// Create a UDP socket and connect to `addr`
+pub async fn udp_connect<A: ToSocketAddrs>(addr: A) -> Result<UdpSocket> {
+    let addr = to_socket_addr(addr).await?;
 
     let bind_addr = match addr {
         SocketAddr::V4(_) => "0.0.0.0:0",
@@ -55,6 +66,52 @@ pub async fn udp_connect<A: ToSocketAddrs>(addr: A) -> Result<UdpSocket> {
     Ok(s)
 }
 
+/// Create a TcpStream using a proxy
+/// e.g. socks5://user:pass@127.0.0.1:1080 http://127.0.0.1:8080
+pub async fn tcp_connect_with_proxy(addr: &str, proxy: Option<&Url>) -> Result<TcpStream> {
+    if let Some(url) = proxy {
+        let mut s = TcpStream::connect((
+            url.host_str().expect("proxy url should have host field"),
+            url.port().expect("proxy url should have port field"),
+        ))
+        .await?;
+
+        let auth = if !url.username().is_empty() || url.password().is_some() {
+            Some(async_socks5::Auth {
+                username: url.username().into(),
+                password: url.password().unwrap_or("").into(),
+            })
+        } else {
+            None
+        };
+        match url.scheme() {
+            "socks5" => {
+                async_socks5::connect(&mut s, host_port_pair(addr)?, auth).await?;
+            }
+            "http" => {
+                let (host, port) = host_port_pair(addr)?;
+                match auth {
+                    Some(auth) => {
+                        http_connect_tokio_with_basic_auth(
+                            &mut s,
+                            host,
+                            port,
+                            &auth.username,
+                            &auth.password,
+                        )
+                        .await?
+                    }
+                    None => http_connect_tokio(&mut s, host, port).await?,
+                }
+            }
+            _ => panic!("unknown proxy scheme"),
+        }
+        Ok(s)
+    } else {
+        Ok(TcpStream::connect(addr).await?)
+    }
+}
+
 // Wrapper of retry_notify
 pub async fn retry_notify_with_deadline<I, E, Fn, Fut, B, N>(
     backoff: B,

+ 3 - 2
src/transport/mod.rs

@@ -1,4 +1,4 @@
-use crate::config::{ClientServiceConfig, ServerServiceConfig, TransportConfig};
+use crate::config::{ClientServiceConfig, ServerServiceConfig, TcpConfig, TransportConfig};
 use crate::helper::try_set_tcp_keepalive;
 use anyhow::{Context, Result};
 use async_trait::async_trait;
@@ -27,6 +27,7 @@ pub trait Transport: Debug + Send + Sync {
     /// Provide the transport with socket options, which can be handled at the need of the transport
     fn hint(conn: &Self::Stream, opts: SocketOpts);
     async fn bind<T: ToSocketAddrs + Send + Sync>(&self, addr: T) -> Result<Self::Acceptor>;
+    /// accept must be cancel safe
     async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)>;
     async fn handshake(&self, conn: Self::RawStream) -> Result<Self::Stream>;
     async fn connect(&self, addr: &str) -> Result<Self::Stream>;
@@ -78,7 +79,7 @@ impl SocketOpts {
 }
 
 impl SocketOpts {
-    pub fn from_transport_cfg(cfg: &TransportConfig) -> SocketOpts {
+    pub fn from_cfg(cfg: &TcpConfig) -> SocketOpts {
         SocketOpts {
             nodelay: Some(cfg.nodelay),
             keepalive: Some(Keepalive {

+ 8 - 3
src/transport/tcp.rs

@@ -1,4 +1,7 @@
-use crate::config::TransportConfig;
+use crate::{
+    config::{TcpConfig, TransportConfig},
+    helper::tcp_connect_with_proxy,
+};
 
 use super::{SocketOpts, Transport};
 use anyhow::Result;
@@ -9,6 +12,7 @@ use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
 #[derive(Debug)]
 pub struct TcpTransport {
     socket_opts: SocketOpts,
+    cfg: TcpConfig,
 }
 
 #[async_trait]
@@ -19,7 +23,8 @@ impl Transport for TcpTransport {
 
     fn new(config: &TransportConfig) -> Result<Self> {
         Ok(TcpTransport {
-            socket_opts: SocketOpts::from_transport_cfg(config),
+            socket_opts: SocketOpts::from_cfg(&config.tcp),
+            cfg: config.tcp.clone(),
         })
     }
 
@@ -42,7 +47,7 @@ impl Transport for TcpTransport {
     }
 
     async fn connect(&self, addr: &str) -> Result<Self::Stream> {
-        let s = TcpStream::connect(addr).await?;
+        let s = tcp_connect_with_proxy(addr, self.cfg.proxy.as_ref()).await?;
         self.socket_opts.apply(&s);
         Ok(s)
     }

+ 3 - 2
src/transport/tls.rs

@@ -2,6 +2,7 @@ use std::net::SocketAddr;
 
 use super::{SocketOpts, TcpTransport, Transport};
 use crate::config::{TlsConfig, TransportConfig};
+use crate::helper::host_port_pair;
 use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use std::fs;
@@ -94,8 +95,8 @@ impl Transport for TlsTransport {
             .connect(
                 self.config
                     .hostname
-                    .as_ref()
-                    .unwrap_or(&String::from(addr.split(':').next().unwrap())),
+                    .as_deref()
+                    .unwrap_or(host_port_pair(addr)?.0),
                 conn,
             )
             .await?)