binwiederhier 3 лет назад
Родитель
Сommit
d44a11325d
5 измененных файлов с 99 добавлено и 58 удалено
  1. 1 11
      server/log.go
  2. 42 30
      server/server.go
  3. 2 1
      server/server_account.go
  4. 45 16
      server/visitor.go
  5. 9 0
      util/util.go

+ 1 - 11
server/log.go

@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"github.com/emersion/go-smtp"
 	"github.com/gorilla/websocket"
-	"golang.org/x/time/rate"
 	"heckel.io/ntfy/log"
 	"heckel.io/ntfy/util"
 	"net/http"
@@ -24,9 +23,7 @@ func logv(v *visitor) *log.Event {
 
 // logr creates a new log event with HTTP request and visitor fields
 func logvr(v *visitor, r *http.Request) *log.Event {
-	return logv(v).
-		Fields(httpContext(r)).
-		Fields(requestLimiterFields(v.RequestLimiter()))
+	return logv(v).Fields(httpContext(r))
 }
 
 // logvrm creates a new log event with HTTP request, visitor fields and message fields
@@ -73,13 +70,6 @@ func websocketErrorContext(err error) log.Context {
 	}
 }
 
-func requestLimiterFields(limiter *rate.Limiter) map[string]any {
-	return map[string]any{
-		"visitor_request_limiter_limit":  limiter.Limit(),
-		"visitor_request_limiter_tokens": limiter.Tokens(),
-	}
-}
-
 func renderHTTPRequest(r *http.Request) string {
 	peekLimit := 4096
 	lines := fmt.Sprintf("%s %s %s\n", r.Method, r.URL.RequestURI(), r.Proto)

+ 42 - 30
server/server.go

@@ -37,7 +37,6 @@ import (
 - HIGH Rate limiting: Sensitive endpoints (account/login/change-password/...)
 - HIGH Account limit creation triggers when account is taken!
 - HIGH Docs
-- HIGH make request limit independent of message limit again
 - HIGH Self-review
 - MEDIUM: Test for expiring messages after reservation removal
 - MEDIUM: Test new token endpoints & never-expiring token
@@ -138,6 +137,7 @@ const (
 
 // Log tags
 const (
+	tagStartup      = "startup"
 	tagPublish      = "publish"
 	tagFirebase     = "firebase"
 	tagEmail        = "email" // Send email
@@ -233,7 +233,7 @@ func (s *Server) Run() error {
 	if s.config.SMTPServerListen != "" {
 		listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
 	}
-	log.Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String())
+	log.Tag(tagStartup).Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String())
 	if log.IsFile() {
 		fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s\n", listenStr, s.config.Version)
 		fmt.Fprintf(os.Stderr, "Logs are written to %s\n", log.File())
@@ -347,15 +347,15 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
 		if !ok {
 			httpErr = errHTTPInternalError
 		}
-		isNormalError := httpErr.HTTPCode == http.StatusNotFound || httpErr.HTTPCode == http.StatusBadRequest
+		isNormalError := httpErr.HTTPCode == http.StatusNotFound || httpErr.HTTPCode == http.StatusBadRequest || httpErr.HTTPCode == http.StatusTooManyRequests
 		if isNormalError {
 			logvr(v, r).
 				Err(httpErr).
-				Debug("Connection closed with HTTP %d (ntfy error %d): %s", httpErr.HTTPCode, httpErr.Code, err.Error())
+				Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
 		} else {
 			logvr(v, r).
 				Err(httpErr).
-				Info("Connection closed with HTTP %d (ntfy error %d): %s", httpErr.HTTPCode, httpErr.Code, err.Error())
+				Info("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
 		}
 		w.Header().Set("Content-Type", "application/json")
 		w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
@@ -1294,21 +1294,21 @@ func (s *Server) execManager() {
 	staleVisitors := 0
 	for ip, v := range s.visitors {
 		if v.Stale() {
-			log.Trace("Deleting stale visitor %s", v.ip)
+			log.Tag(tagManager).With(v).Trace("Deleting stale visitor")
 			delete(s.visitors, ip)
 			staleVisitors++
 		}
 	}
 	s.mu.Unlock()
-	log.Debug("Manager: Deleted %d stale visitor(s)", staleVisitors)
+	log.Tag(tagManager).Field("stale_visitors", staleVisitors).Debug("Deleted %d stale visitor(s)", staleVisitors)
 
 	// Delete expired user tokens and users
 	if s.userManager != nil {
 		if err := s.userManager.RemoveExpiredTokens(); err != nil {
-			log.Warn("Error expiring user tokens: %s", err.Error())
+			log.Tag(tagManager).Err(err).Warn("Error expiring user tokens")
 		}
 		if err := s.userManager.RemoveDeletedUsers(); err != nil {
-			log.Warn("Error deleting soft-deleted users: %s", err.Error())
+			log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users")
 		}
 	}
 
@@ -1316,47 +1316,47 @@ func (s *Server) execManager() {
 	if s.fileCache != nil {
 		ids, err := s.messageCache.AttachmentsExpired()
 		if err != nil {
-			log.Warn("Manager: Error retrieving expired attachments: %s", err.Error())
+			log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
 		} else if len(ids) > 0 {
-			if log.IsDebug() {
-				log.Debug("Manager: Deleting attachments %s", strings.Join(ids, ", "))
+			if log.Tag(tagManager).IsDebug() {
+				log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
 			}
 			if err := s.fileCache.Remove(ids...); err != nil {
-				log.Warn("Manager: Error deleting attachments: %s", err.Error())
+				log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
 			}
 			if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
-				log.Warn("Manager: Error marking attachments deleted: %s", err.Error())
+				log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
 			}
 		} else {
-			log.Debug("Manager: No expired attachments to delete")
+			log.Tag(tagManager).Debug("No expired attachments to delete")
 		}
 	}
 
 	// Prune messages
-	log.Debug("Manager: Pruning messages")
+	log.Tag(tagManager).Debug("Manager: Pruning messages")
 	expiredMessageIDs, err := s.messageCache.MessagesExpired()
 	if err != nil {
-		log.Warn("Manager: Error retrieving expired messages: %s", err.Error())
+		log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
 	} else if len(expiredMessageIDs) > 0 {
 		if err := s.fileCache.Remove(expiredMessageIDs...); err != nil {
-			log.Warn("Manager: Error deleting attachments for expired messages: %s", err.Error())
+			log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
 		}
 		if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
-			log.Warn("Manager: Error marking attachments deleted: %s", err.Error())
+			log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
 		}
 	} else {
-		log.Debug("Manager: No expired messages to delete")
+		log.Tag(tagManager).Debug("No expired messages to delete")
 	}
 
 	// Message count per topic
-	var messages int
+	var messagesCached int
 	messageCounts, err := s.messageCache.MessageCounts()
 	if err != nil {
-		log.Warn("Manager: Cannot get message counts: %s", err.Error())
+		log.Tag(tagManager).Err(err).Warn("Cannot get message counts")
 		messageCounts = make(map[string]int) // Empty, so we can continue
 	}
 	for _, count := range messageCounts {
-		messages += count
+		messagesCached += count
 	}
 
 	// Remove subscriptions without subscribers
@@ -1364,10 +1364,10 @@ func (s *Server) execManager() {
 	var subscribers int
 	for _, t := range s.topics {
 		subs := t.SubscribersCount()
-		log.Trace("- topic %s: %d subscribers", t.ID, subs)
+		log.Tag(tagManager).Trace("- topic %s: %d subscribers", t.ID, subs)
 		msgs, exists := messageCounts[t.ID]
 		if subs == 0 && (!exists || msgs == 0) {
-			log.Trace("Deleting empty topic %s", t.ID)
+			log.Tag(tagManager).Trace("Deleting empty topic %s", t.ID)
 			delete(s.topics, t.ID)
 			continue
 		}
@@ -1389,10 +1389,22 @@ func (s *Server) execManager() {
 	s.mu.Lock()
 	messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors)
 	s.mu.Unlock()
-	log.Info("Stats: %d messages published, %d in cache, %d topic(s) active, %d subscriber(s), %d visitor(s), %d mails received (%d successful, %d failed), %d mails sent (%d successful, %d failed)",
-		messagesCount, messages, topicsCount, subscribers, visitorsCount,
-		receivedMailTotal, receivedMailSuccess, receivedMailFailure,
-		sentMailTotal, sentMailSuccess, sentMailFailure)
+	log.
+		Tag(tagManager).
+		Fields(log.Context{
+			"messages_published":      messagesCount,
+			"messages_cached":         messagesCached,
+			"topics_active":           topicsCount,
+			"subscribers":             subscribers,
+			"visitors":                visitorsCount,
+			"emails_received":         receivedMailTotal,
+			"emails_received_success": receivedMailSuccess,
+			"emails_received_failure": receivedMailFailure,
+			"emails_sent":             sentMailTotal,
+			"emails_sent_success":     sentMailSuccess,
+			"emails_sent_failure":     sentMailFailure,
+		}).
+		Info("Server stats")
 }
 
 func (s *Server) runSMTPServer() error {
@@ -1534,7 +1546,7 @@ func (s *Server) limitRequests(next handleFunc) handleFunc {
 		if util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) {
 			return next(w, r, v)
 		} else if err := v.RequestAllowed(); err != nil {
-			logvr(v, r).Err(err).Fields(requestLimiterFields(v.RequestLimiter())).Trace("Request not allowed by rate limiter")
+			logvr(v, r).Err(err).Trace("Request not allowed by rate limiter")
 			return errHTTPTooManyRequestsLimitRequests
 		}
 		return next(w, r, v)

+ 2 - 1
server/server_account.go

@@ -42,11 +42,12 @@ func (s *Server) handleAccountCreate(w http.ResponseWriter, r *http.Request, v *
 	return s.writeJSON(w, newSuccessResponse())
 }
 
-func (s *Server) handleAccountGet(w http.ResponseWriter, _ *http.Request, v *visitor) error {
+func (s *Server) handleAccountGet(w http.ResponseWriter, r *http.Request, v *visitor) error {
 	info, err := v.Info()
 	if err != nil {
 		return err
 	}
+	logvr(v, r).Tag(tagAccount).Fields(visitorExtendedInfoContext(info)).Debug("Retrieving account stats")
 	limits, stats := info.Limits, info.Stats
 	response := &apiAccountResponse{
 		Limits: &apiAccountLimits{

+ 45 - 16
server/visitor.go

@@ -142,8 +142,17 @@ func (v *visitor) Context() log.Context {
 }
 
 func (v *visitor) contextNoLock() log.Context {
+	info := v.infoLightNoLock()
 	fields := log.Context{
-		"visitor_ip": v.ip.String(),
+		"visitor_ip":                     v.ip.String(),
+		"visitor_messages":               info.Stats.Messages,
+		"visitor_messages_limit":         info.Limits.MessageLimit,
+		"visitor_messages_remaining":     info.Stats.MessagesRemaining,
+		"visitor_emails":                 info.Stats.Emails,
+		"visitor_emails_limit":           info.Limits.EmailLimit,
+		"visitor_emails_remaining":       info.Stats.EmailsRemaining,
+		"visitor_request_limiter_limit":  v.requestLimiter.Limit(),
+		"visitor_request_limiter_tokens": v.requestLimiter.Tokens(),
 	}
 	if v.user != nil {
 		fields["user_id"] = v.user.ID
@@ -162,6 +171,17 @@ func (v *visitor) contextNoLock() log.Context {
 	return fields
 }
 
+func visitorExtendedInfoContext(info *visitorInfo) log.Context {
+	return log.Context{
+		"visitor_reservations":                    info.Stats.Reservations,
+		"visitor_reservations_limit":              info.Limits.ReservationsLimit,
+		"visitor_reservations_remaining":          info.Stats.ReservationsRemaining,
+		"visitor_attachment_total_size":           info.Stats.AttachmentTotalSize,
+		"visitor_attachment_total_size_limit":     info.Limits.AttachmentTotalSizeLimit,
+		"visitor_attachment_total_size_remaining": info.Stats.AttachmentTotalSizeRemaining,
+	}
+
+}
 func (v *visitor) RequestAllowed() error {
 	v.mu.Lock() // limiters could be replaced!
 	defer v.mu.Unlock()
@@ -309,7 +329,6 @@ func (v *visitor) MaybeUserID() string {
 }
 
 func (v *visitor) resetLimitersNoLock(messages, emails int64, enqueueUpdate bool) {
-	log.Fields(v.contextNoLock()).Debug("Resetting limiters for visitor")
 	limits := v.limitsNoLock()
 	v.requestLimiter = rate.NewLimiter(limits.RequestLimitReplenish, limits.RequestLimitBurst)
 	v.messagesLimiter = util.NewFixedLimiterWithValue(limits.MessageLimit, messages)
@@ -326,6 +345,7 @@ func (v *visitor) resetLimitersNoLock(messages, emails int64, enqueueUpdate bool
 			Emails:   emails,
 		})
 	}
+	log.Fields(v.contextNoLock()).Debug("Rate limiters reset for visitor") // Must be after function, because contextNoLock() describes rate limiters
 }
 
 func (v *visitor) Limits() *visitorLimits {
@@ -345,7 +365,7 @@ func tierBasedVisitorLimits(conf *Config, tier *user.Tier) *visitorLimits {
 	return &visitorLimits{
 		Basis:                    visitorLimitBasisTier,
 		RequestLimitBurst:        util.MinMax(int(float64(tier.MessageLimit)*visitorMessageToRequestLimitBurstRate), conf.VisitorRequestLimitBurst, visitorMessageToRequestLimitBurstMax),
-		RequestLimitReplenish:    dailyLimitToRate(tier.MessageLimit * visitorMessageToRequestLimitReplenishFactor),
+		RequestLimitReplenish:    util.Max(rate.Every(conf.VisitorRequestLimitReplenish), dailyLimitToRate(tier.MessageLimit*visitorMessageToRequestLimitReplenishFactor)),
 		MessageLimit:             tier.MessageLimit,
 		MessageExpiryDuration:    tier.MessageExpiryDuration,
 		EmailLimit:               tier.EmailLimit,
@@ -383,9 +403,10 @@ func configBasedVisitorLimits(conf *Config) *visitorLimits {
 
 func (v *visitor) Info() (*visitorInfo, error) {
 	v.mu.Lock()
-	messages := v.messagesLimiter.Value()
-	emails := v.emailsLimiter.Value()
+	info := v.infoLightNoLock()
 	v.mu.Unlock()
+
+	// Attachment stats from database
 	var attachmentsBytesUsed int64
 	var err error
 	u := v.User()
@@ -397,6 +418,10 @@ func (v *visitor) Info() (*visitorInfo, error) {
 	if err != nil {
 		return nil, err
 	}
+	info.Stats.AttachmentTotalSize = attachmentsBytesUsed
+	info.Stats.AttachmentTotalSizeRemaining = zeroIfNegative(info.Limits.AttachmentTotalSizeLimit - attachmentsBytesUsed)
+
+	// Reservation stats from database
 	var reservations int64
 	if v.userManager != nil && u != nil {
 		reservations, err = v.userManager.ReservationsCount(u.Name)
@@ -404,23 +429,27 @@ func (v *visitor) Info() (*visitorInfo, error) {
 			return nil, err
 		}
 	}
-	limits := v.Limits()
+	info.Stats.Reservations = reservations
+	info.Stats.ReservationsRemaining = zeroIfNegative(info.Limits.ReservationsLimit - reservations)
+
+	return info, nil
+}
+
+func (v *visitor) infoLightNoLock() *visitorInfo {
+	messages := v.messagesLimiter.Value()
+	emails := v.emailsLimiter.Value()
+	limits := v.limitsNoLock()
 	stats := &visitorStats{
-		Messages:                     messages,
-		MessagesRemaining:            zeroIfNegative(limits.MessageLimit - messages),
-		Emails:                       emails,
-		EmailsRemaining:              zeroIfNegative(limits.EmailLimit - emails),
-		Reservations:                 reservations,
-		ReservationsRemaining:        zeroIfNegative(limits.ReservationsLimit - reservations),
-		AttachmentTotalSize:          attachmentsBytesUsed,
-		AttachmentTotalSizeRemaining: zeroIfNegative(limits.AttachmentTotalSizeLimit - attachmentsBytesUsed),
+		Messages:          messages,
+		MessagesRemaining: zeroIfNegative(limits.MessageLimit - messages),
+		Emails:            emails,
+		EmailsRemaining:   zeroIfNegative(limits.EmailLimit - emails),
 	}
 	return &visitorInfo{
 		Limits: limits,
 		Stats:  stats,
-	}, nil
+	}
 }
-
 func zeroIfNegative(value int64) int64 {
 	if value < 0 {
 		return 0

+ 9 - 0
util/util.go

@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"golang.org/x/time/rate"
 	"io"
 	"math/rand"
 	"net/netip"
@@ -365,6 +366,14 @@ func MinMax[T int | int64](value, min, max T) T {
 	return value
 }
 
+// Max returns the maximum value of the two given values
+func Max[T int | int64 | rate.Limit](a, b T) T {
+	if a > b {
+		return a
+	}
+	return b
+}
+
 // String turns a string into a pointer of a string
 func String(v string) *string {
 	return &v