1
0
binwiederhier 3 жил өмнө
parent
commit
bed60b71ff

+ 0 - 147
server/server.go

@@ -1300,153 +1300,6 @@ func (s *Server) topicFromID(id string) (*topic, error) {
 	return topics[0], nil
 }
 
-func (s *Server) execManager() {
-	// WARNING: Make sure to only selectively lock with the mutex, and be aware that this
-	//          there is no mutex for the entire function.
-
-	// Expire visitors from rate visitors map
-	staleVisitors := 0
-	log.
-		Tag(tagManager).
-		Timing(func() {
-			s.mu.Lock()
-			defer s.mu.Unlock()
-			for ip, v := range s.visitors {
-				if v.Stale() {
-					log.Tag(tagManager).With(v).Trace("Deleting stale visitor")
-					delete(s.visitors, ip)
-					staleVisitors++
-				}
-			}
-		}).
-		Field("stale_visitors", staleVisitors).
-		Debug("Deleted %d stale visitor(s)", staleVisitors)
-
-	// Delete expired user tokens and users
-	if s.userManager != nil {
-		log.
-			Tag(tagManager).
-			Timing(func() {
-				if err := s.userManager.RemoveExpiredTokens(); err != nil {
-					log.Tag(tagManager).Err(err).Warn("Error expiring user tokens")
-				}
-				if err := s.userManager.RemoveDeletedUsers(); err != nil {
-					log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users")
-				}
-			}).
-			Debug("Removed expired tokens and users")
-	}
-
-	// Delete expired attachments
-	if s.fileCache != nil {
-		log.
-			Tag(tagManager).
-			Timing(func() {
-				ids, err := s.messageCache.AttachmentsExpired()
-				if err != nil {
-					log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
-				} else if len(ids) > 0 {
-					if log.Tag(tagManager).IsDebug() {
-						log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
-					}
-					if err := s.fileCache.Remove(ids...); err != nil {
-						log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
-					}
-					if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
-						log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
-					}
-				} else {
-					log.Tag(tagManager).Debug("No expired attachments to delete")
-				}
-			}).
-			Debug("Deleted expired attachments")
-	}
-
-	// Prune messages
-	log.
-		Tag(tagManager).
-		Timing(func() {
-			expiredMessageIDs, err := s.messageCache.MessagesExpired()
-			if err != nil {
-				log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
-			} else if len(expiredMessageIDs) > 0 {
-				if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
-					log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
-				}
-				if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
-					log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
-				}
-			} else {
-				log.Tag(tagManager).Debug("No expired messages to delete")
-			}
-		}).
-		Debug("Pruned messages")
-
-	// Message count per topic
-	var messagesCached int
-	messageCounts, err := s.messageCache.MessageCounts()
-	if err != nil {
-		log.Tag(tagManager).Err(err).Warn("Cannot get message counts")
-		messageCounts = make(map[string]int) // Empty, so we can continue
-	}
-	for _, count := range messageCounts {
-		messagesCached += count
-	}
-
-	// Remove subscriptions without subscribers
-	var emptyTopics, subscribers int
-	log.
-		Tag(tagManager).
-		Timing(func() {
-			s.mu.Lock()
-			defer s.mu.Unlock()
-			for _, t := range s.topics {
-				subs := t.SubscribersCount()
-				log.Tag(tagManager).Trace("- topic %s: %d subscribers", t.ID, subs)
-				msgs, exists := messageCounts[t.ID]
-				if subs == 0 && (!exists || msgs == 0) {
-					log.Tag(tagManager).Trace("Deleting empty topic %s", t.ID)
-					emptyTopics++
-					delete(s.topics, t.ID)
-					continue
-				}
-				subscribers += subs
-			}
-		}).
-		Debug("Removed %d empty topic(s)", emptyTopics)
-
-	// Mail stats
-	var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64
-	if s.smtpServerBackend != nil {
-		receivedMailTotal, receivedMailSuccess, receivedMailFailure = s.smtpServerBackend.Counts()
-	}
-	var sentMailTotal, sentMailSuccess, sentMailFailure int64
-	if s.smtpSender != nil {
-		sentMailTotal, sentMailSuccess, sentMailFailure = s.smtpSender.Counts()
-	}
-
-	// Print stats
-	s.mu.Lock()
-	messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors)
-	s.mu.Unlock()
-	log.
-		Tag(tagManager).
-		Fields(log.Context{
-			"messages_published":      messagesCount,
-			"messages_cached":         messagesCached,
-			"topics_active":           topicsCount,
-			"subscribers":             subscribers,
-			"visitors":                visitorsCount,
-			"emails_received":         receivedMailTotal,
-			"emails_received_success": receivedMailSuccess,
-			"emails_received_failure": receivedMailFailure,
-			"emails_sent":             sentMailTotal,
-			"emails_sent_success":     sentMailSuccess,
-			"emails_sent_failure":     sentMailFailure,
-		}).
-		Info("Server stats")
-}
-
 func (s *Server) runSMTPServer() error {
 	s.smtpServerBackend = newMailBackend(s.config, s.handle)
 	s.smtpServer = smtp.NewServer(s.smtpServerBackend)

+ 1 - 0
server/server_account.go

@@ -479,6 +479,7 @@ func (s *Server) handleAccountReservationDelete(w http.ResponseWriter, r *http.R
 		if err := s.messageCache.ExpireMessages(topic); err != nil {
 			return err
 		}
+		s.pruneMessages()
 	}
 	return s.writeJSON(w, newSuccessResponse())
 }

