client.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. // Package client provides a ntfy client to publish and subscribe to topics
  2. package client
  3. import (
  4. "bufio"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "log"
  9. "net/http"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  // Event type constants.
  //
  // NOTE(review): nothing in this file compares against these values; presumably they
  // are the strings found in Message.Event — confirm against the server's JSON stream.
  const (
  	// OpenEvent is the "open" event type.
  	OpenEvent = "open"
  	// KeepaliveEvent is the "keepalive" event type.
  	KeepaliveEvent = "keepalive"
  	// MessageEvent is the "message" event type.
  	MessageEvent = "message"
  )
  20. // Client is the ntfy client that can be used to publish and subscribe to ntfy topics
  21. type Client struct {
  22. Messages chan *Message
  23. config *Config
  24. subscriptions map[string]*subscription
  25. mu sync.Mutex
  26. }
  // Message is a struct that represents a ntfy message.
  //
  // The fields carry no struct tags, so when a Message is decoded with encoding/json
  // the JSON keys are matched against the field names.
  type Message struct {
  	// ID is the message identifier.
  	ID string
  	// Event is the event type; presumably one of MessageEvent, KeepaliveEvent or
  	// OpenEvent — confirm against the server.
  	Event string
  	// Time is the message timestamp (presumably Unix seconds — TODO confirm).
  	Time int64
  	// Topic is the topic name as delivered in the message.
  	Topic string
  	// TopicURL is the full topic URL; the subscribe code sets it to the URL the
  	// message was received from.
  	TopicURL string
  	// Message is the message body.
  	Message string
  	// Title is the message title.
  	Title string
  	// Priority is the message priority.
  	Priority int
  	// Tags is the list of tags attached to the message.
  	Tags []string
  	// Raw is the undecoded line the message was parsed from; the subscribe code
  	// fills it in after decoding.
  	Raw string
  }
  // subscription is the bookkeeping record the Client keeps for one subscribed topic.
  type subscription struct {
  	// cancel cancels the context under which the topic's background connection
  	// loop runs.
  	cancel context.CancelFunc
  }
  43. // New creates a new Client using a given Config
  44. func New(config *Config) *Client {
  45. return &Client{
  46. Messages: make(chan *Message),
  47. config: config,
  48. subscriptions: make(map[string]*subscription),
  49. }
  50. }
  51. // Publish sends a message to a specific topic, optionally using options.
  52. //
  53. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  54. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  55. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  56. //
  57. // To pass title, priority and tags, check out WithTitle, WithPriority, WithTagsList, WithDelay, WithNoCache,
  58. // WithNoFirebase, and the generic WithHeader.
  59. func (c *Client) Publish(topic, message string, options ...PublishOption) error {
  60. topicURL := c.expandTopicURL(topic)
  61. req, _ := http.NewRequest("POST", topicURL, strings.NewReader(message))
  62. for _, option := range options {
  63. if err := option(req); err != nil {
  64. return err
  65. }
  66. }
  67. resp, err := http.DefaultClient.Do(req)
  68. if err != nil {
  69. return err
  70. }
  71. if resp.StatusCode != http.StatusOK {
  72. return fmt.Errorf("unexpected response %d from server", resp.StatusCode)
  73. }
  74. return err
  75. }
  76. // Poll queries a topic for all (or a limited set) of messages. Unlike Subscribe, this method only polls for
  77. // messages and does not subscribe to messages that arrive after this call.
  78. //
  79. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  80. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  81. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  82. //
  83. // By default, all messages will be returned, but you can change this behavior using a SubscribeOption.
  84. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  85. func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, error) {
  86. ctx := context.Background()
  87. messages := make([]*Message, 0)
  88. msgChan := make(chan *Message)
  89. errChan := make(chan error)
  90. topicURL := c.expandTopicURL(topic)
  91. go func() {
  92. err := performSubscribeRequest(ctx, msgChan, topicURL, options...)
  93. close(msgChan)
  94. errChan <- err
  95. }()
  96. for m := range msgChan {
  97. messages = append(messages, m)
  98. }
  99. return messages, <-errChan
  100. }
  101. // Subscribe subscribes to a topic to listen for newly incoming messages. The method starts a connection in the
  102. // background and returns new messages via the Messages channel.
  103. //
  104. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  105. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  106. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  107. //
  108. // By default, only new messages will be returned, but you can change this behavior using a SubscribeOption.
  109. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  110. //
  111. // Example:
  112. // c := client.New(client.NewConfig())
  113. // c.Subscribe("mytopic")
  114. // for m := range c.Messages {
  115. // fmt.Printf("New message: %s", m.Message)
  116. // }
  117. func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
  118. c.mu.Lock()
  119. defer c.mu.Unlock()
  120. topicURL := c.expandTopicURL(topic)
  121. if _, ok := c.subscriptions[topicURL]; ok {
  122. return topicURL
  123. }
  124. ctx, cancel := context.WithCancel(context.Background())
  125. c.subscriptions[topicURL] = &subscription{cancel}
  126. go handleSubscribeConnLoop(ctx, c.Messages, topicURL, options...)
  127. return topicURL
  128. }
  129. // Unsubscribe unsubscribes from a topic that has been previously subscribed with Subscribe.
  130. //
  131. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  132. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  133. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  134. func (c *Client) Unsubscribe(topic string) {
  135. c.mu.Lock()
  136. defer c.mu.Unlock()
  137. topicURL := c.expandTopicURL(topic)
  138. sub, ok := c.subscriptions[topicURL]
  139. if !ok {
  140. return
  141. }
  142. sub.cancel()
  143. }
  144. func (c *Client) expandTopicURL(topic string) string {
  145. if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
  146. return topic
  147. } else if strings.Contains(topic, "/") {
  148. return fmt.Sprintf("https://%s", topic)
  149. }
  150. return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic)
  151. }
  152. func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL string, options ...SubscribeOption) {
  153. for {
  154. if err := performSubscribeRequest(ctx, msgChan, topicURL, options...); err != nil {
  155. log.Printf("Connection to %s failed: %s", topicURL, err.Error())
  156. }
  157. select {
  158. case <-ctx.Done():
  159. log.Printf("Connection to %s exited", topicURL)
  160. return
  161. case <-time.After(10 * time.Second): // TODO Add incremental backoff
  162. }
  163. }
  164. }
  165. func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, options ...SubscribeOption) error {
  166. req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
  167. if err != nil {
  168. return err
  169. }
  170. for _, option := range options {
  171. if err := option(req); err != nil {
  172. return err
  173. }
  174. }
  175. resp, err := http.DefaultClient.Do(req)
  176. if err != nil {
  177. return err
  178. }
  179. defer resp.Body.Close()
  180. scanner := bufio.NewScanner(resp.Body)
  181. for scanner.Scan() {
  182. var m *Message
  183. line := scanner.Text()
  184. if err := json.NewDecoder(strings.NewReader(line)).Decode(&m); err != nil {
  185. return err
  186. }
  187. m.TopicURL = topicURL
  188. m.Raw = line
  189. msgChan <- m
  190. }
  191. return nil
  192. }