webpush_store.go 7.0 KB


  1. package server
  2. import (
  3. "database/sql"
  4. "heckel.io/ntfy/util"
  5. "time"
  6. _ "github.com/mattn/go-sqlite3" // SQLite driver
  7. )
  8. const (
  9. subscriptionIDPrefix = "wps_"
  10. subscriptionIDLength = 10
  11. )
  12. const (
  13. createWebPushSubscriptionsTableQuery = `
  14. BEGIN;
  15. CREATE TABLE IF NOT EXISTS subscription (
  16. id TEXT PRIMARY KEY,
  17. endpoint TEXT NOT NULL,
  18. key_auth TEXT NOT NULL,
  19. key_p256dh TEXT NOT NULL,
  20. user_id TEXT NOT NULL,
  21. updated_at INT NOT NULL,
  22. warned_at INT NOT NULL DEFAULT 0
  23. );
  24. CREATE UNIQUE INDEX IF NOT EXISTS idx_endpoint ON subscription (endpoint);
  25. CREATE TABLE IF NOT EXISTS subscription_topic (
  26. subscription_id TEXT NOT NULL,
  27. topic TEXT NOT NULL,
  28. PRIMARY KEY (subscription_id, topic),
  29. FOREIGN KEY (subscription_id) REFERENCES subscription (id) ON DELETE CASCADE
  30. );
  31. CREATE INDEX IF NOT EXISTS idx_topic ON subscription_topic (topic);
  32. CREATE TABLE IF NOT EXISTS schemaVersion (
  33. id INT PRIMARY KEY,
  34. version INT NOT NULL
  35. );
  36. COMMIT;
  37. `
  38. builtinStartupQueries = `
  39. PRAGMA foreign_keys = ON;
  40. `
  41. selectWebPushSubscriptionIDByEndpoint = `SELECT id FROM subscription WHERE endpoint = ?`
  42. selectWebPushSubscriptionsForTopicQuery = `
  43. SELECT id, endpoint, key_auth, key_p256dh, user_id
  44. FROM subscription_topic st
  45. JOIN subscription s ON s.id = st.subscription_id
  46. WHERE st.topic = ?
  47. `
  48. selectWebPushSubscriptionsExpiringSoonQuery = `SELECT id, endpoint, key_auth, key_p256dh, user_id FROM subscription WHERE warned_at = 0 AND updated_at <= ?`
  49. insertWebPushSubscriptionQuery = `
  50. INSERT INTO subscription (id, endpoint, key_auth, key_p256dh, user_id, updated_at, warned_at)
  51. VALUES (?, ?, ?, ?, ?, ?, ?)
  52. ON CONFLICT (endpoint)
  53. DO UPDATE SET key_auth = excluded.key_auth, key_p256dh = excluded.key_p256dh, user_id = excluded.user_id, updated_at = excluded.updated_at, warned_at = excluded.warned_at
  54. `
  55. updateWebPushSubscriptionWarningSentQuery = `UPDATE subscription SET warned_at = ? WHERE id = ?`
  56. deleteWebPushSubscriptionByEndpointQuery = `DELETE FROM subscription WHERE endpoint = ?`
  57. deleteWebPushSubscriptionByUserIDQuery = `DELETE FROM subscription WHERE user_id = ?`
  58. deleteWebPushSubscriptionByAgeQuery = `DELETE FROM subscription WHERE updated_at <= ?` // Full table scan!
  59. insertWebPushSubscriptionTopicQuery = `INSERT INTO subscription_topic (subscription_id, topic) VALUES (?, ?)`
  60. deleteWebPushSubscriptionTopicAllQuery = `DELETE FROM subscription_topic WHERE subscription_id = ?`
  61. )
  62. // Schema management queries
  63. const (
  64. currentWebPushSchemaVersion = 1
  65. insertWebPushSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
  66. selectWebPushSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
  67. )
  68. type webPushStore struct {
  69. db *sql.DB
  70. }
  71. func newWebPushStore(filename string) (*webPushStore, error) {
  72. db, err := sql.Open("sqlite3", filename)
  73. if err != nil {
  74. return nil, err
  75. }
  76. if err := setupWebPushDB(db); err != nil {
  77. return nil, err
  78. }
  79. if err := runWebPushStartupQueries(db); err != nil {
  80. return nil, err
  81. }
  82. return &webPushStore{
  83. db: db,
  84. }, nil
  85. }
  86. func setupWebPushDB(db *sql.DB) error {
  87. // If 'schemaVersion' table does not exist, this must be a new database
  88. rows, err := db.Query(selectWebPushSchemaVersionQuery)
  89. if err != nil {
  90. return setupNewWebPushDB(db)
  91. }
  92. return rows.Close()
  93. }
  94. func setupNewWebPushDB(db *sql.DB) error {
  95. if _, err := db.Exec(createWebPushSubscriptionsTableQuery); err != nil {
  96. return err
  97. }
  98. if _, err := db.Exec(insertWebPushSchemaVersion, currentWebPushSchemaVersion); err != nil {
  99. return err
  100. }
  101. return nil
  102. }
  103. func runWebPushStartupQueries(db *sql.DB) error {
  104. _, err := db.Exec(builtinStartupQueries)
  105. return err
  106. }
  107. // UpsertSubscription adds or updates Web Push subscriptions for the given topics and user ID. It always first deletes all
  108. // existing entries for a given endpoint.
  109. func (c *webPushStore) UpsertSubscription(endpoint string, auth, p256dh, userID string, topics []string) error {
  110. tx, err := c.db.Begin()
  111. if err != nil {
  112. return err
  113. }
  114. defer tx.Rollback()
  115. // Read existing subscription ID for endpoint (or create new ID)
  116. rows, err := tx.Query(selectWebPushSubscriptionIDByEndpoint, endpoint)
  117. if err != nil {
  118. return err
  119. }
  120. defer rows.Close()
  121. var subscriptionID string
  122. if rows.Next() {
  123. if err := rows.Scan(&subscriptionID); err != nil {
  124. return err
  125. }
  126. } else {
  127. subscriptionID = util.RandomStringPrefix(subscriptionIDPrefix, subscriptionIDLength)
  128. }
  129. if err := rows.Close(); err != nil {
  130. return err
  131. }
  132. // Insert or update subscription
  133. updatedAt, warnedAt := time.Now().Unix(), 0
  134. if _, err = tx.Exec(insertWebPushSubscriptionQuery, subscriptionID, endpoint, auth, p256dh, userID, updatedAt, warnedAt); err != nil {
  135. return err
  136. }
  137. // Replace all subscription topics
  138. if _, err := tx.Exec(deleteWebPushSubscriptionTopicAllQuery, subscriptionID); err != nil {
  139. return err
  140. }
  141. for _, topic := range topics {
  142. if _, err = tx.Exec(insertWebPushSubscriptionTopicQuery, subscriptionID, topic); err != nil {
  143. return err
  144. }
  145. }
  146. return tx.Commit()
  147. }
  148. func (c *webPushStore) SubscriptionsForTopic(topic string) ([]*webPushSubscription, error) {
  149. rows, err := c.db.Query(selectWebPushSubscriptionsForTopicQuery, topic)
  150. if err != nil {
  151. return nil, err
  152. }
  153. defer rows.Close()
  154. return c.subscriptionsFromRows(rows)
  155. }
  156. func (c *webPushStore) SubscriptionsExpiring(warnAfter time.Duration) ([]*webPushSubscription, error) {
  157. rows, err := c.db.Query(selectWebPushSubscriptionsExpiringSoonQuery, time.Now().Add(-warnAfter).Unix())
  158. if err != nil {
  159. return nil, err
  160. }
  161. defer rows.Close()
  162. return c.subscriptionsFromRows(rows)
  163. }
  164. func (c *webPushStore) MarkExpiryWarningSent(subscriptions []*webPushSubscription) error {
  165. tx, err := c.db.Begin()
  166. if err != nil {
  167. return err
  168. }
  169. defer tx.Rollback()
  170. for _, subscription := range subscriptions {
  171. if _, err := tx.Exec(updateWebPushSubscriptionWarningSentQuery, time.Now().Unix(), subscription.ID); err != nil {
  172. return err
  173. }
  174. }
  175. return tx.Commit()
  176. }
  177. func (c *webPushStore) subscriptionsFromRows(rows *sql.Rows) ([]*webPushSubscription, error) {
  178. subscriptions := make([]*webPushSubscription, 0)
  179. for rows.Next() {
  180. var id, endpoint, auth, p256dh, userID string
  181. if err := rows.Scan(&id, &endpoint, &auth, &p256dh, &userID); err != nil {
  182. return nil, err
  183. }
  184. subscriptions = append(subscriptions, &webPushSubscription{
  185. ID: id,
  186. Endpoint: endpoint,
  187. Auth: auth,
  188. P256dh: p256dh,
  189. UserID: userID,
  190. })
  191. }
  192. return subscriptions, nil
  193. }
  194. func (c *webPushStore) RemoveSubscriptionsByEndpoint(endpoint string) error {
  195. _, err := c.db.Exec(deleteWebPushSubscriptionByEndpointQuery, endpoint)
  196. return err
  197. }
  198. func (c *webPushStore) RemoveSubscriptionsByUserID(userID string) error {
  199. _, err := c.db.Exec(deleteWebPushSubscriptionByUserIDQuery, userID)
  200. return err
  201. }
  202. func (c *webPushStore) RemoveExpiredSubscriptions(expireAfter time.Duration) error {
  203. _, err := c.db.Exec(deleteWebPushSubscriptionByAgeQuery, time.Now().Add(-expireAfter).Unix())
  204. return err
  205. }
  206. func (c *webPushStore) Close() error {
  207. return c.db.Close()
  208. }