server_manager.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package server
  2. import (
  3. "heckel.io/ntfy/log"
  4. "heckel.io/ntfy/util"
  5. "strings"
  6. )
  7. func (s *Server) execManager() {
  8. // WARNING: Make sure to only selectively lock with the mutex, and be aware that this
  9. // there is no mutex for the entire function.
  10. // Prune all the things
  11. s.pruneVisitors()
  12. s.pruneTokens()
  13. s.pruneAttachments()
  14. s.pruneMessages()
  15. if s.config.WebPushPublicKey != "" {
  16. s.expireOrNotifyOldSubscriptions()
  17. }
  18. // Message count per topic
  19. var messagesCached int
  20. messageCounts, err := s.messageCache.MessageCounts()
  21. if err != nil {
  22. log.Tag(tagManager).Err(err).Warn("Cannot get message counts")
  23. messageCounts = make(map[string]int) // Empty, so we can continue
  24. }
  25. for _, count := range messageCounts {
  26. messagesCached += count
  27. }
  28. // Remove subscriptions without subscribers
  29. var emptyTopics, subscribers int
  30. log.
  31. Tag(tagManager).
  32. Timing(func() {
  33. s.mu.Lock()
  34. defer s.mu.Unlock()
  35. for _, t := range s.topics {
  36. subs, lastAccess := t.Stats()
  37. ev := log.Tag(tagManager).With(t)
  38. if t.Stale() {
  39. if ev.IsTrace() {
  40. ev.Trace("- topic %s: Deleting stale topic (%d subscribers, accessed %s)", t.ID, subs, util.FormatTime(lastAccess))
  41. }
  42. emptyTopics++
  43. delete(s.topics, t.ID)
  44. } else {
  45. if ev.IsTrace() {
  46. ev.Trace("- topic %s: %d subscribers, accessed %s", t.ID, subs, util.FormatTime(lastAccess))
  47. }
  48. subscribers += subs
  49. }
  50. }
  51. }).
  52. Debug("Removed %d empty topic(s)", emptyTopics)
  53. // Mail stats
  54. var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64
  55. if s.smtpServerBackend != nil {
  56. receivedMailTotal, receivedMailSuccess, receivedMailFailure = s.smtpServerBackend.Counts()
  57. }
  58. var sentMailTotal, sentMailSuccess, sentMailFailure int64
  59. if s.smtpSender != nil {
  60. sentMailTotal, sentMailSuccess, sentMailFailure = s.smtpSender.Counts()
  61. }
  62. // Users
  63. var usersCount int64
  64. if s.userManager != nil {
  65. usersCount, err = s.userManager.UsersCount()
  66. if err != nil {
  67. log.Tag(tagManager).Err(err).Warn("Error counting users")
  68. }
  69. }
  70. // Print stats
  71. s.mu.RLock()
  72. messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors)
  73. s.mu.RUnlock()
  74. // Update stats
  75. s.updateAndWriteStats(messagesCount)
  76. // Log stats
  77. log.
  78. Tag(tagManager).
  79. Fields(log.Context{
  80. "messages_published": messagesCount,
  81. "messages_cached": messagesCached,
  82. "topics_active": topicsCount,
  83. "subscribers": subscribers,
  84. "visitors": visitorsCount,
  85. "users": usersCount,
  86. "emails_received": receivedMailTotal,
  87. "emails_received_success": receivedMailSuccess,
  88. "emails_received_failure": receivedMailFailure,
  89. "emails_sent": sentMailTotal,
  90. "emails_sent_success": sentMailSuccess,
  91. "emails_sent_failure": sentMailFailure,
  92. }).
  93. Info("Server stats")
  94. mset(metricMessagesCached, messagesCached)
  95. mset(metricVisitors, visitorsCount)
  96. mset(metricUsers, usersCount)
  97. mset(metricSubscribers, subscribers)
  98. mset(metricTopics, topicsCount)
  99. }
  100. func (s *Server) pruneVisitors() {
  101. staleVisitors := 0
  102. log.
  103. Tag(tagManager).
  104. Timing(func() {
  105. s.mu.Lock()
  106. defer s.mu.Unlock()
  107. for ip, v := range s.visitors {
  108. if v.Stale() {
  109. log.Tag(tagManager).With(v).Trace("Deleting stale visitor")
  110. delete(s.visitors, ip)
  111. staleVisitors++
  112. }
  113. }
  114. }).
  115. Field("stale_visitors", staleVisitors).
  116. Debug("Deleted %d stale visitor(s)", staleVisitors)
  117. }
  118. func (s *Server) pruneTokens() {
  119. if s.userManager != nil {
  120. log.
  121. Tag(tagManager).
  122. Timing(func() {
  123. if err := s.userManager.RemoveExpiredTokens(); err != nil {
  124. log.Tag(tagManager).Err(err).Warn("Error expiring user tokens")
  125. }
  126. if err := s.userManager.RemoveDeletedUsers(); err != nil {
  127. log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users")
  128. }
  129. }).
  130. Debug("Removed expired tokens and users")
  131. }
  132. }
  133. func (s *Server) pruneAttachments() {
  134. if s.fileCache == nil {
  135. return
  136. }
  137. log.
  138. Tag(tagManager).
  139. Timing(func() {
  140. ids, err := s.messageCache.AttachmentsExpired()
  141. if err != nil {
  142. log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
  143. } else if len(ids) > 0 {
  144. if log.Tag(tagManager).IsDebug() {
  145. log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
  146. }
  147. if err := s.fileCache.Remove(ids...); err != nil {
  148. log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
  149. }
  150. if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
  151. log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
  152. }
  153. } else {
  154. log.Tag(tagManager).Debug("No expired attachments to delete")
  155. }
  156. }).
  157. Debug("Deleted expired attachments")
  158. }
  159. func (s *Server) pruneMessages() {
  160. log.
  161. Tag(tagManager).
  162. Timing(func() {
  163. expiredMessageIDs, err := s.messageCache.MessagesExpired()
  164. if err != nil {
  165. log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
  166. } else if len(expiredMessageIDs) > 0 {
  167. if s.fileCache != nil {
  168. if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
  169. log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
  170. }
  171. }
  172. if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
  173. log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
  174. }
  175. } else {
  176. log.Tag(tagManager).Debug("No expired messages to delete")
  177. }
  178. }).
  179. Debug("Pruned messages")
  180. }