server_manager.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package server
  2. import (
  3. "heckel.io/ntfy/log"
  4. "heckel.io/ntfy/util"
  5. "strings"
  6. )
  7. func (s *Server) execManager() {
  8. // WARNING: Make sure to only selectively lock with the mutex, and be aware that this
  9. // there is no mutex for the entire function.
  10. // Prune all the things
  11. s.pruneVisitors()
  12. s.pruneTokens()
  13. s.pruneAttachments()
  14. s.pruneMessages()
  15. // Message count per topic
  16. var messagesCached int
  17. messageCounts, err := s.messageCache.MessageCounts()
  18. if err != nil {
  19. log.Tag(tagManager).Err(err).Warn("Cannot get message counts")
  20. messageCounts = make(map[string]int) // Empty, so we can continue
  21. }
  22. for _, count := range messageCounts {
  23. messagesCached += count
  24. }
  25. // Remove subscriptions without subscribers
  26. var emptyTopics, subscribers int
  27. log.
  28. Tag(tagManager).
  29. Timing(func() {
  30. s.mu.Lock()
  31. defer s.mu.Unlock()
  32. for _, t := range s.topics {
  33. subs, lastAccess := t.Stats()
  34. ev := log.Tag(tagManager).With(t)
  35. if t.Stale() {
  36. if ev.IsTrace() {
  37. ev.Trace("- topic %s: Deleting stale topic (%d subscribers, accessed %s)", t.ID, subs, util.FormatTime(lastAccess))
  38. }
  39. emptyTopics++
  40. delete(s.topics, t.ID)
  41. } else {
  42. if ev.IsTrace() {
  43. ev.Trace("- topic %s: %d subscribers, accessed %s", t.ID, subs, util.FormatTime(lastAccess))
  44. }
  45. subscribers += subs
  46. }
  47. }
  48. }).
  49. Debug("Removed %d empty topic(s)", emptyTopics)
  50. // Mail stats
  51. var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64
  52. if s.smtpServerBackend != nil {
  53. receivedMailTotal, receivedMailSuccess, receivedMailFailure = s.smtpServerBackend.Counts()
  54. }
  55. var sentMailTotal, sentMailSuccess, sentMailFailure int64
  56. if s.smtpSender != nil {
  57. sentMailTotal, sentMailSuccess, sentMailFailure = s.smtpSender.Counts()
  58. }
  59. // Users
  60. var usersCount int64
  61. if s.userManager != nil {
  62. usersCount, err = s.userManager.UsersCount()
  63. if err != nil {
  64. log.Tag(tagManager).Err(err).Warn("Error counting users")
  65. }
  66. }
  67. // Print stats
  68. s.mu.Lock()
  69. messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors)
  70. s.mu.Unlock()
  71. log.
  72. Tag(tagManager).
  73. Fields(log.Context{
  74. "messages_published": messagesCount,
  75. "messages_cached": messagesCached,
  76. "topics_active": topicsCount,
  77. "subscribers": subscribers,
  78. "visitors": visitorsCount,
  79. "users": usersCount,
  80. "emails_received": receivedMailTotal,
  81. "emails_received_success": receivedMailSuccess,
  82. "emails_received_failure": receivedMailFailure,
  83. "emails_sent": sentMailTotal,
  84. "emails_sent_success": sentMailSuccess,
  85. "emails_sent_failure": sentMailFailure,
  86. }).
  87. Info("Server stats")
  88. mset(metricMessagesCached, messagesCached)
  89. mset(metricVisitors, visitorsCount)
  90. mset(metricUsers, usersCount)
  91. mset(metricSubscribers, subscribers)
  92. mset(metricTopics, topicsCount)
  93. }
  94. func (s *Server) pruneVisitors() {
  95. staleVisitors := 0
  96. log.
  97. Tag(tagManager).
  98. Timing(func() {
  99. s.mu.Lock()
  100. defer s.mu.Unlock()
  101. for ip, v := range s.visitors {
  102. if v.Stale() {
  103. log.Tag(tagManager).With(v).Trace("Deleting stale visitor")
  104. delete(s.visitors, ip)
  105. staleVisitors++
  106. }
  107. }
  108. }).
  109. Field("stale_visitors", staleVisitors).
  110. Debug("Deleted %d stale visitor(s)", staleVisitors)
  111. }
  112. func (s *Server) pruneTokens() {
  113. if s.userManager != nil {
  114. log.
  115. Tag(tagManager).
  116. Timing(func() {
  117. if err := s.userManager.RemoveExpiredTokens(); err != nil {
  118. log.Tag(tagManager).Err(err).Warn("Error expiring user tokens")
  119. }
  120. if err := s.userManager.RemoveDeletedUsers(); err != nil {
  121. log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users")
  122. }
  123. }).
  124. Debug("Removed expired tokens and users")
  125. }
  126. }
  127. func (s *Server) pruneAttachments() {
  128. if s.fileCache == nil {
  129. return
  130. }
  131. log.
  132. Tag(tagManager).
  133. Timing(func() {
  134. ids, err := s.messageCache.AttachmentsExpired()
  135. if err != nil {
  136. log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
  137. } else if len(ids) > 0 {
  138. if log.Tag(tagManager).IsDebug() {
  139. log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
  140. }
  141. if err := s.fileCache.Remove(ids...); err != nil {
  142. log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
  143. }
  144. if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
  145. log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
  146. }
  147. } else {
  148. log.Tag(tagManager).Debug("No expired attachments to delete")
  149. }
  150. }).
  151. Debug("Deleted expired attachments")
  152. }
  153. func (s *Server) pruneMessages() {
  154. log.
  155. Tag(tagManager).
  156. Timing(func() {
  157. expiredMessageIDs, err := s.messageCache.MessagesExpired()
  158. if err != nil {
  159. log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
  160. } else if len(expiredMessageIDs) > 0 {
  161. if s.fileCache != nil {
  162. if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
  163. log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
  164. }
  165. }
  166. if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
  167. log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
  168. }
  169. } else {
  170. log.Tag(tagManager).Debug("No expired messages to delete")
  171. }
  172. }).
  173. Debug("Pruned messages")
  174. }