server_manager.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package server
  2. import (
  3. "heckel.io/ntfy/log"
  4. "strings"
  5. )
  6. func (s *Server) execManager() {
  7. // WARNING: Make sure to only selectively lock with the mutex, and be aware that this
  8. // there is no mutex for the entire function.
  9. // Prune all the things
  10. s.pruneVisitors()
  11. s.pruneTokens()
  12. s.pruneAttachments()
  13. s.pruneMessages()
  14. // Message count per topic
  15. var messagesCached int
  16. messageCounts, err := s.messageCache.MessageCounts()
  17. if err != nil {
  18. log.Tag(tagManager).Err(err).Warn("Cannot get message counts")
  19. messageCounts = make(map[string]int) // Empty, so we can continue
  20. }
  21. for _, count := range messageCounts {
  22. messagesCached += count
  23. }
  24. // Remove subscriptions without subscribers
  25. var emptyTopics, subscribers int
  26. log.
  27. Tag(tagManager).
  28. Timing(func() {
  29. s.mu.Lock()
  30. defer s.mu.Unlock()
  31. for _, t := range s.topics {
  32. subs := t.SubscribersCount()
  33. ev := log.Tag(tagManager)
  34. if ev.IsTrace() {
  35. vrate := t.RateVisitor()
  36. if vrate != nil {
  37. ev.Fields(log.Context{
  38. "rate_visitor_ip": vrate.IP(),
  39. "rate_visitor_user_id": vrate.MaybeUserID(),
  40. })
  41. }
  42. ev.
  43. Fields(log.Context{
  44. "message_topic": t.ID,
  45. "message_topic_subscribers": subs,
  46. }).
  47. Trace("- topic %s: %d subscribers", t.ID, subs)
  48. }
  49. msgs, exists := messageCounts[t.ID]
  50. if t.Stale() && (!exists || msgs == 0) {
  51. log.Tag(tagManager).Field("message_topic", t.ID).Trace("Deleting empty topic %s", t.ID)
  52. emptyTopics++
  53. delete(s.topics, t.ID)
  54. continue
  55. }
  56. subscribers += subs
  57. }
  58. }).
  59. Debug("Removed %d empty topic(s)", emptyTopics)
  60. // Mail stats
  61. var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64
  62. if s.smtpServerBackend != nil {
  63. receivedMailTotal, receivedMailSuccess, receivedMailFailure = s.smtpServerBackend.Counts()
  64. }
  65. var sentMailTotal, sentMailSuccess, sentMailFailure int64
  66. if s.smtpSender != nil {
  67. sentMailTotal, sentMailSuccess, sentMailFailure = s.smtpSender.Counts()
  68. }
  69. // Print stats
  70. s.mu.Lock()
  71. messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors)
  72. s.mu.Unlock()
  73. log.
  74. Tag(tagManager).
  75. Fields(log.Context{
  76. "messages_published": messagesCount,
  77. "messages_cached": messagesCached,
  78. "topics_active": topicsCount,
  79. "subscribers": subscribers,
  80. "visitors": visitorsCount,
  81. "emails_received": receivedMailTotal,
  82. "emails_received_success": receivedMailSuccess,
  83. "emails_received_failure": receivedMailFailure,
  84. "emails_sent": sentMailTotal,
  85. "emails_sent_success": sentMailSuccess,
  86. "emails_sent_failure": sentMailFailure,
  87. }).
  88. Info("Server stats")
  89. }
  90. func (s *Server) pruneVisitors() {
  91. staleVisitors := 0
  92. log.
  93. Tag(tagManager).
  94. Timing(func() {
  95. s.mu.Lock()
  96. defer s.mu.Unlock()
  97. for ip, v := range s.visitors {
  98. if v.Stale() {
  99. log.Tag(tagManager).With(v).Trace("Deleting stale visitor")
  100. delete(s.visitors, ip)
  101. staleVisitors++
  102. }
  103. }
  104. }).
  105. Field("stale_visitors", staleVisitors).
  106. Debug("Deleted %d stale visitor(s)", staleVisitors)
  107. }
  108. func (s *Server) pruneTokens() {
  109. if s.userManager != nil {
  110. log.
  111. Tag(tagManager).
  112. Timing(func() {
  113. if err := s.userManager.RemoveExpiredTokens(); err != nil {
  114. log.Tag(tagManager).Err(err).Warn("Error expiring user tokens")
  115. }
  116. if err := s.userManager.RemoveDeletedUsers(); err != nil {
  117. log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users")
  118. }
  119. }).
  120. Debug("Removed expired tokens and users")
  121. }
  122. }
  123. func (s *Server) pruneAttachments() {
  124. if s.fileCache == nil {
  125. return
  126. }
  127. log.
  128. Tag(tagManager).
  129. Timing(func() {
  130. ids, err := s.messageCache.AttachmentsExpired()
  131. if err != nil {
  132. log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
  133. } else if len(ids) > 0 {
  134. if log.Tag(tagManager).IsDebug() {
  135. log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
  136. }
  137. if err := s.fileCache.Remove(ids...); err != nil {
  138. log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
  139. }
  140. if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
  141. log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
  142. }
  143. } else {
  144. log.Tag(tagManager).Debug("No expired attachments to delete")
  145. }
  146. }).
  147. Debug("Deleted expired attachments")
  148. }
  149. func (s *Server) pruneMessages() {
  150. log.
  151. Tag(tagManager).
  152. Timing(func() {
  153. expiredMessageIDs, err := s.messageCache.MessagesExpired()
  154. if err != nil {
  155. log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
  156. } else if len(expiredMessageIDs) > 0 {
  157. if s.fileCache != nil {
  158. if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
  159. log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
  160. }
  161. }
  162. if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
  163. log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
  164. }
  165. } else {
  166. log.Tag(tagManager).Debug("No expired messages to delete")
  167. }
  168. }).
  169. Debug("Pruned messages")
  170. }