server.go 79 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120
  1. package server
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "embed"
  7. "encoding/base64"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "gopkg.in/yaml.v2"
  12. "io"
  13. "net"
  14. "net/http"
  15. "net/http/pprof"
  16. "net/netip"
  17. "net/url"
  18. "os"
  19. "path"
  20. "path/filepath"
  21. "regexp"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "text/template"
  27. "time"
  28. "unicode/utf8"
  29. "github.com/emersion/go-smtp"
  30. "github.com/gorilla/websocket"
  31. "github.com/prometheus/client_golang/prometheus/promhttp"
  32. "golang.org/x/sync/errgroup"
  33. "heckel.io/ntfy/v2/log"
  34. "heckel.io/ntfy/v2/user"
  35. "heckel.io/ntfy/v2/util"
  36. "heckel.io/ntfy/v2/util/sprig"
  37. )
  38. // Server is the main server, providing the UI and API for ntfy
  39. type Server struct {
  40. config *Config
  41. httpServer *http.Server
  42. httpsServer *http.Server
  43. httpMetricsServer *http.Server
  44. httpProfileServer *http.Server
  45. unixListener net.Listener
  46. smtpServer *smtp.Server
  47. smtpServerBackend *smtpBackend
  48. smtpSender mailer
  49. topics map[string]*topic
  50. visitors map[string]*visitor // ip:<ip> or user:<user>
  51. firebaseClient *firebaseClient
  52. messages int64 // Total number of messages (persisted if messageCache enabled)
  53. messagesHistory []int64 // Last n values of the messages counter, used to determine rate
  54. userManager *user.Manager // Might be nil!
  55. messageCache *messageCache // Database that stores the messages
  56. webPush *webPushStore // Database that stores web push subscriptions
  57. fileCache *fileCache // File system based cache that stores attachments
  58. stripe stripeAPI // Stripe API, can be replaced with a mock
  59. priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
  60. metricsHandler http.Handler // Handles /metrics if enable-metrics set, and listen-metrics-http not set
  61. closeChan chan bool
  62. mu sync.RWMutex
  63. }
  64. // handleFunc extends the normal http.HandlerFunc to be able to easily return errors
  65. type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
  66. var (
  67. // If changed, don't forget to update Android App and auth_sqlite.go
  68. topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
  69. topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
  70. externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
  71. jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
  72. ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
  73. rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
  74. wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
  75. authPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/auth$`)
  76. publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/(publish|send|trigger)$`)
  77. webConfigPath = "/config.js"
  78. webManifestPath = "/manifest.webmanifest"
  79. webRootHTMLPath = "/app.html"
  80. webServiceWorkerPath = "/sw.js"
  81. accountPath = "/account"
  82. matrixPushPath = "/_matrix/push/v1/notify"
  83. metricsPath = "/metrics"
  84. apiHealthPath = "/v1/health"
  85. apiStatsPath = "/v1/stats"
  86. apiWebPushPath = "/v1/webpush"
  87. apiTiersPath = "/v1/tiers"
  88. apiUsersPath = "/v1/users"
  89. apiUsersAccessPath = "/v1/users/access"
  90. apiAccountPath = "/v1/account"
  91. apiAccountTokenPath = "/v1/account/token"
  92. apiAccountPasswordPath = "/v1/account/password"
  93. apiAccountSettingsPath = "/v1/account/settings"
  94. apiAccountSubscriptionPath = "/v1/account/subscription"
  95. apiAccountReservationPath = "/v1/account/reservation"
  96. apiAccountPhonePath = "/v1/account/phone"
  97. apiAccountPhoneVerifyPath = "/v1/account/phone/verify"
  98. apiAccountBillingPortalPath = "/v1/account/billing/portal"
  99. apiAccountBillingWebhookPath = "/v1/account/billing/webhook"
  100. apiAccountBillingSubscriptionPath = "/v1/account/billing/subscription"
  101. apiAccountBillingSubscriptionCheckoutSuccessTemplate = "/v1/account/billing/subscription/success/{CHECKOUT_SESSION_ID}"
  102. apiAccountBillingSubscriptionCheckoutSuccessRegex = regexp.MustCompile(`/v1/account/billing/subscription/success/(.+)$`)
  103. apiAccountReservationSingleRegex = regexp.MustCompile(`/v1/account/reservation/([-_A-Za-z0-9]{1,64})$`)
  104. staticRegex = regexp.MustCompile(`^/static/.+`)
  105. docsRegex = regexp.MustCompile(`^/docs(|/.*)$`)
  106. fileRegex = regexp.MustCompile(`^/file/([-_A-Za-z0-9]{1,64})(?:\.[A-Za-z0-9]{1,16})?$`)
  107. urlRegex = regexp.MustCompile(`^https?://`)
  108. phoneNumberRegex = regexp.MustCompile(`^\+\d{1,100}$`)
  109. //go:embed site
  110. webFs embed.FS
  111. webFsCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: webFs}
  112. webSiteDir = "/site"
  113. webAppIndex = "/app.html" // React app
  114. //go:embed docs
  115. docsStaticFs embed.FS
  116. docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
  117. //go:embed templates
  118. templatesFs embed.FS // Contains template config files (e.g. grafana.yml, github.yml, ...)
  119. templatesDir = "templates"
  120. // templateDisallowedRegex tests a template for disallowed expressions. While not really dangerous, they
  121. // are not useful, and seem potentially troublesome.
  122. templateDisallowedRegex = regexp.MustCompile(`(?m)\{\{-?\s*(call|template|define)\b`)
  123. templateNameRegex = regexp.MustCompile(`^[-_A-Za-z0-9]+$`)
  124. )
  125. const (
  126. firebaseControlTopic = "~control" // See Android if changed
  127. firebasePollTopic = "~poll" // See iOS if changed (DISABLED for now)
  128. emptyMessageBody = "triggered" // Used if message body is empty
  129. newMessageBody = "New message" // Used in poll requests as generic message
  130. defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
  131. encodingBase64 = "base64" // Used mainly for binary UnifiedPush messages
  132. jsonBodyBytesLimit = 131072 // Max number of bytes for a request bodys (unless MessageLimit is higher)
  133. unifiedPushTopicPrefix = "up" // Temporarily, we rate limit all "up*" topics based on the subscriber
  134. unifiedPushTopicLength = 14 // Length of UnifiedPush topics, including the "up" part
  135. messagesHistoryMax = 10 // Number of message count values to keep in memory
  136. templateMaxExecutionTime = 100 * time.Millisecond // Maximum time a template can take to execute, used to prevent DoS attacks
  137. templateMaxOutputBytes = 1024 * 1024 // Maximum number of bytes a template can output, used to prevent DoS attacks
  138. templateFileExtension = ".yml" // Template files must end with this extension
  139. )
  140. // WebSocket constants
  141. const (
  142. wsWriteWait = 2 * time.Second
  143. wsBufferSize = 1024
  144. wsReadLimit = 64 // We only ever receive PINGs
  145. wsPongWait = 15 * time.Second
  146. )
  147. // New instantiates a new Server. It creates the cache and adds a Firebase
  148. // subscriber (if configured).
  149. func New(conf *Config) (*Server, error) {
  150. var mailer mailer
  151. if conf.SMTPSenderAddr != "" {
  152. mailer = &smtpSender{config: conf}
  153. }
  154. var stripe stripeAPI
  155. if conf.StripeSecretKey != "" {
  156. stripe = newStripeAPI()
  157. }
  158. messageCache, err := createMessageCache(conf)
  159. if err != nil {
  160. return nil, err
  161. }
  162. var webPush *webPushStore
  163. if conf.WebPushPublicKey != "" {
  164. webPush, err = newWebPushStore(conf.WebPushFile, conf.WebPushStartupQueries)
  165. if err != nil {
  166. return nil, err
  167. }
  168. }
  169. topics, err := messageCache.Topics()
  170. if err != nil {
  171. return nil, err
  172. }
  173. messages, err := messageCache.Stats()
  174. if err != nil {
  175. return nil, err
  176. }
  177. var fileCache *fileCache
  178. if conf.AttachmentCacheDir != "" {
  179. fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit)
  180. if err != nil {
  181. return nil, err
  182. }
  183. }
  184. var userManager *user.Manager
  185. if conf.AuthFile != "" {
  186. userManager, err = user.NewManager(conf.AuthFile, conf.AuthStartupQueries, conf.AuthDefault, conf.AuthBcryptCost, conf.AuthStatsQueueWriterInterval)
  187. if err != nil {
  188. return nil, err
  189. }
  190. }
  191. var firebaseClient *firebaseClient
  192. if conf.FirebaseKeyFile != "" {
  193. sender, err := newFirebaseSender(conf.FirebaseKeyFile)
  194. if err != nil {
  195. return nil, err
  196. }
  197. // This awkward logic is required because Go is weird about nil types and interfaces.
  198. // See issue #641, and https://go.dev/play/p/uur1flrv1t3 for an example
  199. var auther user.Auther
  200. if userManager != nil {
  201. auther = userManager
  202. }
  203. firebaseClient = newFirebaseClient(sender, auther)
  204. }
  205. s := &Server{
  206. config: conf,
  207. messageCache: messageCache,
  208. webPush: webPush,
  209. fileCache: fileCache,
  210. firebaseClient: firebaseClient,
  211. smtpSender: mailer,
  212. topics: topics,
  213. userManager: userManager,
  214. messages: messages,
  215. messagesHistory: []int64{messages},
  216. visitors: make(map[string]*visitor),
  217. stripe: stripe,
  218. }
  219. s.priceCache = util.NewLookupCache(s.fetchStripePrices, conf.StripePriceCacheDuration)
  220. return s, nil
  221. }
  222. func createMessageCache(conf *Config) (*messageCache, error) {
  223. if conf.CacheDuration == 0 {
  224. return newNopCache()
  225. } else if conf.CacheFile != "" {
  226. return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheDuration, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
  227. }
  228. return newMemCache()
  229. }
  230. // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
  231. // a manager go routine to print stats and prune messages.
  232. func (s *Server) Run() error {
  233. var listenStr string
  234. if s.config.ListenHTTP != "" {
  235. listenStr += fmt.Sprintf(" %s[http]", s.config.ListenHTTP)
  236. }
  237. if s.config.ListenHTTPS != "" {
  238. listenStr += fmt.Sprintf(" %s[https]", s.config.ListenHTTPS)
  239. }
  240. if s.config.ListenUnix != "" {
  241. listenStr += fmt.Sprintf(" %s[unix]", s.config.ListenUnix)
  242. }
  243. if s.config.SMTPServerListen != "" {
  244. listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
  245. }
  246. if s.config.MetricsListenHTTP != "" {
  247. listenStr += fmt.Sprintf(" %s[http/metrics]", s.config.MetricsListenHTTP)
  248. }
  249. if s.config.ProfileListenHTTP != "" {
  250. listenStr += fmt.Sprintf(" %s[http/profile]", s.config.ProfileListenHTTP)
  251. }
  252. log.Tag(tagStartup).Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String())
  253. if log.IsFile() {
  254. fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s\n", listenStr, s.config.Version)
  255. fmt.Fprintf(os.Stderr, "Logs are written to %s\n", log.File())
  256. }
  257. mux := http.NewServeMux()
  258. mux.HandleFunc("/", s.handle)
  259. errChan := make(chan error)
  260. s.mu.Lock()
  261. s.closeChan = make(chan bool)
  262. if s.config.ListenHTTP != "" {
  263. s.httpServer = &http.Server{Addr: s.config.ListenHTTP, Handler: mux}
  264. go func() {
  265. errChan <- s.httpServer.ListenAndServe()
  266. }()
  267. }
  268. if s.config.ListenHTTPS != "" {
  269. s.httpsServer = &http.Server{Addr: s.config.ListenHTTPS, Handler: mux}
  270. go func() {
  271. errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
  272. }()
  273. }
  274. if s.config.ListenUnix != "" {
  275. go func() {
  276. var err error
  277. s.mu.Lock()
  278. os.Remove(s.config.ListenUnix)
  279. s.unixListener, err = net.Listen("unix", s.config.ListenUnix)
  280. if err != nil {
  281. s.mu.Unlock()
  282. errChan <- err
  283. return
  284. }
  285. defer s.unixListener.Close()
  286. if s.config.ListenUnixMode > 0 {
  287. if err := os.Chmod(s.config.ListenUnix, s.config.ListenUnixMode); err != nil {
  288. s.mu.Unlock()
  289. errChan <- err
  290. return
  291. }
  292. }
  293. s.mu.Unlock()
  294. httpServer := &http.Server{Handler: mux}
  295. errChan <- httpServer.Serve(s.unixListener)
  296. }()
  297. }
  298. if s.config.MetricsListenHTTP != "" {
  299. initMetrics()
  300. s.httpMetricsServer = &http.Server{Addr: s.config.MetricsListenHTTP, Handler: promhttp.Handler()}
  301. go func() {
  302. errChan <- s.httpMetricsServer.ListenAndServe()
  303. }()
  304. } else if s.config.EnableMetrics {
  305. initMetrics()
  306. s.metricsHandler = promhttp.Handler()
  307. }
  308. if s.config.ProfileListenHTTP != "" {
  309. profileMux := http.NewServeMux()
  310. profileMux.HandleFunc("/debug/pprof/", pprof.Index)
  311. profileMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
  312. profileMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
  313. profileMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
  314. profileMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
  315. s.httpProfileServer = &http.Server{Addr: s.config.ProfileListenHTTP, Handler: profileMux}
  316. go func() {
  317. errChan <- s.httpProfileServer.ListenAndServe()
  318. }()
  319. }
  320. if s.config.SMTPServerListen != "" {
  321. go func() {
  322. errChan <- s.runSMTPServer()
  323. }()
  324. }
  325. s.mu.Unlock()
  326. go s.runManager()
  327. go s.runStatsResetter()
  328. go s.runDelayedSender()
  329. go s.runFirebaseKeepaliver()
  330. return <-errChan
  331. }
  332. // Stop stops HTTP (+HTTPS) server and all managers
  333. func (s *Server) Stop() {
  334. s.mu.Lock()
  335. defer s.mu.Unlock()
  336. if s.httpServer != nil {
  337. s.httpServer.Close()
  338. }
  339. if s.httpsServer != nil {
  340. s.httpsServer.Close()
  341. }
  342. if s.unixListener != nil {
  343. s.unixListener.Close()
  344. }
  345. if s.smtpServer != nil {
  346. s.smtpServer.Close()
  347. }
  348. s.closeDatabases()
  349. close(s.closeChan)
  350. }
  351. func (s *Server) closeDatabases() {
  352. if s.userManager != nil {
  353. s.userManager.Close()
  354. }
  355. s.messageCache.Close()
  356. if s.webPush != nil {
  357. s.webPush.Close()
  358. }
  359. }
  360. // handle is the main entry point for all HTTP requests
  361. func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
  362. v, err := s.maybeAuthenticate(r) // Note: Always returns v, even when error is returned
  363. if err != nil {
  364. s.handleError(w, r, v, err)
  365. return
  366. }
  367. ev := logvr(v, r)
  368. if ev.IsTrace() {
  369. ev.Field("http_request", renderHTTPRequest(r)).Trace("HTTP request started")
  370. } else if logvr(v, r).IsDebug() {
  371. ev.Debug("HTTP request started")
  372. }
  373. logvr(v, r).
  374. Timing(func() {
  375. if err := s.handleInternal(w, r, v); err != nil {
  376. s.handleError(w, r, v, err)
  377. return
  378. }
  379. if metricHTTPRequests != nil {
  380. metricHTTPRequests.WithLabelValues("200", "20000", r.Method).Inc()
  381. }
  382. }).
  383. Debug("HTTP request finished")
  384. }
  385. func (s *Server) handleError(w http.ResponseWriter, r *http.Request, v *visitor, err error) {
  386. httpErr, ok := err.(*errHTTP)
  387. if !ok {
  388. httpErr = errHTTPInternalError
  389. }
  390. if metricHTTPRequests != nil {
  391. metricHTTPRequests.WithLabelValues(fmt.Sprintf("%d", httpErr.HTTPCode), fmt.Sprintf("%d", httpErr.Code), r.Method).Inc()
  392. }
  393. isRateLimiting := util.Contains(rateLimitingErrorCodes, httpErr.HTTPCode)
  394. isNormalError := strings.Contains(err.Error(), "i/o timeout") || util.Contains(normalErrorCodes, httpErr.HTTPCode)
  395. ev := logvr(v, r).Err(err)
  396. if websocket.IsWebSocketUpgrade(r) {
  397. ev.Tag(tagWebsocket).Fields(websocketErrorContext(err))
  398. if isNormalError {
  399. ev.Debug("WebSocket error (this error is okay, it happens a lot): %s", err.Error())
  400. } else {
  401. ev.Info("WebSocket error: %s", err.Error())
  402. }
  403. w.WriteHeader(httpErr.HTTPCode)
  404. return // Do not attempt to write any body to upgraded connection
  405. }
  406. if isNormalError {
  407. ev.Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  408. } else {
  409. ev.Info("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  410. }
  411. if isRateLimiting && s.config.StripeSecretKey != "" {
  412. u := v.User()
  413. if u == nil || u.Tier == nil {
  414. httpErr = httpErr.Wrap("increase your limits with a paid plan, see %s", s.config.BaseURL)
  415. }
  416. }
  417. w.Header().Set("Content-Type", "application/json")
  418. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  419. w.WriteHeader(httpErr.HTTPCode)
  420. io.WriteString(w, httpErr.JSON()+"\n")
  421. }
  422. func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visitor) error {
  423. if r.Method == http.MethodGet && r.URL.Path == "/" && s.config.WebRoot == "/" {
  424. return s.ensureWebEnabled(s.handleRoot)(w, r, v)
  425. } else if r.Method == http.MethodHead && r.URL.Path == "/" {
  426. return s.ensureWebEnabled(s.handleEmpty)(w, r, v)
  427. } else if r.Method == http.MethodGet && r.URL.Path == apiHealthPath {
  428. return s.handleHealth(w, r, v)
  429. } else if r.Method == http.MethodGet && r.URL.Path == webConfigPath {
  430. return s.ensureWebEnabled(s.handleWebConfig)(w, r, v)
  431. } else if r.Method == http.MethodGet && r.URL.Path == webManifestPath {
  432. return s.ensureWebPushEnabled(s.handleWebManifest)(w, r, v)
  433. } else if r.Method == http.MethodGet && r.URL.Path == apiUsersPath {
  434. return s.ensureAdmin(s.handleUsersGet)(w, r, v)
  435. } else if r.Method == http.MethodPost && r.URL.Path == apiUsersPath {
  436. return s.ensureAdmin(s.handleUsersAdd)(w, r, v)
  437. } else if r.Method == http.MethodPut && r.URL.Path == apiUsersPath {
  438. return s.ensureAdmin(s.handleUsersUpdate)(w, r, v)
  439. } else if r.Method == http.MethodDelete && r.URL.Path == apiUsersPath {
  440. return s.ensureAdmin(s.handleUsersDelete)(w, r, v)
  441. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == apiUsersAccessPath {
  442. return s.ensureAdmin(s.handleAccessAllow)(w, r, v)
  443. } else if r.Method == http.MethodDelete && r.URL.Path == apiUsersAccessPath {
  444. return s.ensureAdmin(s.handleAccessReset)(w, r, v)
  445. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPath {
  446. return s.ensureUserManager(s.handleAccountCreate)(w, r, v)
  447. } else if r.Method == http.MethodGet && r.URL.Path == apiAccountPath {
  448. return s.handleAccountGet(w, r, v) // Allowed by anonymous
  449. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPath {
  450. return s.ensureUser(s.withAccountSync(s.handleAccountDelete))(w, r, v)
  451. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPasswordPath {
  452. return s.ensureUser(s.handleAccountPasswordChange)(w, r, v)
  453. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountTokenPath {
  454. return s.ensureUser(s.withAccountSync(s.handleAccountTokenCreate))(w, r, v)
  455. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountTokenPath {
  456. return s.ensureUser(s.withAccountSync(s.handleAccountTokenUpdate))(w, r, v)
  457. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountTokenPath {
  458. return s.ensureUser(s.withAccountSync(s.handleAccountTokenDelete))(w, r, v)
  459. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSettingsPath {
  460. return s.ensureUser(s.withAccountSync(s.handleAccountSettingsChange))(w, r, v)
  461. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountSubscriptionPath {
  462. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionAdd))(w, r, v)
  463. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSubscriptionPath {
  464. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionChange))(w, r, v)
  465. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountSubscriptionPath {
  466. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionDelete))(w, r, v)
  467. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountReservationPath {
  468. return s.ensureUser(s.withAccountSync(s.handleAccountReservationAdd))(w, r, v)
  469. } else if r.Method == http.MethodDelete && apiAccountReservationSingleRegex.MatchString(r.URL.Path) {
  470. return s.ensureUser(s.withAccountSync(s.handleAccountReservationDelete))(w, r, v)
  471. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingSubscriptionPath {
  472. return s.ensurePaymentsEnabled(s.ensureUser(s.handleAccountBillingSubscriptionCreate))(w, r, v) // Account sync via incoming Stripe webhook
  473. } else if r.Method == http.MethodGet && apiAccountBillingSubscriptionCheckoutSuccessRegex.MatchString(r.URL.Path) {
  474. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingSubscriptionCreateSuccess))(w, r, v) // No user context!
  475. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountBillingSubscriptionPath {
  476. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionUpdate))(w, r, v) // Account sync via incoming Stripe webhook
  477. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountBillingSubscriptionPath {
  478. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionDelete))(w, r, v) // Account sync via incoming Stripe webhook
  479. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingPortalPath {
  480. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingPortalSessionCreate))(w, r, v)
  481. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingWebhookPath {
  482. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingWebhook))(w, r, v) // This request comes from Stripe!
  483. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhoneVerifyPath {
  484. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberVerify)))(w, r, v)
  485. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhonePath {
  486. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberAdd)))(w, r, v)
  487. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPhonePath {
  488. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberDelete)))(w, r, v)
  489. } else if r.Method == http.MethodPost && apiWebPushPath == r.URL.Path {
  490. return s.ensureWebPushEnabled(s.limitRequests(s.handleWebPushUpdate))(w, r, v)
  491. } else if r.Method == http.MethodDelete && apiWebPushPath == r.URL.Path {
  492. return s.ensureWebPushEnabled(s.limitRequests(s.handleWebPushDelete))(w, r, v)
  493. } else if r.Method == http.MethodGet && r.URL.Path == apiStatsPath {
  494. return s.handleStats(w, r, v)
  495. } else if r.Method == http.MethodGet && r.URL.Path == apiTiersPath {
  496. return s.ensurePaymentsEnabled(s.handleBillingTiersGet)(w, r, v)
  497. } else if r.Method == http.MethodGet && r.URL.Path == matrixPushPath {
  498. return s.handleMatrixDiscovery(w)
  499. } else if r.Method == http.MethodGet && r.URL.Path == metricsPath && s.metricsHandler != nil {
  500. return s.handleMetrics(w, r, v)
  501. } else if r.Method == http.MethodGet && (staticRegex.MatchString(r.URL.Path) || r.URL.Path == webServiceWorkerPath || r.URL.Path == webRootHTMLPath) {
  502. return s.ensureWebEnabled(s.handleStatic)(w, r, v)
  503. } else if r.Method == http.MethodGet && docsRegex.MatchString(r.URL.Path) {
  504. return s.ensureWebEnabled(s.handleDocs)(w, r, v)
  505. } else if (r.Method == http.MethodGet || r.Method == http.MethodHead) && fileRegex.MatchString(r.URL.Path) && s.config.AttachmentCacheDir != "" {
  506. return s.limitRequests(s.handleFile)(w, r, v)
  507. } else if r.Method == http.MethodOptions {
  508. return s.limitRequests(s.handleOptions)(w, r, v) // Should work even if the web app is not enabled, see #598
  509. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
  510. return s.transformBodyJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish)))(w, r, v)
  511. } else if r.Method == http.MethodPost && r.URL.Path == matrixPushPath {
  512. return s.transformMatrixJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublishMatrix)))(w, r, v)
  513. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicPathRegex.MatchString(r.URL.Path) {
  514. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  515. } else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
  516. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  517. } else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) {
  518. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeJSON))(w, r, v)
  519. } else if r.Method == http.MethodGet && ssePathRegex.MatchString(r.URL.Path) {
  520. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeSSE))(w, r, v)
  521. } else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
  522. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeRaw))(w, r, v)
  523. } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) {
  524. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeWS))(w, r, v)
  525. } else if r.Method == http.MethodGet && authPathRegex.MatchString(r.URL.Path) {
  526. return s.limitRequests(s.authorizeTopicRead(s.handleTopicAuth))(w, r, v)
  527. } else if r.Method == http.MethodGet && (topicPathRegex.MatchString(r.URL.Path) || externalTopicPathRegex.MatchString(r.URL.Path)) {
  528. return s.ensureWebEnabled(s.handleTopic)(w, r, v)
  529. }
  530. return errHTTPNotFound
  531. }
  532. func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request, v *visitor) error {
  533. r.URL.Path = webAppIndex
  534. return s.handleStatic(w, r, v)
  535. }
  536. func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request, v *visitor) error {
  537. unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too!
  538. if unifiedpush {
  539. w.Header().Set("Content-Type", "application/json")
  540. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  541. _, err := io.WriteString(w, `{"unifiedpush":{"version":1}}`+"\n")
  542. return err
  543. }
  544. r.URL.Path = webAppIndex
  545. return s.handleStatic(w, r, v)
  546. }
  547. func (s *Server) handleEmpty(_ http.ResponseWriter, _ *http.Request, _ *visitor) error {
  548. return nil
  549. }
  550. func (s *Server) handleTopicAuth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  551. return s.writeJSON(w, newSuccessResponse())
  552. }
  553. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  554. response := &apiHealthResponse{
  555. Healthy: true,
  556. }
  557. return s.writeJSON(w, response)
  558. }
  559. func (s *Server) handleWebConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  560. response := &apiConfigResponse{
  561. BaseURL: "", // Will translate to window.location.origin
  562. AppRoot: s.config.WebRoot,
  563. EnableLogin: s.config.EnableLogin,
  564. EnableSignup: s.config.EnableSignup,
  565. EnablePayments: s.config.StripeSecretKey != "",
  566. EnableCalls: s.config.TwilioAccount != "",
  567. EnableEmails: s.config.SMTPSenderFrom != "",
  568. EnableReservations: s.config.EnableReservations,
  569. EnableWebPush: s.config.WebPushPublicKey != "",
  570. BillingContact: s.config.BillingContact,
  571. WebPushPublicKey: s.config.WebPushPublicKey,
  572. DisallowedTopics: s.config.DisallowedTopics,
  573. }
  574. b, err := json.MarshalIndent(response, "", " ")
  575. if err != nil {
  576. return err
  577. }
  578. w.Header().Set("Content-Type", "text/javascript")
  579. w.Header().Set("Cache-Control", "no-cache")
  580. _, err = io.WriteString(w, fmt.Sprintf("// Generated server configuration\nvar config = %s;\n", string(b)))
  581. return err
  582. }
  583. // handleWebManifest serves the web app manifest for the progressive web app (PWA)
  584. func (s *Server) handleWebManifest(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  585. response := &webManifestResponse{
  586. Name: "ntfy web",
  587. Description: "ntfy lets you send push notifications via scripts from any computer or phone",
  588. ShortName: "ntfy",
  589. Scope: "/",
  590. StartURL: s.config.WebRoot,
  591. Display: "standalone",
  592. BackgroundColor: "#ffffff",
  593. ThemeColor: "#317f6f",
  594. Icons: []*webManifestIcon{
  595. {SRC: "/static/images/pwa-192x192.png", Sizes: "192x192", Type: "image/png"},
  596. {SRC: "/static/images/pwa-512x512.png", Sizes: "512x512", Type: "image/png"},
  597. },
  598. }
  599. return s.writeJSONWithContentType(w, response, "application/manifest+json")
  600. }
  601. // handleMetrics returns Prometheus metrics. This endpoint is only called if enable-metrics is set,
  602. // and listen-metrics-http is not set.
  603. func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  604. s.metricsHandler.ServeHTTP(w, r)
  605. return nil
  606. }
  607. // handleStatic returns all static resources (excluding the docs), including the web app
  608. func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  609. r.URL.Path = webSiteDir + r.URL.Path
  610. util.Gzip(http.FileServer(http.FS(webFsCached))).ServeHTTP(w, r)
  611. return nil
  612. }
  613. // handleDocs returns static resources related to the docs
  614. func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  615. util.Gzip(http.FileServer(http.FS(docsStaticCached))).ServeHTTP(w, r)
  616. return nil
  617. }
  618. // handleStats returns the publicly available server stats
  619. func (s *Server) handleStats(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  620. s.mu.RLock()
  621. messages, n, rate := s.messages, len(s.messagesHistory), float64(0)
  622. if n > 1 {
  623. rate = float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds())
  624. }
  625. s.mu.RUnlock()
  626. response := &apiStatsResponse{
  627. Messages: messages,
  628. MessagesRate: rate,
  629. }
  630. return s.writeJSON(w, response)
  631. }
  632. // handleFile processes the download of attachment files. The method handles GET and HEAD requests against a file.
  633. // Before streaming the file to a client, it locates uploader (m.Sender or m.User) in the message cache, so it
  634. // can associate the download bandwidth with the uploader.
  635. func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error {
  636. if s.config.AttachmentCacheDir == "" {
  637. return errHTTPInternalError
  638. }
  639. matches := fileRegex.FindStringSubmatch(r.URL.Path)
  640. if len(matches) != 2 {
  641. return errHTTPInternalErrorInvalidPath
  642. }
  643. messageID := matches[1]
  644. file := filepath.Join(s.config.AttachmentCacheDir, messageID)
  645. stat, err := os.Stat(file)
  646. if err != nil {
  647. return errHTTPNotFound.Fields(log.Context{
  648. "message_id": messageID,
  649. "error_context": "filesystem",
  650. })
  651. }
  652. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  653. w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
  654. if r.Method == http.MethodHead {
  655. return nil
  656. }
  657. // Find message in database, and associate bandwidth to the uploader user
  658. // This is an easy way to
  659. // - avoid abuse (e.g. 1 uploader, 1k downloaders)
  660. // - and also uses the higher bandwidth limits of a paying user
  661. m, err := s.messageCache.Message(messageID)
  662. if errors.Is(err, errMessageNotFound) {
  663. if s.config.CacheBatchTimeout > 0 {
  664. // Strange edge case: If we immediately after upload request the file (the web app does this for images),
  665. // and messages are persisted asynchronously, retry fetching from the database
  666. m, err = util.Retry(func() (*message, error) {
  667. return s.messageCache.Message(messageID)
  668. }, s.config.CacheBatchTimeout, 100*time.Millisecond, 300*time.Millisecond, 600*time.Millisecond)
  669. }
  670. if err != nil {
  671. return errHTTPNotFound.Fields(log.Context{
  672. "message_id": messageID,
  673. "error_context": "message_cache",
  674. })
  675. }
  676. } else if err != nil {
  677. return err
  678. }
  679. bandwidthVisitor := v
  680. if s.userManager != nil && m.User != "" {
  681. u, err := s.userManager.UserByID(m.User)
  682. if err != nil {
  683. return err
  684. }
  685. bandwidthVisitor = s.visitor(v.IP(), u)
  686. } else if m.Sender.IsValid() {
  687. bandwidthVisitor = s.visitor(m.Sender, nil)
  688. }
  689. if !bandwidthVisitor.BandwidthAllowed(stat.Size()) {
  690. return errHTTPTooManyRequestsLimitAttachmentBandwidth.With(m)
  691. }
  692. // Actually send file
  693. f, err := os.Open(file)
  694. if err != nil {
  695. return err
  696. }
  697. defer f.Close()
  698. if m.Attachment.Name != "" {
  699. w.Header().Set("Content-Disposition", "attachment; filename="+strconv.Quote(m.Attachment.Name))
  700. }
  701. _, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f)
  702. return err
  703. }
  704. func (s *Server) handleMatrixDiscovery(w http.ResponseWriter) error {
  705. if s.config.BaseURL == "" {
  706. return errHTTPInternalErrorMissingBaseURL
  707. }
  708. return writeMatrixDiscoveryResponse(w)
  709. }
  710. func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, error) {
  711. start := time.Now()
  712. t, err := fromContext[*topic](r, contextTopic)
  713. if err != nil {
  714. return nil, err
  715. }
  716. vrate, err := fromContext[*visitor](r, contextRateVisitor)
  717. if err != nil {
  718. return nil, err
  719. }
  720. body, err := util.Peek(r.Body, s.config.MessageSizeLimit)
  721. if err != nil {
  722. return nil, err
  723. }
  724. m := newDefaultMessage(t.ID, "")
  725. cache, firebase, email, call, template, unifiedpush, e := s.parsePublishParams(r, m)
  726. if e != nil {
  727. return nil, e.With(t)
  728. }
  729. if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
  730. // UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting.
  731. // The 5xx response is because some app servers (in particular Mastodon) will remove
  732. // the subscription as invalid if any 400-499 code (except 429/408) is returned.
  733. // See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
  734. return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
  735. } else if !util.ContainsIP(s.config.VisitorRequestExemptPrefixes, v.ip) && !vrate.MessageAllowed() {
  736. return nil, errHTTPTooManyRequestsLimitMessages.With(t)
  737. } else if email != "" && !vrate.EmailAllowed() {
  738. return nil, errHTTPTooManyRequestsLimitEmails.With(t)
  739. } else if call != "" {
  740. var httpErr *errHTTP
  741. call, httpErr = s.convertPhoneNumber(v.User(), call)
  742. if httpErr != nil {
  743. return nil, httpErr.With(t)
  744. } else if !vrate.CallAllowed() {
  745. return nil, errHTTPTooManyRequestsLimitCalls.With(t)
  746. }
  747. }
  748. if m.PollID != "" {
  749. m = newPollRequestMessage(t.ID, m.PollID)
  750. }
  751. m.Sender = v.IP()
  752. m.User = v.MaybeUserID()
  753. if cache {
  754. m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
  755. }
  756. if err := s.handlePublishBody(r, v, m, body, template, unifiedpush); err != nil {
  757. return nil, err
  758. }
  759. if m.Message == "" {
  760. m.Message = emptyMessageBody
  761. }
  762. delayed := m.Time > time.Now().Unix()
  763. ev := logvrm(v, r, m).
  764. Tag(tagPublish).
  765. With(t).
  766. Fields(log.Context{
  767. "message_delayed": delayed,
  768. "message_firebase": firebase,
  769. "message_unifiedpush": unifiedpush,
  770. "message_email": email,
  771. "message_call": call,
  772. })
  773. if ev.IsTrace() {
  774. ev.Field("message_body", util.MaybeMarshalJSON(m)).Trace("Received message")
  775. } else if ev.IsDebug() {
  776. ev.Debug("Received message")
  777. }
  778. if !delayed {
  779. if err := t.Publish(v, m); err != nil {
  780. return nil, err
  781. }
  782. if s.firebaseClient != nil && firebase {
  783. go s.sendToFirebase(v, m)
  784. }
  785. if s.smtpSender != nil && email != "" {
  786. go s.sendEmail(v, m, email)
  787. }
  788. if s.config.TwilioAccount != "" && call != "" {
  789. go s.callPhone(v, r, m, call)
  790. }
  791. if s.config.UpstreamBaseURL != "" && !unifiedpush { // UP messages are not sent to upstream
  792. go s.forwardPollRequest(v, m)
  793. }
  794. if s.config.WebPushPublicKey != "" {
  795. go s.publishToWebPushEndpoints(v, m)
  796. }
  797. } else {
  798. logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
  799. }
  800. if cache {
  801. logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
  802. if err := s.messageCache.AddMessage(m); err != nil {
  803. return nil, err
  804. }
  805. }
  806. u := v.User()
  807. if s.userManager != nil && u != nil && u.Tier != nil {
  808. go s.userManager.EnqueueUserStats(u.ID, v.Stats())
  809. }
  810. s.mu.Lock()
  811. s.messages++
  812. s.mu.Unlock()
  813. if unifiedpush {
  814. minc(metricUnifiedPushPublishedSuccess)
  815. }
  816. mset(metricMessagePublishDurationMillis, time.Since(start).Milliseconds())
  817. return m, nil
  818. }
  819. func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
  820. m, err := s.handlePublishInternal(r, v)
  821. if err != nil {
  822. minc(metricMessagesPublishedFailure)
  823. return err
  824. }
  825. minc(metricMessagesPublishedSuccess)
  826. return s.writeJSON(w, m)
  827. }
  828. func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *visitor) error {
  829. _, err := s.handlePublishInternal(r, v)
  830. if err != nil {
  831. minc(metricMessagesPublishedFailure)
  832. minc(metricMatrixPublishedFailure)
  833. if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
  834. topic, err := fromContext[*topic](r, contextTopic)
  835. if err != nil {
  836. return err
  837. }
  838. pushKey, err := fromContext[string](r, contextMatrixPushKey)
  839. if err != nil {
  840. return err
  841. }
  842. if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter {
  843. return writeMatrixResponse(w, pushKey)
  844. }
  845. }
  846. return err
  847. }
  848. minc(metricMessagesPublishedSuccess)
  849. minc(metricMatrixPublishedSuccess)
  850. return writeMatrixSuccess(w)
  851. }
  852. func (s *Server) sendToFirebase(v *visitor, m *message) {
  853. logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase")
  854. if err := s.firebaseClient.Send(v, m); err != nil {
  855. minc(metricFirebasePublishedFailure)
  856. if errors.Is(err, errFirebaseTemporarilyBanned) {
  857. logvm(v, m).Tag(tagFirebase).Err(err).Debug("Unable to publish to Firebase: %v", err.Error())
  858. } else {
  859. logvm(v, m).Tag(tagFirebase).Err(err).Warn("Unable to publish to Firebase: %v", err.Error())
  860. }
  861. return
  862. }
  863. minc(metricFirebasePublishedSuccess)
  864. }
  865. func (s *Server) sendEmail(v *visitor, m *message, email string) {
  866. logvm(v, m).Tag(tagEmail).Field("email", email).Debug("Sending email to %s", email)
  867. if err := s.smtpSender.Send(v, m, email); err != nil {
  868. logvm(v, m).Tag(tagEmail).Field("email", email).Err(err).Warn("Unable to send email to %s: %v", email, err.Error())
  869. minc(metricEmailsPublishedFailure)
  870. return
  871. }
  872. minc(metricEmailsPublishedSuccess)
  873. }
  874. func (s *Server) forwardPollRequest(v *visitor, m *message) {
  875. topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
  876. topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
  877. forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
  878. logvm(v, m).Debug("Publishing poll request to %s", forwardURL)
  879. req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
  880. if err != nil {
  881. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  882. return
  883. }
  884. req.Header.Set("User-Agent", "ntfy/"+s.config.Version)
  885. req.Header.Set("X-Poll-ID", m.ID)
  886. if s.config.UpstreamAccessToken != "" {
  887. req.Header.Set("Authorization", util.BearerAuth(s.config.UpstreamAccessToken))
  888. }
  889. var httpClient = &http.Client{
  890. Timeout: time.Second * 10,
  891. }
  892. response, err := httpClient.Do(req)
  893. if err != nil {
  894. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  895. return
  896. } else if response.StatusCode != http.StatusOK {
  897. if response.StatusCode == http.StatusTooManyRequests {
  898. logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s; you may solve this by sending fewer daily messages, or by configuring upstream-access-token (assuming you have an account with higher rate limits) ", s.config.UpstreamBaseURL, response.Status)
  899. } else {
  900. logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s", s.config.UpstreamBaseURL, response.Status)
  901. }
  902. return
  903. }
  904. }
  905. func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, firebase bool, email, call string, template templateMode, unifiedpush bool, err *errHTTP) {
  906. cache = readBoolParam(r, true, "x-cache", "cache")
  907. firebase = readBoolParam(r, true, "x-firebase", "firebase")
  908. m.Title = readParam(r, "x-title", "title", "t")
  909. m.Click = readParam(r, "x-click", "click")
  910. icon := readParam(r, "x-icon", "icon")
  911. filename := readParam(r, "x-filename", "filename", "file", "f")
  912. attach := readParam(r, "x-attach", "attach", "a")
  913. if attach != "" || filename != "" {
  914. m.Attachment = &attachment{}
  915. }
  916. if filename != "" {
  917. m.Attachment.Name = filename
  918. }
  919. if attach != "" {
  920. if !urlRegex.MatchString(attach) {
  921. return false, false, "", "", "", false, errHTTPBadRequestAttachmentURLInvalid
  922. }
  923. m.Attachment.URL = attach
  924. if m.Attachment.Name == "" {
  925. u, err := url.Parse(m.Attachment.URL)
  926. if err == nil {
  927. m.Attachment.Name = path.Base(u.Path)
  928. if m.Attachment.Name == "." || m.Attachment.Name == "/" {
  929. m.Attachment.Name = ""
  930. }
  931. }
  932. }
  933. if m.Attachment.Name == "" {
  934. m.Attachment.Name = "attachment"
  935. }
  936. }
  937. if icon != "" {
  938. if !urlRegex.MatchString(icon) {
  939. return false, false, "", "", "", false, errHTTPBadRequestIconURLInvalid
  940. }
  941. m.Icon = icon
  942. }
  943. email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
  944. if s.smtpSender == nil && email != "" {
  945. return false, false, "", "", "", false, errHTTPBadRequestEmailDisabled
  946. }
  947. call = readParam(r, "x-call", "call")
  948. if call != "" && (s.config.TwilioAccount == "" || s.userManager == nil) {
  949. return false, false, "", "", "", false, errHTTPBadRequestPhoneCallsDisabled
  950. } else if call != "" && !isBoolValue(call) && !phoneNumberRegex.MatchString(call) {
  951. return false, false, "", "", "", false, errHTTPBadRequestPhoneNumberInvalid
  952. }
  953. messageStr := strings.ReplaceAll(readParam(r, "x-message", "message", "m"), "\\n", "\n")
  954. if messageStr != "" {
  955. m.Message = messageStr
  956. }
  957. var e error
  958. m.Priority, e = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
  959. if e != nil {
  960. return false, false, "", "", "", false, errHTTPBadRequestPriorityInvalid
  961. }
  962. m.Tags = readCommaSeparatedParam(r, "x-tags", "tags", "tag", "ta")
  963. delayStr := readParam(r, "x-delay", "delay", "x-at", "at", "x-in", "in")
  964. if delayStr != "" {
  965. if !cache {
  966. return false, false, "", "", "", false, errHTTPBadRequestDelayNoCache
  967. }
  968. if email != "" {
  969. return false, false, "", "", "", false, errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
  970. }
  971. if call != "" {
  972. return false, false, "", "", "", false, errHTTPBadRequestDelayNoCall // we cannot store the phone number (yet)
  973. }
  974. delay, err := util.ParseFutureTime(delayStr, time.Now())
  975. if err != nil {
  976. return false, false, "", "", "", false, errHTTPBadRequestDelayCannotParse
  977. } else if delay.Unix() < time.Now().Add(s.config.MessageDelayMin).Unix() {
  978. return false, false, "", "", "", false, errHTTPBadRequestDelayTooSmall
  979. } else if delay.Unix() > time.Now().Add(s.config.MessageDelayMax).Unix() {
  980. return false, false, "", "", "", false, errHTTPBadRequestDelayTooLarge
  981. }
  982. m.Time = delay.Unix()
  983. }
  984. actionsStr := readParam(r, "x-actions", "actions", "action")
  985. if actionsStr != "" {
  986. m.Actions, e = parseActions(actionsStr)
  987. if e != nil {
  988. return false, false, "", "", "", false, errHTTPBadRequestActionsInvalid.Wrap("%s", e.Error())
  989. }
  990. }
  991. contentType, markdown := readParam(r, "content-type", "content_type"), readBoolParam(r, false, "x-markdown", "markdown", "md")
  992. if markdown || strings.ToLower(contentType) == "text/markdown" {
  993. m.ContentType = "text/markdown"
  994. }
  995. template = templateMode(readParam(r, "x-template", "template", "tpl"))
  996. unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
  997. contentEncoding := readParam(r, "content-encoding")
  998. if unifiedpush || contentEncoding == "aes128gcm" {
  999. firebase = false
  1000. unifiedpush = true
  1001. }
  1002. m.PollID = readParam(r, "x-poll-id", "poll-id")
  1003. if m.PollID != "" {
  1004. unifiedpush = false
  1005. cache = false
  1006. email = ""
  1007. }
  1008. return cache, firebase, email, call, template, unifiedpush, nil
  1009. }
  1010. // handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
  1011. //
  1012. // 1. curl -X POST -H "Poll: 1234" ntfy.sh/...
  1013. // If a message is flagged as poll request, the body does not matter and is discarded
  1014. // 2. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
  1015. // If UnifiedPush is enabled, encode as base64 if body is binary, and do not trim
  1016. // 3. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
  1017. // Body must be a message, because we attached an external URL
  1018. // 4. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
  1019. // Body must be attachment, because we passed a filename
  1020. // 5. curl -H "Template: yes" -T file.txt ntfy.sh/mytopic
  1021. // If templating is enabled, read up to 32k and treat message body as JSON
  1022. // 6. curl -T file.txt ntfy.sh/mytopic
  1023. // If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
  1024. // 7. curl -T file.txt ntfy.sh/mytopic
  1025. // In all other cases, mostly if file.txt is > message limit, treat it as an attachment
  1026. func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser, template templateMode, unifiedpush bool) error {
  1027. if m.Event == pollRequestEvent { // Case 1
  1028. return s.handleBodyDiscard(body)
  1029. } else if unifiedpush {
  1030. return s.handleBodyAsMessageAutoDetect(m, body) // Case 2
  1031. } else if m.Attachment != nil && m.Attachment.URL != "" {
  1032. return s.handleBodyAsTextMessage(m, body) // Case 3
  1033. } else if m.Attachment != nil && m.Attachment.Name != "" {
  1034. return s.handleBodyAsAttachment(r, v, m, body) // Case 4
  1035. } else if template.Enabled() {
  1036. return s.handleBodyAsTemplatedTextMessage(m, template, body) // Case 5
  1037. } else if !body.LimitReached && utf8.Valid(body.PeekedBytes) {
  1038. return s.handleBodyAsTextMessage(m, body) // Case 6
  1039. }
  1040. return s.handleBodyAsAttachment(r, v, m, body) // Case 7
  1041. }
  1042. func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
  1043. _, err := io.Copy(io.Discard, body)
  1044. _ = body.Close()
  1045. return err
  1046. }
  1047. func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedReadCloser) error {
  1048. if utf8.Valid(body.PeekedBytes) {
  1049. m.Message = string(body.PeekedBytes) // Do not trim
  1050. } else {
  1051. m.Message = base64.StdEncoding.EncodeToString(body.PeekedBytes)
  1052. m.Encoding = encodingBase64
  1053. }
  1054. return nil
  1055. }
  1056. func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser) error {
  1057. if !utf8.Valid(body.PeekedBytes) {
  1058. return errHTTPBadRequestMessageNotUTF8.With(m)
  1059. }
  1060. if len(body.PeekedBytes) > 0 { // Empty body should not override message (publish via GET!)
  1061. m.Message = strings.TrimSpace(string(body.PeekedBytes)) // Truncates the message to the peek limit if required
  1062. }
  1063. if m.Attachment != nil && m.Attachment.Name != "" && m.Message == "" {
  1064. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1065. }
  1066. return nil
  1067. }
  1068. func (s *Server) handleBodyAsTemplatedTextMessage(m *message, template templateMode, body *util.PeekedReadCloser) error {
  1069. body, err := util.Peek(body, max(s.config.MessageSizeLimit, jsonBodyBytesLimit))
  1070. if err != nil {
  1071. return err
  1072. } else if body.LimitReached {
  1073. return errHTTPEntityTooLargeJSONBody
  1074. }
  1075. peekedBody := strings.TrimSpace(string(body.PeekedBytes))
  1076. if templateName := template.Name(); templateName != "" {
  1077. if err := s.renderTemplateFromFile(m, templateName, peekedBody); err != nil {
  1078. return err
  1079. }
  1080. } else {
  1081. if err := s.renderTemplateFromParams(m, peekedBody); err != nil {
  1082. return err
  1083. }
  1084. }
  1085. if len(m.Title) > s.config.MessageSizeLimit || len(m.Message) > s.config.MessageSizeLimit {
  1086. return errHTTPBadRequestTemplateMessageTooLarge
  1087. }
  1088. return nil
  1089. }
  1090. // renderTemplateFromFile transforms the JSON message body according to a template from the filesystem.
  1091. // The template file must be in the templates directory, or in the configured template directory.
  1092. func (s *Server) renderTemplateFromFile(m *message, templateName, peekedBody string) error {
  1093. if !templateNameRegex.MatchString(templateName) {
  1094. return errHTTPBadRequestTemplateFileNotFound
  1095. }
  1096. templateContent, _ := templatesFs.ReadFile(filepath.Join(templatesDir, templateName+templateFileExtension)) // Read from the embedded filesystem first
  1097. if s.config.TemplateDir != "" {
  1098. if b, _ := os.ReadFile(filepath.Join(s.config.TemplateDir, templateName+templateFileExtension)); len(b) > 0 {
  1099. templateContent = b
  1100. }
  1101. }
  1102. if len(templateContent) == 0 {
  1103. return errHTTPBadRequestTemplateFileNotFound
  1104. }
  1105. var tpl templateFile
  1106. if err := yaml.Unmarshal(templateContent, &tpl); err != nil {
  1107. return errHTTPBadRequestTemplateFileInvalid
  1108. }
  1109. var err error
  1110. if tpl.Message != nil {
  1111. if m.Message, err = s.renderTemplate(*tpl.Message, peekedBody); err != nil {
  1112. return err
  1113. }
  1114. }
  1115. if tpl.Title != nil {
  1116. if m.Title, err = s.renderTemplate(*tpl.Title, peekedBody); err != nil {
  1117. return err
  1118. }
  1119. }
  1120. return nil
  1121. }
  1122. // renderTemplateFromParams transforms the JSON message body according to the inline template in the
  1123. // message and title parameters.
  1124. func (s *Server) renderTemplateFromParams(m *message, peekedBody string) error {
  1125. var err error
  1126. if m.Message, err = s.renderTemplate(m.Message, peekedBody); err != nil {
  1127. return err
  1128. }
  1129. if m.Title, err = s.renderTemplate(m.Title, peekedBody); err != nil {
  1130. return err
  1131. }
  1132. return nil
  1133. }
  1134. // renderTemplate renders a template with the given JSON source data.
  1135. func (s *Server) renderTemplate(tpl string, source string) (string, error) {
  1136. if templateDisallowedRegex.MatchString(tpl) {
  1137. return "", errHTTPBadRequestTemplateDisallowedFunctionCalls
  1138. }
  1139. var data any
  1140. if err := json.Unmarshal([]byte(source), &data); err != nil {
  1141. return "", errHTTPBadRequestTemplateMessageNotJSON
  1142. }
  1143. t, err := template.New("").Funcs(sprig.TxtFuncMap()).Parse(tpl)
  1144. if err != nil {
  1145. return "", errHTTPBadRequestTemplateInvalid.Wrap("%s", err.Error())
  1146. }
  1147. var buf bytes.Buffer
  1148. limitWriter := util.NewLimitWriter(util.NewTimeoutWriter(&buf, templateMaxExecutionTime), util.NewFixedLimiter(templateMaxOutputBytes))
  1149. if err := t.Execute(limitWriter, data); err != nil {
  1150. return "", errHTTPBadRequestTemplateExecuteFailed.Wrap("%s", err.Error())
  1151. }
  1152. return strings.TrimSpace(buf.String()), nil
  1153. }
  1154. func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser) error {
  1155. if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
  1156. return errHTTPBadRequestAttachmentsDisallowed.With(m)
  1157. }
  1158. vinfo, err := v.Info()
  1159. if err != nil {
  1160. return err
  1161. }
  1162. attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
  1163. if m.Time > attachmentExpiry {
  1164. return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
  1165. }
  1166. contentLengthStr := r.Header.Get("Content-Length")
  1167. if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
  1168. contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
  1169. if err == nil && (contentLength > vinfo.Stats.AttachmentTotalSizeRemaining || contentLength > vinfo.Limits.AttachmentFileSizeLimit) {
  1170. return errHTTPEntityTooLargeAttachment.With(m).Fields(log.Context{
  1171. "message_content_length": contentLength,
  1172. "attachment_total_size_remaining": vinfo.Stats.AttachmentTotalSizeRemaining,
  1173. "attachment_file_size_limit": vinfo.Limits.AttachmentFileSizeLimit,
  1174. })
  1175. }
  1176. }
  1177. if m.Attachment == nil {
  1178. m.Attachment = &attachment{}
  1179. }
  1180. var ext string
  1181. m.Attachment.Expires = attachmentExpiry
  1182. m.Attachment.Type, ext = util.DetectContentType(body.PeekedBytes, m.Attachment.Name)
  1183. m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
  1184. if m.Attachment.Name == "" {
  1185. m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
  1186. }
  1187. if m.Message == "" {
  1188. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1189. }
  1190. limiters := []util.Limiter{
  1191. v.BandwidthLimiter(),
  1192. util.NewFixedLimiter(vinfo.Limits.AttachmentFileSizeLimit),
  1193. util.NewFixedLimiter(vinfo.Stats.AttachmentTotalSizeRemaining),
  1194. }
  1195. m.Attachment.Size, err = s.fileCache.Write(m.ID, body, limiters...)
  1196. if errors.Is(err, util.ErrLimitReached) {
  1197. return errHTTPEntityTooLargeAttachment.With(m)
  1198. } else if err != nil {
  1199. return err
  1200. }
  1201. return nil
  1202. }
  1203. func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1204. encoder := func(msg *message) (string, error) {
  1205. var buf bytes.Buffer
  1206. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  1207. return "", err
  1208. }
  1209. return buf.String(), nil
  1210. }
  1211. return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder)
  1212. }
  1213. func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1214. encoder := func(msg *message) (string, error) {
  1215. var buf bytes.Buffer
  1216. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  1217. return "", err
  1218. }
  1219. if msg.Event != messageEvent {
  1220. return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
  1221. }
  1222. return fmt.Sprintf("data: %s\n", buf.String()), nil
  1223. }
  1224. return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder)
  1225. }
  1226. func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1227. encoder := func(msg *message) (string, error) {
  1228. if msg.Event == messageEvent { // only handle default events
  1229. return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
  1230. }
  1231. return "\n", nil // "keepalive" and "open" events just send an empty line
  1232. }
  1233. return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder)
  1234. }
  1235. func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
  1236. logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection opened")
  1237. defer logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection closed")
  1238. if !v.SubscriptionAllowed() {
  1239. return errHTTPTooManyRequestsLimitSubscriptions
  1240. }
  1241. defer v.RemoveSubscription()
  1242. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1243. if err != nil {
  1244. return err
  1245. }
  1246. poll, since, scheduled, filters, err := parseSubscribeParams(r)
  1247. if err != nil {
  1248. return err
  1249. }
  1250. var wlock sync.Mutex
  1251. defer func() {
  1252. // Hack: This is the fix for a horrible data race that I have not been able to figure out in quite some time.
  1253. // It appears to be happening when the Go HTTP code reads from the socket when closing the request (i.e. AFTER
  1254. // this function returns), and causes a data race with the ResponseWriter. Locking wlock here silences the
  1255. // data race detector. See https://github.com/binwiederhier/ntfy/issues/338#issuecomment-1163425889.
  1256. wlock.TryLock()
  1257. }()
  1258. sub := func(v *visitor, msg *message) error {
  1259. if !filters.Pass(msg) {
  1260. return nil
  1261. }
  1262. m, err := encoder(msg)
  1263. if err != nil {
  1264. return err
  1265. }
  1266. wlock.Lock()
  1267. defer wlock.Unlock()
  1268. if _, err := w.Write([]byte(m)); err != nil {
  1269. return err
  1270. }
  1271. if fl, ok := w.(http.Flusher); ok {
  1272. fl.Flush()
  1273. }
  1274. return nil
  1275. }
  1276. if err := s.maybeSetRateVisitors(r, v, topics); err != nil {
  1277. return err
  1278. }
  1279. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1280. w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
  1281. if poll {
  1282. for _, t := range topics {
  1283. t.Keepalive()
  1284. }
  1285. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1286. }
  1287. ctx, cancel := context.WithCancel(context.Background())
  1288. defer cancel()
  1289. subscriberIDs := make([]int, 0)
  1290. for _, t := range topics {
  1291. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1292. }
  1293. defer func() {
  1294. for i, subscriberID := range subscriberIDs {
  1295. topics[i].Unsubscribe(subscriberID) // Order!
  1296. }
  1297. }()
  1298. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1299. return err
  1300. }
  1301. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1302. return err
  1303. }
  1304. for {
  1305. select {
  1306. case <-ctx.Done():
  1307. return nil
  1308. case <-r.Context().Done():
  1309. return nil
  1310. case <-time.After(s.config.KeepaliveInterval):
  1311. ev := logvr(v, r).Tag(tagSubscribe)
  1312. if len(topics) == 1 {
  1313. ev.With(topics[0]).Trace("Sending keepalive message to %s", topics[0].ID)
  1314. } else {
  1315. ev.Trace("Sending keepalive message to %d topics", len(topics))
  1316. }
  1317. v.Keepalive()
  1318. for _, t := range topics {
  1319. t.Keepalive()
  1320. }
  1321. if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
  1322. return err
  1323. }
  1324. }
  1325. }
  1326. }
  1327. func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1328. if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
  1329. return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
  1330. }
  1331. if !v.SubscriptionAllowed() {
  1332. return errHTTPTooManyRequestsLimitSubscriptions
  1333. }
  1334. defer v.RemoveSubscription()
  1335. logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection opened")
  1336. defer logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection closed")
  1337. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1338. if err != nil {
  1339. return err
  1340. }
  1341. poll, since, scheduled, filters, err := parseSubscribeParams(r)
  1342. if err != nil {
  1343. return err
  1344. }
  1345. upgrader := &websocket.Upgrader{
  1346. ReadBufferSize: wsBufferSize,
  1347. WriteBufferSize: wsBufferSize,
  1348. CheckOrigin: func(r *http.Request) bool {
  1349. return true // We're open for business!
  1350. },
  1351. }
  1352. conn, err := upgrader.Upgrade(w, r, nil)
  1353. if err != nil {
  1354. return err
  1355. }
  1356. defer conn.Close()
  1357. // Subscription connections can be canceled externally, see topic.CancelSubscribersExceptUser
  1358. cancelCtx, cancel := context.WithCancel(context.Background())
  1359. defer cancel()
  1360. // Use errgroup to run WebSocket reader and writer in Go routines
  1361. var wlock sync.Mutex
  1362. g, gctx := errgroup.WithContext(cancelCtx)
  1363. g.Go(func() error {
  1364. pongWait := s.config.KeepaliveInterval + wsPongWait
  1365. conn.SetReadLimit(wsReadLimit)
  1366. if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
  1367. return err
  1368. }
  1369. conn.SetPongHandler(func(appData string) error {
  1370. logvr(v, r).Tag(tagWebsocket).Trace("Received WebSocket pong")
  1371. return conn.SetReadDeadline(time.Now().Add(pongWait))
  1372. })
  1373. for {
  1374. _, _, err := conn.NextReader()
  1375. if err != nil {
  1376. return err
  1377. }
  1378. select {
  1379. case <-gctx.Done():
  1380. return nil
  1381. default:
  1382. }
  1383. }
  1384. })
  1385. g.Go(func() error {
  1386. ping := func() error {
  1387. wlock.Lock()
  1388. defer wlock.Unlock()
  1389. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1390. return err
  1391. }
  1392. logvr(v, r).Tag(tagWebsocket).Trace("Sending WebSocket ping")
  1393. return conn.WriteMessage(websocket.PingMessage, nil)
  1394. }
  1395. for {
  1396. select {
  1397. case <-gctx.Done():
  1398. return nil
  1399. case <-cancelCtx.Done():
  1400. logvr(v, r).Tag(tagWebsocket).Trace("Cancel received, closing subscriber connection")
  1401. conn.Close()
  1402. return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
  1403. case <-time.After(s.config.KeepaliveInterval):
  1404. v.Keepalive()
  1405. for _, t := range topics {
  1406. t.Keepalive()
  1407. }
  1408. if err := ping(); err != nil {
  1409. return err
  1410. }
  1411. }
  1412. }
  1413. })
  1414. sub := func(v *visitor, msg *message) error {
  1415. if !filters.Pass(msg) {
  1416. return nil
  1417. }
  1418. wlock.Lock()
  1419. defer wlock.Unlock()
  1420. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1421. return err
  1422. }
  1423. return conn.WriteJSON(msg)
  1424. }
  1425. if err := s.maybeSetRateVisitors(r, v, topics); err != nil {
  1426. return err
  1427. }
  1428. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1429. if poll {
  1430. for _, t := range topics {
  1431. t.Keepalive()
  1432. }
  1433. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1434. }
  1435. subscriberIDs := make([]int, 0)
  1436. for _, t := range topics {
  1437. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1438. }
  1439. defer func() {
  1440. for i, subscriberID := range subscriberIDs {
  1441. topics[i].Unsubscribe(subscriberID) // Order!
  1442. }
  1443. }()
  1444. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1445. return err
  1446. }
  1447. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1448. return err
  1449. }
  1450. err = g.Wait()
  1451. if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) {
  1452. logvr(v, r).Tag(tagWebsocket).Err(err).Fields(websocketErrorContext(err)).Trace("WebSocket connection closed")
  1453. return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
  1454. }
  1455. return err
  1456. }
  1457. func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, err error) {
  1458. poll = readBoolParam(r, false, "x-poll", "poll", "po")
  1459. scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
  1460. since, err = parseSince(r, poll)
  1461. if err != nil {
  1462. return
  1463. }
  1464. filters, err = parseQueryFilters(r)
  1465. if err != nil {
  1466. return
  1467. }
  1468. return
  1469. }
  1470. // maybeSetRateVisitors sets the rate visitor on a topic (v.SetRateVisitor), indicating that all messages published
  1471. // to that topic will be rate limited against the rate visitor instead of the publishing visitor.
  1472. //
  1473. // Setting the rate visitor is ony allowed if the `visitor-subscriber-rate-limiting` setting is enabled, AND
  1474. // - auth-file is not set (everything is open by default)
  1475. // - or the topic is reserved, and v.user is the owner
  1476. // - or the topic is not reserved, and v.user has write access
  1477. //
  1478. // This only applies to UnifiedPush topics ("up...").
  1479. func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic) error {
  1480. // Bail out if not enabled
  1481. if !s.config.VisitorSubscriberRateLimiting {
  1482. return nil
  1483. }
  1484. // Make a list of topics that we'll actually set the RateVisitor on
  1485. eligibleRateTopics := make([]*topic, 0)
  1486. for _, t := range topics {
  1487. if strings.HasPrefix(t.ID, unifiedPushTopicPrefix) && len(t.ID) == unifiedPushTopicLength {
  1488. eligibleRateTopics = append(eligibleRateTopics, t)
  1489. }
  1490. }
  1491. if len(eligibleRateTopics) == 0 {
  1492. return nil
  1493. }
  1494. // If access controls are turned off, v has access to everything, and we can set the rate visitor
  1495. if s.userManager == nil {
  1496. return s.setRateVisitors(r, v, eligibleRateTopics)
  1497. }
  1498. // If access controls are enabled, only set rate visitor if
  1499. // - topic is reserved, and v.user is the owner
  1500. // - topic is not reserved, and v.user has write access
  1501. writableRateTopics := make([]*topic, 0)
  1502. for _, t := range topics {
  1503. if !util.Contains(eligibleRateTopics, t) {
  1504. continue
  1505. }
  1506. ownerUserID, err := s.userManager.ReservationOwner(t.ID)
  1507. if err != nil {
  1508. return err
  1509. }
  1510. if ownerUserID == "" {
  1511. if err := s.userManager.Authorize(v.User(), t.ID, user.PermissionWrite); err == nil {
  1512. writableRateTopics = append(writableRateTopics, t)
  1513. }
  1514. } else if ownerUserID == v.MaybeUserID() {
  1515. writableRateTopics = append(writableRateTopics, t)
  1516. }
  1517. }
  1518. return s.setRateVisitors(r, v, writableRateTopics)
  1519. }
  1520. func (s *Server) setRateVisitors(r *http.Request, v *visitor, rateTopics []*topic) error {
  1521. for _, t := range rateTopics {
  1522. logvr(v, r).
  1523. Tag(tagSubscribe).
  1524. With(t).
  1525. Debug("Setting visitor as rate visitor for topic %s", t.ID)
  1526. t.SetRateVisitor(v)
  1527. }
  1528. return nil
  1529. }
  1530. // sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
  1531. // marker, returning only messages that are newer than the marker.
  1532. func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled bool, v *visitor, sub subscriber) error {
  1533. if since.IsNone() {
  1534. return nil
  1535. }
  1536. messages := make([]*message, 0)
  1537. for _, t := range topics {
  1538. topicMessages, err := s.messageCache.Messages(t.ID, since, scheduled)
  1539. if err != nil {
  1540. return err
  1541. }
  1542. messages = append(messages, topicMessages...)
  1543. }
  1544. sort.Slice(messages, func(i, j int) bool {
  1545. return messages[i].Time < messages[j].Time
  1546. })
  1547. for _, m := range messages {
  1548. if err := sub(v, m); err != nil {
  1549. return err
  1550. }
  1551. }
  1552. return nil
  1553. }
  1554. // parseSince returns a timestamp identifying the time span from which cached messages should be received.
  1555. //
  1556. // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h),
  1557. // "all" for all messages, or "latest" for the most recent message for a topic
  1558. func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
  1559. since := readParam(r, "x-since", "since", "si")
  1560. // Easy cases (empty, all, none)
  1561. if since == "" {
  1562. if poll {
  1563. return sinceAllMessages, nil
  1564. }
  1565. return sinceNoMessages, nil
  1566. } else if since == "all" {
  1567. return sinceAllMessages, nil
  1568. } else if since == "latest" {
  1569. return sinceLatestMessage, nil
  1570. } else if since == "none" {
  1571. return sinceNoMessages, nil
  1572. }
  1573. // ID, timestamp, duration
  1574. if validMessageID(since) {
  1575. return newSinceID(since), nil
  1576. } else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
  1577. return newSinceTime(s), nil
  1578. } else if d, err := time.ParseDuration(since); err == nil {
  1579. return newSinceTime(time.Now().Add(-1 * d).Unix()), nil
  1580. }
  1581. return sinceNoMessages, errHTTPBadRequestSinceInvalid
  1582. }
  1583. func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  1584. w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE")
  1585. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1586. w.Header().Set("Access-Control-Allow-Headers", "*") // CORS, allow auth via JS // FIXME is this terrible?
  1587. return nil
  1588. }
  1589. // topicFromPath returns the topic from a root path (e.g. /mytopic), creating it if it doesn't exist.
  1590. func (s *Server) topicFromPath(path string) (*topic, error) {
  1591. parts := strings.Split(path, "/")
  1592. if len(parts) < 2 {
  1593. return nil, errHTTPBadRequestTopicInvalid
  1594. }
  1595. return s.topicFromID(parts[1])
  1596. }
  1597. // topicsFromPath returns the topic from a root path (e.g. /mytopic,mytopic2), creating it if it doesn't exist.
  1598. func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
  1599. parts := strings.Split(path, "/")
  1600. if len(parts) < 2 {
  1601. return nil, "", errHTTPBadRequestTopicInvalid
  1602. }
  1603. topicIDs := util.SplitNoEmpty(parts[1], ",")
  1604. topics, err := s.topicsFromIDs(topicIDs...)
  1605. if err != nil {
  1606. return nil, "", errHTTPBadRequestTopicInvalid
  1607. }
  1608. return topics, parts[1], nil
  1609. }
  1610. // topicsFromIDs returns the topics with the given IDs, creating them if they don't exist.
  1611. func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
  1612. s.mu.Lock()
  1613. defer s.mu.Unlock()
  1614. topics := make([]*topic, 0)
  1615. for _, id := range ids {
  1616. if util.Contains(s.config.DisallowedTopics, id) {
  1617. return nil, errHTTPBadRequestTopicDisallowed
  1618. }
  1619. if _, ok := s.topics[id]; !ok {
  1620. if len(s.topics) >= s.config.TotalTopicLimit {
  1621. return nil, errHTTPTooManyRequestsLimitTotalTopics
  1622. }
  1623. s.topics[id] = newTopic(id)
  1624. }
  1625. topics = append(topics, s.topics[id])
  1626. }
  1627. return topics, nil
  1628. }
  1629. // topicFromID returns the topic with the given ID, creating it if it doesn't exist.
  1630. func (s *Server) topicFromID(id string) (*topic, error) {
  1631. topics, err := s.topicsFromIDs(id)
  1632. if err != nil {
  1633. return nil, err
  1634. }
  1635. return topics[0], nil
  1636. }
  1637. // topicsFromPattern returns a list of topics matching the given pattern, but it does not create them.
  1638. func (s *Server) topicsFromPattern(pattern string) ([]*topic, error) {
  1639. s.mu.RLock()
  1640. defer s.mu.RUnlock()
  1641. patternRegexp, err := regexp.Compile("^" + strings.ReplaceAll(pattern, "*", ".*") + "$")
  1642. if err != nil {
  1643. return nil, err
  1644. }
  1645. topics := make([]*topic, 0)
  1646. for _, t := range s.topics {
  1647. if patternRegexp.MatchString(t.ID) {
  1648. topics = append(topics, t)
  1649. }
  1650. }
  1651. return topics, nil
  1652. }
  1653. func (s *Server) runSMTPServer() error {
  1654. s.smtpServerBackend = newMailBackend(s.config, s.handle)
  1655. s.smtpServer = smtp.NewServer(s.smtpServerBackend)
  1656. s.smtpServer.Addr = s.config.SMTPServerListen
  1657. s.smtpServer.Domain = s.config.SMTPServerDomain
  1658. s.smtpServer.ReadTimeout = 10 * time.Second
  1659. s.smtpServer.WriteTimeout = 10 * time.Second
  1660. s.smtpServer.MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
  1661. s.smtpServer.MaxRecipients = 1
  1662. s.smtpServer.AllowInsecureAuth = true
  1663. return s.smtpServer.ListenAndServe()
  1664. }
  1665. func (s *Server) runManager() {
  1666. for {
  1667. select {
  1668. case <-time.After(s.config.ManagerInterval):
  1669. log.
  1670. Tag(tagManager).
  1671. Timing(s.execManager).
  1672. Debug("Manager finished")
  1673. case <-s.closeChan:
  1674. return
  1675. }
  1676. }
  1677. }
  1678. // runStatsResetter runs once a day (usually midnight UTC) to reset all the visitor's message and
  1679. // email counters. The stats are used to display the counters in the web app, as well as for rate limiting.
  1680. func (s *Server) runStatsResetter() {
  1681. for {
  1682. runAt := util.NextOccurrenceUTC(s.config.VisitorStatsResetTime, time.Now())
  1683. timer := time.NewTimer(time.Until(runAt))
  1684. log.Tag(tagResetter).Debug("Waiting until %v to reset visitor stats", runAt)
  1685. select {
  1686. case <-timer.C:
  1687. log.Tag(tagResetter).Debug("Running stats resetter")
  1688. s.resetStats()
  1689. case <-s.closeChan:
  1690. log.Tag(tagResetter).Debug("Stopping stats resetter")
  1691. timer.Stop()
  1692. return
  1693. }
  1694. }
  1695. }
  1696. func (s *Server) resetStats() {
  1697. log.Info("Resetting all visitor stats (daily task)")
  1698. s.mu.Lock()
  1699. defer s.mu.Unlock() // Includes the database query to avoid races with other processes
  1700. for _, v := range s.visitors {
  1701. v.ResetStats()
  1702. }
  1703. if s.userManager != nil {
  1704. if err := s.userManager.ResetStats(); err != nil {
  1705. log.Tag(tagResetter).Warn("Failed to write to database: %s", err.Error())
  1706. }
  1707. }
  1708. }
  1709. func (s *Server) runFirebaseKeepaliver() {
  1710. if s.firebaseClient == nil {
  1711. return
  1712. }
  1713. v := newVisitor(s.config, s.messageCache, s.userManager, netip.IPv4Unspecified(), nil) // Background process, not a real visitor, uses IP 0.0.0.0
  1714. for {
  1715. select {
  1716. case <-time.After(s.config.FirebaseKeepaliveInterval):
  1717. s.sendToFirebase(v, newKeepaliveMessage(firebaseControlTopic))
  1718. /*
  1719. FIXME: Disable iOS polling entirely for now due to thundering herd problem (see #677)
  1720. To solve this, we'd have to shard the iOS poll topics to spread out the polling evenly.
  1721. Given that it's not really necessary to poll, turning it off for now should not have any impact.
  1722. case <-time.After(s.config.FirebasePollInterval):
  1723. s.sendToFirebase(v, newKeepaliveMessage(firebasePollTopic))
  1724. */
  1725. case <-s.closeChan:
  1726. return
  1727. }
  1728. }
  1729. }
  1730. func (s *Server) runDelayedSender() {
  1731. for {
  1732. select {
  1733. case <-time.After(s.config.DelayedSenderInterval):
  1734. if err := s.sendDelayedMessages(); err != nil {
  1735. log.Tag(tagPublish).Err(err).Warn("Error sending delayed messages")
  1736. }
  1737. case <-s.closeChan:
  1738. return
  1739. }
  1740. }
  1741. }
  1742. func (s *Server) sendDelayedMessages() error {
  1743. messages, err := s.messageCache.MessagesDue()
  1744. if err != nil {
  1745. return err
  1746. }
  1747. for _, m := range messages {
  1748. var u *user.User
  1749. if s.userManager != nil && m.User != "" {
  1750. u, err = s.userManager.UserByID(m.User)
  1751. if err != nil {
  1752. log.With(m).Err(err).Warn("Error sending delayed message")
  1753. continue
  1754. }
  1755. }
  1756. v := s.visitor(m.Sender, u)
  1757. if err := s.sendDelayedMessage(v, m); err != nil {
  1758. logvm(v, m).Err(err).Warn("Error sending delayed message")
  1759. }
  1760. }
  1761. return nil
  1762. }
  1763. func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
  1764. logvm(v, m).Debug("Sending delayed message")
  1765. s.mu.RLock()
  1766. t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
  1767. s.mu.RUnlock()
  1768. if ok {
  1769. go func() {
  1770. // We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
  1771. if err := t.Publish(v, m); err != nil {
  1772. logvm(v, m).Err(err).Warn("Unable to publish message")
  1773. }
  1774. }()
  1775. }
  1776. if s.firebaseClient != nil { // Firebase subscribers may not show up in topics map
  1777. go s.sendToFirebase(v, m)
  1778. }
  1779. if s.config.UpstreamBaseURL != "" {
  1780. go s.forwardPollRequest(v, m)
  1781. }
  1782. if s.config.WebPushPublicKey != "" {
  1783. go s.publishToWebPushEndpoints(v, m)
  1784. }
  1785. if err := s.messageCache.MarkPublished(m); err != nil {
  1786. return err
  1787. }
  1788. return nil
  1789. }
  1790. // transformBodyJSON peeks the request body, reads the JSON, and converts it to headers
  1791. // before passing it on to the next handler. This is meant to be used in combination with handlePublish.
  1792. func (s *Server) transformBodyJSON(next handleFunc) handleFunc {
  1793. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1794. m, err := readJSONWithLimit[publishMessage](r.Body, s.config.MessageSizeLimit*2, false) // 2x to account for JSON format overhead
  1795. if err != nil {
  1796. return err
  1797. }
  1798. if !topicRegex.MatchString(m.Topic) {
  1799. return errHTTPBadRequestTopicInvalid
  1800. }
  1801. if m.Message == "" {
  1802. m.Message = emptyMessageBody
  1803. }
  1804. r.URL.Path = "/" + m.Topic
  1805. r.Body = io.NopCloser(strings.NewReader(m.Message))
  1806. if m.Title != "" {
  1807. r.Header.Set("X-Title", m.Title)
  1808. }
  1809. if m.Priority != 0 {
  1810. r.Header.Set("X-Priority", fmt.Sprintf("%d", m.Priority))
  1811. }
  1812. if len(m.Tags) > 0 {
  1813. r.Header.Set("X-Tags", strings.Join(m.Tags, ","))
  1814. }
  1815. if m.Attach != "" {
  1816. r.Header.Set("X-Attach", m.Attach)
  1817. }
  1818. if m.Filename != "" {
  1819. r.Header.Set("X-Filename", m.Filename)
  1820. }
  1821. if m.Click != "" {
  1822. r.Header.Set("X-Click", m.Click)
  1823. }
  1824. if m.Icon != "" {
  1825. r.Header.Set("X-Icon", m.Icon)
  1826. }
  1827. if m.Markdown {
  1828. r.Header.Set("X-Markdown", "yes")
  1829. }
  1830. if len(m.Actions) > 0 {
  1831. actionsStr, err := json.Marshal(m.Actions)
  1832. if err != nil {
  1833. return errHTTPBadRequestMessageJSONInvalid
  1834. }
  1835. r.Header.Set("X-Actions", string(actionsStr))
  1836. }
  1837. if m.Email != "" {
  1838. r.Header.Set("X-Email", m.Email)
  1839. }
  1840. if m.Delay != "" {
  1841. r.Header.Set("X-Delay", m.Delay)
  1842. }
  1843. if m.Call != "" {
  1844. r.Header.Set("X-Call", m.Call)
  1845. }
  1846. if m.Cache != "" {
  1847. r.Header.Set("X-Cache", m.Cache)
  1848. }
  1849. if m.Firebase != "" {
  1850. r.Header.Set("X-Firebase", m.Firebase)
  1851. }
  1852. return next(w, r, v)
  1853. }
  1854. }
  1855. func (s *Server) transformMatrixJSON(next handleFunc) handleFunc {
  1856. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1857. newRequest, err := newRequestFromMatrixJSON(r, s.config.BaseURL, s.config.MessageSizeLimit)
  1858. if err != nil {
  1859. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Invalid Matrix request")
  1860. if e, ok := err.(*errMatrixPushkeyRejected); ok {
  1861. return writeMatrixResponse(w, e.rejectedPushKey)
  1862. }
  1863. return err
  1864. }
  1865. if err := next(w, newRequest, v); err != nil {
  1866. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Error handling Matrix request")
  1867. return err
  1868. }
  1869. return nil
  1870. }
  1871. }
  1872. func (s *Server) authorizeTopicWrite(next handleFunc) handleFunc {
  1873. return s.authorizeTopic(next, user.PermissionWrite)
  1874. }
  1875. func (s *Server) authorizeTopicRead(next handleFunc) handleFunc {
  1876. return s.authorizeTopic(next, user.PermissionRead)
  1877. }
  1878. func (s *Server) authorizeTopic(next handleFunc, perm user.Permission) handleFunc {
  1879. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1880. if s.userManager == nil {
  1881. return next(w, r, v)
  1882. }
  1883. topics, _, err := s.topicsFromPath(r.URL.Path)
  1884. if err != nil {
  1885. return err
  1886. }
  1887. u := v.User()
  1888. for _, t := range topics {
  1889. if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
  1890. logvr(v, r).With(t).Err(err).Debug("Access to topic %s not authorized", t.ID)
  1891. return errHTTPForbidden.With(t)
  1892. }
  1893. }
  1894. return next(w, r, v)
  1895. }
  1896. }
  1897. // maybeAuthenticate reads the "Authorization" header and will try to authenticate the user
  1898. // if it is set.
  1899. //
  1900. // - If auth-file is not configured, immediately return an IP-based visitor
  1901. // - If the header is not set or not supported (anything non-Basic and non-Bearer),
  1902. // an IP-based visitor is returned
  1903. // - If the header is set, authenticate will be called to check the username/password (Basic auth),
  1904. // or the token (Bearer auth), and read the user from the database
  1905. //
  1906. // This function will ALWAYS return a visitor, even if an error occurs (e.g. unauthorized), so
  1907. // that subsequent logging calls still have a visitor context.
  1908. func (s *Server) maybeAuthenticate(r *http.Request) (*visitor, error) {
  1909. // Read the "Authorization" header value and exit out early if it's not set
  1910. ip := extractIPAddress(r, s.config.BehindProxy, s.config.ProxyForwardedHeader, s.config.ProxyTrustedPrefixes)
  1911. vip := s.visitor(ip, nil)
  1912. if s.userManager == nil {
  1913. return vip, nil
  1914. }
  1915. header, err := readAuthHeader(r)
  1916. if err != nil {
  1917. return vip, err
  1918. } else if !supportedAuthHeader(header) {
  1919. return vip, nil
  1920. }
  1921. // If we're trying to auth, check the rate limiter first
  1922. if !vip.AuthAllowed() {
  1923. return vip, errHTTPTooManyRequestsLimitAuthFailure // Always return visitor, even when error occurs!
  1924. }
  1925. u, err := s.authenticate(r, header)
  1926. if err != nil {
  1927. vip.AuthFailed()
  1928. logr(r).Err(err).Debug("Authentication failed")
  1929. return vip, errHTTPUnauthorized // Always return visitor, even when error occurs!
  1930. }
  1931. // Authentication with user was successful
  1932. return s.visitor(ip, u), nil
  1933. }
  1934. // authenticate a user based on basic auth username/password (Authorization: Basic ...), or token auth (Authorization: Bearer ...).
  1935. // The Authorization header can be passed as a header or the ?auth=... query param. The latter is required only to
  1936. // support the WebSocket JavaScript class, which does not support passing headers during the initial request. The auth
  1937. // query param is effectively doubly base64 encoded. Its format is base64(Basic base64(user:pass)).
  1938. func (s *Server) authenticate(r *http.Request, header string) (user *user.User, err error) {
  1939. if strings.HasPrefix(header, "Bearer") {
  1940. return s.authenticateBearerAuth(r, strings.TrimSpace(strings.TrimPrefix(header, "Bearer")))
  1941. }
  1942. return s.authenticateBasicAuth(r, header)
  1943. }
  1944. // readAuthHeader reads the raw value of the Authorization header, either from the actual HTTP header,
  1945. // or from the ?auth... query parameter
  1946. func readAuthHeader(r *http.Request) (string, error) {
  1947. value := strings.TrimSpace(r.Header.Get("Authorization"))
  1948. queryParam := readQueryParam(r, "authorization", "auth")
  1949. if queryParam != "" {
  1950. a, err := base64.RawURLEncoding.DecodeString(queryParam)
  1951. if err != nil {
  1952. return "", err
  1953. }
  1954. value = strings.TrimSpace(string(a))
  1955. }
  1956. return value, nil
  1957. }
  1958. // supportedAuthHeader returns true only if the Authorization header value starts
  1959. // with "Basic" or "Bearer". In particular, an empty value is not supported, and neither
  1960. // are things like "WebPush", or "vapid" (see #629).
  1961. func supportedAuthHeader(value string) bool {
  1962. value = strings.ToLower(value)
  1963. return strings.HasPrefix(value, "basic ") || strings.HasPrefix(value, "bearer ")
  1964. }
  1965. func (s *Server) authenticateBasicAuth(r *http.Request, value string) (user *user.User, err error) {
  1966. r.Header.Set("Authorization", value)
  1967. username, password, ok := r.BasicAuth()
  1968. if !ok {
  1969. return nil, errors.New("invalid basic auth")
  1970. } else if username == "" {
  1971. return s.authenticateBearerAuth(r, password) // Treat password as token
  1972. }
  1973. return s.userManager.Authenticate(username, password)
  1974. }
  1975. func (s *Server) authenticateBearerAuth(r *http.Request, token string) (*user.User, error) {
  1976. u, err := s.userManager.AuthenticateToken(token)
  1977. if err != nil {
  1978. return nil, err
  1979. }
  1980. ip := extractIPAddress(r, s.config.BehindProxy, s.config.ProxyForwardedHeader, s.config.ProxyTrustedPrefixes)
  1981. go s.userManager.EnqueueTokenUpdate(token, &user.TokenUpdate{
  1982. LastAccess: time.Now(),
  1983. LastOrigin: ip,
  1984. })
  1985. return u, nil
  1986. }
  1987. func (s *Server) visitor(ip netip.Addr, user *user.User) *visitor {
  1988. s.mu.Lock()
  1989. defer s.mu.Unlock()
  1990. id := visitorID(ip, user, s.config)
  1991. v, exists := s.visitors[id]
  1992. if !exists {
  1993. s.visitors[id] = newVisitor(s.config, s.messageCache, s.userManager, ip, user)
  1994. return s.visitors[id]
  1995. }
  1996. v.Keepalive()
  1997. v.SetUser(user) // Always update with the latest user, may be nil!
  1998. return v
  1999. }
  2000. func (s *Server) writeJSON(w http.ResponseWriter, v any) error {
  2001. return s.writeJSONWithContentType(w, v, "application/json")
  2002. }
  2003. func (s *Server) writeJSONWithContentType(w http.ResponseWriter, v any, contentType string) error {
  2004. w.Header().Set("Content-Type", contentType)
  2005. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  2006. if err := json.NewEncoder(w).Encode(v); err != nil {
  2007. return err
  2008. }
  2009. return nil
  2010. }
  2011. func (s *Server) updateAndWriteStats(messagesCount int64) {
  2012. s.mu.Lock()
  2013. s.messagesHistory = append(s.messagesHistory, messagesCount)
  2014. if len(s.messagesHistory) > messagesHistoryMax {
  2015. s.messagesHistory = s.messagesHistory[1:]
  2016. }
  2017. s.mu.Unlock()
  2018. go func() {
  2019. if err := s.messageCache.UpdateStats(messagesCount); err != nil {
  2020. log.Tag(tagManager).Err(err).Warn("Cannot write messages stats")
  2021. }
  2022. }()
  2023. }