cache_sqlite.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package server
  2. import (
  3. "database/sql"
  4. "errors"
  5. _ "github.com/mattn/go-sqlite3" // SQLite driver
  6. "time"
  7. )
  // SQL statements used by sqliteCache. Every variable part is passed as a "?"
  // bound parameter; none of the statements is assembled from caller input.
  const (
  	// createTableQuery creates the messages table and the index on its topic
  	// column when they do not exist yet, wrapped in one transaction.
  	createTableQuery = `
  		BEGIN;
  		CREATE TABLE IF NOT EXISTS messages (
  			id VARCHAR(20) PRIMARY KEY,
  			time INT NOT NULL,
  			topic VARCHAR(64) NOT NULL,
  			message VARCHAR(1024) NOT NULL
  		);
  		CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  		COMMIT;
  	`

  	// insertMessageQuery stores one message; parameters are id, time, topic
  	// and message, in that order.
  	insertMessageQuery = `INSERT INTO messages (id, time, topic, message) VALUES (?, ?, ?, ?)`

  	// pruneMessagesQuery removes every message whose time is strictly below
  	// the single bound parameter.
  	pruneMessagesQuery = `DELETE FROM messages WHERE time < ?`

  	// selectMessagesSinceTimeQuery lists id, time and message for one topic
  	// (first parameter) with time at or after the second parameter, oldest
  	// first.
  	selectMessagesSinceTimeQuery = `
  		SELECT id, time, message
  		FROM messages
  		WHERE topic = ? AND time >= ?
  		ORDER BY time ASC
  	`

  	// selectMessageCountQuery counts the stored messages of one topic.
  	selectMessageCountQuery = `SELECT count(*) FROM messages WHERE topic = ?`

  	// selectTopicsQuery lists each topic that has stored messages together
  	// with the largest time value among them.
  	selectTopicsQuery = `SELECT topic, MAX(time) FROM messages GROUP BY TOPIC`
  )
  31. type sqliteCache struct {
  32. db *sql.DB
  33. }
  34. var _ cache = (*sqliteCache)(nil)
  35. func newSqliteCache(filename string) (*sqliteCache, error) {
  36. db, err := sql.Open("sqlite3", filename)
  37. if err != nil {
  38. return nil, err
  39. }
  40. if _, err := db.Exec(createTableQuery); err != nil {
  41. return nil, err
  42. }
  43. return &sqliteCache{
  44. db: db,
  45. }, nil
  46. }
  47. func (c *sqliteCache) AddMessage(m *message) error {
  48. _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message)
  49. return err
  50. }
  51. func (c *sqliteCache) Messages(topic string, since sinceTime) ([]*message, error) {
  52. rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
  53. if err != nil {
  54. return nil, err
  55. }
  56. defer rows.Close()
  57. messages := make([]*message, 0)
  58. for rows.Next() {
  59. var timestamp int64
  60. var id, msg string
  61. if err := rows.Scan(&id, &timestamp, &msg); err != nil {
  62. return nil, err
  63. }
  64. if msg == "" {
  65. msg = " " // Hack: never return empty messages; this should not happen
  66. }
  67. messages = append(messages, &message{
  68. ID: id,
  69. Time: timestamp,
  70. Event: messageEvent,
  71. Topic: topic,
  72. Message: msg,
  73. })
  74. }
  75. if err := rows.Err(); err != nil {
  76. return nil, err
  77. }
  78. return messages, nil
  79. }
  80. func (c *sqliteCache) MessageCount(topic string) (int, error) {
  81. rows, err := c.db.Query(selectMessageCountQuery, topic)
  82. if err != nil {
  83. return 0, err
  84. }
  85. defer rows.Close()
  86. var count int
  87. if !rows.Next() {
  88. return 0, errors.New("no rows found")
  89. }
  90. if err := rows.Scan(&count); err != nil {
  91. return 0, err
  92. } else if err := rows.Err(); err != nil {
  93. return 0, err
  94. }
  95. return count, nil
  96. }
  97. func (s *sqliteCache) Topics() (map[string]*topic, error) {
  98. rows, err := s.db.Query(selectTopicsQuery)
  99. if err != nil {
  100. return nil, err
  101. }
  102. defer rows.Close()
  103. topics := make(map[string]*topic, 0)
  104. for rows.Next() {
  105. var id string
  106. var last int64
  107. if err := rows.Scan(&id, &last); err != nil {
  108. return nil, err
  109. }
  110. topics[id] = newTopic(id, time.Unix(last, 0))
  111. }
  112. if err := rows.Err(); err != nil {
  113. return nil, err
  114. }
  115. return topics, nil
  116. }
  117. func (c *sqliteCache) Prune(keep time.Duration) error {
  118. _, err := c.db.Exec(pruneMessagesQuery, time.Now().Add(-1*keep).Unix())
  119. return err
  120. }