webpush_store.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package server
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "time"
  6. _ "github.com/mattn/go-sqlite3" // SQLite driver
  7. )
  8. const (
  9. createWebPushSubscriptionsTableQuery = `
  10. BEGIN;
  11. CREATE TABLE IF NOT EXISTS subscriptions (
  12. id INTEGER PRIMARY KEY AUTOINCREMENT,
  13. topic TEXT NOT NULL,
  14. user_id TEXT,
  15. endpoint TEXT NOT NULL,
  16. key_auth TEXT NOT NULL,
  17. key_p256dh TEXT NOT NULL,
  18. updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
  19. warning_sent BOOLEAN DEFAULT FALSE
  20. );
  21. CREATE TABLE IF NOT EXISTS schemaVersion (
  22. id INT PRIMARY KEY,
  23. version INT NOT NULL
  24. );
  25. CREATE INDEX IF NOT EXISTS idx_topic ON subscriptions (topic);
  26. CREATE INDEX IF NOT EXISTS idx_endpoint ON subscriptions (endpoint);
  27. CREATE UNIQUE INDEX IF NOT EXISTS idx_topic_endpoint ON subscriptions (topic, endpoint);
  28. COMMIT;
  29. `
  30. insertWebPushSubscriptionQuery = `
  31. INSERT OR REPLACE INTO subscriptions (topic, user_id, endpoint, key_auth, key_p256dh)
  32. VALUES (?, ?, ?, ?, ?)
  33. `
  34. deleteWebPushSubscriptionByEndpointQuery = `DELETE FROM subscriptions WHERE endpoint = ?`
  35. deleteWebPushSubscriptionByUserIDQuery = `DELETE FROM subscriptions WHERE user_id = ?`
  36. deleteWebPushSubscriptionsByAgeQuery = `DELETE FROM subscriptions WHERE warning_sent = 1 AND updated_at <= datetime('now', ?)`
  37. selectWebPushSubscriptionsForTopicQuery = `SELECT endpoint, key_auth, key_p256dh, user_id FROM subscriptions WHERE topic = ?`
  38. selectWebPushSubscriptionsExpiringSoonQuery = `SELECT DISTINCT endpoint, key_auth, key_p256dh, user_id FROM subscriptions WHERE warning_sent = 0 AND updated_at <= datetime('now', ?)`
  39. updateWarningSentQuery = `UPDATE subscriptions SET warning_sent = true WHERE warning_sent = 0 AND updated_at <= datetime('now', ?)`
  40. )
  41. // Schema management queries
  42. const (
  43. currentWebPushSchemaVersion = 1
  44. insertWebPushSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
  45. selectWebPushSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
  46. )
  47. type webPushStore struct {
  48. db *sql.DB
  49. }
  50. func newWebPushStore(filename string) (*webPushStore, error) {
  51. db, err := sql.Open("sqlite3", filename)
  52. if err != nil {
  53. return nil, err
  54. }
  55. if err := setupWebPushDB(db); err != nil {
  56. return nil, err
  57. }
  58. return &webPushStore{
  59. db: db,
  60. }, nil
  61. }
  62. func setupWebPushDB(db *sql.DB) error {
  63. // If 'schemaVersion' table does not exist, this must be a new database
  64. rows, err := db.Query(selectWebPushSchemaVersionQuery)
  65. if err != nil {
  66. return setupNewWebPushDB(db)
  67. }
  68. return rows.Close()
  69. }
  70. func setupNewWebPushDB(db *sql.DB) error {
  71. if _, err := db.Exec(createWebPushSubscriptionsTableQuery); err != nil {
  72. return err
  73. }
  74. if _, err := db.Exec(insertWebPushSchemaVersion, currentWebPushSchemaVersion); err != nil {
  75. return err
  76. }
  77. return nil
  78. }
  79. // UpsertSubscription adds or updates Web Push subscriptions for the given topics and user ID. It always first deletes all
  80. // existing entries for a given endpoint.
  81. func (c *webPushStore) UpsertSubscription(endpoint string, topics []string, userID, auth, p256dh string) error {
  82. tx, err := c.db.Begin()
  83. if err != nil {
  84. return err
  85. }
  86. defer tx.Rollback()
  87. if _, err := tx.Exec(deleteWebPushSubscriptionByEndpointQuery, endpoint); err != nil {
  88. return err
  89. }
  90. for _, topic := range topics {
  91. if _, err = tx.Exec(insertWebPushSubscriptionQuery, topic, userID, endpoint, auth, p256dh); err != nil {
  92. return err
  93. }
  94. }
  95. return tx.Commit()
  96. }
  97. func (c *webPushStore) SubscriptionsForTopic(topic string) ([]*webPushSubscription, error) {
  98. rows, err := c.db.Query(selectWebPushSubscriptionsForTopicQuery, topic)
  99. if err != nil {
  100. return nil, err
  101. }
  102. defer rows.Close()
  103. subscriptions := make([]*webPushSubscription, 0)
  104. for rows.Next() {
  105. var endpoint, auth, p256dh, userID string
  106. if err = rows.Scan(&endpoint, &auth, &p256dh, &userID); err != nil {
  107. return nil, err
  108. }
  109. subscriptions = append(subscriptions, &webPushSubscription{
  110. Endpoint: endpoint,
  111. Auth: auth,
  112. P256dh: p256dh,
  113. UserID: userID,
  114. })
  115. }
  116. return subscriptions, nil
  117. }
  118. func (c *webPushStore) ExpireAndGetExpiringSubscriptions(warningDuration time.Duration, expiryDuration time.Duration) ([]*webPushSubscription, error) {
  119. // TODO this should be two functions
  120. tx, err := c.db.Begin()
  121. if err != nil {
  122. return nil, err
  123. }
  124. defer tx.Rollback()
  125. _, err = tx.Exec(deleteWebPushSubscriptionsByAgeQuery, fmt.Sprintf("-%.2f seconds", expiryDuration.Seconds()))
  126. if err != nil {
  127. return nil, err
  128. }
  129. rows, err := tx.Query(selectWebPushSubscriptionsExpiringSoonQuery, fmt.Sprintf("-%.2f seconds", warningDuration.Seconds()))
  130. if err != nil {
  131. return nil, err
  132. }
  133. defer rows.Close()
  134. subscriptions := make([]*webPushSubscription, 0)
  135. for rows.Next() {
  136. var endpoint, auth, p256dh, userID string
  137. if err = rows.Scan(&endpoint, &auth, &p256dh, &userID); err != nil {
  138. return nil, err
  139. }
  140. subscriptions = append(subscriptions, &webPushSubscription{
  141. Endpoint: endpoint,
  142. Auth: auth,
  143. P256dh: p256dh,
  144. UserID: userID,
  145. })
  146. }
  147. // also set warning as sent
  148. _, err = tx.Exec(updateWarningSentQuery, fmt.Sprintf("-%.2f seconds", warningDuration.Seconds()))
  149. if err != nil {
  150. return nil, err
  151. }
  152. if err = tx.Commit(); err != nil {
  153. return nil, err
  154. }
  155. return subscriptions, nil
  156. }
  157. func (c *webPushStore) RemoveSubscriptionsByEndpoint(endpoint string) error {
  158. _, err := c.db.Exec(deleteWebPushSubscriptionByEndpointQuery, endpoint)
  159. return err
  160. }
  161. func (c *webPushStore) RemoveSubscriptionsByUserID(userID string) error {
  162. _, err := c.db.Exec(deleteWebPushSubscriptionByUserIDQuery, userID)
  163. return err
  164. }
  165. func (c *webPushStore) Close() error {
  166. return c.db.Close()
  167. }