client.go 8.6 KB


  1. // Package client provides a ntfy client to publish and subscribe to topics
  2. package client
  3. import (
  4. "bufio"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "heckel.io/ntfy/util"
  9. "io"
  10. "log"
  11. "net/http"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
// Event type constants. These are the values Message.Event is compared against.
// In the code visible here only MessageEvent is checked: subscription streams
// forward messages of that type to the caller and drop every other event.
const (
	MessageEvent   = "message"   // an actual published message
	KeepaliveEvent = "keepalive" // presumably a periodic heartbeat on open connections; confirm against the server
	OpenEvent      = "open"      // presumably sent once when a connection is opened; confirm against the server
)
const (
	// maxResponseBytes caps how many bytes of the server's response body are
	// read after publishing a message (see PublishReader).
	maxResponseBytes = 4096
)
// Client is the ntfy client that can be used to publish and subscribe to ntfy topics.
// Create one with New.
type Client struct {
	// Messages is the channel on which messages from all active subscriptions
	// are delivered (see Subscribe).
	Messages chan *Message
	// config supplies the default host used to expand short topic names.
	config *Config
	// subscriptions holds the active subscriptions, keyed by subscription ID.
	subscriptions map[string]*subscription
	// mu is held by Subscribe, Unsubscribe and UnsubscribeAll while they
	// access subscriptions.
	mu sync.Mutex
}
// Message is a struct that represents a ntfy message. It is populated by
// decoding one JSON document returned by the server; the fields listed under
// "Additional fields" are then filled in by the client.
type Message struct { // TODO combine with server.message
	ID         string
	Event      string // compared against the event type constants, e.g. MessageEvent
	Time       int64  // presumably a Unix timestamp; confirm against the server
	Topic      string
	Message    string
	Title      string
	Priority   int
	Tags       []string
	Click      string
	Attachment *Attachment

	// Additional fields
	TopicURL       string // topic URL the message was published to or received from
	SubscriptionID string // ID of the subscription that produced the message; empty for Publish and Poll
	Raw            string // the undecoded JSON text the message was parsed from
}
// Attachment represents a message attachment. The struct tags define its JSON
// encoding: Type, Size and Expires are omitted when they hold their zero value,
// and Owner is never encoded or decoded.
type Attachment struct {
	Name    string `json:"name"`
	Type    string `json:"type,omitempty"`
	Size    int64  `json:"size,omitempty"`
	Expires int64  `json:"expires,omitempty"`
	URL     string `json:"url"`
	Owner   string `json:"-"` // IP address of uploader, used for rate limiting
}
// subscription is the client's bookkeeping record for one active Subscribe call.
type subscription struct {
	ID       string             // unique ID returned to the caller by Subscribe
	topicURL string             // fully expanded topic URL; matched by UnsubscribeAll
	cancel   context.CancelFunc // cancels the context of the background connection loop
}
  63. // New creates a new Client using a given Config
  64. func New(config *Config) *Client {
  65. return &Client{
  66. Messages: make(chan *Message, 50), // Allow reading a few messages
  67. config: config,
  68. subscriptions: make(map[string]*subscription),
  69. }
  70. }
  71. // Publish sends a message to a specific topic, optionally using options.
  72. // See PublishReader for details.
  73. func (c *Client) Publish(topic, message string, options ...PublishOption) (*Message, error) {
  74. return c.PublishReader(topic, strings.NewReader(message), options...)
  75. }
  76. // PublishReader sends a message to a specific topic, optionally using options.
  77. //
  78. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  79. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  80. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  81. //
  82. // To pass title, priority and tags, check out WithTitle, WithPriority, WithTagsList, WithDelay, WithNoCache,
  83. // WithNoFirebase, and the generic WithHeader.
  84. func (c *Client) PublishReader(topic string, body io.Reader, options ...PublishOption) (*Message, error) {
  85. topicURL := c.expandTopicURL(topic)
  86. req, _ := http.NewRequest("POST", topicURL, body)
  87. for _, option := range options {
  88. if err := option(req); err != nil {
  89. return nil, err
  90. }
  91. }
  92. resp, err := http.DefaultClient.Do(req)
  93. if err != nil {
  94. return nil, err
  95. }
  96. defer resp.Body.Close()
  97. if resp.StatusCode != http.StatusOK {
  98. return nil, fmt.Errorf("unexpected response %d from server", resp.StatusCode)
  99. }
  100. b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
  101. if err != nil {
  102. return nil, err
  103. }
  104. m, err := toMessage(string(b), topicURL, "")
  105. if err != nil {
  106. return nil, err
  107. }
  108. return m, nil
  109. }
  110. // Poll queries a topic for all (or a limited set) of messages. Unlike Subscribe, this method only polls for
  111. // messages and does not subscribe to messages that arrive after this call.
  112. //
  113. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  114. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  115. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  116. //
  117. // By default, all messages will be returned, but you can change this behavior using a SubscribeOption.
  118. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  119. func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, error) {
  120. ctx := context.Background()
  121. messages := make([]*Message, 0)
  122. msgChan := make(chan *Message)
  123. errChan := make(chan error)
  124. topicURL := c.expandTopicURL(topic)
  125. options = append(options, WithPoll())
  126. go func() {
  127. err := performSubscribeRequest(ctx, msgChan, topicURL, "", options...)
  128. close(msgChan)
  129. errChan <- err
  130. }()
  131. for m := range msgChan {
  132. messages = append(messages, m)
  133. }
  134. return messages, <-errChan
  135. }
  136. // Subscribe subscribes to a topic to listen for newly incoming messages. The method starts a connection in the
  137. // background and returns new messages via the Messages channel.
  138. //
  139. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  140. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  141. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  142. //
  143. // By default, only new messages will be returned, but you can change this behavior using a SubscribeOption.
  144. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  145. //
  146. // The method returns a unique subscriptionID that can be used in Unsubscribe.
  147. //
  148. // Example:
  149. // c := client.New(client.NewConfig())
  150. // subscriptionID := c.Subscribe("mytopic")
  151. // for m := range c.Messages {
  152. // fmt.Printf("New message: %s", m.Message)
  153. // }
  154. func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
  155. c.mu.Lock()
  156. defer c.mu.Unlock()
  157. subscriptionID := util.RandomString(10)
  158. topicURL := c.expandTopicURL(topic)
  159. ctx, cancel := context.WithCancel(context.Background())
  160. c.subscriptions[subscriptionID] = &subscription{
  161. ID: subscriptionID,
  162. topicURL: topicURL,
  163. cancel: cancel,
  164. }
  165. go handleSubscribeConnLoop(ctx, c.Messages, topicURL, subscriptionID, options...)
  166. return subscriptionID
  167. }
  168. // Unsubscribe unsubscribes from a topic that has been previously subscribed to using the unique
  169. // subscriptionID returned in Subscribe.
  170. func (c *Client) Unsubscribe(subscriptionID string) {
  171. c.mu.Lock()
  172. defer c.mu.Unlock()
  173. sub, ok := c.subscriptions[subscriptionID]
  174. if !ok {
  175. return
  176. }
  177. delete(c.subscriptions, subscriptionID)
  178. sub.cancel()
  179. }
  180. // UnsubscribeAll unsubscribes from a topic that has been previously subscribed with Subscribe.
  181. // If there are multiple subscriptions matching the topic, all of them are unsubscribed from.
  182. //
  183. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  184. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  185. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  186. func (c *Client) UnsubscribeAll(topic string) {
  187. c.mu.Lock()
  188. defer c.mu.Unlock()
  189. topicURL := c.expandTopicURL(topic)
  190. for _, sub := range c.subscriptions {
  191. if sub.topicURL == topicURL {
  192. delete(c.subscriptions, sub.ID)
  193. sub.cancel()
  194. }
  195. }
  196. }
  197. func (c *Client) expandTopicURL(topic string) string {
  198. if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
  199. return topic
  200. } else if strings.Contains(topic, "/") {
  201. return fmt.Sprintf("https://%s", topic)
  202. }
  203. return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic)
  204. }
  205. func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL, subcriptionID string, options ...SubscribeOption) {
  206. for {
  207. // TODO The retry logic is crude and may lose messages. It should record the last message like the
  208. // Android client, use since=, and do incremental backoff too
  209. if err := performSubscribeRequest(ctx, msgChan, topicURL, subcriptionID, options...); err != nil {
  210. log.Printf("Connection to %s failed: %s", topicURL, err.Error())
  211. }
  212. select {
  213. case <-ctx.Done():
  214. log.Printf("Connection to %s exited", topicURL)
  215. return
  216. case <-time.After(10 * time.Second): // TODO Add incremental backoff
  217. }
  218. }
  219. }
  220. func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, subscriptionID string, options ...SubscribeOption) error {
  221. req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
  222. if err != nil {
  223. return err
  224. }
  225. for _, option := range options {
  226. if err := option(req); err != nil {
  227. return err
  228. }
  229. }
  230. resp, err := http.DefaultClient.Do(req)
  231. if err != nil {
  232. return err
  233. }
  234. defer resp.Body.Close()
  235. scanner := bufio.NewScanner(resp.Body)
  236. for scanner.Scan() {
  237. m, err := toMessage(scanner.Text(), topicURL, subscriptionID)
  238. if err != nil {
  239. return err
  240. }
  241. if m.Event == MessageEvent {
  242. msgChan <- m
  243. }
  244. }
  245. return nil
  246. }
  247. func toMessage(s, topicURL, subscriptionID string) (*Message, error) {
  248. var m *Message
  249. if err := json.NewDecoder(strings.NewReader(s)).Decode(&m); err != nil {
  250. return nil, err
  251. }
  252. m.TopicURL = topicURL
  253. m.SubscriptionID = subscriptionID
  254. m.Raw = s
  255. return m, nil
  256. }