cache_sqlite.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. package server
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. _ "github.com/mattn/go-sqlite3" // SQLite driver
  7. "log"
  8. "strings"
  9. "time"
  10. )
// Messages cache
//
// SQL statements used by sqliteCache to create, fill, read and prune the
// 'messages' table. The 'published' column distinguishes rows that are
// visible to regular reads (1) from rows that are still waiting for their
// scheduled time (0); see AddMessage, MessagesDue and MarkPublished.
const (
	// createMessagesTableQuery creates the 'messages' table and its topic
	// index inside a single transaction. Both statements use IF NOT EXISTS.
	createMessagesTableQuery = `
		BEGIN;
		CREATE TABLE IF NOT EXISTS messages (
			id TEXT PRIMARY KEY,
			time INT NOT NULL,
			topic TEXT NOT NULL,
			message TEXT NOT NULL,
			title TEXT NOT NULL,
			priority INT NOT NULL,
			tags TEXT NOT NULL,
			attachment_name TEXT NOT NULL,
			attachment_type TEXT NOT NULL,
			attachment_size INT NOT NULL,
			attachment_expires INT NOT NULL,
			attachment_preview_url TEXT NOT NULL,
			attachment_url TEXT NOT NULL,
			published INT NOT NULL
		);
		CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
		COMMIT;
	`

	// insertMessageQuery inserts one row; it takes 14 positional parameters
	// in the column order listed in the statement.
	insertMessageQuery = `
		INSERT INTO messages (id, time, topic, message, title, priority, tags, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_preview_url, attachment_url, published)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
	`

	// pruneMessagesQuery deletes published rows whose time is strictly
	// before the given timestamp; unpublished rows are never pruned.
	pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`

	// selectMessagesSinceTimeQuery returns the published rows of a topic
	// with time >= the given timestamp, oldest first.
	selectMessagesSinceTimeQuery = `
		SELECT id, time, topic, message, title, priority, tags, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_preview_url, attachment_url
		FROM messages
		WHERE topic = ? AND time >= ? AND published = 1
		ORDER BY time ASC
	`

	// selectMessagesSinceTimeIncludeScheduledQuery is the same read without
	// the published filter, so unpublished rows are returned as well.
	selectMessagesSinceTimeIncludeScheduledQuery = `
		SELECT id, time, topic, message, title, priority, tags, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_preview_url, attachment_url
		FROM messages
		WHERE topic = ? AND time >= ?
		ORDER BY time ASC
	`

	// selectMessagesDueQuery returns unpublished rows (any topic) whose time
	// is at or before the given timestamp. No ordering is requested.
	selectMessagesDueQuery = `
		SELECT id, time, topic, message, title, priority, tags, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_preview_url, attachment_url
		FROM messages
		WHERE time <= ? AND published = 0
	`

	// updateMessagePublishedQuery flags the row with the given id as published.
	updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?`

	// selectMessagesCountQuery counts all rows; setupDB also runs it to find
	// out whether the 'messages' table exists at all.
	selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`

	// selectMessageCountForTopicQuery counts the rows of one topic,
	// regardless of their published flag.
	selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`

	// selectTopicsQuery lists each topic that has at least one row.
	selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
)
  61. // Schema management queries
  62. const (
  63. currentSchemaVersion = 3
  64. createSchemaVersionTableQuery = `
  65. CREATE TABLE IF NOT EXISTS schemaVersion (
  66. id INT PRIMARY KEY,
  67. version INT NOT NULL
  68. );
  69. `
  70. insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
  71. updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
  72. selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
  73. // 0 -> 1
  74. migrate0To1AlterMessagesTableQuery = `
  75. BEGIN;
  76. ALTER TABLE messages ADD COLUMN title TEXT NOT NULL DEFAULT('');
  77. ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
  78. ALTER TABLE messages ADD COLUMN tags TEXT NOT NULL DEFAULT('');
  79. COMMIT;
  80. `
  81. // 1 -> 2
  82. migrate1To2AlterMessagesTableQuery = `
  83. ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
  84. `
  85. // 2 -> 3
  86. migrate2To3AlterMessagesTableQuery = `
  87. BEGIN;
  88. ALTER TABLE messages ADD COLUMN attachment_name TEXT NOT NULL;
  89. ALTER TABLE messages ADD COLUMN attachment_type TEXT NOT NULL;
  90. ALTER TABLE messages ADD COLUMN attachment_size INT NOT NULL;
  91. ALTER TABLE messages ADD COLUMN attachment_expires INT NOT NULL;
  92. ALTER TABLE messages ADD COLUMN attachment_url TEXT NOT NULL;
  93. COMMIT;
  94. `
  95. )
