server.go 76 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069
  1. package server
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "embed"
  7. "encoding/base64"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "net"
  13. "net/http"
  14. "net/http/pprof"
  15. "net/netip"
  16. "net/url"
  17. "os"
  18. "path"
  19. "path/filepath"
  20. "regexp"
  21. "sort"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "text/template"
  26. "time"
  27. "unicode/utf8"
  28. "github.com/emersion/go-smtp"
  29. "github.com/gorilla/websocket"
  30. "github.com/prometheus/client_golang/prometheus/promhttp"
  31. "golang.org/x/sync/errgroup"
  32. "heckel.io/ntfy/v2/log"
  33. "heckel.io/ntfy/v2/user"
  34. "heckel.io/ntfy/v2/util"
  35. )
  36. // Server is the main server, providing the UI and API for ntfy
  37. type Server struct {
  38. config *Config
  39. httpServer *http.Server
  40. httpsServer *http.Server
  41. httpMetricsServer *http.Server
  42. httpProfileServer *http.Server
  43. unixListener net.Listener
  44. smtpServer *smtp.Server
  45. smtpServerBackend *smtpBackend
  46. smtpSender mailer
  47. topics map[string]*topic
  48. visitors map[string]*visitor // ip:<ip> or user:<user>
  49. firebaseClient *firebaseClient
  50. messages int64 // Total number of messages (persisted if messageCache enabled)
  51. messagesHistory []int64 // Last n values of the messages counter, used to determine rate
  52. userManager *user.Manager // Might be nil!
  53. messageCache *messageCache // Database that stores the messages
  54. webPush *webPushStore // Database that stores web push subscriptions
  55. fileCache *fileCache // File system based cache that stores attachments
  56. stripe stripeAPI // Stripe API, can be replaced with a mock
  57. priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
  58. metricsHandler http.Handler // Handles /metrics if enable-metrics set, and listen-metrics-http not set
  59. closeChan chan bool
  60. mu sync.RWMutex
  61. }
  62. // handleFunc extends the normal http.HandlerFunc to be able to easily return errors
  63. type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
  64. var (
  65. // If changed, don't forget to update Android App and auth_sqlite.go
  66. topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
  67. topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
  68. externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
  69. jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
  70. ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
  71. rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
  72. wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
  73. authPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/auth$`)
  74. publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/(publish|send|trigger)$`)
  75. webConfigPath = "/config.js"
  76. webManifestPath = "/manifest.webmanifest"
  77. webRootHTMLPath = "/app.html"
  78. webServiceWorkerPath = "/sw.js"
  79. accountPath = "/account"
  80. matrixPushPath = "/_matrix/push/v1/notify"
  81. metricsPath = "/metrics"
  82. apiHealthPath = "/v1/health"
  83. apiStatsPath = "/v1/stats"
  84. apiWebPushPath = "/v1/webpush"
  85. apiTiersPath = "/v1/tiers"
  86. apiUsersPath = "/v1/users"
  87. apiUsersAccessPath = "/v1/users/access"
  88. apiAccountPath = "/v1/account"
  89. apiAccountTokenPath = "/v1/account/token"
  90. apiAccountPasswordPath = "/v1/account/password"
  91. apiAccountSettingsPath = "/v1/account/settings"
  92. apiAccountSubscriptionPath = "/v1/account/subscription"
  93. apiAccountReservationPath = "/v1/account/reservation"
  94. apiAccountPhonePath = "/v1/account/phone"
  95. apiAccountPhoneVerifyPath = "/v1/account/phone/verify"
  96. apiAccountBillingPortalPath = "/v1/account/billing/portal"
  97. apiAccountBillingWebhookPath = "/v1/account/billing/webhook"
  98. apiAccountBillingSubscriptionPath = "/v1/account/billing/subscription"
  99. apiAccountBillingSubscriptionCheckoutSuccessTemplate = "/v1/account/billing/subscription/success/{CHECKOUT_SESSION_ID}"
  100. apiAccountBillingSubscriptionCheckoutSuccessRegex = regexp.MustCompile(`/v1/account/billing/subscription/success/(.+)$`)
  101. apiAccountReservationSingleRegex = regexp.MustCompile(`/v1/account/reservation/([-_A-Za-z0-9]{1,64})$`)
  102. staticRegex = regexp.MustCompile(`^/static/.+`)
  103. docsRegex = regexp.MustCompile(`^/docs(|/.*)$`)
  104. fileRegex = regexp.MustCompile(`^/file/([-_A-Za-z0-9]{1,64})(?:\.[A-Za-z0-9]{1,16})?$`)
  105. urlRegex = regexp.MustCompile(`^https?://`)
  106. phoneNumberRegex = regexp.MustCompile(`^\+\d{1,100}$`)
  107. //go:embed site
  108. webFs embed.FS
  109. webFsCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: webFs}
  110. webSiteDir = "/site"
  111. webAppIndex = "/app.html" // React app
  112. //go:embed docs
  113. docsStaticFs embed.FS
  114. docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
  115. )
  116. const (
  117. firebaseControlTopic = "~control" // See Android if changed
  118. firebasePollTopic = "~poll" // See iOS if changed (DISABLED for now)
  119. emptyMessageBody = "triggered" // Used if message body is empty
  120. newMessageBody = "New message" // Used in poll requests as generic message
  121. defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
  122. encodingBase64 = "base64" // Used mainly for binary UnifiedPush messages
  123. jsonBodyBytesLimit = 32768 // Max number of bytes for a request bodys (unless MessageLimit is higher)
  124. unifiedPushTopicPrefix = "up" // Temporarily, we rate limit all "up*" topics based on the subscriber
  125. unifiedPushTopicLength = 14 // Length of UnifiedPush topics, including the "up" part
  126. messagesHistoryMax = 10 // Number of message count values to keep in memory
  127. templateMaxExecutionTime = 100 * time.Millisecond
  128. )
  129. var (
  130. // templateDisallowedRegex tests a template for disallowed expressions. While not really dangerous, they
  131. // are not useful, and seem potentially troublesome.
  132. templateDisallowedRegex = regexp.MustCompile(`(?m)\{\{-?\s*(call|template|define)\b`)
  133. )
  134. // WebSocket constants
  135. const (
  136. wsWriteWait = 2 * time.Second
  137. wsBufferSize = 1024
  138. wsReadLimit = 64 // We only ever receive PINGs
  139. wsPongWait = 15 * time.Second
  140. )
  141. // New instantiates a new Server. It creates the cache and adds a Firebase
  142. // subscriber (if configured).
  143. func New(conf *Config) (*Server, error) {
  144. var mailer mailer
  145. if conf.SMTPSenderAddr != "" {
  146. mailer = &smtpSender{config: conf}
  147. }
  148. var stripe stripeAPI
  149. if conf.StripeSecretKey != "" {
  150. stripe = newStripeAPI()
  151. }
  152. messageCache, err := createMessageCache(conf)
  153. if err != nil {
  154. return nil, err
  155. }
  156. var webPush *webPushStore
  157. if conf.WebPushPublicKey != "" {
  158. webPush, err = newWebPushStore(conf.WebPushFile, conf.WebPushStartupQueries)
  159. if err != nil {
  160. return nil, err
  161. }
  162. }
  163. topics, err := messageCache.Topics()
  164. if err != nil {
  165. return nil, err
  166. }
  167. messages, err := messageCache.Stats()
  168. if err != nil {
  169. return nil, err
  170. }
  171. var fileCache *fileCache
  172. if conf.AttachmentCacheDir != "" {
  173. fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit)
  174. if err != nil {
  175. return nil, err
  176. }
  177. }
  178. var userManager *user.Manager
  179. if conf.AuthFile != "" {
  180. authConfig := &user.Config{
  181. Filename: conf.AuthFile,
  182. StartupQueries: conf.AuthStartupQueries,
  183. DefaultAccess: conf.AuthDefault,
  184. BcryptCost: conf.AuthBcryptCost,
  185. QueueWriterInterval: conf.AuthStatsQueueWriterInterval,
  186. }
  187. userManager, err = user.NewManager(authConfig)
  188. if err != nil {
  189. return nil, err
  190. }
  191. }
  192. var firebaseClient *firebaseClient
  193. if conf.FirebaseKeyFile != "" {
  194. sender, err := newFirebaseSender(conf.FirebaseKeyFile)
  195. if err != nil {
  196. return nil, err
  197. }
  198. // This awkward logic is required because Go is weird about nil types and interfaces.
  199. // See issue #641, and https://go.dev/play/p/uur1flrv1t3 for an example
  200. var auther user.Auther
  201. if userManager != nil {
  202. auther = userManager
  203. }
  204. firebaseClient = newFirebaseClient(sender, auther)
  205. }
  206. s := &Server{
  207. config: conf,
  208. messageCache: messageCache,
  209. webPush: webPush,
  210. fileCache: fileCache,
  211. firebaseClient: firebaseClient,
  212. smtpSender: mailer,
  213. topics: topics,
  214. userManager: userManager,
  215. messages: messages,
  216. messagesHistory: []int64{messages},
  217. visitors: make(map[string]*visitor),
  218. stripe: stripe,
  219. }
  220. s.priceCache = util.NewLookupCache(s.fetchStripePrices, conf.StripePriceCacheDuration)
  221. return s, nil
  222. }
  223. func createMessageCache(conf *Config) (*messageCache, error) {
  224. if conf.CacheDuration == 0 {
  225. return newNopCache()
  226. } else if conf.CacheFile != "" {
  227. return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheDuration, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
  228. }
  229. return newMemCache()
  230. }
  231. // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
  232. // a manager go routine to print stats and prune messages.
  233. func (s *Server) Run() error {
  234. var listenStr string
  235. if s.config.ListenHTTP != "" {
  236. listenStr += fmt.Sprintf(" %s[http]", s.config.ListenHTTP)
  237. }
  238. if s.config.ListenHTTPS != "" {
  239. listenStr += fmt.Sprintf(" %s[https]", s.config.ListenHTTPS)
  240. }
  241. if s.config.ListenUnix != "" {
  242. listenStr += fmt.Sprintf(" %s[unix]", s.config.ListenUnix)
  243. }
  244. if s.config.SMTPServerListen != "" {
  245. listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
  246. }
  247. if s.config.MetricsListenHTTP != "" {
  248. listenStr += fmt.Sprintf(" %s[http/metrics]", s.config.MetricsListenHTTP)
  249. }
  250. if s.config.ProfileListenHTTP != "" {
  251. listenStr += fmt.Sprintf(" %s[http/profile]", s.config.ProfileListenHTTP)
  252. }
  253. log.Tag(tagStartup).Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String())
  254. if log.IsFile() {
  255. fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s\n", listenStr, s.config.Version)
  256. fmt.Fprintf(os.Stderr, "Logs are written to %s\n", log.File())
  257. }
  258. mux := http.NewServeMux()
  259. mux.HandleFunc("/", s.handle)
  260. errChan := make(chan error)
  261. s.mu.Lock()
  262. s.closeChan = make(chan bool)
  263. if s.config.ListenHTTP != "" {
  264. s.httpServer = &http.Server{Addr: s.config.ListenHTTP, Handler: mux}
  265. go func() {
  266. errChan <- s.httpServer.ListenAndServe()
  267. }()
  268. }
  269. if s.config.ListenHTTPS != "" {
  270. s.httpsServer = &http.Server{Addr: s.config.ListenHTTPS, Handler: mux}
  271. go func() {
  272. errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
  273. }()
  274. }
  275. if s.config.ListenUnix != "" {
  276. go func() {
  277. var err error
  278. s.mu.Lock()
  279. os.Remove(s.config.ListenUnix)
  280. s.unixListener, err = net.Listen("unix", s.config.ListenUnix)
  281. if err != nil {
  282. s.mu.Unlock()
  283. errChan <- err
  284. return
  285. }
  286. defer s.unixListener.Close()
  287. if s.config.ListenUnixMode > 0 {
  288. if err := os.Chmod(s.config.ListenUnix, s.config.ListenUnixMode); err != nil {
  289. s.mu.Unlock()
  290. errChan <- err
  291. return
  292. }
  293. }
  294. s.mu.Unlock()
  295. httpServer := &http.Server{Handler: mux}
  296. errChan <- httpServer.Serve(s.unixListener)
  297. }()
  298. }
  299. if s.config.MetricsListenHTTP != "" {
  300. initMetrics()
  301. s.httpMetricsServer = &http.Server{Addr: s.config.MetricsListenHTTP, Handler: promhttp.Handler()}
  302. go func() {
  303. errChan <- s.httpMetricsServer.ListenAndServe()
  304. }()
  305. } else if s.config.EnableMetrics {
  306. initMetrics()
  307. s.metricsHandler = promhttp.Handler()
  308. }
  309. if s.config.ProfileListenHTTP != "" {
  310. profileMux := http.NewServeMux()
  311. profileMux.HandleFunc("/debug/pprof/", pprof.Index)
  312. profileMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
  313. profileMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
  314. profileMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
  315. profileMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
  316. s.httpProfileServer = &http.Server{Addr: s.config.ProfileListenHTTP, Handler: profileMux}
  317. go func() {
  318. errChan <- s.httpProfileServer.ListenAndServe()
  319. }()
  320. }
  321. if s.config.SMTPServerListen != "" {
  322. go func() {
  323. errChan <- s.runSMTPServer()
  324. }()
  325. }
  326. s.mu.Unlock()
  327. go s.runManager()
  328. go s.runStatsResetter()
  329. go s.runDelayedSender()
  330. go s.runFirebaseKeepaliver()
  331. return <-errChan
  332. }
  333. // Stop stops HTTP (+HTTPS) server and all managers
  334. func (s *Server) Stop() {
  335. s.mu.Lock()
  336. defer s.mu.Unlock()
  337. if s.httpServer != nil {
  338. s.httpServer.Close()
  339. }
  340. if s.httpsServer != nil {
  341. s.httpsServer.Close()
  342. }
  343. if s.unixListener != nil {
  344. s.unixListener.Close()
  345. }
  346. if s.smtpServer != nil {
  347. s.smtpServer.Close()
  348. }
  349. s.closeDatabases()
  350. close(s.closeChan)
  351. }
  352. func (s *Server) closeDatabases() {
  353. if s.userManager != nil {
  354. s.userManager.Close()
  355. }
  356. s.messageCache.Close()
  357. if s.webPush != nil {
  358. s.webPush.Close()
  359. }
  360. }
  361. // handle is the main entry point for all HTTP requests
  362. func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
  363. v, err := s.maybeAuthenticate(r) // Note: Always returns v, even when error is returned
  364. if err != nil {
  365. s.handleError(w, r, v, err)
  366. return
  367. }
  368. ev := logvr(v, r)
  369. if ev.IsTrace() {
  370. ev.Field("http_request", renderHTTPRequest(r)).Trace("HTTP request started")
  371. } else if logvr(v, r).IsDebug() {
  372. ev.Debug("HTTP request started")
  373. }
  374. logvr(v, r).
  375. Timing(func() {
  376. if err := s.handleInternal(w, r, v); err != nil {
  377. s.handleError(w, r, v, err)
  378. return
  379. }
  380. if metricHTTPRequests != nil {
  381. metricHTTPRequests.WithLabelValues("200", "20000", r.Method).Inc()
  382. }
  383. }).
  384. Debug("HTTP request finished")
  385. }
  386. func (s *Server) handleError(w http.ResponseWriter, r *http.Request, v *visitor, err error) {
  387. httpErr, ok := err.(*errHTTP)
  388. if !ok {
  389. httpErr = errHTTPInternalError
  390. }
  391. if metricHTTPRequests != nil {
  392. metricHTTPRequests.WithLabelValues(fmt.Sprintf("%d", httpErr.HTTPCode), fmt.Sprintf("%d", httpErr.Code), r.Method).Inc()
  393. }
  394. isRateLimiting := util.Contains(rateLimitingErrorCodes, httpErr.HTTPCode)
  395. isNormalError := strings.Contains(err.Error(), "i/o timeout") || util.Contains(normalErrorCodes, httpErr.HTTPCode)
  396. ev := logvr(v, r).Err(err)
  397. if websocket.IsWebSocketUpgrade(r) {
  398. ev.Tag(tagWebsocket).Fields(websocketErrorContext(err))
  399. if isNormalError {
  400. ev.Debug("WebSocket error (this error is okay, it happens a lot): %s", err.Error())
  401. } else {
  402. ev.Info("WebSocket error: %s", err.Error())
  403. }
  404. w.WriteHeader(httpErr.HTTPCode)
  405. return // Do not attempt to write any body to upgraded connection
  406. }
  407. if isNormalError {
  408. ev.Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  409. } else {
  410. ev.Info("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  411. }
  412. if isRateLimiting && s.config.StripeSecretKey != "" {
  413. u := v.User()
  414. if u == nil || u.Tier == nil {
  415. httpErr = httpErr.Wrap("increase your limits with a paid plan, see %s", s.config.BaseURL)
  416. }
  417. }
  418. w.Header().Set("Content-Type", "application/json")
  419. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  420. w.WriteHeader(httpErr.HTTPCode)
  421. io.WriteString(w, httpErr.JSON()+"\n")
  422. }
  423. func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visitor) error {
  424. if r.Method == http.MethodGet && r.URL.Path == "/" && s.config.WebRoot == "/" {
  425. return s.ensureWebEnabled(s.handleRoot)(w, r, v)
  426. } else if r.Method == http.MethodHead && r.URL.Path == "/" {
  427. return s.ensureWebEnabled(s.handleEmpty)(w, r, v)
  428. } else if r.Method == http.MethodGet && r.URL.Path == apiHealthPath {
  429. return s.handleHealth(w, r, v)
  430. } else if r.Method == http.MethodGet && r.URL.Path == webConfigPath {
  431. return s.ensureWebEnabled(s.handleWebConfig)(w, r, v)
  432. } else if r.Method == http.MethodGet && r.URL.Path == webManifestPath {
  433. return s.ensureWebPushEnabled(s.handleWebManifest)(w, r, v)
  434. } else if r.Method == http.MethodGet && r.URL.Path == apiUsersPath {
  435. return s.ensureAdmin(s.handleUsersGet)(w, r, v)
  436. } else if r.Method == http.MethodPost && r.URL.Path == apiUsersPath {
  437. return s.ensureAdmin(s.handleUsersAdd)(w, r, v)
  438. } else if r.Method == http.MethodPut && r.URL.Path == apiUsersPath {
  439. return s.ensureAdmin(s.handleUsersUpdate)(w, r, v)
  440. } else if r.Method == http.MethodDelete && r.URL.Path == apiUsersPath {
  441. return s.ensureAdmin(s.handleUsersDelete)(w, r, v)
  442. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == apiUsersAccessPath {
  443. return s.ensureAdmin(s.handleAccessAllow)(w, r, v)
  444. } else if r.Method == http.MethodDelete && r.URL.Path == apiUsersAccessPath {
  445. return s.ensureAdmin(s.handleAccessReset)(w, r, v)
  446. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPath {
  447. return s.ensureUserManager(s.handleAccountCreate)(w, r, v)
  448. } else if r.Method == http.MethodGet && r.URL.Path == apiAccountPath {
  449. return s.handleAccountGet(w, r, v) // Allowed by anonymous
  450. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPath {
  451. return s.ensureUser(s.withAccountSync(s.handleAccountDelete))(w, r, v)
  452. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPasswordPath {
  453. return s.ensureUser(s.handleAccountPasswordChange)(w, r, v)
  454. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountTokenPath {
  455. return s.ensureUser(s.withAccountSync(s.handleAccountTokenCreate))(w, r, v)
  456. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountTokenPath {
  457. return s.ensureUser(s.withAccountSync(s.handleAccountTokenUpdate))(w, r, v)
  458. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountTokenPath {
  459. return s.ensureUser(s.withAccountSync(s.handleAccountTokenDelete))(w, r, v)
  460. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSettingsPath {
  461. return s.ensureUser(s.withAccountSync(s.handleAccountSettingsChange))(w, r, v)
  462. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountSubscriptionPath {
  463. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionAdd))(w, r, v)
  464. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSubscriptionPath {
  465. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionChange))(w, r, v)
  466. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountSubscriptionPath {
  467. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionDelete))(w, r, v)
  468. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountReservationPath {
  469. return s.ensureUser(s.withAccountSync(s.handleAccountReservationAdd))(w, r, v)
  470. } else if r.Method == http.MethodDelete && apiAccountReservationSingleRegex.MatchString(r.URL.Path) {
  471. return s.ensureUser(s.withAccountSync(s.handleAccountReservationDelete))(w, r, v)
  472. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingSubscriptionPath {
  473. return s.ensurePaymentsEnabled(s.ensureUser(s.handleAccountBillingSubscriptionCreate))(w, r, v) // Account sync via incoming Stripe webhook
  474. } else if r.Method == http.MethodGet && apiAccountBillingSubscriptionCheckoutSuccessRegex.MatchString(r.URL.Path) {
  475. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingSubscriptionCreateSuccess))(w, r, v) // No user context!
  476. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountBillingSubscriptionPath {
  477. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionUpdate))(w, r, v) // Account sync via incoming Stripe webhook
  478. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountBillingSubscriptionPath {
  479. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionDelete))(w, r, v) // Account sync via incoming Stripe webhook
  480. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingPortalPath {
  481. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingPortalSessionCreate))(w, r, v)
  482. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingWebhookPath {
  483. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingWebhook))(w, r, v) // This request comes from Stripe!
  484. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhoneVerifyPath {
  485. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberVerify)))(w, r, v)
  486. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhonePath {
  487. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberAdd)))(w, r, v)
  488. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPhonePath {
  489. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberDelete)))(w, r, v)
  490. } else if r.Method == http.MethodPost && apiWebPushPath == r.URL.Path {
  491. return s.ensureWebPushEnabled(s.limitRequests(s.handleWebPushUpdate))(w, r, v)
  492. } else if r.Method == http.MethodDelete && apiWebPushPath == r.URL.Path {
  493. return s.ensureWebPushEnabled(s.limitRequests(s.handleWebPushDelete))(w, r, v)
  494. } else if r.Method == http.MethodGet && r.URL.Path == apiStatsPath {
  495. return s.handleStats(w, r, v)
  496. } else if r.Method == http.MethodGet && r.URL.Path == apiTiersPath {
  497. return s.ensurePaymentsEnabled(s.handleBillingTiersGet)(w, r, v)
  498. } else if r.Method == http.MethodGet && r.URL.Path == matrixPushPath {
  499. return s.handleMatrixDiscovery(w)
  500. } else if r.Method == http.MethodGet && r.URL.Path == metricsPath && s.metricsHandler != nil {
  501. return s.handleMetrics(w, r, v)
  502. } else if r.Method == http.MethodGet && (staticRegex.MatchString(r.URL.Path) || r.URL.Path == webServiceWorkerPath || r.URL.Path == webRootHTMLPath) {
  503. return s.ensureWebEnabled(s.handleStatic)(w, r, v)
  504. } else if r.Method == http.MethodGet && docsRegex.MatchString(r.URL.Path) {
  505. return s.ensureWebEnabled(s.handleDocs)(w, r, v)
  506. } else if (r.Method == http.MethodGet || r.Method == http.MethodHead) && fileRegex.MatchString(r.URL.Path) && s.config.AttachmentCacheDir != "" {
  507. return s.limitRequests(s.handleFile)(w, r, v)
  508. } else if r.Method == http.MethodOptions {
  509. return s.limitRequests(s.handleOptions)(w, r, v) // Should work even if the web app is not enabled, see #598
  510. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
  511. return s.transformBodyJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish)))(w, r, v)
  512. } else if r.Method == http.MethodPost && r.URL.Path == matrixPushPath {
  513. return s.transformMatrixJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublishMatrix)))(w, r, v)
  514. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicPathRegex.MatchString(r.URL.Path) {
  515. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  516. } else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
  517. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  518. } else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) {
  519. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeJSON))(w, r, v)
  520. } else if r.Method == http.MethodGet && ssePathRegex.MatchString(r.URL.Path) {
  521. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeSSE))(w, r, v)
  522. } else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
  523. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeRaw))(w, r, v)
  524. } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) {
  525. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeWS))(w, r, v)
  526. } else if r.Method == http.MethodGet && authPathRegex.MatchString(r.URL.Path) {
  527. return s.limitRequests(s.authorizeTopicRead(s.handleTopicAuth))(w, r, v)
  528. } else if r.Method == http.MethodGet && (topicPathRegex.MatchString(r.URL.Path) || externalTopicPathRegex.MatchString(r.URL.Path)) {
  529. return s.ensureWebEnabled(s.handleTopic)(w, r, v)
  530. }
  531. return errHTTPNotFound
  532. }
  533. func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request, v *visitor) error {
  534. r.URL.Path = webAppIndex
  535. return s.handleStatic(w, r, v)
  536. }
  537. func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request, v *visitor) error {
  538. unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too!
  539. if unifiedpush {
  540. w.Header().Set("Content-Type", "application/json")
  541. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  542. _, err := io.WriteString(w, `{"unifiedpush":{"version":1}}`+"\n")
  543. return err
  544. }
  545. r.URL.Path = webAppIndex
  546. return s.handleStatic(w, r, v)
  547. }
  548. func (s *Server) handleEmpty(_ http.ResponseWriter, _ *http.Request, _ *visitor) error {
  549. return nil
  550. }
  551. func (s *Server) handleTopicAuth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  552. return s.writeJSON(w, newSuccessResponse())
  553. }
  554. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  555. response := &apiHealthResponse{
  556. Healthy: true,
  557. }
  558. return s.writeJSON(w, response)
  559. }
  560. func (s *Server) handleWebConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  561. response := &apiConfigResponse{
  562. BaseURL: "", // Will translate to window.location.origin
  563. AppRoot: s.config.WebRoot,
  564. EnableLogin: s.config.EnableLogin,
  565. EnableSignup: s.config.EnableSignup,
  566. EnablePayments: s.config.StripeSecretKey != "",
  567. EnableCalls: s.config.TwilioAccount != "",
  568. EnableEmails: s.config.SMTPSenderFrom != "",
  569. EnableReservations: s.config.EnableReservations,
  570. EnableWebPush: s.config.WebPushPublicKey != "",
  571. BillingContact: s.config.BillingContact,
  572. WebPushPublicKey: s.config.WebPushPublicKey,
  573. DisallowedTopics: s.config.DisallowedTopics,
  574. }
  575. b, err := json.MarshalIndent(response, "", " ")
  576. if err != nil {
  577. return err
  578. }
  579. w.Header().Set("Content-Type", "text/javascript")
  580. w.Header().Set("Cache-Control", "no-cache")
  581. _, err = io.WriteString(w, fmt.Sprintf("// Generated server configuration\nvar config = %s;\n", string(b)))
  582. return err
  583. }
  584. // handleWebManifest serves the web app manifest for the progressive web app (PWA)
  585. func (s *Server) handleWebManifest(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  586. response := &webManifestResponse{
  587. Name: "ntfy web",
  588. Description: "ntfy lets you send push notifications via scripts from any computer or phone",
  589. ShortName: "ntfy",
  590. Scope: "/",
  591. StartURL: s.config.WebRoot,
  592. Display: "standalone",
  593. BackgroundColor: "#ffffff",
  594. ThemeColor: "#317f6f",
  595. Icons: []*webManifestIcon{
  596. {SRC: "/static/images/pwa-192x192.png", Sizes: "192x192", Type: "image/png"},
  597. {SRC: "/static/images/pwa-512x512.png", Sizes: "512x512", Type: "image/png"},
  598. },
  599. }
  600. return s.writeJSONWithContentType(w, response, "application/manifest+json")
  601. }
  602. // handleMetrics returns Prometheus metrics. This endpoint is only called if enable-metrics is set,
  603. // and listen-metrics-http is not set.
  604. func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  605. s.metricsHandler.ServeHTTP(w, r)
  606. return nil
  607. }
  608. // handleStatic returns all static resources (excluding the docs), including the web app
  609. func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  610. r.URL.Path = webSiteDir + r.URL.Path
  611. util.Gzip(http.FileServer(http.FS(webFsCached))).ServeHTTP(w, r)
  612. return nil
  613. }
  614. // handleDocs returns static resources related to the docs
  615. func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  616. util.Gzip(http.FileServer(http.FS(docsStaticCached))).ServeHTTP(w, r)
  617. return nil
  618. }
  619. // handleStats returns the publicly available server stats
  620. func (s *Server) handleStats(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  621. s.mu.RLock()
  622. messages, n, rate := s.messages, len(s.messagesHistory), float64(0)
  623. if n > 1 {
  624. rate = float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds())
  625. }
  626. s.mu.RUnlock()
  627. response := &apiStatsResponse{
  628. Messages: messages,
  629. MessagesRate: rate,
  630. }
  631. return s.writeJSON(w, response)
  632. }
  633. // handleFile processes the download of attachment files. The method handles GET and HEAD requests against a file.
  634. // Before streaming the file to a client, it locates uploader (m.Sender or m.User) in the message cache, so it
  635. // can associate the download bandwidth with the uploader.
  636. func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error {
  637. if s.config.AttachmentCacheDir == "" {
  638. return errHTTPInternalError
  639. }
  640. matches := fileRegex.FindStringSubmatch(r.URL.Path)
  641. if len(matches) != 2 {
  642. return errHTTPInternalErrorInvalidPath
  643. }
  644. messageID := matches[1]
  645. file := filepath.Join(s.config.AttachmentCacheDir, messageID)
  646. stat, err := os.Stat(file)
  647. if err != nil {
  648. return errHTTPNotFound.Fields(log.Context{
  649. "message_id": messageID,
  650. "error_context": "filesystem",
  651. })
  652. }
  653. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  654. w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
  655. if r.Method == http.MethodHead {
  656. return nil
  657. }
  658. // Find message in database, and associate bandwidth to the uploader user
  659. // This is an easy way to
  660. // - avoid abuse (e.g. 1 uploader, 1k downloaders)
  661. // - and also uses the higher bandwidth limits of a paying user
  662. m, err := s.messageCache.Message(messageID)
  663. if errors.Is(err, errMessageNotFound) {
  664. if s.config.CacheBatchTimeout > 0 {
  665. // Strange edge case: If we immediately after upload request the file (the web app does this for images),
  666. // and messages are persisted asynchronously, retry fetching from the database
  667. m, err = util.Retry(func() (*message, error) {
  668. return s.messageCache.Message(messageID)
  669. }, s.config.CacheBatchTimeout, 100*time.Millisecond, 300*time.Millisecond, 600*time.Millisecond)
  670. }
  671. if err != nil {
  672. return errHTTPNotFound.Fields(log.Context{
  673. "message_id": messageID,
  674. "error_context": "message_cache",
  675. })
  676. }
  677. } else if err != nil {
  678. return err
  679. }
  680. bandwidthVisitor := v
  681. if s.userManager != nil && m.User != "" {
  682. u, err := s.userManager.UserByID(m.User)
  683. if err != nil {
  684. return err
  685. }
  686. bandwidthVisitor = s.visitor(v.IP(), u)
  687. } else if m.Sender.IsValid() {
  688. bandwidthVisitor = s.visitor(m.Sender, nil)
  689. }
  690. if !bandwidthVisitor.BandwidthAllowed(stat.Size()) {
  691. return errHTTPTooManyRequestsLimitAttachmentBandwidth.With(m)
  692. }
  693. // Actually send file
  694. f, err := os.Open(file)
  695. if err != nil {
  696. return err
  697. }
  698. defer f.Close()
  699. if m.Attachment.Name != "" {
  700. w.Header().Set("Content-Disposition", "attachment; filename="+strconv.Quote(m.Attachment.Name))
  701. }
  702. _, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f)
  703. return err
  704. }
  705. func (s *Server) handleMatrixDiscovery(w http.ResponseWriter) error {
  706. if s.config.BaseURL == "" {
  707. return errHTTPInternalErrorMissingBaseURL
  708. }
  709. return writeMatrixDiscoveryResponse(w)
  710. }
  711. func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, error) {
  712. start := time.Now()
  713. t, err := fromContext[*topic](r, contextTopic)
  714. if err != nil {
  715. return nil, err
  716. }
  717. vrate, err := fromContext[*visitor](r, contextRateVisitor)
  718. if err != nil {
  719. return nil, err
  720. }
  721. body, err := util.Peek(r.Body, s.config.MessageSizeLimit)
  722. if err != nil {
  723. return nil, err
  724. }
  725. m := newDefaultMessage(t.ID, "")
  726. cache, firebase, email, call, template, unifiedpush, e := s.parsePublishParams(r, m)
  727. if e != nil {
  728. return nil, e.With(t)
  729. }
  730. if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
  731. // UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting.
  732. // The 5xx response is because some app servers (in particular Mastodon) will remove
  733. // the subscription as invalid if any 400-499 code (except 429/408) is returned.
  734. // See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
  735. return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
  736. } else if !util.ContainsIP(s.config.VisitorRequestExemptPrefixes, v.ip) && !vrate.MessageAllowed() {
  737. return nil, errHTTPTooManyRequestsLimitMessages.With(t)
  738. } else if email != "" && !vrate.EmailAllowed() {
  739. return nil, errHTTPTooManyRequestsLimitEmails.With(t)
  740. } else if call != "" {
  741. var httpErr *errHTTP
  742. call, httpErr = s.convertPhoneNumber(v.User(), call)
  743. if httpErr != nil {
  744. return nil, httpErr.With(t)
  745. } else if !vrate.CallAllowed() {
  746. return nil, errHTTPTooManyRequestsLimitCalls.With(t)
  747. }
  748. }
  749. if m.PollID != "" {
  750. m = newPollRequestMessage(t.ID, m.PollID)
  751. }
  752. m.Sender = v.IP()
  753. m.User = v.MaybeUserID()
  754. if cache {
  755. m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
  756. }
  757. if err := s.handlePublishBody(r, v, m, body, template, unifiedpush); err != nil {
  758. return nil, err
  759. }
  760. if m.Message == "" {
  761. m.Message = emptyMessageBody
  762. }
  763. delayed := m.Time > time.Now().Unix()
  764. ev := logvrm(v, r, m).
  765. Tag(tagPublish).
  766. With(t).
  767. Fields(log.Context{
  768. "message_delayed": delayed,
  769. "message_firebase": firebase,
  770. "message_unifiedpush": unifiedpush,
  771. "message_email": email,
  772. "message_call": call,
  773. })
  774. if ev.IsTrace() {
  775. ev.Field("message_body", util.MaybeMarshalJSON(m)).Trace("Received message")
  776. } else if ev.IsDebug() {
  777. ev.Debug("Received message")
  778. }
  779. if !delayed {
  780. if err := t.Publish(v, m); err != nil {
  781. return nil, err
  782. }
  783. if s.firebaseClient != nil && firebase {
  784. go s.sendToFirebase(v, m)
  785. }
  786. if s.smtpSender != nil && email != "" {
  787. go s.sendEmail(v, m, email)
  788. }
  789. if s.config.TwilioAccount != "" && call != "" {
  790. go s.callPhone(v, r, m, call)
  791. }
  792. if s.config.UpstreamBaseURL != "" && !unifiedpush { // UP messages are not sent to upstream
  793. go s.forwardPollRequest(v, m)
  794. }
  795. if s.config.WebPushPublicKey != "" {
  796. go s.publishToWebPushEndpoints(v, m)
  797. }
  798. } else {
  799. logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
  800. }
  801. if cache {
  802. logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
  803. if err := s.messageCache.AddMessage(m); err != nil {
  804. return nil, err
  805. }
  806. }
  807. u := v.User()
  808. if s.userManager != nil && u != nil && u.Tier != nil {
  809. go s.userManager.EnqueueUserStats(u.ID, v.Stats())
  810. }
  811. s.mu.Lock()
  812. s.messages++
  813. s.mu.Unlock()
  814. if unifiedpush {
  815. minc(metricUnifiedPushPublishedSuccess)
  816. }
  817. mset(metricMessagePublishDurationMillis, time.Since(start).Milliseconds())
  818. return m, nil
  819. }
  820. func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
  821. m, err := s.handlePublishInternal(r, v)
  822. if err != nil {
  823. minc(metricMessagesPublishedFailure)
  824. return err
  825. }
  826. minc(metricMessagesPublishedSuccess)
  827. return s.writeJSON(w, m)
  828. }
  829. func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *visitor) error {
  830. _, err := s.handlePublishInternal(r, v)
  831. if err != nil {
  832. minc(metricMessagesPublishedFailure)
  833. minc(metricMatrixPublishedFailure)
  834. if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
  835. topic, err := fromContext[*topic](r, contextTopic)
  836. if err != nil {
  837. return err
  838. }
  839. pushKey, err := fromContext[string](r, contextMatrixPushKey)
  840. if err != nil {
  841. return err
  842. }
  843. if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter {
  844. return writeMatrixResponse(w, pushKey)
  845. }
  846. }
  847. return err
  848. }
  849. minc(metricMessagesPublishedSuccess)
  850. minc(metricMatrixPublishedSuccess)
  851. return writeMatrixSuccess(w)
  852. }
  853. func (s *Server) sendToFirebase(v *visitor, m *message) {
  854. logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase")
  855. if err := s.firebaseClient.Send(v, m); err != nil {
  856. minc(metricFirebasePublishedFailure)
  857. if errors.Is(err, errFirebaseTemporarilyBanned) {
  858. logvm(v, m).Tag(tagFirebase).Err(err).Debug("Unable to publish to Firebase: %v", err.Error())
  859. } else {
  860. logvm(v, m).Tag(tagFirebase).Err(err).Warn("Unable to publish to Firebase: %v", err.Error())
  861. }
  862. return
  863. }
  864. minc(metricFirebasePublishedSuccess)
  865. }
  866. func (s *Server) sendEmail(v *visitor, m *message, email string) {
  867. logvm(v, m).Tag(tagEmail).Field("email", email).Debug("Sending email to %s", email)
  868. if err := s.smtpSender.Send(v, m, email); err != nil {
  869. logvm(v, m).Tag(tagEmail).Field("email", email).Err(err).Warn("Unable to send email to %s: %v", email, err.Error())
  870. minc(metricEmailsPublishedFailure)
  871. return
  872. }
  873. minc(metricEmailsPublishedSuccess)
  874. }
  875. func (s *Server) forwardPollRequest(v *visitor, m *message) {
  876. topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
  877. topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
  878. forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
  879. logvm(v, m).Debug("Publishing poll request to %s", forwardURL)
  880. req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
  881. if err != nil {
  882. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  883. return
  884. }
  885. req.Header.Set("User-Agent", "ntfy/"+s.config.Version)
  886. req.Header.Set("X-Poll-ID", m.ID)
  887. if s.config.UpstreamAccessToken != "" {
  888. req.Header.Set("Authorization", util.BearerAuth(s.config.UpstreamAccessToken))
  889. }
  890. var httpClient = &http.Client{
  891. Timeout: time.Second * 10,
  892. }
  893. response, err := httpClient.Do(req)
  894. if err != nil {
  895. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  896. return
  897. } else if response.StatusCode != http.StatusOK {
  898. if response.StatusCode == http.StatusTooManyRequests {
  899. logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s; you may solve this by sending fewer daily messages, or by configuring upstream-access-token (assuming you have an account with higher rate limits) ", s.config.UpstreamBaseURL, response.Status)
  900. } else {
  901. logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s", s.config.UpstreamBaseURL, response.Status)
  902. }
  903. return
  904. }
  905. }
  906. func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, firebase bool, email, call string, template bool, unifiedpush bool, err *errHTTP) {
  907. cache = readBoolParam(r, true, "x-cache", "cache")
  908. firebase = readBoolParam(r, true, "x-firebase", "firebase")
  909. m.Title = readParam(r, "x-title", "title", "t")
  910. m.Click = readParam(r, "x-click", "click")
  911. icon := readParam(r, "x-icon", "icon")
  912. filename := readParam(r, "x-filename", "filename", "file", "f")
  913. attach := readParam(r, "x-attach", "attach", "a")
  914. if attach != "" || filename != "" {
  915. m.Attachment = &attachment{}
  916. }
  917. if filename != "" {
  918. m.Attachment.Name = filename
  919. }
  920. if attach != "" {
  921. if !urlRegex.MatchString(attach) {
  922. return false, false, "", "", false, false, errHTTPBadRequestAttachmentURLInvalid
  923. }
  924. m.Attachment.URL = attach
  925. if m.Attachment.Name == "" {
  926. u, err := url.Parse(m.Attachment.URL)
  927. if err == nil {
  928. m.Attachment.Name = path.Base(u.Path)
  929. if m.Attachment.Name == "." || m.Attachment.Name == "/" {
  930. m.Attachment.Name = ""
  931. }
  932. }
  933. }
  934. if m.Attachment.Name == "" {
  935. m.Attachment.Name = "attachment"
  936. }
  937. }
  938. if icon != "" {
  939. if !urlRegex.MatchString(icon) {
  940. return false, false, "", "", false, false, errHTTPBadRequestIconURLInvalid
  941. }
  942. m.Icon = icon
  943. }
  944. email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
  945. if s.smtpSender == nil && email != "" {
  946. return false, false, "", "", false, false, errHTTPBadRequestEmailDisabled
  947. }
  948. call = readParam(r, "x-call", "call")
  949. if call != "" && (s.config.TwilioAccount == "" || s.userManager == nil) {
  950. return false, false, "", "", false, false, errHTTPBadRequestPhoneCallsDisabled
  951. } else if call != "" && !isBoolValue(call) && !phoneNumberRegex.MatchString(call) {
  952. return false, false, "", "", false, false, errHTTPBadRequestPhoneNumberInvalid
  953. }
  954. messageStr := strings.ReplaceAll(readParam(r, "x-message", "message", "m"), "\\n", "\n")
  955. if messageStr != "" {
  956. m.Message = messageStr
  957. }
  958. var e error
  959. m.Priority, e = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
  960. if e != nil {
  961. return false, false, "", "", false, false, errHTTPBadRequestPriorityInvalid
  962. }
  963. m.Tags = readCommaSeparatedParam(r, "x-tags", "tags", "tag", "ta")
  964. delayStr := readParam(r, "x-delay", "delay", "x-at", "at", "x-in", "in")
  965. if delayStr != "" {
  966. if !cache {
  967. return false, false, "", "", false, false, errHTTPBadRequestDelayNoCache
  968. }
  969. if email != "" {
  970. return false, false, "", "", false, false, errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
  971. }
  972. if call != "" {
  973. return false, false, "", "", false, false, errHTTPBadRequestDelayNoCall // we cannot store the phone number (yet)
  974. }
  975. delay, err := util.ParseFutureTime(delayStr, time.Now())
  976. if err != nil {
  977. return false, false, "", "", false, false, errHTTPBadRequestDelayCannotParse
  978. } else if delay.Unix() < time.Now().Add(s.config.MessageDelayMin).Unix() {
  979. return false, false, "", "", false, false, errHTTPBadRequestDelayTooSmall
  980. } else if delay.Unix() > time.Now().Add(s.config.MessageDelayMax).Unix() {
  981. return false, false, "", "", false, false, errHTTPBadRequestDelayTooLarge
  982. }
  983. m.Time = delay.Unix()
  984. }
  985. actionsStr := readParam(r, "x-actions", "actions", "action")
  986. if actionsStr != "" {
  987. m.Actions, e = parseActions(actionsStr)
  988. if e != nil {
  989. return false, false, "", "", false, false, errHTTPBadRequestActionsInvalid.Wrap("%s", e.Error())
  990. }
  991. }
  992. contentType, markdown := readParam(r, "content-type", "content_type"), readBoolParam(r, false, "x-markdown", "markdown", "md")
  993. if markdown || strings.ToLower(contentType) == "text/markdown" {
  994. m.ContentType = "text/markdown"
  995. }
  996. template = readBoolParam(r, false, "x-template", "template", "tpl")
  997. unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
  998. contentEncoding := readParam(r, "content-encoding")
  999. if unifiedpush || contentEncoding == "aes128gcm" {
  1000. firebase = false
  1001. unifiedpush = true
  1002. }
  1003. m.PollID = readParam(r, "x-poll-id", "poll-id")
  1004. if m.PollID != "" {
  1005. unifiedpush = false
  1006. cache = false
  1007. email = ""
  1008. }
  1009. return cache, firebase, email, call, template, unifiedpush, nil
  1010. }
  1011. // handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
  1012. //
  1013. // 1. curl -X POST -H "Poll: 1234" ntfy.sh/...
  1014. // If a message is flagged as poll request, the body does not matter and is discarded
  1015. // 2. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
  1016. // If UnifiedPush is enabled, encode as base64 if body is binary, and do not trim
  1017. // 3. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
  1018. // Body must be a message, because we attached an external URL
  1019. // 4. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
  1020. // Body must be attachment, because we passed a filename
  1021. // 5. curl -H "Template: yes" -T file.txt ntfy.sh/mytopic
  1022. // If templating is enabled, read up to 32k and treat message body as JSON
  1023. // 6. curl -T file.txt ntfy.sh/mytopic
  1024. // If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
  1025. // 7. curl -T file.txt ntfy.sh/mytopic
  1026. // In all other cases, mostly if file.txt is > message limit, treat it as an attachment
  1027. func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser, template, unifiedpush bool) error {
  1028. if m.Event == pollRequestEvent { // Case 1
  1029. return s.handleBodyDiscard(body)
  1030. } else if unifiedpush {
  1031. return s.handleBodyAsMessageAutoDetect(m, body) // Case 2
  1032. } else if m.Attachment != nil && m.Attachment.URL != "" {
  1033. return s.handleBodyAsTextMessage(m, body) // Case 3
  1034. } else if m.Attachment != nil && m.Attachment.Name != "" {
  1035. return s.handleBodyAsAttachment(r, v, m, body) // Case 4
  1036. } else if template {
  1037. return s.handleBodyAsTemplatedTextMessage(m, body) // Case 5
  1038. } else if !body.LimitReached && utf8.Valid(body.PeekedBytes) {
  1039. return s.handleBodyAsTextMessage(m, body) // Case 6
  1040. }
  1041. return s.handleBodyAsAttachment(r, v, m, body) // Case 7
  1042. }
  1043. func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
  1044. _, err := io.Copy(io.Discard, body)
  1045. _ = body.Close()
  1046. return err
  1047. }
  1048. func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedReadCloser) error {
  1049. if utf8.Valid(body.PeekedBytes) {
  1050. m.Message = string(body.PeekedBytes) // Do not trim
  1051. } else {
  1052. m.Message = base64.StdEncoding.EncodeToString(body.PeekedBytes)
  1053. m.Encoding = encodingBase64
  1054. }
  1055. return nil
  1056. }
  1057. func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser) error {
  1058. if !utf8.Valid(body.PeekedBytes) {
  1059. return errHTTPBadRequestMessageNotUTF8.With(m)
  1060. }
  1061. if len(body.PeekedBytes) > 0 { // Empty body should not override message (publish via GET!)
  1062. m.Message = strings.TrimSpace(string(body.PeekedBytes)) // Truncates the message to the peek limit if required
  1063. }
  1064. if m.Attachment != nil && m.Attachment.Name != "" && m.Message == "" {
  1065. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1066. }
  1067. return nil
  1068. }
  1069. func (s *Server) handleBodyAsTemplatedTextMessage(m *message, body *util.PeekedReadCloser) error {
  1070. body, err := util.Peek(body, max(s.config.MessageSizeLimit, jsonBodyBytesLimit))
  1071. if err != nil {
  1072. return err
  1073. } else if body.LimitReached {
  1074. return errHTTPEntityTooLargeJSONBody
  1075. }
  1076. peekedBody := strings.TrimSpace(string(body.PeekedBytes))
  1077. if m.Message, err = replaceTemplate(m.Message, peekedBody); err != nil {
  1078. return err
  1079. }
  1080. if m.Title, err = replaceTemplate(m.Title, peekedBody); err != nil {
  1081. return err
  1082. }
  1083. if len(m.Message) > s.config.MessageSizeLimit {
  1084. return errHTTPBadRequestTemplateMessageTooLarge
  1085. }
  1086. return nil
  1087. }
  1088. func replaceTemplate(tpl string, source string) (string, error) {
  1089. if templateDisallowedRegex.MatchString(tpl) {
  1090. return "", errHTTPBadRequestTemplateDisallowedFunctionCalls
  1091. }
  1092. var data any
  1093. if err := json.Unmarshal([]byte(source), &data); err != nil {
  1094. return "", errHTTPBadRequestTemplateMessageNotJSON
  1095. }
  1096. t, err := template.New("").Parse(tpl)
  1097. if err != nil {
  1098. return "", errHTTPBadRequestTemplateInvalid
  1099. }
  1100. var buf bytes.Buffer
  1101. if err := t.Execute(util.NewTimeoutWriter(&buf, templateMaxExecutionTime), data); err != nil {
  1102. return "", errHTTPBadRequestTemplateExecuteFailed
  1103. }
  1104. return buf.String(), nil
  1105. }
  1106. func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser) error {
  1107. if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
  1108. return errHTTPBadRequestAttachmentsDisallowed.With(m)
  1109. }
  1110. vinfo, err := v.Info()
  1111. if err != nil {
  1112. return err
  1113. }
  1114. attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
  1115. if m.Time > attachmentExpiry {
  1116. return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
  1117. }
  1118. contentLengthStr := r.Header.Get("Content-Length")
  1119. if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
  1120. contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
  1121. if err == nil && (contentLength > vinfo.Stats.AttachmentTotalSizeRemaining || contentLength > vinfo.Limits.AttachmentFileSizeLimit) {
  1122. return errHTTPEntityTooLargeAttachment.With(m).Fields(log.Context{
  1123. "message_content_length": contentLength,
  1124. "attachment_total_size_remaining": vinfo.Stats.AttachmentTotalSizeRemaining,
  1125. "attachment_file_size_limit": vinfo.Limits.AttachmentFileSizeLimit,
  1126. })
  1127. }
  1128. }
  1129. if m.Attachment == nil {
  1130. m.Attachment = &attachment{}
  1131. }
  1132. var ext string
  1133. m.Attachment.Expires = attachmentExpiry
  1134. m.Attachment.Type, ext = util.DetectContentType(body.PeekedBytes, m.Attachment.Name)
  1135. m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
  1136. if m.Attachment.Name == "" {
  1137. m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
  1138. }
  1139. if m.Message == "" {
  1140. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1141. }
  1142. limiters := []util.Limiter{
  1143. v.BandwidthLimiter(),
  1144. util.NewFixedLimiter(vinfo.Limits.AttachmentFileSizeLimit),
  1145. util.NewFixedLimiter(vinfo.Stats.AttachmentTotalSizeRemaining),
  1146. }
  1147. m.Attachment.Size, err = s.fileCache.Write(m.ID, body, limiters...)
  1148. if errors.Is(err, util.ErrLimitReached) {
  1149. return errHTTPEntityTooLargeAttachment.With(m)
  1150. } else if err != nil {
  1151. return err
  1152. }
  1153. return nil
  1154. }
  1155. func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1156. encoder := func(msg *message) (string, error) {
  1157. var buf bytes.Buffer
  1158. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  1159. return "", err
  1160. }
  1161. return buf.String(), nil
  1162. }
  1163. return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder)
  1164. }
  1165. func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1166. encoder := func(msg *message) (string, error) {
  1167. var buf bytes.Buffer
  1168. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  1169. return "", err
  1170. }
  1171. if msg.Event != messageEvent {
  1172. return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
  1173. }
  1174. return fmt.Sprintf("data: %s\n", buf.String()), nil
  1175. }
  1176. return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder)
  1177. }
  1178. func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1179. encoder := func(msg *message) (string, error) {
  1180. if msg.Event == messageEvent { // only handle default events
  1181. return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
  1182. }
  1183. return "\n", nil // "keepalive" and "open" events just send an empty line
  1184. }
  1185. return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder)
  1186. }
  1187. func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
  1188. logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection opened")
  1189. defer logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection closed")
  1190. if !v.SubscriptionAllowed() {
  1191. return errHTTPTooManyRequestsLimitSubscriptions
  1192. }
  1193. defer v.RemoveSubscription()
  1194. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1195. if err != nil {
  1196. return err
  1197. }
  1198. poll, since, scheduled, filters, err := parseSubscribeParams(r)
  1199. if err != nil {
  1200. return err
  1201. }
  1202. var wlock sync.Mutex
  1203. defer func() {
  1204. // Hack: This is the fix for a horrible data race that I have not been able to figure out in quite some time.
  1205. // It appears to be happening when the Go HTTP code reads from the socket when closing the request (i.e. AFTER
  1206. // this function returns), and causes a data race with the ResponseWriter. Locking wlock here silences the
  1207. // data race detector. See https://github.com/binwiederhier/ntfy/issues/338#issuecomment-1163425889.
  1208. wlock.TryLock()
  1209. }()
  1210. sub := func(v *visitor, msg *message) error {
  1211. if !filters.Pass(msg) {
  1212. return nil
  1213. }
  1214. m, err := encoder(msg)
  1215. if err != nil {
  1216. return err
  1217. }
  1218. wlock.Lock()
  1219. defer wlock.Unlock()
  1220. if _, err := w.Write([]byte(m)); err != nil {
  1221. return err
  1222. }
  1223. if fl, ok := w.(http.Flusher); ok {
  1224. fl.Flush()
  1225. }
  1226. return nil
  1227. }
  1228. if err := s.maybeSetRateVisitors(r, v, topics); err != nil {
  1229. return err
  1230. }
  1231. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1232. w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
  1233. if poll {
  1234. for _, t := range topics {
  1235. t.Keepalive()
  1236. }
  1237. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1238. }
  1239. ctx, cancel := context.WithCancel(context.Background())
  1240. defer cancel()
  1241. subscriberIDs := make([]int, 0)
  1242. for _, t := range topics {
  1243. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1244. }
  1245. defer func() {
  1246. for i, subscriberID := range subscriberIDs {
  1247. topics[i].Unsubscribe(subscriberID) // Order!
  1248. }
  1249. }()
  1250. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1251. return err
  1252. }
  1253. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1254. return err
  1255. }
  1256. for {
  1257. select {
  1258. case <-ctx.Done():
  1259. return nil
  1260. case <-r.Context().Done():
  1261. return nil
  1262. case <-time.After(s.config.KeepaliveInterval):
  1263. ev := logvr(v, r).Tag(tagSubscribe)
  1264. if len(topics) == 1 {
  1265. ev.With(topics[0]).Trace("Sending keepalive message to %s", topics[0].ID)
  1266. } else {
  1267. ev.Trace("Sending keepalive message to %d topics", len(topics))
  1268. }
  1269. v.Keepalive()
  1270. for _, t := range topics {
  1271. t.Keepalive()
  1272. }
  1273. if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
  1274. return err
  1275. }
  1276. }
  1277. }
  1278. }
  1279. func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1280. if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
  1281. return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
  1282. }
  1283. if !v.SubscriptionAllowed() {
  1284. return errHTTPTooManyRequestsLimitSubscriptions
  1285. }
  1286. defer v.RemoveSubscription()
  1287. logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection opened")
  1288. defer logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection closed")
  1289. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1290. if err != nil {
  1291. return err
  1292. }
  1293. poll, since, scheduled, filters, err := parseSubscribeParams(r)
  1294. if err != nil {
  1295. return err
  1296. }
  1297. upgrader := &websocket.Upgrader{
  1298. ReadBufferSize: wsBufferSize,
  1299. WriteBufferSize: wsBufferSize,
  1300. CheckOrigin: func(r *http.Request) bool {
  1301. return true // We're open for business!
  1302. },
  1303. }
  1304. conn, err := upgrader.Upgrade(w, r, nil)
  1305. if err != nil {
  1306. return err
  1307. }
  1308. defer conn.Close()
  1309. // Subscription connections can be canceled externally, see topic.CancelSubscribersExceptUser
  1310. cancelCtx, cancel := context.WithCancel(context.Background())
  1311. defer cancel()
  1312. // Use errgroup to run WebSocket reader and writer in Go routines
  1313. var wlock sync.Mutex
  1314. g, gctx := errgroup.WithContext(cancelCtx)
  1315. g.Go(func() error {
  1316. pongWait := s.config.KeepaliveInterval + wsPongWait
  1317. conn.SetReadLimit(wsReadLimit)
  1318. if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
  1319. return err
  1320. }
  1321. conn.SetPongHandler(func(appData string) error {
  1322. logvr(v, r).Tag(tagWebsocket).Trace("Received WebSocket pong")
  1323. return conn.SetReadDeadline(time.Now().Add(pongWait))
  1324. })
  1325. for {
  1326. _, _, err := conn.NextReader()
  1327. if err != nil {
  1328. return err
  1329. }
  1330. select {
  1331. case <-gctx.Done():
  1332. return nil
  1333. default:
  1334. }
  1335. }
  1336. })
  1337. g.Go(func() error {
  1338. ping := func() error {
  1339. wlock.Lock()
  1340. defer wlock.Unlock()
  1341. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1342. return err
  1343. }
  1344. logvr(v, r).Tag(tagWebsocket).Trace("Sending WebSocket ping")
  1345. return conn.WriteMessage(websocket.PingMessage, nil)
  1346. }
  1347. for {
  1348. select {
  1349. case <-gctx.Done():
  1350. return nil
  1351. case <-cancelCtx.Done():
  1352. logvr(v, r).Tag(tagWebsocket).Trace("Cancel received, closing subscriber connection")
  1353. conn.Close()
  1354. return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
  1355. case <-time.After(s.config.KeepaliveInterval):
  1356. v.Keepalive()
  1357. for _, t := range topics {
  1358. t.Keepalive()
  1359. }
  1360. if err := ping(); err != nil {
  1361. return err
  1362. }
  1363. }
  1364. }
  1365. })
  1366. sub := func(v *visitor, msg *message) error {
  1367. if !filters.Pass(msg) {
  1368. return nil
  1369. }
  1370. wlock.Lock()
  1371. defer wlock.Unlock()
  1372. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1373. return err
  1374. }
  1375. return conn.WriteJSON(msg)
  1376. }
  1377. if err := s.maybeSetRateVisitors(r, v, topics); err != nil {
  1378. return err
  1379. }
  1380. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1381. if poll {
  1382. for _, t := range topics {
  1383. t.Keepalive()
  1384. }
  1385. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1386. }
  1387. subscriberIDs := make([]int, 0)
  1388. for _, t := range topics {
  1389. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1390. }
  1391. defer func() {
  1392. for i, subscriberID := range subscriberIDs {
  1393. topics[i].Unsubscribe(subscriberID) // Order!
  1394. }
  1395. }()
  1396. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1397. return err
  1398. }
  1399. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1400. return err
  1401. }
  1402. err = g.Wait()
  1403. if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) {
  1404. logvr(v, r).Tag(tagWebsocket).Err(err).Fields(websocketErrorContext(err)).Trace("WebSocket connection closed")
  1405. return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
  1406. }
  1407. return err
  1408. }
  1409. func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, err error) {
  1410. poll = readBoolParam(r, false, "x-poll", "poll", "po")
  1411. scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
  1412. since, err = parseSince(r, poll)
  1413. if err != nil {
  1414. return
  1415. }
  1416. filters, err = parseQueryFilters(r)
  1417. if err != nil {
  1418. return
  1419. }
  1420. return
  1421. }
  1422. // maybeSetRateVisitors sets the rate visitor on a topic (v.SetRateVisitor), indicating that all messages published
  1423. // to that topic will be rate limited against the rate visitor instead of the publishing visitor.
  1424. //
  1425. // Setting the rate visitor is ony allowed if the `visitor-subscriber-rate-limiting` setting is enabled, AND
  1426. // - auth-file is not set (everything is open by default)
  1427. // - or the topic is reserved, and v.user is the owner
  1428. // - or the topic is not reserved, and v.user has write access
  1429. //
  1430. // This only applies to UnifiedPush topics ("up...").
  1431. func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic) error {
  1432. // Bail out if not enabled
  1433. if !s.config.VisitorSubscriberRateLimiting {
  1434. return nil
  1435. }
  1436. // Make a list of topics that we'll actually set the RateVisitor on
  1437. eligibleRateTopics := make([]*topic, 0)
  1438. for _, t := range topics {
  1439. if strings.HasPrefix(t.ID, unifiedPushTopicPrefix) && len(t.ID) == unifiedPushTopicLength {
  1440. eligibleRateTopics = append(eligibleRateTopics, t)
  1441. }
  1442. }
  1443. if len(eligibleRateTopics) == 0 {
  1444. return nil
  1445. }
  1446. // If access controls are turned off, v has access to everything, and we can set the rate visitor
  1447. if s.userManager == nil {
  1448. return s.setRateVisitors(r, v, eligibleRateTopics)
  1449. }
  1450. // If access controls are enabled, only set rate visitor if
  1451. // - topic is reserved, and v.user is the owner
  1452. // - topic is not reserved, and v.user has write access
  1453. writableRateTopics := make([]*topic, 0)
  1454. for _, t := range topics {
  1455. if !util.Contains(eligibleRateTopics, t) {
  1456. continue
  1457. }
  1458. ownerUserID, err := s.userManager.ReservationOwner(t.ID)
  1459. if err != nil {
  1460. return err
  1461. }
  1462. if ownerUserID == "" {
  1463. if err := s.userManager.Authorize(v.User(), t.ID, user.PermissionWrite); err == nil {
  1464. writableRateTopics = append(writableRateTopics, t)
  1465. }
  1466. } else if ownerUserID == v.MaybeUserID() {
  1467. writableRateTopics = append(writableRateTopics, t)
  1468. }
  1469. }
  1470. return s.setRateVisitors(r, v, writableRateTopics)
  1471. }
  1472. func (s *Server) setRateVisitors(r *http.Request, v *visitor, rateTopics []*topic) error {
  1473. for _, t := range rateTopics {
  1474. logvr(v, r).
  1475. Tag(tagSubscribe).
  1476. With(t).
  1477. Debug("Setting visitor as rate visitor for topic %s", t.ID)
  1478. t.SetRateVisitor(v)
  1479. }
  1480. return nil
  1481. }
  1482. // sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
  1483. // marker, returning only messages that are newer than the marker.
  1484. func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled bool, v *visitor, sub subscriber) error {
  1485. if since.IsNone() {
  1486. return nil
  1487. }
  1488. messages := make([]*message, 0)
  1489. for _, t := range topics {
  1490. topicMessages, err := s.messageCache.Messages(t.ID, since, scheduled)
  1491. if err != nil {
  1492. return err
  1493. }
  1494. messages = append(messages, topicMessages...)
  1495. }
  1496. sort.Slice(messages, func(i, j int) bool {
  1497. return messages[i].Time < messages[j].Time
  1498. })
  1499. for _, m := range messages {
  1500. if err := sub(v, m); err != nil {
  1501. return err
  1502. }
  1503. }
  1504. return nil
  1505. }
  1506. // parseSince returns a timestamp identifying the time span from which cached messages should be received.
  1507. //
  1508. // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h),
  1509. // "all" for all messages, or "latest" for the most recent message for a topic
  1510. func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
  1511. since := readParam(r, "x-since", "since", "si")
  1512. // Easy cases (empty, all, none)
  1513. if since == "" {
  1514. if poll {
  1515. return sinceAllMessages, nil
  1516. }
  1517. return sinceNoMessages, nil
  1518. } else if since == "all" {
  1519. return sinceAllMessages, nil
  1520. } else if since == "latest" {
  1521. return sinceLatestMessage, nil
  1522. } else if since == "none" {
  1523. return sinceNoMessages, nil
  1524. }
  1525. // ID, timestamp, duration
  1526. if validMessageID(since) {
  1527. return newSinceID(since), nil
  1528. } else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
  1529. return newSinceTime(s), nil
  1530. } else if d, err := time.ParseDuration(since); err == nil {
  1531. return newSinceTime(time.Now().Add(-1 * d).Unix()), nil
  1532. }
  1533. return sinceNoMessages, errHTTPBadRequestSinceInvalid
  1534. }
  1535. func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  1536. w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE")
  1537. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1538. w.Header().Set("Access-Control-Allow-Headers", "*") // CORS, allow auth via JS // FIXME is this terrible?
  1539. return nil
  1540. }
  1541. // topicFromPath returns the topic from a root path (e.g. /mytopic), creating it if it doesn't exist.
  1542. func (s *Server) topicFromPath(path string) (*topic, error) {
  1543. parts := strings.Split(path, "/")
  1544. if len(parts) < 2 {
  1545. return nil, errHTTPBadRequestTopicInvalid
  1546. }
  1547. return s.topicFromID(parts[1])
  1548. }
  1549. // topicsFromPath returns the topic from a root path (e.g. /mytopic,mytopic2), creating it if it doesn't exist.
  1550. func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
  1551. parts := strings.Split(path, "/")
  1552. if len(parts) < 2 {
  1553. return nil, "", errHTTPBadRequestTopicInvalid
  1554. }
  1555. topicIDs := util.SplitNoEmpty(parts[1], ",")
  1556. topics, err := s.topicsFromIDs(topicIDs...)
  1557. if err != nil {
  1558. return nil, "", errHTTPBadRequestTopicInvalid
  1559. }
  1560. return topics, parts[1], nil
  1561. }
  1562. // topicsFromIDs returns the topics with the given IDs, creating them if they don't exist.
  1563. func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
  1564. s.mu.Lock()
  1565. defer s.mu.Unlock()
  1566. topics := make([]*topic, 0)
  1567. for _, id := range ids {
  1568. if util.Contains(s.config.DisallowedTopics, id) {
  1569. return nil, errHTTPBadRequestTopicDisallowed
  1570. }
  1571. if _, ok := s.topics[id]; !ok {
  1572. if len(s.topics) >= s.config.TotalTopicLimit {
  1573. return nil, errHTTPTooManyRequestsLimitTotalTopics
  1574. }
  1575. s.topics[id] = newTopic(id)
  1576. }
  1577. topics = append(topics, s.topics[id])
  1578. }
  1579. return topics, nil
  1580. }
  1581. // topicFromID returns the topic with the given ID, creating it if it doesn't exist.
  1582. func (s *Server) topicFromID(id string) (*topic, error) {
  1583. topics, err := s.topicsFromIDs(id)
  1584. if err != nil {
  1585. return nil, err
  1586. }
  1587. return topics[0], nil
  1588. }
  1589. // topicsFromPattern returns a list of topics matching the given pattern, but it does not create them.
  1590. func (s *Server) topicsFromPattern(pattern string) ([]*topic, error) {
  1591. s.mu.RLock()
  1592. defer s.mu.RUnlock()
  1593. patternRegexp, err := regexp.Compile("^" + strings.ReplaceAll(pattern, "*", ".*") + "$")
  1594. if err != nil {
  1595. return nil, err
  1596. }
  1597. topics := make([]*topic, 0)
  1598. for _, t := range s.topics {
  1599. if patternRegexp.MatchString(t.ID) {
  1600. topics = append(topics, t)
  1601. }
  1602. }
  1603. return topics, nil
  1604. }
  1605. func (s *Server) runSMTPServer() error {
  1606. s.smtpServerBackend = newMailBackend(s.config, s.handle)
  1607. s.smtpServer = smtp.NewServer(s.smtpServerBackend)
  1608. s.smtpServer.Addr = s.config.SMTPServerListen
  1609. s.smtpServer.Domain = s.config.SMTPServerDomain
  1610. s.smtpServer.ReadTimeout = 10 * time.Second
  1611. s.smtpServer.WriteTimeout = 10 * time.Second
  1612. s.smtpServer.MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
  1613. s.smtpServer.MaxRecipients = 1
  1614. s.smtpServer.AllowInsecureAuth = true
  1615. return s.smtpServer.ListenAndServe()
  1616. }
  1617. func (s *Server) runManager() {
  1618. for {
  1619. select {
  1620. case <-time.After(s.config.ManagerInterval):
  1621. log.
  1622. Tag(tagManager).
  1623. Timing(s.execManager).
  1624. Debug("Manager finished")
  1625. case <-s.closeChan:
  1626. return
  1627. }
  1628. }
  1629. }
  1630. // runStatsResetter runs once a day (usually midnight UTC) to reset all the visitor's message and
  1631. // email counters. The stats are used to display the counters in the web app, as well as for rate limiting.
  1632. func (s *Server) runStatsResetter() {
  1633. for {
  1634. runAt := util.NextOccurrenceUTC(s.config.VisitorStatsResetTime, time.Now())
  1635. timer := time.NewTimer(time.Until(runAt))
  1636. log.Tag(tagResetter).Debug("Waiting until %v to reset visitor stats", runAt)
  1637. select {
  1638. case <-timer.C:
  1639. log.Tag(tagResetter).Debug("Running stats resetter")
  1640. s.resetStats()
  1641. case <-s.closeChan:
  1642. log.Tag(tagResetter).Debug("Stopping stats resetter")
  1643. timer.Stop()
  1644. return
  1645. }
  1646. }
  1647. }
  1648. func (s *Server) resetStats() {
  1649. log.Info("Resetting all visitor stats (daily task)")
  1650. s.mu.Lock()
  1651. defer s.mu.Unlock() // Includes the database query to avoid races with other processes
  1652. for _, v := range s.visitors {
  1653. v.ResetStats()
  1654. }
  1655. if s.userManager != nil {
  1656. if err := s.userManager.ResetStats(); err != nil {
  1657. log.Tag(tagResetter).Warn("Failed to write to database: %s", err.Error())
  1658. }
  1659. }
  1660. }
  1661. func (s *Server) runFirebaseKeepaliver() {
  1662. if s.firebaseClient == nil {
  1663. return
  1664. }
  1665. v := newVisitor(s.config, s.messageCache, s.userManager, netip.IPv4Unspecified(), nil) // Background process, not a real visitor, uses IP 0.0.0.0
  1666. for {
  1667. select {
  1668. case <-time.After(s.config.FirebaseKeepaliveInterval):
  1669. s.sendToFirebase(v, newKeepaliveMessage(firebaseControlTopic))
  1670. /*
  1671. FIXME: Disable iOS polling entirely for now due to thundering herd problem (see #677)
  1672. To solve this, we'd have to shard the iOS poll topics to spread out the polling evenly.
  1673. Given that it's not really necessary to poll, turning it off for now should not have any impact.
  1674. case <-time.After(s.config.FirebasePollInterval):
  1675. s.sendToFirebase(v, newKeepaliveMessage(firebasePollTopic))
  1676. */
  1677. case <-s.closeChan:
  1678. return
  1679. }
  1680. }
  1681. }
  1682. func (s *Server) runDelayedSender() {
  1683. for {
  1684. select {
  1685. case <-time.After(s.config.DelayedSenderInterval):
  1686. if err := s.sendDelayedMessages(); err != nil {
  1687. log.Tag(tagPublish).Err(err).Warn("Error sending delayed messages")
  1688. }
  1689. case <-s.closeChan:
  1690. return
  1691. }
  1692. }
  1693. }
  1694. func (s *Server) sendDelayedMessages() error {
  1695. messages, err := s.messageCache.MessagesDue()
  1696. if err != nil {
  1697. return err
  1698. }
  1699. for _, m := range messages {
  1700. var u *user.User
  1701. if s.userManager != nil && m.User != "" {
  1702. u, err = s.userManager.UserByID(m.User)
  1703. if err != nil {
  1704. log.With(m).Err(err).Warn("Error sending delayed message")
  1705. continue
  1706. }
  1707. }
  1708. v := s.visitor(m.Sender, u)
  1709. if err := s.sendDelayedMessage(v, m); err != nil {
  1710. logvm(v, m).Err(err).Warn("Error sending delayed message")
  1711. }
  1712. }
  1713. return nil
  1714. }
  1715. func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
  1716. logvm(v, m).Debug("Sending delayed message")
  1717. s.mu.RLock()
  1718. t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
  1719. s.mu.RUnlock()
  1720. if ok {
  1721. go func() {
  1722. // We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
  1723. if err := t.Publish(v, m); err != nil {
  1724. logvm(v, m).Err(err).Warn("Unable to publish message")
  1725. }
  1726. }()
  1727. }
  1728. if s.firebaseClient != nil { // Firebase subscribers may not show up in topics map
  1729. go s.sendToFirebase(v, m)
  1730. }
  1731. if s.config.UpstreamBaseURL != "" {
  1732. go s.forwardPollRequest(v, m)
  1733. }
  1734. if s.config.WebPushPublicKey != "" {
  1735. go s.publishToWebPushEndpoints(v, m)
  1736. }
  1737. if err := s.messageCache.MarkPublished(m); err != nil {
  1738. return err
  1739. }
  1740. return nil
  1741. }
  1742. // transformBodyJSON peeks the request body, reads the JSON, and converts it to headers
  1743. // before passing it on to the next handler. This is meant to be used in combination with handlePublish.
  1744. func (s *Server) transformBodyJSON(next handleFunc) handleFunc {
  1745. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1746. m, err := readJSONWithLimit[publishMessage](r.Body, s.config.MessageSizeLimit*2, false) // 2x to account for JSON format overhead
  1747. if err != nil {
  1748. return err
  1749. }
  1750. if !topicRegex.MatchString(m.Topic) {
  1751. return errHTTPBadRequestTopicInvalid
  1752. }
  1753. if m.Message == "" {
  1754. m.Message = emptyMessageBody
  1755. }
  1756. r.URL.Path = "/" + m.Topic
  1757. r.Body = io.NopCloser(strings.NewReader(m.Message))
  1758. if m.Title != "" {
  1759. r.Header.Set("X-Title", m.Title)
  1760. }
  1761. if m.Priority != 0 {
  1762. r.Header.Set("X-Priority", fmt.Sprintf("%d", m.Priority))
  1763. }
  1764. if len(m.Tags) > 0 {
  1765. r.Header.Set("X-Tags", strings.Join(m.Tags, ","))
  1766. }
  1767. if m.Attach != "" {
  1768. r.Header.Set("X-Attach", m.Attach)
  1769. }
  1770. if m.Filename != "" {
  1771. r.Header.Set("X-Filename", m.Filename)
  1772. }
  1773. if m.Click != "" {
  1774. r.Header.Set("X-Click", m.Click)
  1775. }
  1776. if m.Icon != "" {
  1777. r.Header.Set("X-Icon", m.Icon)
  1778. }
  1779. if m.Markdown {
  1780. r.Header.Set("X-Markdown", "yes")
  1781. }
  1782. if len(m.Actions) > 0 {
  1783. actionsStr, err := json.Marshal(m.Actions)
  1784. if err != nil {
  1785. return errHTTPBadRequestMessageJSONInvalid
  1786. }
  1787. r.Header.Set("X-Actions", string(actionsStr))
  1788. }
  1789. if m.Email != "" {
  1790. r.Header.Set("X-Email", m.Email)
  1791. }
  1792. if m.Delay != "" {
  1793. r.Header.Set("X-Delay", m.Delay)
  1794. }
  1795. if m.Call != "" {
  1796. r.Header.Set("X-Call", m.Call)
  1797. }
  1798. if m.Cache != "" {
  1799. r.Header.Set("X-Cache", m.Cache)
  1800. }
  1801. if m.Firebase != "" {
  1802. r.Header.Set("X-Firebase", m.Firebase)
  1803. }
  1804. return next(w, r, v)
  1805. }
  1806. }
  1807. func (s *Server) transformMatrixJSON(next handleFunc) handleFunc {
  1808. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1809. newRequest, err := newRequestFromMatrixJSON(r, s.config.BaseURL, s.config.MessageSizeLimit)
  1810. if err != nil {
  1811. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Invalid Matrix request")
  1812. if e, ok := err.(*errMatrixPushkeyRejected); ok {
  1813. return writeMatrixResponse(w, e.rejectedPushKey)
  1814. }
  1815. return err
  1816. }
  1817. if err := next(w, newRequest, v); err != nil {
  1818. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Error handling Matrix request")
  1819. return err
  1820. }
  1821. return nil
  1822. }
  1823. }
  1824. func (s *Server) authorizeTopicWrite(next handleFunc) handleFunc {
  1825. return s.authorizeTopic(next, user.PermissionWrite)
  1826. }
  1827. func (s *Server) authorizeTopicRead(next handleFunc) handleFunc {
  1828. return s.authorizeTopic(next, user.PermissionRead)
  1829. }
  1830. func (s *Server) authorizeTopic(next handleFunc, perm user.Permission) handleFunc {
  1831. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1832. if s.userManager == nil {
  1833. return next(w, r, v)
  1834. }
  1835. topics, _, err := s.topicsFromPath(r.URL.Path)
  1836. if err != nil {
  1837. return err
  1838. }
  1839. u := v.User()
  1840. for _, t := range topics {
  1841. if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
  1842. logvr(v, r).With(t).Err(err).Debug("Access to topic %s not authorized", t.ID)
  1843. return errHTTPForbidden.With(t)
  1844. }
  1845. }
  1846. return next(w, r, v)
  1847. }
  1848. }
  1849. // maybeAuthenticate reads the "Authorization" header and will try to authenticate the user
  1850. // if it is set.
  1851. //
  1852. // - If auth-file is not configured, immediately return an IP-based visitor
  1853. // - If the header is not set or not supported (anything non-Basic and non-Bearer),
  1854. // an IP-based visitor is returned
  1855. // - If the header is set, authenticate will be called to check the username/password (Basic auth),
  1856. // or the token (Bearer auth), and read the user from the database
  1857. //
  1858. // This function will ALWAYS return a visitor, even if an error occurs (e.g. unauthorized), so
  1859. // that subsequent logging calls still have a visitor context.
  1860. func (s *Server) maybeAuthenticate(r *http.Request) (*visitor, error) {
  1861. // Read the "Authorization" header value and exit out early if it's not set
  1862. ip := extractIPAddress(r, s.config.BehindProxy, s.config.ProxyForwardedHeader, s.config.ProxyTrustedPrefixes)
  1863. vip := s.visitor(ip, nil)
  1864. if s.userManager == nil {
  1865. return vip, nil
  1866. }
  1867. header, err := readAuthHeader(r)
  1868. if err != nil {
  1869. return vip, err
  1870. } else if !supportedAuthHeader(header) {
  1871. return vip, nil
  1872. }
  1873. // If we're trying to auth, check the rate limiter first
  1874. if !vip.AuthAllowed() {
  1875. return vip, errHTTPTooManyRequestsLimitAuthFailure // Always return visitor, even when error occurs!
  1876. }
  1877. u, err := s.authenticate(r, header)
  1878. if err != nil {
  1879. vip.AuthFailed()
  1880. logr(r).Err(err).Debug("Authentication failed")
  1881. return vip, errHTTPUnauthorized // Always return visitor, even when error occurs!
  1882. }
  1883. // Authentication with user was successful
  1884. return s.visitor(ip, u), nil
  1885. }
  1886. // authenticate a user based on basic auth username/password (Authorization: Basic ...), or token auth (Authorization: Bearer ...).
  1887. // The Authorization header can be passed as a header or the ?auth=... query param. The latter is required only to
  1888. // support the WebSocket JavaScript class, which does not support passing headers during the initial request. The auth
  1889. // query param is effectively doubly base64 encoded. Its format is base64(Basic base64(user:pass)).
  1890. func (s *Server) authenticate(r *http.Request, header string) (user *user.User, err error) {
  1891. if strings.HasPrefix(header, "Bearer") {
  1892. return s.authenticateBearerAuth(r, strings.TrimSpace(strings.TrimPrefix(header, "Bearer")))
  1893. }
  1894. return s.authenticateBasicAuth(r, header)
  1895. }
  1896. // readAuthHeader reads the raw value of the Authorization header, either from the actual HTTP header,
  1897. // or from the ?auth... query parameter
  1898. func readAuthHeader(r *http.Request) (string, error) {
  1899. value := strings.TrimSpace(r.Header.Get("Authorization"))
  1900. queryParam := readQueryParam(r, "authorization", "auth")
  1901. if queryParam != "" {
  1902. a, err := base64.RawURLEncoding.DecodeString(queryParam)
  1903. if err != nil {
  1904. return "", err
  1905. }
  1906. value = strings.TrimSpace(string(a))
  1907. }
  1908. return value, nil
  1909. }
  // supportedAuthHeader returns true only if the Authorization header value starts
  // with "Basic " or "Bearer " (compared case-insensitively, including the trailing space).
  // In particular, an empty value is not supported, and neither are things like "WebPush",
  // or "vapid" (see #629).
  func supportedAuthHeader(value string) bool {
  	lowered := strings.ToLower(value)
  	for _, scheme := range []string{"basic ", "bearer "} {
  		if strings.HasPrefix(lowered, scheme) {
  			return true
  		}
  	}
  	return false
  }
  1917. func (s *Server) authenticateBasicAuth(r *http.Request, value string) (user *user.User, err error) {
  1918. r.Header.Set("Authorization", value)
  1919. username, password, ok := r.BasicAuth()
  1920. if !ok {
  1921. return nil, errors.New("invalid basic auth")
  1922. } else if username == "" {
  1923. return s.authenticateBearerAuth(r, password) // Treat password as token
  1924. }
  1925. return s.userManager.Authenticate(username, password)
  1926. }
  1927. func (s *Server) authenticateBearerAuth(r *http.Request, token string) (*user.User, error) {
  1928. u, err := s.userManager.AuthenticateToken(token)
  1929. if err != nil {
  1930. return nil, err
  1931. }
  1932. ip := extractIPAddress(r, s.config.BehindProxy, s.config.ProxyForwardedHeader, s.config.ProxyTrustedPrefixes)
  1933. go s.userManager.EnqueueTokenUpdate(token, &user.TokenUpdate{
  1934. LastAccess: time.Now(),
  1935. LastOrigin: ip,
  1936. })
  1937. return u, nil
  1938. }
  1939. func (s *Server) visitor(ip netip.Addr, user *user.User) *visitor {
  1940. s.mu.Lock()
  1941. defer s.mu.Unlock()
  1942. id := visitorID(ip, user, s.config)
  1943. v, exists := s.visitors[id]
  1944. if !exists {
  1945. s.visitors[id] = newVisitor(s.config, s.messageCache, s.userManager, ip, user)
  1946. return s.visitors[id]
  1947. }
  1948. v.Keepalive()
  1949. v.SetUser(user) // Always update with the latest user, may be nil!
  1950. return v
  1951. }
  1952. func (s *Server) writeJSON(w http.ResponseWriter, v any) error {
  1953. return s.writeJSONWithContentType(w, v, "application/json")
  1954. }
  1955. func (s *Server) writeJSONWithContentType(w http.ResponseWriter, v any, contentType string) error {
  1956. w.Header().Set("Content-Type", contentType)
  1957. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1958. if err := json.NewEncoder(w).Encode(v); err != nil {
  1959. return err
  1960. }
  1961. return nil
  1962. }
  1963. func (s *Server) updateAndWriteStats(messagesCount int64) {
  1964. s.mu.Lock()
  1965. s.messagesHistory = append(s.messagesHistory, messagesCount)
  1966. if len(s.messagesHistory) > messagesHistoryMax {
  1967. s.messagesHistory = s.messagesHistory[1:]
  1968. }
  1969. s.mu.Unlock()
  1970. go func() {
  1971. if err := s.messageCache.UpdateStats(messagesCount); err != nil {
  1972. log.Tag(tagManager).Err(err).Warn("Cannot write messages stats")
  1973. }
  1974. }()
  1975. }