client.rs 18 KB


  1. use crate::config::{ClientConfig, ClientServiceConfig, Config, ServiceType, TransportType};
  2. use crate::config_watcher::ServiceChange;
  3. use crate::helper::udp_connect;
  4. use crate::protocol::Hello::{self, *};
  5. use crate::protocol::{
  6. self, read_ack, read_control_cmd, read_data_cmd, read_hello, Ack, Auth, ControlChannelCmd,
  7. DataChannelCmd, UdpTraffic, CURRENT_PROTO_VERSION, HASH_WIDTH_IN_BYTES,
  8. };
  9. use crate::transport::{SocketOpts, TcpTransport, Transport};
  10. use anyhow::{anyhow, bail, Context, Result};
  11. use backoff::ExponentialBackoff;
  12. use backoff::{backoff::Backoff, future::retry_notify};
  13. use bytes::{Bytes, BytesMut};
  14. use std::collections::HashMap;
  15. use std::net::SocketAddr;
  16. use std::sync::Arc;
  17. use tokio::io::{self, copy_bidirectional, AsyncReadExt, AsyncWriteExt};
  18. use tokio::net::{TcpStream, UdpSocket};
  19. use tokio::sync::{broadcast, mpsc, oneshot, RwLock};
  20. use tokio::time::{self, Duration, Instant};
  21. use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
  22. #[cfg(feature = "noise")]
  23. use crate::transport::NoiseTransport;
  24. #[cfg(feature = "tls")]
  25. use crate::transport::TlsTransport;
  26. use crate::constants::{run_control_chan_backoff, UDP_BUFFER_SIZE, UDP_SENDQ_SIZE, UDP_TIMEOUT};
  27. // The entrypoint of running a client
  28. pub async fn run_client(
  29. config: Config,
  30. shutdown_rx: broadcast::Receiver<bool>,
  31. service_rx: mpsc::Receiver<ServiceChange>,
  32. ) -> Result<()> {
  33. let config = config.client.ok_or_else(|| {
  34. anyhow!(
  35. "Try to run as a client, but the configuration is missing. Please add the `[client]` block"
  36. )
  37. })?;
  38. match config.transport.transport_type {
  39. TransportType::Tcp => {
  40. let mut client = Client::<TcpTransport>::from(config).await?;
  41. client.run(shutdown_rx, service_rx).await
  42. }
  43. TransportType::Tls => {
  44. #[cfg(feature = "tls")]
  45. {
  46. let mut client = Client::<TlsTransport>::from(config).await?;
  47. client.run(shutdown_rx, service_rx).await
  48. }
  49. #[cfg(not(feature = "tls"))]
  50. crate::helper::feature_not_compile("tls")
  51. }
  52. TransportType::Noise => {
  53. #[cfg(feature = "noise")]
  54. {
  55. let mut client = Client::<NoiseTransport>::from(config).await?;
  56. client.run(shutdown_rx, service_rx).await
  57. }
  58. #[cfg(not(feature = "noise"))]
  59. crate::helper::feature_not_compile("noise")
  60. }
  61. }
  62. }
  63. type ServiceDigest = protocol::Digest;
  64. type Nonce = protocol::Digest;
  65. // Holds the state of a client
  66. struct Client<T: Transport> {
  67. config: ClientConfig,
  68. service_handles: HashMap<String, ControlChannelHandle>,
  69. transport: Arc<T>,
  70. }
  71. impl<T: 'static + Transport> Client<T> {
  72. // Create a Client from `[client]` config block
  73. async fn from(config: ClientConfig) -> Result<Client<T>> {
  74. let transport =
  75. Arc::new(T::new(&config.transport).with_context(|| "Failed to create the transport")?);
  76. Ok(Client {
  77. config,
  78. service_handles: HashMap::new(),
  79. transport,
  80. })
  81. }
  82. // The entrypoint of Client
  83. async fn run(
  84. &mut self,
  85. mut shutdown_rx: broadcast::Receiver<bool>,
  86. mut service_rx: mpsc::Receiver<ServiceChange>,
  87. ) -> Result<()> {
  88. for (name, config) in &self.config.services {
  89. // Create a control channel for each service defined
  90. let handle = ControlChannelHandle::new(
  91. (*config).clone(),
  92. self.config.remote_addr.clone(),
  93. self.transport.clone(),
  94. self.config.heartbeat_timeout,
  95. );
  96. self.service_handles.insert(name.clone(), handle);
  97. }
  98. // Wait for the shutdown signal
  99. loop {
  100. tokio::select! {
  101. val = shutdown_rx.recv() => {
  102. match val {
  103. Ok(_) => {}
  104. Err(err) => {
  105. error!("Unable to listen for shutdown signal: {}", err);
  106. }
  107. }
  108. break;
  109. },
  110. e = service_rx.recv() => {
  111. if let Some(e) = e {
  112. match e {
  113. ServiceChange::ClientAdd(s)=> {
  114. let name = s.name.clone();
  115. let handle = ControlChannelHandle::new(
  116. s,
  117. self.config.remote_addr.clone(),
  118. self.transport.clone(),
  119. self.config.heartbeat_timeout
  120. );
  121. let _ = self.service_handles.insert(name, handle);
  122. },
  123. ServiceChange::ClientDelete(s)=> {
  124. let _ = self.service_handles.remove(&s);
  125. },
  126. _ => ()
  127. }
  128. }
  129. }
  130. }
  131. }
  132. // Shutdown all services
  133. for (_, handle) in self.service_handles.drain() {
  134. handle.shutdown();
  135. }
  136. Ok(())
  137. }
  138. }
  139. struct RunDataChannelArgs<T: Transport> {
  140. session_key: Nonce,
  141. remote_addr: String,
  142. connector: Arc<T>,
  143. socket_opts: SocketOpts,
  144. service: ClientServiceConfig,
  145. }
  146. async fn do_data_channel_handshake<T: Transport>(
  147. args: Arc<RunDataChannelArgs<T>>,
  148. ) -> Result<T::Stream> {
  149. // Retry at least every 100ms, at most for 10 seconds
  150. let backoff = ExponentialBackoff {
  151. max_interval: Duration::from_millis(100),
  152. max_elapsed_time: Some(Duration::from_secs(10)),
  153. ..Default::default()
  154. };
  155. // Connect to remote_addr
  156. let mut conn: T::Stream = retry_notify(
  157. backoff,
  158. || async {
  159. args.connector
  160. .connect(&args.remote_addr)
  161. .await
  162. .with_context(|| format!("Failed to connect to {}", &args.remote_addr))
  163. .map_err(backoff::Error::transient)
  164. },
  165. |e, duration| {
  166. warn!("{:#}. Retry in {:?}", e, duration);
  167. },
  168. )
  169. .await?;
  170. T::hint(&conn, args.socket_opts);
  171. // Send nonce
  172. let v: &[u8; HASH_WIDTH_IN_BYTES] = args.session_key[..].try_into().unwrap();
  173. let hello = Hello::DataChannelHello(CURRENT_PROTO_VERSION, v.to_owned());
  174. conn.write_all(&bincode::serialize(&hello).unwrap()).await?;
  175. conn.flush().await?;
  176. Ok(conn)
  177. }
  178. async fn run_data_channel<T: Transport>(args: Arc<RunDataChannelArgs<T>>) -> Result<()> {
  179. // Do the handshake
  180. let mut conn = do_data_channel_handshake(args.clone()).await?;
  181. // Forward
  182. match read_data_cmd(&mut conn).await? {
  183. DataChannelCmd::StartForwardTcp => {
  184. if args.service.service_type != ServiceType::Tcp {
  185. bail!("Expect TCP traffic. Please check the configuration.")
  186. }
  187. run_data_channel_for_tcp::<T>(conn, &args.service.local_addr).await?;
  188. }
  189. DataChannelCmd::StartForwardUdp => {
  190. if args.service.service_type != ServiceType::Udp {
  191. bail!("Expect UDP traffic. Please check the configuration.")
  192. }
  193. run_data_channel_for_udp::<T>(conn, &args.service.local_addr).await?;
  194. }
  195. }
  196. Ok(())
  197. }
  198. // Simply copying back and forth for TCP
  199. #[instrument(skip(conn))]
  200. async fn run_data_channel_for_tcp<T: Transport>(
  201. mut conn: T::Stream,
  202. local_addr: &str,
  203. ) -> Result<()> {
  204. debug!("New data channel starts forwarding");
  205. let mut local = TcpStream::connect(local_addr)
  206. .await
  207. .with_context(|| format!("Failed to connect to {}", local_addr))?;
  208. let _ = copy_bidirectional(&mut conn, &mut local).await;
  209. Ok(())
  210. }
  211. // Things get a little tricker when it gets to UDP because it's connection-less.
  212. // A UdpPortMap must be maintained for recent seen incoming address, giving them
  213. // each a local port, which is associated with a socket. So just the sender
  214. // to the socket will work fine for the map's value.
  215. type UdpPortMap = Arc<RwLock<HashMap<SocketAddr, mpsc::Sender<Bytes>>>>;
  216. #[instrument(skip(conn))]
  217. async fn run_data_channel_for_udp<T: Transport>(conn: T::Stream, local_addr: &str) -> Result<()> {
  218. debug!("New data channel starts forwarding");
  219. let port_map: UdpPortMap = Arc::new(RwLock::new(HashMap::new()));
  220. // The channel stores UdpTraffic that needs to be sent to the server
  221. let (outbound_tx, mut outbound_rx) = mpsc::channel::<UdpTraffic>(UDP_SENDQ_SIZE);
  222. // FIXME: https://github.com/tokio-rs/tls/issues/40
  223. // Maybe this is our concern
  224. let (mut rd, mut wr) = io::split(conn);
  225. // Keep sending items from the outbound channel to the server
  226. tokio::spawn(async move {
  227. while let Some(t) = outbound_rx.recv().await {
  228. trace!("outbound {:?}", t);
  229. if let Err(e) = t
  230. .write(&mut wr)
  231. .await
  232. .with_context(|| "Failed to forward UDP traffic to the server")
  233. {
  234. debug!("{:?}", e);
  235. break;
  236. }
  237. }
  238. });
  239. loop {
  240. // Read a packet from the server
  241. let hdr_len = rd.read_u8().await?;
  242. let packet = UdpTraffic::read(&mut rd, hdr_len)
  243. .await
  244. .with_context(|| "Failed to read UDPTraffic from the server")?;
  245. let m = port_map.read().await;
  246. if m.get(&packet.from).is_none() {
  247. // This packet is from a address we don't see for a while,
  248. // which is not in the UdpPortMap.
  249. // So set up a mapping (and a forwarder) for it
  250. // Drop the reader lock
  251. drop(m);
  252. // Grab the writer lock
  253. // This is the only thread that will try to grab the writer lock
  254. // So no need to worry about some other thread has already set up
  255. // the mapping between the gap of dropping the reader lock and
  256. // grabbing the writer lock
  257. let mut m = port_map.write().await;
  258. match udp_connect(local_addr).await {
  259. Ok(s) => {
  260. let (inbound_tx, inbound_rx) = mpsc::channel(UDP_SENDQ_SIZE);
  261. m.insert(packet.from, inbound_tx);
  262. tokio::spawn(run_udp_forwarder(
  263. s,
  264. inbound_rx,
  265. outbound_tx.clone(),
  266. packet.from,
  267. port_map.clone(),
  268. ));
  269. }
  270. Err(e) => {
  271. error!("{:#}", e);
  272. }
  273. }
  274. }
  275. // Now there should be a udp forwarder that can receive the packet
  276. let m = port_map.read().await;
  277. if let Some(tx) = m.get(&packet.from) {
  278. let _ = tx.send(packet.data).await;
  279. }
  280. }
  281. }
  282. // Run a UdpSocket for the visitor `from`
  283. #[instrument(skip_all, fields(from))]
  284. async fn run_udp_forwarder(
  285. s: UdpSocket,
  286. mut inbound_rx: mpsc::Receiver<Bytes>,
  287. outbount_tx: mpsc::Sender<UdpTraffic>,
  288. from: SocketAddr,
  289. port_map: UdpPortMap,
  290. ) -> Result<()> {
  291. debug!("Forwarder created");
  292. let mut buf = BytesMut::new();
  293. buf.resize(UDP_BUFFER_SIZE, 0);
  294. loop {
  295. tokio::select! {
  296. // Receive from the server
  297. data = inbound_rx.recv() => {
  298. if let Some(data) = data {
  299. s.send(&data).await?;
  300. } else {
  301. break;
  302. }
  303. },
  304. // Receive from the service
  305. val = s.recv(&mut buf) => {
  306. let len = match val {
  307. Ok(v) => v,
  308. Err(_) => break
  309. };
  310. let t = UdpTraffic{
  311. from,
  312. data: Bytes::copy_from_slice(&buf[..len])
  313. };
  314. outbount_tx.send(t).await?;
  315. },
  316. // No traffic for the duration of UDP_TIMEOUT, clean up the state
  317. _ = time::sleep(Duration::from_secs(UDP_TIMEOUT)) => {
  318. break;
  319. }
  320. }
  321. }
  322. let mut port_map = port_map.write().await;
  323. port_map.remove(&from);
  324. debug!("Forwarder dropped");
  325. Ok(())
  326. }
  327. // Control channel, using T as the transport layer
  328. struct ControlChannel<T: Transport> {
  329. digest: ServiceDigest, // SHA256 of the service name
  330. service: ClientServiceConfig, // `[client.services.foo]` config block
  331. shutdown_rx: oneshot::Receiver<u8>, // Receives the shutdown signal
  332. remote_addr: String, // `client.remote_addr`
  333. transport: Arc<T>, // Wrapper around the transport layer
  334. heartbeat_timeout: u64, // Application layer heartbeat timeout in secs
  335. }
  336. // Handle of a control channel
  337. // Dropping it will also drop the actual control channel
  338. struct ControlChannelHandle {
  339. shutdown_tx: oneshot::Sender<u8>,
  340. }
  341. impl<T: 'static + Transport> ControlChannel<T> {
  342. #[instrument(skip_all)]
  343. async fn run(&mut self) -> Result<()> {
  344. let mut conn = self
  345. .transport
  346. .connect(&self.remote_addr)
  347. .await
  348. .with_context(|| format!("Failed to connect to {}", &self.remote_addr))?;
  349. T::hint(&conn, SocketOpts::for_control_channel());
  350. // Send hello
  351. debug!("Sending hello");
  352. let hello_send =
  353. Hello::ControlChannelHello(CURRENT_PROTO_VERSION, self.digest[..].try_into().unwrap());
  354. conn.write_all(&bincode::serialize(&hello_send).unwrap())
  355. .await?;
  356. conn.flush().await?;
  357. // Read hello
  358. debug!("Reading hello");
  359. let nonce = match read_hello(&mut conn).await? {
  360. ControlChannelHello(_, d) => d,
  361. _ => {
  362. bail!("Unexpected type of hello");
  363. }
  364. };
  365. // Send auth
  366. debug!("Sending auth");
  367. let mut concat = Vec::from(self.service.token.as_ref().unwrap().as_bytes());
  368. concat.extend_from_slice(&nonce);
  369. let session_key = protocol::digest(&concat);
  370. let auth = Auth(session_key);
  371. conn.write_all(&bincode::serialize(&auth).unwrap()).await?;
  372. conn.flush().await?;
  373. // Read ack
  374. debug!("Reading ack");
  375. match read_ack(&mut conn).await? {
  376. Ack::Ok => {}
  377. v => {
  378. return Err(anyhow!("{}", v))
  379. .with_context(|| format!("Authentication failed: {}", self.service.name));
  380. }
  381. }
  382. // Channel ready
  383. info!("Control channel established");
  384. let remote_addr = self.remote_addr.clone();
  385. // Socket options for the data channel
  386. let socket_opts = SocketOpts::from_client_cfg(&self.service);
  387. let data_ch_args = Arc::new(RunDataChannelArgs {
  388. session_key,
  389. remote_addr,
  390. connector: self.transport.clone(),
  391. socket_opts,
  392. service: self.service.clone(),
  393. });
  394. loop {
  395. tokio::select! {
  396. val = read_control_cmd(&mut conn) => {
  397. let val = val?;
  398. debug!( "Received {:?}", val);
  399. match val {
  400. ControlChannelCmd::CreateDataChannel => {
  401. let args = data_ch_args.clone();
  402. tokio::spawn(async move {
  403. if let Err(e) = run_data_channel(args).await.with_context(|| "Failed to run the data channel") {
  404. warn!("{:#}", e);
  405. }
  406. }.instrument(Span::current()));
  407. },
  408. ControlChannelCmd::HeartBeat => ()
  409. }
  410. },
  411. _ = time::sleep(Duration::from_secs(self.heartbeat_timeout)), if self.heartbeat_timeout != 0 => {
  412. return Err(anyhow!("Heartbeat timed out"))
  413. }
  414. _ = &mut self.shutdown_rx => {
  415. break;
  416. }
  417. }
  418. }
  419. info!("Control channel shutdown");
  420. Ok(())
  421. }
  422. }
  423. impl ControlChannelHandle {
  424. #[instrument(name="handle", skip_all, fields(service = %service.name))]
  425. fn new<T: 'static + Transport>(
  426. service: ClientServiceConfig,
  427. remote_addr: String,
  428. transport: Arc<T>,
  429. heartbeat_timeout: u64,
  430. ) -> ControlChannelHandle {
  431. let digest = protocol::digest(service.name.as_bytes());
  432. info!("Starting {}", hex::encode(digest));
  433. let (shutdown_tx, shutdown_rx) = oneshot::channel();
  434. let mut s = ControlChannel {
  435. digest,
  436. service,
  437. shutdown_rx,
  438. remote_addr,
  439. transport,
  440. heartbeat_timeout,
  441. };
  442. tokio::spawn(
  443. async move {
  444. let mut backoff = run_control_chan_backoff();
  445. let mut start = Instant::now();
  446. while let Err(err) = s
  447. .run()
  448. .await
  449. .with_context(|| "Failed to run the control channel")
  450. {
  451. if s.shutdown_rx.try_recv() != Err(oneshot::error::TryRecvError::Empty) {
  452. break;
  453. }
  454. if start.elapsed() > Duration::from_secs(3) {
  455. // The client runs for at least 3 secs and then disconnects
  456. // Retry immediately
  457. backoff.reset();
  458. error!("{:#}. Retry...", err);
  459. } else if let Some(duration) = backoff.next_backoff() {
  460. error!("{:#}. Retry in {:?}...", err, duration);
  461. time::sleep(duration).await;
  462. } else {
  463. // Should never reach
  464. panic!("{:#}. Break", err);
  465. }
  466. start = Instant::now();
  467. }
  468. }
  469. .instrument(Span::current()),
  470. );
  471. ControlChannelHandle { shutdown_tx }
  472. }
  473. fn shutdown(self) {
  474. // A send failure shows that the actor has already shutdown.
  475. let _ = self.shutdown_tx.send(0u8);
  476. }
  477. }