Procházet zdrojové kódy

feat: expose TCP_NODELAY and TCP_KEEPALIVE (#96)

* feat: expose TCP_NODELAY
* fix: opt-in `nodelay` for each service
* fix: enforce `nodelay` for every control channel
* feat: expose keepalive_secs
* feat: expose keepalive_interval
* docs: update docs
* fix: update dependencies and implement hint for noise
Yujia Qiao před 4 roky
rodič
revize
0d9e5ec1aa
12 změnil soubory, kde provedl 382 přidání a 146 odebrání
  1. 146 83
      Cargo.lock
  2. 2 2
      Cargo.toml
  3. 6 0
      README-zh.md
  4. 8 0
      README.md
  5. 12 6
      src/client.rs
  6. 38 1
      src/config.rs
  7. 17 14
      src/helper.rs
  8. 9 3
      src/server.rs
  9. 97 5
      src/transport/mod.rs
  10. 17 13
      src/transport/noise.rs
  11. 14 7
      src/transport/tcp.rs
  12. 16 12
      src/transport/tls.rs

+ 146 - 83
Cargo.lock

@@ -123,10 +123,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9fe17f59a06fe8b87a6fc8bf53bb70b3aba76d7685f432487a68cd5552853625"
 dependencies = [
  "futures-core",
- "getrandom 0.2.3",
+ "getrandom 0.2.4",
  "instant",
  "pin-project",
- "rand 0.8.4",
+ "rand",
  "tokio",
 ]
 
@@ -180,6 +180,12 @@ dependencies = [
  "generic-array",
 ]
 
+[[package]]
+name = "bumpalo"
+version = "3.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899"
+
 [[package]]
 name = "byteorder"
 version = "1.4.3"
@@ -259,9 +265,9 @@ dependencies = [
 
 [[package]]
 name = "clap"
-version = "3.0.0"
+version = "3.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d17bf219fcd37199b9a29e00ba65dfb8cd5b2688b7297ec14ff829c40ac50ca9"
+checksum = "12e8611f9ae4e068fa3e56931fded356ff745e70987ff76924a6e0ab1c8ef2e3"
 dependencies = [
  "atty",
  "bitflags",
@@ -276,17 +282,29 @@ dependencies = [
 
 [[package]]
 name = "clap_derive"
-version = "3.0.0"
+version = "3.0.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e1b9752c030a14235a0bd5ef3ad60a1dcac8468c30921327fc8af36b20c790b9"
+checksum = "517358c28fcef6607bf6f76108e02afad7e82297d132a6b846dcc1fc3efcd153"
 dependencies = [
- "heck",
+ "heck 0.4.0",
  "proc-macro-error",
  "proc-macro2",
  "quote",
  "syn",
 ]
 
+[[package]]
+name = "coarsetime"
+version = "0.1.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b6ec6f6e80e839eb22bd61b18f19a8f2ae3f8bda9cf0fdce9dd96c9c5df8393"
+dependencies = [
+ "libc",
+ "once_cell",
+ "wasi 0.10.2+wasi-snapshot-preview1",
+ "wasm-bindgen",
+]
+
 [[package]]
 name = "console-api"
 version = "0.1.0"
@@ -378,9 +396,9 @@ dependencies = [
 
 [[package]]
 name = "crossbeam-channel"
-version = "0.5.1"
+version = "0.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
+checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa"
 dependencies = [
  "cfg-if",
  "crossbeam-utils",
@@ -388,9 +406,9 @@ dependencies = [
 
 [[package]]
 name = "crossbeam-utils"
-version = "0.8.5"
+version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
+checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
 dependencies = [
  "cfg-if",
  "lazy_static",
@@ -484,12 +502,12 @@ dependencies = [
 ]
 
 [[package]]
-name = "exponential-backoff"
-version = "1.1.0"
+name = "fastrand"
+version = "1.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "851c5c1b202b7317c442c5f9c1f55f4cb6cb7e3dee875dd422d124c081a8da88"
+checksum = "779d043b6a0b90cc4c0ed7ee380a6504394cee7efd7db050e3774eee387324b2"
 dependencies = [
- "rand 0.7.3",
+ "instant",
 ]
 
 [[package]]
@@ -564,9 +582,9 @@ dependencies = [
 
 [[package]]
 name = "fsevent-sys"
-version = "4.0.0"
+version = "4.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5c0e564d24da983c053beff1bb7178e237501206840a3e6bf4e267b9e8ae734a"
+checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
 dependencies = [
  "libc",
 ]
@@ -647,9 +665,9 @@ dependencies = [
 
 [[package]]
 name = "generic-array"
-version = "0.14.4"
+version = "0.14.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817"
+checksum = "fd48d33ec7f05fbfa152300fdad764757cbded343c1aa1cff2fbaf4134851803"
 dependencies = [
  "typenum",
  "version_check",
@@ -668,9 +686,9 @@ dependencies = [
 
 [[package]]
 name = "getrandom"
-version = "0.2.3"
+version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
+checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c"
 dependencies = [
  "cfg-if",
  "libc",
@@ -714,9 +732,9 @@ dependencies = [
 
 [[package]]
 name = "h2"
-version = "0.3.9"
+version = "0.3.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd"
+checksum = "0c9de88456263e249e241fcd211d3954e2c9b0ef7ccfc235a444eb367cae3689"
 dependencies = [
  "bytes",
  "fnv",
@@ -759,6 +777,12 @@ dependencies = [
  "unicode-segmentation",
 ]
 
+[[package]]
+name = "heck"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
+
 [[package]]
 name = "hermit-abi"
 version = "0.1.19"
@@ -863,9 +887,9 @@ dependencies = [
 
 [[package]]
 name = "indexmap"
-version = "1.7.0"
+version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
+checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
 dependencies = [
  "autocfg",
  "hashbrown",
@@ -1182,9 +1206,9 @@ dependencies = [
 
 [[package]]
 name = "openssl-probe"
-version = "0.1.4"
+version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a"
+checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
 
 [[package]]
 name = "openssl-sys"
@@ -1375,7 +1399,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
 dependencies = [
  "bytes",
- "heck",
+ "heck 0.3.3",
  "itertools",
  "lazy_static",
  "log",
@@ -1420,19 +1444,6 @@ dependencies = [
  "proc-macro2",
 ]
 
-[[package]]
-name = "rand"
-version = "0.7.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
-dependencies = [
- "getrandom 0.1.16",
- "libc",
- "rand_chacha 0.2.2",
- "rand_core 0.5.1",
- "rand_hc 0.2.0",
-]
-
 [[package]]
 name = "rand"
 version = "0.8.4"
@@ -1440,19 +1451,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
 dependencies = [
  "libc",
- "rand_chacha 0.3.1",
+ "rand_chacha",
  "rand_core 0.6.3",
- "rand_hc 0.3.1",
-]
-
-[[package]]
-name = "rand_chacha"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
-dependencies = [
- "ppv-lite86",
- "rand_core 0.5.1",
+ "rand_hc",
 ]
 
 [[package]]
@@ -1480,16 +1481,7 @@ version = "0.6.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
 dependencies = [
- "getrandom 0.2.3",
-]
-
-[[package]]
-name = "rand_hc"
-version = "0.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
-dependencies = [
- "rand_core 0.5.1",
+ "getrandom 0.2.4",
 ]
 
 [[package]]
@@ -1519,9 +1511,9 @@ dependencies = [
  "hex",
  "lazy_static",
  "notify",
- "rand 0.8.4",
+ "rand",
  "serde",
- "sha2 0.10.0",
+ "sha2 0.10.1",
  "snowstorm",
  "socket2",
  "tokio",
@@ -1606,6 +1598,16 @@ dependencies = [
  "winapi-util",
 ]
 
+[[package]]
+name = "scalable_cuckoo_filter"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25a8d1a3d9b887d069652d63bba07dad0e0b0f2f7a7ee2c962df23414b4fd714"
+dependencies = [
+ "rand",
+ "siphasher",
+]
+
 [[package]]
 name = "schannel"
 version = "0.1.19"
@@ -1696,9 +1698,9 @@ dependencies = [
 
 [[package]]
 name = "sha2"
-version = "0.9.8"
+version = "0.9.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b69f9a4c9740d74c5baa3fd2e547f9525fa8088a8a958e0ca2409a514e33f5fa"
+checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
 dependencies = [
  "block-buffer 0.9.0",
  "cfg-if",
@@ -1709,9 +1711,9 @@ dependencies = [
 
 [[package]]
 name = "sha2"
-version = "0.10.0"
+version = "0.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "900d964dd36bb15bcf2f2b35694c072feab74969a54f2bbeec7a2d725d2bdcb6"
+checksum = "99c3bd8169c58782adad9290a9af5939994036b76187f7b4f0e6de91dbbfc0ec"
 dependencies = [
  "cfg-if",
  "cpufeatures",
@@ -1736,6 +1738,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "siphasher"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b"
+
 [[package]]
 name = "slab"
 version = "0.4.5"
@@ -1744,9 +1752,9 @@ checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
 
 [[package]]
 name = "smallvec"
-version = "1.7.0"
+version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
+checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
 
 [[package]]
 name = "snow"
@@ -1757,26 +1765,27 @@ dependencies = [
  "aes-gcm",
  "blake2",
  "chacha20poly1305",
- "rand 0.8.4",
+ "rand",
  "rand_core 0.6.3",
  "rustc_version",
- "sha2 0.9.8",
+ "sha2 0.9.9",
  "subtle",
  "x25519-dalek",
 ]
 
 [[package]]
 name = "snowstorm"
-version = "0.2.0"
+version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2389f02a6cad262ce2b23b0c99e864511dd6adf916a6c67e614d38f4fe60ed89"
+checksum = "253f76cc2e96ff871886f8a869197794a2d1a2d2b89c8711a9c5644614f6f700"
 dependencies = [
  "bytes",
- "exponential-backoff",
+ "coarsetime",
  "futures-util",
  "log",
  "pin-project",
- "rand 0.8.4",
+ "rand",
+ "scalable_cuckoo_filter",
  "snow",
  "thiserror",
  "tokio",
@@ -1806,9 +1815,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
 
 [[package]]
 name = "syn"
-version = "1.0.84"
+version = "1.0.85"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ecb2e6da8ee5eb9a61068762a32fa9619cc591ceb055b3687f4cd4051ec2e06b"
+checksum = "a684ac3dcd8913827e18cd09a68384ee66c1de24157e3c556c9ab16d85695fb7"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1829,13 +1838,13 @@ dependencies = [
 
 [[package]]
 name = "tempfile"
-version = "3.2.0"
+version = "3.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
+checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
 dependencies = [
  "cfg-if",
+ "fastrand",
  "libc",
- "rand 0.8.4",
  "redox_syscall",
  "remove_dir_all",
  "winapi",
@@ -2049,7 +2058,7 @@ dependencies = [
  "indexmap",
  "pin-project",
  "pin-project-lite",
- "rand 0.8.4",
+ "rand",
  "slab",
  "tokio",
  "tokio-stream",
@@ -2297,6 +2306,60 @@ version = "0.10.2+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
 
+[[package]]
+name = "wasm-bindgen"
+version = "0.2.78"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
+dependencies = [
+ "cfg-if",
+ "wasm-bindgen-macro",
+]
+
+[[package]]
+name = "wasm-bindgen-backend"
+version = "0.2.78"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b"
+dependencies = [
+ "bumpalo",
+ "lazy_static",
+ "log",
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-macro"
+version = "0.2.78"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9"
+dependencies = [
+ "quote",
+ "wasm-bindgen-macro-support",
+]
+
+[[package]]
+name = "wasm-bindgen-macro-support"
+version = "0.2.78"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-backend",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-shared"
+version = "0.2.78"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc"
+
 [[package]]
 name = "which"
 version = "4.2.2"

+ 2 - 2
Cargo.toml

@@ -60,11 +60,11 @@ rand = "0.8"
 backoff = { version="0.3", features=["tokio"] }
 tracing = "0.1"
 tracing-subscriber = "0.2"
-socket2 = "0.4"
+socket2 = { version = "0.4", features = ["all"] }
 fdlimit = "0.2"
 tokio-native-tls = { version = "0.3", optional = true }
 async-trait = "0.1"
-snowstorm = { version = "0.2", optional = true }
+snowstorm = { version = "0.3", optional = true, features = ["stream"] }
 base64 = { version = "0.13", optional = true }
 notify = { version = "5.0.0-pre.13", optional = true }
 console-subscriber = { version = "0.1", optional = true, features = ["parking_lot"] }

+ 6 - 0
README-zh.md

@@ -94,6 +94,9 @@ default_token = "default_token_if_not_specify" # Optional. The default token of
 
 [client.transport] # The whole block is optional. Specify which transport to use
 type = "tcp" # Optional. Possible values: ["tcp", "tls", "noise"]. Default: "tcp"
+nodelay = false # Optional. Determine whether to enable TCP_NODELAY for data transmission, if applicable, to improve the latency but decrease the bandwidth. Default: false
+keepalive_secs = 10 # Optional. Specify `tcp_keepalive_time` in `tcp(7)`, if applicable. Default: 10 seconds
+keepalive_interval = 5 # Optional. Specify `tcp_keepalive_intvl` in `tcp(7)`, if applicable. Default: 5 seconds
 
 [client.transport.tls] # Necessary if `type` is "tls"
 trusted_root = "ca.pem" # Necessary. The certificate of CA that signed the server's certificate
@@ -118,6 +121,9 @@ default_token = "default_token_if_not_specify" # Optional
 
 [server.transport] # Same as `[client.transport]`
 type = "tcp" 
+nodelay = false
+keepalive_secs = 10
+keepalive_interval = 5
 
 [server.transport.tls] # Necessary if `type` is "tls"
 pkcs12 = "identify.pfx" # Necessary. pkcs12 file of server's certificate and private key

+ 8 - 0
README.md

@@ -94,6 +94,9 @@ default_token = "default_token_if_not_specify" # Optional. The default token of
 
 [client.transport] # The whole block is optional. Specify which transport to use
 type = "tcp" # Optional. Possible values: ["tcp", "tls", "noise"]. Default: "tcp"
+nodelay = false # Optional. Determine whether to enable TCP_NODELAY, if applicable, to improve the latency but decrease the bandwidth. Default: false
+keepalive_secs = 10 # Optional. Specify `tcp_keepalive_time` in `tcp(7)`, if applicable. Default: 10 seconds
+keepalive_interval = 5 # Optional. Specify `tcp_keepalive_intvl` in `tcp(7)`, if applicable. Default: 5 seconds
 
 [client.transport.tls] # Necessary if `type` is "tls"
 trusted_root = "ca.pem" # Necessary. The certificate of CA that signed the server's certificate
@@ -108,6 +111,7 @@ remote_public_key = "key_encoded_in_base64" # Optional
 type = "tcp" # Optional. The protocol that needs forwarding. Possible values: ["tcp", "udp"]. Default: "tcp"
 token = "whatever" # Necessary if `client.default_token` not set
 local_addr = "127.0.0.1:1081" # Necessary. The address of the service that needs to be forwarded
+nodelay = false # Optional. Determine whether to enable TCP_NODELAY for data transmission, if applicable, to improve the latency but decrease the bandwidth. Default: false
 
 [client.services.service2] # Multiple services can be defined
 local_addr = "127.0.0.1:1082"
@@ -118,6 +122,9 @@ default_token = "default_token_if_not_specify" # Optional
 
 [server.transport] # Same as `[client.transport]`
 type = "tcp" 
+nodelay = false
+keepalive_secs = 10
+keepalive_interval = 5
 
 [server.transport.tls] # Necessary if `type` is "tls"
 pkcs12 = "identify.pfx" # Necessary. pkcs12 file of server's certificate and private key
@@ -132,6 +139,7 @@ remote_public_key = "key_encoded_in_base64"
 type = "tcp" # Optional. Same as the client `[client.services.X.type]
 token = "whatever" # Necessary if `server.default_token` not set
 bind_addr = "0.0.0.0:8081" # Necessary. The address of the service is exposed at. Generally only the port needs to be change. 
+nodelay = false # Optional. Same as the client
 
 [server.services.service2] 
 bind_addr = "0.0.0.1:8082"

+ 12 - 6
src/client.rs

@@ -6,7 +6,7 @@ use crate::protocol::{
     self, read_ack, read_control_cmd, read_data_cmd, read_hello, Ack, Auth, ControlChannelCmd,
     DataChannelCmd, UdpTraffic, CURRENT_PROTO_VERSION, HASH_WIDTH_IN_BYTES,
 };
-use crate::transport::{TcpTransport, Transport};
+use crate::transport::{SocketOpts, TcpTransport, Transport};
 use anyhow::{anyhow, bail, Context, Result};
 use backoff::ExponentialBackoff;
 use bytes::{Bytes, BytesMut};
@@ -82,9 +82,7 @@ impl<'a, T: 'static + Transport> Client<'a, T> {
             config,
             service_handles: HashMap::new(),
             transport: Arc::new(
-                T::new(&config.transport)
-                    .await
-                    .with_context(|| "Failed to create the transport")?,
+                T::new(&config.transport).with_context(|| "Failed to create the transport")?,
             ),
         })
     }
@@ -153,6 +151,7 @@ struct RunDataChannelArgs<T: Transport> {
     remote_addr: String,
     local_addr: String,
     connector: Arc<T>,
+    socket_opts: SocketOpts,
 }
 
 async fn do_data_channel_handshake<T: Transport>(
@@ -170,11 +169,14 @@ async fn do_data_channel_handshake<T: Transport>(
     let mut conn: T::Stream = backoff::future::retry_notify(
         backoff,
         || async {
-            Ok(args
+            let conn = args
                 .connector
                 .connect(&args.remote_addr)
                 .await
-                .with_context(|| "Failed to connect to remote_addr")?)
+                .with_context(|| "Failed to connect to remote_addr")?;
+            T::hint(&conn, args.socket_opts);
+
+            Ok(conn)
         },
         |e, duration| {
             warn!("{:?}. Retry in {:?}", e, duration);
@@ -381,6 +383,7 @@ impl<T: 'static + Transport> ControlChannel<T> {
             .connect(&self.remote_addr)
             .await
             .with_context(|| format!("Failed to connect to the server: {}", &self.remote_addr))?;
+        T::hint(&conn, SocketOpts::for_control_channel());
 
         // Send hello
         debug!("Sending hello");
@@ -424,11 +427,14 @@ impl<T: 'static + Transport> ControlChannel<T> {
 
         let remote_addr = self.remote_addr.clone();
         let local_addr = self.service.local_addr.clone();
+        // Socket options for the data channel
+        let socket_opts = SocketOpts::from_client_cfg(&self.service);
         let data_ch_args = Arc::new(RunDataChannelArgs {
             session_key,
             remote_addr,
             local_addr,
             connector: self.transport.clone(),
+            socket_opts,
         });
 
         loop {

+ 38 - 1
src/config.rs

@@ -4,6 +4,8 @@ use std::collections::HashMap;
 use std::path::Path;
 use tokio::fs;
 
+use crate::transport::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_KEEPALIVE_SECS, DEFAULT_NODELAY};
+
 #[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq)]
 pub enum TransportType {
     #[serde(rename = "tcp")]
@@ -28,6 +30,7 @@ pub struct ClientServiceConfig {
     pub name: String,
     pub local_addr: String,
     pub token: Option<String>,
+    pub nodelay: Option<bool>,
 }
 
 impl ClientServiceConfig {
@@ -65,6 +68,7 @@ pub struct ServerServiceConfig {
     pub name: String,
     pub bind_addr: String,
     pub token: Option<String>,
+    pub nodelay: Option<bool>,
 }
 
 impl ServerServiceConfig {
@@ -96,14 +100,45 @@ pub struct NoiseConfig {
     // TODO: Maybe psk can be added
 }
 
-#[derive(Debug, Serialize, Deserialize, Default, PartialEq, Clone)]
+fn default_nodelay() -> bool {
+    DEFAULT_NODELAY
+}
+
+fn default_keepalive_secs() -> u64 {
+    DEFAULT_KEEPALIVE_SECS
+}
+
+fn default_keepalive_interval() -> u64 {
+    DEFAULT_KEEPALIVE_INTERVAL
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct TransportConfig {
     #[serde(rename = "type")]
     pub transport_type: TransportType,
+    #[serde(default = "default_nodelay")]
+    pub nodelay: bool,
+    #[serde(default = "default_keepalive_secs")]
+    pub keepalive_secs: u64,
+    #[serde(default = "default_keepalive_interval")]
+    pub keepalive_interval: u64,
     pub tls: Option<TlsConfig>,
     pub noise: Option<NoiseConfig>,
 }
 
+impl Default for TransportConfig {
+    fn default() -> TransportConfig {
+        TransportConfig {
+            transport_type: Default::default(),
+            nodelay: default_nodelay(),
+            keepalive_secs: default_keepalive_secs(),
+            keepalive_interval: default_keepalive_interval(),
+            tls: None,
+            noise: None,
+        }
+    }
+}
+
 fn default_transport() -> TransportConfig {
     Default::default()
 }
@@ -294,6 +329,7 @@ mod tests {
                 name: "foo1".into(),
                 bind_addr: "127.0.0.1:80".into(),
                 token: None,
+                ..Default::default()
             },
         );
 
@@ -341,6 +377,7 @@ mod tests {
                 name: "foo1".into(),
                 local_addr: "127.0.0.1:80".into(),
                 token: None,
+                ..Default::default()
             },
         );
 

+ 17 - 14
src/helper.rs

@@ -5,28 +5,31 @@ use std::{
     time::Duration,
 };
 
-use anyhow::{anyhow, Context, Result};
+use anyhow::{anyhow, Result};
 use socket2::{SockRef, TcpKeepalive};
 use tokio::net::{lookup_host, TcpStream, ToSocketAddrs, UdpSocket};
-use tracing::error;
+use tracing::trace;
 
 // Tokio hesitates to expose this option...So we have to do it on our own :(
 // The good news is that using socket2 it can be easily done, without losing portability.
 // See https://github.com/tokio-rs/tokio/issues/3082
-pub fn try_set_tcp_keepalive(conn: &TcpStream) -> Result<()> {
+pub fn try_set_tcp_keepalive(
+    conn: &TcpStream,
+    keepalive_duration: Duration,
+    keepalive_interval: Duration,
+) -> Result<()> {
     let s = SockRef::from(conn);
-    let keepalive = TcpKeepalive::new().with_time(Duration::from_secs(30));
-    s.set_tcp_keepalive(&keepalive)
-        .with_context(|| "Failed to set keepalive")
-}
+    let keepalive = TcpKeepalive::new()
+        .with_time(keepalive_duration)
+        .with_interval(keepalive_interval);
 
-pub fn set_tcp_keepalive(conn: &TcpStream) {
-    if let Err(e) = try_set_tcp_keepalive(conn) {
-        error!(
-            "Failed to set TCP keepalive. The connection maybe unstable: {:?}",
-            e
-        );
-    }
+    trace!(
+        "Set TCP keepalive {:?} {:?}",
+        keepalive_duration,
+        keepalive_interval
+    );
+
+    Ok(s.set_tcp_keepalive(&keepalive)?)
 }
 
 #[allow(dead_code)]

+ 9 - 3
src/server.rs

@@ -7,7 +7,7 @@ use crate::protocol::{
     self, read_auth, read_hello, Ack, ControlChannelCmd, DataChannelCmd, Hello, UdpTraffic,
     HASH_WIDTH_IN_BYTES,
 };
-use crate::transport::{TcpTransport, Transport};
+use crate::transport::{SocketOpts, TcpTransport, Transport};
 use anyhow::{anyhow, bail, Context, Result};
 use backoff::backoff::Backoff;
 use backoff::ExponentialBackoff;
@@ -111,7 +111,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
             config,
             services: Arc::new(RwLock::new(generate_service_hashmap(config))),
             control_channels: Arc::new(RwLock::new(ControlChannelMap::new())),
-            transport: Arc::new(T::new(&config.transport).await?),
+            transport: Arc::new(T::new(&config.transport)?),
         })
     }
 
@@ -254,6 +254,8 @@ async fn do_control_channel_handshake<T: 'static + Transport>(
 ) -> Result<()> {
     info!("Try to handshake a control channel");
 
+    T::hint(&conn, SocketOpts::for_control_channel());
+
     // Generate a nonce
     let mut nonce = vec![0u8; HASH_WIDTH_IN_BYTES];
     rand::thread_rng().fill_bytes(&mut nonce);
@@ -338,6 +340,8 @@ async fn do_data_channel_handshake<T: 'static + Transport>(
     let control_channels_guard = control_channels.read().await;
     match control_channels_guard.get2(&nonce) {
         Some(handle) => {
+            T::hint(&conn, SocketOpts::from_server_cfg(&handle.service));
+
             // Send the data channel to the corresponding control channel
             handle
                 .data_ch_tx
@@ -356,6 +360,7 @@ pub struct ControlChannelHandle<T: Transport> {
     // Shutdown the control channel by dropping it
     _shutdown_tx: broadcast::Sender<bool>,
     data_ch_tx: mpsc::Sender<T::Stream>,
+    service: ServerServiceConfig,
 }
 
 impl<T> ControlChannelHandle<T>
@@ -428,7 +433,7 @@ where
         let ch = ControlChannel::<T> {
             conn,
             shutdown_rx,
-            service,
+            service: service.clone(),
             data_ch_req_rx,
         };
 
@@ -445,6 +450,7 @@ where
         ControlChannelHandle {
             _shutdown_tx: shutdown_tx,
             data_ch_tx,
+            service,
         }
     }
 }

+ 97 - 5
src/transport/mod.rs

@@ -1,21 +1,31 @@
-use crate::config::TransportConfig;
-use anyhow::Result;
+use crate::config::{ClientServiceConfig, ServerServiceConfig, TransportConfig};
+use crate::helper::try_set_tcp_keepalive;
+use anyhow::{Context, Result};
 use async_trait::async_trait;
 use std::fmt::Debug;
 use std::net::SocketAddr;
+use std::time::Duration;
 use tokio::io::{AsyncRead, AsyncWrite};
-use tokio::net::ToSocketAddrs;
+use tokio::net::{TcpStream, ToSocketAddrs};
+use tracing::{error, trace};
 
-// Specify a transport layer, like TCP, TLS
+pub static DEFAULT_NODELAY: bool = false;
+
+pub static DEFAULT_KEEPALIVE_SECS: u64 = 10;
+pub static DEFAULT_KEEPALIVE_INTERVAL: u64 = 5;
+
+/// Specify a transport layer, like TCP, TLS
 #[async_trait]
 pub trait Transport: Debug + Send + Sync {
     type Acceptor: Send + Sync;
     type RawStream: Send + Sync;
     type Stream: 'static + AsyncRead + AsyncWrite + Unpin + Send + Sync + Debug;
 
-    async fn new(config: &TransportConfig) -> Result<Self>
+    fn new(config: &TransportConfig) -> Result<Self>
     where
         Self: Sized;
+    /// Provide the transport with socket options, which can be handled at the need of the transport
+    fn hint(conn: &Self::Stream, opts: SocketOpts);
     async fn bind<T: ToSocketAddrs + Send + Sync>(&self, addr: T) -> Result<Self::Acceptor>;
     async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)>;
     async fn handshake(&self, conn: Self::RawStream) -> Result<Self::Stream>;
@@ -33,3 +43,85 @@ pub use tls::TlsTransport;
 mod noise;
 #[cfg(feature = "noise")]
 pub use noise::NoiseTransport;
+
+#[derive(Debug, Clone, Copy)]
+struct Keepalive {
+    // tcp_keepalive_time if the underlying protocol is TCP
+    pub keepalive_secs: u64,
+    // tcp_keepalive_intvl if the underlying protocol is TCP
+    pub keepalive_interval: u64,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct SocketOpts {
+    // None means do not change
+    nodelay: Option<bool>,
+    // keepalive must be Some or None at the same time, or the behavior will be platform-dependent
+    keepalive: Option<Keepalive>,
+}
+
+impl SocketOpts {
+    fn none() -> SocketOpts {
+        SocketOpts {
+            nodelay: None,
+            keepalive: None,
+        }
+    }
+
+    /// Socket options for the control channel
+    pub fn for_control_channel() -> SocketOpts {
+        SocketOpts {
+            nodelay: Some(true),  // Always set nodelay for the control channel
+            ..SocketOpts::none()  // None means do not change. Keepalive is set by TcpTransport
+        }
+    }
+}
+
+impl SocketOpts {
+    pub fn from_transport_cfg(cfg: &TransportConfig) -> SocketOpts {
+        SocketOpts {
+            nodelay: Some(cfg.nodelay),
+            keepalive: Some(Keepalive {
+                keepalive_secs: cfg.keepalive_secs,
+                keepalive_interval: cfg.keepalive_interval,
+            }),
+        }
+    }
+
+    pub fn from_client_cfg(cfg: &ClientServiceConfig) -> SocketOpts {
+        SocketOpts {
+            nodelay: cfg.nodelay,
+            ..SocketOpts::none()
+        }
+    }
+
+    pub fn from_server_cfg(cfg: &ServerServiceConfig) -> SocketOpts {
+        SocketOpts {
+            nodelay: cfg.nodelay,
+            ..SocketOpts::none()
+        }
+    }
+
+    pub fn apply(&self, conn: &TcpStream) {
+        if let Some(v) = self.keepalive {
+            let keepalive_duration = Duration::from_secs(v.keepalive_secs);
+            let keepalive_interval = Duration::from_secs(v.keepalive_interval);
+
+            if let Err(e) = try_set_tcp_keepalive(conn, keepalive_duration, keepalive_interval)
+                .with_context(|| "Failed to set keepalive")
+            {
+                error!("{:?}", e);
+            }
+        }
+
+        if let Some(nodelay) = self.nodelay {
+            trace!("Set nodelay {}", nodelay);
+            if let Err(e) = conn
+                .set_nodelay(nodelay)
+                .with_context(|| "Failed to set nodelay")
+            {
+                error!("{:?}", e);
+            }
+        }
+    }
+}

+ 17 - 13
src/transport/noise.rs

@@ -1,16 +1,14 @@
 use std::net::SocketAddr;
 
-use super::Transport;
-use crate::{
-    config::{NoiseConfig, TransportConfig},
-    helper::set_tcp_keepalive,
-};
+use super::{SocketOpts, TcpTransport, Transport};
+use crate::config::{NoiseConfig, TransportConfig};
 use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use snowstorm::{Builder, NoiseParams, NoiseStream};
 use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
 
 pub struct NoiseTransport {
+    tcp: TcpTransport,
     config: NoiseConfig,
     params: NoiseParams,
     local_private_key: Vec<u8>,
@@ -39,7 +37,9 @@ impl Transport for NoiseTransport {
     type RawStream = TcpStream;
     type Stream = snowstorm::stream::NoiseStream<TcpStream>;
 
-    async fn new(config: &TransportConfig) -> Result<Self> {
+    fn new(config: &TransportConfig) -> Result<Self> {
+        let tcp = TcpTransport::new(config)?;
+
         let config = match &config.noise {
             Some(v) => v.clone(),
             None => return Err(anyhow!("Missing noise config")),
@@ -61,6 +61,7 @@ impl Transport for NoiseTransport {
         let params: NoiseParams = config.pattern.parse()?;
 
         Ok(NoiseTransport {
+            tcp,
             config,
             params,
             local_private_key,
@@ -68,17 +69,19 @@ impl Transport for NoiseTransport {
         })
     }
 
+    fn hint(conn: &Self::Stream, opt: SocketOpts) {
+        opt.apply(conn.get_inner());
+    }
+
     async fn bind<T: ToSocketAddrs + Send + Sync>(&self, addr: T) -> Result<Self::Acceptor> {
         Ok(TcpListener::bind(addr).await?)
     }
 
     async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
-        let (conn, addr) = a
-            .accept()
+        self.tcp
+            .accept(a)
             .await
-            .with_context(|| "Failed to accept TCP connection")?;
-        set_tcp_keepalive(&conn);
-        Ok((conn, addr))
+            .with_context(|| "Failed to accept TCP connection")
     }
 
     async fn handshake(&self, conn: Self::RawStream) -> Result<Self::Stream> {
@@ -89,10 +92,11 @@ impl Transport for NoiseTransport {
     }
 
     async fn connect(&self, addr: &str) -> Result<Self::Stream> {
-        let conn = TcpStream::connect(addr)
+        let conn = self
+            .tcp
+            .connect(addr)
             .await
             .with_context(|| "Failed to connect TCP socket")?;
-        set_tcp_keepalive(&conn);
 
         let conn = NoiseStream::handshake(conn, self.builder().build_initiator()?)
             .await

+ 14 - 7
src/transport/tcp.rs

@@ -1,14 +1,15 @@
 use crate::config::TransportConfig;
-use crate::helper::set_tcp_keepalive;
 
-use super::Transport;
+use super::{SocketOpts, Transport};
 use anyhow::Result;
 use async_trait::async_trait;
 use std::net::SocketAddr;
 use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
 
 #[derive(Debug)]
-pub struct TcpTransport {}
+pub struct TcpTransport {
+    socket_opts: SocketOpts,
+}
 
 #[async_trait]
 impl Transport for TcpTransport {
@@ -16,8 +17,14 @@ impl Transport for TcpTransport {
     type Stream = TcpStream;
     type RawStream = TcpStream;
 
-    async fn new(_config: &TransportConfig) -> Result<Self> {
-        Ok(TcpTransport {})
+    fn new(config: &TransportConfig) -> Result<Self> {
+        Ok(TcpTransport {
+            socket_opts: SocketOpts::from_transport_cfg(config),
+        })
+    }
+
+    fn hint(conn: &Self::Stream, opt: SocketOpts) {
+        opt.apply(conn);
     }
 
     async fn bind<T: ToSocketAddrs + Send + Sync>(&self, addr: T) -> Result<Self::Acceptor> {
@@ -26,7 +33,7 @@ impl Transport for TcpTransport {
 
     async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
         let (s, addr) = a.accept().await?;
-        set_tcp_keepalive(&s);
+        self.socket_opts.apply(&s);
         Ok((s, addr))
     }
 
@@ -36,7 +43,7 @@ impl Transport for TcpTransport {
 
     async fn connect(&self, addr: &str) -> Result<Self::Stream> {
         let s = TcpStream::connect(addr).await?;
-        set_tcp_keepalive(&s);
+        self.socket_opts.apply(&s);
         Ok(s)
     }
 }

+ 16 - 12
src/transport/tls.rs

@@ -1,17 +1,17 @@
 use std::net::SocketAddr;
 
-use super::Transport;
+use super::{SocketOpts, TcpTransport, Transport};
 use crate::config::{TlsConfig, TransportConfig};
-use crate::helper::set_tcp_keepalive;
 use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
-use tokio::fs;
+use std::fs;
 use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
 use tokio_native_tls::native_tls::{self, Certificate, Identity};
 use tokio_native_tls::{TlsAcceptor, TlsConnector, TlsStream};
 
 #[derive(Debug)]
 pub struct TlsTransport {
+    tcp: TcpTransport,
     config: TlsConfig,
     connector: Option<TlsConnector>,
     tls_acceptor: Option<TlsAcceptor>,
@@ -23,7 +23,8 @@ impl Transport for TlsTransport {
     type RawStream = TcpStream;
     type Stream = TlsStream<TcpStream>;
 
-    async fn new(config: &TransportConfig) -> Result<Self> {
+    fn new(config: &TransportConfig) -> Result<Self> {
+        let tcp = TcpTransport::new(config)?;
         let config = match &config.tls {
             Some(v) => v,
             None => {
@@ -34,7 +35,6 @@ impl Transport for TlsTransport {
         let connector = match config.trusted_root.as_ref() {
             Some(path) => {
                 let s = fs::read_to_string(path)
-                    .await
                     .with_context(|| "Failed to read the `tls.trusted_root`")?;
                 let cert = Certificate::from_pem(s.as_bytes())
                     .with_context(|| "Failed to read certificate from `tls.trusted_root`")?;
@@ -49,7 +49,7 @@ impl Transport for TlsTransport {
         let tls_acceptor = match config.pkcs12.as_ref() {
             Some(path) => {
                 let ident = Identity::from_pkcs12(
-                    &fs::read(path).await?,
+                    &fs::read(path)?,
                     config.pkcs12_password.as_ref().unwrap(),
                 )
                 .with_context(|| "Failed to create identitiy")?;
@@ -61,12 +61,17 @@ impl Transport for TlsTransport {
         };
 
         Ok(TlsTransport {
+            tcp,
             config: config.clone(),
             connector,
             tls_acceptor,
         })
     }
 
+    fn hint(conn: &Self::Stream, opt: SocketOpts) {
+        opt.apply(conn.get_ref().get_ref().get_ref());
+    }
+
     async fn bind<A: ToSocketAddrs + Send + Sync>(&self, addr: A) -> Result<Self::Acceptor> {
         let l = TcpListener::bind(addr)
             .await
@@ -75,10 +80,10 @@ impl Transport for TlsTransport {
     }
 
     async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
-        let (conn, addr) = a.accept().await?;
-        set_tcp_keepalive(&conn);
-
-        Ok((conn, addr))
+        self.tcp
+            .accept(a)
+            .await
+            .with_context(|| "Failed to accept TCP connection")
     }
 
     async fn handshake(&self, conn: Self::RawStream) -> Result<Self::Stream> {
@@ -87,8 +92,7 @@ impl Transport for TlsTransport {
     }
 
     async fn connect(&self, addr: &str) -> Result<Self::Stream> {
-        let conn = TcpStream::connect(&addr).await?;
-        set_tcp_keepalive(&conn);
+        let conn = self.tcp.connect(addr).await?;
 
         let connector = self.connector.as_ref().unwrap();
         Ok(connector