// sqliteCache is a message cache that keeps its data in a SQLite database
// accessed through database/sql.
type sqliteCache struct {
	db *sql.DB // database handle; set by newSqliteCache
}

// Compile-time check that *sqliteCache can be used as a cache.
var _ cache = (*sqliteCache)(nil)
  100. func newSqliteCache(filename string) (*sqliteCache, error) {
  101. db, err := sql.Open("sqlite3", filename)
  102. if err != nil {
  103. return nil, err
  104. }
  105. if err := setupDB(db); err != nil {
  106. return nil, err
  107. }
  108. return &sqliteCache{
  109. db: db,
  110. }, nil
  111. }
  112. func (c *sqliteCache) AddMessage(m *message) error {
  113. if m.Event != messageEvent {
  114. return errUnexpectedMessageType
  115. }
  116. published := m.Time <= time.Now().Unix()
  117. tags := strings.Join(m.Tags, ",")
  118. var attachmentName, attachmentType, attachmentPreviewURL, attachmentURL string
  119. var attachmentSize, attachmentExpires int64
  120. if m.Attachment != nil {
  121. attachmentName = m.Attachment.Name
  122. attachmentType = m.Attachment.Type
  123. attachmentSize = m.Attachment.Size
  124. attachmentExpires = m.Attachment.Expires
  125. attachmentPreviewURL = m.Attachment.PreviewURL
  126. attachmentURL = m.Attachment.URL
  127. }
  128. _, err := c.db.Exec(
  129. insertMessageQuery,
  130. m.ID,
  131. m.Time,
  132. m.Topic,
  133. m.Message,
  134. m.Title,
  135. m.Priority,
  136. tags,
  137. attachmentName,
  138. attachmentType,
  139. attachmentSize,
  140. attachmentExpires,
  141. attachmentPreviewURL,
  142. attachmentURL,
  143. published,
  144. )
  145. return err
  146. }
  147. func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
  148. if since.IsNone() {
  149. return make([]*message, 0), nil
  150. }
  151. var rows *sql.Rows
  152. var err error
  153. if scheduled {
  154. rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
  155. } else {
  156. rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
  157. }
  158. if err != nil {
  159. return nil, err
  160. }
  161. return readMessages(rows)
  162. }
  163. func (c *sqliteCache) MessagesDue() ([]*message, error) {
  164. rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
  165. if err != nil {
  166. return nil, err
  167. }
  168. return readMessages(rows)
  169. }
  170. func (c *sqliteCache) MarkPublished(m *message) error {
  171. _, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
  172. return err
  173. }
  174. func (c *sqliteCache) MessageCount(topic string) (int, error) {
  175. rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
  176. if err != nil {
  177. return 0, err
  178. }
  179. defer rows.Close()
  180. var count int
  181. if !rows.Next() {
  182. return 0, errors.New("no rows found")
  183. }
  184. if err := rows.Scan(&count); err != nil {
  185. return 0, err
  186. } else if err := rows.Err(); err != nil {
  187. return 0, err
  188. }
  189. return count, nil
  190. }
  191. func (c *sqliteCache) Topics() (map[string]*topic, error) {
  192. rows, err := c.db.Query(selectTopicsQuery)
  193. if err != nil {
  194. return nil, err
  195. }
  196. defer rows.Close()
  197. topics := make(map[string]*topic)
  198. for rows.Next() {
  199. var id string
  200. if err := rows.Scan(&id); err != nil {
  201. return nil, err
  202. }
  203. topics[id] = newTopic(id)
  204. }
  205. if err := rows.Err(); err != nil {
  206. return nil, err
  207. }
  208. return topics, nil
  209. }
  210. func (c *sqliteCache) Prune(olderThan time.Time) error {
  211. _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
  212. return err
  213. }
  214. func readMessages(rows *sql.Rows) ([]*message, error) {
  215. defer rows.Close()
  216. messages := make([]*message, 0)
  217. for rows.Next() {
  218. var timestamp, attachmentSize, attachmentExpires int64
  219. var priority int
  220. var id, topic, msg, title, tagsStr, attachmentName, attachmentType, attachmentPreviewURL, attachmentURL string
  221. if err := rows.Scan(&id, &timestamp, &topic, &msg, &title, &priority, &tagsStr, &attachmentName, &attachmentType, &attachmentSize, &attachmentExpires, &attachmentPreviewURL, &attachmentURL); err != nil {
  222. return nil, err
  223. }
  224. var tags []string
  225. if tagsStr != "" {
  226. tags = strings.Split(tagsStr, ",")
  227. }
  228. var att *attachment
  229. if attachmentName != "" && attachmentURL != "" {
  230. att = &attachment{
  231. Name: attachmentName,
  232. Type: attachmentType,
  233. Size: attachmentSize,
  234. Expires: attachmentExpires,
  235. PreviewURL: attachmentPreviewURL,
  236. URL: attachmentURL,
  237. }
  238. }
  239. messages = append(messages, &message{
  240. ID: id,
  241. Time: timestamp,
  242. Event: messageEvent,
  243. Topic: topic,
  244. Message: msg,
  245. Title: title,
  246. Priority: priority,
  247. Tags: tags,
  248. Attachment: att,
  249. })
  250. }
  251. if err := rows.Err(); err != nil {
  252. return nil, err
  253. }
  254. return messages, nil
  255. }
  256. func setupDB(db *sql.DB) error {
  257. // If 'messages' table does not exist, this must be a new database
  258. rowsMC, err := db.Query(selectMessagesCountQuery)
  259. if err != nil {
  260. return setupNewDB(db)
  261. }
  262. rowsMC.Close()
  263. // If 'messages' table exists, check 'schemaVersion' table
  264. schemaVersion := 0
  265. rowsSV, err := db.Query(selectSchemaVersionQuery)
  266. if err == nil {
  267. defer rowsSV.Close()
  268. if !rowsSV.Next() {
  269. return errors.New("cannot determine schema version: cache file may be corrupt")
  270. }
  271. if err := rowsSV.Scan(&schemaVersion); err != nil {
  272. return err
  273. }
  274. rowsSV.Close()
  275. }
  276. // Do migrations
  277. if schemaVersion == currentSchemaVersion {
  278. return nil
  279. } else if schemaVersion == 0 {
  280. return migrateFrom0(db)
  281. } else if schemaVersion == 1 {
  282. return migrateFrom1(db)
  283. } else if schemaVersion == 2 {
  284. return migrateFrom2(db)
  285. }
  286. return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
  287. }
  288. func setupNewDB(db *sql.DB) error {
  289. if _, err := db.Exec(createMessagesTableQuery); err != nil {
  290. return err
  291. }
  292. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  293. return err
  294. }
  295. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  296. return err
  297. }
  298. return nil
  299. }
  300. func migrateFrom0(db *sql.DB) error {
  301. log.Print("Migrating cache database schema: from 0 to 1")
  302. if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
  303. return err
  304. }
  305. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  306. return err
  307. }
  308. if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
  309. return err
  310. }
  311. return migrateFrom1(db)
  312. }
  313. func migrateFrom1(db *sql.DB) error {
  314. log.Print("Migrating cache database schema: from 1 to 2")
  315. if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
  316. return err
  317. }
  318. if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
  319. return err
  320. }
  321. return migrateFrom2(db)
  322. }
  323. func migrateFrom2(db *sql.DB) error {
  324. log.Print("Migrating cache database schema: from 2 to 3")
  325. if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil {
  326. return err
  327. }
  328. if _, err := db.Exec(updateSchemaVersion, 3); err != nil {
  329. return err
  330. }
  331. return nil // Update this when a new version is added
  332. }