|
@@ -162,7 +162,6 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
|
|
|
}
|
|
}
|
|
|
Ok((conn, addr)) => {
|
|
Ok((conn, addr)) => {
|
|
|
backoff.reset();
|
|
backoff.reset();
|
|
|
- debug!("Incoming connection from {}", addr);
|
|
|
|
|
|
|
|
|
|
let services = self.services.clone();
|
|
let services = self.services.clone();
|
|
|
let control_channels = self.control_channels.clone();
|
|
let control_channels = self.control_channels.clone();
|
|
@@ -239,7 +238,7 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
|
|
|
control_channels: Arc<RwLock<ControlChannelMap<T>>>,
|
|
control_channels: Arc<RwLock<ControlChannelMap<T>>>,
|
|
|
service_digest: ServiceDigest,
|
|
service_digest: ServiceDigest,
|
|
|
) -> Result<()> {
|
|
) -> Result<()> {
|
|
|
- info!("New control channel incoming");
|
|
|
|
|
|
|
+ info!("Try to handshake a control channel");
|
|
|
|
|
|
|
|
// Generate a nonce
|
|
// Generate a nonce
|
|
|
let mut nonce = vec![0u8; HASH_WIDTH_IN_BYTES];
|
|
let mut nonce = vec![0u8; HASH_WIDTH_IN_BYTES];
|
|
@@ -254,15 +253,16 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
|
|
|
.await?;
|
|
.await?;
|
|
|
|
|
|
|
|
// Lookup the service
|
|
// Lookup the service
|
|
|
- let services_guard = services.read().await;
|
|
|
|
|
- let service_config = match services_guard.get(&service_digest) {
|
|
|
|
|
|
|
+ let service_config = match services.read().await.get(&service_digest) {
|
|
|
Some(v) => v,
|
|
Some(v) => v,
|
|
|
None => {
|
|
None => {
|
|
|
conn.write_all(&bincode::serialize(&Ack::ServiceNotExist).unwrap())
|
|
conn.write_all(&bincode::serialize(&Ack::ServiceNotExist).unwrap())
|
|
|
.await?;
|
|
.await?;
|
|
|
bail!("No such a service {}", hex::encode(&service_digest));
|
|
bail!("No such a service {}", hex::encode(&service_digest));
|
|
|
}
|
|
}
|
|
|
- };
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+ .to_owned();
|
|
|
|
|
+
|
|
|
let service_name = &service_config.name;
|
|
let service_name = &service_config.name;
|
|
|
|
|
|
|
|
// Calculate the checksum
|
|
// Calculate the checksum
|
|
@@ -284,10 +284,6 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
|
|
|
);
|
|
);
|
|
|
bail!("Service {} failed the authentication", service_name);
|
|
bail!("Service {} failed the authentication", service_name);
|
|
|
} else {
|
|
} else {
|
|
|
- let service_config = service_config.clone();
|
|
|
|
|
- // Drop the rwlock as soon as possible when we're done with it
|
|
|
|
|
- drop(services_guard);
|
|
|
|
|
-
|
|
|
|
|
let mut h = control_channels.write().await;
|
|
let mut h = control_channels.write().await;
|
|
|
|
|
|
|
|
// If there's already a control channel for the service, then drop the old one.
|
|
// If there's already a control channel for the service, then drop the old one.
|
|
@@ -296,8 +292,8 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
|
|
|
// the client to reconnect.
|
|
// the client to reconnect.
|
|
|
if h.remove1(&service_digest).is_some() {
|
|
if h.remove1(&service_digest).is_some() {
|
|
|
warn!(
|
|
warn!(
|
|
|
- "Dropping previous control channel for digest {}",
|
|
|
|
|
- hex::encode(service_digest)
|
|
|
|
|
|
|
+ "Dropping previous control channel for service {}",
|
|
|
|
|
+ service_name
|
|
|
);
|
|
);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -320,14 +316,18 @@ async fn do_data_channel_handshake<T: 'static + Transport>(
|
|
|
control_channels: Arc<RwLock<ControlChannelMap<T>>>,
|
|
control_channels: Arc<RwLock<ControlChannelMap<T>>>,
|
|
|
nonce: Nonce,
|
|
nonce: Nonce,
|
|
|
) -> Result<()> {
|
|
) -> Result<()> {
|
|
|
- debug!("New data channel incoming");
|
|
|
|
|
|
|
+ debug!("Try to handshake a data channel");
|
|
|
|
|
|
|
|
// Validate
|
|
// Validate
|
|
|
let control_channels_guard = control_channels.read().await;
|
|
let control_channels_guard = control_channels.read().await;
|
|
|
match control_channels_guard.get2(&nonce) {
|
|
match control_channels_guard.get2(&nonce) {
|
|
|
Some(handle) => {
|
|
Some(handle) => {
|
|
|
// Send the data channel to the corresponding control channel
|
|
// Send the data channel to the corresponding control channel
|
|
|
- handle.data_ch_tx.send(conn).await?;
|
|
|
|
|
|
|
+ handle
|
|
|
|
|
+ .data_ch_tx
|
|
|
|
|
+ .send(conn)
|
|
|
|
|
+ .await
|
|
|
|
|
+ .with_context(|| "Data channel for a stale control channel")?;
|
|
|
}
|
|
}
|
|
|
None => {
|
|
None => {
|
|
|
warn!("Data channel has incorrect nonce");
|
|
warn!("Data channel has incorrect nonce");
|
|
@@ -599,7 +599,7 @@ async fn run_udp_connection_pool<T: Transport>(
|
|
|
.with_context(|| "Failed to listen for the service")?)
|
|
.with_context(|| "Failed to listen for the service")?)
|
|
|
},
|
|
},
|
|
|
|e, _| {
|
|
|e, _| {
|
|
|
- error!("{:?}", e);
|
|
|
|
|
|
|
+ warn!("{:?}", e);
|
|
|
},
|
|
},
|
|
|
)
|
|
)
|
|
|
.await
|
|
.await
|