| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package server
- import (
- "sync"
- "time"
- )
- type memCache struct {
- messages map[string][]*message
- nop bool
- mu sync.Mutex
- }
- var _ cache = (*memCache)(nil)
- // newMemCache creates an in-memory cache
- func newMemCache() *memCache {
- return &memCache{
- messages: make(map[string][]*message),
- nop: false,
- }
- }
- // newNopCache creates an in-memory cache that discards all messages;
- // it is always empty and can be used if caching is entirely disabled
- func newNopCache() *memCache {
- return &memCache{
- messages: make(map[string][]*message),
- nop: true,
- }
- }
- func (s *memCache) AddMessage(m *message) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.nop {
- return nil
- }
- if m.Event != messageEvent {
- return errUnexpectedMessageType
- }
- if _, ok := s.messages[m.Topic]; !ok {
- s.messages[m.Topic] = make([]*message, 0)
- }
- s.messages[m.Topic] = append(s.messages[m.Topic], m)
- return nil
- }
- func (s *memCache) Messages(topic string, since sinceTime) ([]*message, error) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if _, ok := s.messages[topic]; !ok || since.IsNone() {
- return make([]*message, 0), nil
- }
- messages := make([]*message, 0) // copy!
- for _, m := range s.messages[topic] {
- msgTime := time.Unix(m.Time, 0)
- if msgTime == since.Time() || msgTime.After(since.Time()) {
- messages = append(messages, m)
- }
- }
- return messages, nil
- }
- func (s *memCache) MessageCount(topic string) (int, error) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if _, ok := s.messages[topic]; !ok {
- return 0, nil
- }
- return len(s.messages[topic]), nil
- }
- func (s *memCache) Topics() (map[string]*topic, error) {
- s.mu.Lock()
- defer s.mu.Unlock()
- topics := make(map[string]*topic)
- for topic := range s.messages {
- topics[topic] = newTopic(topic)
- }
- return topics, nil
- }
- func (s *memCache) Prune(olderThan time.Time) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- for topic := range s.messages {
- s.pruneTopic(topic, olderThan)
- }
- return nil
- }
- func (s *memCache) pruneTopic(topic string, olderThan time.Time) {
- messages := make([]*message, 0)
- for _, m := range s.messages[topic] {
- if m.Time >= olderThan.Unix() {
- messages = append(messages, m)
- }
- }
- s.messages[topic] = messages
- }
|