message_cache_test.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746
  1. package server
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "net/netip"
  6. "path/filepath"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/require"
  10. )
  11. func TestSqliteCache_Messages(t *testing.T) {
  12. testCacheMessages(t, newSqliteTestCache(t))
  13. }
  14. func TestMemCache_Messages(t *testing.T) {
  15. testCacheMessages(t, newMemTestCache(t))
  16. }
  17. func testCacheMessages(t *testing.T, c *messageCache) {
  18. m1 := newDefaultMessage("mytopic", "my message")
  19. m1.Time = 1
  20. m1.MTime = 1000
  21. m2 := newDefaultMessage("mytopic", "my other message")
  22. m2.Time = 2
  23. m2.MTime = 2000
  24. require.Nil(t, c.AddMessage(m1))
  25. require.Nil(t, c.AddMessage(newDefaultMessage("example", "my example message")))
  26. require.Nil(t, c.AddMessage(m2))
  27. // Adding invalid
  28. require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added!
  29. require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example"))) // These should not be added!
  30. // mytopic: count
  31. counts, err := c.MessageCounts()
  32. require.Nil(t, err)
  33. require.Equal(t, 2, counts["mytopic"])
  34. // mytopic: since all
  35. messages, _ := c.Messages("mytopic", sinceAllMessages, false)
  36. require.Equal(t, 2, len(messages))
  37. require.Equal(t, "my message", messages[0].Message)
  38. require.Equal(t, "mytopic", messages[0].Topic)
  39. require.Equal(t, messageEvent, messages[0].Event)
  40. require.Equal(t, "", messages[0].Title)
  41. require.Equal(t, 0, messages[0].Priority)
  42. require.Nil(t, messages[0].Tags)
  43. require.Equal(t, "my other message", messages[1].Message)
  44. // mytopic: since none
  45. messages, _ = c.Messages("mytopic", sinceNoMessages, false)
  46. require.Empty(t, messages)
  47. // mytopic: since m1 (by ID)
  48. messages, _ = c.Messages("mytopic", newSinceID(m1.ID), false)
  49. require.Equal(t, 1, len(messages))
  50. require.Equal(t, m2.ID, messages[0].ID)
  51. require.Equal(t, "my other message", messages[0].Message)
  52. require.Equal(t, "mytopic", messages[0].Topic)
  53. // mytopic: since 2
  54. messages, _ = c.Messages("mytopic", newSinceTime(2), false)
  55. require.Equal(t, 1, len(messages))
  56. require.Equal(t, "my other message", messages[0].Message)
  57. // mytopic: latest
  58. messages, _ = c.Messages("mytopic", sinceLatestMessage, false)
  59. require.Equal(t, 1, len(messages))
  60. require.Equal(t, "my other message", messages[0].Message)
  61. // example: count
  62. counts, err = c.MessageCounts()
  63. require.Nil(t, err)
  64. require.Equal(t, 1, counts["example"])
  65. // example: since all
  66. messages, _ = c.Messages("example", sinceAllMessages, false)
  67. require.Equal(t, "my example message", messages[0].Message)
  68. // non-existing: count
  69. counts, err = c.MessageCounts()
  70. require.Nil(t, err)
  71. require.Equal(t, 0, counts["doesnotexist"])
  72. // non-existing: since all
  73. messages, _ = c.Messages("doesnotexist", sinceAllMessages, false)
  74. require.Empty(t, messages)
  75. }
  76. func TestSqliteCache_MessagesScheduled(t *testing.T) {
  77. testCacheMessagesScheduled(t, newSqliteTestCache(t))
  78. }
  79. func TestMemCache_MessagesScheduled(t *testing.T) {
  80. testCacheMessagesScheduled(t, newMemTestCache(t))
  81. }
  82. func testCacheMessagesScheduled(t *testing.T, c *messageCache) {
  83. m1 := newDefaultMessage("mytopic", "message 1")
  84. m2 := newDefaultMessage("mytopic", "message 2")
  85. m2.Time = time.Now().Add(time.Hour).Unix()
  86. m2.MTime = time.Now().Add(time.Hour).UnixMilli()
  87. m3 := newDefaultMessage("mytopic", "message 3")
  88. m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2!
  89. m3.MTime = time.Now().Add(time.Minute).UnixMilli() // earlier than m2!
  90. m4 := newDefaultMessage("mytopic2", "message 4")
  91. m4.Time = time.Now().Add(time.Minute).Unix()
  92. m4.MTime = time.Now().Add(time.Minute).UnixMilli()
  93. require.Nil(t, c.AddMessage(m1))
  94. require.Nil(t, c.AddMessage(m2))
  95. require.Nil(t, c.AddMessage(m3))
  96. messages, _ := c.Messages("mytopic", sinceAllMessages, false) // exclude scheduled
  97. require.Equal(t, 1, len(messages))
  98. require.Equal(t, "message 1", messages[0].Message)
  99. messages, _ = c.Messages("mytopic", sinceAllMessages, true) // include scheduled
  100. require.Equal(t, 3, len(messages))
  101. require.Equal(t, "message 1", messages[0].Message)
  102. require.Equal(t, "message 3", messages[1].Message) // Order!
  103. require.Equal(t, "message 2", messages[2].Message)
  104. messages, _ = c.MessagesDue()
  105. require.Empty(t, messages)
  106. }
  107. func TestSqliteCache_Topics(t *testing.T) {
  108. testCacheTopics(t, newSqliteTestCache(t))
  109. }
  110. func TestMemCache_Topics(t *testing.T) {
  111. testCacheTopics(t, newMemTestCache(t))
  112. }
  113. func testCacheTopics(t *testing.T, c *messageCache) {
  114. require.Nil(t, c.AddMessage(newDefaultMessage("topic1", "my example message")))
  115. require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 1")))
  116. require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 2")))
  117. require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 3")))
  118. topics, err := c.Topics()
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. require.Equal(t, 2, len(topics))
  123. require.Equal(t, "topic1", topics["topic1"].ID)
  124. require.Equal(t, "topic2", topics["topic2"].ID)
  125. }
  126. func TestSqliteCache_MessagesTagsPrioAndTitle(t *testing.T) {
  127. testCacheMessagesTagsPrioAndTitle(t, newSqliteTestCache(t))
  128. }
  129. func TestMemCache_MessagesTagsPrioAndTitle(t *testing.T) {
  130. testCacheMessagesTagsPrioAndTitle(t, newMemTestCache(t))
  131. }
  132. func testCacheMessagesTagsPrioAndTitle(t *testing.T, c *messageCache) {
  133. m := newDefaultMessage("mytopic", "some message")
  134. m.Tags = []string{"tag1", "tag2"}
  135. m.Priority = 5
  136. m.Title = "some title"
  137. require.Nil(t, c.AddMessage(m))
  138. messages, _ := c.Messages("mytopic", sinceAllMessages, false)
  139. require.Equal(t, []string{"tag1", "tag2"}, messages[0].Tags)
  140. require.Equal(t, 5, messages[0].Priority)
  141. require.Equal(t, "some title", messages[0].Title)
  142. }
  143. func TestSqliteCache_MessagesSinceID(t *testing.T) {
  144. testCacheMessagesSinceID(t, newSqliteTestCache(t))
  145. }
  146. func TestMemCache_MessagesSinceID(t *testing.T) {
  147. testCacheMessagesSinceID(t, newMemTestCache(t))
  148. }
  149. func testCacheMessagesSinceID(t *testing.T, c *messageCache) {
  150. m1 := newDefaultMessage("mytopic", "message 1")
  151. m1.Time = 100
  152. m1.MTime = 100000
  153. m2 := newDefaultMessage("mytopic", "message 2")
  154. m2.Time = 200
  155. m2.MTime = 200000
  156. m3 := newDefaultMessage("mytopic", "message 3")
  157. m3.Time = time.Now().Add(time.Hour).Unix() // Scheduled, in the future, later than m7 and m5
  158. m3.MTime = time.Now().Add(time.Hour).UnixMilli() // Scheduled, in the future, later than m7 and m5
  159. m4 := newDefaultMessage("mytopic", "message 4")
  160. m4.Time = 400
  161. m4.MTime = 400000
  162. m5 := newDefaultMessage("mytopic", "message 5")
  163. m5.Time = time.Now().Add(time.Minute).Unix() // Scheduled, in the future, later than m7
  164. m5.MTime = time.Now().Add(time.Minute).UnixMilli() // Scheduled, in the future, later than m7
  165. m6 := newDefaultMessage("mytopic", "message 6")
  166. m6.Time = 600
  167. m6.MTime = 600000
  168. m7 := newDefaultMessage("mytopic", "message 7")
  169. m7.Time = 700
  170. m7.MTime = 700000
  171. require.Nil(t, c.AddMessage(m1))
  172. require.Nil(t, c.AddMessage(m2))
  173. require.Nil(t, c.AddMessage(m3))
  174. require.Nil(t, c.AddMessage(m4))
  175. require.Nil(t, c.AddMessage(m5))
  176. require.Nil(t, c.AddMessage(m6))
  177. require.Nil(t, c.AddMessage(m7))
  178. // Case 1: Since ID exists, exclude scheduled
  179. messages, _ := c.Messages("mytopic", newSinceID(m2.ID), false)
  180. require.Equal(t, 3, len(messages))
  181. require.Equal(t, "message 4", messages[0].Message)
  182. require.Equal(t, "message 6", messages[1].Message) // Not scheduled m3/m5!
  183. require.Equal(t, "message 7", messages[2].Message)
  184. // Case 2: Since ID exists, include scheduled
  185. messages, _ = c.Messages("mytopic", newSinceID(m2.ID), true)
  186. require.Equal(t, 5, len(messages))
  187. require.Equal(t, "message 4", messages[0].Message)
  188. require.Equal(t, "message 6", messages[1].Message)
  189. require.Equal(t, "message 7", messages[2].Message)
  190. require.Equal(t, "message 5", messages[3].Message) // Order!
  191. require.Equal(t, "message 3", messages[4].Message) // Order!
  192. // Case 3: Since ID does not exist (-> Return all messages), include scheduled
  193. messages, _ = c.Messages("mytopic", newSinceID("doesntexist"), true)
  194. require.Equal(t, 7, len(messages))
  195. require.Equal(t, "message 1", messages[0].Message)
  196. require.Equal(t, "message 2", messages[1].Message)
  197. require.Equal(t, "message 4", messages[2].Message)
  198. require.Equal(t, "message 6", messages[3].Message)
  199. require.Equal(t, "message 7", messages[4].Message)
  200. require.Equal(t, "message 5", messages[5].Message) // Order!
  201. require.Equal(t, "message 3", messages[6].Message) // Order!
  202. // Case 4: Since ID exists and is last message (-> Return no messages), exclude scheduled
  203. messages, _ = c.Messages("mytopic", newSinceID(m7.ID), false)
  204. require.Equal(t, 0, len(messages))
  205. // Case 5: Since ID exists and is last message (-> Return no messages), include scheduled
  206. messages, _ = c.Messages("mytopic", newSinceID(m7.ID), true)
  207. require.Equal(t, 2, len(messages))
  208. require.Equal(t, "message 5", messages[0].Message)
  209. require.Equal(t, "message 3", messages[1].Message)
  210. }
  211. func TestSqliteCache_Prune(t *testing.T) {
  212. testCachePrune(t, newSqliteTestCache(t))
  213. }
  214. func TestMemCache_Prune(t *testing.T) {
  215. testCachePrune(t, newMemTestCache(t))
  216. }
  217. func testCachePrune(t *testing.T, c *messageCache) {
  218. now := time.Now().Unix()
  219. m1 := newDefaultMessage("mytopic", "my message")
  220. m1.Time = now - 10
  221. m1.MTime = (now - 10) * 1000
  222. m1.Expires = now - 5
  223. m2 := newDefaultMessage("mytopic", "my other message")
  224. m2.Time = now - 5
  225. m2.MTime = (now - 5) * 1000
  226. m2.Expires = now + 5 // In the future
  227. m3 := newDefaultMessage("another_topic", "and another one")
  228. m3.Time = now - 12
  229. m3.MTime = (now - 12) * 1000
  230. m3.Expires = now - 2
  231. require.Nil(t, c.AddMessage(m1))
  232. require.Nil(t, c.AddMessage(m2))
  233. require.Nil(t, c.AddMessage(m3))
  234. counts, err := c.MessageCounts()
  235. require.Nil(t, err)
  236. require.Equal(t, 2, counts["mytopic"])
  237. require.Equal(t, 1, counts["another_topic"])
  238. expiredMessageIDs, err := c.MessagesExpired()
  239. require.Nil(t, err)
  240. require.Nil(t, c.DeleteMessages(expiredMessageIDs...))
  241. counts, err = c.MessageCounts()
  242. require.Nil(t, err)
  243. require.Equal(t, 1, counts["mytopic"])
  244. require.Equal(t, 0, counts["another_topic"])
  245. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  246. require.Nil(t, err)
  247. require.Equal(t, 1, len(messages))
  248. require.Equal(t, "my other message", messages[0].Message)
  249. }
  250. func TestSqliteCache_Attachments(t *testing.T) {
  251. testCacheAttachments(t, newSqliteTestCache(t))
  252. }
  253. func TestMemCache_Attachments(t *testing.T) {
  254. testCacheAttachments(t, newMemTestCache(t))
  255. }
  256. func testCacheAttachments(t *testing.T, c *messageCache) {
  257. expires1 := time.Now().Add(-4 * time.Hour).Unix() // Expired
  258. m := newDefaultMessage("mytopic", "flower for you")
  259. m.ID = "m1"
  260. m.SID = "m1"
  261. m.Sender = netip.MustParseAddr("1.2.3.4")
  262. m.Attachment = &attachment{
  263. Name: "flower.jpg",
  264. Type: "image/jpeg",
  265. Size: 5000,
  266. Expires: expires1,
  267. URL: "https://ntfy.sh/file/AbDeFgJhal.jpg",
  268. }
  269. require.Nil(t, c.AddMessage(m))
  270. expires2 := time.Now().Add(2 * time.Hour).Unix() // Future
  271. m = newDefaultMessage("mytopic", "sending you a car")
  272. m.ID = "m2"
  273. m.SID = "m2"
  274. m.Sender = netip.MustParseAddr("1.2.3.4")
  275. m.Attachment = &attachment{
  276. Name: "car.jpg",
  277. Type: "image/jpeg",
  278. Size: 10000,
  279. Expires: expires2,
  280. URL: "https://ntfy.sh/file/aCaRURL.jpg",
  281. }
  282. require.Nil(t, c.AddMessage(m))
  283. expires3 := time.Now().Add(1 * time.Hour).Unix() // Future
  284. m = newDefaultMessage("another-topic", "sending you another car")
  285. m.ID = "m3"
  286. m.SID = "m3"
  287. m.User = "u_BAsbaAa"
  288. m.Sender = netip.MustParseAddr("5.6.7.8")
  289. m.Attachment = &attachment{
  290. Name: "another-car.jpg",
  291. Type: "image/jpeg",
  292. Size: 20000,
  293. Expires: expires3,
  294. URL: "https://ntfy.sh/file/zakaDHFW.jpg",
  295. }
  296. require.Nil(t, c.AddMessage(m))
  297. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  298. require.Nil(t, err)
  299. require.Equal(t, 2, len(messages))
  300. require.Equal(t, "flower for you", messages[0].Message)
  301. require.Equal(t, "flower.jpg", messages[0].Attachment.Name)
  302. require.Equal(t, "image/jpeg", messages[0].Attachment.Type)
  303. require.Equal(t, int64(5000), messages[0].Attachment.Size)
  304. require.Equal(t, expires1, messages[0].Attachment.Expires)
  305. require.Equal(t, "https://ntfy.sh/file/AbDeFgJhal.jpg", messages[0].Attachment.URL)
  306. require.Equal(t, "1.2.3.4", messages[0].Sender.String())
  307. require.Equal(t, "sending you a car", messages[1].Message)
  308. require.Equal(t, "car.jpg", messages[1].Attachment.Name)
  309. require.Equal(t, "image/jpeg", messages[1].Attachment.Type)
  310. require.Equal(t, int64(10000), messages[1].Attachment.Size)
  311. require.Equal(t, expires2, messages[1].Attachment.Expires)
  312. require.Equal(t, "https://ntfy.sh/file/aCaRURL.jpg", messages[1].Attachment.URL)
  313. require.Equal(t, "1.2.3.4", messages[1].Sender.String())
  314. size, err := c.AttachmentBytesUsedBySender("1.2.3.4")
  315. require.Nil(t, err)
  316. require.Equal(t, int64(10000), size)
  317. size, err = c.AttachmentBytesUsedBySender("5.6.7.8")
  318. require.Nil(t, err)
  319. require.Equal(t, int64(0), size) // Accounted to the user, not the IP!
  320. size, err = c.AttachmentBytesUsedByUser("u_BAsbaAa")
  321. require.Nil(t, err)
  322. require.Equal(t, int64(20000), size)
  323. }
  324. func TestSqliteCache_Attachments_Expired(t *testing.T) {
  325. testCacheAttachmentsExpired(t, newSqliteTestCache(t))
  326. }
  327. func TestMemCache_Attachments_Expired(t *testing.T) {
  328. testCacheAttachmentsExpired(t, newMemTestCache(t))
  329. }
  330. func testCacheAttachmentsExpired(t *testing.T, c *messageCache) {
  331. m := newDefaultMessage("mytopic", "flower for you")
  332. m.ID = "m1"
  333. m.SID = "m1"
  334. m.Expires = time.Now().Add(time.Hour).Unix()
  335. require.Nil(t, c.AddMessage(m))
  336. m = newDefaultMessage("mytopic", "message with attachment")
  337. m.ID = "m2"
  338. m.SID = "m2"
  339. m.Expires = time.Now().Add(2 * time.Hour).Unix()
  340. m.Attachment = &attachment{
  341. Name: "car.jpg",
  342. Type: "image/jpeg",
  343. Size: 10000,
  344. Expires: time.Now().Add(2 * time.Hour).Unix(),
  345. URL: "https://ntfy.sh/file/aCaRURL.jpg",
  346. }
  347. require.Nil(t, c.AddMessage(m))
  348. m = newDefaultMessage("mytopic", "message with external attachment")
  349. m.ID = "m3"
  350. m.SID = "m3"
  351. m.Expires = time.Now().Add(2 * time.Hour).Unix()
  352. m.Attachment = &attachment{
  353. Name: "car.jpg",
  354. Type: "image/jpeg",
  355. Expires: 0, // Unknown!
  356. URL: "https://somedomain.com/car.jpg",
  357. }
  358. require.Nil(t, c.AddMessage(m))
  359. m = newDefaultMessage("mytopic2", "message with expired attachment")
  360. m.ID = "m4"
  361. m.SID = "m4"
  362. m.Expires = time.Now().Add(2 * time.Hour).Unix()
  363. m.Attachment = &attachment{
  364. Name: "expired-car.jpg",
  365. Type: "image/jpeg",
  366. Size: 20000,
  367. Expires: time.Now().Add(-1 * time.Hour).Unix(),
  368. URL: "https://ntfy.sh/file/aCaRURL.jpg",
  369. }
  370. require.Nil(t, c.AddMessage(m))
  371. ids, err := c.AttachmentsExpired()
  372. require.Nil(t, err)
  373. require.Equal(t, 1, len(ids))
  374. require.Equal(t, "m4", ids[0])
  375. }
  376. func TestSqliteCache_Migration_From0(t *testing.T) {
  377. filename := newSqliteTestCacheFile(t)
  378. db, err := sql.Open("sqlite3", filename)
  379. require.Nil(t, err)
  380. // Create "version 0" schema
  381. _, err = db.Exec(`
  382. BEGIN;
  383. CREATE TABLE IF NOT EXISTS messages (
  384. id VARCHAR(20) PRIMARY KEY,
  385. time INT NOT NULL,
  386. topic VARCHAR(64) NOT NULL,
  387. message VARCHAR(1024) NOT NULL
  388. );
  389. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  390. COMMIT;
  391. `)
  392. require.Nil(t, err)
  393. // Insert a bunch of messages
  394. for i := 0; i < 10; i++ {
  395. _, err = db.Exec(`INSERT INTO messages (id, time, topic, message) VALUES (?, ?, ?, ?)`,
  396. fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i))
  397. require.Nil(t, err)
  398. }
  399. require.Nil(t, db.Close())
  400. // Create cache to trigger migration
  401. c := newSqliteTestCacheFromFile(t, filename, "")
  402. checkSchemaVersion(t, c.db)
  403. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  404. require.Nil(t, err)
  405. require.Equal(t, 10, len(messages))
  406. require.Equal(t, "some message 5", messages[5].Message)
  407. require.Equal(t, "", messages[5].Title)
  408. require.Nil(t, messages[5].Tags)
  409. require.Equal(t, 0, messages[5].Priority)
  410. }
  411. func TestSqliteCache_Migration_From1(t *testing.T) {
  412. filename := newSqliteTestCacheFile(t)
  413. db, err := sql.Open("sqlite3", filename)
  414. require.Nil(t, err)
  415. // Create "version 1" schema
  416. _, err = db.Exec(`
  417. CREATE TABLE IF NOT EXISTS messages (
  418. id VARCHAR(20) PRIMARY KEY,
  419. time INT NOT NULL,
  420. topic VARCHAR(64) NOT NULL,
  421. message VARCHAR(512) NOT NULL,
  422. title VARCHAR(256) NOT NULL,
  423. priority INT NOT NULL,
  424. tags VARCHAR(256) NOT NULL
  425. );
  426. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  427. CREATE TABLE IF NOT EXISTS schemaVersion (
  428. id INT PRIMARY KEY,
  429. version INT NOT NULL
  430. );
  431. INSERT INTO schemaVersion (id, version) VALUES (1, 1);
  432. `)
  433. require.Nil(t, err)
  434. // Insert a bunch of messages
  435. for i := 0; i < 10; i++ {
  436. _, err = db.Exec(`INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`,
  437. fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i), "", 0, "")
  438. require.Nil(t, err)
  439. }
  440. require.Nil(t, db.Close())
  441. // Create cache to trigger migration
  442. c := newSqliteTestCacheFromFile(t, filename, "")
  443. checkSchemaVersion(t, c.db)
  444. // Add delayed message
  445. delayedMessage := newDefaultMessage("mytopic", "some delayed message")
  446. delayedMessage.Time = time.Now().Add(time.Minute).Unix()
  447. delayedMessage.MTime = time.Now().Add(time.Minute).UnixMilli()
  448. require.Nil(t, c.AddMessage(delayedMessage))
  449. // 10, not 11!
  450. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  451. require.Nil(t, err)
  452. require.Equal(t, 10, len(messages))
  453. // 11!
  454. messages, err = c.Messages("mytopic", sinceAllMessages, true)
  455. require.Nil(t, err)
  456. require.Equal(t, 11, len(messages))
  457. // Check that index "idx_topic" exists
  458. rows, err := c.db.Query(`SELECT name FROM sqlite_master WHERE type='index' AND name='idx_topic'`)
  459. require.Nil(t, err)
  460. require.True(t, rows.Next())
  461. var indexName string
  462. require.Nil(t, rows.Scan(&indexName))
  463. require.Equal(t, "idx_topic", indexName)
  464. }
  465. func TestSqliteCache_Migration_From9(t *testing.T) {
  466. // This primarily tests the awkward migration that introduces the "expires" column.
  467. // The migration logic has to update the column, using the existing "cache-duration" value.
  468. filename := newSqliteTestCacheFile(t)
  469. db, err := sql.Open("sqlite3", filename)
  470. require.Nil(t, err)
  471. // Create "version 8" schema
  472. _, err = db.Exec(`
  473. BEGIN;
  474. CREATE TABLE IF NOT EXISTS messages (
  475. id INTEGER PRIMARY KEY AUTOINCREMENT,
  476. mid TEXT NOT NULL,
  477. time INT NOT NULL,
  478. topic TEXT NOT NULL,
  479. message TEXT NOT NULL,
  480. title TEXT NOT NULL,
  481. priority INT NOT NULL,
  482. tags TEXT NOT NULL,
  483. click TEXT NOT NULL,
  484. icon TEXT NOT NULL,
  485. actions TEXT NOT NULL,
  486. attachment_name TEXT NOT NULL,
  487. attachment_type TEXT NOT NULL,
  488. attachment_size INT NOT NULL,
  489. attachment_expires INT NOT NULL,
  490. attachment_url TEXT NOT NULL,
  491. sender TEXT NOT NULL,
  492. encoding TEXT NOT NULL,
  493. published INT NOT NULL
  494. );
  495. CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
  496. CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
  497. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  498. CREATE TABLE IF NOT EXISTS schemaVersion (
  499. id INT PRIMARY KEY,
  500. version INT NOT NULL
  501. );
  502. INSERT INTO schemaVersion (id, version) VALUES (1, 9);
  503. COMMIT;
  504. `)
  505. require.Nil(t, err)
  506. // Insert a bunch of messages
  507. insertQuery := `
  508. INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding, published)
  509. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  510. `
  511. for i := 0; i < 10; i++ {
  512. _, err = db.Exec(
  513. insertQuery,
  514. fmt.Sprintf("abcd%d", i),
  515. time.Now().Unix(),
  516. "mytopic",
  517. fmt.Sprintf("some message %d", i),
  518. "", // title
  519. 0, // priority
  520. "", // tags
  521. "", // click
  522. "", // icon
  523. "", // actions
  524. "", // attachment_name
  525. "", // attachment_type
  526. 0, // attachment_size
  527. 0, // attachment_type
  528. "", // attachment_url
  529. "9.9.9.9", // sender
  530. "", // encoding
  531. 1, // published
  532. )
  533. require.Nil(t, err)
  534. }
  535. // Create cache to trigger migration
  536. cacheDuration := 17 * time.Hour
  537. c, err := newSqliteCache(filename, "", cacheDuration, 0, 0, false)
  538. require.Nil(t, err)
  539. checkSchemaVersion(t, c.db)
  540. // Check version
  541. rows, err := db.Query(`SELECT version FROM main.schemaVersion WHERE id = 1`)
  542. require.Nil(t, err)
  543. require.True(t, rows.Next())
  544. var version int
  545. require.Nil(t, rows.Scan(&version))
  546. require.Equal(t, currentSchemaVersion, version)
  547. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  548. require.Nil(t, err)
  549. require.Equal(t, 10, len(messages))
  550. for _, m := range messages {
  551. require.True(t, m.Expires > time.Now().Add(cacheDuration-5*time.Second).Unix())
  552. require.True(t, m.Expires < time.Now().Add(cacheDuration+5*time.Second).Unix())
  553. }
  554. }
  555. func TestSqliteCache_StartupQueries_WAL(t *testing.T) {
  556. filename := newSqliteTestCacheFile(t)
  557. startupQueries := `pragma journal_mode = WAL;
  558. pragma synchronous = normal;
  559. pragma temp_store = memory;`
  560. db, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
  561. require.Nil(t, err)
  562. require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
  563. require.FileExists(t, filename)
  564. require.FileExists(t, filename+"-wal")
  565. require.FileExists(t, filename+"-shm")
  566. }
  567. func TestSqliteCache_StartupQueries_None(t *testing.T) {
  568. filename := newSqliteTestCacheFile(t)
  569. startupQueries := ""
  570. db, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
  571. require.Nil(t, err)
  572. require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
  573. require.FileExists(t, filename)
  574. require.NoFileExists(t, filename+"-wal")
  575. require.NoFileExists(t, filename+"-shm")
  576. }
  577. func TestSqliteCache_StartupQueries_Fail(t *testing.T) {
  578. filename := newSqliteTestCacheFile(t)
  579. startupQueries := `xx error`
  580. _, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
  581. require.Error(t, err)
  582. }
  583. func TestSqliteCache_Sender(t *testing.T) {
  584. testSender(t, newSqliteTestCache(t))
  585. }
  586. func TestMemCache_Sender(t *testing.T) {
  587. testSender(t, newMemTestCache(t))
  588. }
  589. func testSender(t *testing.T, c *messageCache) {
  590. m1 := newDefaultMessage("mytopic", "mymessage")
  591. m1.Sender = netip.MustParseAddr("1.2.3.4")
  592. require.Nil(t, c.AddMessage(m1))
  593. m2 := newDefaultMessage("mytopic", "mymessage without sender")
  594. require.Nil(t, c.AddMessage(m2))
  595. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  596. require.Nil(t, err)
  597. require.Equal(t, 2, len(messages))
  598. require.Equal(t, messages[0].Sender, netip.MustParseAddr("1.2.3.4"))
  599. require.Equal(t, messages[1].Sender, netip.Addr{})
  600. }
  601. func checkSchemaVersion(t *testing.T, db *sql.DB) {
  602. rows, err := db.Query(`SELECT version FROM schemaVersion`)
  603. require.Nil(t, err)
  604. require.True(t, rows.Next())
  605. var schemaVersion int
  606. require.Nil(t, rows.Scan(&schemaVersion))
  607. require.Equal(t, currentSchemaVersion, schemaVersion)
  608. require.Nil(t, rows.Close())
  609. }
  610. func TestMemCache_NopCache(t *testing.T) {
  611. c, _ := newNopCache()
  612. require.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message")))
  613. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  614. require.Nil(t, err)
  615. require.Empty(t, messages)
  616. topics, err := c.Topics()
  617. require.Nil(t, err)
  618. require.Empty(t, topics)
  619. }
  620. func newSqliteTestCache(t *testing.T) *messageCache {
  621. c, err := newSqliteCache(newSqliteTestCacheFile(t), "", time.Hour, 0, 0, false)
  622. if err != nil {
  623. t.Fatal(err)
  624. }
  625. return c
  626. }
  627. func newSqliteTestCacheFile(t *testing.T) string {
  628. return filepath.Join(t.TempDir(), "cache.db")
  629. }
  630. func newSqliteTestCacheFromFile(t *testing.T, filename, startupQueries string) *messageCache {
  631. c, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
  632. require.Nil(t, err)
  633. return c
  634. }
  635. func newMemTestCache(t *testing.T) *messageCache {
  636. c, err := newMemCache()
  637. require.Nil(t, err)
  638. return c
  639. }