client.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. // Package client provides a ntfy client to publish and subscribe to topics
  2. package client
  3. import (
  4. "bufio"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net/http"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. // Event type constants
  16. const (
  17. MessageEvent = "message"
  18. KeepaliveEvent = "keepalive"
  19. OpenEvent = "open"
  20. )
  21. const (
  22. maxResponseBytes = 4096
  23. )
  24. // Client is the ntfy client that can be used to publish and subscribe to ntfy topics
  25. type Client struct {
  26. Messages chan *Message
  27. config *Config
  28. subscriptions map[string]*subscription
  29. mu sync.Mutex
  30. }
  31. // Message is a struct that represents a ntfy message
  32. type Message struct {
  33. ID string
  34. Event string
  35. Time int64
  36. Topic string
  37. TopicURL string
  38. Message string
  39. Title string
  40. Priority int
  41. Tags []string
  42. Raw string
  43. }
  44. type subscription struct {
  45. cancel context.CancelFunc
  46. }
  47. // New creates a new Client using a given Config
  48. func New(config *Config) *Client {
  49. return &Client{
  50. Messages: make(chan *Message),
  51. config: config,
  52. subscriptions: make(map[string]*subscription),
  53. }
  54. }
  55. // Publish sends a message to a specific topic, optionally using options.
  56. //
  57. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  58. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  59. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  60. //
  61. // To pass title, priority and tags, check out WithTitle, WithPriority, WithTagsList, WithDelay, WithNoCache,
  62. // WithNoFirebase, and the generic WithHeader.
  63. func (c *Client) Publish(topic, message string, options ...PublishOption) (*Message, error) {
  64. topicURL := c.expandTopicURL(topic)
  65. req, _ := http.NewRequest("POST", topicURL, strings.NewReader(message))
  66. for _, option := range options {
  67. if err := option(req); err != nil {
  68. return nil, err
  69. }
  70. }
  71. resp, err := http.DefaultClient.Do(req)
  72. if err != nil {
  73. return nil, err
  74. }
  75. defer resp.Body.Close()
  76. if resp.StatusCode != http.StatusOK {
  77. return nil, fmt.Errorf("unexpected response %d from server", resp.StatusCode)
  78. }
  79. b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
  80. if err != nil {
  81. return nil, err
  82. }
  83. m, err := toMessage(string(b), topicURL)
  84. if err != nil {
  85. return nil, err
  86. }
  87. return m, nil
  88. }
  89. // Poll queries a topic for all (or a limited set) of messages. Unlike Subscribe, this method only polls for
  90. // messages and does not subscribe to messages that arrive after this call.
  91. //
  92. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  93. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  94. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  95. //
  96. // By default, all messages will be returned, but you can change this behavior using a SubscribeOption.
  97. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  98. func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, error) {
  99. ctx := context.Background()
  100. messages := make([]*Message, 0)
  101. msgChan := make(chan *Message)
  102. errChan := make(chan error)
  103. topicURL := c.expandTopicURL(topic)
  104. go func() {
  105. err := performSubscribeRequest(ctx, msgChan, topicURL, options...)
  106. close(msgChan)
  107. errChan <- err
  108. }()
  109. for m := range msgChan {
  110. messages = append(messages, m)
  111. }
  112. return messages, <-errChan
  113. }
  114. // Subscribe subscribes to a topic to listen for newly incoming messages. The method starts a connection in the
  115. // background and returns new messages via the Messages channel.
  116. //
  117. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  118. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  119. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  120. //
  121. // By default, only new messages will be returned, but you can change this behavior using a SubscribeOption.
  122. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  123. //
  124. // Example:
  125. // c := client.New(client.NewConfig())
  126. // c.Subscribe("mytopic")
  127. // for m := range c.Messages {
  128. // fmt.Printf("New message: %s", m.Message)
  129. // }
  130. func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
  131. c.mu.Lock()
  132. defer c.mu.Unlock()
  133. topicURL := c.expandTopicURL(topic)
  134. if _, ok := c.subscriptions[topicURL]; ok {
  135. return topicURL
  136. }
  137. ctx, cancel := context.WithCancel(context.Background())
  138. c.subscriptions[topicURL] = &subscription{cancel}
  139. go handleSubscribeConnLoop(ctx, c.Messages, topicURL, options...)
  140. return topicURL
  141. }
  142. // Unsubscribe unsubscribes from a topic that has been previously subscribed with Subscribe.
  143. //
  144. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  145. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  146. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  147. func (c *Client) Unsubscribe(topic string) {
  148. c.mu.Lock()
  149. defer c.mu.Unlock()
  150. topicURL := c.expandTopicURL(topic)
  151. sub, ok := c.subscriptions[topicURL]
  152. if !ok {
  153. return
  154. }
  155. sub.cancel()
  156. }
  157. func (c *Client) expandTopicURL(topic string) string {
  158. if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
  159. return topic
  160. } else if strings.Contains(topic, "/") {
  161. return fmt.Sprintf("https://%s", topic)
  162. }
  163. return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic)
  164. }
  165. func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL string, options ...SubscribeOption) {
  166. for {
  167. if err := performSubscribeRequest(ctx, msgChan, topicURL, options...); err != nil {
  168. log.Printf("Connection to %s failed: %s", topicURL, err.Error())
  169. }
  170. select {
  171. case <-ctx.Done():
  172. log.Printf("Connection to %s exited", topicURL)
  173. return
  174. case <-time.After(10 * time.Second): // TODO Add incremental backoff
  175. }
  176. }
  177. }
  178. func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, options ...SubscribeOption) error {
  179. req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
  180. if err != nil {
  181. return err
  182. }
  183. for _, option := range options {
  184. if err := option(req); err != nil {
  185. return err
  186. }
  187. }
  188. resp, err := http.DefaultClient.Do(req)
  189. if err != nil {
  190. return err
  191. }
  192. defer resp.Body.Close()
  193. scanner := bufio.NewScanner(resp.Body)
  194. for scanner.Scan() {
  195. m, err := toMessage(scanner.Text(), topicURL)
  196. if err != nil {
  197. return err
  198. }
  199. msgChan <- m
  200. }
  201. return nil
  202. }
  203. func toMessage(s, topicURL string) (*Message, error) {
  204. var m *Message
  205. if err := json.NewDecoder(strings.NewReader(s)).Decode(&m); err != nil {
  206. return nil, err
  207. }
  208. m.TopicURL = topicURL
  209. m.Raw = s
  210. return m, nil
  211. }