Philipp Heckel пре 4 година
родитељ
комит
4af9c07577
7 измењених фајлова са 153 додато и 33 уклоњено
  1. 1 1
      server/cache.go
  2. 1 1
      server/cache_mem.go
  3. 90 14
      server/cache_sqlite.go
  4. 1 1
      server/cache_test.go
  5. 15 7
      server/server.go
  6. 31 8
      server/types.go
  7. 14 1
      util/util.go

+ 1 - 1
server/cache.go

@@ -14,7 +14,7 @@ var (
 // i.e. message structs with the Event messageEvent.
 type cache interface {
 	AddMessage(m *message) error
-	Messages(topic string, since sinceTime, scheduled bool) ([]*message, error)
+	Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error)
 	MessagesDue() ([]*message, error)
 	MessageCount(topic string) (int, error)
 	Topics() (map[string]*topic, error)

+ 1 - 1
server/cache_mem.go

@@ -54,7 +54,7 @@ func (c *memCache) AddMessage(m *message) error {
 	return nil
 }
 
-func (c *memCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
+func (c *memCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	if _, ok := c.messages[topic]; !ok || since.IsNone() {

+ 90 - 14
server/cache_sqlite.go

@@ -15,7 +15,8 @@ const (
 	createMessagesTableQuery = `
 		BEGIN;
 		CREATE TABLE IF NOT EXISTS messages (
-			id TEXT PRIMARY KEY,
+			id INTEGER PRIMARY KEY AUTOINCREMENT,
+			mid TEXT NOT NULL,
 			time INT NOT NULL,
 			topic TEXT NOT NULL,
 			message TEXT NOT NULL,
@@ -32,42 +33,59 @@ const (
 			encoding TEXT NOT NULL,
 			published INT NOT NULL
 		);
+		CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
 		CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
 		COMMIT;
 	`
 	insertMessageQuery = `
-		INSERT INTO messages (id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published) 
+		INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published) 
 		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 	`
 	pruneMessagesQuery           = `DELETE FROM messages WHERE time < ? AND published = 1`
 	selectMessagesSinceTimeQuery = `
-		SELECT id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
+		SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
 		FROM messages 
 		WHERE topic = ? AND time >= ? AND published = 1
-		ORDER BY time ASC
+		ORDER BY time, id
 	`
 	selectMessagesSinceTimeIncludeScheduledQuery = `
-		SELECT id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
+		SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
 		FROM messages 
 		WHERE topic = ? AND time >= ?
-		ORDER BY time ASC
+		ORDER BY time, id
+	`
+	selectMessagesSinceIDQuery = `
+		SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
+		FROM messages 
+		WHERE topic = ?
+          AND published = 1 
+		  AND id > (SELECT IFNULL(id,0) FROM messages WHERE mid = ?) 
+		ORDER BY time, id
+	`
+	selectMessagesSinceIDIncludeScheduledQuery = `
+		SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
+		FROM messages 
+		WHERE topic = ? 
+		  AND id > (SELECT IFNULL(id,0) FROM messages WHERE mid = ?)
+		ORDER BY time, id
 	`
 	selectMessagesDueQuery = `
-		SELECT id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
+		SELECT mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
 		FROM messages 
 		WHERE time <= ? AND published = 0
+		ORDER BY time, id
 	`
-	updateMessagePublishedQuery     = `UPDATE messages SET published = 1 WHERE id = ?`
+	updateMessagePublishedQuery     = `UPDATE messages SET published = 1 WHERE mid = ?`
 	selectMessagesCountQuery        = `SELECT COUNT(*) FROM messages`
 	selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
 	selectTopicsQuery               = `SELECT topic FROM messages GROUP BY topic`
 	selectAttachmentsSizeQuery      = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE attachment_owner = ? AND attachment_expires >= ?`
-	selectAttachmentsExpiredQuery   = `SELECT id FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?`
+	selectAttachmentsExpiredQuery   = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?`
 )
 
 // Schema management queries
 const (
-	currentSchemaVersion          = 4
+	currentSchemaVersion          = 5
 	createSchemaVersionTableQuery = `
 		CREATE TABLE IF NOT EXISTS schemaVersion (
 			id INT PRIMARY KEY,
@@ -108,6 +126,43 @@ const (
 	migrate3To4AlterMessagesTableQuery = `
 		ALTER TABLE messages ADD COLUMN encoding TEXT NOT NULL DEFAULT('');
 	`
+
+	// 4 -> 5
+	migrate4To5AlterMessagesTableQuery = `
+		BEGIN;
+		CREATE TABLE IF NOT EXISTS messages_new (
+			id INTEGER PRIMARY KEY AUTOINCREMENT,
+			mid TEXT NOT NULL,
+			time INT NOT NULL,
+			topic TEXT NOT NULL,
+			message TEXT NOT NULL,
+			title TEXT NOT NULL,
+			priority INT NOT NULL,
+			tags TEXT NOT NULL,
+			click TEXT NOT NULL,
+			attachment_name TEXT NOT NULL,
+			attachment_type TEXT NOT NULL,
+			attachment_size INT NOT NULL,
+			attachment_expires INT NOT NULL,
+			attachment_url TEXT NOT NULL,
+			attachment_owner TEXT NOT NULL,
+			encoding TEXT NOT NULL,
+			published INT NOT NULL
+		);
+		CREATE INDEX IF NOT EXISTS idx_mid ON messages_new (mid);
+		CREATE INDEX IF NOT EXISTS idx_topic ON messages_new (topic);
+		INSERT 
+			INTO messages_new (
+				mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, 
+				attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
+			SELECT
+				id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type, 
+				attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published
+			FROM messages;
+		DROP TABLE messages;
+		ALTER TABLE messages_new RENAME TO messages;
+		COMMIT;
+	`
 )
 
 type sqliteCache struct {
@@ -167,16 +222,24 @@ func (c *sqliteCache) AddMessage(m *message) error {
 	return err
 }
 
-func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
+func (c *sqliteCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
 	if since.IsNone() {
 		return make([]*message, 0), nil
 	}
 	var rows *sql.Rows
 	var err error
-	if scheduled {
-		rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
+	if since.IsID() {
+		if scheduled {
+			rows, err = c.db.Query(selectMessagesSinceIDIncludeScheduledQuery, topic, since.ID())
+		} else {
+			rows, err = c.db.Query(selectMessagesSinceIDQuery, topic, since.ID())
+		}
 	} else {
-		rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
+		if scheduled {
+			rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
+		} else {
+			rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
+		}
 	}
 	if err != nil {
 		return nil, err
@@ -373,6 +436,8 @@ func setupCacheDB(db *sql.DB) error {
 		return migrateFrom2(db)
 	} else if schemaVersion == 3 {
 		return migrateFrom3(db)
+	} else if schemaVersion == 4 {
+		return migrateFrom4(db)
 	}
 	return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
 }
@@ -434,5 +499,16 @@ func migrateFrom3(db *sql.DB) error {
 	if _, err := db.Exec(updateSchemaVersion, 4); err != nil {
 		return err
 	}
+	return migrateFrom4(db)
+}
+
+func migrateFrom4(db *sql.DB) error {
+	log.Print("Migrating cache database schema: from 4 to 5")
+	if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil {
+		return err
+	}
+	if _, err := db.Exec(updateSchemaVersion, 5); err != nil {
+		return err
+	}
 	return nil // Update this when a new version is added
 }

+ 1 - 1
server/cache_test.go

@@ -42,7 +42,7 @@ func testCacheMessages(t *testing.T, c cache) {
 	require.Empty(t, messages)
 
 	// mytopic: since 2
-	messages, _ = c.Messages("mytopic", sinceTime(time.Unix(2, 0)), false)
+	messages, _ = c.Messages("mytopic", newSinceTime(2), false)
 	require.Equal(t, 1, len(messages))
 	require.Equal(t, "my other message", messages[0].Message)
 

+ 15 - 7
server/server.go

@@ -805,7 +805,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
 	return err
 }
 
-func parseSubscribeParams(r *http.Request) (poll bool, since sinceTime, scheduled bool, filters *queryFilter, err error) {
+func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, err error) {
 	poll = readBoolParam(r, false, "x-poll", "poll", "po")
 	scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
 	since, err = parseSince(r, poll)
@@ -819,7 +819,7 @@ func parseSubscribeParams(r *http.Request) (poll bool, since sinceTime, schedule
 	return
 }
 
-func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled bool, sub subscriber) error {
+func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled bool, sub subscriber) error {
 	if since.IsNone() {
 		return nil
 	}
@@ -841,20 +841,28 @@ func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled boo
 //
 // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
 // "all" for all messages.
-func parseSince(r *http.Request, poll bool) (sinceTime, error) {
+func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
 	since := readParam(r, "x-since", "since", "si")
+
+	// Easy cases (empty, all, none)
 	if since == "" {
 		if poll {
 			return sinceAllMessages, nil
 		}
 		return sinceNoMessages, nil
-	}
-	if since == "all" {
+	} else if since == "all" {
 		return sinceAllMessages, nil
+	} else if since == "none" {
+		return sinceNoMessages, nil
+	}
+
+	// ID, timestamp, duration
+	if validMessageID(since) {
+		return newSinceID(since), nil
 	} else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
-		return sinceTime(time.Unix(s, 0)), nil
+		return newSinceTime(s), nil
 	} else if d, err := time.ParseDuration(since); err == nil {
-		return sinceTime(time.Now().Add(-1 * d)), nil
+		return newSinceTime(time.Now().Add(-1 * d).Unix()), nil
 	}
 	return sinceNoMessages, errHTTPBadRequestSinceInvalid
 }

+ 31 - 8
server/types.go

@@ -15,7 +15,7 @@ const (
 )
 
 const (
-	messageIDLength = 10
+	messageIDLength = 12
 )
 
 // message represents a message published to a topic
@@ -74,23 +74,46 @@ func newDefaultMessage(topic, msg string) *message {
 	return newMessage(messageEvent, topic, msg)
 }
 
-type sinceTime time.Time
+func validMessageID(s string) bool {
+	return util.ValidRandomString(s, messageIDLength)
+}
+
+type sinceMarker struct {
+	time time.Time
+	id   string
+}
 
-func (t sinceTime) IsAll() bool {
+func newSinceTime(timestamp int64) sinceMarker {
+	return sinceMarker{time.Unix(timestamp, 0), ""}
+}
+
+func newSinceID(id string) sinceMarker {
+	return sinceMarker{time.Unix(0, 0), id}
+}
+
+func (t sinceMarker) IsAll() bool {
 	return t == sinceAllMessages
 }
 
-func (t sinceTime) IsNone() bool {
+func (t sinceMarker) IsNone() bool {
 	return t == sinceNoMessages
 }
 
-func (t sinceTime) Time() time.Time {
-	return time.Time(t)
+func (t sinceMarker) IsID() bool {
+	return t.id != ""
+}
+
+func (t sinceMarker) Time() time.Time {
+	return t.time
+}
+
+func (t sinceMarker) ID() string {
+	return t.id
 }
 
 var (
-	sinceAllMessages = sinceTime(time.Unix(0, 0))
-	sinceNoMessages  = sinceTime(time.Unix(1, 0))
+	sinceAllMessages = sinceMarker{time.Unix(0, 0), ""}
+	sinceNoMessages  = sinceMarker{time.Unix(1, 0), ""}
 )
 
 type queryFilter struct {

+ 14 - 1
util/util.go

@@ -88,7 +88,20 @@ func RandomString(length int) string {
 	return string(b)
 }
 
-// DurationToHuman converts a duration to a human readable format
+// ValidRandomString returns true if the given string matches the format created by RandomString
+func ValidRandomString(s string, length int) bool {
+	if len(s) != length {
+		return false
+	}
+	for _, c := range strings.Split(s, "") {
+		if !strings.Contains(randomStringCharset, c) {
+			return false
+		}
+	}
+	return true
+}
+
+// DurationToHuman converts a duration to a human-readable format
 func DurationToHuman(d time.Duration) (str string) {
 	if d == 0 {
 		return "0"