| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- use anyhow::{anyhow, Result};
- use async_http_proxy::{http_connect_tokio, http_connect_tokio_with_basic_auth};
- use backoff::{backoff::Backoff, Notify};
- use socket2::{SockRef, TcpKeepalive};
- use std::{future::Future, net::SocketAddr, time::Duration};
- use tokio::{
- net::{lookup_host, TcpStream, ToSocketAddrs, UdpSocket},
- sync::broadcast,
- };
- use tracing::trace;
- use url::Url;
- // Tokio hesitates to expose this option...So we have to do it on our own :(
- // The good news is that using socket2 it can be easily done, without losing portability.
- // See https://github.com/tokio-rs/tokio/issues/3082
- pub fn try_set_tcp_keepalive(
- conn: &TcpStream,
- keepalive_duration: Duration,
- keepalive_interval: Duration,
- ) -> Result<()> {
- let s = SockRef::from(conn);
- let keepalive = TcpKeepalive::new()
- .with_time(keepalive_duration)
- .with_interval(keepalive_interval);
- trace!(
- "Set TCP keepalive {:?} {:?}",
- keepalive_duration,
- keepalive_interval
- );
- Ok(s.set_tcp_keepalive(&keepalive)?)
- }
/// Abort with a message saying that `feature` was not enabled at build time.
///
/// # Panics
///
/// Always panics; the function never returns.
// Allowed to be unused: whether anything calls this depends on the build.
#[allow(dead_code)]
pub fn feature_not_compile(feature: &str) -> ! {
    let message = format!(
        "The feature '{}' is not compiled in this binary. Please re-compile rathole",
        feature
    );
    panic!("{}", message)
}
- async fn to_socket_addr<A: ToSocketAddrs>(addr: A) -> Result<SocketAddr> {
- lookup_host(addr)
- .await?
- .next()
- .ok_or(anyhow!("Failed to lookup the host"))
- }
- pub fn host_port_pair(s: &str) -> Result<(&str, u16)> {
- let semi = s.rfind(':').expect("missing semicolon");
- Ok((&s[..semi], s[semi + 1..].parse()?))
- }
- /// Create a UDP socket and connect to `addr`
- pub async fn udp_connect<A: ToSocketAddrs>(addr: A) -> Result<UdpSocket> {
- let addr = to_socket_addr(addr).await?;
- let bind_addr = match addr {
- SocketAddr::V4(_) => "0.0.0.0:0",
- SocketAddr::V6(_) => ":::0",
- };
- let s = UdpSocket::bind(bind_addr).await?;
- s.connect(addr).await?;
- Ok(s)
- }
- /// Create a TcpStream using a proxy
- /// e.g. socks5://user:pass@127.0.0.1:1080 http://127.0.0.1:8080
- pub async fn tcp_connect_with_proxy(addr: &str, proxy: Option<&Url>) -> Result<TcpStream> {
- if let Some(url) = proxy {
- let mut s = TcpStream::connect((
- url.host_str().expect("proxy url should have host field"),
- url.port().expect("proxy url should have port field"),
- ))
- .await?;
- let auth = if !url.username().is_empty() || url.password().is_some() {
- Some(async_socks5::Auth {
- username: url.username().into(),
- password: url.password().unwrap_or("").into(),
- })
- } else {
- None
- };
- match url.scheme() {
- "socks5" => {
- async_socks5::connect(&mut s, host_port_pair(addr)?, auth).await?;
- }
- "http" => {
- let (host, port) = host_port_pair(addr)?;
- match auth {
- Some(auth) => {
- http_connect_tokio_with_basic_auth(
- &mut s,
- host,
- port,
- &auth.username,
- &auth.password,
- )
- .await?
- }
- None => http_connect_tokio(&mut s, host, port).await?,
- }
- }
- _ => panic!("unknown proxy scheme"),
- }
- Ok(s)
- } else {
- Ok(TcpStream::connect(addr).await?)
- }
- }
- // Wrapper of retry_notify
- pub async fn retry_notify_with_deadline<I, E, Fn, Fut, B, N>(
- backoff: B,
- operation: Fn,
- notify: N,
- deadline: &mut broadcast::Receiver<bool>,
- ) -> Result<I>
- where
- E: std::error::Error + Send + Sync + 'static,
- B: Backoff,
- Fn: FnMut() -> Fut,
- Fut: Future<Output = std::result::Result<I, backoff::Error<E>>>,
- N: Notify<E>,
- {
- tokio::select! {
- v = backoff::future::retry_notify(backoff, operation, notify) => {
- v.map_err(anyhow::Error::new)
- }
- _ = deadline.recv() => {
- Err(anyhow!("shutdown"))
- }
- }
- }
|