config_watcher.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. use crate::{
  2. config::{ClientConfig, ClientServiceConfig, ServerConfig, ServerServiceConfig},
  3. Config,
  4. };
  5. use anyhow::{Context, Result};
  6. use std::{
  7. collections::HashMap,
  8. path::{Path, PathBuf},
  9. };
  10. use tokio::sync::{broadcast, mpsc};
  11. use tracing::{error, info, instrument};
  12. #[cfg(feature = "notify")]
  13. use notify::{EventKind, RecursiveMode, Watcher};
  14. #[derive(Debug, PartialEq)]
  15. pub enum ConfigChange {
  16. General(Box<Config>), // Trigger a full restart
  17. ServiceChange(ServiceChange),
  18. }
  19. #[derive(Debug, PartialEq)]
  20. pub enum ServiceChange {
  21. ClientAdd(ClientServiceConfig),
  22. ClientDelete(String),
  23. ServerAdd(ServerServiceConfig),
  24. ServerDelete(String),
  25. }
  26. impl From<ClientServiceConfig> for ServiceChange {
  27. fn from(c: ClientServiceConfig) -> Self {
  28. ServiceChange::ClientAdd(c)
  29. }
  30. }
  31. impl From<ServerServiceConfig> for ServiceChange {
  32. fn from(c: ServerServiceConfig) -> Self {
  33. ServiceChange::ServerAdd(c)
  34. }
  35. }
  36. trait InstanceConfig: Clone {
  37. type ServiceConfig: Into<ServiceChange> + PartialEq + Clone;
  38. fn equal_without_service(&self, rhs: &Self) -> bool;
  39. fn to_service_change_delete(s: String) -> ServiceChange;
  40. fn get_services(&self) -> &HashMap<String, Self::ServiceConfig>;
  41. }
  42. impl InstanceConfig for ServerConfig {
  43. type ServiceConfig = ServerServiceConfig;
  44. fn equal_without_service(&self, rhs: &Self) -> bool {
  45. let left = ServerConfig {
  46. services: Default::default(),
  47. ..self.clone()
  48. };
  49. let right = ServerConfig {
  50. services: Default::default(),
  51. ..rhs.clone()
  52. };
  53. left == right
  54. }
  55. fn to_service_change_delete(s: String) -> ServiceChange {
  56. ServiceChange::ServerDelete(s)
  57. }
  58. fn get_services(&self) -> &HashMap<String, Self::ServiceConfig> {
  59. &self.services
  60. }
  61. }
  62. impl InstanceConfig for ClientConfig {
  63. type ServiceConfig = ClientServiceConfig;
  64. fn equal_without_service(&self, rhs: &Self) -> bool {
  65. let left = ClientConfig {
  66. services: Default::default(),
  67. ..self.clone()
  68. };
  69. let right = ClientConfig {
  70. services: Default::default(),
  71. ..rhs.clone()
  72. };
  73. left == right
  74. }
  75. fn to_service_change_delete(s: String) -> ServiceChange {
  76. ServiceChange::ClientDelete(s)
  77. }
  78. fn get_services(&self) -> &HashMap<String, Self::ServiceConfig> {
  79. &self.services
  80. }
  81. }
  82. pub struct ConfigWatcherHandle {
  83. pub event_rx: mpsc::UnboundedReceiver<ConfigChange>,
  84. }
  85. impl ConfigWatcherHandle {
  86. pub async fn new(path: &Path, shutdown_rx: broadcast::Receiver<bool>) -> Result<Self> {
  87. let (event_tx, event_rx) = mpsc::unbounded_channel();
  88. let origin_cfg = Config::from_file(path).await?;
  89. // Initial start
  90. event_tx
  91. .send(ConfigChange::General(Box::new(origin_cfg.clone())))
  92. .unwrap();
  93. tokio::spawn(config_watcher(
  94. path.to_owned(),
  95. shutdown_rx,
  96. event_tx,
  97. origin_cfg,
  98. ));
  99. Ok(ConfigWatcherHandle { event_rx })
  100. }
  101. }
  102. // Fake config watcher when compiling without `notify`
  103. #[cfg(not(feature = "notify"))]
  104. async fn config_watcher(
  105. _path: PathBuf,
  106. mut shutdown_rx: broadcast::Receiver<bool>,
  107. _event_tx: mpsc::UnboundedSender<ConfigChange>,
  108. _old: Config,
  109. ) -> Result<()> {
  110. // Do nothing except waiting for ctrl-c
  111. let _ = shutdown_rx.recv().await;
  112. Ok(())
  113. }
  114. #[cfg(feature = "notify")]
  115. #[instrument(skip(shutdown_rx, event_tx, old))]
  116. async fn config_watcher(
  117. path: PathBuf,
  118. mut shutdown_rx: broadcast::Receiver<bool>,
  119. event_tx: mpsc::UnboundedSender<ConfigChange>,
  120. mut old: Config,
  121. ) -> Result<()> {
  122. let (fevent_tx, mut fevent_rx) = mpsc::unbounded_channel();
  123. let parent_path = path.parent().expect("config file should have a parent dir");
  124. let path_clone = path.clone();
  125. let mut watcher =
  126. notify::recommended_watcher(move |res: Result<notify::Event, _>| match res {
  127. Ok(e) => {
  128. if matches!(e.kind, EventKind::Modify(_))
  129. && e.paths
  130. .iter()
  131. .map(|x| x.file_name())
  132. .any(|x| x == path_clone.file_name())
  133. {
  134. let _ = fevent_tx.send(true);
  135. }
  136. }
  137. Err(e) => error!("watch error: {:#}", e),
  138. })?;
  139. watcher.watch(parent_path, RecursiveMode::NonRecursive)?;
  140. info!("Start watching the config");
  141. loop {
  142. tokio::select! {
  143. e = fevent_rx.recv() => {
  144. match e {
  145. Some(_) => {
  146. info!("Rescan the configuration");
  147. let new = match Config::from_file(&path).await.with_context(|| "The changed configuration is invalid. Ignored") {
  148. Ok(v) => v,
  149. Err(e) => {
  150. error!("{:#}", e);
  151. // If the config is invalid, just ignore it
  152. continue;
  153. }
  154. };
  155. for event in calculate_events(&old, &new) {
  156. event_tx.send(event)?;
  157. }
  158. old = new;
  159. },
  160. None => break
  161. }
  162. },
  163. _ = shutdown_rx.recv() => break
  164. }
  165. }
  166. info!("Config watcher exiting");
  167. Ok(())
  168. }
  169. fn calculate_events(old: &Config, new: &Config) -> Vec<ConfigChange> {
  170. if old == new {
  171. return vec![];
  172. }
  173. let mut ret = vec![];
  174. if old.server != new.server {
  175. if old.server.is_some() != new.server.is_some() {
  176. return vec![ConfigChange::General(Box::new(new.clone()))];
  177. } else {
  178. match calculate_instance_config_events(
  179. old.server.as_ref().unwrap(),
  180. new.server.as_ref().unwrap(),
  181. ) {
  182. Some(mut v) => ret.append(&mut v),
  183. None => return vec![ConfigChange::General(Box::new(new.clone()))],
  184. }
  185. }
  186. }
  187. if old.client != new.client {
  188. if old.client.is_some() != new.client.is_some() {
  189. return vec![ConfigChange::General(Box::new(new.clone()))];
  190. } else {
  191. match calculate_instance_config_events(
  192. old.client.as_ref().unwrap(),
  193. new.client.as_ref().unwrap(),
  194. ) {
  195. Some(mut v) => ret.append(&mut v),
  196. None => return vec![ConfigChange::General(Box::new(new.clone()))],
  197. }
  198. }
  199. }
  200. ret
  201. }
  202. // None indicates a General change needed
  203. fn calculate_instance_config_events<T: InstanceConfig>(
  204. old: &T,
  205. new: &T,
  206. ) -> Option<Vec<ConfigChange>> {
  207. if !old.equal_without_service(new) {
  208. return None;
  209. }
  210. let old = old.get_services();
  211. let new = new.get_services();
  212. let mut v = vec![];
  213. v.append(&mut calculate_service_delete_events::<T>(old, new));
  214. v.append(&mut calculate_service_add_events(old, new));
  215. Some(v.into_iter().map(ConfigChange::ServiceChange).collect())
  216. }
  217. fn calculate_service_delete_events<T: InstanceConfig>(
  218. old: &HashMap<String, T::ServiceConfig>,
  219. new: &HashMap<String, T::ServiceConfig>,
  220. ) -> Vec<ServiceChange> {
  221. old.keys()
  222. .filter(|&name| new.get(name).is_none())
  223. .map(|x| T::to_service_change_delete(x.to_owned()))
  224. .collect()
  225. }
  226. fn calculate_service_add_events<T: PartialEq + Clone + Into<ServiceChange>>(
  227. old: &HashMap<String, T>,
  228. new: &HashMap<String, T>,
  229. ) -> Vec<ServiceChange> {
  230. new.iter()
  231. .filter(|(name, c)| old.get(*name) != Some(*c))
  232. .map(|(_, c)| c.clone().into())
  233. .collect()
  234. }
  235. #[cfg(test)]
  236. mod test {
  237. use crate::config::ServerConfig;
  238. use super::*;
  239. // macro to create map or set literal
  240. macro_rules! collection {
  241. // map-like
  242. ($($k:expr => $v:expr),* $(,)?) => {{
  243. use std::iter::{Iterator, IntoIterator};
  244. Iterator::collect(IntoIterator::into_iter([$(($k, $v),)*]))
  245. }};
  246. }
  247. #[test]
  248. fn test_calculate_events() {
  249. struct Test {
  250. old: Config,
  251. new: Config,
  252. }
  253. let tests = [
  254. Test {
  255. old: Config {
  256. server: Some(Default::default()),
  257. client: None,
  258. },
  259. new: Config {
  260. server: Some(Default::default()),
  261. client: Some(Default::default()),
  262. },
  263. },
  264. Test {
  265. old: Config {
  266. server: Some(ServerConfig {
  267. bind_addr: String::from("127.0.0.1:2334"),
  268. ..Default::default()
  269. }),
  270. client: None,
  271. },
  272. new: Config {
  273. server: Some(ServerConfig {
  274. bind_addr: String::from("127.0.0.1:2333"),
  275. services: collection!(String::from("foo") => Default::default()),
  276. ..Default::default()
  277. }),
  278. client: None,
  279. },
  280. },
  281. Test {
  282. old: Config {
  283. server: Some(Default::default()),
  284. client: None,
  285. },
  286. new: Config {
  287. server: Some(ServerConfig {
  288. services: collection!(String::from("foo") => Default::default()),
  289. ..Default::default()
  290. }),
  291. client: None,
  292. },
  293. },
  294. Test {
  295. old: Config {
  296. server: Some(ServerConfig {
  297. services: collection!(String::from("foo") => Default::default()),
  298. ..Default::default()
  299. }),
  300. client: None,
  301. },
  302. new: Config {
  303. server: Some(Default::default()),
  304. client: None,
  305. },
  306. },
  307. Test {
  308. old: Config {
  309. server: Some(ServerConfig {
  310. services: collection!(String::from("foo1") => ServerServiceConfig::with_name("foo1"), String::from("foo2") => ServerServiceConfig::with_name("foo2")),
  311. ..Default::default()
  312. }),
  313. client: Some(ClientConfig {
  314. services: collection!(String::from("foo1") => ClientServiceConfig::with_name("foo1"), String::from("foo2") => ClientServiceConfig::with_name("foo2")),
  315. ..Default::default()
  316. }),
  317. },
  318. new: Config {
  319. server: Some(ServerConfig {
  320. services: collection!(String::from("bar1") => ServerServiceConfig::with_name("bar1"), String::from("foo2") => ServerServiceConfig::with_name("foo2")),
  321. ..Default::default()
  322. }),
  323. client: Some(ClientConfig {
  324. services: collection!(String::from("bar1") => ClientServiceConfig::with_name("bar1"), String::from("bar2") => ClientServiceConfig::with_name("bar2")),
  325. ..Default::default()
  326. }),
  327. },
  328. },
  329. ];
  330. let mut expected = [
  331. vec![ConfigChange::General(Box::new(tests[0].new.clone()))],
  332. vec![ConfigChange::General(Box::new(tests[1].new.clone()))],
  333. vec![ConfigChange::ServiceChange(ServiceChange::ServerAdd(
  334. Default::default(),
  335. ))],
  336. vec![ConfigChange::ServiceChange(ServiceChange::ServerDelete(
  337. String::from("foo"),
  338. ))],
  339. vec![
  340. ConfigChange::ServiceChange(ServiceChange::ServerDelete(String::from("foo1"))),
  341. ConfigChange::ServiceChange(ServiceChange::ServerAdd(
  342. tests[4].new.server.as_ref().unwrap().services["bar1"].clone(),
  343. )),
  344. ConfigChange::ServiceChange(ServiceChange::ClientDelete(String::from("foo1"))),
  345. ConfigChange::ServiceChange(ServiceChange::ClientDelete(String::from("foo2"))),
  346. ConfigChange::ServiceChange(ServiceChange::ClientAdd(
  347. tests[4].new.client.as_ref().unwrap().services["bar1"].clone(),
  348. )),
  349. ConfigChange::ServiceChange(ServiceChange::ClientAdd(
  350. tests[4].new.client.as_ref().unwrap().services["bar2"].clone(),
  351. )),
  352. ],
  353. ];
  354. assert_eq!(tests.len(), expected.len());
  355. for i in 0..tests.len() {
  356. let mut actual = calculate_events(&tests[i].old, &tests[i].new);
  357. let get_key = |x: &ConfigChange| -> String {
  358. match x {
  359. ConfigChange::General(_) => String::from("g"),
  360. ConfigChange::ServiceChange(sc) => match sc {
  361. ServiceChange::ClientAdd(c) => "c_add_".to_owned() + &c.name,
  362. ServiceChange::ClientDelete(s) => "c_del_".to_owned() + s,
  363. ServiceChange::ServerAdd(c) => "s_add_".to_owned() + &c.name,
  364. ServiceChange::ServerDelete(s) => "s_del_".to_owned() + s,
  365. },
  366. }
  367. };
  368. actual.sort_by_cached_key(get_key);
  369. expected[i].sort_by_cached_key(get_key);
  370. assert_eq!(actual, expected[i]);
  371. }
  372. }
  373. }