cache_sqlite.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package server
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. _ "github.com/mattn/go-sqlite3" // SQLite driver
  7. "strings"
  8. "time"
  9. )
  10. // Messages cache
  11. const (
  12. createMessagesTableQuery = `
  13. BEGIN;
  14. CREATE TABLE IF NOT EXISTS messages (
  15. id VARCHAR(20) PRIMARY KEY,
  16. time INT NOT NULL,
  17. topic VARCHAR(64) NOT NULL,
  18. message VARCHAR(512) NOT NULL,
  19. title VARCHAR(256) NOT NULL,
  20. priority INT NOT NULL,
  21. tags VARCHAR(256) NOT NULL
  22. );
  23. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  24. COMMIT;
  25. `
  26. insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`
  27. pruneMessagesQuery = `DELETE FROM messages WHERE time < ?`
  28. selectMessagesSinceTimeQuery = `
  29. SELECT id, time, message, title, priority, tags
  30. FROM messages
  31. WHERE topic = ? AND time >= ?
  32. ORDER BY time ASC
  33. `
  34. selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
  35. selectMessageCountForTopicQuery = `SELECT count(*) FROM messages WHERE topic = ?`
  36. selectTopicsQuery = `SELECT topic, MAX(time) FROM messages GROUP BY topic`
  37. )
  38. // Schema management queries
  39. const (
  40. currentSchemaVersion = 1
  41. createSchemaVersionTableQuery = `
  42. CREATE TABLE IF NOT EXISTS schemaVersion (
  43. id INT PRIMARY KEY,
  44. version INT NOT NULL
  45. );
  46. `
  47. insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
  48. selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
  49. // 0 -> 1
  50. migrate0To1AlterMessagesTableQuery = `
  51. BEGIN;
  52. ALTER TABLE messages ADD COLUMN title VARCHAR(256) NOT NULL DEFAULT('');
  53. ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
  54. ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT('');
  55. COMMIT;
  56. `
  57. )
  58. type sqliteCache struct {
  59. db *sql.DB
  60. }
  61. var _ cache = (*sqliteCache)(nil)
  62. func newSqliteCache(filename string) (*sqliteCache, error) {
  63. db, err := sql.Open("sqlite3", filename)
  64. if err != nil {
  65. return nil, err
  66. }
  67. if err := setupDB(db); err != nil {
  68. return nil, err
  69. }
  70. return &sqliteCache{
  71. db: db,
  72. }, nil
  73. }
  74. func (c *sqliteCache) AddMessage(m *message) error {
  75. _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","))
  76. return err
  77. }
  78. func (c *sqliteCache) Messages(topic string, since sinceTime) ([]*message, error) {
  79. rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
  80. if err != nil {
  81. return nil, err
  82. }
  83. defer rows.Close()
  84. messages := make([]*message, 0)
  85. for rows.Next() {
  86. var timestamp int64
  87. var priority int
  88. var id, msg, title, tagsStr string
  89. if err := rows.Scan(&id, &timestamp, &msg, &title, &priority, &tagsStr); err != nil {
  90. return nil, err
  91. }
  92. if msg == "" {
  93. msg = " " // Hack: never return empty messages; this should not happen
  94. }
  95. var tags []string
  96. if tagsStr != "" {
  97. tags = strings.Split(tagsStr, ",")
  98. }
  99. messages = append(messages, &message{
  100. ID: id,
  101. Time: timestamp,
  102. Event: messageEvent,
  103. Topic: topic,
  104. Message: msg,
  105. Title: title,
  106. Priority: priority,
  107. Tags: tags,
  108. })
  109. }
  110. if err := rows.Err(); err != nil {
  111. return nil, err
  112. }
  113. return messages, nil
  114. }
  115. func (c *sqliteCache) MessageCount(topic string) (int, error) {
  116. rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
  117. if err != nil {
  118. return 0, err
  119. }
  120. defer rows.Close()
  121. var count int
  122. if !rows.Next() {
  123. return 0, errors.New("no rows found")
  124. }
  125. if err := rows.Scan(&count); err != nil {
  126. return 0, err
  127. } else if err := rows.Err(); err != nil {
  128. return 0, err
  129. }
  130. return count, nil
  131. }
  132. func (s *sqliteCache) Topics() (map[string]*topic, error) {
  133. rows, err := s.db.Query(selectTopicsQuery)
  134. if err != nil {
  135. return nil, err
  136. }
  137. defer rows.Close()
  138. topics := make(map[string]*topic, 0)
  139. for rows.Next() {
  140. var id string
  141. var last int64
  142. if err := rows.Scan(&id, &last); err != nil {
  143. return nil, err
  144. }
  145. topics[id] = newTopic(id, time.Unix(last, 0))
  146. }
  147. if err := rows.Err(); err != nil {
  148. return nil, err
  149. }
  150. return topics, nil
  151. }
  152. func (s *sqliteCache) Prune(keep time.Duration) error {
  153. _, err := s.db.Exec(pruneMessagesQuery, time.Now().Add(-1*keep).Unix())
  154. return err
  155. }
  156. func setupDB(db *sql.DB) error {
  157. // If 'messages' table does not exist, this must be a new database
  158. rowsMC, err := db.Query(selectMessagesCountQuery)
  159. if err != nil {
  160. return setupNewDB(db)
  161. }
  162. defer rowsMC.Close()
  163. // If 'messages' table exists, check 'schemaVersion' table
  164. schemaVersion := 0
  165. rowsSV, err := db.Query(selectSchemaVersionQuery)
  166. if err == nil {
  167. defer rowsSV.Close()
  168. if !rowsSV.Next() {
  169. return errors.New("cannot determine schema version: cache file may be corrupt")
  170. }
  171. if err := rowsSV.Scan(&schemaVersion); err != nil {
  172. return err
  173. }
  174. }
  175. // Do migrations
  176. if schemaVersion == currentSchemaVersion {
  177. return nil
  178. } else if schemaVersion == 0 {
  179. return migrateFrom0To1(db)
  180. }
  181. return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
  182. }
  183. func setupNewDB(db *sql.DB) error {
  184. if _, err := db.Exec(createMessagesTableQuery); err != nil {
  185. return err
  186. }
  187. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  188. return err
  189. }
  190. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  191. return err
  192. }
  193. return nil
  194. }
  195. func migrateFrom0To1(db *sql.DB) error {
  196. if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
  197. return err
  198. }
  199. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  200. return err
  201. }
  202. if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
  203. return err
  204. }
  205. return nil
  206. }