瀏覽代碼

Subscribe endpoint consolidation; same behavior for all endpoints; keepalive

Philipp Heckel 4 年之前
父節點
當前提交
a38aca47bd
共有 8 個文件被更改,包括 153 次插入92 次删除
  1. 3 0
      cmd/app.go
  2. 13 10
      config/config.go
  3. 34 16
      server/index.html
  4. 43 0
      server/message.go
  5. 53 58
      server/server.go
  6. 5 1
      server/static/css/app.css
  7. 1 1
      server/static/js/app.js
  8. 1 6
      server/topic.go

+ 3 - 0
cmd/app.go

@@ -16,6 +16,7 @@ func New() *cli.App {
 	flags := []cli.Flag{
 		&cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: "/etc/ntfy/config.yml", DefaultText: "/etc/ntfy/config.yml", Usage: "config file"},
 		altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-http", Aliases: []string{"l"}, EnvVars: []string{"NTFY_LISTEN_HTTP"}, Value: config.DefaultListenHTTP, Usage: "ip:port used to as listen address"}),
+		altsrc.NewDurationFlag(&cli.DurationFlag{Name: "keepalive-interval", Aliases: []string{"k"}, EnvVars: []string{"NTFY_KEEPALIVE_INTERVAL"}, Value: config.DefaultKeepaliveInterval, Usage: "default interval of keepalive messages"}),
 	}
 	return &cli.App{
 		Name:                   "ntfy",
@@ -37,9 +38,11 @@ func New() *cli.App {
 func execRun(c *cli.Context) error {
 	// Read all the options
 	listenHTTP := c.String("listen-http")
+	keepaliveInterval := c.Duration("keepalive-interval")
 
 	// Run main bot, can be killed by signal
 	conf := config.New(listenHTTP)
+	conf.KeepaliveInterval = keepaliveInterval
 	s := server.New(conf)
 	if err := s.Run(); err != nil {
 		log.Fatalln(err)

+ 13 - 10
config/config.go

@@ -8,8 +8,9 @@ import (
 
 // Defines default config settings
 const (
-	DefaultListenHTTP      = ":80"
-	defaultManagerInterval = time.Minute
+	DefaultListenHTTP        = ":80"
+	DefaultKeepaliveInterval = 30 * time.Second
+	defaultManagerInterval   = time.Minute
 )
 
 // Defines the max number of requests, here:
@@ -21,18 +22,20 @@ var (
 
 // Config is the main config struct for the application. Use New to instantiate a default config struct.
 type Config struct {
-	ListenHTTP      string
-	Limit           rate.Limit
-	LimitBurst      int
-	ManagerInterval time.Duration
+	ListenHTTP        string
+	Limit             rate.Limit
+	LimitBurst        int
+	KeepaliveInterval time.Duration
+	ManagerInterval   time.Duration
 }
 
 // New instantiates a default new config
 func New(listenHTTP string) *Config {
 	return &Config{
-		ListenHTTP:      listenHTTP,
-		Limit:           defaultLimit,
-		LimitBurst:      defaultLimitBurst,
-		ManagerInterval: defaultManagerInterval,
+		ListenHTTP:        listenHTTP,
+		Limit:             defaultLimit,
+		LimitBurst:        defaultLimitBurst,
+		KeepaliveInterval: DefaultKeepaliveInterval,
+		ManagerInterval:   defaultManagerInterval,
 	}
 }

+ 34 - 16
server/index.html

@@ -65,20 +65,7 @@
     <audio id="notifySound" src="static/sound/mixkit-message-pop-alert-2354.mp3"></audio>
 
     <h3>Subscribe via your app, or via the CLI</h3>
-    <p>
-        Here are some examples using <tt>curl</tt>:
-    </p>
-    <code>
-        # one message per line (\n are replaced with a space)<br/>
-        curl -s ntfy.sh/mytopic/raw<br/><br/>
-
-        # one JSON message per line<br/>
-        curl -s ntfy.sh/mytopic/json<br/><br/>
-
-        # server-sent events (SSE) stream, use with EventSource<br/>
-        curl -s ntfy.sh/mytopic/sse
-    </code>
-    <p>
+    <p class="smallMarginBottom">
         Using <a href="https://developer.mozilla.org/en-US/docs/Web/API/EventSource">EventSource</a>, you can consume
         notifications like this (see <a href="https://github.com/binwiederhier/ntfy/tree/main/examples">full example</a>):
     </p>
@@ -88,15 +75,46 @@
         &nbsp;&nbsp;// Do something with e.data<br/>
         };
     </code>
+    <p class="smallMarginBottom">
+        Or you can use <tt>curl</tt> or any other HTTP library. Here's an example for the <tt>/json</tt> endpoint,
+        which prints one JSON message per line (keepalive and open messages have an "event" field):
+    </p>
+    <code>
+        $ curl -s ntfy.sh/mytopic/json<br/>
+        {"time":1635359841,"event":"open"}<br/>
+        {"time":1635359844,"message":"This is a notification"}<br/>
+        {"time":1635359851,"event":"keepalive"}
+    </code>
+    <p class="smallMarginBottom">
+        Using the <tt>/sse</tt> endpoint (SSE, server-sent events stream):
+    </p>
+    <code>
+        $ curl -s ntfy.sh/mytopic/sse<br/>
+        event: open<br/>
+        data: {"time":1635359796,"event":"open"}<br/><br/>
+
+        data: {"time":1635359803,"message":"This is a notification"}<br/><br/>
+
+        event: keepalive<br/>
+        data: {"time":1635359806,"event":"keepalive"}
+    </code>
+    <p class="smallMarginBottom">
+        Using the <tt>/raw</tt> endpoint (empty lines are keepalive messages):
+    </p>
+    <code>
+        $ curl -s ntfy.sh/mytopic/raw<br/>
+        <br/>
+        This is a notification
+    </code>
 
     <h2>Publishing messages</h2>
-    <p>
+    <p class="smallMarginBottom">
         Publishing messages can be done via PUT or POST using. Here's an example using <tt>curl</tt>:
     </p>
     <code>
         curl -d "long process is done" ntfy.sh/mytopic
     </code>
-    <p>
+    <p class="smallMarginBottom">
         Here's an example in JS with <tt>fetch()</tt> (see <a href="https://github.com/binwiederhier/ntfy/tree/main/examples">full example</a>):
     </p>
     <code>

+ 43 - 0
server/message.go

@@ -0,0 +1,43 @@
+package server
+
+import "time"
+
+// List of possible events
+const (
+	openEvent      = "open"
+	keepaliveEvent = "keepalive"
+)
+
+// message represents a message published to a topic
+type message struct {
+	Time    int64  `json:"time"`            // Unix time in seconds
+	Event   string `json:"event,omitempty"` // One of the above
+	Message string `json:"message,omitempty"`
+}
+
+// messageEncoder is a function that knows how to encode a message
+type messageEncoder func(msg *message) (string, error)
+
+// newMessage creates a new message with the current timestamp
+func newMessage(event string, msg string) *message {
+	return &message{
+		Time:    time.Now().Unix(),
+		Event:   event,
+		Message: msg,
+	}
+}
+
+// newOpenMessage is a convenience method to create an open message
+func newOpenMessage() *message {
+	return newMessage(openEvent, "")
+}
+
+// newKeepaliveMessage is a convenience method to create a keepalive message
+func newKeepaliveMessage() *message {
+	return newMessage(keepaliveEvent, "")
+}
+
+// newDefaultMessage is a convenience method to create a notification message
+func newDefaultMessage(msg string) *message {
+	return newMessage("", msg)
+}

+ 53 - 58
server/server.go

@@ -48,11 +48,11 @@ const (
 )
 
 var (
-	topicRegex = regexp.MustCompile(`^/[^/]+$`)
-	jsonRegex  = regexp.MustCompile(`^/[^/]+/json$`)
-	sseRegex   = regexp.MustCompile(`^/[^/]+/sse$`)
-	rawRegex   = regexp.MustCompile(`^/[^/]+/raw$`)
-	staticRegex   = regexp.MustCompile(`^/static/.+`)
+	topicRegex  = regexp.MustCompile(`^/[^/]+$`)
+	jsonRegex   = regexp.MustCompile(`^/[^/]+/json$`)
+	sseRegex    = regexp.MustCompile(`^/[^/]+/sse$`)
+	rawRegex    = regexp.MustCompile(`^/[^/]+/raw$`)
+	staticRegex = regexp.MustCompile(`^/static/.+`)
 
 	//go:embed "index.html"
 	indexSource string
@@ -159,11 +159,7 @@ func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error
 	if err != nil {
 		return err
 	}
-	msg := &message{
-		Time:    time.Now().UnixMilli(),
-		Message: string(b),
-	}
-	if err := t.Publish(msg); err != nil {
+	if err := t.Publish(newDefaultMessage(string(b))); err != nil {
 		return err
 	}
 	w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
@@ -171,75 +167,74 @@ func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error
 }
 
 func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) error {
-	t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/json")) // Hack
-	subscriberID := t.Subscribe(func(msg *message) error {
-		if err := json.NewEncoder(w).Encode(&msg); err != nil {
-			return err
-		}
-		if fl, ok := w.(http.Flusher); ok {
-			fl.Flush()
+	encoder := func(msg *message) (string, error) {
+		var buf bytes.Buffer
+		if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
+			return "", err
 		}
-		return nil
-	})
-	defer s.unsubscribe(t, subscriberID)
-	w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
-	select {
-	case <-t.ctx.Done():
-	case <-r.Context().Done():
+		return buf.String(), nil
 	}
-	return nil
+	return s.handleSubscribe(w, r, "json", "application/stream+json", encoder)
 }
 
 func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) error {
-	t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/sse")) // Hack
-	subscriberID := t.Subscribe(func(msg *message) error {
+	encoder := func(msg *message) (string, error) {
 		var buf bytes.Buffer
 		if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
-			return err
+			return "", err
 		}
-		m := fmt.Sprintf("data: %s\n", buf.String())
-		if _, err := io.WriteString(w, m); err != nil {
-			return err
+		if msg.Event != "" {
+			return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
 		}
-		if fl, ok := w.(http.Flusher); ok {
-			fl.Flush()
-		}
-		return nil
-	})
-	defer s.unsubscribe(t, subscriberID)
-	w.Header().Set("Content-Type", "text/event-stream")
-	w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
-	if _, err := io.WriteString(w, "event: open\n\n"); err != nil {
-		return err
-	}
-	if fl, ok := w.(http.Flusher); ok {
-		fl.Flush()
+		return fmt.Sprintf("data: %s\n", buf.String()), nil
 	}
-	select {
-	case <-t.ctx.Done():
-	case <-r.Context().Done():
-	}
-	return nil
+	return s.handleSubscribe(w, r, "sse", "text/event-stream", encoder)
 }
 
 func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request) error {
-	t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/raw")) // Hack
-	subscriberID := t.Subscribe(func(msg *message) error {
-		m := strings.ReplaceAll(msg.Message, "\n", " ") + "\n"
-		if _, err := io.WriteString(w, m); err != nil {
+	encoder := func(msg *message) (string, error) {
+		if msg.Event == "" { // only handle default events
+			return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
+		}
+		return "\n", nil // "keepalive" and "open" events just send an empty line
+	}
+	return s.handleSubscribe(w, r, "raw", "text/plain", encoder)
+}
+
+func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, format string, contentType string, encoder messageEncoder) error {
+	t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/"+format)) // Hack
+	sub := func(msg *message) error {
+		m, err := encoder(msg)
+		if err != nil {
+			return err
+		}
+		if _, err := w.Write([]byte(m)); err != nil {
 			return err
 		}
 		if fl, ok := w.(http.Flusher); ok {
 			fl.Flush()
 		}
 		return nil
-	})
+	}
+	subscriberID := t.Subscribe(sub)
 	defer s.unsubscribe(t, subscriberID)
-	select {
-	case <-t.ctx.Done():
-	case <-r.Context().Done():
+	w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
+	w.Header().Set("Content-Type", contentType)
+	if err := sub(newOpenMessage()); err != nil { // Send out open message
+		return err
+	}
+	for {
+		select {
+		case <-t.ctx.Done():
+			return nil
+		case <-r.Context().Done():
+			return nil
+		case <-time.After(s.config.KeepaliveInterval):
+			if err := sub(newKeepaliveMessage()); err != nil { // Send keepalive message
+				return err
+			}
+		}
 	}
-	return nil
 }
 
 func (s *Server) handleOptions(w http.ResponseWriter, r *http.Request) error {

+ 5 - 1
server/static/css/app.css

@@ -40,6 +40,10 @@ p {
     line-height: 140%;
 }
 
+p.smallMarginBottom {
+    margin-bottom: 10px;
+}
+
 tt {
     background: #eee;
     padding: 2px 7px;
@@ -53,7 +57,7 @@ code {
     padding: 20px;
     border-radius: 3px;
     margin-top: 10px;
-    margin-bottom: 10px;
+    margin-bottom: 20px;
 }
 
 /* Lato font (OFL), https://fonts.google.com/specimen/Lato#about,

+ 1 - 1
server/static/js/app.js

@@ -60,7 +60,7 @@ const subscribeInternal = (topic, delaySec) => {
         eventSource.onmessage = (e) => {
             const event = JSON.parse(e.data);
             notifySound.play();
-            new Notification(topic, {
+            new Notification(`${location.host}/${topic}`, {
                 body: event.message,
                 icon: '/static/img/favicon.png'
             });

+ 1 - 6
server/topic.go

@@ -21,15 +21,10 @@ type topic struct {
 	mu          sync.Mutex
 }
 
-// message represents a message published to a topic
-type message struct {
-	Time    int64  `json:"time"`
-	Message string `json:"message"`
-}
-
 // subscriber is a function that is called for every new message on a topic
 type subscriber func(msg *message) error
 
+// newTopic creates a new topic
 func newTopic(id string) *topic {
 	ctx, cancel := context.WithCancel(context.Background())
 	return &topic{