client.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. // Package client provides a ntfy client to publish and subscribe to topics
  2. package client
  3. import (
  4. "bufio"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "heckel.io/ntfy/log"
  10. "heckel.io/ntfy/util"
  11. "io"
  12. "net/http"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. // Event type constants
  18. const (
  19. MessageEvent = "message"
  20. KeepaliveEvent = "keepalive"
  21. OpenEvent = "open"
  22. PollRequestEvent = "poll_request"
  23. )
  24. const (
  25. maxResponseBytes = 4096
  26. )
  27. // Client is the ntfy client that can be used to publish and subscribe to ntfy topics
  28. type Client struct {
  29. Messages chan *Message
  30. config *Config
  31. subscriptions map[string]*subscription
  32. mu sync.Mutex
  33. }
  34. // Message is a struct that represents a ntfy message
  35. type Message struct { // TODO combine with server.message
  36. ID string
  37. Event string
  38. Time int64
  39. Topic string
  40. Message string
  41. Title string
  42. Priority int
  43. Tags []string
  44. Click string
  45. Icon string
  46. Attachment *Attachment
  47. // Additional fields
  48. TopicURL string
  49. SubscriptionID string
  50. Raw string
  51. }
  52. // Attachment represents a message attachment
  53. type Attachment struct {
  54. Name string `json:"name"`
  55. Type string `json:"type,omitempty"`
  56. Size int64 `json:"size,omitempty"`
  57. Expires int64 `json:"expires,omitempty"`
  58. URL string `json:"url"`
  59. Owner string `json:"-"` // IP address of uploader, used for rate limiting
  60. }
  61. type subscription struct {
  62. ID string
  63. topicURL string
  64. cancel context.CancelFunc
  65. }
  66. // New creates a new Client using a given Config
  67. func New(config *Config) *Client {
  68. return &Client{
  69. Messages: make(chan *Message, 50), // Allow reading a few messages
  70. config: config,
  71. subscriptions: make(map[string]*subscription),
  72. }
  73. }
  74. // Publish sends a message to a specific topic, optionally using options.
  75. // See PublishReader for details.
  76. func (c *Client) Publish(topic, message string, options ...PublishOption) (*Message, error) {
  77. return c.PublishReader(topic, strings.NewReader(message), options...)
  78. }
  79. // PublishReader sends a message to a specific topic, optionally using options.
  80. //
  81. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  82. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  83. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  84. //
  85. // To pass title, priority and tags, check out WithTitle, WithPriority, WithTagsList, WithDelay, WithNoCache,
  86. // WithNoFirebase, and the generic WithHeader.
  87. func (c *Client) PublishReader(topic string, body io.Reader, options ...PublishOption) (*Message, error) {
  88. topicURL := c.expandTopicURL(topic)
  89. req, err := http.NewRequest("POST", topicURL, body)
  90. if err != nil {
  91. return nil, err
  92. }
  93. for _, option := range options {
  94. if err := option(req); err != nil {
  95. return nil, err
  96. }
  97. }
  98. log.Debug("%s Publishing message with headers %s", util.ShortTopicURL(topicURL), req.Header)
  99. resp, err := http.DefaultClient.Do(req)
  100. if err != nil {
  101. return nil, err
  102. }
  103. defer resp.Body.Close()
  104. b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
  105. if err != nil {
  106. return nil, err
  107. }
  108. if resp.StatusCode != http.StatusOK {
  109. return nil, errors.New(strings.TrimSpace(string(b)))
  110. }
  111. m, err := toMessage(string(b), topicURL, "")
  112. if err != nil {
  113. return nil, err
  114. }
  115. return m, nil
  116. }
  117. // Poll queries a topic for all (or a limited set) of messages. Unlike Subscribe, this method only polls for
  118. // messages and does not subscribe to messages that arrive after this call.
  119. //
  120. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  121. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  122. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  123. //
  124. // By default, all messages will be returned, but you can change this behavior using a SubscribeOption.
  125. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  126. func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, error) {
  127. ctx := context.Background()
  128. messages := make([]*Message, 0)
  129. msgChan := make(chan *Message)
  130. errChan := make(chan error)
  131. topicURL := c.expandTopicURL(topic)
  132. log.Debug("%s Polling from topic", util.ShortTopicURL(topicURL))
  133. options = append(options, WithPoll())
  134. go func() {
  135. err := performSubscribeRequest(ctx, msgChan, topicURL, "", options...)
  136. close(msgChan)
  137. errChan <- err
  138. }()
  139. for m := range msgChan {
  140. messages = append(messages, m)
  141. }
  142. return messages, <-errChan
  143. }
  144. // Subscribe subscribes to a topic to listen for newly incoming messages. The method starts a connection in the
  145. // background and returns new messages via the Messages channel.
  146. //
  147. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  148. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  149. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  150. //
  151. // By default, only new messages will be returned, but you can change this behavior using a SubscribeOption.
  152. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  153. //
  154. // The method returns a unique subscriptionID that can be used in Unsubscribe.
  155. //
  156. // Example:
  157. //
  158. // c := client.New(client.NewConfig())
  159. // subscriptionID := c.Subscribe("mytopic")
  160. // for m := range c.Messages {
  161. // fmt.Printf("New message: %s", m.Message)
  162. // }
  163. func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
  164. c.mu.Lock()
  165. defer c.mu.Unlock()
  166. subscriptionID := util.RandomString(10)
  167. topicURL := c.expandTopicURL(topic)
  168. log.Debug("%s Subscribing to topic", util.ShortTopicURL(topicURL))
  169. ctx, cancel := context.WithCancel(context.Background())
  170. c.subscriptions[subscriptionID] = &subscription{
  171. ID: subscriptionID,
  172. topicURL: topicURL,
  173. cancel: cancel,
  174. }
  175. go handleSubscribeConnLoop(ctx, c.Messages, topicURL, subscriptionID, options...)
  176. return subscriptionID
  177. }
  178. // Unsubscribe unsubscribes from a topic that has been previously subscribed to using the unique
  179. // subscriptionID returned in Subscribe.
  180. func (c *Client) Unsubscribe(subscriptionID string) {
  181. c.mu.Lock()
  182. defer c.mu.Unlock()
  183. sub, ok := c.subscriptions[subscriptionID]
  184. if !ok {
  185. return
  186. }
  187. delete(c.subscriptions, subscriptionID)
  188. sub.cancel()
  189. }
  190. // UnsubscribeAll unsubscribes from a topic that has been previously subscribed with Subscribe.
  191. // If there are multiple subscriptions matching the topic, all of them are unsubscribed from.
  192. //
  193. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  194. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  195. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  196. func (c *Client) UnsubscribeAll(topic string) {
  197. c.mu.Lock()
  198. defer c.mu.Unlock()
  199. topicURL := c.expandTopicURL(topic)
  200. for _, sub := range c.subscriptions {
  201. if sub.topicURL == topicURL {
  202. delete(c.subscriptions, sub.ID)
  203. sub.cancel()
  204. }
  205. }
  206. }
  207. func (c *Client) expandTopicURL(topic string) string {
  208. if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
  209. return topic
  210. } else if strings.Contains(topic, "/") {
  211. return fmt.Sprintf("https://%s", topic)
  212. }
  213. return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic)
  214. }
  215. func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL, subcriptionID string, options ...SubscribeOption) {
  216. for {
  217. // TODO The retry logic is crude and may lose messages. It should record the last message like the
  218. // Android client, use since=, and do incremental backoff too
  219. if err := performSubscribeRequest(ctx, msgChan, topicURL, subcriptionID, options...); err != nil {
  220. log.Warn("%s Connection failed: %s", util.ShortTopicURL(topicURL), err.Error())
  221. }
  222. select {
  223. case <-ctx.Done():
  224. log.Info("%s Connection exited", util.ShortTopicURL(topicURL))
  225. return
  226. case <-time.After(10 * time.Second): // TODO Add incremental backoff
  227. }
  228. }
  229. }
  230. func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, subscriptionID string, options ...SubscribeOption) error {
  231. streamURL := fmt.Sprintf("%s/json", topicURL)
  232. log.Debug("%s Listening to %s", util.ShortTopicURL(topicURL), streamURL)
  233. req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil)
  234. if err != nil {
  235. return err
  236. }
  237. for _, option := range options {
  238. if err := option(req); err != nil {
  239. return err
  240. }
  241. }
  242. resp, err := http.DefaultClient.Do(req)
  243. if err != nil {
  244. return err
  245. }
  246. defer resp.Body.Close()
  247. if resp.StatusCode != http.StatusOK {
  248. b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
  249. if err != nil {
  250. return err
  251. }
  252. return errors.New(strings.TrimSpace(string(b)))
  253. }
  254. scanner := bufio.NewScanner(resp.Body)
  255. for scanner.Scan() {
  256. messageJSON := scanner.Text()
  257. m, err := toMessage(messageJSON, topicURL, subscriptionID)
  258. if err != nil {
  259. return err
  260. }
  261. log.Trace("%s Message received: %s", util.ShortTopicURL(topicURL), messageJSON)
  262. if m.Event == MessageEvent {
  263. msgChan <- m
  264. }
  265. }
  266. return nil
  267. }
  268. func toMessage(s, topicURL, subscriptionID string) (*Message, error) {
  269. var m *Message
  270. if err := json.NewDecoder(strings.NewReader(s)).Decode(&m); err != nil {
  271. return nil, err
  272. }
  273. m.TopicURL = topicURL
  274. m.SubscriptionID = subscriptionID
  275. m.Raw = s
  276. return m, nil
  277. }