Browse Source

Updated/cancel scheduled messages

binwiederhier 1 month ago
parent
commit
c23d201186
4 changed files with 184 additions and 0 deletions
  1. 9 0
      server/message_cache.go
  2. 69 0
      server/message_cache_test.go
  3. 8 0
      server/server.go
  4. 98 0
      server/server_test.go

+ 9 - 0
server/message_cache.go

@@ -73,6 +73,7 @@ const (
 		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 	`
 	`
 	deleteMessageQuery                = `DELETE FROM messages WHERE mid = ?`
 	deleteMessageQuery                = `DELETE FROM messages WHERE mid = ?`
+	deleteScheduledBySequenceIDQuery  = `DELETE FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0`
 	updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?`
 	updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?`
 	selectRowIDFromMessageID          = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics
 	selectRowIDFromMessageID          = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics
 	selectMessagesByIDQuery           = `
 	selectMessagesByIDQuery           = `
@@ -607,6 +608,14 @@ func (c *messageCache) DeleteMessages(ids ...string) error {
 	return tx.Commit()
 	return tx.Commit()
 }
 }
 
 
+// DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID
+func (c *messageCache) DeleteScheduledBySequenceID(topic, sequenceID string) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	_, err := c.db.Exec(deleteScheduledBySequenceIDQuery, topic, sequenceID)
+	return err
+}
+
 func (c *messageCache) ExpireMessages(topics ...string) error {
 func (c *messageCache) ExpireMessages(topics ...string) error {
 	c.mu.Lock()
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	defer c.mu.Unlock()

+ 69 - 0
server/message_cache_test.go

@@ -703,6 +703,75 @@ func testSender(t *testing.T, c *messageCache) {
 	require.Equal(t, messages[1].Sender, netip.Addr{})
 	require.Equal(t, messages[1].Sender, netip.Addr{})
 }
 }
 
 
+func TestSqliteCache_DeleteScheduledBySequenceID(t *testing.T) {
+	testDeleteScheduledBySequenceID(t, newSqliteTestCache(t))
+}
+
+func TestMemCache_DeleteScheduledBySequenceID(t *testing.T) {
+	testDeleteScheduledBySequenceID(t, newMemTestCache(t))
+}
+
+func testDeleteScheduledBySequenceID(t *testing.T, c *messageCache) {
+	// Create a scheduled (unpublished) message
+	scheduledMsg := newDefaultMessage("mytopic", "scheduled message")
+	scheduledMsg.ID = "scheduled1"
+	scheduledMsg.SequenceID = "seq123"
+	scheduledMsg.Time = time.Now().Add(time.Hour).Unix() // Future time makes it scheduled
+	require.Nil(t, c.AddMessage(scheduledMsg))
+
+	// Create a published message with different sequence ID
+	publishedMsg := newDefaultMessage("mytopic", "published message")
+	publishedMsg.ID = "published1"
+	publishedMsg.SequenceID = "seq456"
+	publishedMsg.Time = time.Now().Add(-time.Hour).Unix() // Past time makes it published
+	require.Nil(t, c.AddMessage(publishedMsg))
+
+	// Create a scheduled message in a different topic
+	otherTopicMsg := newDefaultMessage("othertopic", "other scheduled")
+	otherTopicMsg.ID = "other1"
+	otherTopicMsg.SequenceID = "seq123" // Same sequence ID as scheduledMsg
+	otherTopicMsg.Time = time.Now().Add(time.Hour).Unix()
+	require.Nil(t, c.AddMessage(otherTopicMsg))
+
+	// Verify all messages exist (including scheduled)
+	messages, err := c.Messages("mytopic", sinceAllMessages, true)
+	require.Nil(t, err)
+	require.Equal(t, 2, len(messages))
+
+	messages, err = c.Messages("othertopic", sinceAllMessages, true)
+	require.Nil(t, err)
+	require.Equal(t, 1, len(messages))
+
+	// Delete scheduled message by sequence ID
+	err = c.DeleteScheduledBySequenceID("mytopic", "seq123")
+	require.Nil(t, err)
+
+	// Verify scheduled message is deleted
+	messages, err = c.Messages("mytopic", sinceAllMessages, true)
+	require.Nil(t, err)
+	require.Equal(t, 1, len(messages))
+	require.Equal(t, "published message", messages[0].Message)
+
+	// Verify other topic's message still exists (topic-scoped deletion)
+	messages, err = c.Messages("othertopic", sinceAllMessages, true)
+	require.Nil(t, err)
+	require.Equal(t, 1, len(messages))
+	require.Equal(t, "other scheduled", messages[0].Message)
+
+	// Deleting non-existent sequence ID should not error
+	err = c.DeleteScheduledBySequenceID("mytopic", "nonexistent")
+	require.Nil(t, err)
+
+	// Deleting published message should not affect it (only deletes unpublished)
+	err = c.DeleteScheduledBySequenceID("mytopic", "seq456")
+	require.Nil(t, err)
+
+	messages, err = c.Messages("mytopic", sinceAllMessages, true)
+	require.Nil(t, err)
+	require.Equal(t, 1, len(messages))
+	require.Equal(t, "published message", messages[0].Message)
+}
+
 func checkSchemaVersion(t *testing.T, db *sql.DB) {
 func checkSchemaVersion(t *testing.T, db *sql.DB) {
 	rows, err := db.Query(`SELECT version FROM schemaVersion`)
 	rows, err := db.Query(`SELECT version FROM schemaVersion`)
 	require.Nil(t, err)
 	require.Nil(t, err)

+ 8 - 0
server/server.go

@@ -863,6 +863,10 @@ func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, e
 		logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
 		logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
 	}
 	}
 	if cache {
 	if cache {
+		// Delete any existing scheduled message with the same sequence ID
+		if err := s.messageCache.DeleteScheduledBySequenceID(t.ID, m.SequenceID); err != nil {
+			return nil, err
+		}
 		logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
 		logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
 		if err := s.messageCache.AddMessage(m); err != nil {
 		if err := s.messageCache.AddMessage(m); err != nil {
 			return nil, err
 			return nil, err
@@ -958,6 +962,10 @@ func (s *Server) handleActionMessage(w http.ResponseWriter, r *http.Request, v *
 	if s.config.WebPushPublicKey != "" {
 	if s.config.WebPushPublicKey != "" {
 		go s.publishToWebPushEndpoints(v, m)
 		go s.publishToWebPushEndpoints(v, m)
 	}
 	}
+	// Delete any existing scheduled message with the same sequence ID
+	if err := s.messageCache.DeleteScheduledBySequenceID(t.ID, sequenceID); err != nil {
+		return err
+	}
 	// Add to message cache
 	// Add to message cache
 	if err := s.messageCache.AddMessage(m); err != nil {
 	if err := s.messageCache.AddMessage(m); err != nil {
 		return err
 		return err

+ 98 - 0
server/server_test.go

@@ -3495,6 +3495,104 @@ func TestServer_ClearMessage_WithFirebase(t *testing.T) {
 	require.Equal(t, "firebase-clear-seq", sender.Messages()[1].Data["sequence_id"])
 	require.Equal(t, "firebase-clear-seq", sender.Messages()[1].Data["sequence_id"])
 }
 }
 
 
+func TestServer_UpdateScheduledMessage(t *testing.T) {
+	t.Parallel()
+	s := newTestServer(t, newTestConfig(t))
+
+	// Publish a scheduled message (future delivery)
+	response := request(t, s, "PUT", "/mytopic/sched-seq?delay=1h", "original scheduled message", nil)
+	require.Equal(t, 200, response.Code)
+	msg1 := toMessage(t, response.Body.String())
+	require.Equal(t, "sched-seq", msg1.SequenceID)
+	require.Equal(t, "original scheduled message", msg1.Message)
+
+	// Verify scheduled message exists
+	response = request(t, s, "GET", "/mytopic/json?poll=1&scheduled=1", "", nil)
+	require.Equal(t, 200, response.Code)
+	messages := toMessages(t, response.Body.String())
+	require.Equal(t, 1, len(messages))
+	require.Equal(t, "original scheduled message", messages[0].Message)
+
+	// Update the scheduled message (same sequence ID, new content)
+	response = request(t, s, "PUT", "/mytopic/sched-seq?delay=2h", "updated scheduled message", nil)
+	require.Equal(t, 200, response.Code)
+	msg2 := toMessage(t, response.Body.String())
+	require.Equal(t, "sched-seq", msg2.SequenceID)
+	require.Equal(t, "updated scheduled message", msg2.Message)
+	require.NotEqual(t, msg1.ID, msg2.ID)
+
+	// Verify only the updated message exists (old scheduled was deleted)
+	response = request(t, s, "GET", "/mytopic/json?poll=1&scheduled=1", "", nil)
+	require.Equal(t, 200, response.Code)
+	messages = toMessages(t, response.Body.String())
+	require.Equal(t, 1, len(messages))
+	require.Equal(t, "updated scheduled message", messages[0].Message)
+	require.Equal(t, msg2.ID, messages[0].ID)
+}
+
+func TestServer_DeleteScheduledMessage(t *testing.T) {
+	t.Parallel()
+	s := newTestServer(t, newTestConfig(t))
+
+	// Publish a scheduled message (future delivery)
+	response := request(t, s, "PUT", "/mytopic/delete-sched-seq?delay=1h", "scheduled message to delete", nil)
+	require.Equal(t, 200, response.Code)
+	msg := toMessage(t, response.Body.String())
+	require.Equal(t, "delete-sched-seq", msg.SequenceID)
+
+	// Verify scheduled message exists
+	response = request(t, s, "GET", "/mytopic/json?poll=1&scheduled=1", "", nil)
+	require.Equal(t, 200, response.Code)
+	messages := toMessages(t, response.Body.String())
+	require.Equal(t, 1, len(messages))
+	require.Equal(t, "scheduled message to delete", messages[0].Message)
+
+	// Delete the scheduled message
+	response = request(t, s, "DELETE", "/mytopic/delete-sched-seq", "", nil)
+	require.Equal(t, 200, response.Code)
+	deleteMsg := toMessage(t, response.Body.String())
+	require.Equal(t, "delete-sched-seq", deleteMsg.SequenceID)
+	require.Equal(t, "message_delete", deleteMsg.Event)
+
+	// Verify scheduled message was deleted, only delete event remains
+	response = request(t, s, "GET", "/mytopic/json?poll=1&scheduled=1", "", nil)
+	require.Equal(t, 200, response.Code)
+	messages = toMessages(t, response.Body.String())
+	require.Equal(t, 1, len(messages))
+	require.Equal(t, "message_delete", messages[0].Event)
+	require.Equal(t, "delete-sched-seq", messages[0].SequenceID)
+}
+
+func TestServer_UpdateScheduledMessage_TopicScoped(t *testing.T) {
+	t.Parallel()
+	s := newTestServer(t, newTestConfig(t))
+
+	// Publish scheduled messages with same sequence ID in different topics
+	response := request(t, s, "PUT", "/topic1/shared-seq?delay=1h", "topic1 scheduled", nil)
+	require.Equal(t, 200, response.Code)
+
+	response = request(t, s, "PUT", "/topic2/shared-seq?delay=1h", "topic2 scheduled", nil)
+	require.Equal(t, 200, response.Code)
+
+	// Update scheduled message in topic1 only
+	response = request(t, s, "PUT", "/topic1/shared-seq?delay=2h", "topic1 updated", nil)
+	require.Equal(t, 200, response.Code)
+
+	// Verify topic1 has only the updated message
+	response = request(t, s, "GET", "/topic1/json?poll=1&scheduled=1", "", nil)
+	require.Equal(t, 200, response.Code)
+	messages := toMessages(t, response.Body.String())
+	require.Equal(t, 1, len(messages))
+	require.Equal(t, "topic1 updated", messages[0].Message)
+
+	// Verify topic2 still has its original scheduled message (not affected)
+	response = request(t, s, "GET", "/topic2/json?poll=1&scheduled=1", "", nil)
+	require.Equal(t, 200, response.Code)
+	messages = toMessages(t, response.Body.String())
+	require.Equal(t, 1, len(messages))
+	require.Equal(t, "topic2 scheduled", messages[0].Message)
+}
+
 func newTestConfig(t *testing.T) *Config {
 func newTestConfig(t *testing.T) *Config {
 	conf := NewConfig()
 	conf := NewConfig()
 	conf.BaseURL = "http://127.0.0.1:12345"
 	conf.BaseURL = "http://127.0.0.1:12345"