1
0

topic.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package server
  2. import (
  3. "context"
  4. "errors"
  5. "log"
  6. "math/rand"
  7. "sync"
  8. "time"
  9. )
  10. type topic struct {
  11. id string
  12. subscribers map[int]subscriber
  13. messages int
  14. last time.Time
  15. ctx context.Context
  16. cancel context.CancelFunc
  17. mu sync.Mutex
  18. }
  19. type subscriber func(msg *message) error
  20. func newTopic(id string) *topic {
  21. ctx, cancel := context.WithCancel(context.Background())
  22. return &topic{
  23. id: id,
  24. subscribers: make(map[int]subscriber),
  25. last: time.Now(),
  26. ctx: ctx,
  27. cancel: cancel,
  28. }
  29. }
  30. func (t *topic) Subscribe(s subscriber) int {
  31. t.mu.Lock()
  32. defer t.mu.Unlock()
  33. subscriberID := rand.Int()
  34. t.subscribers[subscriberID] = s
  35. t.last = time.Now()
  36. return subscriberID
  37. }
  38. func (t *topic) Unsubscribe(id int) {
  39. t.mu.Lock()
  40. defer t.mu.Unlock()
  41. delete(t.subscribers, id)
  42. }
  43. func (t *topic) Publish(m *message) error {
  44. t.mu.Lock()
  45. defer t.mu.Unlock()
  46. if len(t.subscribers) == 0 {
  47. return errors.New("no subscribers")
  48. }
  49. t.last = time.Now()
  50. t.messages++
  51. for _, s := range t.subscribers {
  52. if err := s(m); err != nil {
  53. log.Printf("error publishing message to subscriber x")
  54. }
  55. }
  56. return nil
  57. }
  58. func (t *topic) Close() {
  59. t.cancel()
  60. }