cache_sqlite.go 7.7 KB


  1. package server
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. _ "github.com/mattn/go-sqlite3" // SQLite driver
  7. "log"
  8. "strings"
  9. "time"
  10. )
  // Messages cache
//
// SQL statements that operate on the 'messages' table. Values are always
// bound through '?' placeholders.
const (
	// createMessagesTableQuery creates the 'messages' table and an index on
	// its topic column if they do not exist yet, wrapped in BEGIN/COMMIT.
	createMessagesTableQuery = `
BEGIN;
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
time INT NOT NULL,
topic TEXT NOT NULL,
message TEXT NOT NULL,
title TEXT NOT NULL,
priority INT NOT NULL,
tags TEXT NOT NULL,
click TEXT NOT NULL,
published INT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT;
`
	// insertMessageQuery inserts one message row; the placeholders follow the
	// order of the column list.
	insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags, click, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
	// pruneMessagesQuery deletes rows with published = 1 whose time is before
	// the bound value; rows with published = 0 are never pruned by it.
	pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
	// selectMessagesSinceTimeQuery selects the published rows of one topic
	// with time >= the bound value, ordered by ascending time.
	selectMessagesSinceTimeQuery = `
SELECT id, time, topic, message, title, priority, tags, click
FROM messages
WHERE topic = ? AND time >= ?  AND published = 1
ORDER BY time ASC
`
	// selectMessagesSinceTimeIncludeScheduledQuery is the same selection
	// without the published filter, so rows with published = 0 are included.
	selectMessagesSinceTimeIncludeScheduledQuery = `
SELECT id, time, topic, message, title, priority, tags, click
FROM messages
WHERE topic = ? AND time >= ?
ORDER BY time ASC
`
	// selectMessagesDueQuery selects rows with published = 0 whose time is at
	// or before the bound value, across all topics. No ordering is requested.
	selectMessagesDueQuery = `
SELECT id, time, topic, message, title, priority, tags, click
FROM messages
WHERE time <= ? AND published = 0
`
	// updateMessagePublishedQuery sets published = 1 for the row with the given id.
	updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?`
	// selectMessagesCountQuery counts all rows; setupDB also uses it to probe
	// whether the 'messages' table exists.
	selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
	// selectMessageCountForTopicQuery counts the rows of one topic, regardless
	// of their published value.
	selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
	// selectTopicsQuery lists each distinct topic that has at least one row.
	selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
)
  // Schema management queries
//
// The schema version lives in a single row (id = 1) of the 'schemaVersion'
// table. Each migration constant upgrades the 'messages' table by one version.
const (
	// currentSchemaVersion is the version written for new databases and the
	// version at which setupDB stops migrating.
	currentSchemaVersion = 3
	// createSchemaVersionTableQuery creates the 'schemaVersion' table if missing.
	createSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
`
	// insertSchemaVersion inserts the row with id = 1 and the bound version.
	insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
	// updateSchemaVersion overwrites the version of the row with id = 1.
	updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
	// selectSchemaVersionQuery reads the version of the row with id = 1.
	selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
	// 0 -> 1
	// Adds the title, priority and tags columns, with defaults for existing rows.
	migrate0To1AlterMessagesTableQuery = `
BEGIN;
ALTER TABLE messages ADD COLUMN title TEXT NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
ALTER TABLE messages ADD COLUMN tags TEXT NOT NULL DEFAULT('');
COMMIT;
`
	// 1 -> 2
	// Adds the published column; existing rows default to 1.
	migrate1To2AlterMessagesTableQuery = `
ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
`
	// 2 -> 3
	// Adds the click column; existing rows default to the empty string.
	migrate2To3AlterMessagesTableQuery = `
ALTER TABLE messages ADD COLUMN click TEXT NOT NULL DEFAULT('');
`
)
  // sqliteCache is a message cache backed by a database/sql handle that
// newSqliteCache opens with the "sqlite3" driver.
type sqliteCache struct {
	db *sql.DB // handle used by every method of the cache
}