+ 19 - 2
server/server_account_test.go

@@ -669,8 +669,8 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) {
 	require.Equal(t, 200, rr.Code)
 
 	// Verify that messages and attachments were deleted
+	// This does not explicitly call the manager!
 	time.Sleep(time.Second)
-	s.execManager()
 
 	ms, err := s.messageCache.Messages("mytopic1", sinceAllMessages, false)
 	require.Nil(t, err)
@@ -804,10 +804,27 @@ func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) {
 		"Authorization": util.BasicAuth("phil", "phil"),
 	})
 	require.Equal(t, 200, rr.Code)
+	account, _ := util.UnmarshalJSON[apiAccountResponse](io.NopCloser(rr.Body))
+	require.Equal(t, int64(1), account.Stats.Messages) // Is not reset!
+
+	// Publish another message
+	rr = request(t, s, "POST", "/mytopic", "hi", map[string]string{
+		"Authorization": util.BasicAuth("phil", "phil"),
+	})
+	require.Equal(t, 200, rr.Code)
 
 	// Verify that message stats were persisted
 	time.Sleep(300 * time.Millisecond)
 	u, err = s.userManager.User("phil")
 	require.Nil(t, err)
-	require.Equal(t, int64(0), u.Stats.Messages) // v.EnqueueUserStats had run!
+	require.Equal(t, int64(2), u.Stats.Messages) // v.EnqueueUserStats had run!
+
+	// Stats keep counting
+	rr = request(t, s, "GET", "/v1/account", "", map[string]string{
+		"Authorization": util.BasicAuth("phil", "phil"),
+	})
+	require.Equal(t, 200, rr.Code)
+	account, _ = util.UnmarshalJSON[apiAccountResponse](io.NopCloser(rr.Body))
+	require.Equal(t, int64(2), account.Stats.Messages) // Is not reset!
+
 }

+ 163 - 0
server/server_manager.go

