message_cache_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. package server
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "net/netip"
  6. "path/filepath"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/stretchr/testify/require"
  11. )
  12. var (
  13. exampleIP1234 = netip.MustParseAddr("1.2.3.4")
  14. )
  15. func TestSqliteCache_Messages(t *testing.T) {
  16. testCacheMessages(t, newSqliteTestCache(t))
  17. }
  18. func TestMemCache_Messages(t *testing.T) {
  19. testCacheMessages(t, newMemTestCache(t))
  20. }
  21. func testCacheMessages(t *testing.T, c *messageCache) {
  22. m1 := newDefaultMessage("mytopic", "my message")
  23. m1.Time = 1
  24. m2 := newDefaultMessage("mytopic", "my other message")
  25. m2.Time = 2
  26. require.Nil(t, c.AddMessage(m1))
  27. require.Nil(t, c.AddMessage(newDefaultMessage("example", "my example message")))
  28. require.Nil(t, c.AddMessage(m2))
  29. // Adding invalid
  30. require.Equal(t, errUnexpectedMessageType, c.AddMessage(newKeepaliveMessage("mytopic"))) // These should not be added!
  31. require.Equal(t, errUnexpectedMessageType, c.AddMessage(newOpenMessage("example"))) // These should not be added!
  32. // mytopic: count
  33. counts, err := c.MessageCounts()
  34. require.Nil(t, err)
  35. require.Equal(t, 2, counts["mytopic"])
  36. // mytopic: since all
  37. messages, _ := c.Messages("mytopic", sinceAllMessages, false)
  38. require.Equal(t, 2, len(messages))
  39. require.Equal(t, "my message", messages[0].Message)
  40. require.Equal(t, "mytopic", messages[0].Topic)
  41. require.Equal(t, messageEvent, messages[0].Event)
  42. require.Equal(t, "", messages[0].Title)
  43. require.Equal(t, 0, messages[0].Priority)
  44. require.Nil(t, messages[0].Tags)
  45. require.Equal(t, "my other message", messages[1].Message)
  46. // mytopic: since none
  47. messages, _ = c.Messages("mytopic", sinceNoMessages, false)
  48. require.Empty(t, messages)
  49. // mytopic: since m1 (by ID)
  50. messages, _ = c.Messages("mytopic", newSinceID(m1.ID), false)
  51. require.Equal(t, 1, len(messages))
  52. require.Equal(t, m2.ID, messages[0].ID)
  53. require.Equal(t, "my other message", messages[0].Message)
  54. require.Equal(t, "mytopic", messages[0].Topic)
  55. // mytopic: since 2
  56. messages, _ = c.Messages("mytopic", newSinceTime(2), false)
  57. require.Equal(t, 1, len(messages))
  58. require.Equal(t, "my other message", messages[0].Message)
  59. // example: count
  60. counts, err = c.MessageCounts()
  61. require.Nil(t, err)
  62. require.Equal(t, 1, counts["example"])
  63. // example: since all
  64. messages, _ = c.Messages("example", sinceAllMessages, false)
  65. require.Equal(t, "my example message", messages[0].Message)
  66. // non-existing: count
  67. counts, err = c.MessageCounts()
  68. require.Nil(t, err)
  69. require.Equal(t, 0, counts["doesnotexist"])
  70. // non-existing: since all
  71. messages, _ = c.Messages("doesnotexist", sinceAllMessages, false)
  72. require.Empty(t, messages)
  73. }
  74. func TestSqliteCache_MessagesScheduled(t *testing.T) {
  75. testCacheMessagesScheduled(t, newSqliteTestCache(t))
  76. }
  77. func TestMemCache_MessagesScheduled(t *testing.T) {
  78. testCacheMessagesScheduled(t, newMemTestCache(t))
  79. }
  80. func testCacheMessagesScheduled(t *testing.T, c *messageCache) {
  81. m1 := newDefaultMessage("mytopic", "message 1")
  82. m2 := newDefaultMessage("mytopic", "message 2")
  83. m2.Time = time.Now().Add(time.Hour).Unix()
  84. m3 := newDefaultMessage("mytopic", "message 3")
  85. m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2!
  86. m4 := newDefaultMessage("mytopic2", "message 4")
  87. m4.Time = time.Now().Add(time.Minute).Unix()
  88. require.Nil(t, c.AddMessage(m1))
  89. require.Nil(t, c.AddMessage(m2))
  90. require.Nil(t, c.AddMessage(m3))
  91. messages, _ := c.Messages("mytopic", sinceAllMessages, false) // exclude scheduled
  92. require.Equal(t, 1, len(messages))
  93. require.Equal(t, "message 1", messages[0].Message)
  94. messages, _ = c.Messages("mytopic", sinceAllMessages, true) // include scheduled
  95. require.Equal(t, 3, len(messages))
  96. require.Equal(t, "message 1", messages[0].Message)
  97. require.Equal(t, "message 3", messages[1].Message) // Order!
  98. require.Equal(t, "message 2", messages[2].Message)
  99. messages, _ = c.MessagesDue()
  100. require.Empty(t, messages)
  101. }
  102. func TestSqliteCache_Topics(t *testing.T) {
  103. testCacheTopics(t, newSqliteTestCache(t))
  104. }
  105. func TestMemCache_Topics(t *testing.T) {
  106. testCacheTopics(t, newMemTestCache(t))
  107. }
  108. func testCacheTopics(t *testing.T, c *messageCache) {
  109. require.Nil(t, c.AddMessage(newDefaultMessage("topic1", "my example message")))
  110. require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 1")))
  111. require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 2")))
  112. require.Nil(t, c.AddMessage(newDefaultMessage("topic2", "message 3")))
  113. topics, err := c.Topics()
  114. if err != nil {
  115. t.Fatal(err)
  116. }
  117. require.Equal(t, 2, len(topics))
  118. require.Equal(t, "topic1", topics["topic1"].ID)
  119. require.Equal(t, "topic2", topics["topic2"].ID)
  120. }
  121. func TestSqliteCache_MessagesTagsPrioAndTitle(t *testing.T) {
  122. testCacheMessagesTagsPrioAndTitle(t, newSqliteTestCache(t))
  123. }
  124. func TestMemCache_MessagesTagsPrioAndTitle(t *testing.T) {
  125. testCacheMessagesTagsPrioAndTitle(t, newMemTestCache(t))
  126. }
  127. func testCacheMessagesTagsPrioAndTitle(t *testing.T, c *messageCache) {
  128. m := newDefaultMessage("mytopic", "some message")
  129. m.Tags = []string{"tag1", "tag2"}
  130. m.Priority = 5
  131. m.Title = "some title"
  132. require.Nil(t, c.AddMessage(m))
  133. messages, _ := c.Messages("mytopic", sinceAllMessages, false)
  134. require.Equal(t, []string{"tag1", "tag2"}, messages[0].Tags)
  135. require.Equal(t, 5, messages[0].Priority)
  136. require.Equal(t, "some title", messages[0].Title)
  137. }
  138. func TestSqliteCache_MessagesSinceID(t *testing.T) {
  139. testCacheMessagesSinceID(t, newSqliteTestCache(t))
  140. }
  141. func TestMemCache_MessagesSinceID(t *testing.T) {
  142. testCacheMessagesSinceID(t, newMemTestCache(t))
  143. }
  144. func testCacheMessagesSinceID(t *testing.T, c *messageCache) {
  145. m1 := newDefaultMessage("mytopic", "message 1")
  146. m1.Time = 100
  147. m2 := newDefaultMessage("mytopic", "message 2")
  148. m2.Time = 200
  149. m3 := newDefaultMessage("mytopic", "message 3")
  150. m3.Time = time.Now().Add(time.Hour).Unix() // Scheduled, in the future, later than m7 and m5
  151. m4 := newDefaultMessage("mytopic", "message 4")
  152. m4.Time = 400
  153. m5 := newDefaultMessage("mytopic", "message 5")
  154. m5.Time = time.Now().Add(time.Minute).Unix() // Scheduled, in the future, later than m7
  155. m6 := newDefaultMessage("mytopic", "message 6")
  156. m6.Time = 600
  157. m7 := newDefaultMessage("mytopic", "message 7")
  158. m7.Time = 700
  159. require.Nil(t, c.AddMessage(m1))
  160. require.Nil(t, c.AddMessage(m2))
  161. require.Nil(t, c.AddMessage(m3))
  162. require.Nil(t, c.AddMessage(m4))
  163. require.Nil(t, c.AddMessage(m5))
  164. require.Nil(t, c.AddMessage(m6))
  165. require.Nil(t, c.AddMessage(m7))
  166. // Case 1: Since ID exists, exclude scheduled
  167. messages, _ := c.Messages("mytopic", newSinceID(m2.ID), false)
  168. require.Equal(t, 3, len(messages))
  169. require.Equal(t, "message 4", messages[0].Message)
  170. require.Equal(t, "message 6", messages[1].Message) // Not scheduled m3/m5!
  171. require.Equal(t, "message 7", messages[2].Message)
  172. // Case 2: Since ID exists, include scheduled
  173. messages, _ = c.Messages("mytopic", newSinceID(m2.ID), true)
  174. require.Equal(t, 5, len(messages))
  175. require.Equal(t, "message 4", messages[0].Message)
  176. require.Equal(t, "message 6", messages[1].Message)
  177. require.Equal(t, "message 7", messages[2].Message)
  178. require.Equal(t, "message 5", messages[3].Message) // Order!
  179. require.Equal(t, "message 3", messages[4].Message) // Order!
  180. // Case 3: Since ID does not exist (-> Return all messages), include scheduled
  181. messages, _ = c.Messages("mytopic", newSinceID("doesntexist"), true)
  182. require.Equal(t, 7, len(messages))
  183. require.Equal(t, "message 1", messages[0].Message)
  184. require.Equal(t, "message 2", messages[1].Message)
  185. require.Equal(t, "message 4", messages[2].Message)
  186. require.Equal(t, "message 6", messages[3].Message)
  187. require.Equal(t, "message 7", messages[4].Message)
  188. require.Equal(t, "message 5", messages[5].Message) // Order!
  189. require.Equal(t, "message 3", messages[6].Message) // Order!
  190. // Case 4: Since ID exists and is last message (-> Return no messages), exclude scheduled
  191. messages, _ = c.Messages("mytopic", newSinceID(m7.ID), false)
  192. require.Equal(t, 0, len(messages))
  193. // Case 5: Since ID exists and is last message (-> Return no messages), include scheduled
  194. messages, _ = c.Messages("mytopic", newSinceID(m7.ID), true)
  195. require.Equal(t, 2, len(messages))
  196. require.Equal(t, "message 5", messages[0].Message)
  197. require.Equal(t, "message 3", messages[1].Message)
  198. }
  199. func TestSqliteCache_Prune(t *testing.T) {
  200. testCachePrune(t, newSqliteTestCache(t))
  201. }
  202. func TestMemCache_Prune(t *testing.T) {
  203. testCachePrune(t, newMemTestCache(t))
  204. }
  205. func testCachePrune(t *testing.T, c *messageCache) {
  206. now := time.Now().Unix()
  207. m1 := newDefaultMessage("mytopic", "my message")
  208. m1.Time = now - 10
  209. m1.Expires = now - 5
  210. m2 := newDefaultMessage("mytopic", "my other message")
  211. m2.Time = now - 5
  212. m2.Expires = now + 5 // In the future
  213. m3 := newDefaultMessage("another_topic", "and another one")
  214. m3.Time = now - 12
  215. m3.Expires = now - 2
  216. require.Nil(t, c.AddMessage(m1))
  217. require.Nil(t, c.AddMessage(m2))
  218. require.Nil(t, c.AddMessage(m3))
  219. counts, err := c.MessageCounts()
  220. require.Nil(t, err)
  221. require.Equal(t, 2, counts["mytopic"])
  222. require.Equal(t, 1, counts["another_topic"])
  223. expiredMessageIDs, err := c.MessagesExpired()
  224. require.Nil(t, err)
  225. require.Nil(t, c.DeleteMessages(expiredMessageIDs...))
  226. counts, err = c.MessageCounts()
  227. require.Nil(t, err)
  228. require.Equal(t, 1, counts["mytopic"])
  229. require.Equal(t, 0, counts["another_topic"])
  230. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  231. require.Nil(t, err)
  232. require.Equal(t, 1, len(messages))
  233. require.Equal(t, "my other message", messages[0].Message)
  234. }
  235. func TestSqliteCache_Attachments(t *testing.T) {
  236. testCacheAttachments(t, newSqliteTestCache(t))
  237. }
  238. func TestMemCache_Attachments(t *testing.T) {
  239. testCacheAttachments(t, newMemTestCache(t))
  240. }
  241. func testCacheAttachments(t *testing.T, c *messageCache) {
  242. expires1 := time.Now().Add(-4 * time.Hour).Unix()
  243. m := newDefaultMessage("mytopic", "flower for you")
  244. m.ID = "m1"
  245. m.Sender = exampleIP1234
  246. m.Attachment = &attachment{
  247. Name: "flower.jpg",
  248. Type: "image/jpeg",
  249. Size: 5000,
  250. Expires: expires1,
  251. URL: "https://ntfy.sh/file/AbDeFgJhal.jpg",
  252. }
  253. require.Nil(t, c.AddMessage(m))
  254. expires2 := time.Now().Add(2 * time.Hour).Unix() // Future
  255. m = newDefaultMessage("mytopic", "sending you a car")
  256. m.ID = "m2"
  257. m.Sender = exampleIP1234
  258. m.Attachment = &attachment{
  259. Name: "car.jpg",
  260. Type: "image/jpeg",
  261. Size: 10000,
  262. Expires: expires2,
  263. URL: "https://ntfy.sh/file/aCaRURL.jpg",
  264. }
  265. require.Nil(t, c.AddMessage(m))
  266. expires3 := time.Now().Add(1 * time.Hour).Unix() // Future
  267. m = newDefaultMessage("another-topic", "sending you another car")
  268. m.ID = "m3"
  269. m.Sender = exampleIP1234
  270. m.Attachment = &attachment{
  271. Name: "another-car.jpg",
  272. Type: "image/jpeg",
  273. Size: 20000,
  274. Expires: expires3,
  275. URL: "https://ntfy.sh/file/zakaDHFW.jpg",
  276. }
  277. require.Nil(t, c.AddMessage(m))
  278. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  279. require.Nil(t, err)
  280. require.Equal(t, 2, len(messages))
  281. require.Equal(t, "flower for you", messages[0].Message)
  282. require.Equal(t, "flower.jpg", messages[0].Attachment.Name)
  283. require.Equal(t, "image/jpeg", messages[0].Attachment.Type)
  284. require.Equal(t, int64(5000), messages[0].Attachment.Size)
  285. require.Equal(t, expires1, messages[0].Attachment.Expires)
  286. require.Equal(t, "https://ntfy.sh/file/AbDeFgJhal.jpg", messages[0].Attachment.URL)
  287. require.Equal(t, "1.2.3.4", messages[0].Sender.String())
  288. require.Equal(t, "sending you a car", messages[1].Message)
  289. require.Equal(t, "car.jpg", messages[1].Attachment.Name)
  290. require.Equal(t, "image/jpeg", messages[1].Attachment.Type)
  291. require.Equal(t, int64(10000), messages[1].Attachment.Size)
  292. require.Equal(t, expires2, messages[1].Attachment.Expires)
  293. require.Equal(t, "https://ntfy.sh/file/aCaRURL.jpg", messages[1].Attachment.URL)
  294. require.Equal(t, "1.2.3.4", messages[1].Sender.String())
  295. size, err := c.AttachmentBytesUsedBySender("1.2.3.4")
  296. require.Nil(t, err)
  297. require.Equal(t, int64(30000), size)
  298. size, err = c.AttachmentBytesUsedBySender("5.6.7.8")
  299. require.Nil(t, err)
  300. require.Equal(t, int64(0), size)
  301. }
  302. func TestSqliteCache_Migration_From0(t *testing.T) {
  303. filename := newSqliteTestCacheFile(t)
  304. db, err := sql.Open("sqlite3", filename)
  305. require.Nil(t, err)
  306. // Create "version 0" schema
  307. _, err = db.Exec(`
  308. BEGIN;
  309. CREATE TABLE IF NOT EXISTS messages (
  310. id VARCHAR(20) PRIMARY KEY,
  311. time INT NOT NULL,
  312. topic VARCHAR(64) NOT NULL,
  313. message VARCHAR(1024) NOT NULL
  314. );
  315. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  316. COMMIT;
  317. `)
  318. require.Nil(t, err)
  319. // Insert a bunch of messages
  320. for i := 0; i < 10; i++ {
  321. _, err = db.Exec(`INSERT INTO messages (id, time, topic, message) VALUES (?, ?, ?, ?)`,
  322. fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i))
  323. require.Nil(t, err)
  324. }
  325. require.Nil(t, db.Close())
  326. // Create cache to trigger migration
  327. c := newSqliteTestCacheFromFile(t, filename, "")
  328. checkSchemaVersion(t, c.db)
  329. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  330. require.Nil(t, err)
  331. require.Equal(t, 10, len(messages))
  332. require.Equal(t, "some message 5", messages[5].Message)
  333. require.Equal(t, "", messages[5].Title)
  334. require.Nil(t, messages[5].Tags)
  335. require.Equal(t, 0, messages[5].Priority)
  336. }
  337. func TestSqliteCache_Migration_From1(t *testing.T) {
  338. filename := newSqliteTestCacheFile(t)
  339. db, err := sql.Open("sqlite3", filename)
  340. require.Nil(t, err)
  341. // Create "version 1" schema
  342. _, err = db.Exec(`
  343. CREATE TABLE IF NOT EXISTS messages (
  344. id VARCHAR(20) PRIMARY KEY,
  345. time INT NOT NULL,
  346. topic VARCHAR(64) NOT NULL,
  347. message VARCHAR(512) NOT NULL,
  348. title VARCHAR(256) NOT NULL,
  349. priority INT NOT NULL,
  350. tags VARCHAR(256) NOT NULL
  351. );
  352. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  353. CREATE TABLE IF NOT EXISTS schemaVersion (
  354. id INT PRIMARY KEY,
  355. version INT NOT NULL
  356. );
  357. INSERT INTO schemaVersion (id, version) VALUES (1, 1);
  358. `)
  359. require.Nil(t, err)
  360. // Insert a bunch of messages
  361. for i := 0; i < 10; i++ {
  362. _, err = db.Exec(`INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`,
  363. fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i), "", 0, "")
  364. require.Nil(t, err)
  365. }
  366. require.Nil(t, db.Close())
  367. // Create cache to trigger migration
  368. c := newSqliteTestCacheFromFile(t, filename, "")
  369. checkSchemaVersion(t, c.db)
  370. // Add delayed message
  371. delayedMessage := newDefaultMessage("mytopic", "some delayed message")
  372. delayedMessage.Time = time.Now().Add(time.Minute).Unix()
  373. require.Nil(t, c.AddMessage(delayedMessage))
  374. // 10, not 11!
  375. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  376. require.Nil(t, err)
  377. require.Equal(t, 10, len(messages))
  378. // 11!
  379. messages, err = c.Messages("mytopic", sinceAllMessages, true)
  380. require.Nil(t, err)
  381. require.Equal(t, 11, len(messages))
  382. }
  383. func TestSqliteCache_Migration_From9(t *testing.T) {
  384. // This primarily tests the awkward migration that introduces the "expires" column.
  385. // The migration logic has to update the column, using the existing "cache-duration" value.
  386. filename := newSqliteTestCacheFile(t)
  387. db, err := sql.Open("sqlite3", filename)
  388. require.Nil(t, err)
  389. // Create "version 8" schema
  390. _, err = db.Exec(`
  391. BEGIN;
  392. CREATE TABLE IF NOT EXISTS messages (
  393. id INTEGER PRIMARY KEY AUTOINCREMENT,
  394. mid TEXT NOT NULL,
  395. time INT NOT NULL,
  396. topic TEXT NOT NULL,
  397. message TEXT NOT NULL,
  398. title TEXT NOT NULL,
  399. priority INT NOT NULL,
  400. tags TEXT NOT NULL,
  401. click TEXT NOT NULL,
  402. icon TEXT NOT NULL,
  403. actions TEXT NOT NULL,
  404. attachment_name TEXT NOT NULL,
  405. attachment_type TEXT NOT NULL,
  406. attachment_size INT NOT NULL,
  407. attachment_expires INT NOT NULL,
  408. attachment_url TEXT NOT NULL,
  409. sender TEXT NOT NULL,
  410. encoding TEXT NOT NULL,
  411. published INT NOT NULL
  412. );
  413. CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
  414. CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
  415. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  416. CREATE TABLE IF NOT EXISTS schemaVersion (
  417. id INT PRIMARY KEY,
  418. version INT NOT NULL
  419. );
  420. INSERT INTO schemaVersion (id, version) VALUES (1, 9);
  421. COMMIT;
  422. `)
  423. require.Nil(t, err)
  424. // Insert a bunch of messages
  425. insertQuery := `
  426. INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding, published)
  427. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  428. `
  429. for i := 0; i < 10; i++ {
  430. _, err = db.Exec(
  431. insertQuery,
  432. fmt.Sprintf("abcd%d", i),
  433. time.Now().Unix(),
  434. "mytopic",
  435. fmt.Sprintf("some message %d", i),
  436. "", // title
  437. 0, // priority
  438. "", // tags
  439. "", // click
  440. "", // icon
  441. "", // actions
  442. "", // attachment_name
  443. "", // attachment_type
  444. 0, // attachment_size
  445. 0, // attachment_type
  446. "", // attachment_url
  447. "9.9.9.9", // sender
  448. "", // encoding
  449. 1, // published
  450. )
  451. require.Nil(t, err)
  452. }
  453. // Create cache to trigger migration
  454. cacheDuration := 17 * time.Hour
  455. c, err := newSqliteCache(filename, "", cacheDuration, 0, 0, false)
  456. require.Nil(t, err)
  457. checkSchemaVersion(t, c.db)
  458. // Check version
  459. rows, err := db.Query(`SELECT version FROM main.schemaVersion WHERE id = 1`)
  460. require.Nil(t, err)
  461. require.True(t, rows.Next())
  462. var version int
  463. require.Nil(t, rows.Scan(&version))
  464. require.Equal(t, currentSchemaVersion, version)
  465. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  466. require.Nil(t, err)
  467. require.Equal(t, 10, len(messages))
  468. for _, m := range messages {
  469. require.True(t, m.Expires > time.Now().Add(cacheDuration-5*time.Second).Unix())
  470. require.True(t, m.Expires < time.Now().Add(cacheDuration+5*time.Second).Unix())
  471. }
  472. }
  473. func TestSqliteCache_StartupQueries_WAL(t *testing.T) {
  474. filename := newSqliteTestCacheFile(t)
  475. startupQueries := `pragma journal_mode = WAL;
  476. pragma synchronous = normal;
  477. pragma temp_store = memory;`
  478. db, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
  479. require.Nil(t, err)
  480. require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
  481. require.FileExists(t, filename)
  482. require.FileExists(t, filename+"-wal")
  483. require.FileExists(t, filename+"-shm")
  484. }
  485. func TestSqliteCache_StartupQueries_None(t *testing.T) {
  486. filename := newSqliteTestCacheFile(t)
  487. startupQueries := ""
  488. db, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
  489. require.Nil(t, err)
  490. require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
  491. require.FileExists(t, filename)
  492. require.NoFileExists(t, filename+"-wal")
  493. require.NoFileExists(t, filename+"-shm")
  494. }
  495. func TestSqliteCache_StartupQueries_Fail(t *testing.T) {
  496. filename := newSqliteTestCacheFile(t)
  497. startupQueries := `xx error`
  498. _, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
  499. require.Error(t, err)
  500. }
  501. func TestSqliteCache_Sender(t *testing.T) {
  502. testSender(t, newSqliteTestCache(t))
  503. }
  504. func TestMemCache_Sender(t *testing.T) {
  505. testSender(t, newMemTestCache(t))
  506. }
  507. func testSender(t *testing.T, c *messageCache) {
  508. m1 := newDefaultMessage("mytopic", "mymessage")
  509. m1.Sender = netip.MustParseAddr("1.2.3.4")
  510. require.Nil(t, c.AddMessage(m1))
  511. m2 := newDefaultMessage("mytopic", "mymessage without sender")
  512. require.Nil(t, c.AddMessage(m2))
  513. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  514. require.Nil(t, err)
  515. require.Equal(t, 2, len(messages))
  516. require.Equal(t, messages[0].Sender, netip.MustParseAddr("1.2.3.4"))
  517. require.Equal(t, messages[1].Sender, netip.Addr{})
  518. }
  519. func checkSchemaVersion(t *testing.T, db *sql.DB) {
  520. rows, err := db.Query(`SELECT version FROM schemaVersion`)
  521. require.Nil(t, err)
  522. require.True(t, rows.Next())
  523. var schemaVersion int
  524. require.Nil(t, rows.Scan(&schemaVersion))
  525. require.Equal(t, currentSchemaVersion, schemaVersion)
  526. require.Nil(t, rows.Close())
  527. }
  528. func TestMemCache_NopCache(t *testing.T) {
  529. c, _ := newNopCache()
  530. assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message")))
  531. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  532. assert.Nil(t, err)
  533. assert.Empty(t, messages)
  534. topics, err := c.Topics()
  535. assert.Nil(t, err)
  536. assert.Empty(t, topics)
  537. }
  538. func newSqliteTestCache(t *testing.T) *messageCache {
  539. c, err := newSqliteCache(newSqliteTestCacheFile(t), "", time.Hour, 0, 0, false)
  540. if err != nil {
  541. t.Fatal(err)
  542. }
  543. return c
  544. }
  545. func newSqliteTestCacheFile(t *testing.T) string {
  546. return filepath.Join(t.TempDir(), "cache.db")
  547. }
  548. func newSqliteTestCacheFromFile(t *testing.T, filename, startupQueries string) *messageCache {
  549. c, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
  550. if err != nil {
  551. t.Fatal(err)
  552. }
  553. return c
  554. }
  555. func newMemTestCache(t *testing.T) *messageCache {
  556. c, err := newMemCache()
  557. if err != nil {
  558. t.Fatal(err)
  559. }
  560. return c
  561. }