topic.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package server
  2. import (
  3. "context"
  4. "log"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. )
  9. // topic represents a channel to which subscribers can subscribe, and publishers
  10. // can publish a message
  11. type topic struct {
  12. id string
  13. subscribers map[int]subscriber
  14. messages []*message
  15. last time.Time
  16. ctx context.Context
  17. cancel context.CancelFunc
  18. mu sync.Mutex
  19. }
  20. // subscriber is a function that is called for every new message on a topic
  21. type subscriber func(msg *message) error
  22. // newTopic creates a new topic
  23. func newTopic(id string) *topic {
  24. ctx, cancel := context.WithCancel(context.Background())
  25. return &topic{
  26. id: id,
  27. subscribers: make(map[int]subscriber),
  28. last: time.Now(),
  29. ctx: ctx,
  30. cancel: cancel,
  31. }
  32. }
  33. func (t *topic) Subscribe(s subscriber) int {
  34. t.mu.Lock()
  35. defer t.mu.Unlock()
  36. subscriberID := rand.Int()
  37. t.subscribers[subscriberID] = s
  38. t.last = time.Now()
  39. return subscriberID
  40. }
  41. func (t *topic) Unsubscribe(id int) {
  42. t.mu.Lock()
  43. defer t.mu.Unlock()
  44. delete(t.subscribers, id)
  45. }
  46. func (t *topic) Publish(m *message) error {
  47. t.mu.Lock()
  48. defer t.mu.Unlock()
  49. t.last = time.Now()
  50. t.messages = append(t.messages, m)
  51. for _, s := range t.subscribers {
  52. if err := s(m); err != nil {
  53. log.Printf("error publishing message to subscriber")
  54. }
  55. }
  56. return nil
  57. }
  58. func (t *topic) Messages(since time.Time) []*message {
  59. t.mu.Lock()
  60. defer t.mu.Unlock()
  61. messages := make([]*message, 0) // copy!
  62. for _, m := range t.messages {
  63. msgTime := time.Unix(m.Time, 0)
  64. if msgTime == since || msgTime.After(since) {
  65. messages = append(messages, m)
  66. }
  67. }
  68. return messages
  69. }
  70. func (t *topic) Prune(keep time.Duration) {
  71. t.mu.Lock()
  72. defer t.mu.Unlock()
  73. for i, m := range t.messages {
  74. msgTime := time.Unix(m.Time, 0)
  75. if time.Since(msgTime) < keep {
  76. t.messages = t.messages[i:]
  77. return
  78. }
  79. }
  80. t.messages = make([]*message, 0)
  81. }
  82. func (t *topic) Stats() (subscribers int, messages int) {
  83. t.mu.Lock()
  84. defer t.mu.Unlock()
  85. return len(t.subscribers), len(t.messages)
  86. }
  87. func (t *topic) Close() {
  88. t.cancel()
  89. }