ソースを参照

Adjust Matrix/UP behavior to work with Synapse+Mastodon

binwiederhier 3 年 前
コミット
0606fbe60a
5 ファイル変更34 行追加61 行削除
  1. 1 2
      server/errors.go
  2. 8 8
      server/server.go
  3. 9 30
      server/server_matrix.go
  4. 5 9
      server/server_matrix_test.go
  5. 11 12
      server/server_test.go

+ 1 - 2
server/errors.go

@@ -61,7 +61,6 @@ var (
 	errHTTPBadRequestMessageJSONInvalid              = &errHTTP{40017, http.StatusBadRequest, "invalid request: request body must be message JSON", "https://ntfy.sh/docs/publish/#publish-as-json"}
 	errHTTPBadRequestActionsInvalid                  = &errHTTP{40018, http.StatusBadRequest, "invalid request: actions invalid", "https://ntfy.sh/docs/publish/#action-buttons"}
 	errHTTPBadRequestMatrixMessageInvalid            = &errHTTP{40019, http.StatusBadRequest, "invalid request: Matrix JSON invalid", "https://ntfy.sh/docs/publish/#matrix-gateway"}
-	errHTTPBadRequestMatrixPushkeyBaseURLMismatch    = &errHTTP{40020, http.StatusBadRequest, "invalid request: push key must be prefixed with base URL", "https://ntfy.sh/docs/publish/#matrix-gateway"}
 	errHTTPBadRequestIconURLInvalid                  = &errHTTP{40021, http.StatusBadRequest, "invalid request: icon URL is invalid", "https://ntfy.sh/docs/publish/#icons"}
 	errHTTPBadRequestSignupNotEnabled                = &errHTTP{40022, http.StatusBadRequest, "invalid request: signup not enabled", "https://ntfy.sh/docs/config"}
 	errHTTPBadRequestNoTokenProvided                 = &errHTTP{40023, http.StatusBadRequest, "invalid request: no token provided", ""}
@@ -92,5 +91,5 @@ var (
 	errHTTPInternalError                             = &errHTTP{50001, http.StatusInternalServerError, "internal server error", ""}
 	errHTTPInternalErrorInvalidPath                  = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid path", ""}
 	errHTTPInternalErrorMissingBaseURL               = &errHTTP{50003, http.StatusInternalServerError, "internal server error: base-url must be be configured for this feature", "https://ntfy.sh/docs/config/"}
-	errHTTPInsufficientStorage                       = &errHTTP{50701, http.StatusInsufficientStorage, "internal server error: cannot publish to UnifiedPush topic without active subscriber", ""}
+	errHTTPInsufficientStorage                       = &errHTTP{50701, http.StatusInsufficientStorage, "internal server error: cannot publish to UnifiedPush topic without previously active subscriber", ""}
 )

+ 8 - 8
server/server.go

@@ -328,12 +328,6 @@ func (s *Server) handleError(w http.ResponseWriter, r *http.Request, v *visitor,
 		}
 		return // Do not attempt to write to upgraded connection
 	}