@@ -0,0 +1,163 @@
+package server
+
+import (
+	"heckel.io/ntfy/log"
+	"strings"
+)
+
+func (s *Server) execManager() {
+	// WARNING: Make sure to only selectively lock with the mutex, and be aware that this
+	//          there is no mutex for the entire function.
+
+	// Prune all the things
+	s.pruneVisitors()
+	s.pruneTokens()
+	s.pruneAttachments()
+	s.pruneMessages()
+
+	// Message count per topic
+	var messagesCached int
+	messageCounts, err := s.messageCache.MessageCounts()
+	if err != nil {
+		log.Tag(tagManager).Err(err).Warn("Cannot get message counts")
+		messageCounts = make(map[string]int) // Empty, so we can continue
+	}
+	for _, count := range messageCounts {
+		messagesCached += count
+	}
+
+	// Remove subscriptions without subscribers
+	var emptyTopics, subscribers int
+	log.
+		Tag(tagManager).
+		Timing(func() {
+			s.mu.Lock()
+			defer s.mu.Unlock()
+			for _, t := range s.topics {
+				subs := t.SubscribersCount()
+				log.Tag(tagManager).Trace("- topic %s: %d subscribers", t.ID, subs)
+				msgs, exists := messageCounts[t.ID]
+				if subs == 0 && (!exists || msgs == 0) {
+					log.Tag(tagManager).Trace("Deleting empty topic %s", t.ID)
+					emptyTopics++
+					delete(s.topics, t.ID)
+					continue
+				}
+				subscribers += subs
+			}
+		}).
+		Debug("Removed %d empty topic(s)", emptyTopics)
+
+	// Mail stats
+	var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64
+	if s.smtpServerBackend != nil {
+		receivedMailTotal, receivedMailSuccess, receivedMailFailure = s.smtpServerBackend.Counts()
+	}
+	var sentMailTotal, sentMailSuccess, sentMailFailure int64
+	if s.smtpSender != nil {
+		sentMailTotal, sentMailSuccess, sentMailFailure = s.smtpSender.Counts()
+	}
+
+	// Print stats
+	s.mu.Lock()
+	messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors)
+	s.mu.Unlock()
+	log.
+		Tag(tagManager).
+		Fields(log.Context{
+			"messages_published":      messagesCount,
+			"messages_cached":         messagesCached,
+			"topics_active":           topicsCount,
+			"subscribers":             subscribers,
+			"visitors":                visitorsCount,
+			"emails_received":         receivedMailTotal,
+			"emails_received_success": receivedMailSuccess,
+			"emails_received_failure": receivedMailFailure,
+			"emails_sent":             sentMailTotal,
+			"emails_sent_success":     sentMailSuccess,
+			"emails_sent_failure":     sentMailFailure,
+		}).
+		Info("Server stats")
+}
+
+func (s *Server) pruneVisitors() {
+	staleVisitors := 0
+	log.
+		Tag(tagManager).
+		Timing(func() {
+			s.mu.Lock()
+			defer s.mu.Unlock()
+			for ip, v := range s.visitors {
+				if v.Stale() {
+					log.Tag(tagManager).With(v).Trace("Deleting stale visitor")
+					delete(s.visitors, ip)
+					staleVisitors++
+				}
+			}
+		}).
+		Field("stale_visitors", staleVisitors).
+		Debug("Deleted %d stale visitor(s)", staleVisitors)
+}
+
+func (s *Server) pruneTokens() {
+	if s.userManager != nil {
+		log.
+			Tag(tagManager).
+			Timing(func() {
+				if err := s.userManager.RemoveExpiredTokens(); err != nil {
+					log.Tag(tagManager).Err(err).Warn("Error expiring user tokens")
+				}
+				if err := s.userManager.RemoveDeletedUsers(); err != nil {
+					log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users")
+				}
+			}).
+			Debug("Removed expired tokens and users")
+	}
+}
+
+func (s *Server) pruneAttachments() {
+	if s.fileCache != nil {
+		log.
+			Tag(tagManager).
+			Timing(func() {
+				ids, err := s.messageCache.AttachmentsExpired()
+				if err != nil {
+					log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
+				} else if len(ids) > 0 {
+					if log.Tag(tagManager).IsDebug() {
+						log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
+					}
+					if err := s.fileCache.Remove(ids...); err != nil {
+						log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
+					}
+					if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
+						log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
+					}
+				} else {
+					log.Tag(tagManager).Debug("No expired attachments to delete")
+				}
+			}).
+			Debug("Deleted expired attachments")
+	}
+}
+
+func (s *Server) pruneMessages() {
+	log.
+		Tag(tagManager).
+		Timing(func() {
+			expiredMessageIDs, err := s.messageCache.MessagesExpired()
+			if err != nil {
+				log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
+			} else if len(expiredMessageIDs) > 0 {
+				if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
+					log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
+				}
+				if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
+					log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
+				}
+			} else {
+				log.Tag(tagManager).Debug("No expired messages to delete")
+			}
+		}).
+		Debug("Pruned messages")
+}

+ 4 - 3
server/visitor.go

@@ -159,8 +159,9 @@ func (v *visitor) contextNoLock() log.Context {
 		fields["user_id"] = v.user.ID
 		fields["user_name"] = v.user.Name
 		if v.user.Tier != nil {
-			fields["tier_id"] = v.user.Tier.ID
-			fields["tier_name"] = v.user.Tier.Name
+			for field, value := range v.user.Tier.Context() {
+				fields[field] = value
+			}
 		}
 		if v.user.Billing.StripeCustomerID != "" {
 			fields["stripe_customer_id"] = v.user.Billing.StripeCustomerID
@@ -331,7 +332,7 @@ func (v *visitor) SetUser(u *user.User) {
 	shouldResetLimiters := v.user.TierID() != u.TierID() // TierID works with nil receiver
 	v.user = u
 	if shouldResetLimiters {
-		v.resetLimitersNoLock(0, 0, true)
+		v.resetLimitersNoLock(u.Stats.Messages, u.Stats.Emails, true)
 	}
 }