server.go 78 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128
  1. package server
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "embed"
  7. "encoding/base64"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "net"
  13. "net/http"
  14. "net/http/pprof"
  15. "net/netip"
  16. "net/url"
  17. "os"
  18. "path"
  19. "path/filepath"
  20. "regexp"
  21. "sort"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "time"
  26. "unicode/utf8"
  27. "github.com/emersion/go-smtp"
  28. "github.com/gorilla/websocket"
  29. "github.com/prometheus/client_golang/prometheus/promhttp"
  30. "golang.org/x/sync/errgroup"
  31. "heckel.io/ntfy/log"
  32. "heckel.io/ntfy/user"
  33. "heckel.io/ntfy/util"
  34. "github.com/SherClockHolmes/webpush-go"
  35. )
  36. // Server is the main server, providing the UI and API for ntfy
  37. type Server struct {
  38. config *Config
  39. httpServer *http.Server
  40. httpsServer *http.Server
  41. httpMetricsServer *http.Server
  42. httpProfileServer *http.Server
  43. unixListener net.Listener
  44. smtpServer *smtp.Server
  45. smtpServerBackend *smtpBackend
  46. smtpSender mailer
  47. topics map[string]*topic
  48. visitors map[string]*visitor // ip:<ip> or user:<user>
  49. firebaseClient *firebaseClient
  50. messages int64 // Total number of messages (persisted if messageCache enabled)
  51. messagesHistory []int64 // Last n values of the messages counter, used to determine rate
  52. userManager *user.Manager // Might be nil!
  53. messageCache *messageCache // Database that stores the messages
  54. webPushSubscriptionStore *webPushSubscriptionStore // Database that stores web push subscriptions
  55. fileCache *fileCache // File system based cache that stores attachments
  56. stripe stripeAPI // Stripe API, can be replaced with a mock
  57. priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
  58. metricsHandler http.Handler // Handles /metrics if enable-metrics set, and listen-metrics-http not set
  59. closeChan chan bool
  60. mu sync.RWMutex
  61. }
  62. // handleFunc extends the normal http.HandlerFunc to be able to easily return errors
  63. type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
  64. var (
  65. // If changed, don't forget to update Android App and auth_sqlite.go
  66. topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
  67. topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
  68. externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
  69. jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
  70. ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
  71. rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
  72. wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
  73. authPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/auth$`)
  74. webPushSubscribePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/web-push/subscribe$`)
  75. webPushUnsubscribePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/web-push/unsubscribe$`)
  76. publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/(publish|send|trigger)$`)
  77. webConfigPath = "/config.js"
  78. webManifestPath = "/manifest.webmanifest"
  79. webServiceWorkerPath = "/sw.js"
  80. accountPath = "/account"
  81. matrixPushPath = "/_matrix/push/v1/notify"
  82. metricsPath = "/metrics"
  83. apiHealthPath = "/v1/health"
  84. apiStatsPath = "/v1/stats"
  85. apiTiersPath = "/v1/tiers"
  86. apiUsersPath = "/v1/users"
  87. apiUsersAccessPath = "/v1/users/access"
  88. apiAccountPath = "/v1/account"
  89. apiAccountTokenPath = "/v1/account/token"
  90. apiAccountPasswordPath = "/v1/account/password"
  91. apiAccountSettingsPath = "/v1/account/settings"
  92. apiAccountSubscriptionPath = "/v1/account/subscription"
  93. apiAccountReservationPath = "/v1/account/reservation"
  94. apiAccountPhonePath = "/v1/account/phone"
  95. apiAccountPhoneVerifyPath = "/v1/account/phone/verify"
  96. apiAccountBillingPortalPath = "/v1/account/billing/portal"
  97. apiAccountBillingWebhookPath = "/v1/account/billing/webhook"
  98. apiAccountBillingSubscriptionPath = "/v1/account/billing/subscription"
  99. apiAccountBillingSubscriptionCheckoutSuccessTemplate = "/v1/account/billing/subscription/success/{CHECKOUT_SESSION_ID}"
  100. apiAccountBillingSubscriptionCheckoutSuccessRegex = regexp.MustCompile(`/v1/account/billing/subscription/success/(.+)$`)
  101. apiAccountReservationSingleRegex = regexp.MustCompile(`/v1/account/reservation/([-_A-Za-z0-9]{1,64})$`)
  102. apiWebPushConfig = "/v1/web-push-config"
  103. staticRegex = regexp.MustCompile(`^/static/.+`)
  104. docsRegex = regexp.MustCompile(`^/docs(|/.*)$`)
  105. fileRegex = regexp.MustCompile(`^/file/([-_A-Za-z0-9]{1,64})(?:\.[A-Za-z0-9]{1,16})?$`)
  106. urlRegex = regexp.MustCompile(`^https?://`)
  107. phoneNumberRegex = regexp.MustCompile(`^\+\d{1,100}$`)
  108. //go:embed site
  109. webFs embed.FS
  110. webFsCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: webFs}
  111. webSiteDir = "/site"
  112. webAppIndex = "/app.html" // React app
  113. //go:embed docs
  114. docsStaticFs embed.FS
  115. docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
  116. )
  117. const (
  118. firebaseControlTopic = "~control" // See Android if changed
  119. firebasePollTopic = "~poll" // See iOS if changed
  120. emptyMessageBody = "triggered" // Used if message body is empty
  121. newMessageBody = "New message" // Used in poll requests as generic message
  122. defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
  123. encodingBase64 = "base64" // Used mainly for binary UnifiedPush messages
  124. jsonBodyBytesLimit = 16384 // Max number of bytes for a JSON request body
  125. unifiedPushTopicPrefix = "up" // Temporarily, we rate limit all "up*" topics based on the subscriber
  126. unifiedPushTopicLength = 14 // Length of UnifiedPush topics, including the "up" part
  127. messagesHistoryMax = 10 // Number of message count values to keep in memory
  128. )
  129. // WebSocket constants
  130. const (
  131. wsWriteWait = 2 * time.Second
  132. wsBufferSize = 1024
  133. wsReadLimit = 64 // We only ever receive PINGs
  134. wsPongWait = 15 * time.Second
  135. )
  136. // New instantiates a new Server. It creates the cache and adds a Firebase
  137. // subscriber (if configured).
  138. func New(conf *Config) (*Server, error) {
  139. var mailer mailer
  140. if conf.SMTPSenderAddr != "" {
  141. mailer = &smtpSender{config: conf}
  142. }
  143. var stripe stripeAPI
  144. if conf.StripeSecretKey != "" {
  145. stripe = newStripeAPI()
  146. }
  147. messageCache, err := createMessageCache(conf)
  148. if err != nil {
  149. return nil, err
  150. }
  151. webPushSubscriptionStore, err := createWebPushSubscriptionStore(conf)
  152. if err != nil {
  153. return nil, err
  154. }
  155. topics, err := messageCache.Topics()
  156. if err != nil {
  157. return nil, err
  158. }
  159. messages, err := messageCache.Stats()
  160. if err != nil {
  161. return nil, err
  162. }
  163. var fileCache *fileCache
  164. if conf.AttachmentCacheDir != "" {
  165. fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit)
  166. if err != nil {
  167. return nil, err
  168. }
  169. }
  170. var userManager *user.Manager
  171. if conf.AuthFile != "" {
  172. userManager, err = user.NewManager(conf.AuthFile, conf.AuthStartupQueries, conf.AuthDefault, conf.AuthBcryptCost, conf.AuthStatsQueueWriterInterval)
  173. if err != nil {
  174. return nil, err
  175. }
  176. }
  177. var firebaseClient *firebaseClient
  178. if conf.FirebaseKeyFile != "" {
  179. sender, err := newFirebaseSender(conf.FirebaseKeyFile)
  180. if err != nil {
  181. return nil, err
  182. }
  183. // This awkward logic is required because Go is weird about nil types and interfaces.
  184. // See issue #641, and https://go.dev/play/p/uur1flrv1t3 for an example
  185. var auther user.Auther
  186. if userManager != nil {
  187. auther = userManager
  188. }
  189. firebaseClient = newFirebaseClient(sender, auther)
  190. }
  191. s := &Server{
  192. config: conf,
  193. messageCache: messageCache,
  194. webPushSubscriptionStore: webPushSubscriptionStore,
  195. fileCache: fileCache,
  196. firebaseClient: firebaseClient,
  197. smtpSender: mailer,
  198. topics: topics,
  199. userManager: userManager,
  200. messages: messages,
  201. messagesHistory: []int64{messages},
  202. visitors: make(map[string]*visitor),
  203. stripe: stripe,
  204. }
  205. s.priceCache = util.NewLookupCache(s.fetchStripePrices, conf.StripePriceCacheDuration)
  206. return s, nil
  207. }
  208. func createMessageCache(conf *Config) (*messageCache, error) {
  209. if conf.CacheDuration == 0 {
  210. return newNopCache()
  211. } else if conf.CacheFile != "" {
  212. return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheDuration, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
  213. }
  214. return newMemCache()
  215. }
  216. func createWebPushSubscriptionStore(conf *Config) (*webPushSubscriptionStore, error) {
  217. if !conf.WebPushEnabled {
  218. return nil, nil
  219. }
  220. return newWebPushSubscriptionStore(conf.WebPushSubscriptionsFile)
  221. }
  222. // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
  223. // a manager go routine to print stats and prune messages.
  224. func (s *Server) Run() error {
  225. var listenStr string
  226. if s.config.ListenHTTP != "" {
  227. listenStr += fmt.Sprintf(" %s[http]", s.config.ListenHTTP)
  228. }
  229. if s.config.ListenHTTPS != "" {
  230. listenStr += fmt.Sprintf(" %s[https]", s.config.ListenHTTPS)
  231. }
  232. if s.config.ListenUnix != "" {
  233. listenStr += fmt.Sprintf(" %s[unix]", s.config.ListenUnix)
  234. }
  235. if s.config.SMTPServerListen != "" {
  236. listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
  237. }
  238. if s.config.MetricsListenHTTP != "" {
  239. listenStr += fmt.Sprintf(" %s[http/metrics]", s.config.MetricsListenHTTP)
  240. }
  241. if s.config.ProfileListenHTTP != "" {
  242. listenStr += fmt.Sprintf(" %s[http/profile]", s.config.ProfileListenHTTP)
  243. }
  244. log.Tag(tagStartup).Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String())
  245. if log.IsFile() {
  246. fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s\n", listenStr, s.config.Version)
  247. fmt.Fprintf(os.Stderr, "Logs are written to %s\n", log.File())
  248. }
  249. mux := http.NewServeMux()
  250. mux.HandleFunc("/", s.handle)
  251. errChan := make(chan error)
  252. s.mu.Lock()
  253. s.closeChan = make(chan bool)
  254. if s.config.ListenHTTP != "" {
  255. s.httpServer = &http.Server{Addr: s.config.ListenHTTP, Handler: mux}
  256. go func() {
  257. errChan <- s.httpServer.ListenAndServe()
  258. }()
  259. }
  260. if s.config.ListenHTTPS != "" {
  261. s.httpsServer = &http.Server{Addr: s.config.ListenHTTPS, Handler: mux}
  262. go func() {
  263. errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
  264. }()
  265. }
  266. if s.config.ListenUnix != "" {
  267. go func() {
  268. var err error
  269. s.mu.Lock()
  270. os.Remove(s.config.ListenUnix)
  271. s.unixListener, err = net.Listen("unix", s.config.ListenUnix)
  272. if err != nil {
  273. s.mu.Unlock()
  274. errChan <- err
  275. return
  276. }
  277. defer s.unixListener.Close()
  278. if s.config.ListenUnixMode > 0 {
  279. if err := os.Chmod(s.config.ListenUnix, s.config.ListenUnixMode); err != nil {
  280. s.mu.Unlock()
  281. errChan <- err
  282. return
  283. }
  284. }
  285. s.mu.Unlock()
  286. httpServer := &http.Server{Handler: mux}
  287. errChan <- httpServer.Serve(s.unixListener)
  288. }()
  289. }
  290. if s.config.MetricsListenHTTP != "" {
  291. initMetrics()
  292. s.httpMetricsServer = &http.Server{Addr: s.config.MetricsListenHTTP, Handler: promhttp.Handler()}
  293. go func() {
  294. errChan <- s.httpMetricsServer.ListenAndServe()
  295. }()
  296. } else if s.config.EnableMetrics {
  297. initMetrics()
  298. s.metricsHandler = promhttp.Handler()
  299. }
  300. if s.config.ProfileListenHTTP != "" {
  301. profileMux := http.NewServeMux()
  302. profileMux.HandleFunc("/debug/pprof/", pprof.Index)
  303. profileMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
  304. profileMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
  305. profileMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
  306. profileMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
  307. s.httpProfileServer = &http.Server{Addr: s.config.ProfileListenHTTP, Handler: profileMux}
  308. go func() {
  309. errChan <- s.httpProfileServer.ListenAndServe()
  310. }()
  311. }
  312. if s.config.SMTPServerListen != "" {
  313. go func() {
  314. errChan <- s.runSMTPServer()
  315. }()
  316. }
  317. s.mu.Unlock()
  318. go s.runManager()
  319. go s.runStatsResetter()
  320. go s.runDelayedSender()
  321. go s.runFirebaseKeepaliver()
  322. return <-errChan
  323. }
  324. // Stop stops HTTP (+HTTPS) server and all managers
  325. func (s *Server) Stop() {
  326. s.mu.Lock()
  327. defer s.mu.Unlock()
  328. if s.httpServer != nil {
  329. s.httpServer.Close()
  330. }
  331. if s.httpsServer != nil {
  332. s.httpsServer.Close()
  333. }
  334. if s.unixListener != nil {
  335. s.unixListener.Close()
  336. }
  337. if s.smtpServer != nil {
  338. s.smtpServer.Close()
  339. }
  340. s.closeDatabases()
  341. close(s.closeChan)
  342. }
  343. func (s *Server) closeDatabases() {
  344. if s.userManager != nil {
  345. s.userManager.Close()
  346. }
  347. s.messageCache.Close()
  348. if s.webPushSubscriptionStore != nil {
  349. s.webPushSubscriptionStore.Close()
  350. }
  351. }
  352. // handle is the main entry point for all HTTP requests
  353. func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
  354. v, err := s.maybeAuthenticate(r) // Note: Always returns v, even when error is returned
  355. if err != nil {
  356. s.handleError(w, r, v, err)
  357. return
  358. }
  359. ev := logvr(v, r)
  360. if ev.IsTrace() {
  361. ev.Field("http_request", renderHTTPRequest(r)).Trace("HTTP request started")
  362. } else if logvr(v, r).IsDebug() {
  363. ev.Debug("HTTP request started")
  364. }
  365. logvr(v, r).
  366. Timing(func() {
  367. if err := s.handleInternal(w, r, v); err != nil {
  368. s.handleError(w, r, v, err)
  369. return
  370. }
  371. if metricHTTPRequests != nil {
  372. metricHTTPRequests.WithLabelValues("200", "20000", r.Method).Inc()
  373. }
  374. }).
  375. Debug("HTTP request finished")
  376. }
  377. func (s *Server) handleError(w http.ResponseWriter, r *http.Request, v *visitor, err error) {
  378. httpErr, ok := err.(*errHTTP)
  379. if !ok {
  380. httpErr = errHTTPInternalError
  381. }
  382. if metricHTTPRequests != nil {
  383. metricHTTPRequests.WithLabelValues(fmt.Sprintf("%d", httpErr.HTTPCode), fmt.Sprintf("%d", httpErr.Code), r.Method).Inc()
  384. }
  385. isRateLimiting := util.Contains(rateLimitingErrorCodes, httpErr.HTTPCode)
  386. isNormalError := strings.Contains(err.Error(), "i/o timeout") || util.Contains(normalErrorCodes, httpErr.HTTPCode)
  387. ev := logvr(v, r).Err(err)
  388. if websocket.IsWebSocketUpgrade(r) {
  389. ev.Tag(tagWebsocket).Fields(websocketErrorContext(err))
  390. if isNormalError {
  391. ev.Debug("WebSocket error (this error is okay, it happens a lot): %s", err.Error())
  392. } else {
  393. ev.Info("WebSocket error: %s", err.Error())
  394. }
  395. return // Do not attempt to write to upgraded connection
  396. }
  397. if isNormalError {
  398. ev.Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  399. } else {
  400. ev.Info("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  401. }
  402. if isRateLimiting && s.config.StripeSecretKey != "" {
  403. u := v.User()
  404. if u == nil || u.Tier == nil {
  405. httpErr = httpErr.Wrap("increase your limits with a paid plan, see %s", s.config.BaseURL)
  406. }
  407. }
  408. w.Header().Set("Content-Type", "application/json")
  409. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  410. w.WriteHeader(httpErr.HTTPCode)
  411. io.WriteString(w, httpErr.JSON()+"\n")
  412. }
  413. func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visitor) error {
  414. if r.Method == http.MethodGet && r.URL.Path == "/" && s.config.WebRoot == "/" {
  415. return s.ensureWebEnabled(s.handleRoot)(w, r, v)
  416. } else if r.Method == http.MethodHead && r.URL.Path == "/" {
  417. return s.ensureWebEnabled(s.handleEmpty)(w, r, v)
  418. } else if r.Method == http.MethodGet && r.URL.Path == apiHealthPath {
  419. return s.handleHealth(w, r, v)
  420. } else if r.Method == http.MethodGet && r.URL.Path == webConfigPath {
  421. return s.ensureWebEnabled(s.handleWebConfig)(w, r, v)
  422. } else if r.Method == http.MethodGet && r.URL.Path == webManifestPath {
  423. return s.ensureWebEnabled(s.handleWebManifest)(w, r, v)
  424. } else if r.Method == http.MethodGet && r.URL.Path == webServiceWorkerPath {
  425. return s.ensureWebEnabled(s.handleStatic)(w, r, v)
  426. } else if r.Method == http.MethodGet && r.URL.Path == apiUsersPath {
  427. return s.ensureAdmin(s.handleUsersGet)(w, r, v)
  428. } else if r.Method == http.MethodPut && r.URL.Path == apiUsersPath {
  429. return s.ensureAdmin(s.handleUsersAdd)(w, r, v)
  430. } else if r.Method == http.MethodDelete && r.URL.Path == apiUsersPath {
  431. return s.ensureAdmin(s.handleUsersDelete)(w, r, v)
  432. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == apiUsersAccessPath {
  433. return s.ensureAdmin(s.handleAccessAllow)(w, r, v)
  434. } else if r.Method == http.MethodDelete && r.URL.Path == apiUsersAccessPath {
  435. return s.ensureAdmin(s.handleAccessReset)(w, r, v)
  436. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPath {
  437. return s.ensureUserManager(s.handleAccountCreate)(w, r, v)
  438. } else if r.Method == http.MethodGet && r.URL.Path == apiAccountPath {
  439. return s.handleAccountGet(w, r, v) // Allowed by anonymous
  440. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPath {
  441. return s.ensureUser(s.withAccountSync(s.handleAccountDelete))(w, r, v)
  442. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPasswordPath {
  443. return s.ensureUser(s.handleAccountPasswordChange)(w, r, v)
  444. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountTokenPath {
  445. return s.ensureUser(s.withAccountSync(s.handleAccountTokenCreate))(w, r, v)
  446. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountTokenPath {
  447. return s.ensureUser(s.withAccountSync(s.handleAccountTokenUpdate))(w, r, v)
  448. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountTokenPath {
  449. return s.ensureUser(s.withAccountSync(s.handleAccountTokenDelete))(w, r, v)
  450. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSettingsPath {
  451. return s.ensureUser(s.withAccountSync(s.handleAccountSettingsChange))(w, r, v)
  452. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountSubscriptionPath {
  453. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionAdd))(w, r, v)
  454. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSubscriptionPath {
  455. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionChange))(w, r, v)
  456. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountSubscriptionPath {
  457. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionDelete))(w, r, v)
  458. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountReservationPath {
  459. return s.ensureUser(s.withAccountSync(s.handleAccountReservationAdd))(w, r, v)
  460. } else if r.Method == http.MethodDelete && apiAccountReservationSingleRegex.MatchString(r.URL.Path) {
  461. return s.ensureUser(s.withAccountSync(s.handleAccountReservationDelete))(w, r, v)
  462. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingSubscriptionPath {
  463. return s.ensurePaymentsEnabled(s.ensureUser(s.handleAccountBillingSubscriptionCreate))(w, r, v) // Account sync via incoming Stripe webhook
  464. } else if r.Method == http.MethodGet && apiAccountBillingSubscriptionCheckoutSuccessRegex.MatchString(r.URL.Path) {
  465. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingSubscriptionCreateSuccess))(w, r, v) // No user context!
  466. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountBillingSubscriptionPath {
  467. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionUpdate))(w, r, v) // Account sync via incoming Stripe webhook
  468. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountBillingSubscriptionPath {
  469. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionDelete))(w, r, v) // Account sync via incoming Stripe webhook
  470. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingPortalPath {
  471. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingPortalSessionCreate))(w, r, v)
  472. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingWebhookPath {
  473. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingWebhook))(w, r, v) // This request comes from Stripe!
  474. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhoneVerifyPath {
  475. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberVerify)))(w, r, v)
  476. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhonePath {
  477. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberAdd)))(w, r, v)
  478. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPhonePath {
  479. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberDelete)))(w, r, v)
  480. } else if r.Method == http.MethodGet && r.URL.Path == apiStatsPath {
  481. return s.handleStats(w, r, v)
  482. } else if r.Method == http.MethodGet && r.URL.Path == apiTiersPath {
  483. return s.ensurePaymentsEnabled(s.handleBillingTiersGet)(w, r, v)
  484. } else if r.Method == http.MethodGet && r.URL.Path == apiWebPushConfig {
  485. return s.ensureWebPushEnabled(s.handleAPIWebPushConfig)(w, r, v)
  486. } else if r.Method == http.MethodGet && r.URL.Path == matrixPushPath {
  487. return s.handleMatrixDiscovery(w)
  488. } else if r.Method == http.MethodGet && r.URL.Path == metricsPath && s.metricsHandler != nil {
  489. return s.handleMetrics(w, r, v)
  490. } else if r.Method == http.MethodGet && staticRegex.MatchString(r.URL.Path) {
  491. return s.ensureWebEnabled(s.handleStatic)(w, r, v)
  492. } else if r.Method == http.MethodGet && docsRegex.MatchString(r.URL.Path) {
  493. return s.ensureWebEnabled(s.handleDocs)(w, r, v)
  494. } else if (r.Method == http.MethodGet || r.Method == http.MethodHead) && fileRegex.MatchString(r.URL.Path) && s.config.AttachmentCacheDir != "" {
  495. return s.limitRequests(s.handleFile)(w, r, v)
  496. } else if r.Method == http.MethodOptions {
  497. return s.limitRequests(s.handleOptions)(w, r, v) // Should work even if the web app is not enabled, see #598
  498. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
  499. return s.transformBodyJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish)))(w, r, v)
  500. } else if r.Method == http.MethodPost && r.URL.Path == matrixPushPath {
  501. return s.transformMatrixJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublishMatrix)))(w, r, v)
  502. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicPathRegex.MatchString(r.URL.Path) {
  503. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  504. } else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
  505. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  506. } else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) {
  507. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeJSON))(w, r, v)
  508. } else if r.Method == http.MethodGet && ssePathRegex.MatchString(r.URL.Path) {
  509. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeSSE))(w, r, v)
  510. } else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
  511. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeRaw))(w, r, v)
  512. } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) {
  513. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeWS))(w, r, v)
  514. } else if r.Method == http.MethodGet && authPathRegex.MatchString(r.URL.Path) {
  515. return s.limitRequests(s.authorizeTopicRead(s.handleTopicAuth))(w, r, v)
  516. } else if r.Method == http.MethodPost && webPushSubscribePathRegex.MatchString(r.URL.Path) {
  517. return s.limitRequestsWithTopic(s.authorizeTopicRead(s.ensureWebPushEnabled(s.handleTopicWebPushSubscribe)))(w, r, v)
  518. } else if r.Method == http.MethodPost && webPushUnsubscribePathRegex.MatchString(r.URL.Path) {
  519. return s.limitRequestsWithTopic(s.authorizeTopicRead(s.ensureWebPushEnabled(s.handleTopicWebPushUnsubscribe)))(w, r, v)
  520. } else if r.Method == http.MethodGet && (topicPathRegex.MatchString(r.URL.Path) || externalTopicPathRegex.MatchString(r.URL.Path)) {
  521. return s.ensureWebEnabled(s.handleTopic)(w, r, v)
  522. }
  523. return errHTTPNotFound
  524. }
  525. func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request, v *visitor) error {
  526. r.URL.Path = webAppIndex
  527. return s.handleStatic(w, r, v)
  528. }
  529. func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request, v *visitor) error {
  530. unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too!
  531. if unifiedpush {
  532. w.Header().Set("Content-Type", "application/json")
  533. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  534. _, err := io.WriteString(w, `{"unifiedpush":{"version":1}}`+"\n")
  535. return err
  536. }
  537. r.URL.Path = webAppIndex
  538. return s.handleStatic(w, r, v)
  539. }
  540. func (s *Server) handleEmpty(_ http.ResponseWriter, _ *http.Request, _ *visitor) error {
  541. return nil
  542. }
  543. func (s *Server) handleTopicAuth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  544. return s.writeJSON(w, newSuccessResponse())
  545. }
  546. func (s *Server) handleAPIWebPushConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  547. response := &apiWebPushConfigResponse{
  548. PublicKey: s.config.WebPushPublicKey,
  549. }
  550. return s.writeJSON(w, response)
  551. }
  552. func (s *Server) handleTopicWebPushSubscribe(w http.ResponseWriter, r *http.Request, v *visitor) error {
  553. var username string
  554. u := v.User()
  555. if u != nil {
  556. username = u.Name
  557. }
  558. var sub webPushSubscribePayload
  559. err := json.NewDecoder(r.Body).Decode(&sub)
  560. if err != nil || sub.BrowserSubscription.Endpoint == "" || sub.BrowserSubscription.Keys.P256dh == "" || sub.BrowserSubscription.Keys.Auth == "" {
  561. return errHTTPBadRequestWebPushSubscriptionInvalid
  562. }
  563. topic, err := fromContext[*topic](r, contextTopic)
  564. if err != nil {
  565. return err
  566. }
  567. err = s.webPushSubscriptionStore.AddSubscription(topic.ID, username, sub)
  568. if err != nil {
  569. return err
  570. }
  571. return s.writeJSON(w, newSuccessResponse())
  572. }
  573. func (s *Server) handleTopicWebPushUnsubscribe(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  574. var payload webPushUnsubscribePayload
  575. err := json.NewDecoder(r.Body).Decode(&payload)
  576. if err != nil {
  577. return errHTTPBadRequestWebPushSubscriptionInvalid
  578. }
  579. topic, err := fromContext[*topic](r, contextTopic)
  580. if err != nil {
  581. return err
  582. }
  583. err = s.webPushSubscriptionStore.RemoveSubscription(topic.ID, payload.Endpoint)
  584. if err != nil {
  585. return err
  586. }
  587. return s.writeJSON(w, newSuccessResponse())
  588. }
  589. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  590. response := &apiHealthResponse{
  591. Healthy: true,
  592. }
  593. return s.writeJSON(w, response)
  594. }
  595. func (s *Server) handleWebConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  596. response := &apiConfigResponse{
  597. BaseURL: "", // Will translate to window.location.origin
  598. AppRoot: s.config.WebRoot,
  599. EnableLogin: s.config.EnableLogin,
  600. EnableSignup: s.config.EnableSignup,
  601. EnablePayments: s.config.StripeSecretKey != "",
  602. EnableCalls: s.config.TwilioAccount != "",
  603. EnableEmails: s.config.SMTPSenderFrom != "",
  604. EnableReservations: s.config.EnableReservations,
  605. BillingContact: s.config.BillingContact,
  606. DisallowedTopics: s.config.DisallowedTopics,
  607. }
  608. b, err := json.MarshalIndent(response, "", " ")
  609. if err != nil {
  610. return err
  611. }
  612. w.Header().Set("Content-Type", "text/javascript")
  613. _, err = io.WriteString(w, fmt.Sprintf("// Generated server configuration\nvar config = %s;\n", string(b)))
  614. return err
  615. }
  616. func (s *Server) handleWebManifest(w http.ResponseWriter, r *http.Request, v *visitor) error {
  617. w.Header().Set("Content-Type", "application/manifest+json")
  618. return s.handleStatic(w, r, v)
  619. }
  620. // handleMetrics returns Prometheus metrics. This endpoint is only called if enable-metrics is set,
  621. // and listen-metrics-http is not set.
  622. func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  623. s.metricsHandler.ServeHTTP(w, r)
  624. return nil
  625. }
  626. // handleStatic returns all static resources (excluding the docs), including the web app
  627. func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  628. r.URL.Path = webSiteDir + r.URL.Path
  629. util.Gzip(http.FileServer(http.FS(webFsCached))).ServeHTTP(w, r)
  630. return nil
  631. }
  632. // handleDocs returns static resources related to the docs
  633. func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  634. util.Gzip(http.FileServer(http.FS(docsStaticCached))).ServeHTTP(w, r)
  635. return nil
  636. }
  637. // handleStats returns the publicly available server stats
  638. func (s *Server) handleStats(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  639. s.mu.RLock()
  640. messages, n, rate := s.messages, len(s.messagesHistory), float64(0)
  641. if n > 1 {
  642. rate = float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds())
  643. }
  644. s.mu.RUnlock()
  645. response := &apiStatsResponse{
  646. Messages: messages,
  647. MessagesRate: rate,
  648. }
  649. return s.writeJSON(w, response)
  650. }
  651. // handleFile processes the download of attachment files. The method handles GET and HEAD requests against a file.
  652. // Before streaming the file to a client, it locates uploader (m.Sender or m.User) in the message cache, so it
  653. // can associate the download bandwidth with the uploader.
  654. func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error {
  655. if s.config.AttachmentCacheDir == "" {
  656. return errHTTPInternalError
  657. }
  658. matches := fileRegex.FindStringSubmatch(r.URL.Path)
  659. if len(matches) != 2 {
  660. return errHTTPInternalErrorInvalidPath
  661. }
  662. messageID := matches[1]
  663. file := filepath.Join(s.config.AttachmentCacheDir, messageID)
  664. stat, err := os.Stat(file)
  665. if err != nil {
  666. return errHTTPNotFound.Fields(log.Context{
  667. "message_id": messageID,
  668. "error_context": "filesystem",
  669. })
  670. }
  671. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  672. w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
  673. if r.Method == http.MethodHead {
  674. return nil
  675. }
  676. // Find message in database, and associate bandwidth to the uploader user
  677. // This is an easy way to
  678. // - avoid abuse (e.g. 1 uploader, 1k downloaders)
  679. // - and also uses the higher bandwidth limits of a paying user
  680. m, err := s.messageCache.Message(messageID)
  681. if err == errMessageNotFound {
  682. if s.config.CacheBatchTimeout > 0 {
  683. // Strange edge case: If we immediately after upload request the file (the web app does this for images),
  684. // and messages are persisted asynchronously, retry fetching from the database
  685. m, err = util.Retry(func() (*message, error) {
  686. return s.messageCache.Message(messageID)
  687. }, s.config.CacheBatchTimeout, 100*time.Millisecond, 300*time.Millisecond, 600*time.Millisecond)
  688. }
  689. if err != nil {
  690. return errHTTPNotFound.Fields(log.Context{
  691. "message_id": messageID,
  692. "error_context": "message_cache",
  693. })
  694. }
  695. } else if err != nil {
  696. return err
  697. }
  698. bandwidthVisitor := v
  699. if s.userManager != nil && m.User != "" {
  700. u, err := s.userManager.UserByID(m.User)
  701. if err != nil {
  702. return err
  703. }
  704. bandwidthVisitor = s.visitor(v.IP(), u)
  705. } else if m.Sender.IsValid() {
  706. bandwidthVisitor = s.visitor(m.Sender, nil)
  707. }
  708. if !bandwidthVisitor.BandwidthAllowed(stat.Size()) {
  709. return errHTTPTooManyRequestsLimitAttachmentBandwidth.With(m)
  710. }
  711. // Actually send file
  712. f, err := os.Open(file)
  713. if err != nil {
  714. return err
  715. }
  716. defer f.Close()
  717. if m.Attachment.Name != "" {
  718. w.Header().Set("Content-Disposition", "attachment; filename="+strconv.Quote(m.Attachment.Name))
  719. }
  720. _, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f)
  721. return err
  722. }
  723. func (s *Server) handleMatrixDiscovery(w http.ResponseWriter) error {
  724. if s.config.BaseURL == "" {
  725. return errHTTPInternalErrorMissingBaseURL
  726. }
  727. return writeMatrixDiscoveryResponse(w)
  728. }
  729. func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, error) {
  730. start := time.Now()
  731. t, err := fromContext[*topic](r, contextTopic)
  732. if err != nil {
  733. return nil, err
  734. }
  735. vrate, err := fromContext[*visitor](r, contextRateVisitor)
  736. if err != nil {
  737. return nil, err
  738. }
  739. body, err := util.Peek(r.Body, s.config.MessageLimit)
  740. if err != nil {
  741. return nil, err
  742. }
  743. m := newDefaultMessage(t.ID, "")
  744. cache, firebase, email, call, unifiedpush, e := s.parsePublishParams(r, m)
  745. if e != nil {
  746. return nil, e.With(t)
  747. }
  748. if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
  749. // UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting (see
  750. // Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
  751. // the subscription as invalid if any 400-499 code (except 429/408) is returned.
  752. // See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
  753. return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
  754. } else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() {
  755. return nil, errHTTPTooManyRequestsLimitMessages.With(t)
  756. } else if email != "" && !vrate.EmailAllowed() {
  757. return nil, errHTTPTooManyRequestsLimitEmails.With(t)
  758. } else if call != "" {
  759. var httpErr *errHTTP
  760. call, httpErr = s.convertPhoneNumber(v.User(), call)
  761. if httpErr != nil {
  762. return nil, httpErr.With(t)
  763. } else if !vrate.CallAllowed() {
  764. return nil, errHTTPTooManyRequestsLimitCalls.With(t)
  765. }
  766. }
  767. if m.PollID != "" {
  768. m = newPollRequestMessage(t.ID, m.PollID)
  769. }
  770. m.Sender = v.IP()
  771. m.User = v.MaybeUserID()
  772. if cache {
  773. m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
  774. }
  775. if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil {
  776. return nil, err
  777. }
  778. if m.Message == "" {
  779. m.Message = emptyMessageBody
  780. }
  781. delayed := m.Time > time.Now().Unix()
  782. ev := logvrm(v, r, m).
  783. Tag(tagPublish).
  784. With(t).
  785. Fields(log.Context{
  786. "message_delayed": delayed,
  787. "message_firebase": firebase,
  788. "message_unifiedpush": unifiedpush,
  789. "message_email": email,
  790. "message_call": call,
  791. })
  792. if ev.IsTrace() {
  793. ev.Field("message_body", util.MaybeMarshalJSON(m)).Trace("Received message")
  794. } else if ev.IsDebug() {
  795. ev.Debug("Received message")
  796. }
  797. if !delayed {
  798. if err := t.Publish(v, m); err != nil {
  799. return nil, err
  800. }
  801. if s.firebaseClient != nil && firebase {
  802. go s.sendToFirebase(v, m)
  803. }
  804. if s.smtpSender != nil && email != "" {
  805. go s.sendEmail(v, m, email)
  806. }
  807. if s.config.TwilioAccount != "" && call != "" {
  808. go s.callPhone(v, r, m, call)
  809. }
  810. if s.config.UpstreamBaseURL != "" && !unifiedpush { // UP messages are not sent to upstream
  811. go s.forwardPollRequest(v, m)
  812. }
  813. if s.config.WebPushEnabled {
  814. go s.publishToWebPushEndpoints(v, m)
  815. }
  816. } else {
  817. logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
  818. }
  819. if cache {
  820. logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
  821. if err := s.messageCache.AddMessage(m); err != nil {
  822. return nil, err
  823. }
  824. }
  825. u := v.User()
  826. if s.userManager != nil && u != nil && u.Tier != nil {
  827. go s.userManager.EnqueueUserStats(u.ID, v.Stats())
  828. }
  829. s.mu.Lock()
  830. s.messages++
  831. s.mu.Unlock()
  832. if unifiedpush {
  833. minc(metricUnifiedPushPublishedSuccess)
  834. }
  835. mset(metricMessagePublishDurationMillis, time.Since(start).Milliseconds())
  836. return m, nil
  837. }
  838. func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
  839. m, err := s.handlePublishInternal(r, v)
  840. if err != nil {
  841. minc(metricMessagesPublishedFailure)
  842. return err
  843. }
  844. minc(metricMessagesPublishedSuccess)
  845. return s.writeJSON(w, m)
  846. }
  847. func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *visitor) error {
  848. _, err := s.handlePublishInternal(r, v)
  849. if err != nil {
  850. minc(metricMessagesPublishedFailure)
  851. minc(metricMatrixPublishedFailure)
  852. if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
  853. topic, err := fromContext[*topic](r, contextTopic)
  854. if err != nil {
  855. return err
  856. }
  857. pushKey, err := fromContext[string](r, contextMatrixPushKey)
  858. if err != nil {
  859. return err
  860. }
  861. if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter {
  862. return writeMatrixResponse(w, pushKey)
  863. }
  864. }
  865. return err
  866. }
  867. minc(metricMessagesPublishedSuccess)
  868. minc(metricMatrixPublishedSuccess)
  869. return writeMatrixSuccess(w)
  870. }
  871. func (s *Server) sendToFirebase(v *visitor, m *message) {
  872. logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase")
  873. if err := s.firebaseClient.Send(v, m); err != nil {
  874. minc(metricFirebasePublishedFailure)
  875. if err == errFirebaseTemporarilyBanned {
  876. logvm(v, m).Tag(tagFirebase).Err(err).Debug("Unable to publish to Firebase: %v", err.Error())
  877. } else {
  878. logvm(v, m).Tag(tagFirebase).Err(err).Warn("Unable to publish to Firebase: %v", err.Error())
  879. }
  880. return
  881. }
  882. minc(metricFirebasePublishedSuccess)
  883. }
  884. func (s *Server) sendEmail(v *visitor, m *message, email string) {
  885. logvm(v, m).Tag(tagEmail).Field("email", email).Debug("Sending email to %s", email)
  886. if err := s.smtpSender.Send(v, m, email); err != nil {
  887. logvm(v, m).Tag(tagEmail).Field("email", email).Err(err).Warn("Unable to send email to %s: %v", email, err.Error())
  888. minc(metricEmailsPublishedFailure)
  889. return
  890. }
  891. minc(metricEmailsPublishedSuccess)
  892. }
  893. func (s *Server) forwardPollRequest(v *visitor, m *message) {
  894. topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
  895. topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
  896. forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
  897. logvm(v, m).Debug("Publishing poll request to %s", forwardURL)
  898. req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
  899. if err != nil {
  900. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  901. return
  902. }
  903. req.Header.Set("User-Agent", "ntfy/"+s.config.Version)
  904. req.Header.Set("X-Poll-ID", m.ID)
  905. if s.config.UpstreamAccessToken != "" {
  906. req.Header.Set("Authorization", util.BearerAuth(s.config.UpstreamAccessToken))
  907. }
  908. var httpClient = &http.Client{
  909. Timeout: time.Second * 10,
  910. }
  911. response, err := httpClient.Do(req)
  912. if err != nil {
  913. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  914. return
  915. } else if response.StatusCode != http.StatusOK {
  916. if response.StatusCode == http.StatusTooManyRequests {
  917. logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s; you may solve this by sending fewer daily messages, or by configuring upstream-access-token (assuming you have an account with higher rate limits) ", s.config.UpstreamBaseURL, response.Status)
  918. } else {
  919. logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s", s.config.UpstreamBaseURL, response.Status)
  920. }
  921. return
  922. }
  923. }
  924. func (s *Server) publishToWebPushEndpoints(v *visitor, m *message) {
  925. subscriptions, err := s.webPushSubscriptionStore.GetSubscriptionsForTopic(m.Topic)
  926. if err != nil {
  927. logvm(v, m).Err(err).Warn("Unable to publish web push messages")
  928. return
  929. }
  930. totalCount := len(subscriptions)
  931. wg := &sync.WaitGroup{}
  932. wg.Add(totalCount)
  933. ctx := log.Context{"topic": m.Topic, "message_id": m.ID, "total_count": totalCount}
  934. // Importing the emojis in the service worker would add unnecessary complexity,
  935. // simply do it here for web push notifications instead
  936. var titleWithDefault string
  937. var formattedTitle string
  938. emojis, _, err := toEmojis(m.Tags)
  939. if err != nil {
  940. logvm(v, m).Err(err).Fields(ctx).Debug("Unable to publish web push message")
  941. return
  942. }
  943. if m.Title == "" {
  944. titleWithDefault = m.Topic
  945. } else {
  946. titleWithDefault = m.Title
  947. }
  948. if len(emojis) > 0 {
  949. formattedTitle = fmt.Sprintf("%s %s", strings.Join(emojis[:], " "), titleWithDefault)
  950. } else {
  951. formattedTitle = titleWithDefault
  952. }
  953. for i, xi := range subscriptions {
  954. go func(i int, sub webPushSubscription) {
  955. defer wg.Done()
  956. ctx := log.Context{"endpoint": sub.BrowserSubscription.Endpoint, "username": sub.Username, "topic": m.Topic, "message_id": m.ID}
  957. payload := &webPushPayload{
  958. SubscriptionID: fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic),
  959. Message: *m,
  960. FormattedTitle: formattedTitle,
  961. }
  962. jsonPayload, err := json.Marshal(payload)
  963. if err != nil {
  964. logvm(v, m).Err(err).Fields(ctx).Debug("Unable to publish web push message")
  965. return
  966. }
  967. resp, err := webpush.SendNotification(jsonPayload, &sub.BrowserSubscription, &webpush.Options{
  968. Subscriber: s.config.WebPushEmailAddress,
  969. VAPIDPublicKey: s.config.WebPushPublicKey,
  970. VAPIDPrivateKey: s.config.WebPushPrivateKey,
  971. // deliverability on iOS isn't great with lower urgency values,
  972. // and thus we can't really map lower ntfy priorities to lower urgency values
  973. Urgency: webpush.UrgencyHigh,
  974. })
  975. if err != nil {
  976. logvm(v, m).Err(err).Fields(ctx).Debug("Unable to publish web push message")
  977. err = s.webPushSubscriptionStore.ExpireWebPushEndpoint(sub.BrowserSubscription.Endpoint)
  978. if err != nil {
  979. logvm(v, m).Err(err).Fields(ctx).Warn("Unable to expire subscription")
  980. }
  981. return
  982. }
  983. // May want to handle at least 429 differently, but for now treat all errors the same
  984. if !(200 <= resp.StatusCode && resp.StatusCode <= 299) {
  985. logvm(v, m).Fields(ctx).Field("response", resp).Debug("Unable to publish web push message")
  986. err = s.webPushSubscriptionStore.ExpireWebPushEndpoint(sub.BrowserSubscription.Endpoint)
  987. if err != nil {
  988. logvm(v, m).Err(err).Fields(ctx).Warn("Unable to expire subscription")
  989. }
  990. return
  991. }
  992. }(i, xi)
  993. }
  994. }
  995. func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, firebase bool, email, call string, unifiedpush bool, err *errHTTP) {
  996. cache = readBoolParam(r, true, "x-cache", "cache")
  997. firebase = readBoolParam(r, true, "x-firebase", "firebase")
  998. m.Title = readParam(r, "x-title", "title", "t")
  999. m.Click = readParam(r, "x-click", "click")
  1000. icon := readParam(r, "x-icon", "icon")
  1001. filename := readParam(r, "x-filename", "filename", "file", "f")
  1002. attach := readParam(r, "x-attach", "attach", "a")
  1003. if attach != "" || filename != "" {
  1004. m.Attachment = &attachment{}
  1005. }
  1006. if filename != "" {
  1007. m.Attachment.Name = filename
  1008. }
  1009. if attach != "" {
  1010. if !urlRegex.MatchString(attach) {
  1011. return false, false, "", "", false, errHTTPBadRequestAttachmentURLInvalid
  1012. }
  1013. m.Attachment.URL = attach
  1014. if m.Attachment.Name == "" {
  1015. u, err := url.Parse(m.Attachment.URL)
  1016. if err == nil {
  1017. m.Attachment.Name = path.Base(u.Path)
  1018. if m.Attachment.Name == "." || m.Attachment.Name == "/" {
  1019. m.Attachment.Name = ""
  1020. }
  1021. }
  1022. }
  1023. if m.Attachment.Name == "" {
  1024. m.Attachment.Name = "attachment"
  1025. }
  1026. }
  1027. if icon != "" {
  1028. if !urlRegex.MatchString(icon) {
  1029. return false, false, "", "", false, errHTTPBadRequestIconURLInvalid
  1030. }
  1031. m.Icon = icon
  1032. }
  1033. email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
  1034. if s.smtpSender == nil && email != "" {
  1035. return false, false, "", "", false, errHTTPBadRequestEmailDisabled
  1036. }
  1037. call = readParam(r, "x-call", "call")
  1038. if call != "" && (s.config.TwilioAccount == "" || s.userManager == nil) {
  1039. return false, false, "", "", false, errHTTPBadRequestPhoneCallsDisabled
  1040. } else if call != "" && !isBoolValue(call) && !phoneNumberRegex.MatchString(call) {
  1041. return false, false, "", "", false, errHTTPBadRequestPhoneNumberInvalid
  1042. }
  1043. messageStr := strings.ReplaceAll(readParam(r, "x-message", "message", "m"), "\\n", "\n")
  1044. if messageStr != "" {
  1045. m.Message = messageStr
  1046. }
  1047. var e error
  1048. m.Priority, e = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
  1049. if e != nil {
  1050. return false, false, "", "", false, errHTTPBadRequestPriorityInvalid
  1051. }
  1052. m.Tags = readCommaSeparatedParam(r, "x-tags", "tags", "tag", "ta")
  1053. delayStr := readParam(r, "x-delay", "delay", "x-at", "at", "x-in", "in")
  1054. if delayStr != "" {
  1055. if !cache {
  1056. return false, false, "", "", false, errHTTPBadRequestDelayNoCache
  1057. }
  1058. if email != "" {
  1059. return false, false, "", "", false, errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
  1060. }
  1061. if call != "" {
  1062. return false, false, "", "", false, errHTTPBadRequestDelayNoCall // we cannot store the phone number (yet)
  1063. }
  1064. delay, err := util.ParseFutureTime(delayStr, time.Now())
  1065. if err != nil {
  1066. return false, false, "", "", false, errHTTPBadRequestDelayCannotParse
  1067. } else if delay.Unix() < time.Now().Add(s.config.MinDelay).Unix() {
  1068. return false, false, "", "", false, errHTTPBadRequestDelayTooSmall
  1069. } else if delay.Unix() > time.Now().Add(s.config.MaxDelay).Unix() {
  1070. return false, false, "", "", false, errHTTPBadRequestDelayTooLarge
  1071. }
  1072. m.Time = delay.Unix()
  1073. }
  1074. actionsStr := readParam(r, "x-actions", "actions", "action")
  1075. if actionsStr != "" {
  1076. m.Actions, e = parseActions(actionsStr)
  1077. if e != nil {
  1078. return false, false, "", "", false, errHTTPBadRequestActionsInvalid.Wrap(e.Error())
  1079. }
  1080. }
  1081. unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
  1082. if unifiedpush {
  1083. firebase = false
  1084. unifiedpush = true
  1085. }
  1086. m.PollID = readParam(r, "x-poll-id", "poll-id")
  1087. if m.PollID != "" {
  1088. unifiedpush = false
  1089. cache = false
  1090. email = ""
  1091. }
  1092. return cache, firebase, email, call, unifiedpush, nil
  1093. }
  1094. // handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
  1095. //
  1096. // 1. curl -X POST -H "Poll: 1234" ntfy.sh/...
  1097. // If a message is flagged as poll request, the body does not matter and is discarded
  1098. // 2. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
  1099. // If body is binary, encode as base64, if not do not encode
  1100. // 3. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
  1101. // Body must be a message, because we attached an external URL
  1102. // 4. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
  1103. // Body must be attachment, because we passed a filename
  1104. // 5. curl -T file.txt ntfy.sh/mytopic
  1105. // If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
  1106. // 6. curl -T file.txt ntfy.sh/mytopic
  1107. // If file.txt is > message limit, treat it as an attachment
  1108. func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser, unifiedpush bool) error {
  1109. if m.Event == pollRequestEvent { // Case 1
  1110. return s.handleBodyDiscard(body)
  1111. } else if unifiedpush {
  1112. return s.handleBodyAsMessageAutoDetect(m, body) // Case 2
  1113. } else if m.Attachment != nil && m.Attachment.URL != "" {
  1114. return s.handleBodyAsTextMessage(m, body) // Case 3
  1115. } else if m.Attachment != nil && m.Attachment.Name != "" {
  1116. return s.handleBodyAsAttachment(r, v, m, body) // Case 4
  1117. } else if !body.LimitReached && utf8.Valid(body.PeekedBytes) {
  1118. return s.handleBodyAsTextMessage(m, body) // Case 5
  1119. }
  1120. return s.handleBodyAsAttachment(r, v, m, body) // Case 6
  1121. }
  1122. func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
  1123. _, err := io.Copy(io.Discard, body)
  1124. _ = body.Close()
  1125. return err
  1126. }
  1127. func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedReadCloser) error {
  1128. if utf8.Valid(body.PeekedBytes) {
  1129. m.Message = string(body.PeekedBytes) // Do not trim
  1130. } else {
  1131. m.Message = base64.StdEncoding.EncodeToString(body.PeekedBytes)
  1132. m.Encoding = encodingBase64
  1133. }
  1134. return nil
  1135. }
  1136. func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser) error {
  1137. if !utf8.Valid(body.PeekedBytes) {
  1138. return errHTTPBadRequestMessageNotUTF8.With(m)
  1139. }
  1140. if len(body.PeekedBytes) > 0 { // Empty body should not override message (publish via GET!)
  1141. m.Message = strings.TrimSpace(string(body.PeekedBytes)) // Truncates the message to the peek limit if required
  1142. }
  1143. if m.Attachment != nil && m.Attachment.Name != "" && m.Message == "" {
  1144. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1145. }
  1146. return nil
  1147. }
  1148. func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser) error {
  1149. if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
  1150. return errHTTPBadRequestAttachmentsDisallowed.With(m)
  1151. }
  1152. vinfo, err := v.Info()
  1153. if err != nil {
  1154. return err
  1155. }
  1156. attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
  1157. if m.Time > attachmentExpiry {
  1158. return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
  1159. }
  1160. contentLengthStr := r.Header.Get("Content-Length")
  1161. if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
  1162. contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
  1163. if err == nil && (contentLength > vinfo.Stats.AttachmentTotalSizeRemaining || contentLength > vinfo.Limits.AttachmentFileSizeLimit) {
  1164. return errHTTPEntityTooLargeAttachment.With(m).Fields(log.Context{
  1165. "message_content_length": contentLength,
  1166. "attachment_total_size_remaining": vinfo.Stats.AttachmentTotalSizeRemaining,
  1167. "attachment_file_size_limit": vinfo.Limits.AttachmentFileSizeLimit,
  1168. })
  1169. }
  1170. }
  1171. if m.Attachment == nil {
  1172. m.Attachment = &attachment{}
  1173. }
  1174. var ext string
  1175. m.Attachment.Expires = attachmentExpiry
  1176. m.Attachment.Type, ext = util.DetectContentType(body.PeekedBytes, m.Attachment.Name)
  1177. m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
  1178. if m.Attachment.Name == "" {
  1179. m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
  1180. }
  1181. if m.Message == "" {
  1182. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1183. }
  1184. limiters := []util.Limiter{
  1185. v.BandwidthLimiter(),
  1186. util.NewFixedLimiter(vinfo.Limits.AttachmentFileSizeLimit),
  1187. util.NewFixedLimiter(vinfo.Stats.AttachmentTotalSizeRemaining),
  1188. }
  1189. m.Attachment.Size, err = s.fileCache.Write(m.ID, body, limiters...)
  1190. if err == util.ErrLimitReached {
  1191. return errHTTPEntityTooLargeAttachment.With(m)
  1192. } else if err != nil {
  1193. return err
  1194. }
  1195. return nil
  1196. }
  1197. func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1198. encoder := func(msg *message) (string, error) {
  1199. var buf bytes.Buffer
  1200. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  1201. return "", err
  1202. }
  1203. return buf.String(), nil
  1204. }
  1205. return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder)
  1206. }
  1207. func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1208. encoder := func(msg *message) (string, error) {
  1209. var buf bytes.Buffer
  1210. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  1211. return "", err
  1212. }
  1213. if msg.Event != messageEvent {
  1214. return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
  1215. }
  1216. return fmt.Sprintf("data: %s\n", buf.String()), nil
  1217. }
  1218. return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder)
  1219. }
  1220. func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1221. encoder := func(msg *message) (string, error) {
  1222. if msg.Event == messageEvent { // only handle default events
  1223. return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
  1224. }
  1225. return "\n", nil // "keepalive" and "open" events just send an empty line
  1226. }
  1227. return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder)
  1228. }
  1229. func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
  1230. logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection opened")
  1231. defer logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection closed")
  1232. if !v.SubscriptionAllowed() {
  1233. return errHTTPTooManyRequestsLimitSubscriptions
  1234. }
  1235. defer v.RemoveSubscription()
  1236. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1237. if err != nil {
  1238. return err
  1239. }
  1240. poll, since, scheduled, filters, rateTopics, err := parseSubscribeParams(r)
  1241. if err != nil {
  1242. return err
  1243. }
  1244. var wlock sync.Mutex
  1245. defer func() {
  1246. // Hack: This is the fix for a horrible data race that I have not been able to figure out in quite some time.
  1247. // It appears to be happening when the Go HTTP code reads from the socket when closing the request (i.e. AFTER
  1248. // this function returns), and causes a data race with the ResponseWriter. Locking wlock here silences the
  1249. // data race detector. See https://github.com/binwiederhier/ntfy/issues/338#issuecomment-1163425889.
  1250. wlock.TryLock()
  1251. }()
  1252. sub := func(v *visitor, msg *message) error {
  1253. if !filters.Pass(msg) {
  1254. return nil
  1255. }
  1256. m, err := encoder(msg)
  1257. if err != nil {
  1258. return err
  1259. }
  1260. wlock.Lock()
  1261. defer wlock.Unlock()
  1262. if _, err := w.Write([]byte(m)); err != nil {
  1263. return err
  1264. }
  1265. if fl, ok := w.(http.Flusher); ok {
  1266. fl.Flush()
  1267. }
  1268. return nil
  1269. }
  1270. if err := s.maybeSetRateVisitors(r, v, topics, rateTopics); err != nil {
  1271. return err
  1272. }
  1273. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1274. w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
  1275. if poll {
  1276. for _, t := range topics {
  1277. t.Keepalive()
  1278. }
  1279. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1280. }
  1281. ctx, cancel := context.WithCancel(context.Background())
  1282. defer cancel()
  1283. subscriberIDs := make([]int, 0)
  1284. for _, t := range topics {
  1285. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1286. }
  1287. defer func() {
  1288. for i, subscriberID := range subscriberIDs {
  1289. topics[i].Unsubscribe(subscriberID) // Order!
  1290. }
  1291. }()
  1292. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1293. return err
  1294. }
  1295. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1296. return err
  1297. }
  1298. for {
  1299. select {
  1300. case <-ctx.Done():
  1301. return nil
  1302. case <-r.Context().Done():
  1303. return nil
  1304. case <-time.After(s.config.KeepaliveInterval):
  1305. ev := logvr(v, r).Tag(tagSubscribe)
  1306. if len(topics) == 1 {
  1307. ev.With(topics[0]).Trace("Sending keepalive message to %s", topics[0].ID)
  1308. } else {
  1309. ev.Trace("Sending keepalive message to %d topics", len(topics))
  1310. }
  1311. v.Keepalive()
  1312. for _, t := range topics {
  1313. t.Keepalive()
  1314. }
  1315. if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
  1316. return err
  1317. }
  1318. }
  1319. }
  1320. }
  1321. func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1322. if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
  1323. return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
  1324. }
  1325. if !v.SubscriptionAllowed() {
  1326. return errHTTPTooManyRequestsLimitSubscriptions
  1327. }
  1328. defer v.RemoveSubscription()
  1329. logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection opened")
  1330. defer logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection closed")
  1331. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1332. if err != nil {
  1333. return err
  1334. }
  1335. poll, since, scheduled, filters, rateTopics, err := parseSubscribeParams(r)
  1336. if err != nil {
  1337. return err
  1338. }
  1339. upgrader := &websocket.Upgrader{
  1340. ReadBufferSize: wsBufferSize,
  1341. WriteBufferSize: wsBufferSize,
  1342. CheckOrigin: func(r *http.Request) bool {
  1343. return true // We're open for business!
  1344. },
  1345. }
  1346. conn, err := upgrader.Upgrade(w, r, nil)
  1347. if err != nil {
  1348. return err
  1349. }
  1350. defer conn.Close()
  1351. // Subscription connections can be canceled externally, see topic.CancelSubscribersExceptUser
  1352. cancelCtx, cancel := context.WithCancel(context.Background())
  1353. defer cancel()
  1354. // Use errgroup to run WebSocket reader and writer in Go routines
  1355. var wlock sync.Mutex
  1356. g, gctx := errgroup.WithContext(cancelCtx)
  1357. g.Go(func() error {
  1358. pongWait := s.config.KeepaliveInterval + wsPongWait
  1359. conn.SetReadLimit(wsReadLimit)
  1360. if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
  1361. return err
  1362. }
  1363. conn.SetPongHandler(func(appData string) error {
  1364. logvr(v, r).Tag(tagWebsocket).Trace("Received WebSocket pong")
  1365. return conn.SetReadDeadline(time.Now().Add(pongWait))
  1366. })
  1367. for {
  1368. _, _, err := conn.NextReader()
  1369. if err != nil {
  1370. return err
  1371. }
  1372. select {
  1373. case <-gctx.Done():
  1374. return nil
  1375. default:
  1376. }
  1377. }
  1378. })
  1379. g.Go(func() error {
  1380. ping := func() error {
  1381. wlock.Lock()
  1382. defer wlock.Unlock()
  1383. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1384. return err
  1385. }
  1386. logvr(v, r).Tag(tagWebsocket).Trace("Sending WebSocket ping")
  1387. return conn.WriteMessage(websocket.PingMessage, nil)
  1388. }
  1389. for {
  1390. select {
  1391. case <-gctx.Done():
  1392. return nil
  1393. case <-cancelCtx.Done():
  1394. logvr(v, r).Tag(tagWebsocket).Trace("Cancel received, closing subscriber connection")
  1395. conn.Close()
  1396. return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
  1397. case <-time.After(s.config.KeepaliveInterval):
  1398. v.Keepalive()
  1399. for _, t := range topics {
  1400. t.Keepalive()
  1401. }
  1402. if err := ping(); err != nil {
  1403. return err
  1404. }
  1405. }
  1406. }
  1407. })
  1408. sub := func(v *visitor, msg *message) error {
  1409. if !filters.Pass(msg) {
  1410. return nil
  1411. }
  1412. wlock.Lock()
  1413. defer wlock.Unlock()
  1414. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1415. return err
  1416. }
  1417. return conn.WriteJSON(msg)
  1418. }
  1419. if err := s.maybeSetRateVisitors(r, v, topics, rateTopics); err != nil {
  1420. return err
  1421. }
  1422. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1423. if poll {
  1424. for _, t := range topics {
  1425. t.Keepalive()
  1426. }
  1427. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1428. }
  1429. subscriberIDs := make([]int, 0)
  1430. for _, t := range topics {
  1431. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1432. }
  1433. defer func() {
  1434. for i, subscriberID := range subscriberIDs {
  1435. topics[i].Unsubscribe(subscriberID) // Order!
  1436. }
  1437. }()
  1438. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1439. return err
  1440. }
  1441. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1442. return err
  1443. }
  1444. err = g.Wait()
  1445. if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) {
  1446. logvr(v, r).Tag(tagWebsocket).Err(err).Fields(websocketErrorContext(err)).Trace("WebSocket connection closed")
  1447. return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
  1448. }
  1449. return err
  1450. }
  1451. func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, rateTopics []string, err error) {
  1452. poll = readBoolParam(r, false, "x-poll", "poll", "po")
  1453. scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
  1454. since, err = parseSince(r, poll)
  1455. if err != nil {
  1456. return
  1457. }
  1458. filters, err = parseQueryFilters(r)
  1459. if err != nil {
  1460. return
  1461. }
  1462. rateTopics = readCommaSeparatedParam(r, "x-rate-topics", "rate-topics")
  1463. return
  1464. }
  1465. // maybeSetRateVisitors sets the rate visitor on a topic (v.SetRateVisitor), indicating that all messages published
  1466. // to that topic will be rate limited against the rate visitor instead of the publishing visitor.
  1467. //
  1468. // Setting the rate visitor is ony allowed if the `visitor-subscriber-rate-limiting` setting is enabled, AND
  1469. // - auth-file is not set (everything is open by default)
  1470. // - or the topic is reserved, and v.user is the owner
  1471. // - or the topic is not reserved, and v.user has write access
  1472. //
  1473. // Note: This TEMPORARILY also registers all topics starting with "up" (= UnifiedPush). This is to ease the transition
  1474. // until the Android app will send the "Rate-Topics" header.
  1475. func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic, rateTopics []string) error {
  1476. // Bail out if not enabled
  1477. if !s.config.VisitorSubscriberRateLimiting {
  1478. return nil
  1479. }
  1480. // Make a list of topics that we'll actually set the RateVisitor on
  1481. eligibleRateTopics := make([]*topic, 0)
  1482. for _, t := range topics {
  1483. if (strings.HasPrefix(t.ID, unifiedPushTopicPrefix) && len(t.ID) == unifiedPushTopicLength) || util.Contains(rateTopics, t.ID) {
  1484. eligibleRateTopics = append(eligibleRateTopics, t)
  1485. }
  1486. }
  1487. if len(eligibleRateTopics) == 0 {
  1488. return nil
  1489. }
  1490. // If access controls are turned off, v has access to everything, and we can set the rate visitor
  1491. if s.userManager == nil {
  1492. return s.setRateVisitors(r, v, eligibleRateTopics)
  1493. }
  1494. // If access controls are enabled, only set rate visitor if
  1495. // - topic is reserved, and v.user is the owner
  1496. // - topic is not reserved, and v.user has write access
  1497. writableRateTopics := make([]*topic, 0)
  1498. for _, t := range topics {
  1499. ownerUserID, err := s.userManager.ReservationOwner(t.ID)
  1500. if err != nil {
  1501. return err
  1502. }
  1503. if ownerUserID == "" {
  1504. if err := s.userManager.Authorize(v.User(), t.ID, user.PermissionWrite); err == nil {
  1505. writableRateTopics = append(writableRateTopics, t)
  1506. }
  1507. } else if ownerUserID == v.MaybeUserID() {
  1508. writableRateTopics = append(writableRateTopics, t)
  1509. }
  1510. }
  1511. return s.setRateVisitors(r, v, writableRateTopics)
  1512. }
  1513. func (s *Server) setRateVisitors(r *http.Request, v *visitor, rateTopics []*topic) error {
  1514. for _, t := range rateTopics {
  1515. logvr(v, r).
  1516. Tag(tagSubscribe).
  1517. With(t).
  1518. Debug("Setting visitor as rate visitor for topic %s", t.ID)
  1519. t.SetRateVisitor(v)
  1520. }
  1521. return nil
  1522. }
  1523. // sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
  1524. // marker, returning only messages that are newer than the marker.
  1525. func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled bool, v *visitor, sub subscriber) error {
  1526. if since.IsNone() {
  1527. return nil
  1528. }
  1529. messages := make([]*message, 0)
  1530. for _, t := range topics {
  1531. topicMessages, err := s.messageCache.Messages(t.ID, since, scheduled)
  1532. if err != nil {
  1533. return err
  1534. }
  1535. messages = append(messages, topicMessages...)
  1536. }
  1537. sort.Slice(messages, func(i, j int) bool {
  1538. return messages[i].Time < messages[j].Time
  1539. })
  1540. for _, m := range messages {
  1541. if err := sub(v, m); err != nil {
  1542. return err
  1543. }
  1544. }
  1545. return nil
  1546. }
  1547. // parseSince returns a timestamp identifying the time span from which cached messages should be received.
  1548. //
  1549. // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
  1550. // "all" for all messages.
  1551. func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
  1552. since := readParam(r, "x-since", "since", "si")
  1553. // Easy cases (empty, all, none)
  1554. if since == "" {
  1555. if poll {
  1556. return sinceAllMessages, nil
  1557. }
  1558. return sinceNoMessages, nil
  1559. } else if since == "all" {
  1560. return sinceAllMessages, nil
  1561. } else if since == "none" {
  1562. return sinceNoMessages, nil
  1563. }
  1564. // ID, timestamp, duration
  1565. if validMessageID(since) {
  1566. return newSinceID(since), nil
  1567. } else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
  1568. return newSinceTime(s), nil
  1569. } else if d, err := time.ParseDuration(since); err == nil {
  1570. return newSinceTime(time.Now().Add(-1 * d).Unix()), nil
  1571. }
  1572. return sinceNoMessages, errHTTPBadRequestSinceInvalid
  1573. }
  1574. func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  1575. w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE")
  1576. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1577. w.Header().Set("Access-Control-Allow-Headers", "*") // CORS, allow auth via JS // FIXME is this terrible?
  1578. return nil
  1579. }
  1580. // topicFromPath returns the topic from a root path (e.g. /mytopic), creating it if it doesn't exist.
  1581. func (s *Server) topicFromPath(path string) (*topic, error) {
  1582. parts := strings.Split(path, "/")
  1583. if len(parts) < 2 {
  1584. return nil, errHTTPBadRequestTopicInvalid
  1585. }
  1586. return s.topicFromID(parts[1])
  1587. }
  1588. // topicsFromPath returns the topic from a root path (e.g. /mytopic,mytopic2), creating it if it doesn't exist.
  1589. func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
  1590. parts := strings.Split(path, "/")
  1591. if len(parts) < 2 {
  1592. return nil, "", errHTTPBadRequestTopicInvalid
  1593. }
  1594. topicIDs := util.SplitNoEmpty(parts[1], ",")
  1595. topics, err := s.topicsFromIDs(topicIDs...)
  1596. if err != nil {
  1597. return nil, "", errHTTPBadRequestTopicInvalid
  1598. }
  1599. return topics, parts[1], nil
  1600. }
  1601. // topicsFromIDs returns the topics with the given IDs, creating them if they don't exist.
  1602. func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
  1603. s.mu.Lock()
  1604. defer s.mu.Unlock()
  1605. topics := make([]*topic, 0)
  1606. for _, id := range ids {
  1607. if util.Contains(s.config.DisallowedTopics, id) {
  1608. return nil, errHTTPBadRequestTopicDisallowed
  1609. }
  1610. if _, ok := s.topics[id]; !ok {
  1611. if len(s.topics) >= s.config.TotalTopicLimit {
  1612. return nil, errHTTPTooManyRequestsLimitTotalTopics
  1613. }
  1614. s.topics[id] = newTopic(id)
  1615. }
  1616. topics = append(topics, s.topics[id])
  1617. }
  1618. return topics, nil
  1619. }
  1620. // topicFromID returns the topic with the given ID, creating it if it doesn't exist.
  1621. func (s *Server) topicFromID(id string) (*topic, error) {
  1622. topics, err := s.topicsFromIDs(id)
  1623. if err != nil {
  1624. return nil, err
  1625. }
  1626. return topics[0], nil
  1627. }
  1628. // topicsFromPattern returns a list of topics matching the given pattern, but it does not create them.
  1629. func (s *Server) topicsFromPattern(pattern string) ([]*topic, error) {
  1630. s.mu.RLock()
  1631. defer s.mu.RUnlock()
  1632. patternRegexp, err := regexp.Compile("^" + strings.ReplaceAll(pattern, "*", ".*") + "$")
  1633. if err != nil {
  1634. return nil, err
  1635. }
  1636. topics := make([]*topic, 0)
  1637. for _, t := range s.topics {
  1638. if patternRegexp.MatchString(t.ID) {
  1639. topics = append(topics, t)
  1640. }
  1641. }
  1642. return topics, nil
  1643. }
  1644. func (s *Server) runSMTPServer() error {
  1645. s.smtpServerBackend = newMailBackend(s.config, s.handle)
  1646. s.smtpServer = smtp.NewServer(s.smtpServerBackend)
  1647. s.smtpServer.Addr = s.config.SMTPServerListen
  1648. s.smtpServer.Domain = s.config.SMTPServerDomain
  1649. s.smtpServer.ReadTimeout = 10 * time.Second
  1650. s.smtpServer.WriteTimeout = 10 * time.Second
  1651. s.smtpServer.MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
  1652. s.smtpServer.MaxRecipients = 1
  1653. s.smtpServer.AllowInsecureAuth = true
  1654. return s.smtpServer.ListenAndServe()
  1655. }
  1656. func (s *Server) runManager() {
  1657. for {
  1658. select {
  1659. case <-time.After(s.config.ManagerInterval):
  1660. log.
  1661. Tag(tagManager).
  1662. Timing(s.execManager).
  1663. Debug("Manager finished")
  1664. case <-s.closeChan:
  1665. return
  1666. }
  1667. }
  1668. }
  1669. // runStatsResetter runs once a day (usually midnight UTC) to reset all the visitor's message and
  1670. // email counters. The stats are used to display the counters in the web app, as well as for rate limiting.
  1671. func (s *Server) runStatsResetter() {
  1672. for {
  1673. runAt := util.NextOccurrenceUTC(s.config.VisitorStatsResetTime, time.Now())
  1674. timer := time.NewTimer(time.Until(runAt))
  1675. log.Tag(tagResetter).Debug("Waiting until %v to reset visitor stats", runAt)
  1676. select {
  1677. case <-timer.C:
  1678. log.Tag(tagResetter).Debug("Running stats resetter")
  1679. s.resetStats()
  1680. case <-s.closeChan:
  1681. log.Tag(tagResetter).Debug("Stopping stats resetter")
  1682. timer.Stop()
  1683. return
  1684. }
  1685. }
  1686. }
  1687. func (s *Server) resetStats() {
  1688. log.Info("Resetting all visitor stats (daily task)")
  1689. s.mu.Lock()
  1690. defer s.mu.Unlock() // Includes the database query to avoid races with other processes
  1691. for _, v := range s.visitors {
  1692. v.ResetStats()
  1693. }
  1694. if s.userManager != nil {
  1695. if err := s.userManager.ResetStats(); err != nil {
  1696. log.Tag(tagResetter).Warn("Failed to write to database: %s", err.Error())
  1697. }
  1698. }
  1699. }
  1700. func (s *Server) runFirebaseKeepaliver() {
  1701. if s.firebaseClient == nil {
  1702. return
  1703. }
  1704. v := newVisitor(s.config, s.messageCache, s.userManager, netip.IPv4Unspecified(), nil) // Background process, not a real visitor, uses IP 0.0.0.0
  1705. for {
  1706. select {
  1707. case <-time.After(s.config.FirebaseKeepaliveInterval):
  1708. s.sendToFirebase(v, newKeepaliveMessage(firebaseControlTopic))
  1709. /*
  1710. FIXME: Disable iOS polling entirely for now due to thundering herd problem (see #677)
  1711. To solve this, we'd have to shard the iOS poll topics to spread out the polling evenly.
  1712. Given that it's not really necessary to poll, turning it off for now should not have any impact.
  1713. case <-time.After(s.config.FirebasePollInterval):
  1714. s.sendToFirebase(v, newKeepaliveMessage(firebasePollTopic))
  1715. */
  1716. case <-s.closeChan:
  1717. return
  1718. }
  1719. }
  1720. }
  1721. func (s *Server) runDelayedSender() {
  1722. for {
  1723. select {
  1724. case <-time.After(s.config.DelayedSenderInterval):
  1725. if err := s.sendDelayedMessages(); err != nil {
  1726. log.Tag(tagPublish).Err(err).Warn("Error sending delayed messages")
  1727. }
  1728. case <-s.closeChan:
  1729. return
  1730. }
  1731. }
  1732. }
  1733. func (s *Server) sendDelayedMessages() error {
  1734. messages, err := s.messageCache.MessagesDue()
  1735. if err != nil {
  1736. return err
  1737. }
  1738. for _, m := range messages {
  1739. var u *user.User
  1740. if s.userManager != nil && m.User != "" {
  1741. u, err = s.userManager.UserByID(m.User)
  1742. if err != nil {
  1743. log.With(m).Err(err).Warn("Error sending delayed message")
  1744. continue
  1745. }
  1746. }
  1747. v := s.visitor(m.Sender, u)
  1748. if err := s.sendDelayedMessage(v, m); err != nil {
  1749. logvm(v, m).Err(err).Warn("Error sending delayed message")
  1750. }
  1751. }
  1752. return nil
  1753. }
  1754. func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
  1755. logvm(v, m).Debug("Sending delayed message")
  1756. s.mu.RLock()
  1757. t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
  1758. s.mu.RUnlock()
  1759. if ok {
  1760. go func() {
  1761. // We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
  1762. if err := t.Publish(v, m); err != nil {
  1763. logvm(v, m).Err(err).Warn("Unable to publish message")
  1764. }
  1765. }()
  1766. }
  1767. if s.firebaseClient != nil { // Firebase subscribers may not show up in topics map
  1768. go s.sendToFirebase(v, m)
  1769. }
  1770. if s.config.UpstreamBaseURL != "" {
  1771. go s.forwardPollRequest(v, m)
  1772. }
  1773. if s.config.WebPushEnabled {
  1774. go s.publishToWebPushEndpoints(v, m)
  1775. }
  1776. if err := s.messageCache.MarkPublished(m); err != nil {
  1777. return err
  1778. }
  1779. return nil
  1780. }
  1781. // transformBodyJSON peeks the request body, reads the JSON, and converts it to headers
  1782. // before passing it on to the next handler. This is meant to be used in combination with handlePublish.
  1783. func (s *Server) transformBodyJSON(next handleFunc) handleFunc {
  1784. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1785. m, err := readJSONWithLimit[publishMessage](r.Body, s.config.MessageLimit*2, false) // 2x to account for JSON format overhead
  1786. if err != nil {
  1787. return err
  1788. }
  1789. if !topicRegex.MatchString(m.Topic) {
  1790. return errHTTPBadRequestTopicInvalid
  1791. }
  1792. if m.Message == "" {
  1793. m.Message = emptyMessageBody
  1794. }
  1795. r.URL.Path = "/" + m.Topic
  1796. r.Body = io.NopCloser(strings.NewReader(m.Message))
  1797. if m.Title != "" {
  1798. r.Header.Set("X-Title", m.Title)
  1799. }
  1800. if m.Priority != 0 {
  1801. r.Header.Set("X-Priority", fmt.Sprintf("%d", m.Priority))
  1802. }
  1803. if m.Tags != nil && len(m.Tags) > 0 {
  1804. r.Header.Set("X-Tags", strings.Join(m.Tags, ","))
  1805. }
  1806. if m.Attach != "" {
  1807. r.Header.Set("X-Attach", m.Attach)
  1808. }
  1809. if m.Filename != "" {
  1810. r.Header.Set("X-Filename", m.Filename)
  1811. }
  1812. if m.Click != "" {
  1813. r.Header.Set("X-Click", m.Click)
  1814. }
  1815. if m.Icon != "" {
  1816. r.Header.Set("X-Icon", m.Icon)
  1817. }
  1818. if len(m.Actions) > 0 {
  1819. actionsStr, err := json.Marshal(m.Actions)
  1820. if err != nil {
  1821. return errHTTPBadRequestMessageJSONInvalid
  1822. }
  1823. r.Header.Set("X-Actions", string(actionsStr))
  1824. }
  1825. if m.Email != "" {
  1826. r.Header.Set("X-Email", m.Email)
  1827. }
  1828. if m.Delay != "" {
  1829. r.Header.Set("X-Delay", m.Delay)
  1830. }
  1831. if m.Call != "" {
  1832. r.Header.Set("X-Call", m.Call)
  1833. }
  1834. return next(w, r, v)
  1835. }
  1836. }
  1837. func (s *Server) transformMatrixJSON(next handleFunc) handleFunc {
  1838. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1839. newRequest, err := newRequestFromMatrixJSON(r, s.config.BaseURL, s.config.MessageLimit)
  1840. if err != nil {
  1841. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Invalid Matrix request")
  1842. if e, ok := err.(*errMatrixPushkeyRejected); ok {
  1843. return writeMatrixResponse(w, e.rejectedPushKey)
  1844. }
  1845. return err
  1846. }
  1847. if err := next(w, newRequest, v); err != nil {
  1848. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Error handling Matrix request")
  1849. return err
  1850. }
  1851. return nil
  1852. }
  1853. }
  1854. func (s *Server) authorizeTopicWrite(next handleFunc) handleFunc {
  1855. return s.autorizeTopic(next, user.PermissionWrite)
  1856. }
  1857. func (s *Server) authorizeTopicRead(next handleFunc) handleFunc {
  1858. return s.autorizeTopic(next, user.PermissionRead)
  1859. }
  1860. func (s *Server) autorizeTopic(next handleFunc, perm user.Permission) handleFunc {
  1861. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1862. if s.userManager == nil {
  1863. return next(w, r, v)
  1864. }
  1865. topics, _, err := s.topicsFromPath(r.URL.Path)
  1866. if err != nil {
  1867. return err
  1868. }
  1869. u := v.User()
  1870. for _, t := range topics {
  1871. if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
  1872. logvr(v, r).With(t).Err(err).Debug("Access to topic %s not authorized", t.ID)
  1873. return errHTTPForbidden.With(t)
  1874. }
  1875. }
  1876. return next(w, r, v)
  1877. }
  1878. }
  1879. // maybeAuthenticate reads the "Authorization" header and will try to authenticate the user
  1880. // if it is set.
  1881. //
  1882. // - If auth-file is not configured, immediately return an IP-based visitor
  1883. // - If the header is not set or not supported (anything non-Basic and non-Bearer),
  1884. // an IP-based visitor is returned
  1885. // - If the header is set, authenticate will be called to check the username/password (Basic auth),
  1886. // or the token (Bearer auth), and read the user from the database
  1887. //
  1888. // This function will ALWAYS return a visitor, even if an error occurs (e.g. unauthorized), so
  1889. // that subsequent logging calls still have a visitor context.
  1890. func (s *Server) maybeAuthenticate(r *http.Request) (*visitor, error) {
  1891. // Read "Authorization" header value, and exit out early if it's not set
  1892. ip := extractIPAddress(r, s.config.BehindProxy)
  1893. vip := s.visitor(ip, nil)
  1894. if s.userManager == nil {
  1895. return vip, nil
  1896. }
  1897. header, err := readAuthHeader(r)
  1898. if err != nil {
  1899. return vip, err
  1900. } else if !supportedAuthHeader(header) {
  1901. return vip, nil
  1902. }
  1903. // If we're trying to auth, check the rate limiter first
  1904. if !vip.AuthAllowed() {
  1905. return vip, errHTTPTooManyRequestsLimitAuthFailure // Always return visitor, even when error occurs!
  1906. }
  1907. u, err := s.authenticate(r, header)
  1908. if err != nil {
  1909. vip.AuthFailed()
  1910. logr(r).Err(err).Debug("Authentication failed")
  1911. return vip, errHTTPUnauthorized // Always return visitor, even when error occurs!
  1912. }
  1913. // Authentication with user was successful
  1914. return s.visitor(ip, u), nil
  1915. }
  1916. // authenticate a user based on basic auth username/password (Authorization: Basic ...), or token auth (Authorization: Bearer ...).
  1917. // The Authorization header can be passed as a header or the ?auth=... query param. The latter is required only to
  1918. // support the WebSocket JavaScript class, which does not support passing headers during the initial request. The auth
  1919. // query param is effectively doubly base64 encoded. Its format is base64(Basic base64(user:pass)).
  1920. func (s *Server) authenticate(r *http.Request, header string) (user *user.User, err error) {
  1921. if strings.HasPrefix(header, "Bearer") {
  1922. return s.authenticateBearerAuth(r, strings.TrimSpace(strings.TrimPrefix(header, "Bearer")))
  1923. }
  1924. return s.authenticateBasicAuth(r, header)
  1925. }
  1926. // readAuthHeader reads the raw value of the Authorization header, either from the actual HTTP header,
  1927. // or from the ?auth... query parameter
  1928. func readAuthHeader(r *http.Request) (string, error) {
  1929. value := strings.TrimSpace(r.Header.Get("Authorization"))
  1930. queryParam := readQueryParam(r, "authorization", "auth")
  1931. if queryParam != "" {
  1932. a, err := base64.RawURLEncoding.DecodeString(queryParam)
  1933. if err != nil {
  1934. return "", err
  1935. }
  1936. value = strings.TrimSpace(string(a))
  1937. }
  1938. return value, nil
  1939. }
  1940. // supportedAuthHeader returns true only if the Authorization header value starts
  1941. // with "Basic" or "Bearer". In particular, an empty value is not supported, and neither
  1942. // are things like "WebPush", or "vapid" (see #629).
  1943. func supportedAuthHeader(value string) bool {
  1944. value = strings.ToLower(value)
  1945. return strings.HasPrefix(value, "basic ") || strings.HasPrefix(value, "bearer ")
  1946. }
  1947. func (s *Server) authenticateBasicAuth(r *http.Request, value string) (user *user.User, err error) {
  1948. r.Header.Set("Authorization", value)
  1949. username, password, ok := r.BasicAuth()
  1950. if !ok {
  1951. return nil, errors.New("invalid basic auth")
  1952. } else if username == "" {
  1953. return s.authenticateBearerAuth(r, password) // Treat password as token
  1954. }
  1955. return s.userManager.Authenticate(username, password)
  1956. }
  1957. func (s *Server) authenticateBearerAuth(r *http.Request, token string) (*user.User, error) {
  1958. u, err := s.userManager.AuthenticateToken(token)
  1959. if err != nil {
  1960. return nil, err
  1961. }
  1962. ip := extractIPAddress(r, s.config.BehindProxy)
  1963. go s.userManager.EnqueueTokenUpdate(token, &user.TokenUpdate{
  1964. LastAccess: time.Now(),
  1965. LastOrigin: ip,
  1966. })
  1967. return u, nil
  1968. }
  1969. func (s *Server) visitor(ip netip.Addr, user *user.User) *visitor {
  1970. s.mu.Lock()
  1971. defer s.mu.Unlock()
  1972. id := visitorID(ip, user)
  1973. v, exists := s.visitors[id]
  1974. if !exists {
  1975. s.visitors[id] = newVisitor(s.config, s.messageCache, s.userManager, ip, user)
  1976. return s.visitors[id]
  1977. }
  1978. v.Keepalive()
  1979. v.SetUser(user) // Always update with the latest user, may be nil!
  1980. return v
  1981. }
  1982. func (s *Server) writeJSON(w http.ResponseWriter, v any) error {
  1983. w.Header().Set("Content-Type", "application/json")
  1984. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1985. if err := json.NewEncoder(w).Encode(v); err != nil {
  1986. return err
  1987. }
  1988. return nil
  1989. }
  1990. func (s *Server) updateAndWriteStats(messagesCount int64) {
  1991. s.mu.Lock()
  1992. s.messagesHistory = append(s.messagesHistory, messagesCount)
  1993. if len(s.messagesHistory) > messagesHistoryMax {
  1994. s.messagesHistory = s.messagesHistory[1:]
  1995. }
  1996. s.mu.Unlock()
  1997. go func() {
  1998. if err := s.messageCache.UpdateStats(messagesCount); err != nil {
  1999. log.Tag(tagManager).Err(err).Warn("Cannot write messages stats")
  2000. }
  2001. }()
  2002. }