|
@@ -1,7 +1,7 @@
|
|
|
use crate::config::{Config, ServerConfig, ServerServiceConfig, ServiceType, TransportType};
|
|
use crate::config::{Config, ServerConfig, ServerServiceConfig, ServiceType, TransportType};
|
|
|
use crate::config_watcher::{ConfigChange, ServerServiceChange};
|
|
use crate::config_watcher::{ConfigChange, ServerServiceChange};
|
|
|
use crate::constants::{listen_backoff, UDP_BUFFER_SIZE};
|
|
use crate::constants::{listen_backoff, UDP_BUFFER_SIZE};
|
|
|
-use crate::helper::{retry_notify_with_deadline, write_and_flush};
|
|
|
|
|
|
|
+use crate::helper::{generate_proxy_protocol_v1_header, retry_notify_with_deadline, write_and_flush};
|
|
|
use crate::multi_map::MultiMap;
|
|
use crate::multi_map::MultiMap;
|
|
|
use crate::protocol::Hello::{ControlChannelHello, DataChannelHello};
|
|
use crate::protocol::Hello::{ControlChannelHello, DataChannelHello};
|
|
|
use crate::protocol::{
|
|
use crate::protocol::{
|
|
@@ -427,11 +427,16 @@ where
|
|
|
|
|
|
|
|
let shutdown_rx_clone = shutdown_tx.subscribe();
|
|
let shutdown_rx_clone = shutdown_tx.subscribe();
|
|
|
let bind_addr = service.bind_addr.clone();
|
|
let bind_addr = service.bind_addr.clone();
|
|
|
|
|
+ let enable_proxy_protocol = service.enable_proxy_protocol.unwrap_or_default();
|
|
|
|
|
+ if enable_proxy_protocol {
|
|
|
|
|
+ debug!("Proxy protocol is enabled");
|
|
|
|
|
+ }
|
|
|
match service.service_type {
|
|
match service.service_type {
|
|
|
ServiceType::Tcp => tokio::spawn(
|
|
ServiceType::Tcp => tokio::spawn(
|
|
|
async move {
|
|
async move {
|
|
|
if let Err(e) = run_tcp_connection_pool::<T>(
|
|
if let Err(e) = run_tcp_connection_pool::<T>(
|
|
|
bind_addr,
|
|
bind_addr,
|
|
|
|
|
+ enable_proxy_protocol,
|
|
|
data_ch_rx,
|
|
data_ch_rx,
|
|
|
data_ch_req_tx,
|
|
data_ch_req_tx,
|
|
|
shutdown_rx_clone,
|
|
shutdown_rx_clone,
|
|
@@ -625,6 +630,7 @@ fn tcp_listen_and_send(
|
|
|
#[instrument(skip_all)]
|
|
#[instrument(skip_all)]
|
|
|
async fn run_tcp_connection_pool<T: Transport>(
|
|
async fn run_tcp_connection_pool<T: Transport>(
|
|
|
bind_addr: String,
|
|
bind_addr: String,
|
|
|
|
|
+ enable_proxy_protocol: bool,
|
|
|
mut data_ch_rx: mpsc::Receiver<T::Stream>,
|
|
mut data_ch_rx: mpsc::Receiver<T::Stream>,
|
|
|
data_ch_req_tx: mpsc::UnboundedSender<bool>,
|
|
data_ch_req_tx: mpsc::UnboundedSender<bool>,
|
|
|
shutdown_rx: broadcast::Receiver<bool>,
|
|
shutdown_rx: broadcast::Receiver<bool>,
|
|
@@ -637,6 +643,11 @@ async fn run_tcp_connection_pool<T: Transport>(
|
|
|
if let Some(mut ch) = data_ch_rx.recv().await {
|
|
if let Some(mut ch) = data_ch_rx.recv().await {
|
|
|
if write_and_flush(&mut ch, &cmd).await.is_ok() {
|
|
if write_and_flush(&mut ch, &cmd).await.is_ok() {
|
|
|
tokio::spawn(async move {
|
|
tokio::spawn(async move {
|
|
|
|
|
+ if enable_proxy_protocol {
|
|
|
|
|
+ let proxy_proto_header = generate_proxy_protocol_v1_header(&visitor).unwrap();
|
|
|
|
|
+ let _ = ch.write_all(&proxy_proto_header.into_bytes()).await;
|
|
|
|
|
+ let _ = ch.flush().await;
|
|
|
|
|
+ }
|
|
|
let _ = copy_bidirectional(&mut ch, &mut visitor).await;
|
|
let _ = copy_bidirectional(&mut ch, &mut visitor).await;
|
|
|
});
|
|
});
|
|
|
break;
|
|
break;
|