topic.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package server
  2. import (
  3. "context"
  4. "log"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  9. // topic represents a channel to which subscribers can subscribe, and publishers
  10. // can publish a message
  11. type topic struct {
  12. id string
  13. subscribers map[int]subscriber
  14. messages []*message
  15. last time.Time
  16. ctx context.Context
  17. cancel context.CancelFunc
  18. mu sync.Mutex
  19. }
  20. // subscriber is a function that is called for every new message on a topic
  21. type subscriber func(msg *message) error
  22. // newTopic creates a new topic
  23. func newTopic(id string) *topic {
  24. ctx, cancel := context.WithCancel(context.Background())
  25. return &topic{
  26. id: id,
  27. subscribers: make(map[int]subscriber),
  28. messages: make([]*message, 0),
  29. last: time.Now(),
  30. ctx: ctx,
  31. cancel: cancel,
  32. }
  33. }
  34. func (t *topic) Subscribe(s subscriber) int {
  35. t.mu.Lock()
  36. defer t.mu.Unlock()
  37. subscriberID := rand.Int()
  38. t.subscribers[subscriberID] = s
  39. t.last = time.Now()
  40. return subscriberID
  41. }
  42. func (t *topic) Unsubscribe(id int) {
  43. t.mu.Lock()
  44. defer t.mu.Unlock()
  45. delete(t.subscribers, id)
  46. }
  47. func (t *topic) Publish(m *message) error {
  48. t.mu.Lock()
  49. defer t.mu.Unlock()
  50. t.last = time.Now()
  51. t.messages = append(t.messages, m)
  52. for _, s := range t.subscribers {
  53. if err := s(m); err != nil {
  54. log.Printf("error publishing message to subscriber")
  55. }
  56. }
  57. return nil
  58. }
  59. func (t *topic) Messages(since time.Time) []*message {
  60. t.mu.Lock()
  61. defer t.mu.Unlock()
  62. messages := make([]*message, 0) // copy!
  63. for _, m := range t.messages {
  64. msgTime := time.Unix(m.Time, 0)
  65. if msgTime == since || msgTime.After(since) {
  66. messages = append(messages, m)
  67. }
  68. }
  69. return messages
  70. }
  71. func (t *topic) Prune(keep time.Duration) {
  72. t.mu.Lock()
  73. defer t.mu.Unlock()
  74. for i, m := range t.messages {
  75. msgTime := time.Unix(m.Time, 0)
  76. if time.Since(msgTime) < keep {
  77. t.messages = t.messages[i:]
  78. return
  79. }
  80. }
  81. t.messages = make([]*message, 0)
  82. }
  83. func (t *topic) Stats() (subscribers int, messages int) {
  84. t.mu.Lock()
  85. defer t.mu.Unlock()
  86. return len(t.subscribers), len(t.messages)
  87. }
  88. func (t *topic) Close() {
  89. t.cancel()
  90. }