// Compile-time check that *sqliteCache is assignable to cache (declared
// outside the visible part of this file).
var _ cache = (*sqliteCache)(nil)
  86. func newSqliteCache(filename string) (*sqliteCache, error) {
  87. db, err := sql.Open("sqlite3", filename)
  88. if err != nil {
  89. return nil, err
  90. }
  91. if err := setupDB(db); err != nil {
  92. return nil, err
  93. }
  94. return &sqliteCache{
  95. db: db,
  96. }, nil
  97. }
  98. func (c *sqliteCache) AddMessage(m *message) error {
  99. if m.Event != messageEvent {
  100. return errUnexpectedMessageType
  101. }
  102. published := m.Time <= time.Now().Unix()
  103. _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","), m.Click, published)
  104. return err
  105. }
  106. func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
  107. if since.IsNone() {
  108. return make([]*message, 0), nil
  109. }
  110. var rows *sql.Rows
  111. var err error
  112. if scheduled {
  113. rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
  114. } else {
  115. rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
  116. }
  117. if err != nil {
  118. return nil, err
  119. }
  120. return readMessages(rows)
  121. }
  122. func (c *sqliteCache) MessagesDue() ([]*message, error) {
  123. rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
  124. if err != nil {
  125. return nil, err
  126. }
  127. return readMessages(rows)
  128. }
  129. func (c *sqliteCache) MarkPublished(m *message) error {
  130. _, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
  131. return err
  132. }
  133. func (c *sqliteCache) MessageCount(topic string) (int, error) {
  134. rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
  135. if err != nil {
  136. return 0, err
  137. }
  138. defer rows.Close()
  139. var count int
  140. if !rows.Next() {
  141. return 0, errors.New("no rows found")
  142. }
  143. if err := rows.Scan(&count); err != nil {
  144. return 0, err
  145. } else if err := rows.Err(); err != nil {
  146. return 0, err
  147. }
  148. return count, nil
  149. }
  150. func (c *sqliteCache) Topics() (map[string]*topic, error) {
  151. rows, err := c.db.Query(selectTopicsQuery)
  152. if err != nil {
  153. return nil, err
  154. }
  155. defer rows.Close()
  156. topics := make(map[string]*topic)
  157. for rows.Next() {
  158. var id string
  159. if err := rows.Scan(&id); err != nil {
  160. return nil, err
  161. }
  162. topics[id] = newTopic(id)
  163. }
  164. if err := rows.Err(); err != nil {
  165. return nil, err
  166. }
  167. return topics, nil
  168. }
  169. func (c *sqliteCache) Prune(olderThan time.Time) error {
  170. _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
  171. return err
  172. }
  173. func readMessages(rows *sql.Rows) ([]*message, error) {
  174. defer rows.Close()
  175. messages := make([]*message, 0)
  176. for rows.Next() {
  177. var timestamp int64
  178. var priority int
  179. var id, topic, msg, title, tagsStr, click string
  180. if err := rows.Scan(&id, &timestamp, &topic, &msg, &title, &priority, &tagsStr, &click); err != nil {
  181. return nil, err
  182. }
  183. var tags []string
  184. if tagsStr != "" {
  185. tags = strings.Split(tagsStr, ",")
  186. }
  187. messages = append(messages, &message{
  188. ID: id,
  189. Time: timestamp,
  190. Event: messageEvent,
  191. Topic: topic,
  192. Message: msg,
  193. Title: title,
  194. Priority: priority,
  195. Tags: tags,
  196. Click: click,
  197. })
  198. }
  199. if err := rows.Err(); err != nil {
  200. return nil, err
  201. }
  202. return messages, nil
  203. }
  204. func setupDB(db *sql.DB) error {
  205. // If 'messages' table does not exist, this must be a new database
  206. rowsMC, err := db.Query(selectMessagesCountQuery)
  207. if err != nil {
  208. return setupNewDB(db)
  209. }
  210. rowsMC.Close()
  211. // If 'messages' table exists, check 'schemaVersion' table
  212. schemaVersion := 0
  213. rowsSV, err := db.Query(selectSchemaVersionQuery)
  214. if err == nil {
  215. defer rowsSV.Close()
  216. if !rowsSV.Next() {
  217. return errors.New("cannot determine schema version: cache file may be corrupt")
  218. }
  219. if err := rowsSV.Scan(&schemaVersion); err != nil {
  220. return err
  221. }
  222. rowsSV.Close()
  223. }
  224. // Do migrations
  225. if schemaVersion == currentSchemaVersion {
  226. return nil
  227. } else if schemaVersion == 0 {
  228. return migrateFrom0(db)
  229. } else if schemaVersion == 1 {
  230. return migrateFrom1(db)
  231. } else if schemaVersion == 2 {
  232. return migrateFrom2(db)
  233. }
  234. return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
  235. }
  236. func setupNewDB(db *sql.DB) error {
  237. if _, err := db.Exec(createMessagesTableQuery); err != nil {
  238. return err
  239. }
  240. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  241. return err
  242. }
  243. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  244. return err
  245. }
  246. return nil
  247. }
  248. func migrateFrom0(db *sql.DB) error {
  249. log.Print("Migrating cache database schema: from 0 to 1")
  250. if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
  251. return err
  252. }
  253. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  254. return err
  255. }
  256. if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
  257. return err
  258. }
  259. return migrateFrom1(db)
  260. }
  261. func migrateFrom1(db *sql.DB) error {
  262. log.Print("Migrating cache database schema: from 1 to 2")
  263. if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
  264. return err
  265. }
  266. if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
  267. return err
  268. }
  269. return migrateFrom2(db)
  270. }
  271. func migrateFrom2(db *sql.DB) error {
  272. log.Print("Migrating cache database schema: from 2 to 3")
  273. if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil {
  274. return err
  275. }
  276. if _, err := db.Exec(updateSchemaVersion, 3); err != nil {
  277. return err
  278. }
  279. return nil // Update this when a new version is added
  280. }