cache_sqlite.go 7.2 KB


  1. package server
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. _ "github.com/mattn/go-sqlite3" // SQLite driver
  7. "log"
  8. "strings"
  9. "time"
  10. )
  11. // Messages cache
  12. const (
  13. createMessagesTableQuery = `
  14. BEGIN;
  15. CREATE TABLE IF NOT EXISTS messages (
  16. id VARCHAR(20) PRIMARY KEY,
  17. time INT NOT NULL,
  18. topic VARCHAR(64) NOT NULL,
  19. message VARCHAR(512) NOT NULL,
  20. title VARCHAR(256) NOT NULL,
  21. priority INT NOT NULL,
  22. tags VARCHAR(256) NOT NULL,
  23. published INT NOT NULL
  24. );
  25. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  26. COMMIT;
  27. `
  28. insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
  29. pruneMessagesQuery = `DELETE FROM messages WHERE time < ?`
  30. selectMessagesSinceTimeQuery = `
  31. SELECT id, time, topic, message, title, priority, tags
  32. FROM messages
  33. WHERE topic = ? AND time >= ? AND published = 1
  34. ORDER BY time ASC
  35. `
  36. selectMessagesSinceTimeIncludeScheduledQuery = `
  37. SELECT id, time, topic, message, title, priority, tags
  38. FROM messages
  39. WHERE topic = ? AND time >= ?
  40. ORDER BY time ASC
  41. `
  42. selectMessagesDueQuery = `
  43. SELECT id, time, topic, message, title, priority, tags
  44. FROM messages
  45. WHERE time <= ? AND published = 0
  46. `
  47. updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?`
  48. selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
  49. selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
  50. selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
  51. )
  52. // Schema management queries
  53. const (
  54. currentSchemaVersion = 2
  55. createSchemaVersionTableQuery = `
  56. CREATE TABLE IF NOT EXISTS schemaVersion (
  57. id INT PRIMARY KEY,
  58. version INT NOT NULL
  59. );
  60. `
  61. insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
  62. updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
  63. selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
  64. // 0 -> 1
  65. migrate0To1AlterMessagesTableQuery = `
  66. BEGIN;
  67. ALTER TABLE messages ADD COLUMN title VARCHAR(256) NOT NULL DEFAULT('');
  68. ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
  69. ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT('');
  70. COMMIT;
  71. `
  72. // 1 -> 2
  73. migrate1To2AlterMessagesTableQuery = `
  74. BEGIN;
  75. ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
  76. COMMIT;
  77. `
  78. )
  79. type sqliteCache struct {
  80. db *sql.DB
  81. }
  82. var _ cache = (*sqliteCache)(nil)
  83. func newSqliteCache(filename string) (*sqliteCache, error) {
  84. db, err := sql.Open("sqlite3", filename)
  85. if err != nil {
  86. return nil, err
  87. }
  88. if err := setupDB(db); err != nil {
  89. return nil, err
  90. }
  91. return &sqliteCache{
  92. db: db,
  93. }, nil
  94. }
  95. func (c *sqliteCache) AddMessage(m *message) error {
  96. if m.Event != messageEvent {
  97. return errUnexpectedMessageType
  98. }
  99. published := m.Time <= time.Now().Unix()
  100. _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","), published)
  101. return err
  102. }
  103. func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
  104. if since.IsNone() {
  105. return make([]*message, 0), nil
  106. }
  107. var rows *sql.Rows
  108. var err error
  109. if scheduled {
  110. rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
  111. } else {
  112. rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
  113. }
  114. if err != nil {
  115. return nil, err
  116. }
  117. return readMessages(rows)
  118. }
  119. func (c *sqliteCache) MessagesDue() ([]*message, error) {
  120. rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
  121. if err != nil {
  122. return nil, err
  123. }
  124. return readMessages(rows)
  125. }
  126. func (c *sqliteCache) MarkPublished(m *message) error {
  127. _, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
  128. return err
  129. }
  130. func (c *sqliteCache) MessageCount(topic string) (int, error) {
  131. rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
  132. if err != nil {
  133. return 0, err
  134. }
  135. defer rows.Close()
  136. var count int
  137. if !rows.Next() {
  138. return 0, errors.New("no rows found")
  139. }
  140. if err := rows.Scan(&count); err != nil {
  141. return 0, err
  142. } else if err := rows.Err(); err != nil {
  143. return 0, err
  144. }
  145. return count, nil
  146. }
  147. func (c *sqliteCache) Topics() (map[string]*topic, error) {
  148. rows, err := c.db.Query(selectTopicsQuery)
  149. if err != nil {
  150. return nil, err
  151. }
  152. defer rows.Close()
  153. topics := make(map[string]*topic)
  154. for rows.Next() {
  155. var id string
  156. if err := rows.Scan(&id); err != nil {
  157. return nil, err
  158. }
  159. topics[id] = newTopic(id)
  160. }
  161. if err := rows.Err(); err != nil {
  162. return nil, err
  163. }
  164. return topics, nil
  165. }
  166. func (c *sqliteCache) Prune(olderThan time.Time) error {
  167. _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
  168. return err
  169. }
  170. func readMessages(rows *sql.Rows) ([]*message, error) {
  171. defer rows.Close()
  172. messages := make([]*message, 0)
  173. for rows.Next() {
  174. var timestamp int64
  175. var priority int
  176. var id, topic, msg, title, tagsStr string
  177. if err := rows.Scan(&id, &timestamp, &topic, &msg, &title, &priority, &tagsStr); err != nil {
  178. return nil, err
  179. }
  180. var tags []string
  181. if tagsStr != "" {
  182. tags = strings.Split(tagsStr, ",")
  183. }
  184. messages = append(messages, &message{
  185. ID: id,
  186. Time: timestamp,
  187. Event: messageEvent,
  188. Topic: topic,
  189. Message: msg,
  190. Title: title,
  191. Priority: priority,
  192. Tags: tags,
  193. })
  194. }
  195. if err := rows.Err(); err != nil {
  196. return nil, err
  197. }
  198. return messages, nil
  199. }
  200. func setupDB(db *sql.DB) error {
  201. // If 'messages' table does not exist, this must be a new database
  202. rowsMC, err := db.Query(selectMessagesCountQuery)
  203. if err != nil {
  204. return setupNewDB(db)
  205. }
  206. defer rowsMC.Close()
  207. // If 'messages' table exists, check 'schemaVersion' table
  208. schemaVersion := 0
  209. rowsSV, err := db.Query(selectSchemaVersionQuery)
  210. if err == nil {
  211. defer rowsSV.Close()
  212. if !rowsSV.Next() {
  213. return errors.New("cannot determine schema version: cache file may be corrupt")
  214. }
  215. if err := rowsSV.Scan(&schemaVersion); err != nil {
  216. return err
  217. }
  218. }
  219. // Do migrations
  220. if schemaVersion == currentSchemaVersion {
  221. return nil
  222. } else if schemaVersion == 0 {
  223. return migrateFrom0(db)
  224. } else if schemaVersion == 1 {
  225. return migrateFrom1(db)
  226. }
  227. return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
  228. }
  229. func setupNewDB(db *sql.DB) error {
  230. if _, err := db.Exec(createMessagesTableQuery); err != nil {
  231. return err
  232. }
  233. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  234. return err
  235. }
  236. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  237. return err
  238. }
  239. return nil
  240. }
  241. func migrateFrom0(db *sql.DB) error {
  242. log.Print("Migrating cache database schema: from 0 to 1")
  243. if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
  244. return err
  245. }
  246. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  247. return err
  248. }
  249. if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
  250. return err
  251. }
  252. return migrateFrom1(db)
  253. }
  254. func migrateFrom1(db *sql.DB) error {
  255. log.Print("Migrating cache database schema: from 1 to 2")
  256. if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
  257. return err
  258. }
  259. if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
  260. return err
  261. }
  262. return nil // Update this when a new version is added
  263. }