cache_sqlite.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package server
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. _ "github.com/mattn/go-sqlite3" // SQLite driver
  7. "log"
  8. "strings"
  9. "time"
  10. )
  11. // Messages cache
  12. const (
  13. createMessagesTableQuery = `
  14. BEGIN;
  15. CREATE TABLE IF NOT EXISTS messages (
  16. id VARCHAR(20) PRIMARY KEY,
  17. time INT NOT NULL,
  18. topic VARCHAR(64) NOT NULL,
  19. message VARCHAR(512) NOT NULL,
  20. title VARCHAR(256) NOT NULL,
  21. priority INT NOT NULL,
  22. tags VARCHAR(256) NOT NULL
  23. );
  24. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  25. COMMIT;
  26. `
  27. insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`
  28. pruneMessagesQuery = `DELETE FROM messages WHERE time < ?`
  29. selectMessagesSinceTimeQuery = `
  30. SELECT id, time, message, title, priority, tags
  31. FROM messages
  32. WHERE topic = ? AND time >= ?
  33. ORDER BY time ASC
  34. `
  35. selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
  36. selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
  37. selectTopicsQuery = `SELECT topic, MAX(time) FROM messages GROUP BY topic`
  38. )
  39. // Schema management queries
  40. const (
  41. currentSchemaVersion = 1
  42. createSchemaVersionTableQuery = `
  43. CREATE TABLE IF NOT EXISTS schemaVersion (
  44. id INT PRIMARY KEY,
  45. version INT NOT NULL
  46. );
  47. `
  48. insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
  49. selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
  50. // 0 -> 1
  51. migrate0To1AlterMessagesTableQuery = `
  52. BEGIN;
  53. ALTER TABLE messages ADD COLUMN title VARCHAR(256) NOT NULL DEFAULT('');
  54. ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
  55. ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT('');
  56. COMMIT;
  57. `
  58. )
  59. type sqliteCache struct {
  60. db *sql.DB
  61. }
  62. var _ cache = (*sqliteCache)(nil)
  63. func newSqliteCache(filename string) (*sqliteCache, error) {
  64. db, err := sql.Open("sqlite3", filename)
  65. if err != nil {
  66. return nil, err
  67. }
  68. if err := setupDB(db); err != nil {
  69. return nil, err
  70. }
  71. return &sqliteCache{
  72. db: db,
  73. }, nil
  74. }
  75. func (c *sqliteCache) AddMessage(m *message) error {
  76. if m.Event != messageEvent {
  77. return errUnexpectedMessageType
  78. }
  79. _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","))
  80. return err
  81. }
  82. func (c *sqliteCache) Messages(topic string, since sinceTime) ([]*message, error) {
  83. if since.IsNone() {
  84. return make([]*message, 0), nil
  85. }
  86. rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
  87. if err != nil {
  88. return nil, err
  89. }
  90. defer rows.Close()
  91. messages := make([]*message, 0)
  92. for rows.Next() {
  93. var timestamp int64
  94. var priority int
  95. var id, msg, title, tagsStr string
  96. if err := rows.Scan(&id, &timestamp, &msg, &title, &priority, &tagsStr); err != nil {
  97. return nil, err
  98. }
  99. var tags []string
  100. if tagsStr != "" {
  101. tags = strings.Split(tagsStr, ",")
  102. }
  103. messages = append(messages, &message{
  104. ID: id,
  105. Time: timestamp,
  106. Event: messageEvent,
  107. Topic: topic,
  108. Message: msg,
  109. Title: title,
  110. Priority: priority,
  111. Tags: tags,
  112. })
  113. }
  114. if err := rows.Err(); err != nil {
  115. return nil, err
  116. }
  117. return messages, nil
  118. }
  119. func (c *sqliteCache) MessageCount(topic string) (int, error) {
  120. rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
  121. if err != nil {
  122. return 0, err
  123. }
  124. defer rows.Close()
  125. var count int
  126. if !rows.Next() {
  127. return 0, errors.New("no rows found")
  128. }
  129. if err := rows.Scan(&count); err != nil {
  130. return 0, err
  131. } else if err := rows.Err(); err != nil {
  132. return 0, err
  133. }
  134. return count, nil
  135. }
  136. func (c *sqliteCache) Topics() (map[string]*topic, error) {
  137. rows, err := c.db.Query(selectTopicsQuery)
  138. if err != nil {
  139. return nil, err
  140. }
  141. defer rows.Close()
  142. topics := make(map[string]*topic)
  143. for rows.Next() {
  144. var id string
  145. var last int64
  146. if err := rows.Scan(&id, &last); err != nil {
  147. return nil, err
  148. }
  149. topics[id] = newTopic(id, time.Unix(last, 0))
  150. }
  151. if err := rows.Err(); err != nil {
  152. return nil, err
  153. }
  154. return topics, nil
  155. }
  156. func (c *sqliteCache) Prune(keep time.Duration) error {
  157. _, err := c.db.Exec(pruneMessagesQuery, time.Now().Add(-1*keep).Unix())
  158. return err
  159. }
  160. func setupDB(db *sql.DB) error {
  161. // If 'messages' table does not exist, this must be a new database
  162. rowsMC, err := db.Query(selectMessagesCountQuery)
  163. if err != nil {
  164. return setupNewDB(db)
  165. }
  166. defer rowsMC.Close()
  167. // If 'messages' table exists, check 'schemaVersion' table
  168. schemaVersion := 0
  169. rowsSV, err := db.Query(selectSchemaVersionQuery)
  170. if err == nil {
  171. defer rowsSV.Close()
  172. if !rowsSV.Next() {
  173. return errors.New("cannot determine schema version: cache file may be corrupt")
  174. }
  175. if err := rowsSV.Scan(&schemaVersion); err != nil {
  176. return err
  177. }
  178. }
  179. // Do migrations
  180. if schemaVersion == currentSchemaVersion {
  181. return nil
  182. } else if schemaVersion == 0 {
  183. return migrateFrom0To1(db)
  184. }
  185. return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
  186. }
  187. func setupNewDB(db *sql.DB) error {
  188. if _, err := db.Exec(createMessagesTableQuery); err != nil {
  189. return err
  190. }
  191. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  192. return err
  193. }
  194. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  195. return err
  196. }
  197. return nil
  198. }
  199. func migrateFrom0To1(db *sql.DB) error {
  200. log.Print("Migrating cache database schema: from 0 to 1")
  201. if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
  202. return err
  203. }
  204. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  205. return err
  206. }
  207. if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
  208. return err
  209. }
  210. return nil
  211. }