|
|
@@ -138,6 +138,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
|
|
|
// Wait for connections and shutdown signals
|
|
|
loop {
|
|
|
tokio::select! {
|
|
|
+ // FIXME: This should be cancel safe.
|
|
|
// Wait for incoming control and data channels
|
|
|
ret = self.transport.accept(&l) => {
|
|
|
match ret {
|
|
|
@@ -481,8 +482,11 @@ fn tcp_listen_and_send(
|
|
|
let (tx, rx) = mpsc::channel(CHAN_SIZE);
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
- let l = backoff::future::retry(listen_backoff(), || async {
|
|
|
+ // FIXME: Respect shutdown signal
|
|
|
+ let l = backoff::future::retry_notify(listen_backoff(), || async {
|
|
|
Ok(TcpListener::bind(&addr).await?)
|
|
|
+ }, |e, _| {
|
|
|
+ error!("{:?}", e);
|
|
|
})
|
|
|
.await
|
|
|
.with_context(|| "Failed to listen for the service");
|
|
|
@@ -581,18 +585,18 @@ async fn run_udp_connection_pool<T: Transport>(
|
|
|
) -> Result<()> {
|
|
|
// TODO: Load balance
|
|
|
|
|
|
- let l: UdpSocket = backoff::future::retry(listen_backoff(), || async {
|
|
|
- Ok(match UdpSocket::bind(&bind_addr)
|
|
|
- .await
|
|
|
- .with_context(|| "Failed to listen for the service")
|
|
|
- {
|
|
|
- Err(e) => {
|
|
|
- error!("{:?}", e);
|
|
|
- Err(e)
|
|
|
- }
|
|
|
- v => v,
|
|
|
- }?)
|
|
|
- })
|
|
|
+ // FIXME: Respect shutdown signal
|
|
|
+ let l: UdpSocket = backoff::future::retry_notify(
|
|
|
+ listen_backoff(),
|
|
|
+ || async {
|
|
|
+ Ok(UdpSocket::bind(&bind_addr)
|
|
|
+ .await
|
|
|
+ .with_context(|| "Failed to listen for the service")?)
|
|
|
+ },
|
|
|
+ |e, _| {
|
|
|
+ error!("{:?}", e);
|
|
|
+ },
|
|
|
+ )
|
|
|
.await
|
|
|
.with_context(|| "Failed to listen for the service")?;
|
|
|
|