| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- package client
- import (
- "bufio"
- "context"
- "encoding/json"
- "fmt"
- "log"
- "net/http"
- "strings"
- "sync"
- "time"
- )
- const (
- MessageEvent = "message"
- KeepaliveEvent = "keepalive"
- OpenEvent = "open"
- )
- type Client struct {
- Messages chan *Message
- config *Config
- subscriptions map[string]*subscription
- mu sync.Mutex
- }
- type Message struct {
- ID string
- Event string
- Time int64
- Topic string
- TopicURL string
- Message string
- Title string
- Priority int
- Tags []string
- Raw string
- }
- type subscription struct {
- cancel context.CancelFunc
- }
- func New(config *Config) *Client {
- return &Client{
- Messages: make(chan *Message),
- config: config,
- subscriptions: make(map[string]*subscription),
- }
- }
- func (c *Client) Publish(topicURL, message string, options ...PublishOption) error {
- req, _ := http.NewRequest("POST", topicURL, strings.NewReader(message))
- for _, option := range options {
- if err := option(req); err != nil {
- return err
- }
- }
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return err
- }
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("unexpected response %d from server", resp.StatusCode)
- }
- return err
- }
- func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, error) {
- ctx := context.Background()
- messages := make([]*Message, 0)
- msgChan := make(chan *Message)
- errChan := make(chan error)
- topicURL := c.expandTopicURL(topic)
- go func() {
- err := performSubscribeRequest(ctx, msgChan, topicURL, options...)
- close(msgChan)
- errChan <- err
- }()
- for m := range msgChan {
- messages = append(messages, m)
- }
- return messages, <-errChan
- }
- func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
- c.mu.Lock()
- defer c.mu.Unlock()
- topicURL := c.expandTopicURL(topic)
- if _, ok := c.subscriptions[topicURL]; ok {
- return topicURL
- }
- ctx, cancel := context.WithCancel(context.Background())
- c.subscriptions[topicURL] = &subscription{cancel}
- go handleSubscribeConnLoop(ctx, c.Messages, topicURL, options...)
- return topicURL
- }
- func (c *Client) Unsubscribe(topic string) {
- c.mu.Lock()
- defer c.mu.Unlock()
- topicURL := c.expandTopicURL(topic)
- sub, ok := c.subscriptions[topicURL]
- if !ok {
- return
- }
- sub.cancel()
- return
- }
- func (c *Client) expandTopicURL(topic string) string {
- if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
- return topic
- } else if strings.Contains(topic, "/") {
- return fmt.Sprintf("https://%s", topic)
- }
- return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic)
- }
- func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL string, options ...SubscribeOption) {
- for {
- if err := performSubscribeRequest(ctx, msgChan, topicURL, options...); err != nil {
- log.Printf("Connection to %s failed: %s", topicURL, err.Error())
- }
- select {
- case <-ctx.Done():
- log.Printf("Connection to %s exited", topicURL)
- return
- case <-time.After(5 * time.Second):
- }
- }
- }
- func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, options ...SubscribeOption) error {
- req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
- if err != nil {
- return err
- }
- for _, option := range options {
- if err := option(req); err != nil {
- return err
- }
- }
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
- scanner := bufio.NewScanner(resp.Body)
- for scanner.Scan() {
- var m *Message
- line := scanner.Text()
- if err := json.NewDecoder(strings.NewReader(line)).Decode(&m); err != nil {
- return err
- }
- m.TopicURL = topicURL
- m.Raw = line
- msgChan <- m
- }
- return nil
- }
|