Browse Source

Store Sender IP in DB for delayed messages

Philipp Heckel 3 years ago
parent
commit
f9284a098a

+ 2 - 2
server/config.go

@@ -67,7 +67,7 @@ type Config struct {
 	KeepaliveInterval                    time.Duration
 	ManagerInterval                      time.Duration
 	WebRootIsApp                         bool
-	AtSenderInterval                     time.Duration
+	DelayedSenderInterval                time.Duration
 	FirebaseKeepaliveInterval            time.Duration
 	FirebasePollInterval                 time.Duration
 	FirebaseQuotaLimitPenaltyDuration    time.Duration
@@ -120,7 +120,7 @@ func NewConfig() *Config {
 		MessageLimit:                         DefaultMessageLengthLimit,
 		MinDelay:                             DefaultMinDelay,
 		MaxDelay:                             DefaultMaxDelay,
-		AtSenderInterval:                     DefaultAtSenderInterval,
+		DelayedSenderInterval:                DefaultAtSenderInterval,
 		FirebaseKeepaliveInterval:            DefaultFirebaseKeepaliveInterval,
 		FirebasePollInterval:                 DefaultFirebasePollInterval,
 		FirebaseQuotaLimitPenaltyDuration:    DefaultFirebaseQuotaLimitPenaltyDuration,

+ 34 - 17
server/message_cache.go

@@ -36,7 +36,7 @@ const (
 			attachment_size INT NOT NULL,
 			attachment_expires INT NOT NULL,
 			attachment_url TEXT NOT NULL,
-			attachment_owner TEXT NOT NULL,
+			sender TEXT NOT NULL,
 			encoding TEXT NOT NULL,
 			published INT NOT NULL
 		);
@@ -45,37 +45,37 @@ const (
 		COMMIT;
 	`
 	insertMessageQuery = `
-		INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published) 
+		INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding, published) 
 		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 	`
 	pruneMessagesQuery           = `DELETE FROM messages WHERE time < ? AND published = 1`
 	selectRowIDFromMessageID     = `SELECT id FROM messages WHERE topic = ? AND mid = ?`
 	selectMessagesSinceTimeQuery = `
-		SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
+		SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding
 		FROM messages 
 		WHERE topic = ? AND time >= ? AND published = 1
 		ORDER BY time, id
 	`
 	selectMessagesSinceTimeIncludeScheduledQuery = `
-		SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
+		SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding
 		FROM messages 
 		WHERE topic = ? AND time >= ?
 		ORDER BY time, id
 	`
 	selectMessagesSinceIDQuery = `
-		SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
+		SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding
 		FROM messages 
 		WHERE topic = ? AND id > ? AND published = 1 
 		ORDER BY time, id
 	`
 	selectMessagesSinceIDIncludeScheduledQuery = `
-		SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
+		SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding
 		FROM messages 
 		WHERE topic = ? AND (id > ? OR published = 0)
 		ORDER BY time, id
 	`
 	selectMessagesDueQuery = `
-		SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
+		SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding
 		FROM messages 
 		WHERE time <= ? AND published = 0
 		ORDER BY time, id
@@ -84,13 +84,13 @@ const (
 	selectMessagesCountQuery        = `SELECT COUNT(*) FROM messages`
 	selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
 	selectTopicsQuery               = `SELECT topic FROM messages GROUP BY topic`
-	selectAttachmentsSizeQuery      = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE attachment_owner = ? AND attachment_expires >= ?`
+	selectAttachmentsSizeQuery      = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE sender = ? AND attachment_expires >= ?`
 	selectAttachmentsExpiredQuery   = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?`
 )
 
 // Schema management queries
 const (
-	currentSchemaVersion          = 6
+	currentSchemaVersion          = 7
 	createSchemaVersionTableQuery = `
 		CREATE TABLE IF NOT EXISTS schemaVersion (
 			id INT PRIMARY KEY,
@@ -173,6 +173,11 @@ const (
 	migrate5To6AlterMessagesTableQuery = `
 		ALTER TABLE messages ADD COLUMN actions TEXT NOT NULL DEFAULT('');
 	`
+
+	// 6 -> 7
+	migrate6To7AlterMessagesTableQuery = `
+		ALTER TABLE messages RENAME COLUMN attachment_owner TO sender;
+	`
 )
 
 type messageCache struct {
@@ -225,7 +230,7 @@ func (c *messageCache) AddMessage(m *message) error {
 	}
 	published := m.Time <= time.Now().Unix()
 	tags := strings.Join(m.Tags, ",")
-	var attachmentName, attachmentType, attachmentURL, attachmentOwner string
+	var attachmentName, attachmentType, attachmentURL string
 	var attachmentSize, attachmentExpires int64
 	if m.Attachment != nil {
 		attachmentName = m.Attachment.Name
@@ -233,7 +238,6 @@ func (c *messageCache) AddMessage(m *message) error {
 		attachmentSize = m.Attachment.Size
 		attachmentExpires = m.Attachment.Expires
 		attachmentURL = m.Attachment.URL
-		attachmentOwner = m.Attachment.Owner
 	}
 	var actionsStr string
 	if len(m.Actions) > 0 {
@@ -259,7 +263,7 @@ func (c *messageCache) AddMessage(m *message) error {
 		attachmentSize,
 		attachmentExpires,
 		attachmentURL,
-		attachmentOwner,
+		m.Sender,
 		m.Encoding,
 		published,
 	)
@@ -371,8 +375,8 @@ func (c *messageCache) Prune(olderThan time.Time) error {
 	return err
 }
 
-func (c *messageCache) AttachmentBytesUsed(owner string) (int64, error) {
-	rows, err := c.db.Query(selectAttachmentsSizeQuery, owner, time.Now().Unix())
+func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) {
+	rows, err := c.db.Query(selectAttachmentsSizeQuery, sender, time.Now().Unix())
 	if err != nil {
 		return 0, err
 	}
@@ -415,7 +419,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
 	for rows.Next() {
 		var timestamp, attachmentSize, attachmentExpires int64
 		var priority int
-		var id, topic, msg, title, tagsStr, click, actionsStr, attachmentName, attachmentType, attachmentURL, attachmentOwner, encoding string
+		var id, topic, msg, title, tagsStr, click, actionsStr, attachmentName, attachmentType, attachmentURL, sender, encoding string
 		err := rows.Scan(
 			&id,
 			&timestamp,
@@ -431,7 +435,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
 			&attachmentSize,
 			&attachmentExpires,
 			&attachmentURL,
-			&attachmentOwner,
+			&sender,
 			&encoding,
 		)
 		if err != nil {
@@ -455,7 +459,6 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
 				Size:    attachmentSize,
 				Expires: attachmentExpires,
 				URL:     attachmentURL,
-				Owner:   attachmentOwner,
 			}
 		}
 		messages = append(messages, &message{
@@ -470,6 +473,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
 			Click:      click,
 			Actions:    actions,
 			Attachment: att,
+			Sender:     sender,
 			Encoding:   encoding,
 		})
 	}
@@ -516,6 +520,8 @@ func setupCacheDB(db *sql.DB) error {
 		return migrateFrom4(db)
 	} else if schemaVersion == 5 {
 		return migrateFrom5(db)
+	} else if schemaVersion == 6 {
+		return migrateFrom6(db)
 	}
 	return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
 }
@@ -599,5 +605,16 @@ func migrateFrom5(db *sql.DB) error {
 	if _, err := db.Exec(updateSchemaVersion, 6); err != nil {
 		return err
 	}
+	return migrateFrom6(db)
+}
+
+func migrateFrom6(db *sql.DB) error {
+	log.Print("Migrating cache database schema: from 6 to 7")
+	if _, err := db.Exec(migrate6To7AlterMessagesTableQuery); err != nil {
+		return err
+	}
+	if _, err := db.Exec(updateSchemaVersion, 7); err != nil {
+		return err
+	}
 	return nil // Update this when a new version is added
 }

+ 5 - 5
server/message_cache_test.go

@@ -281,39 +281,39 @@ func testCacheAttachments(t *testing.T, c *messageCache) {
 	expires1 := time.Now().Add(-4 * time.Hour).Unix()
 	m := newDefaultMessage("mytopic", "flower for you")
 	m.ID = "m1"
+	m.Sender = "1.2.3.4"
 	m.Attachment = &attachment{
 		Name:    "flower.jpg",
 		Type:    "image/jpeg",
 		Size:    5000,
 		Expires: expires1,
 		URL:     "https://ntfy.sh/file/AbDeFgJhal.jpg",
-		Owner:   "1.2.3.4",
 	}
 	require.Nil(t, c.AddMessage(m))
 
 	expires2 := time.Now().Add(2 * time.Hour).Unix() // Future
 	m = newDefaultMessage("mytopic", "sending you a car")
 	m.ID = "m2"
+	m.Sender = "1.2.3.4"
 	m.Attachment = &attachment{
 		Name:    "car.jpg",
 		Type:    "image/jpeg",
 		Size:    10000,
 		Expires: expires2,
 		URL:     "https://ntfy.sh/file/aCaRURL.jpg",
-		Owner:   "1.2.3.4",
 	}
 	require.Nil(t, c.AddMessage(m))
 
 	expires3 := time.Now().Add(1 * time.Hour).Unix() // Future
 	m = newDefaultMessage("another-topic", "sending you another car")
 	m.ID = "m3"
+	m.Sender = "1.2.3.4"
 	m.Attachment = &attachment{
 		Name:    "another-car.jpg",
 		Type:    "image/jpeg",
 		Size:    20000,
 		Expires: expires3,
 		URL:     "https://ntfy.sh/file/zakaDHFW.jpg",
-		Owner:   "1.2.3.4",
 	}
 	require.Nil(t, c.AddMessage(m))
 
@@ -327,7 +327,7 @@ func testCacheAttachments(t *testing.T, c *messageCache) {
 	require.Equal(t, int64(5000), messages[0].Attachment.Size)
 	require.Equal(t, expires1, messages[0].Attachment.Expires)
 	require.Equal(t, "https://ntfy.sh/file/AbDeFgJhal.jpg", messages[0].Attachment.URL)
-	require.Equal(t, "1.2.3.4", messages[0].Attachment.Owner)
+	require.Equal(t, "1.2.3.4", messages[0].Sender)
 
 	require.Equal(t, "sending you a car", messages[1].Message)
 	require.Equal(t, "car.jpg", messages[1].Attachment.Name)
@@ -335,7 +335,7 @@ func testCacheAttachments(t *testing.T, c *messageCache) {
 	require.Equal(t, int64(10000), messages[1].Attachment.Size)
 	require.Equal(t, expires2, messages[1].Attachment.Expires)
 	require.Equal(t, "https://ntfy.sh/file/aCaRURL.jpg", messages[1].Attachment.URL)
-	require.Equal(t, "1.2.3.4", messages[1].Attachment.Owner)
+	require.Equal(t, "1.2.3.4", messages[1].Sender)
 
 	size, err := c.AttachmentBytesUsed("1.2.3.4")
 	require.Nil(t, err)

+ 19 - 11
server/server.go

@@ -443,7 +443,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
 	if s.mailer != nil && email != "" && !delayed {
 		go s.sendEmail(v, m, email)
 	}
-	if s.config.UpstreamBaseURL != "" {
+	if s.config.UpstreamBaseURL != "" && !delayed {
 		go s.forwardPollRequest(v, m)
 	}
 	if cache {
@@ -484,7 +484,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) {
 		return
 	}
 	req.Header.Set("X-Poll-ID", m.ID)
-	response, err := http.DefaultClient.Do(req)
+	var httpClient = &http.Client{
+		Timeout: time.Second * 10,
+	}
+	response, err := httpClient.Do(req)
 	if err != nil {
 		log.Printf("[%s] FWD - Unable to forward poll request: %v", v.ip, err.Error())
 		return
@@ -566,6 +569,7 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca
 			return false, false, "", false, errHTTPBadRequestDelayTooLarge
 		}
 		m.Time = delay.Unix()
+		m.Sender = v.ip // Important for rate limiting
 	}
 	actionsStr := readParam(r, "x-actions", "actions", "action")
 	if actionsStr != "" {
@@ -661,7 +665,7 @@ func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message,
 		m.Attachment = &attachment{}
 	}
 	var ext string
-	m.Attachment.Owner = v.ip // Important for attachment rate limiting
+	m.Sender = v.ip // Important for attachment rate limiting
 	m.Attachment.Expires = time.Now().Add(s.config.AttachmentExpiryDuration).Unix()
 	m.Attachment.Type, ext = util.DetectContentType(body.PeekedBytes, m.Attachment.Name)
 	m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
@@ -1081,7 +1085,7 @@ func (s *Server) runManager() {
 func (s *Server) runDelayedSender() {
 	for {
 		select {
-		case <-time.After(s.config.AtSenderInterval):
+		case <-time.After(s.config.DelayedSenderInterval):
 			if err := s.sendDelayedMessages(); err != nil {
 				log.Printf("error sending scheduled messages: %s", err.Error())
 			}
@@ -1118,7 +1122,7 @@ func (s *Server) sendDelayedMessages() error {
 		return err
 	}
 	for _, m := range messages {
-		v := s.visitorFromIP("0.0.0.0") // FIXME: get message owner!!
+		v := s.visitorFromIP(m.Sender)
 		if err := s.sendDelayedMessage(v, m); err != nil {
 			log.Printf("error sending delayed message: %s", err.Error())
 		}
@@ -1131,14 +1135,18 @@ func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
 	defer s.mu.Unlock()
 	t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
 	if ok {
-		if err := t.Publish(v, m); err != nil {
-			return fmt.Errorf("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error())
-		}
+		go func() {
+			// We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
+			if err := t.Publish(v, m); err != nil {
+				log.Printf("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error())
+			}
+		}()
 	}
 	if s.firebase != nil { // Firebase subscribers may not show up in topics map
-		if err := s.firebase(v, m); err != nil {
-			return fmt.Errorf("unable to publish to Firebase: %v", err.Error())
-		}
+		go s.sendToFirebase(v, m)
+	}
+	if s.config.UpstreamBaseURL != "" {
+		go s.forwardPollRequest(v, m)
 	}
 	if err := s.messageCache.MarkPublished(m); err != nil {
 		return err

+ 0 - 1
server/server_firebase_test.go

@@ -119,7 +119,6 @@ func TestToFirebaseMessage_Message_Normal_Allowed(t *testing.T) {
 		Size:    12345,
 		Expires: 98765543,
 		URL:     "https://example.com/file.jpg",
-		Owner:   "some-owner",
 	}
 	fbm, err := toFirebaseMessage(m, &testAuther{Allow: true})
 	require.Nil(t, err)

+ 12 - 5
server/server_test.go

@@ -264,7 +264,7 @@ func TestServer_PublishNoCache(t *testing.T) {
 func TestServer_PublishAt(t *testing.T) {
 	c := newTestConfig(t)
 	c.MinDelay = time.Second
-	c.AtSenderInterval = 100 * time.Millisecond
+	c.DelayedSenderInterval = 100 * time.Millisecond
 	s := newTestServer(t, c)
 
 	response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{
@@ -283,6 +283,13 @@ func TestServer_PublishAt(t *testing.T) {
 	messages = toMessages(t, response.Body.String())
 	require.Equal(t, 1, len(messages))
 	require.Equal(t, "a message", messages[0].Message)
+	require.Equal(t, "", messages[0].Sender) // Never return the sender!
+
+	messages, err := s.messageCache.Messages("mytopic", sinceAllMessages, true)
+	require.Nil(t, err)
+	require.Equal(t, 1, len(messages))
+	require.Equal(t, "a message", messages[0].Message)
+	require.Equal(t, "9.9.9.9", messages[0].Sender) // It's stored in the DB though!
 }
 
 func TestServer_PublishAtWithCacheError(t *testing.T) {
@@ -1019,7 +1026,7 @@ func TestServer_PublishAttachment(t *testing.T) {
 	require.Equal(t, int64(5000), msg.Attachment.Size)
 	require.GreaterOrEqual(t, msg.Attachment.Expires, time.Now().Add(179*time.Minute).Unix()) // Almost 3 hours
 	require.Contains(t, msg.Attachment.URL, "http://127.0.0.1:12345/file/")
-	require.Equal(t, "", msg.Attachment.Owner) // Should never be returned
+	require.Equal(t, "", msg.Sender) // Should never be returned
 	require.FileExists(t, filepath.Join(s.config.AttachmentCacheDir, msg.ID))
 
 	path := strings.TrimPrefix(msg.Attachment.URL, "http://127.0.0.1:12345")
@@ -1048,7 +1055,7 @@ func TestServer_PublishAttachmentShortWithFilename(t *testing.T) {
 	require.Equal(t, int64(21), msg.Attachment.Size)
 	require.GreaterOrEqual(t, msg.Attachment.Expires, time.Now().Add(3*time.Hour).Unix())
 	require.Contains(t, msg.Attachment.URL, "http://127.0.0.1:12345/file/")
-	require.Equal(t, "", msg.Attachment.Owner) // Should never be returned
+	require.Equal(t, "", msg.Sender) // Should never be returned
 	require.FileExists(t, filepath.Join(s.config.AttachmentCacheDir, msg.ID))
 
 	path := strings.TrimPrefix(msg.Attachment.URL, "http://127.0.0.1:12345")
@@ -1075,7 +1082,7 @@ func TestServer_PublishAttachmentExternalWithoutFilename(t *testing.T) {
 	require.Equal(t, "", msg.Attachment.Type)
 	require.Equal(t, int64(0), msg.Attachment.Size)
 	require.Equal(t, int64(0), msg.Attachment.Expires)
-	require.Equal(t, "", msg.Attachment.Owner)
+	require.Equal(t, "", msg.Sender)
 
 	// Slightly unrelated cross-test: make sure we don't add an owner for external attachments
 	size, err := s.messageCache.AttachmentBytesUsed("127.0.0.1")
@@ -1096,7 +1103,7 @@ func TestServer_PublishAttachmentExternalWithFilename(t *testing.T) {
 	require.Equal(t, "", msg.Attachment.Type)
 	require.Equal(t, int64(0), msg.Attachment.Size)
 	require.Equal(t, int64(0), msg.Attachment.Expires)
-	require.Equal(t, "", msg.Attachment.Owner)
+	require.Equal(t, "", msg.Sender)
 }
 
 func TestServer_PublishAttachmentBadURL(t *testing.T) {

+ 1 - 1
server/types.go

@@ -32,6 +32,7 @@ type message struct {
 	Actions    []*action   `json:"actions,omitempty"`
 	Attachment *attachment `json:"attachment,omitempty"`
 	PollID     string      `json:"poll_id,omitempty"`
+	Sender     string      `json:"-"`                  // IP address of uploader, used for rate limiting
 	Encoding   string      `json:"encoding,omitempty"` // empty for raw UTF-8, or "base64" for encoded bytes
 }
 
@@ -41,7 +42,6 @@ type attachment struct {
 	Size    int64  `json:"size,omitempty"`
 	Expires int64  `json:"expires,omitempty"`
 	URL     string `json:"url"`
-	Owner   string `json:"-"` // IP address of uploader, used for rate limiting
 }
 
 type action struct {