Selaa lähdekoodia

add sid, mtime, and deleted to message_cache

Hunter Kehoe 4 kuukautta sitten
vanhempi
sitoutus
2aae3577cb
3 muutettua tiedostoa jossa 98 lisäystä ja 23 poistoa
  1. 60 19
      server/message_cache.go
  2. 26 3
      server/message_cache_test.go
  3. 12 1
      server/types.go

+ 60 - 19
server/message_cache.go

@@ -28,7 +28,9 @@ const (
 		CREATE TABLE IF NOT EXISTS messages (
 			id INTEGER PRIMARY KEY AUTOINCREMENT,
 			mid TEXT NOT NULL,
+			sid TEXT NOT NULL,
 			time INT NOT NULL,
+			mtime INT NOT NULL,
 			expires INT NOT NULL,
 			topic TEXT NOT NULL,
 			message TEXT NOT NULL,
@@ -48,10 +50,13 @@ const (
 			user TEXT NOT NULL,
 			content_type TEXT NOT NULL,
 			encoding TEXT NOT NULL,
-			published INT NOT NULL
+			published INT NOT NULL,
+			deleted INT NOT NULL
 		);
 		CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
+		CREATE INDEX IF NOT EXISTS idx_sid ON messages (sid);
 		CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
+		CREATE INDEX IF NOT EXISTS idx_mtime ON messages (mtime);
 		CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
 		CREATE INDEX IF NOT EXISTS idx_expires ON messages (expires);
 		CREATE INDEX IF NOT EXISTS idx_sender ON messages (sender);
@@ -65,56 +70,57 @@ const (
 		COMMIT;
 	`
 	insertMessageQuery = `
-		INSERT INTO messages (mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published)
-		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+		INSERT INTO messages (mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published, deleted)
+		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 	`
 	deleteMessageQuery                = `DELETE FROM messages WHERE mid = ?`
 	updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?`
 	selectRowIDFromMessageID          = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics
 	selectMessagesByIDQuery           = `
-		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
+		SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted
 		FROM messages 
 		WHERE mid = ?
 	`
 	selectMessagesSinceTimeQuery = `
-		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
+		SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted
 		FROM messages 
 		WHERE topic = ? AND time >= ? AND published = 1
-		ORDER BY time, id
+		ORDER BY mtime, id
 	`
 	selectMessagesSinceTimeIncludeScheduledQuery = `
-		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
+		SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted
 		FROM messages 
 		WHERE topic = ? AND time >= ?
-		ORDER BY time, id
+		ORDER BY mtime, id
 	`
 	selectMessagesSinceIDQuery = `
-		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
+		SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted
 		FROM messages 
 		WHERE topic = ? AND id > ? AND published = 1 
-		ORDER BY time, id
+		ORDER BY mtime, id
 	`
 	selectMessagesSinceIDIncludeScheduledQuery = `
-		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
+		SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted
 		FROM messages 
 		WHERE topic = ? AND (id > ? OR published = 0)
-		ORDER BY time, id
+		ORDER BY mtime, id
 	`
 	selectMessagesLatestQuery = `
-		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
+		SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted
 		FROM messages
 		WHERE topic = ? AND published = 1
 		ORDER BY time DESC, id DESC
 		LIMIT 1
   `
 	selectMessagesDueQuery = `
-		SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding
+		SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted
 		FROM messages 
 		WHERE time <= ? AND published = 0
-		ORDER BY time, id
+		ORDER BY mtime, id
 	`
 	selectMessagesExpiredQuery      = `SELECT mid FROM messages WHERE expires <= ? AND published = 1`
 	updateMessagePublishedQuery     = `UPDATE messages SET published = 1 WHERE mid = ?`
+	updateMessageDeletedQuery       = `UPDATE messages SET deleted = 1 WHERE mid = ?`
 	selectMessagesCountQuery        = `SELECT COUNT(*) FROM messages`
 	selectMessageCountPerTopicQuery = `SELECT topic, COUNT(*) FROM messages GROUP BY topic`
 	selectTopicsQuery               = `SELECT topic FROM messages GROUP BY topic`
@@ -130,7 +136,7 @@ const (
 
 // Schema management queries
 const (
-	currentSchemaVersion          = 13
+	currentSchemaVersion          = 14
 	createSchemaVersionTableQuery = `
 		CREATE TABLE IF NOT EXISTS schemaVersion (
 			id INT PRIMARY KEY,
@@ -259,6 +265,15 @@ const (
 	migrate12To13AlterMessagesTableQuery = `
 		CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
 	`
+
+	//13 -> 14
+	migrate13To14AlterMessagesTableQuery = `
+	  ALTER TABLE messages ADD COLUMN sid TEXT NOT NULL DEFAULT('');
+		ALTER TABLE messages ADD COLUMN mtime INT NOT NULL DEFAULT('0');
+		ALTER TABLE messages ADD COLUMN deleted INT NOT NULL DEFAULT('0');
+		CREATE INDEX IF NOT EXISTS idx_sid ON messages (sid);
+		CREATE INDEX IF NOT EXISTS idx_mtime ON messages (mtime);
+	`
 )
 
 var (
@@ -276,6 +291,7 @@ var (
 		10: migrateFrom10,
 		11: migrateFrom11,
 		12: migrateFrom12,
+		13: migrateFrom13,
 	}
 )
 
@@ -393,7 +409,9 @@ func (c *messageCache) addMessages(ms []*message) error {
 		}
 		_, err := stmt.Exec(
 			m.ID,
+			m.SID,
 			m.Time,
+			m.MTime,
 			m.Expires,
 			m.Topic,
 			m.Message,
@@ -414,6 +432,7 @@ func (c *messageCache) addMessages(ms []*message) error {
 			m.ContentType,
 			m.Encoding,
 			published,
+			0,
 		)
 		if err != nil {
 			return err
@@ -692,12 +711,14 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
 }
 
 func readMessage(rows *sql.Rows) (*message, error) {
-	var timestamp, expires, attachmentSize, attachmentExpires int64
-	var priority int
-	var id, topic, msg, title, tagsStr, click, icon, actionsStr, attachmentName, attachmentType, attachmentURL, sender, user, contentType, encoding string
+	var timestamp, mtimestamp, expires, attachmentSize, attachmentExpires int64
+	var priority, deleted int
+	var id, sid, topic, msg, title, tagsStr, click, icon, actionsStr, attachmentName, attachmentType, attachmentURL, sender, user, contentType, encoding string
 	err := rows.Scan(
 		&id,
+		&sid,
 		&timestamp,
+		&mtimestamp,
 		&expires,
 		&topic,
 		&msg,
@@ -716,6 +737,7 @@ func readMessage(rows *sql.Rows) (*message, error) {
 		&user,
 		&contentType,
 		&encoding,
+		&deleted,
 	)
 	if err != nil {
 		return nil, err
@@ -746,7 +768,9 @@ func readMessage(rows *sql.Rows) (*message, error) {
 	}
 	return &message{
 		ID:          id,
+		SID:         sid,
 		Time:        timestamp,
+		MTime:       mtimestamp,
 		Expires:     expires,
 		Event:       messageEvent,
 		Topic:       topic,
@@ -762,6 +786,7 @@ func readMessage(rows *sql.Rows) (*message, error) {
 		User:        user,
 		ContentType: contentType,
 		Encoding:    encoding,
+		Deleted:     deleted,
 	}, nil
 }
 
@@ -1016,3 +1041,19 @@ func migrateFrom12(db *sql.DB, _ time.Duration) error {
 	}
 	return tx.Commit()
 }
+
+func migrateFrom13(db *sql.DB, _ time.Duration) error {
+	log.Tag(tagMessageCache).Info("Migrating cache database schema: from 13 to 14")
+	tx, err := db.Begin()
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+	if _, err := tx.Exec(migrate13To14AlterMessagesTableQuery); err != nil {
+		return err
+	}
+	if _, err := tx.Exec(updateSchemaVersion, 14); err != nil {
+		return err
+	}
+	return tx.Commit()
+}

+ 26 - 3
server/message_cache_test.go

@@ -22,9 +22,11 @@ func TestMemCache_Messages(t *testing.T) {
 func testCacheMessages(t *testing.T, c *messageCache) {
 	m1 := newDefaultMessage("mytopic", "my message")
 	m1.Time = 1
+	m1.MTime = 1000
 
 	m2 := newDefaultMessage("mytopic", "my other message")
 	m2.Time = 2
+	m2.MTime = 2000
 
 	require.Nil(t, c.AddMessage(m1))
 	require.Nil(t, c.AddMessage(newDefaultMessage("example", "my example message")))
@@ -102,10 +104,13 @@ func testCacheMessagesScheduled(t *testing.T, c *messageCache) {
 	m1 := newDefaultMessage("mytopic", "message 1")
 	m2 := newDefaultMessage("mytopic", "message 2")
 	m2.Time = time.Now().Add(time.Hour).Unix()
+	m2.MTime = time.Now().Add(time.Hour).UnixMilli()
 	m3 := newDefaultMessage("mytopic", "message 3")
-	m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2!
+	m3.Time = time.Now().Add(time.Minute).Unix()       // earlier than m2!
+	m3.MTime = time.Now().Add(time.Minute).UnixMilli() // earlier than m2!
 	m4 := newDefaultMessage("mytopic2", "message 4")
 	m4.Time = time.Now().Add(time.Minute).Unix()
+	m4.MTime = time.Now().Add(time.Minute).UnixMilli()
 	require.Nil(t, c.AddMessage(m1))
 	require.Nil(t, c.AddMessage(m2))
 	require.Nil(t, c.AddMessage(m3))
@@ -179,18 +184,25 @@ func TestMemCache_MessagesSinceID(t *testing.T) {
 func testCacheMessagesSinceID(t *testing.T, c *messageCache) {
 	m1 := newDefaultMessage("mytopic", "message 1")
 	m1.Time = 100
+	m1.MTime = 100000
 	m2 := newDefaultMessage("mytopic", "message 2")
 	m2.Time = 200
+	m2.MTime = 200000
 	m3 := newDefaultMessage("mytopic", "message 3")
-	m3.Time = time.Now().Add(time.Hour).Unix() // Scheduled, in the future, later than m7 and m5
+	m3.Time = time.Now().Add(time.Hour).Unix()       // Scheduled, in the future, later than m7 and m5
+	m3.MTime = time.Now().Add(time.Hour).UnixMilli() // Scheduled, in the future, later than m7 and m5
 	m4 := newDefaultMessage("mytopic", "message 4")
 	m4.Time = 400
+	m4.MTime = 400000
 	m5 := newDefaultMessage("mytopic", "message 5")
-	m5.Time = time.Now().Add(time.Minute).Unix() // Scheduled, in the future, later than m7
+	m5.Time = time.Now().Add(time.Minute).Unix()       // Scheduled, in the future, later than m7
+	m5.MTime = time.Now().Add(time.Minute).UnixMilli() // Scheduled, in the future, later than m7
 	m6 := newDefaultMessage("mytopic", "message 6")
 	m6.Time = 600
+	m6.MTime = 600000
 	m7 := newDefaultMessage("mytopic", "message 7")
 	m7.Time = 700
+	m7.MTime = 700000
 
 	require.Nil(t, c.AddMessage(m1))
 	require.Nil(t, c.AddMessage(m2))
@@ -251,14 +263,17 @@ func testCachePrune(t *testing.T, c *messageCache) {
 
 	m1 := newDefaultMessage("mytopic", "my message")
 	m1.Time = now - 10
+	m1.MTime = (now - 10) * 1000
 	m1.Expires = now - 5
 
 	m2 := newDefaultMessage("mytopic", "my other message")
 	m2.Time = now - 5
+	m2.MTime = (now - 5) * 1000
 	m2.Expires = now + 5 // In the future
 
 	m3 := newDefaultMessage("another_topic", "and another one")
 	m3.Time = now - 12
+	m3.MTime = (now - 12) * 1000
 	m3.Expires = now - 2
 
 	require.Nil(t, c.AddMessage(m1))
@@ -297,6 +312,7 @@ func testCacheAttachments(t *testing.T, c *messageCache) {
 	expires1 := time.Now().Add(-4 * time.Hour).Unix() // Expired
 	m := newDefaultMessage("mytopic", "flower for you")
 	m.ID = "m1"
+	m.SID = "m1"
 	m.Sender = netip.MustParseAddr("1.2.3.4")
 	m.Attachment = &attachment{
 		Name:    "flower.jpg",
@@ -310,6 +326,7 @@ func testCacheAttachments(t *testing.T, c *messageCache) {
 	expires2 := time.Now().Add(2 * time.Hour).Unix() // Future
 	m = newDefaultMessage("mytopic", "sending you a car")
 	m.ID = "m2"
+	m.SID = "m2"
 	m.Sender = netip.MustParseAddr("1.2.3.4")
 	m.Attachment = &attachment{
 		Name:    "car.jpg",
@@ -323,6 +340,7 @@ func testCacheAttachments(t *testing.T, c *messageCache) {
 	expires3 := time.Now().Add(1 * time.Hour).Unix() // Future
 	m = newDefaultMessage("another-topic", "sending you another car")
 	m.ID = "m3"
+	m.SID = "m3"
 	m.User = "u_BAsbaAa"
 	m.Sender = netip.MustParseAddr("5.6.7.8")
 	m.Attachment = &attachment{
@@ -378,11 +396,13 @@ func TestMemCache_Attachments_Expired(t *testing.T) {
 func testCacheAttachmentsExpired(t *testing.T, c *messageCache) {
 	m := newDefaultMessage("mytopic", "flower for you")
 	m.ID = "m1"
+	m.SID = "m1"
 	m.Expires = time.Now().Add(time.Hour).Unix()
 	require.Nil(t, c.AddMessage(m))
 
 	m = newDefaultMessage("mytopic", "message with attachment")
 	m.ID = "m2"
+	m.SID = "m2"
 	m.Expires = time.Now().Add(2 * time.Hour).Unix()
 	m.Attachment = &attachment{
 		Name:    "car.jpg",
@@ -395,6 +415,7 @@ func testCacheAttachmentsExpired(t *testing.T, c *messageCache) {
 
 	m = newDefaultMessage("mytopic", "message with external attachment")
 	m.ID = "m3"
+	m.SID = "m3"
 	m.Expires = time.Now().Add(2 * time.Hour).Unix()
 	m.Attachment = &attachment{
 		Name:    "car.jpg",
@@ -406,6 +427,7 @@ func testCacheAttachmentsExpired(t *testing.T, c *messageCache) {
 
 	m = newDefaultMessage("mytopic2", "message with expired attachment")
 	m.ID = "m4"
+	m.SID = "m4"
 	m.Expires = time.Now().Add(2 * time.Hour).Unix()
 	m.Attachment = &attachment{
 		Name:    "expired-car.jpg",
@@ -502,6 +524,7 @@ func TestSqliteCache_Migration_From1(t *testing.T) {
 	// Add delayed message
 	delayedMessage := newDefaultMessage("mytopic", "some delayed message")
 	delayedMessage.Time = time.Now().Add(time.Minute).Unix()
+	delayedMessage.MTime = time.Now().Add(time.Minute).UnixMilli()
 	require.Nil(t, c.AddMessage(delayedMessage))
 
 	// 10, not 11!

+ 12 - 1
server/types.go

@@ -25,7 +25,9 @@ const (
 // message represents a message published to a topic
 type message struct {
 	ID          string      `json:"id"`                // Random message ID
+	SID         string      `json:"sid"`               // Message sequence ID for updating message contents
 	Time        int64       `json:"time"`              // Unix time in seconds
+	MTime       int64       `json:"mtime"`             // Unix time in milliseconds
 	Expires     int64       `json:"expires,omitempty"` // Unix time in seconds (not required for open/keepalive)
 	Event       string      `json:"event"`             // One of the above
 	Topic       string      `json:"topic"`
@@ -42,13 +44,16 @@ type message struct {
 	Encoding    string      `json:"encoding,omitempty"`     // empty for raw UTF-8, or "base64" for encoded bytes
 	Sender      netip.Addr  `json:"-"`                      // IP address of uploader, used for rate limiting
 	User        string      `json:"-"`                      // UserID of the uploader, used to associated attachments
+	Deleted     int         `json:"deleted,omitempty"`
 }
 
 func (m *message) Context() log.Context {
 	fields := map[string]any{
 		"topic":             m.Topic,
 		"message_id":        m.ID,
+		"message_sid":       m.SID,
 		"message_time":      m.Time,
+		"message_mtime":     m.MTime,
 		"message_event":     m.Event,
 		"message_body_size": len(m.Message),
 	}
@@ -92,6 +97,7 @@ func newAction() *action {
 // publishMessage is used as input when publishing as JSON
 type publishMessage struct {
 	Topic    string   `json:"topic"`
+	SID      string   `json:"sid"`
 	Title    string   `json:"title"`
 	Message  string   `json:"message"`
 	Priority int      `json:"priority"`
@@ -117,6 +123,7 @@ func newMessage(event, topic, msg string) *message {
 	return &message{
 		ID:      util.RandomString(messageIDLength),
 		Time:    time.Now().Unix(),
+		MTime:   time.Now().UnixMilli(),
 		Event:   event,
 		Topic:   topic,
 		Message: msg,
@@ -155,7 +162,11 @@ type sinceMarker struct {
 }
 
 func newSinceTime(timestamp int64) sinceMarker {
-	return sinceMarker{time.Unix(timestamp, 0), ""}
+	return newSinceMTime(timestamp * 1000)
+}
+
+func newSinceMTime(mtimestamp int64) sinceMarker {
+	return sinceMarker{time.UnixMilli(mtimestamp), ""}
 }
 
 func newSinceID(id string) sinceMarker {