client.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. // Package client provides a ntfy client to publish and subscribe to topics
  2. package client
  3. import (
  4. "bufio"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "heckel.io/ntfy/util"
  9. "io"
  10. "log"
  11. "net/http"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. // Event type constants
  17. const (
  18. MessageEvent = "message"
  19. KeepaliveEvent = "keepalive"
  20. OpenEvent = "open"
  21. )
  22. const (
  23. maxResponseBytes = 4096
  24. )
  25. // Client is the ntfy client that can be used to publish and subscribe to ntfy topics
  26. type Client struct {
  27. Messages chan *Message
  28. config *Config
  29. subscriptions map[string]*subscription
  30. mu sync.Mutex
  31. }
  32. // Message is a struct that represents a ntfy message
  33. type Message struct { // TODO combine with server.message
  34. ID string
  35. Event string
  36. Time int64
  37. Topic string
  38. Message string
  39. Title string
  40. Priority int
  41. Tags []string
  42. // Additional fields
  43. TopicURL string
  44. SubscriptionID string
  45. Raw string
  46. }
  47. type subscription struct {
  48. ID string
  49. topicURL string
  50. cancel context.CancelFunc
  51. }
  52. // New creates a new Client using a given Config
  53. func New(config *Config) *Client {
  54. return &Client{
  55. Messages: make(chan *Message, 50), // Allow reading a few messages
  56. config: config,
  57. subscriptions: make(map[string]*subscription),
  58. }
  59. }
  60. // Publish sends a message to a specific topic, optionally using options.
  61. // See PublishReader for details.
  62. func (c *Client) Publish(topic, message string, options ...PublishOption) (*Message, error) {
  63. return c.PublishReader(topic, strings.NewReader(message), options...)
  64. }
  65. // PublishReader sends a message to a specific topic, optionally using options.
  66. //
  67. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  68. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  69. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  70. //
  71. // To pass title, priority and tags, check out WithTitle, WithPriority, WithTagsList, WithDelay, WithNoCache,
  72. // WithNoFirebase, and the generic WithHeader.
  73. func (c *Client) PublishReader(topic string, body io.Reader, options ...PublishOption) (*Message, error) {
  74. topicURL := c.expandTopicURL(topic)
  75. req, _ := http.NewRequest("POST", topicURL, body)
  76. for _, option := range options {
  77. if err := option(req); err != nil {
  78. return nil, err
  79. }
  80. }
  81. resp, err := http.DefaultClient.Do(req)
  82. if err != nil {
  83. return nil, err
  84. }
  85. defer resp.Body.Close()
  86. if resp.StatusCode != http.StatusOK {
  87. return nil, fmt.Errorf("unexpected response %d from server", resp.StatusCode)
  88. }
  89. b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
  90. if err != nil {
  91. return nil, err
  92. }
  93. m, err := toMessage(string(b), topicURL, "")
  94. if err != nil {
  95. return nil, err
  96. }
  97. return m, nil
  98. }
  99. // Poll queries a topic for all (or a limited set) of messages. Unlike Subscribe, this method only polls for
  100. // messages and does not subscribe to messages that arrive after this call.
  101. //
  102. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  103. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  104. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  105. //
  106. // By default, all messages will be returned, but you can change this behavior using a SubscribeOption.
  107. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  108. func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, error) {
  109. ctx := context.Background()
  110. messages := make([]*Message, 0)
  111. msgChan := make(chan *Message)
  112. errChan := make(chan error)
  113. topicURL := c.expandTopicURL(topic)
  114. options = append(options, WithPoll())
  115. go func() {
  116. err := performSubscribeRequest(ctx, msgChan, topicURL, "", options...)
  117. close(msgChan)
  118. errChan <- err
  119. }()
  120. for m := range msgChan {
  121. messages = append(messages, m)
  122. }
  123. return messages, <-errChan
  124. }
  125. // Subscribe subscribes to a topic to listen for newly incoming messages. The method starts a connection in the
  126. // background and returns new messages via the Messages channel.
  127. //
  128. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  129. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  130. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  131. //
  132. // By default, only new messages will be returned, but you can change this behavior using a SubscribeOption.
  133. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  134. //
  135. // The method returns a unique subscriptionID that can be used in Unsubscribe.
  136. //
  137. // Example:
  138. // c := client.New(client.NewConfig())
  139. // subscriptionID := c.Subscribe("mytopic")
  140. // for m := range c.Messages {
  141. // fmt.Printf("New message: %s", m.Message)
  142. // }
  143. func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
  144. c.mu.Lock()
  145. defer c.mu.Unlock()
  146. subscriptionID := util.RandomString(10)
  147. topicURL := c.expandTopicURL(topic)
  148. ctx, cancel := context.WithCancel(context.Background())
  149. c.subscriptions[subscriptionID] = &subscription{
  150. ID: subscriptionID,
  151. topicURL: topicURL,
  152. cancel: cancel,
  153. }
  154. go handleSubscribeConnLoop(ctx, c.Messages, topicURL, subscriptionID, options...)
  155. return subscriptionID
  156. }
  157. // Unsubscribe unsubscribes from a topic that has been previously subscribed to using the unique
  158. // subscriptionID returned in Subscribe.
  159. func (c *Client) Unsubscribe(subscriptionID string) {
  160. c.mu.Lock()
  161. defer c.mu.Unlock()
  162. sub, ok := c.subscriptions[subscriptionID]
  163. if !ok {
  164. return
  165. }
  166. delete(c.subscriptions, subscriptionID)
  167. sub.cancel()
  168. }
  169. // UnsubscribeAll unsubscribes from a topic that has been previously subscribed with Subscribe.
  170. // If there are multiple subscriptions matching the topic, all of them are unsubscribed from.
  171. //
  172. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  173. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  174. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  175. func (c *Client) UnsubscribeAll(topic string) {
  176. c.mu.Lock()
  177. defer c.mu.Unlock()
  178. topicURL := c.expandTopicURL(topic)
  179. for _, sub := range c.subscriptions {
  180. if sub.topicURL == topicURL {
  181. delete(c.subscriptions, sub.ID)
  182. sub.cancel()
  183. }
  184. }
  185. }
  186. func (c *Client) expandTopicURL(topic string) string {
  187. if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
  188. return topic
  189. } else if strings.Contains(topic, "/") {
  190. return fmt.Sprintf("https://%s", topic)
  191. }
  192. return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic)
  193. }
  194. func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL, subcriptionID string, options ...SubscribeOption) {
  195. for {
  196. // TODO The retry logic is crude and may lose messages. It should record the last message like the
  197. // Android client, use since=, and do incremental backoff too
  198. if err := performSubscribeRequest(ctx, msgChan, topicURL, subcriptionID, options...); err != nil {
  199. log.Printf("Connection to %s failed: %s", topicURL, err.Error())
  200. }
  201. select {
  202. case <-ctx.Done():
  203. log.Printf("Connection to %s exited", topicURL)
  204. return
  205. case <-time.After(10 * time.Second): // TODO Add incremental backoff
  206. }
  207. }
  208. }
  209. func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, subscriptionID string, options ...SubscribeOption) error {
  210. req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
  211. if err != nil {
  212. return err
  213. }
  214. for _, option := range options {
  215. if err := option(req); err != nil {
  216. return err
  217. }
  218. }
  219. resp, err := http.DefaultClient.Do(req)
  220. if err != nil {
  221. return err
  222. }
  223. defer resp.Body.Close()
  224. scanner := bufio.NewScanner(resp.Body)
  225. for scanner.Scan() {
  226. m, err := toMessage(scanner.Text(), topicURL, subscriptionID)
  227. if err != nil {
  228. return err
  229. }
  230. if m.Event == MessageEvent {
  231. msgChan <- m
  232. }
  233. }
  234. return nil
  235. }
  236. func toMessage(s, topicURL, subscriptionID string) (*Message, error) {
  237. var m *Message
  238. if err := json.NewDecoder(strings.NewReader(s)).Decode(&m); err != nil {
  239. return nil, err
  240. }
  241. m.TopicURL = topicURL
  242. m.SubscriptionID = subscriptionID
  243. m.Raw = s
  244. return m, nil
  245. }