topic.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package server
  2. import (
  3. "log"
  4. "math/rand"
  5. "sync"
  6. "time"
  7. )
  8. // topic represents a channel to which subscribers can subscribe, and publishers
  9. // can publish a message
  10. type topic struct {
  11. id string
  12. last time.Time
  13. subscribers map[int]subscriber
  14. mu sync.Mutex
  15. }
  16. // subscriber is a function that is called for every new message on a topic
  17. type subscriber func(msg *message) error
  18. // newTopic creates a new topic
  19. func newTopic(id string, last time.Time) *topic {
  20. return &topic{
  21. id: id,
  22. last: last,
  23. subscribers: make(map[int]subscriber),
  24. }
  25. }
  26. // Subscribe subscribes to this topic
  27. func (t *topic) Subscribe(s subscriber) int {
  28. t.mu.Lock()
  29. defer t.mu.Unlock()
  30. subscriberID := rand.Int()
  31. t.subscribers[subscriberID] = s
  32. t.last = time.Now()
  33. return subscriberID
  34. }
  35. // Unsubscribe removes the subscription from the list of subscribers
  36. func (t *topic) Unsubscribe(id int) {
  37. t.mu.Lock()
  38. defer t.mu.Unlock()
  39. delete(t.subscribers, id)
  40. }
  41. // Publish asynchronously publishes to all subscribers
  42. func (t *topic) Publish(m *message) error {
  43. go func() {
  44. t.mu.Lock()
  45. defer t.mu.Unlock()
  46. t.last = time.Now()
  47. for _, s := range t.subscribers {
  48. if err := s(m); err != nil {
  49. log.Printf("error publishing message to subscriber")
  50. }
  51. }
  52. }()
  53. return nil
  54. }
  55. // Subscribers returns the number of subscribers to this topic
  56. func (t *topic) Subscribers() int {
  57. t.mu.Lock()
  58. defer t.mu.Unlock()
  59. return len(t.subscribers)
  60. }