cache_mem.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package server
  2. import (
  3. "sort"
  4. "sync"
  5. "time"
  6. )
  7. type memCache struct {
  8. messages map[string][]*message
  9. scheduled map[string]*message // Message ID -> message
  10. nop bool
  11. mu sync.Mutex
  12. }
  13. var _ cache = (*memCache)(nil)
  14. // newMemCache creates an in-memory cache
  15. func newMemCache() *memCache {
  16. return &memCache{
  17. messages: make(map[string][]*message),
  18. scheduled: make(map[string]*message),
  19. nop: false,
  20. }
  21. }
  22. // newNopCache creates an in-memory cache that discards all messages;
  23. // it is always empty and can be used if caching is entirely disabled
  24. func newNopCache() *memCache {
  25. return &memCache{
  26. messages: make(map[string][]*message),
  27. scheduled: make(map[string]*message),
  28. nop: true,
  29. }
  30. }
  31. func (c *memCache) AddMessage(m *message) error {
  32. c.mu.Lock()
  33. defer c.mu.Unlock()
  34. if c.nop {
  35. return nil
  36. }
  37. if m.Event != messageEvent {
  38. return errUnexpectedMessageType
  39. }
  40. if _, ok := c.messages[m.Topic]; !ok {
  41. c.messages[m.Topic] = make([]*message, 0)
  42. }
  43. delayed := m.Time > time.Now().Unix()
  44. if delayed {
  45. c.scheduled[m.ID] = m
  46. }
  47. c.messages[m.Topic] = append(c.messages[m.Topic], m)
  48. return nil
  49. }
  50. func (c *memCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
  51. c.mu.Lock()
  52. defer c.mu.Unlock()
  53. if _, ok := c.messages[topic]; !ok || since.IsNone() {
  54. return make([]*message, 0), nil
  55. }
  56. messages := make([]*message, 0)
  57. for _, m := range c.messages[topic] {
  58. _, messageScheduled := c.scheduled[m.ID]
  59. include := m.Time >= since.Time().Unix() && (!messageScheduled || scheduled)
  60. if include {
  61. messages = append(messages, m)
  62. }
  63. }
  64. sort.Slice(messages, func(i, j int) bool {
  65. return messages[i].Time < messages[j].Time
  66. })
  67. return messages, nil
  68. }
  69. func (c *memCache) MessagesDue() ([]*message, error) {
  70. c.mu.Lock()
  71. defer c.mu.Unlock()
  72. messages := make([]*message, 0)
  73. for _, m := range c.scheduled {
  74. due := time.Now().Unix() >= m.Time
  75. if due {
  76. messages = append(messages, m)
  77. }
  78. }
  79. sort.Slice(messages, func(i, j int) bool {
  80. return messages[i].Time < messages[j].Time
  81. })
  82. return messages, nil
  83. }
  84. func (c *memCache) MarkPublished(m *message) error {
  85. c.mu.Lock()
  86. delete(c.scheduled, m.ID)
  87. c.mu.Unlock()
  88. return nil
  89. }
  90. func (c *memCache) MessageCount(topic string) (int, error) {
  91. c.mu.Lock()
  92. defer c.mu.Unlock()
  93. if _, ok := c.messages[topic]; !ok {
  94. return 0, nil
  95. }
  96. return len(c.messages[topic]), nil
  97. }
  98. func (c *memCache) Topics() (map[string]*topic, error) {
  99. c.mu.Lock()
  100. defer c.mu.Unlock()
  101. topics := make(map[string]*topic)
  102. for topic := range c.messages {
  103. topics[topic] = newTopic(topic)
  104. }
  105. return topics, nil
  106. }
  107. func (c *memCache) Prune(olderThan time.Time) error {
  108. c.mu.Lock()
  109. defer c.mu.Unlock()
  110. for topic := range c.messages {
  111. c.pruneTopic(topic, olderThan)
  112. }
  113. return nil
  114. }
  115. func (c *memCache) AttachmentsSize(owner string) (int64, error) {
  116. c.mu.Lock()
  117. defer c.mu.Unlock()
  118. var size int64
  119. for topic := range c.messages {
  120. for _, m := range c.messages[topic] {
  121. counted := m.Attachment != nil && m.Attachment.Owner == owner && m.Attachment.Expires > time.Now().Unix()
  122. if counted {
  123. size += m.Attachment.Size
  124. }
  125. }
  126. }
  127. return size, nil
  128. }
  129. func (c *memCache) AttachmentsExpired() ([]string, error) {
  130. c.mu.Lock()
  131. defer c.mu.Unlock()
  132. ids := make([]string, 0)
  133. for topic := range c.messages {
  134. for _, m := range c.messages[topic] {
  135. if m.Attachment != nil && m.Attachment.Expires > 0 && m.Attachment.Expires < time.Now().Unix() {
  136. ids = append(ids, m.ID)
  137. }
  138. }
  139. }
  140. return ids, nil
  141. }
  142. func (c *memCache) pruneTopic(topic string, olderThan time.Time) {
  143. messages := make([]*message, 0)
  144. for _, m := range c.messages[topic] {
  145. if m.Time >= olderThan.Unix() {
  146. messages = append(messages, m)
  147. }
  148. }
  149. c.messages[topic] = messages
  150. }