// topic.go (1.7 KB)

  1. package server
  2. import (
  3. "context"
  4. "errors"
  5. "log"
  6. "math/rand"
  7. "sync"
  8. "time"
  9. )
  10. // topic represents a channel to which subscribers can subscribe, and publishers
  11. // can publish a message
  12. type topic struct {
  13. id string
  14. subscribers map[int]subscriber
  15. messages int
  16. last time.Time
  17. ctx context.Context
  18. cancel context.CancelFunc
  19. mu sync.Mutex
  20. }
  // message is the payload handed to subscribers when something is published
  // to a topic. It serializes to JSON under the keys "time" and "message".
  type message struct {
  	Time    int64  `json:"time"`
  	Message string `json:"message"`
  }
  26. // subscriber is a function that is called for every new message on a topic
  27. type subscriber func(msg *message) error
  28. func newTopic(id string) *topic {
  29. ctx, cancel := context.WithCancel(context.Background())
  30. return &topic{
  31. id: id,
  32. subscribers: make(map[int]subscriber),
  33. last: time.Now(),
  34. ctx: ctx,
  35. cancel: cancel,
  36. }
  37. }
  38. func (t *topic) Subscribe(s subscriber) int {
  39. t.mu.Lock()
  40. defer t.mu.Unlock()
  41. subscriberID := rand.Int()
  42. t.subscribers[subscriberID] = s
  43. t.last = time.Now()
  44. return subscriberID
  45. }
  46. func (t *topic) Unsubscribe(id int) int {
  47. t.mu.Lock()
  48. defer t.mu.Unlock()
  49. delete(t.subscribers, id)
  50. return len(t.subscribers)
  51. }
  52. func (t *topic) Publish(m *message) error {
  53. t.mu.Lock()
  54. defer t.mu.Unlock()
  55. if len(t.subscribers) == 0 {
  56. return errors.New("no subscribers")
  57. }
  58. t.last = time.Now()
  59. t.messages++
  60. for _, s := range t.subscribers {
  61. if err := s(m); err != nil {
  62. log.Printf("error publishing message to subscriber")
  63. }
  64. }
  65. return nil
  66. }
  67. func (t *topic) Stats() (subscribers int, messages int) {
  68. t.mu.Lock()
  69. defer t.mu.Unlock()
  70. return len(t.subscribers), t.messages
  71. }
  72. func (t *topic) Close() {
  73. t.cancel()
  74. }