cache_sqlite.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package server
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. _ "github.com/mattn/go-sqlite3" // SQLite driver
  7. "log"
  8. "strings"
  9. "time"
  10. )
// Messages cache
//
// SQL statements used to create, fill, read and prune the 'messages' table.
const (
	// createMessagesTableQuery creates the 'messages' table and an index on
	// its topic column if they do not exist yet. The statements are wrapped
	// in BEGIN/COMMIT within the script itself.
	createMessagesTableQuery = `
BEGIN;
CREATE TABLE IF NOT EXISTS messages (
id VARCHAR(20) PRIMARY KEY,
time INT NOT NULL,
topic VARCHAR(64) NOT NULL,
message VARCHAR(512) NOT NULL,
title VARCHAR(256) NOT NULL,
priority INT NOT NULL,
tags VARCHAR(256) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT;
`
	// insertMessageQuery inserts one message; it takes seven parameters in
	// the column order listed in the statement.
	insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`
	// pruneMessagesQuery deletes every message whose time is strictly less
	// than the bound parameter.
	pruneMessagesQuery = `DELETE FROM messages WHERE time < ?`
	// selectMessagesSinceTimeQuery returns the messages of one topic whose
	// time is greater than or equal to the second parameter, oldest first.
	// The topic column itself is not part of the result set.
	selectMessagesSinceTimeQuery = `
SELECT id, time, message, title, priority, tags
FROM messages
WHERE topic = ? AND time >= ?
ORDER BY time ASC
`
	// selectMessagesCountQuery counts all messages, regardless of topic.
	selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
	// selectMessageCountForTopicQuery counts the messages of a single topic.
	selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
	// selectTopicsQuery lists every topic that has at least one message,
	// together with the largest time value stored for it.
	selectTopicsQuery = `SELECT topic, MAX(time) FROM messages GROUP BY topic`
)
// Schema management queries
//
// The schema version is kept in a dedicated 'schemaVersion' table; the
// statements below always address the row with id 1.
const (
	// currentSchemaVersion is the schema version this code creates for new
	// databases and expects to find in existing ones.
	currentSchemaVersion = 1
	// createSchemaVersionTableQuery creates the 'schemaVersion' table if it
	// does not exist yet.
	createSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
`
	// insertSchemaVersion inserts the version row (id 1); the single
	// parameter is the version number.
	insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
	// selectSchemaVersionQuery reads the version stored in the row with id 1.
	selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
	// 0 -> 1
	//
	// migrate0To1AlterMessagesTableQuery adds the title, priority and tags
	// columns to an existing 'messages' table. Each column is NOT NULL with
	// a default value, and the statements are wrapped in BEGIN/COMMIT.
	migrate0To1AlterMessagesTableQuery = `
BEGIN;
ALTER TABLE messages ADD COLUMN title VARCHAR(256) NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT('');
COMMIT;
`
)
// sqliteCache is a message cache that stores its data in a SQLite database
// accessed through database/sql.
type sqliteCache struct {
	db *sql.DB // database handle used by all cache methods
}

// Compile-time assertion that *sqliteCache implements the cache interface.
var _ cache = (*sqliteCache)(nil)
  63. func newSqliteCache(filename string) (*sqliteCache, error) {
  64. db, err := sql.Open("sqlite3", filename)
  65. if err != nil {
  66. return nil, err
  67. }
  68. if err := setupDB(db); err != nil {
  69. return nil, err
  70. }
  71. return &sqliteCache{
  72. db: db,
  73. }, nil
  74. }
  75. func (c *sqliteCache) AddMessage(m *message) error {
  76. _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","))
  77. return err
  78. }
  79. func (c *sqliteCache) Messages(topic string, since sinceTime) ([]*message, error) {
  80. rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
  81. if err != nil {
  82. return nil, err
  83. }
  84. defer rows.Close()
  85. messages := make([]*message, 0)
  86. for rows.Next() {
  87. var timestamp int64
  88. var priority int
  89. var id, msg, title, tagsStr string
  90. if err := rows.Scan(&id, &timestamp, &msg, &title, &priority, &tagsStr); err != nil {
  91. return nil, err
  92. }
  93. if msg == "" {
  94. msg = " " // Hack: never return empty messages; this should not happen
  95. }
  96. var tags []string
  97. if tagsStr != "" {
  98. tags = strings.Split(tagsStr, ",")
  99. }
  100. messages = append(messages, &message{
  101. ID: id,
  102. Time: timestamp,
  103. Event: messageEvent,
  104. Topic: topic,
  105. Message: msg,
  106. Title: title,
  107. Priority: priority,
  108. Tags: tags,
  109. })
  110. }
  111. if err := rows.Err(); err != nil {
  112. return nil, err
  113. }
  114. return messages, nil
  115. }
  116. func (c *sqliteCache) MessageCount(topic string) (int, error) {
  117. rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
  118. if err != nil {
  119. return 0, err
  120. }
  121. defer rows.Close()
  122. var count int
  123. if !rows.Next() {
  124. return 0, errors.New("no rows found")
  125. }
  126. if err := rows.Scan(&count); err != nil {
  127. return 0, err
  128. } else if err := rows.Err(); err != nil {
  129. return 0, err
  130. }
  131. return count, nil
  132. }
  133. func (s *sqliteCache) Topics() (map[string]*topic, error) {
  134. rows, err := s.db.Query(selectTopicsQuery)
  135. if err != nil {
  136. return nil, err
  137. }
  138. defer rows.Close()
  139. topics := make(map[string]*topic, 0)
  140. for rows.Next() {
  141. var id string
  142. var last int64
  143. if err := rows.Scan(&id, &last); err != nil {
  144. return nil, err
  145. }
  146. topics[id] = newTopic(id, time.Unix(last, 0))
  147. }
  148. if err := rows.Err(); err != nil {
  149. return nil, err
  150. }
  151. return topics, nil
  152. }
  153. func (s *sqliteCache) Prune(keep time.Duration) error {
  154. _, err := s.db.Exec(pruneMessagesQuery, time.Now().Add(-1*keep).Unix())
  155. return err
  156. }
  157. func setupDB(db *sql.DB) error {
  158. // If 'messages' table does not exist, this must be a new database
  159. rowsMC, err := db.Query(selectMessagesCountQuery)
  160. if err != nil {
  161. return setupNewDB(db)
  162. }
  163. defer rowsMC.Close()
  164. // If 'messages' table exists, check 'schemaVersion' table
  165. schemaVersion := 0
  166. rowsSV, err := db.Query(selectSchemaVersionQuery)
  167. if err == nil {
  168. defer rowsSV.Close()
  169. if !rowsSV.Next() {
  170. return errors.New("cannot determine schema version: cache file may be corrupt")
  171. }
  172. if err := rowsSV.Scan(&schemaVersion); err != nil {
  173. return err
  174. }
  175. }
  176. // Do migrations
  177. if schemaVersion == currentSchemaVersion {
  178. return nil
  179. } else if schemaVersion == 0 {
  180. return migrateFrom0To1(db)
  181. }
  182. return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
  183. }
  184. func setupNewDB(db *sql.DB) error {
  185. if _, err := db.Exec(createMessagesTableQuery); err != nil {
  186. return err
  187. }
  188. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  189. return err
  190. }
  191. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  192. return err
  193. }
  194. return nil
  195. }
  196. func migrateFrom0To1(db *sql.DB) error {
  197. log.Print("Migrating cache database schema: from 0 to 1")
  198. if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
  199. return err
  200. }
  201. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  202. return err
  203. }
  204. if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
  205. return err
  206. }
  207. return nil
  208. }