1
0

client.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package client
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. const (
  14. MessageEvent = "message"
  15. KeepaliveEvent = "keepalive"
  16. OpenEvent = "open"
  17. )
  18. type Client struct {
  19. Messages chan *Message
  20. config *Config
  21. subscriptions map[string]*subscription
  22. mu sync.Mutex
  23. }
  24. type Message struct {
  25. ID string
  26. Event string
  27. Time int64
  28. Topic string
  29. TopicURL string
  30. Message string
  31. Title string
  32. Priority int
  33. Tags []string
  34. Raw string
  35. }
  36. type subscription struct {
  37. cancel context.CancelFunc
  38. }
  39. func New(config *Config) *Client {
  40. return &Client{
  41. Messages: make(chan *Message),
  42. config: config,
  43. subscriptions: make(map[string]*subscription),
  44. }
  45. }
  46. func (c *Client) Publish(topicURL, message string, options ...PublishOption) error {
  47. req, _ := http.NewRequest("POST", topicURL, strings.NewReader(message))
  48. for _, option := range options {
  49. if err := option(req); err != nil {
  50. return err
  51. }
  52. }
  53. resp, err := http.DefaultClient.Do(req)
  54. if err != nil {
  55. return err
  56. }
  57. if resp.StatusCode != http.StatusOK {
  58. return fmt.Errorf("unexpected response %d from server", resp.StatusCode)
  59. }
  60. return err
  61. }
  62. func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, error) {
  63. ctx := context.Background()
  64. messages := make([]*Message, 0)
  65. msgChan := make(chan *Message)
  66. errChan := make(chan error)
  67. topicURL := c.expandTopicURL(topic)
  68. go func() {
  69. err := performSubscribeRequest(ctx, msgChan, topicURL, options...)
  70. close(msgChan)
  71. errChan <- err
  72. }()
  73. for m := range msgChan {
  74. messages = append(messages, m)
  75. }
  76. return messages, <-errChan
  77. }
  78. func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
  79. c.mu.Lock()
  80. defer c.mu.Unlock()
  81. topicURL := c.expandTopicURL(topic)
  82. if _, ok := c.subscriptions[topicURL]; ok {
  83. return topicURL
  84. }
  85. ctx, cancel := context.WithCancel(context.Background())
  86. c.subscriptions[topicURL] = &subscription{cancel}
  87. go handleSubscribeConnLoop(ctx, c.Messages, topicURL, options...)
  88. return topicURL
  89. }
  90. func (c *Client) Unsubscribe(topic string) {
  91. c.mu.Lock()
  92. defer c.mu.Unlock()
  93. topicURL := c.expandTopicURL(topic)
  94. sub, ok := c.subscriptions[topicURL]
  95. if !ok {
  96. return
  97. }
  98. sub.cancel()
  99. return
  100. }
  101. func (c *Client) expandTopicURL(topic string) string {
  102. if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
  103. return topic
  104. } else if strings.Contains(topic, "/") {
  105. return fmt.Sprintf("https://%s", topic)
  106. }
  107. return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic)
  108. }
  109. func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL string, options ...SubscribeOption) {
  110. for {
  111. if err := performSubscribeRequest(ctx, msgChan, topicURL, options...); err != nil {
  112. log.Printf("Connection to %s failed: %s", topicURL, err.Error())
  113. }
  114. select {
  115. case <-ctx.Done():
  116. log.Printf("Connection to %s exited", topicURL)
  117. return
  118. case <-time.After(5 * time.Second):
  119. }
  120. }
  121. }
  122. func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, options ...SubscribeOption) error {
  123. req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
  124. if err != nil {
  125. return err
  126. }
  127. for _, option := range options {
  128. if err := option(req); err != nil {
  129. return err
  130. }
  131. }
  132. resp, err := http.DefaultClient.Do(req)
  133. if err != nil {
  134. return err
  135. }
  136. defer resp.Body.Close()
  137. scanner := bufio.NewScanner(resp.Body)
  138. for scanner.Scan() {
  139. var m *Message
  140. line := scanner.Text()
  141. if err := json.NewDecoder(strings.NewReader(line)).Decode(&m); err != nil {
  142. return err
  143. }
  144. m.TopicURL = topicURL
  145. m.Raw = line
  146. msgChan <- m
  147. }
  148. return nil
  149. }