server.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package server
  2. import (
  3. "bytes"
  4. _ "embed" // required for go:embed
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net/http"
  11. "regexp"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. type Server struct {
  17. topics map[string]*topic
  18. mu sync.Mutex
  19. }
  20. type message struct {
  21. Time int64 `json:"time"`
  22. Message string `json:"message"`
  23. }
  24. const (
  25. messageLimit = 1024
  26. )
  27. var (
  28. topicRegex = regexp.MustCompile(`^/[^/]+$`)
  29. jsonRegex = regexp.MustCompile(`^/[^/]+/json$`)
  30. sseRegex = regexp.MustCompile(`^/[^/]+/sse$`)
  31. //go:embed "index.html"
  32. indexSource string
  33. )
  34. func New() *Server {
  35. return &Server{
  36. topics: make(map[string]*topic),
  37. }
  38. }
  39. func (s *Server) Run() error {
  40. go s.runMonitor()
  41. return s.listenAndServe()
  42. }
  43. func (s *Server) listenAndServe() error {
  44. log.Printf("Listening on :9997")
  45. http.HandleFunc("/", s.handle)
  46. return http.ListenAndServe(":9997", nil)
  47. }
  48. func (s *Server) runMonitor() {
  49. for {
  50. time.Sleep(5 * time.Second)
  51. s.mu.Lock()
  52. log.Printf("topics: %d", len(s.topics))
  53. for _, t := range s.topics {
  54. t.mu.Lock()
  55. log.Printf("- %s: %d subscriber(s), %d message(s) sent, last active = %s",
  56. t.id, len(t.subscribers), t.messages, t.last.String())
  57. t.mu.Unlock()
  58. }
  59. // TODO kill dead topics
  60. s.mu.Unlock()
  61. }
  62. }
  63. func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
  64. if err := s.handleInternal(w, r); err != nil {
  65. w.WriteHeader(http.StatusInternalServerError)
  66. _, _ = io.WriteString(w, err.Error())
  67. }
  68. }
  69. func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error {
  70. if r.Method == http.MethodGet && r.URL.Path == "/" {
  71. return s.handleHome(w, r)
  72. } else if r.Method == http.MethodGet && jsonRegex.MatchString(r.URL.Path) {
  73. return s.handleSubscribeJSON(w, r)
  74. } else if r.Method == http.MethodGet && sseRegex.MatchString(r.URL.Path) {
  75. return s.handleSubscribeSSE(w, r)
  76. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicRegex.MatchString(r.URL.Path) {
  77. return s.handlePublishHTTP(w, r)
  78. }
  79. http.NotFound(w, r)
  80. return nil
  81. }
  82. func (s *Server) handleHome(w http.ResponseWriter, r *http.Request) error {
  83. _, err := io.WriteString(w, indexSource)
  84. return err
  85. }
  86. func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error {
  87. t, err := s.topic(r.URL.Path[1:])
  88. if err != nil {
  89. return err
  90. }
  91. reader := io.LimitReader(r.Body, messageLimit)
  92. b, err := io.ReadAll(reader)
  93. if err != nil {
  94. return err
  95. }
  96. msg := &message{
  97. Time: time.Now().UnixMilli(),
  98. Message: string(b),
  99. }
  100. return t.Publish(msg)
  101. }
  102. func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) error {
  103. t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/json")) // Hack
  104. subscriberID := t.Subscribe(func(msg *message) error {
  105. if err := json.NewEncoder(w).Encode(&msg); err != nil {
  106. return err
  107. }
  108. if fl, ok := w.(http.Flusher); ok {
  109. fl.Flush()
  110. }
  111. return nil
  112. })
  113. defer t.Unsubscribe(subscriberID)
  114. select {
  115. case <-t.ctx.Done():
  116. case <-r.Context().Done():
  117. }
  118. return nil
  119. }
  120. func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) error {
  121. t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/sse")) // Hack
  122. subscriberID := t.Subscribe(func(msg *message) error {
  123. var buf bytes.Buffer
  124. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  125. return err
  126. }
  127. m := fmt.Sprintf("data: %s\n", buf.String())
  128. if _, err := io.WriteString(w, m); err != nil {
  129. return err
  130. }
  131. if fl, ok := w.(http.Flusher); ok {
  132. fl.Flush()
  133. }
  134. return nil
  135. })
  136. defer t.Unsubscribe(subscriberID)
  137. w.Header().Set("Content-Type", "text/event-stream")
  138. w.WriteHeader(http.StatusOK)
  139. if _, err := io.WriteString(w, "event: open\n\n"); err != nil {
  140. return err
  141. }
  142. if fl, ok := w.(http.Flusher); ok {
  143. fl.Flush()
  144. }
  145. select {
  146. case <-t.ctx.Done():
  147. case <-r.Context().Done():
  148. }
  149. return nil
  150. }
  151. func (s *Server) createTopic(id string) *topic {
  152. s.mu.Lock()
  153. defer s.mu.Unlock()
  154. if _, ok := s.topics[id]; !ok {
  155. s.topics[id] = newTopic(id)
  156. }
  157. return s.topics[id]
  158. }
  159. func (s *Server) topic(topicID string) (*topic, error) {
  160. s.mu.Lock()
  161. defer s.mu.Unlock()
  162. c, ok := s.topics[topicID]
  163. if !ok {
  164. return nil, errors.New("topic does not exist")
  165. }
  166. return c, nil
  167. }