Kaynağa Gözat

Merge pull request #1421 from binwiederhier/message-cache-lock

Message cache lock
Philipp C. Heckel 6 ay önce
ebeveyn
işleme
ce4b2ae9a0
3 değiştirilmiş dosya ile 45 ekleme ve 8 silme
  1. 1 0
      docs/releases.md
  2. 22 8
      server/message_cache.go
  3. 22 0
      server/message_cache_test.go

+ 1 - 0
docs/releases.md

@@ -1472,6 +1472,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
 
 **Bug fixes + maintenance:**
 
+* Add mutex around message cache writes to avoid `database locked` errors ([#1397](https://github.com/binwiederhier/ntfy/pull/1397), [#1391](https://github.com/binwiederhier/ntfy/issues/1391), thanks to [@timofej673](https://github.com/timofej673))
 * Add build tags `nopayments`, `nofirebase` and `nowebpush` to allow excluding external dependencies, useful for 
   packaging in Debian ([#1420](https://github.com/binwiederhier/ntfy/pull/1420), discussion in [#1258](https://github.com/binwiederhier/ntfy/issues/1258), thanks to [@thekhalifa](https://github.com/thekhalifa) for packaging ntfy for Debian/Ubuntu)
 

+ 22 - 8
server/message_cache.go

@@ -8,6 +8,7 @@ import (
 	"net/netip"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	_ "github.com/mattn/go-sqlite3" // SQLite driver
@@ -36,7 +37,7 @@ const (
 			priority INT NOT NULL,
 			tags TEXT NOT NULL,
 			click TEXT NOT NULL,
-			icon TEXT NOT NULL,			
+			icon TEXT NOT NULL,
 			actions TEXT NOT NULL,
 			attachment_name TEXT NOT NULL,
 			attachment_type TEXT NOT NULL,
@@ -73,30 +74,30 @@ const (
 	selectRowIDFromMessageID          = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics
 	selectMessagesByIDQuery           = `
 		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
-		FROM messages 
+		FROM messages
 		WHERE mid = ?
 	`
 	selectMessagesSinceTimeQuery = `
 		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
-		FROM messages 
+		FROM messages
 		WHERE topic = ? AND time >= ? AND published = 1
 		ORDER BY time, id
 	`
 	selectMessagesSinceTimeIncludeScheduledQuery = `
 		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
-		FROM messages 
+		FROM messages
 		WHERE topic = ? AND time >= ?
 		ORDER BY time, id
 	`
 	selectMessagesSinceIDQuery = `
 		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
-		FROM messages 
+		FROM messages
 		WHERE topic = ? AND id > ? AND published = 1 
 		ORDER BY time, id
 	`
 	selectMessagesSinceIDIncludeScheduledQuery = `
 		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
-		FROM messages 
+		FROM messages
 		WHERE topic = ? AND (id > ? OR published = 0)
 		ORDER BY time, id
 	`
@@ -106,10 +107,10 @@ const (
 		WHERE topic = ? AND published = 1
 		ORDER BY time DESC, id DESC
 		LIMIT 1
-  `
+	`
 	selectMessagesDueQuery = `
 		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
-		FROM messages 
+		FROM messages
 		WHERE time <= ? AND published = 0
 		ORDER BY time, id
 	`
@@ -283,6 +284,7 @@ type messageCache struct {
 	db    *sql.DB
 	queue *util.BatchingQueue[*message]
 	nop   bool
+	mu    sync.Mutex
 }
 
 // newSqliteCache creates a SQLite file-backed cache
@@ -347,6 +349,8 @@ func (c *messageCache) AddMessage(m *message) error {
 // addMessages synchronously stores a match of messages. If the database is locked, the transaction waits until
 // SQLite's busy_timeout is exceeded before erroring out.
 func (c *messageCache) addMessages(ms []*message) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	if c.nop {
 		return nil
 	}
@@ -528,6 +532,8 @@ func (c *messageCache) Message(id string) (*message, error) {
 }
 
 func (c *messageCache) MarkPublished(m *message) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	_, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
 	return err
 }
@@ -573,6 +579,8 @@ func (c *messageCache) Topics() (map[string]*topic, error) {
 }
 
 func (c *messageCache) DeleteMessages(ids ...string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	tx, err := c.db.Begin()
 	if err != nil {
 		return err
@@ -587,6 +595,8 @@ func (c *messageCache) DeleteMessages(ids ...string) error {
 }
 
 func (c *messageCache) ExpireMessages(topics ...string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	tx, err := c.db.Begin()
 	if err != nil {
 		return err
@@ -621,6 +631,8 @@ func (c *messageCache) AttachmentsExpired() ([]string, error) {
 }
 
 func (c *messageCache) MarkAttachmentsDeleted(ids ...string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	tx, err := c.db.Begin()
 	if err != nil {
 		return err
@@ -766,6 +778,8 @@ func readMessage(rows *sql.Rows) (*message, error) {
 }
 
 func (c *messageCache) UpdateStats(messages int64) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	_, err := c.db.Exec(updateStatsQuery, messages)
 	return err
 }

+ 22 - 0
server/message_cache_test.go

@@ -3,8 +3,10 @@ package server
 import (
 	"database/sql"
 	"fmt"
+	"github.com/stretchr/testify/assert"
 	"net/netip"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
 
@@ -90,6 +92,26 @@ func testCacheMessages(t *testing.T, c *messageCache) {
 	require.Empty(t, messages)
 }
 
+func TestSqliteCache_MessagesLock(t *testing.T) {
+	testCacheMessagesLock(t, newSqliteTestCache(t))
+}
+
+func TestMemCache_MessagesLock(t *testing.T) {
+	testCacheMessagesLock(t, newMemTestCache(t))
+}
+
+func testCacheMessagesLock(t *testing.T, c *messageCache) {
+	var wg sync.WaitGroup
+	for i := 0; i < 5000; i++ {
+		wg.Add(1)
+		go func() {
+			assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "test message")))
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
 func TestSqliteCache_MessagesScheduled(t *testing.T) {
 	testCacheMessagesScheduled(t, newSqliteTestCache(t))
 }