client.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // Package client provides a ntfy client to publish and subscribe to topics
  2. package client
  3. import (
  4. "bufio"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "heckel.io/ntfy/util"
  10. "io"
  11. "log"
  12. "net/http"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. // Event type constants
  18. const (
  19. MessageEvent = "message"
  20. KeepaliveEvent = "keepalive"
  21. OpenEvent = "open"
  22. )
  23. const (
  24. maxResponseBytes = 4096
  25. )
  26. // Client is the ntfy client that can be used to publish and subscribe to ntfy topics
  27. type Client struct {
  28. Messages chan *Message
  29. config *Config
  30. subscriptions map[string]*subscription
  31. mu sync.Mutex
  32. }
  33. // Message is a struct that represents a ntfy message
  34. type Message struct { // TODO combine with server.message
  35. ID string
  36. Event string
  37. Time int64
  38. Topic string
  39. Message string
  40. Title string
  41. Priority int
  42. Tags []string
  43. Click string
  44. Attachment *Attachment
  45. // Additional fields
  46. TopicURL string
  47. SubscriptionID string
  48. Raw string
  49. }
  50. // Attachment represents a message attachment
  51. type Attachment struct {
  52. Name string `json:"name"`
  53. Type string `json:"type,omitempty"`
  54. Size int64 `json:"size,omitempty"`
  55. Expires int64 `json:"expires,omitempty"`
  56. URL string `json:"url"`
  57. Owner string `json:"-"` // IP address of uploader, used for rate limiting
  58. }
  59. type subscription struct {
  60. ID string
  61. topicURL string
  62. cancel context.CancelFunc
  63. }
  64. // New creates a new Client using a given Config
  65. func New(config *Config) *Client {
  66. return &Client{
  67. Messages: make(chan *Message, 50), // Allow reading a few messages
  68. config: config,
  69. subscriptions: make(map[string]*subscription),
  70. }
  71. }
  72. // Publish sends a message to a specific topic, optionally using options.
  73. // See PublishReader for details.
  74. func (c *Client) Publish(topic, message string, options ...PublishOption) (*Message, error) {
  75. return c.PublishReader(topic, strings.NewReader(message), options...)
  76. }
  77. // PublishReader sends a message to a specific topic, optionally using options.
  78. //
  79. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  80. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  81. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  82. //
  83. // To pass title, priority and tags, check out WithTitle, WithPriority, WithTagsList, WithDelay, WithNoCache,
  84. // WithNoFirebase, and the generic WithHeader.
  85. func (c *Client) PublishReader(topic string, body io.Reader, options ...PublishOption) (*Message, error) {
  86. topicURL := c.expandTopicURL(topic)
  87. req, _ := http.NewRequest("POST", topicURL, body)
  88. for _, option := range options {
  89. if err := option(req); err != nil {
  90. return nil, err
  91. }
  92. }
  93. resp, err := http.DefaultClient.Do(req)
  94. if err != nil {
  95. return nil, err
  96. }
  97. defer resp.Body.Close()
  98. b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
  99. if err != nil {
  100. return nil, err
  101. }
  102. if resp.StatusCode != http.StatusOK {
  103. return nil, errors.New(strings.TrimSpace(string(b)))
  104. }
  105. m, err := toMessage(string(b), topicURL, "")
  106. if err != nil {
  107. return nil, err
  108. }
  109. return m, nil
  110. }
  111. // Poll queries a topic for all (or a limited set) of messages. Unlike Subscribe, this method only polls for
  112. // messages and does not subscribe to messages that arrive after this call.
  113. //
  114. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  115. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  116. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  117. //
  118. // By default, all messages will be returned, but you can change this behavior using a SubscribeOption.
  119. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  120. func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, error) {
  121. ctx := context.Background()
  122. messages := make([]*Message, 0)
  123. msgChan := make(chan *Message)
  124. errChan := make(chan error)
  125. topicURL := c.expandTopicURL(topic)
  126. options = append(options, WithPoll())
  127. go func() {
  128. err := performSubscribeRequest(ctx, msgChan, topicURL, "", options...)
  129. close(msgChan)
  130. errChan <- err
  131. }()
  132. for m := range msgChan {
  133. messages = append(messages, m)
  134. }
  135. return messages, <-errChan
  136. }
  137. // Subscribe subscribes to a topic to listen for newly incoming messages. The method starts a connection in the
  138. // background and returns new messages via the Messages channel.
  139. //
  140. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  141. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  142. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  143. //
  144. // By default, only new messages will be returned, but you can change this behavior using a SubscribeOption.
  145. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  146. //
  147. // The method returns a unique subscriptionID that can be used in Unsubscribe.
  148. //
  149. // Example:
  150. // c := client.New(client.NewConfig())
  151. // subscriptionID := c.Subscribe("mytopic")
  152. // for m := range c.Messages {
  153. // fmt.Printf("New message: %s", m.Message)
  154. // }
  155. func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
  156. c.mu.Lock()
  157. defer c.mu.Unlock()
  158. subscriptionID := util.RandomString(10)
  159. topicURL := c.expandTopicURL(topic)
  160. ctx, cancel := context.WithCancel(context.Background())
  161. c.subscriptions[subscriptionID] = &subscription{
  162. ID: subscriptionID,
  163. topicURL: topicURL,
  164. cancel: cancel,
  165. }
  166. go handleSubscribeConnLoop(ctx, c.Messages, topicURL, subscriptionID, options...)
  167. return subscriptionID
  168. }
  169. // Unsubscribe unsubscribes from a topic that has been previously subscribed to using the unique
  170. // subscriptionID returned in Subscribe.
  171. func (c *Client) Unsubscribe(subscriptionID string) {
  172. c.mu.Lock()
  173. defer c.mu.Unlock()
  174. sub, ok := c.subscriptions[subscriptionID]
  175. if !ok {
  176. return
  177. }
  178. delete(c.subscriptions, subscriptionID)
  179. sub.cancel()
  180. }
  181. // UnsubscribeAll unsubscribes from a topic that has been previously subscribed with Subscribe.
  182. // If there are multiple subscriptions matching the topic, all of them are unsubscribed from.
  183. //
  184. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  185. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  186. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  187. func (c *Client) UnsubscribeAll(topic string) {
  188. c.mu.Lock()
  189. defer c.mu.Unlock()
  190. topicURL := c.expandTopicURL(topic)
  191. for _, sub := range c.subscriptions {
  192. if sub.topicURL == topicURL {
  193. delete(c.subscriptions, sub.ID)
  194. sub.cancel()
  195. }
  196. }
  197. }
  198. func (c *Client) expandTopicURL(topic string) string {
  199. if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
  200. return topic
  201. } else if strings.Contains(topic, "/") {
  202. return fmt.Sprintf("https://%s", topic)
  203. }
  204. return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic)
  205. }
  206. func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL, subcriptionID string, options ...SubscribeOption) {
  207. for {
  208. // TODO The retry logic is crude and may lose messages. It should record the last message like the
  209. // Android client, use since=, and do incremental backoff too
  210. if err := performSubscribeRequest(ctx, msgChan, topicURL, subcriptionID, options...); err != nil {
  211. log.Printf("Connection to %s failed: %s", topicURL, err.Error())
  212. }
  213. select {
  214. case <-ctx.Done():
  215. log.Printf("Connection to %s exited", topicURL)
  216. return
  217. case <-time.After(10 * time.Second): // TODO Add incremental backoff
  218. }
  219. }
  220. }
  221. func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, subscriptionID string, options ...SubscribeOption) error {
  222. req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
  223. if err != nil {
  224. return err
  225. }
  226. for _, option := range options {
  227. if err := option(req); err != nil {
  228. return err
  229. }
  230. }
  231. resp, err := http.DefaultClient.Do(req)
  232. if err != nil {
  233. return err
  234. }
  235. defer resp.Body.Close()
  236. scanner := bufio.NewScanner(resp.Body)
  237. for scanner.Scan() {
  238. m, err := toMessage(scanner.Text(), topicURL, subscriptionID)
  239. if err != nil {
  240. return err
  241. }
  242. if m.Event == MessageEvent {
  243. msgChan <- m
  244. }
  245. }
  246. return nil
  247. }
  248. func toMessage(s, topicURL, subscriptionID string) (*Message, error) {
  249. var m *Message
  250. if err := json.NewDecoder(strings.NewReader(s)).Decode(&m); err != nil {
  251. return nil, err
  252. }
  253. m.TopicURL = topicURL
  254. m.SubscriptionID = subscriptionID
  255. m.Raw = s
  256. return m, nil
  257. }