-	if matrixErr, ok := err.(*errMatrix); ok {
-		if err := writeMatrixError(w, r, v, matrixErr); err != nil {
-			logvr(v, r).Tag(tagMatrix).Err(err).Debug("Writing Matrix error failed")
-		}
-		return
-	}
 	if isNormalError {
 		logvr(v, r).Err(err).Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
 	} else {
@@ -582,6 +576,10 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
 		return nil, err
 	}
 	if unifiedpush && t.RateVisitor() == nil {
+		// UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting (see
+		// Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
+		// the subscription as invalid if any 400-499 code (except 429/408) is returned.
+		// See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
 		return nil, errHTTPInsufficientStorage
 	} else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() {
 		return nil, errHTTPTooManyRequestsLimitMessages
@@ -1525,12 +1523,14 @@ func (s *Server) transformMatrixJSON(next handleFunc) handleFunc {
 		newRequest, err := newRequestFromMatrixJSON(r, s.config.BaseURL, s.config.MessageLimit)
 		if err != nil {
 			logvr(v, r).Tag(tagMatrix).Err(err).Debug("Invalid Matrix request")
+			if e, ok := err.(*errMatrixPushkeyRejected); ok {
+				return writeMatrixResponse(w, e.rejectedPushKey)
+			}
 			return err
 		}
 		if err := next(w, newRequest, v); err != nil {
 			logvr(v, r).Tag(tagMatrix).Err(err).Debug("Error handling Matrix request")
-			// No normal error should cause pushKey rejection; don't set errMatrix.pushKey.
-			return &errMatrix{err: err}
+			return err
 		}
 		return nil
 	}

+ 9 - 30
server/server_matrix.go

@@ -71,28 +71,19 @@ type matrixResponse struct {
 	Rejected []string `json:"rejected"`
 }
 
-// errMatrix represents an error when handing Matrix gateway messages
+// errMatrixPushkeyRejected represents an error when handing Matrix gateway messages
 //
-// If the pushKey is set, the app server will remove it and will never send messages using the same
+// If the push key is set, the app server will remove it and will never send messages using the same
 // push key again, until the user repairs it.
-type errMatrix struct {
-	pushKey string
-	err     error
+type errMatrixPushkeyRejected struct {
+	rejectedPushKey   string
+	configuredBaseURL string
 }
 
-func (e errMatrix) Error() string {
-	if e.err != nil {
-		return fmt.Sprintf("message with push key %s rejected: %s", e.pushKey, e.err.Error())
-	}
-	return fmt.Sprintf("message with push key %s rejected", e.pushKey)
+func (e errMatrixPushkeyRejected) Error() string {
+	return fmt.Sprintf("push key must be prefixed with base URL, received push key: %s, configured base URL: %s", e.rejectedPushKey, e.configuredBaseURL)
 }
 
-const (
-	// matrixPushKeyHeader is a header that's used internally to pass the Matrix push key (from the matrixRequest)
-	// along with the request. The push key is only used if an error occurs down the line.
-	matrixPushKeyHeader = "X-Matrix-Pushkey"
-)
-
 // newRequestFromMatrixJSON reads the request body as a Matrix JSON message, parses the "pushkey", and creates a new
 // HTTP request that looks like a normal ntfy request from it.
 //
@@ -125,17 +116,16 @@ func newRequestFromMatrixJSON(r *http.Request, baseURL string, messageLimit int)
 	}
 	pushKey := m.Notification.Devices[0].PushKey // We ignore other devices for now, see discussion in #316
 	if !strings.HasPrefix(pushKey, baseURL+"/") {
-		return nil, &errMatrix{pushKey: pushKey, err: wrapErrHTTP(errHTTPBadRequestMatrixPushkeyBaseURLMismatch, "received push key: %s, configured base URL: %s", pushKey, baseURL)}
+		return nil, &errMatrixPushkeyRejected{rejectedPushKey: pushKey, configuredBaseURL: baseURL}
 	}
 	newRequest, err := http.NewRequest(http.MethodPost, pushKey, io.NopCloser(bytes.NewReader(body.PeekedBytes)))
 	if err != nil {
-		return nil, &errMatrix{pushKey: pushKey, err: err}
+		return nil, err
 	}
 	newRequest.RemoteAddr = r.RemoteAddr // Not strictly necessary, since visitor was already extracted
 	if r.Header.Get("X-Forwarded-For") != "" {
 		newRequest.Header.Set("X-Forwarded-For", r.Header.Get("X-Forwarded-For"))
 	}
-	newRequest.Header.Set(matrixPushKeyHeader, pushKey)
 	return newRequest, nil
 }
 
@@ -147,17 +137,6 @@ func writeMatrixDiscoveryResponse(w http.ResponseWriter) error {
 	return err
 }
 
-// writeMatrixError logs and writes the errMatrix to the given http.ResponseWriter as a matrixResponse
-func writeMatrixError(w http.ResponseWriter, r *http.Request, v *visitor, err *errMatrix) error {
-	logvr(v, r).Tag(tagMatrix).Err(err).Debug("Matrix gateway error")
-	if httpErr, ok := err.err.(*errHTTP); ok {
-		w.Header().Set("X-Ntfy-Error-Code", fmt.Sprintf("%d", httpErr.Code))
-		w.Header().Set("X-Ntfy-Error-Message", httpErr.Message)
-		w.WriteHeader(httpErr.HTTPCode)
-	}
-	return writeMatrixResponse(w, err.pushKey)
-}
-
 // writeMatrixSuccess writes a successful matrixResponse (no rejected push key) to the given http.ResponseWriter
 func writeMatrixSuccess(w http.ResponseWriter) error {
 	return writeMatrixResponse(w, "")

+ 5 - 9
server/server_matrix_test.go

@@ -3,7 +3,6 @@ package server
 import (
 	"net/http"
 	"net/http/httptest"
-	"net/netip"
 	"strings"
 	"testing"
 
@@ -19,7 +18,6 @@ func TestMatrix_NewRequestFromMatrixJSON_Success(t *testing.T) {
 	require.Nil(t, err)
 	require.Equal(t, "POST", newRequest.Method)
 	require.Equal(t, "https://ntfy.sh/upABCDEFGHI?up=1", newRequest.URL.String())
-	require.Equal(t, "https://ntfy.sh/upABCDEFGHI?up=1", newRequest.Header.Get("X-Matrix-Pushkey"))
 	require.Equal(t, body, readAll(t, newRequest.Body))
 }
 
@@ -56,10 +54,10 @@ func TestMatrix_NewRequestFromMatrixJSON_MismatchingPushKey(t *testing.T) {
 	body := `{"notification":{"content":{"body":"I'm floating in a most peculiar way.","msgtype":"m.text"},"counts":{"missed_calls":1,"unread":2},"devices":[{"app_id":"org.matrix.matrixConsole.ios","data":{},"pushkey":"https://ntfy.example.com/upABCDEFGHI?up=1","pushkey_ts":12345678,"tweaks":{"sound":"bing"}}],"event_id":"$3957tyerfgewrf384","prio":"high","room_alias":"#exampleroom:matrix.org","room_id":"!slw48wfj34rtnrf:example.com","room_name":"Mission Control","sender":"@exampleuser:matrix.org","sender_display_name":"Major Tom","type":"m.room.message"}}`
 	r, _ := http.NewRequest("POST", "http://ntfy.example.com/_matrix/push/v1/notify", strings.NewReader(body))
 	_, err := newRequestFromMatrixJSON(r, baseURL, maxLength)
-	matrixErr, ok := err.(*errMatrix)
+	matrixErr, ok := err.(*errMatrixPushkeyRejected)
 	require.True(t, ok)
-	require.Equal(t, "invalid request: push key must be prefixed with base URL, received push key: https://ntfy.example.com/upABCDEFGHI?up=1, configured base URL: https://ntfy.sh", matrixErr.err.Error())
-	require.Equal(t, "https://ntfy.example.com/upABCDEFGHI?up=1", matrixErr.pushKey)
+	require.Equal(t, "push key must be prefixed with base URL, received push key: https://ntfy.example.com/upABCDEFGHI?up=1, configured base URL: https://ntfy.sh", matrixErr.Error())
+	require.Equal(t, "https://ntfy.example.com/upABCDEFGHI?up=1", matrixErr.rejectedPushKey)
 }
 
 func TestMatrix_WriteMatrixDiscoveryResponse(t *testing.T) {
@@ -71,10 +69,8 @@ func TestMatrix_WriteMatrixDiscoveryResponse(t *testing.T) {
 
 func TestMatrix_WriteMatrixError(t *testing.T) {
 	w := httptest.NewRecorder()
-	r, _ := http.NewRequest("POST", "http://ntfy.example.com/_matrix/push/v1/notify", nil)
-	v := newVisitor(newTestConfig(t), nil, nil, netip.MustParseAddr("1.2.3.4"), nil)
-	require.Nil(t, writeMatrixError(w, r, v, &errMatrix{"https://ntfy.example.com/upABCDEFGHI?up=1", errHTTPBadRequestMatrixPushkeyBaseURLMismatch}))
-	require.Equal(t, 400, w.Result().StatusCode)
+	require.Nil(t, writeMatrixResponse(w, "https://ntfy.example.com/upABCDEFGHI?up=1"))
+	require.Equal(t, 200, w.Result().StatusCode)
 	require.Equal(t, `{"rejected":["https://ntfy.example.com/upABCDEFGHI?up=1"]}`+"\n", w.Body.String())
 }
 

+ 11 - 12
server/server_test.go

@@ -1231,7 +1231,7 @@ func TestServer_MatrixGateway_Push_Success(t *testing.T) {
 	s := newTestServer(t, newTestConfig(t))
 
 	response := request(t, s, "GET", "/mytopic/json?poll=1", "", map[string]string{
-		"Rate-Topics": "mytopic",
+		"Rate-Topics": "mytopic", // Register first!
 	})
 	require.Equal(t, 200, response.Code)
 
@@ -1251,17 +1251,15 @@ func TestServer_MatrixGateway_Push_Failure_NoSubscriber(t *testing.T) {
 	notification := `{"notification":{"devices":[{"pushkey":"http://127.0.0.1:12345/mytopic?up=1"}]}}`
 	response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
 	require.Equal(t, 507, response.Code)
-	require.Equal(t, `{"rejected":[]}`+"\n", response.Body.String())
+	require.Equal(t, 50701, toHTTPError(t, response.Body.String()).Code)
 }
 
 func TestServer_MatrixGateway_Push_Failure_InvalidPushkey(t *testing.T) {
 	s := newTestServer(t, newTestConfig(t))
 	notification := `{"notification":{"devices":[{"pushkey":"http://wrong-base-url.com/mytopic?up=1"}]}}`
 	response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
-	require.Equal(t, 400, response.Code)
+	require.Equal(t, 200, response.Code)
 	require.Equal(t, `{"rejected":["http://wrong-base-url.com/mytopic?up=1"]}`+"\n", response.Body.String())
-	require.Equal(t, "40020", response.Header().Get("X-Ntfy-Error-Code"))
-	require.Equal(t, "invalid request: push key must be prefixed with base URL, received push key: http://wrong-base-url.com/mytopic?up=1, configured base URL: http://127.0.0.1:12345", response.Header().Get("X-Ntfy-Error-Message"))
 
 	response = request(t, s, "GET", "/mytopic/json?poll=1", "", nil)
 	require.Equal(t, 200, response.Code)
@@ -1273,9 +1271,12 @@ func TestServer_MatrixGateway_Push_Failure_EverythingIsWrong(t *testing.T) {
 	notification := `{"message":"this is not really a Matrix message"}`
 	response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
 	require.Equal(t, 400, response.Code)
-	err := toHTTPError(t, response.Body.String())
-	require.Equal(t, 40019, err.Code)
-	require.Equal(t, 400, err.HTTPCode)
+	require.Equal(t, 40019, toHTTPError(t, response.Body.String()).Code)
+
+	notification = `this isn't even JSON'`
+	response = request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
+	require.Equal(t, 400, response.Code)
+	require.Equal(t, 40019, toHTTPError(t, response.Body.String()).Code)
 }
 
 func TestServer_MatrixGateway_Push_Failure_Unconfigured(t *testing.T) {
@@ -1285,9 +1286,7 @@ func TestServer_MatrixGateway_Push_Failure_Unconfigured(t *testing.T) {
 	notification := `{"notification":{"devices":[{"pushkey":"http://127.0.0.1:12345/mytopic?up=1"}]}}`
 	response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
 	require.Equal(t, 500, response.Code)
-	err := toHTTPError(t, response.Body.String())
-	require.Equal(t, 50003, err.Code)
-	require.Equal(t, 500, err.HTTPCode)
+	require.Equal(t, 50003, toHTTPError(t, response.Body.String()).Code)
 }
 
 func TestServer_PublishActions_AndPoll(t *testing.T) {
@@ -2077,7 +2076,7 @@ func TestServer_Matrix_SubscriberRateLimiting_UP_Only(t *testing.T) {
 		}
 		response := request(t, s, "POST", "/_matrix/push/v1/notify", notification, nil)
 		require.Equal(t, 429, response.Code, notification)
-		require.Equal(t, `{"rejected":[]}`+"\n", response.Body.String())
+		require.Equal(t, 42901, toHTTPError(t, response.Body.String()).Code)
 	}
 }