cache_mem.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package server
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. type memCache struct {
  7. messages map[string][]*message
  8. nop bool
  9. mu sync.Mutex
  10. }
  11. var _ cache = (*memCache)(nil)
  12. // newMemCache creates an in-memory cache
  13. func newMemCache() *memCache {
  14. return &memCache{
  15. messages: make(map[string][]*message),
  16. nop: false,
  17. }
  18. }
  19. // newNopCache creates an in-memory cache that discards all messages;
  20. // it is always empty and can be used if caching is entirely disabled
  21. func newNopCache() *memCache {
  22. return &memCache{
  23. messages: make(map[string][]*message),
  24. nop: true,
  25. }
  26. }
  27. func (s *memCache) AddMessage(m *message) error {
  28. s.mu.Lock()
  29. defer s.mu.Unlock()
  30. if s.nop {
  31. return nil
  32. }
  33. if m.Event != messageEvent {
  34. return errUnexpectedMessageType
  35. }
  36. if _, ok := s.messages[m.Topic]; !ok {
  37. s.messages[m.Topic] = make([]*message, 0)
  38. }
  39. s.messages[m.Topic] = append(s.messages[m.Topic], m)
  40. return nil
  41. }
  42. func (s *memCache) Messages(topic string, since sinceTime) ([]*message, error) {
  43. s.mu.Lock()
  44. defer s.mu.Unlock()
  45. if _, ok := s.messages[topic]; !ok || since.IsNone() {
  46. return make([]*message, 0), nil
  47. }
  48. messages := make([]*message, 0) // copy!
  49. for _, m := range s.messages[topic] {
  50. msgTime := time.Unix(m.Time, 0)
  51. if msgTime == since.Time() || msgTime.After(since.Time()) {
  52. messages = append(messages, m)
  53. }
  54. }
  55. return messages, nil
  56. }
  57. func (s *memCache) MessageCount(topic string) (int, error) {
  58. s.mu.Lock()
  59. defer s.mu.Unlock()
  60. if _, ok := s.messages[topic]; !ok {
  61. return 0, nil
  62. }
  63. return len(s.messages[topic]), nil
  64. }
  65. func (s *memCache) Topics() (map[string]*topic, error) {
  66. s.mu.Lock()
  67. defer s.mu.Unlock()
  68. topics := make(map[string]*topic)
  69. for topic := range s.messages {
  70. topics[topic] = newTopic(topic)
  71. }
  72. return topics, nil
  73. }
  74. func (s *memCache) Prune(olderThan time.Time) error {
  75. s.mu.Lock()
  76. defer s.mu.Unlock()
  77. for topic := range s.messages {
  78. s.pruneTopic(topic, olderThan)
  79. }
  80. return nil
  81. }
  82. func (s *memCache) pruneTopic(topic string, olderThan time.Time) {
  83. messages := make([]*message, 0)
  84. for _, m := range s.messages[topic] {
  85. if m.Time >= olderThan.Unix() {
  86. messages = append(messages, m)
  87. }
  88. }
  89. s.messages[topic] = messages
  90. }