Parcourir la source

WIP Reject 507s after a while

binwiederhier il y a 3 ans
Parent
commit
4d22ccc7f6
5 fichiers modifiés avec 45 ajouts et 13 suppressions
  1. 1 1
      server/errors.go
  2. 10 1
      server/server.go
  3. 12 7
      server/server_manager.go
  4. 1 0
      server/server_matrix.go
  5. 21 4
      server/topic.go

+ 1 - 1
server/errors.go

@@ -127,5 +127,5 @@ var (
 	errHTTPInternalError                             = &errHTTP{50001, http.StatusInternalServerError, "internal server error", "", nil}
 	errHTTPInternalErrorInvalidPath                  = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid path", "", nil}
 	errHTTPInternalErrorMissingBaseURL               = &errHTTP{50003, http.StatusInternalServerError, "internal server error: base-url must be be configured for this feature", "https://ntfy.sh/docs/config/", nil}
-	errHTTPInsufficientStorage                       = &errHTTP{50701, http.StatusInsufficientStorage, "internal server error: cannot publish to UnifiedPush topic without previously active subscriber", "", nil}
+	errHTTPInsufficientStorageUnifiedPush            = &errHTTP{50701, http.StatusInsufficientStorage, "cannot publish to UnifiedPush topic without previously active subscriber", "", nil}
 )

+ 10 - 1
server/server.go

@@ -602,7 +602,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
 		// Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
 		// the subscription as invalid if any 400-499 code (except 429/408) is returned.
 		// See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
-		return nil, errHTTPInsufficientStorage.With(t)
+		return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
 	} else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() {
 		return nil, errHTTPTooManyRequestsLimitMessages.With(t)
 	} else if email != "" && !vrate.EmailAllowed() {
@@ -680,6 +680,9 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
 func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *visitor) error {
 	_, err := s.handlePublishWithoutResponse(r, v)
 	if err != nil {
+		if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
+			return writeMatrixResponse(w, e.rejectedPushKey)
+		}
 		return err
 	}
 	return writeMatrixSuccess(w)
@@ -1036,6 +1039,9 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
 		case <-time.After(s.config.KeepaliveInterval):
 			logvr(v, r).Tag(tagSubscribe).Trace("Sending keepalive message")
 			v.Keepalive()
+			for _, t := range topics {
+				t.Keepalive()
+			}
 			if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
 				return err
 			}
@@ -1123,6 +1129,9 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
 				return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
 			case <-time.After(s.config.KeepaliveInterval):
 				v.Keepalive()
+				for _, t := range topics {
+					t.Keepalive()
+				}
 				if err := ping(); err != nil {
 					return err
 				}

+ 12 - 7
server/server_manager.go

@@ -3,6 +3,7 @@ package server
 import (
 	"heckel.io/ntfy/log"
 	"strings"
+	"time"
 )
 
 func (s *Server) execManager() {
@@ -34,16 +35,20 @@ func (s *Server) execManager() {
 			s.mu.Lock()
 			defer s.mu.Unlock()
 			for _, t := range s.topics {
-				subs := t.SubscribersCount()
-				log.Tag(tagManager).With(t).Trace("- topic %s: %d subscribers", t.ID, subs)
-				msgs, exists := messageCounts[t.ID]
-				if t.Stale() && (!exists || msgs == 0) {
-					log.Tag(tagManager).With(t).Trace("Deleting empty topic %s", t.ID)
+				subs, lastAccess := t.Stats()
+				ev := log.Tag(tagManager).With(t)
+				if t.Stale() {
+					if ev.IsTrace() {
+						ev.Trace("- topic %s: Deleting stale topic (%d subscribers, accessed %s)", t.ID, subs, lastAccess.Format(time.RFC822))
+					}
 					emptyTopics++
 					delete(s.topics, t.ID)
-					continue
+				} else {
+					if ev.IsTrace() {
+						ev.Trace("- topic %s: %d subscribers, accessed %s", t.ID, subs, lastAccess.Format(time.RFC822))
+					}
+					subscribers += subs
 				}
-				subscribers += subs
 			}
 		}).
 		Debug("Removed %d empty topic(s)", emptyTopics)

+ 1 - 0
server/server_matrix.go

@@ -126,6 +126,7 @@ func newRequestFromMatrixJSON(r *http.Request, baseURL string, messageLimit int)
 	if r.Header.Get("X-Forwarded-For") != "" {
 		newRequest.Header.Set("X-Forwarded-For", r.Header.Get("X-Forwarded-For"))
 	}
+	newRequest.Header.Set("X-Matrix-Pushkey", pushKey)
 	return newRequest, nil
 }
 

+ 21 - 4
server/topic.go

@@ -4,6 +4,11 @@ import (
 	"heckel.io/ntfy/log"
 	"math/rand"
 	"sync"
+	"time"
+)
+
+const (
+	topicExpiryDuration = 6 * time.Hour
 )
 
 // topic represents a channel to which subscribers can subscribe, and publishers
@@ -12,6 +17,7 @@ type topic struct {
 	ID          string
 	subscribers map[int]*topicSubscriber
 	rateVisitor *visitor
+	lastAccess  time.Time
 	mu          sync.RWMutex
 }
 
@@ -29,6 +35,7 @@ func newTopic(id string) *topic {
 	return &topic{
 		ID:          id,
 		subscribers: make(map[int]*topicSubscriber),
+		lastAccess:  time.Now(),
 	}
 }
 
@@ -42,6 +49,7 @@ func (t *topic) Subscribe(s subscriber, userID string, cancel func()) int {
 		subscriber: s,
 		cancel:     cancel,
 	}
+	t.lastAccess = time.Now()
 	return subscriberID
 }
 
@@ -51,13 +59,14 @@ func (t *topic) Stale() bool {
 	if t.rateVisitor != nil && !t.rateVisitor.Stale() {
 		return false
 	}
-	return len(t.subscribers) == 0
+	return len(t.subscribers) == 0 && time.Since(t.lastAccess) > topicExpiryDuration
 }
 
 func (t *topic) SetRateVisitor(v *visitor) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	t.rateVisitor = v
+	t.lastAccess = time.Now()
 }
 
 func (t *topic) RateVisitor() *visitor {
@@ -96,15 +105,23 @@ func (t *topic) Publish(v *visitor, m *message) error {
 		} else {
 			logvm(v, m).Tag(tagPublish).Trace("No stream or WebSocket subscribers, not forwarding")
 		}
+		t.Keepalive()
 	}()
 	return nil
 }
 
-// SubscribersCount returns the number of subscribers to this topic
-func (t *topic) SubscribersCount() int {
+// Stats returns the number of subscribers and last access to this topic
+func (t *topic) Stats() (int, time.Time) {
 	t.mu.RLock()
 	defer t.mu.RUnlock()
-	return len(t.subscribers)
+	return len(t.subscribers), t.lastAccess
+}
+
+// Keepalive sets the last access time and ensures that Stale does not return true
+func (t *topic) Keepalive() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.lastAccess = time.Now()
 }
 
 // CancelSubscribers calls the cancel function for all subscribers, forcing