|
@@ -10,6 +10,8 @@ import (
|
|
|
"firebase.google.com/go/messaging"
|
|
"firebase.google.com/go/messaging"
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"github.com/emersion/go-smtp"
|
|
"github.com/emersion/go-smtp"
|
|
|
|
|
+ "github.com/gorilla/websocket"
|
|
|
|
|
+ "golang.org/x/sync/errgroup"
|
|
|
"google.golang.org/api/option"
|
|
"google.golang.org/api/option"
|
|
|
"heckel.io/ntfy/util"
|
|
"heckel.io/ntfy/util"
|
|
|
"html/template"
|
|
"html/template"
|
|
@@ -30,9 +32,6 @@ import (
|
|
|
"unicode/utf8"
|
|
"unicode/utf8"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
-// TODO add "max messages in a topic" limit
|
|
|
|
|
-// TODO implement "since=<ID>"
|
|
|
|
|
-
|
|
|
|
|
// Server is the main server, providing the UI and API for ntfy
|
|
// Server is the main server, providing the UI and API for ntfy
|
|
|
type Server struct {
|
|
type Server struct {
|
|
|
config *Config
|
|
config *Config
|
|
@@ -52,53 +51,18 @@ type Server struct {
|
|
|
mu sync.Mutex
|
|
mu sync.Mutex
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// errHTTP is a generic HTTP error for any non-200 HTTP error
|
|
|
|
|
-type errHTTP struct {
|
|
|
|
|
- Code int `json:"code,omitempty"`
|
|
|
|
|
- HTTPCode int `json:"http"`
|
|
|
|
|
- Message string `json:"error"`
|
|
|
|
|
- Link string `json:"link,omitempty"`
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (e errHTTP) Error() string {
|
|
|
|
|
- return e.Message
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (e errHTTP) JSON() string {
|
|
|
|
|
- b, _ := json.Marshal(&e)
|
|
|
|
|
- return string(b)
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
type indexPage struct {
|
|
type indexPage struct {
|
|
|
Topic string
|
|
Topic string
|
|
|
CacheDuration time.Duration
|
|
CacheDuration time.Duration
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-type sinceTime time.Time
|
|
|
|
|
-
|
|
|
|
|
-func (t sinceTime) IsAll() bool {
|
|
|
|
|
- return t == sinceAllMessages
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (t sinceTime) IsNone() bool {
|
|
|
|
|
- return t == sinceNoMessages
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (t sinceTime) Time() time.Time {
|
|
|
|
|
- return time.Time(t)
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-var (
|
|
|
|
|
- sinceAllMessages = sinceTime(time.Unix(0, 0))
|
|
|
|
|
- sinceNoMessages = sinceTime(time.Unix(1, 0))
|
|
|
|
|
-)
|
|
|
|
|
-
|
|
|
|
|
var (
|
|
var (
|
|
|
topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
|
|
topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
|
|
|
topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
|
|
topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
|
|
|
jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
|
|
jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
|
|
|
ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
|
|
ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
|
|
|
rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
|
|
rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
|
|
|
|
|
+ wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
|
|
|
publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/(publish|send|trigger)$`)
|
|
publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/(publish|send|trigger)$`)
|
|
|
|
|
|
|
|
staticRegex = regexp.MustCompile(`^/static/.+`)
|
|
staticRegex = regexp.MustCompile(`^/static/.+`)
|
|
@@ -125,37 +89,20 @@ var (
|
|
|
//go:embed docs
|
|
//go:embed docs
|
|
|
docsStaticFs embed.FS
|
|
docsStaticFs embed.FS
|
|
|
docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
|
|
docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
|
|
|
-
|
|
|
|
|
- errHTTPBadRequestEmailDisabled = &errHTTP{40001, http.StatusBadRequest, "e-mail notifications are not enabled", "https://ntfy.sh/docs/config/#e-mail-notifications"}
|
|
|
|
|
- errHTTPBadRequestDelayNoCache = &errHTTP{40002, http.StatusBadRequest, "cannot disable cache for delayed message", ""}
|
|
|
|
|
- errHTTPBadRequestDelayNoEmail = &errHTTP{40003, http.StatusBadRequest, "delayed e-mail notifications are not supported", ""}
|
|
|
|
|
- errHTTPBadRequestDelayCannotParse = &errHTTP{40004, http.StatusBadRequest, "invalid delay parameter: unable to parse delay", "https://ntfy.sh/docs/publish/#scheduled-delivery"}
|
|
|
|
|
- errHTTPBadRequestDelayTooSmall = &errHTTP{40005, http.StatusBadRequest, "invalid delay parameter: too small, please refer to the docs", "https://ntfy.sh/docs/publish/#scheduled-delivery"}
|
|
|
|
|
- errHTTPBadRequestDelayTooLarge = &errHTTP{40006, http.StatusBadRequest, "invalid delay parameter: too large, please refer to the docs", "https://ntfy.sh/docs/publish/#scheduled-delivery"}
|
|
|
|
|
- errHTTPBadRequestPriorityInvalid = &errHTTP{40007, http.StatusBadRequest, "invalid priority parameter", "https://ntfy.sh/docs/publish/#message-priority"}
|
|
|
|
|
- errHTTPBadRequestSinceInvalid = &errHTTP{40008, http.StatusBadRequest, "invalid since parameter", "https://ntfy.sh/docs/subscribe/api/#fetch-cached-messages"}
|
|
|
|
|
- errHTTPBadRequestTopicInvalid = &errHTTP{40009, http.StatusBadRequest, "invalid topic: path invalid", ""}
|
|
|
|
|
- errHTTPBadRequestTopicDisallowed = &errHTTP{40010, http.StatusBadRequest, "invalid topic: topic name is disallowed", ""}
|
|
|
|
|
- errHTTPBadRequestMessageNotUTF8 = &errHTTP{40011, http.StatusBadRequest, "invalid message: message must be UTF-8 encoded", ""}
|
|
|
|
|
- errHTTPBadRequestAttachmentTooLarge = &errHTTP{40012, http.StatusBadRequest, "invalid request: attachment too large, or bandwidth limit reached", ""}
|
|
|
|
|
- errHTTPBadRequestAttachmentURLInvalid = &errHTTP{40013, http.StatusBadRequest, "invalid request: attachment URL is invalid", ""}
|
|
|
|
|
- errHTTPBadRequestAttachmentsDisallowed = &errHTTP{40014, http.StatusBadRequest, "invalid request: attachments not allowed", ""}
|
|
|
|
|
- errHTTPBadRequestAttachmentsExpiryBeforeDelivery = &errHTTP{40015, http.StatusBadRequest, "invalid request: attachment expiry before delayed delivery date", ""}
|
|
|
|
|
- errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", ""}
|
|
|
|
|
- errHTTPTooManyRequestsLimitRequests = &errHTTP{42901, http.StatusTooManyRequests, "limit reached: too many requests, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
|
|
|
|
|
- errHTTPTooManyRequestsLimitEmails = &errHTTP{42902, http.StatusTooManyRequests, "limit reached: too many emails, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
|
|
|
|
|
- errHTTPTooManyRequestsLimitSubscriptions = &errHTTP{42903, http.StatusTooManyRequests, "limit reached: too many active subscriptions, please be nice", "https://ntfy.sh/docs/publish/#limitations"}
|
|
|
|
|
- errHTTPTooManyRequestsLimitTotalTopics = &errHTTP{42904, http.StatusTooManyRequests, "limit reached: the total number of topics on the server has been reached, please contact the admin", "https://ntfy.sh/docs/publish/#limitations"}
|
|
|
|
|
- errHTTPTooManyRequestsAttachmentBandwidthLimit = &errHTTP{42905, http.StatusTooManyRequests, "too many requests: daily bandwidth limit reached", "https://ntfy.sh/docs/publish/#limitations"}
|
|
|
|
|
- errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", ""}
|
|
|
|
|
- errHTTPInternalErrorInvalidFilePath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid file path", ""}
|
|
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
const (
|
|
|
firebaseControlTopic = "~control" // See Android if changed
|
|
firebaseControlTopic = "~control" // See Android if changed
|
|
|
emptyMessageBody = "triggered" // Used if message body is empty
|
|
emptyMessageBody = "triggered" // Used if message body is empty
|
|
|
defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
|
|
defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
|
|
|
- fcmMessageLimit = 4000 // see maybeTruncateFCMMessage for details
|
|
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+// WebSocket constants
|
|
|
|
|
+const (
|
|
|
|
|
+ wsWriteWait = 2 * time.Second
|
|
|
|
|
+ wsBufferSize = 1024
|
|
|
|
|
+ wsReadLimit = 64 // We only ever receive PINGs
|
|
|
|
|
+ wsPongWait = 15 * time.Second
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
// New instantiates a new Server. It creates the cache and adds a Firebase
|
|
// New instantiates a new Server. It creates the cache and adds a Firebase
|
|
@@ -262,25 +209,6 @@ func createFirebaseSubscriber(conf *Config) (subscriber, error) {
|
|
|
}, nil
|
|
}, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// maybeTruncateFCMMessage performs best-effort truncation of FCM messages.
|
|
|
|
|
-// The docs say the limit is 4000 characters, but during testing it wasn't quite clear
|
|
|
|
|
-// what fields matter; so we're just capping the serialized JSON to 4000 bytes.
|
|
|
|
|
-func maybeTruncateFCMMessage(m *messaging.Message) *messaging.Message {
|
|
|
|
|
- s, err := json.Marshal(m)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return m
|
|
|
|
|
- }
|
|
|
|
|
- if len(s) > fcmMessageLimit {
|
|
|
|
|
- over := len(s) - fcmMessageLimit + 16 // = len("truncated":"1",), sigh ...
|
|
|
|
|
- message, ok := m.Data["message"]
|
|
|
|
|
- if ok && len(message) > over {
|
|
|
|
|
- m.Data["truncated"] = "1"
|
|
|
|
|
- m.Data["message"] = message[:len(message)-over]
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- return m
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
// Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
|
|
// Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
|
|
|
// a manager go routine to print stats and prune messages.
|
|
// a manager go routine to print stats and prune messages.
|
|
|
func (s *Server) Run() error {
|
|
func (s *Server) Run() error {
|
|
@@ -364,16 +292,19 @@ func (s *Server) Stop() {
|
|
|
|
|
|
|
|
func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
|
|
func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
|
|
|
if err := s.handleInternal(w, r); err != nil {
|
|
if err := s.handleInternal(w, r); err != nil {
|
|
|
- var e *errHTTP
|
|
|
|
|
- var ok bool
|
|
|
|
|
- if e, ok = err.(*errHTTP); !ok {
|
|
|
|
|
- e = errHTTPInternalError
|
|
|
|
|
|
|
+ if websocket.IsWebSocketUpgrade(r) {
|
|
|
|
|
+ log.Printf("[%s] WS %s %s - %s", r.RemoteAddr, r.Method, r.URL.Path, err.Error())
|
|
|
|
|
+ return // Do not attempt to write to upgraded connection
|
|
|
}
|
|
}
|
|
|
- log.Printf("[%s] %s - %d - %d - %s", r.RemoteAddr, r.Method, e.HTTPCode, e.Code, err.Error())
|
|
|
|
|
|
|
+ httpErr, ok := err.(*errHTTP)
|
|
|
|
|
+ if !ok {
|
|
|
|
|
+ httpErr = errHTTPInternalError
|
|
|
|
|
+ }
|
|
|
|
|
+ log.Printf("[%s] HTTP %s %s - %d - %d - %s", r.RemoteAddr, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error())
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
|
|
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
|
|
|
- w.WriteHeader(e.HTTPCode)
|
|
|
|
|
- io.WriteString(w, e.JSON()+"\n")
|
|
|
|
|
|
|
+ w.WriteHeader(httpErr.HTTPCode)
|
|
|
|
|
+ io.WriteString(w, httpErr.JSON()+"\n")
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -404,6 +335,8 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error {
|
|
|
return s.withRateLimit(w, r, s.handleSubscribeSSE)
|
|
return s.withRateLimit(w, r, s.handleSubscribeSSE)
|
|
|
} else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
|
|
} else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
|
|
|
return s.withRateLimit(w, r, s.handleSubscribeRaw)
|
|
return s.withRateLimit(w, r, s.handleSubscribeRaw)
|
|
|
|
|
+ } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) {
|
|
|
|
|
+ return s.withRateLimit(w, r, s.handleSubscribeWS)
|
|
|
}
|
|
}
|
|
|
return errHTTPNotFound
|
|
return errHTTPNotFound
|
|
|
}
|
|
}
|
|
@@ -416,7 +349,7 @@ func (s *Server) handleHome(w http.ResponseWriter, r *http.Request) error {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request) error {
|
|
func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request) error {
|
|
|
- unifiedpush := readParam(r, "x-unifiedpush", "unifiedpush", "up") == "1" // see PUT/POST too!
|
|
|
|
|
|
|
+ unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too!
|
|
|
if unifiedpush {
|
|
if unifiedpush {
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
|
|
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
|
|
@@ -522,13 +455,15 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
|
|
|
if err := json.NewEncoder(w).Encode(m); err != nil {
|
|
if err := json.NewEncoder(w).Encode(m); err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
- s.inc(&s.messages)
|
|
|
|
|
|
|
+ s.mu.Lock()
|
|
|
|
|
+ s.messages++
|
|
|
|
|
+ s.mu.Unlock()
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (cache bool, firebase bool, email string, err error) {
|
|
func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (cache bool, firebase bool, email string, err error) {
|
|
|
- cache = readParam(r, "x-cache", "cache") != "no"
|
|
|
|
|
- firebase = readParam(r, "x-firebase", "firebase") != "no"
|
|
|
|
|
|
|
+ cache = readBoolParam(r, true, "x-cache", "cache")
|
|
|
|
|
+ firebase = readBoolParam(r, true, "x-firebase", "firebase")
|
|
|
m.Title = readParam(r, "x-title", "title", "t")
|
|
m.Title = readParam(r, "x-title", "title", "t")
|
|
|
m.Click = readParam(r, "x-click", "click")
|
|
m.Click = readParam(r, "x-click", "click")
|
|
|
filename := readParam(r, "x-filename", "filename", "file", "f")
|
|
filename := readParam(r, "x-filename", "filename", "file", "f")
|
|
@@ -599,29 +534,13 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca
|
|
|
}
|
|
}
|
|
|
m.Time = delay.Unix()
|
|
m.Time = delay.Unix()
|
|
|
}
|
|
}
|
|
|
- unifiedpush := readParam(r, "x-unifiedpush", "unifiedpush", "up") == "1" // see GET too!
|
|
|
|
|
|
|
+ unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
|
|
|
if unifiedpush {
|
|
if unifiedpush {
|
|
|
firebase = false
|
|
firebase = false
|
|
|
}
|
|
}
|
|
|
return cache, firebase, email, nil
|
|
return cache, firebase, email, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func readParam(r *http.Request, names ...string) string {
|
|
|
|
|
- for _, name := range names {
|
|
|
|
|
- value := r.Header.Get(name)
|
|
|
|
|
- if value != "" {
|
|
|
|
|
- return strings.TrimSpace(value)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- for _, name := range names {
|
|
|
|
|
- value := r.URL.Query().Get(strings.ToLower(name))
|
|
|
|
|
- if value != "" {
|
|
|
|
|
- return strings.TrimSpace(value)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- return ""
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
// handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
|
|
// handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
|
|
|
//
|
|
//
|
|
|
// 1. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
|
|
// 1. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
|
|
@@ -705,7 +624,7 @@ func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *
|
|
|
}
|
|
}
|
|
|
return buf.String(), nil
|
|
return buf.String(), nil
|
|
|
}
|
|
}
|
|
|
- return s.handleSubscribe(w, r, v, "json", "application/x-ndjson", encoder)
|
|
|
|
|
|
|
+ return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
|
|
func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
|
|
@@ -719,7 +638,7 @@ func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *v
|
|
|
}
|
|
}
|
|
|
return fmt.Sprintf("data: %s\n", buf.String()), nil
|
|
return fmt.Sprintf("data: %s\n", buf.String()), nil
|
|
|
}
|
|
}
|
|
|
- return s.handleSubscribe(w, r, v, "sse", "text/event-stream", encoder)
|
|
|
|
|
|
|
+ return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
|
|
func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
|
|
@@ -729,33 +648,25 @@ func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *v
|
|
|
}
|
|
}
|
|
|
return "\n", nil // "keepalive" and "open" events just send an empty line
|
|
return "\n", nil // "keepalive" and "open" events just send an empty line
|
|
|
}
|
|
}
|
|
|
- return s.handleSubscribe(w, r, v, "raw", "text/plain", encoder)
|
|
|
|
|
|
|
+ return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visitor, format string, contentType string, encoder messageEncoder) error {
|
|
|
|
|
|
|
+func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
|
|
|
if err := v.SubscriptionAllowed(); err != nil {
|
|
if err := v.SubscriptionAllowed(); err != nil {
|
|
|
return errHTTPTooManyRequestsLimitSubscriptions
|
|
return errHTTPTooManyRequestsLimitSubscriptions
|
|
|
}
|
|
}
|
|
|
defer v.RemoveSubscription()
|
|
defer v.RemoveSubscription()
|
|
|
- topicsStr := strings.TrimSuffix(r.URL.Path[1:], "/"+format) // Hack
|
|
|
|
|
- topicIDs := util.SplitNoEmpty(topicsStr, ",")
|
|
|
|
|
- topics, err := s.topicsFromIDs(topicIDs...)
|
|
|
|
|
|
|
+ topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
- poll := readParam(r, "x-poll", "poll", "po") == "1"
|
|
|
|
|
- scheduled := readParam(r, "x-scheduled", "scheduled", "sched") == "1"
|
|
|
|
|
- since, err := parseSince(r, poll)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
- messageFilter, titleFilter, priorityFilter, tagsFilter, err := parseQueryFilters(r)
|
|
|
|
|
|
|
+ poll, since, scheduled, filters, err := parseSubscribeParams(r)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
var wlock sync.Mutex
|
|
var wlock sync.Mutex
|
|
|
sub := func(msg *message) error {
|
|
sub := func(msg *message) error {
|
|
|
- if !passesQueryFilter(msg, messageFilter, titleFilter, priorityFilter, tagsFilter) {
|
|
|
|
|
|
|
+ if !filters.Pass(msg) {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
m, err := encoder(msg)
|
|
m, err := encoder(msg)
|
|
@@ -805,42 +716,119 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func parseQueryFilters(r *http.Request) (messageFilter string, titleFilter string, priorityFilter []int, tagsFilter []string, err error) {
|
|
|
|
|
- messageFilter = readParam(r, "x-message", "message", "m")
|
|
|
|
|
- titleFilter = readParam(r, "x-title", "title", "t")
|
|
|
|
|
- tagsFilter = util.SplitNoEmpty(readParam(r, "x-tags", "tags", "tag", "ta"), ",")
|
|
|
|
|
- priorityFilter = make([]int, 0)
|
|
|
|
|
- for _, p := range util.SplitNoEmpty(readParam(r, "x-priority", "priority", "prio", "p"), ",") {
|
|
|
|
|
- priority, err := util.ParsePriority(p)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return "", "", nil, nil, err
|
|
|
|
|
|
|
+func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error {
|
|
|
|
|
+ if err := v.SubscriptionAllowed(); err != nil {
|
|
|
|
|
+ return errHTTPTooManyRequestsLimitSubscriptions
|
|
|
|
|
+ }
|
|
|
|
|
+ defer v.RemoveSubscription()
|
|
|
|
|
+ topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ poll, since, scheduled, filters, err := parseSubscribeParams(r)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ upgrader := &websocket.Upgrader{
|
|
|
|
|
+ ReadBufferSize: wsBufferSize,
|
|
|
|
|
+ WriteBufferSize: wsBufferSize,
|
|
|
|
|
+ CheckOrigin: func(r *http.Request) bool {
|
|
|
|
|
+ return true // We're open for business!
|
|
|
|
|
+ },
|
|
|
|
|
+ }
|
|
|
|
|
+ conn, err := upgrader.Upgrade(w, r, nil)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ defer conn.Close()
|
|
|
|
|
+ var wlock sync.Mutex
|
|
|
|
|
+ g, ctx := errgroup.WithContext(context.Background())
|
|
|
|
|
+ g.Go(func() error {
|
|
|
|
|
+ pongWait := s.config.KeepaliveInterval + wsPongWait
|
|
|
|
|
+ conn.SetReadLimit(wsReadLimit)
|
|
|
|
|
+ if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ conn.SetPongHandler(func(appData string) error {
|
|
|
|
|
+ return conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
|
|
|
+ })
|
|
|
|
|
+ for {
|
|
|
|
|
+ _, _, err := conn.NextReader()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ })
|
|
|
|
|
+ g.Go(func() error {
|
|
|
|
|
+ ping := func() error {
|
|
|
|
|
+ wlock.Lock()
|
|
|
|
|
+ defer wlock.Unlock()
|
|
|
|
|
+ if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ return conn.WriteMessage(websocket.PingMessage, nil)
|
|
|
}
|
|
}
|
|
|
- priorityFilter = append(priorityFilter, priority)
|
|
|
|
|
|
|
+ for {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-ctx.Done():
|
|
|
|
|
+ return nil
|
|
|
|
|
+ case <-time.After(s.config.KeepaliveInterval):
|
|
|
|
|
+ v.Keepalive()
|
|
|
|
|
+ if err := ping(); err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ })
|
|
|
|
|
+ sub := func(msg *message) error {
|
|
|
|
|
+ if !filters.Pass(msg) {
|
|
|
|
|
+ return nil
|
|
|
|
|
+ }
|
|
|
|
|
+ wlock.Lock()
|
|
|
|
|
+ defer wlock.Unlock()
|
|
|
|
|
+ if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ return conn.WriteJSON(msg)
|
|
|
}
|
|
}
|
|
|
- return
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func passesQueryFilter(msg *message, messageFilter string, titleFilter string, priorityFilter []int, tagsFilter []string) bool {
|
|
|
|
|
- if msg.Event != messageEvent {
|
|
|
|
|
- return true // filters only apply to messages
|
|
|
|
|
|
|
+ w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
|
|
|
|
|
+ if poll {
|
|
|
|
|
+ return s.sendOldMessages(topics, since, scheduled, sub)
|
|
|
}
|
|
}
|
|
|
- if messageFilter != "" && msg.Message != messageFilter {
|
|
|
|
|
- return false
|
|
|
|
|
|
|
+ subscriberIDs := make([]int, 0)
|
|
|
|
|
+ for _, t := range topics {
|
|
|
|
|
+ subscriberIDs = append(subscriberIDs, t.Subscribe(sub))
|
|
|
}
|
|
}
|
|
|
- if titleFilter != "" && msg.Title != titleFilter {
|
|
|
|
|
- return false
|
|
|
|
|
|
|
+ defer func() {
|
|
|
|
|
+ for i, subscriberID := range subscriberIDs {
|
|
|
|
|
+ topics[i].Unsubscribe(subscriberID) // Order!
|
|
|
|
|
+ }
|
|
|
|
|
+ }()
|
|
|
|
|
+ if err := sub(newOpenMessage(topicsStr)); err != nil { // Send out open message
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ if err := s.sendOldMessages(topics, since, scheduled, sub); err != nil {
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
- messagePriority := msg.Priority
|
|
|
|
|
- if messagePriority == 0 {
|
|
|
|
|
- messagePriority = 3 // For query filters, default priority (3) is the same as "not set" (0)
|
|
|
|
|
|
|
+ err = g.Wait()
|
|
|
|
|
+ if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
|
|
|
|
+ return nil // Normal closures are not errors
|
|
|
}
|
|
}
|
|
|
- if len(priorityFilter) > 0 && !util.InIntList(priorityFilter, messagePriority) {
|
|
|
|
|
- return false
|
|
|
|
|
|
|
+ return err
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func parseSubscribeParams(r *http.Request) (poll bool, since sinceTime, scheduled bool, filters *queryFilter, err error) {
|
|
|
|
|
+ poll = readBoolParam(r, false, "x-poll", "poll", "po")
|
|
|
|
|
+ scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
|
|
|
|
|
+ since, err = parseSince(r, poll)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return
|
|
|
}
|
|
}
|
|
|
- if len(tagsFilter) > 0 && !util.InStringListAll(msg.Tags, tagsFilter) {
|
|
|
|
|
- return false
|
|
|
|
|
|
|
+ filters, err = parseQueryFilters(r)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return
|
|
|
}
|
|
}
|
|
|
- return true
|
|
|
|
|
|
|
+ return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled bool, sub subscriber) error {
|
|
func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled bool, sub subscriber) error {
|
|
@@ -901,6 +889,19 @@ func (s *Server) topicFromPath(path string) (*topic, error) {
|
|
|
return topics[0], nil
|
|
return topics[0], nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
|
|
|
|
|
+ parts := strings.Split(path, "/")
|
|
|
|
|
+ if len(parts) < 2 {
|
|
|
|
|
+ return nil, "", errHTTPBadRequestTopicInvalid
|
|
|
|
|
+ }
|
|
|
|
|
+ topicIDs := util.SplitNoEmpty(parts[1], ",")
|
|
|
|
|
+ topics, err := s.topicsFromIDs(topicIDs...)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, "", errHTTPBadRequestTopicInvalid
|
|
|
|
|
+ }
|
|
|
|
|
+ return topics, parts[1], nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
|
|
func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
|
|
|
s.mu.Lock()
|
|
s.mu.Lock()
|
|
|
defer s.mu.Unlock()
|
|
defer s.mu.Unlock()
|
|
@@ -1101,9 +1102,3 @@ func (s *Server) visitor(r *http.Request) *visitor {
|
|
|
v.Keepalive()
|
|
v.Keepalive()
|
|
|
return v
|
|
return v
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
-func (s *Server) inc(counter *int64) {
|
|
|
|
|
- s.mu.Lock()
|
|
|
|
|
- defer s.mu.Unlock()
|
|
|
|
|
- *counter++
|
|
|
|
|
-}
|
|
|