client.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package client
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. const (
  14. DefaultBaseURL = "https://ntfy.sh"
  15. )
  16. const (
  17. MessageEvent = "message"
  18. KeepaliveEvent = "keepalive"
  19. OpenEvent = "open"
  20. )
  21. type Client struct {
  22. BaseURL string
  23. Messages chan *Message
  24. subscriptions map[string]*subscription
  25. mu sync.Mutex
  26. }
  27. type Message struct {
  28. ID string
  29. Event string
  30. Time int64
  31. Topic string
  32. BaseURL string
  33. TopicURL string
  34. Message string
  35. Title string
  36. Priority int
  37. Tags []string
  38. Raw string
  39. }
  40. type subscription struct {
  41. cancel context.CancelFunc
  42. }
  43. var DefaultClient = New()
  44. func New() *Client {
  45. return &Client{
  46. Messages: make(chan *Message),
  47. subscriptions: make(map[string]*subscription),
  48. }
  49. }
  50. func (c *Client) Publish(topicURL, message string, options ...PublishOption) error {
  51. req, _ := http.NewRequest("POST", topicURL, strings.NewReader(message))
  52. for _, option := range options {
  53. if err := option(req); err != nil {
  54. return err
  55. }
  56. }
  57. resp, err := http.DefaultClient.Do(req)
  58. if err != nil {
  59. return err
  60. }
  61. if resp.StatusCode != http.StatusOK {
  62. return fmt.Errorf("unexpected response %d from server", resp.StatusCode)
  63. }
  64. return err
  65. }
  66. func (c *Client) Poll(topicURL string, options ...SubscribeOption) ([]*Message, error) {
  67. ctx := context.Background()
  68. messages := make([]*Message, 0)
  69. msgChan := make(chan *Message)
  70. errChan := make(chan error)
  71. go func() {
  72. err := performSubscribeRequest(ctx, msgChan, topicURL, options...)
  73. close(msgChan)
  74. errChan <- err
  75. }()
  76. for m := range msgChan {
  77. messages = append(messages, m)
  78. }
  79. return messages, <-errChan
  80. }
  81. func (c *Client) Subscribe(topicURL string, options ...SubscribeOption) {
  82. c.mu.Lock()
  83. defer c.mu.Unlock()
  84. if _, ok := c.subscriptions[topicURL]; ok {
  85. return
  86. }
  87. ctx, cancel := context.WithCancel(context.Background())
  88. c.subscriptions[topicURL] = &subscription{cancel}
  89. go handleSubscribeConnLoop(ctx, c.Messages, topicURL, options...)
  90. }
  91. func (c *Client) Unsubscribe(topicURL string) {
  92. c.mu.Lock()
  93. defer c.mu.Unlock()
  94. sub, ok := c.subscriptions[topicURL]
  95. if !ok {
  96. return
  97. }
  98. sub.cancel()
  99. return
  100. }
  101. func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL string, options ...SubscribeOption) {
  102. for {
  103. if err := performSubscribeRequest(ctx, msgChan, topicURL, options...); err != nil {
  104. log.Printf("Connection to %s failed: %s", topicURL, err.Error())
  105. }
  106. select {
  107. case <-ctx.Done():
  108. log.Printf("Connection to %s exited", topicURL)
  109. return
  110. case <-time.After(5 * time.Second):
  111. }
  112. }
  113. }
  114. func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, options ...SubscribeOption) error {
  115. req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
  116. if err != nil {
  117. return err
  118. }
  119. for _, option := range options {
  120. if err := option(req); err != nil {
  121. return err
  122. }
  123. }
  124. resp, err := http.DefaultClient.Do(req)
  125. if err != nil {
  126. return err
  127. }
  128. defer resp.Body.Close()
  129. scanner := bufio.NewScanner(resp.Body)
  130. for scanner.Scan() {
  131. var m *Message
  132. line := scanner.Text()
  133. if err := json.NewDecoder(strings.NewReader(line)).Decode(&m); err != nil {
  134. return err
  135. }
  136. m.BaseURL = strings.TrimSuffix(topicURL, "/"+m.Topic) // FIXME hack!
  137. m.TopicURL = topicURL
  138. m.Raw = line
  139. msgChan <- m
  140. }
  141. return nil
  142. }