server.go 65 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823
  1. package server
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "embed"
  7. "encoding/base64"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "github.com/emersion/go-smtp"
  12. "github.com/gorilla/websocket"
  13. "github.com/prometheus/client_golang/prometheus/promhttp"
  14. "golang.org/x/sync/errgroup"
  15. "heckel.io/ntfy/log"
  16. "heckel.io/ntfy/user"
  17. "heckel.io/ntfy/util"
  18. "io"
  19. "net"
  20. "net/http"
  21. "net/http/pprof"
  22. "net/netip"
  23. "net/url"
  24. "os"
  25. "path"
  26. "path/filepath"
  27. "regexp"
  28. "sort"
  29. "strconv"
  30. "strings"
  31. "sync"
  32. "time"
  33. "unicode/utf8"
  34. )
  35. // Server is the main server, providing the UI and API for ntfy
  36. type Server struct {
  37. config *Config
  38. httpServer *http.Server
  39. httpsServer *http.Server
  40. httpMetricsServer *http.Server
  41. httpProfileServer *http.Server
  42. unixListener net.Listener
  43. smtpServer *smtp.Server
  44. smtpServerBackend *smtpBackend
  45. smtpSender mailer
  46. topics map[string]*topic
  47. visitors map[string]*visitor // ip:<ip> or user:<user>
  48. firebaseClient *firebaseClient
  49. messages int64
  50. userManager *user.Manager // Might be nil!
  51. messageCache *messageCache // Database that stores the messages
  52. fileCache *fileCache // File system based cache that stores attachments
  53. stripe stripeAPI // Stripe API, can be replaced with a mock
  54. priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
  55. metricsHandler http.Handler // Handles /metrics if enable-metrics set, and listen-metrics-http not set
  56. closeChan chan bool
  57. mu sync.Mutex
  58. }
  59. // handleFunc extends the normal http.HandlerFunc to be able to easily return errors
  60. type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
  61. var (
  62. // If changed, don't forget to update Android App and auth_sqlite.go
  63. topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
  64. topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
  65. externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
  66. jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
  67. ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
  68. rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
  69. wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
  70. authPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/auth$`)
  71. publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/(publish|send|trigger)$`)
  72. webConfigPath = "/config.js"
  73. accountPath = "/account"
  74. matrixPushPath = "/_matrix/push/v1/notify"
  75. metricsPath = "/metrics"
  76. apiHealthPath = "/v1/health"
  77. apiTiers = "/v1/tiers"
  78. apiAccountPath = "/v1/account"
  79. apiAccountTokenPath = "/v1/account/token"
  80. apiAccountPasswordPath = "/v1/account/password"
  81. apiAccountSettingsPath = "/v1/account/settings"
  82. apiAccountSubscriptionPath = "/v1/account/subscription"
  83. apiAccountReservationPath = "/v1/account/reservation"
  84. apiAccountBillingPortalPath = "/v1/account/billing/portal"
  85. apiAccountBillingWebhookPath = "/v1/account/billing/webhook"
  86. apiAccountBillingSubscriptionPath = "/v1/account/billing/subscription"
  87. apiAccountBillingSubscriptionCheckoutSuccessTemplate = "/v1/account/billing/subscription/success/{CHECKOUT_SESSION_ID}"
  88. apiAccountBillingSubscriptionCheckoutSuccessRegex = regexp.MustCompile(`/v1/account/billing/subscription/success/(.+)$`)
  89. apiAccountReservationSingleRegex = regexp.MustCompile(`/v1/account/reservation/([-_A-Za-z0-9]{1,64})$`)
  90. staticRegex = regexp.MustCompile(`^/static/.+`)
  91. docsRegex = regexp.MustCompile(`^/docs(|/.*)$`)
  92. fileRegex = regexp.MustCompile(`^/file/([-_A-Za-z0-9]{1,64})(?:\.[A-Za-z0-9]{1,16})?$`)
  93. urlRegex = regexp.MustCompile(`^https?://`)
  94. //go:embed site
  95. webFs embed.FS
  96. webFsCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: webFs}
  97. webSiteDir = "/site"
  98. webHomeIndex = "/home.html" // Landing page, only if "web-root: home"
  99. webAppIndex = "/app.html" // React app
  100. //go:embed docs
  101. docsStaticFs embed.FS
  102. docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
  103. )
  // Message and topic related constants.
  const (
  	// See Android if changed
  	firebaseControlTopic = "~control"
  	// See iOS if changed
  	firebasePollTopic = "~poll"
  	// Used if message body is empty
  	emptyMessageBody = "triggered"
  	// Used in poll requests as generic message
  	newMessageBody = "New message"
  	// Used if message body is empty, and there is an attachment
  	defaultAttachmentMessage = "You received a file: %s"
  	// Used mainly for binary UnifiedPush messages
  	encodingBase64 = "base64"

  	jsonBodyBytesLimit = 16384

  	// Temporarily, we rate limit all "up*" topics based on the subscriber
  	unifiedPushTopicPrefix = "up"
  	unifiedPushTopicLength = 14
  )
  // WebSocket constants
  const (
  	wsWriteWait  = 2 * time.Second
  	wsBufferSize = 1024
  	// We only ever receive PINGs
  	wsReadLimit = 64
  	wsPongWait  = 15 * time.Second
  )
  122. // New instantiates a new Server. It creates the cache and adds a Firebase
  123. // subscriber (if configured).
  124. func New(conf *Config) (*Server, error) {
  125. var mailer mailer
  126. if conf.SMTPSenderAddr != "" {
  127. mailer = &smtpSender{config: conf}
  128. }
  129. var stripe stripeAPI
  130. if conf.StripeSecretKey != "" {
  131. stripe = newStripeAPI()
  132. }
  133. messageCache, err := createMessageCache(conf)
  134. if err != nil {
  135. return nil, err
  136. }
  137. topics, err := messageCache.Topics()
  138. if err != nil {
  139. return nil, err
  140. }
  141. var fileCache *fileCache
  142. if conf.AttachmentCacheDir != "" {
  143. fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit)
  144. if err != nil {
  145. return nil, err
  146. }
  147. }
  148. var userManager *user.Manager
  149. if conf.AuthFile != "" {
  150. userManager, err = user.NewManager(conf.AuthFile, conf.AuthStartupQueries, conf.AuthDefault, conf.AuthBcryptCost, conf.AuthStatsQueueWriterInterval)
  151. if err != nil {
  152. return nil, err
  153. }
  154. }
  155. var firebaseClient *firebaseClient
  156. if conf.FirebaseKeyFile != "" {
  157. sender, err := newFirebaseSender(conf.FirebaseKeyFile)
  158. if err != nil {
  159. return nil, err
  160. }
  161. // This awkward logic is required because Go is weird about nil types and interfaces.
  162. // See issue #641, and https://go.dev/play/p/uur1flrv1t3 for an example
  163. var auther user.Auther
  164. if userManager != nil {
  165. auther = userManager
  166. }
  167. firebaseClient = newFirebaseClient(sender, auther)
  168. }
  169. s := &Server{
  170. config: conf,
  171. messageCache: messageCache,
  172. fileCache: fileCache,
  173. firebaseClient: firebaseClient,
  174. smtpSender: mailer,
  175. topics: topics,
  176. userManager: userManager,
  177. visitors: make(map[string]*visitor),
  178. stripe: stripe,
  179. }
  180. s.priceCache = util.NewLookupCache(s.fetchStripePrices, conf.StripePriceCacheDuration)
  181. return s, nil
  182. }
  183. func createMessageCache(conf *Config) (*messageCache, error) {
  184. if conf.CacheDuration == 0 {
  185. return newNopCache()
  186. } else if conf.CacheFile != "" {
  187. return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheDuration, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
  188. }
  189. return newMemCache()
  190. }
  191. // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
  192. // a manager go routine to print stats and prune messages.
  193. func (s *Server) Run() error {
  194. var listenStr string
  195. if s.config.ListenHTTP != "" {
  196. listenStr += fmt.Sprintf(" %s[http]", s.config.ListenHTTP)
  197. }
  198. if s.config.ListenHTTPS != "" {
  199. listenStr += fmt.Sprintf(" %s[https]", s.config.ListenHTTPS)
  200. }
  201. if s.config.ListenUnix != "" {
  202. listenStr += fmt.Sprintf(" %s[unix]", s.config.ListenUnix)
  203. }
  204. if s.config.SMTPServerListen != "" {
  205. listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
  206. }
  207. if s.config.MetricsListenHTTP != "" {
  208. listenStr += fmt.Sprintf(" %s[http/metrics]", s.config.MetricsListenHTTP)
  209. }
  210. if s.config.ProfileListenHTTP != "" {
  211. listenStr += fmt.Sprintf(" %s[http/profile]", s.config.ProfileListenHTTP)
  212. }
  213. log.Tag(tagStartup).Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String())
  214. if log.IsFile() {
  215. fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s\n", listenStr, s.config.Version)
  216. fmt.Fprintf(os.Stderr, "Logs are written to %s\n", log.File())
  217. }
  218. mux := http.NewServeMux()
  219. mux.HandleFunc("/", s.handle)
  220. errChan := make(chan error)
  221. s.mu.Lock()
  222. s.closeChan = make(chan bool)
  223. if s.config.ListenHTTP != "" {
  224. s.httpServer = &http.Server{Addr: s.config.ListenHTTP, Handler: mux}
  225. go func() {
  226. errChan <- s.httpServer.ListenAndServe()
  227. }()
  228. }
  229. if s.config.ListenHTTPS != "" {
  230. s.httpsServer = &http.Server{Addr: s.config.ListenHTTPS, Handler: mux}
  231. go func() {
  232. errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
  233. }()
  234. }
  235. if s.config.ListenUnix != "" {
  236. go func() {
  237. var err error
  238. s.mu.Lock()
  239. os.Remove(s.config.ListenUnix)
  240. s.unixListener, err = net.Listen("unix", s.config.ListenUnix)
  241. if err != nil {
  242. s.mu.Unlock()
  243. errChan <- err
  244. return
  245. }
  246. defer s.unixListener.Close()
  247. if s.config.ListenUnixMode > 0 {
  248. if err := os.Chmod(s.config.ListenUnix, s.config.ListenUnixMode); err != nil {
  249. s.mu.Unlock()
  250. errChan <- err
  251. return
  252. }
  253. }
  254. s.mu.Unlock()
  255. httpServer := &http.Server{Handler: mux}
  256. errChan <- httpServer.Serve(s.unixListener)
  257. }()
  258. }
  259. if s.config.MetricsListenHTTP != "" {
  260. initMetrics()
  261. s.httpMetricsServer = &http.Server{Addr: s.config.MetricsListenHTTP, Handler: promhttp.Handler()}
  262. go func() {
  263. errChan <- s.httpMetricsServer.ListenAndServe()
  264. }()
  265. } else if s.config.EnableMetrics {
  266. initMetrics()
  267. s.metricsHandler = promhttp.Handler()
  268. }
  269. if s.config.ProfileListenHTTP != "" {
  270. profileMux := http.NewServeMux()
  271. profileMux.HandleFunc("/debug/pprof/", pprof.Index)
  272. profileMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
  273. profileMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
  274. profileMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
  275. profileMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
  276. s.httpProfileServer = &http.Server{Addr: s.config.ProfileListenHTTP, Handler: profileMux}
  277. go func() {
  278. errChan <- s.httpProfileServer.ListenAndServe()
  279. }()
  280. }
  281. if s.config.SMTPServerListen != "" {
  282. go func() {
  283. errChan <- s.runSMTPServer()
  284. }()
  285. }
  286. s.mu.Unlock()
  287. go s.runManager()
  288. go s.runStatsResetter()
  289. go s.runDelayedSender()
  290. go s.runFirebaseKeepaliver()
  291. return <-errChan
  292. }
  293. // Stop stops HTTP (+HTTPS) server and all managers
  294. func (s *Server) Stop() {
  295. s.mu.Lock()
  296. defer s.mu.Unlock()
  297. if s.httpServer != nil {
  298. s.httpServer.Close()
  299. }
  300. if s.httpsServer != nil {
  301. s.httpsServer.Close()
  302. }
  303. if s.unixListener != nil {
  304. s.unixListener.Close()
  305. }
  306. if s.smtpServer != nil {
  307. s.smtpServer.Close()
  308. }
  309. s.closeDatabases()
  310. close(s.closeChan)
  311. }
  312. func (s *Server) closeDatabases() {
  313. if s.userManager != nil {
  314. s.userManager.Close()
  315. }
  316. s.messageCache.Close()
  317. }
  318. // handle is the main entry point for all HTTP requests
  319. func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
  320. v, err := s.maybeAuthenticate(r) // Note: Always returns v, even when error is returned
  321. if err != nil {
  322. s.handleError(w, r, v, err)
  323. return
  324. }
  325. ev := logvr(v, r)
  326. if ev.IsTrace() {
  327. ev.Field("http_request", renderHTTPRequest(r)).Trace("HTTP request started")
  328. } else if logvr(v, r).IsDebug() {
  329. ev.Debug("HTTP request started")
  330. }
  331. logvr(v, r).
  332. Timing(func() {
  333. if err := s.handleInternal(w, r, v); err != nil {
  334. s.handleError(w, r, v, err)
  335. return
  336. }
  337. if metricHTTPRequests != nil {
  338. metricHTTPRequests.WithLabelValues("200", "20000", r.Method).Inc()
  339. }
  340. }).
  341. Debug("HTTP request finished")
  342. }
  343. func (s *Server) handleError(w http.ResponseWriter, r *http.Request, v *visitor, err error) {
  344. httpErr, ok := err.(*errHTTP)
  345. if !ok {
  346. httpErr = errHTTPInternalError
  347. }
  348. if metricHTTPRequests != nil {
  349. metricHTTPRequests.WithLabelValues(fmt.Sprintf("%d", httpErr.HTTPCode), fmt.Sprintf("%d", httpErr.Code), r.Method).Inc()
  350. }
  351. isRateLimiting := util.Contains(rateLimitingErrorCodes, httpErr.HTTPCode)
  352. isNormalError := strings.Contains(err.Error(), "i/o timeout") || util.Contains(normalErrorCodes, httpErr.HTTPCode)
  353. ev := logvr(v, r).Err(err)
  354. if websocket.IsWebSocketUpgrade(r) {
  355. ev.Tag(tagWebsocket).Fields(websocketErrorContext(err))
  356. if isNormalError {
  357. ev.Debug("WebSocket error (this error is okay, it happens a lot): %s", err.Error())
  358. } else {
  359. ev.Info("WebSocket error: %s", err.Error())
  360. }
  361. return // Do not attempt to write to upgraded connection
  362. }
  363. if isNormalError {
  364. ev.Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  365. } else {
  366. ev.Info("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  367. }
  368. if isRateLimiting && s.config.StripeSecretKey != "" {
  369. u := v.User()
  370. if u == nil || u.Tier == nil {
  371. httpErr = httpErr.Wrap("increase your limits with a paid plan, see %s", s.config.BaseURL)
  372. }
  373. }
  374. w.Header().Set("Content-Type", "application/json")
  375. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  376. w.WriteHeader(httpErr.HTTPCode)
  377. io.WriteString(w, httpErr.JSON()+"\n")
  378. }
  379. func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visitor) error {
  380. if r.Method == http.MethodGet && r.URL.Path == "/" {
  381. return s.ensureWebEnabled(s.handleHome)(w, r, v)
  382. } else if r.Method == http.MethodHead && r.URL.Path == "/" {
  383. return s.ensureWebEnabled(s.handleEmpty)(w, r, v)
  384. } else if r.Method == http.MethodGet && r.URL.Path == apiHealthPath {
  385. return s.handleHealth(w, r, v)
  386. } else if r.Method == http.MethodGet && r.URL.Path == webConfigPath {
  387. return s.ensureWebEnabled(s.handleWebConfig)(w, r, v)
  388. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPath {
  389. return s.ensureUserManager(s.handleAccountCreate)(w, r, v)
  390. } else if r.Method == http.MethodGet && r.URL.Path == apiAccountPath {
  391. return s.handleAccountGet(w, r, v) // Allowed by anonymous
  392. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPath {
  393. return s.ensureUser(s.withAccountSync(s.handleAccountDelete))(w, r, v)
  394. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPasswordPath {
  395. return s.ensureUser(s.handleAccountPasswordChange)(w, r, v)
  396. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountTokenPath {
  397. return s.ensureUser(s.withAccountSync(s.handleAccountTokenCreate))(w, r, v)
  398. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountTokenPath {
  399. return s.ensureUser(s.withAccountSync(s.handleAccountTokenUpdate))(w, r, v)
  400. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountTokenPath {
  401. return s.ensureUser(s.withAccountSync(s.handleAccountTokenDelete))(w, r, v)
  402. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSettingsPath {
  403. return s.ensureUser(s.withAccountSync(s.handleAccountSettingsChange))(w, r, v)
  404. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountSubscriptionPath {
  405. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionAdd))(w, r, v)
  406. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSubscriptionPath {
  407. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionChange))(w, r, v)
  408. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountSubscriptionPath {
  409. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionDelete))(w, r, v)
  410. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountReservationPath {
  411. return s.ensureUser(s.withAccountSync(s.handleAccountReservationAdd))(w, r, v)
  412. } else if r.Method == http.MethodDelete && apiAccountReservationSingleRegex.MatchString(r.URL.Path) {
  413. return s.ensureUser(s.withAccountSync(s.handleAccountReservationDelete))(w, r, v)
  414. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingSubscriptionPath {
  415. return s.ensurePaymentsEnabled(s.ensureUser(s.handleAccountBillingSubscriptionCreate))(w, r, v) // Account sync via incoming Stripe webhook
  416. } else if r.Method == http.MethodGet && apiAccountBillingSubscriptionCheckoutSuccessRegex.MatchString(r.URL.Path) {
  417. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingSubscriptionCreateSuccess))(w, r, v) // No user context!
  418. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountBillingSubscriptionPath {
  419. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionUpdate))(w, r, v) // Account sync via incoming Stripe webhook
  420. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountBillingSubscriptionPath {
  421. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionDelete))(w, r, v) // Account sync via incoming Stripe webhook
  422. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingPortalPath {
  423. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingPortalSessionCreate))(w, r, v)
  424. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingWebhookPath {
  425. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingWebhook))(w, r, v) // This request comes from Stripe!
  426. } else if r.Method == http.MethodGet && r.URL.Path == apiTiers {
  427. return s.ensurePaymentsEnabled(s.handleBillingTiersGet)(w, r, v)
  428. } else if r.Method == http.MethodGet && r.URL.Path == matrixPushPath {
  429. return s.handleMatrixDiscovery(w)
  430. } else if r.Method == http.MethodGet && r.URL.Path == metricsPath && s.metricsHandler != nil {
  431. return s.handleMetrics(w, r, v)
  432. } else if r.Method == http.MethodGet && staticRegex.MatchString(r.URL.Path) {
  433. return s.ensureWebEnabled(s.handleStatic)(w, r, v)
  434. } else if r.Method == http.MethodGet && docsRegex.MatchString(r.URL.Path) {
  435. return s.ensureWebEnabled(s.handleDocs)(w, r, v)
  436. } else if (r.Method == http.MethodGet || r.Method == http.MethodHead) && fileRegex.MatchString(r.URL.Path) && s.config.AttachmentCacheDir != "" {
  437. return s.limitRequests(s.handleFile)(w, r, v)
  438. } else if r.Method == http.MethodOptions {
  439. return s.limitRequests(s.handleOptions)(w, r, v) // Should work even if the web app is not enabled, see #598
  440. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
  441. return s.transformBodyJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish)))(w, r, v)
  442. } else if r.Method == http.MethodPost && r.URL.Path == matrixPushPath {
  443. return s.transformMatrixJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublishMatrix)))(w, r, v)
  444. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicPathRegex.MatchString(r.URL.Path) {
  445. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  446. } else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
  447. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  448. } else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) {
  449. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeJSON))(w, r, v)
  450. } else if r.Method == http.MethodGet && ssePathRegex.MatchString(r.URL.Path) {
  451. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeSSE))(w, r, v)
  452. } else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
  453. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeRaw))(w, r, v)
  454. } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) {
  455. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeWS))(w, r, v)
  456. } else if r.Method == http.MethodGet && authPathRegex.MatchString(r.URL.Path) {
  457. return s.limitRequests(s.authorizeTopicRead(s.handleTopicAuth))(w, r, v)
  458. } else if r.Method == http.MethodGet && (topicPathRegex.MatchString(r.URL.Path) || externalTopicPathRegex.MatchString(r.URL.Path)) {
  459. return s.ensureWebEnabled(s.handleTopic)(w, r, v)
  460. }
  461. return errHTTPNotFound
  462. }
  463. func (s *Server) handleHome(w http.ResponseWriter, r *http.Request, v *visitor) error {
  464. if s.config.WebRootIsApp {
  465. r.URL.Path = webAppIndex
  466. } else {
  467. r.URL.Path = webHomeIndex
  468. }
  469. return s.handleStatic(w, r, v)
  470. }
  471. func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request, v *visitor) error {
  472. unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too!
  473. if unifiedpush {
  474. w.Header().Set("Content-Type", "application/json")
  475. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  476. _, err := io.WriteString(w, `{"unifiedpush":{"version":1}}`+"\n")
  477. return err
  478. }
  479. r.URL.Path = webAppIndex
  480. return s.handleStatic(w, r, v)
  481. }
  482. func (s *Server) handleEmpty(_ http.ResponseWriter, _ *http.Request, _ *visitor) error {
  483. return nil
  484. }
  485. func (s *Server) handleTopicAuth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  486. return s.writeJSON(w, newSuccessResponse())
  487. }
  488. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  489. response := &apiHealthResponse{
  490. Healthy: true,
  491. }
  492. return s.writeJSON(w, response)
  493. }
  494. func (s *Server) handleWebConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  495. appRoot := "/"
  496. if !s.config.WebRootIsApp {
  497. appRoot = "/app"
  498. }
  499. response := &apiConfigResponse{
  500. BaseURL: "", // Will translate to window.location.origin
  501. AppRoot: appRoot,
  502. EnableLogin: s.config.EnableLogin,
  503. EnableSignup: s.config.EnableSignup,
  504. EnablePayments: s.config.StripeSecretKey != "",
  505. EnableReservations: s.config.EnableReservations,
  506. BillingContact: s.config.BillingContact,
  507. DisallowedTopics: s.config.DisallowedTopics,
  508. }
  509. b, err := json.MarshalIndent(response, "", " ")
  510. if err != nil {
  511. return err
  512. }
  513. w.Header().Set("Content-Type", "text/javascript")
  514. _, err = io.WriteString(w, fmt.Sprintf("// Generated server configuration\nvar config = %s;\n", string(b)))
  515. return err
  516. }
  517. // handleMetrics returns Prometheus metrics. This endpoint is only called if enable-metrics is set,
  518. // and listen-metrics-http is not set.
  519. func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  520. s.metricsHandler.ServeHTTP(w, r)
  521. return nil
  522. }
  523. func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  524. r.URL.Path = webSiteDir + r.URL.Path
  525. util.Gzip(http.FileServer(http.FS(webFsCached))).ServeHTTP(w, r)
  526. return nil
  527. }
  528. func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  529. util.Gzip(http.FileServer(http.FS(docsStaticCached))).ServeHTTP(w, r)
  530. return nil
  531. }
  532. // handleFile processes the download of attachment files. The method handles GET and HEAD requests against a file.
  533. // Before streaming the file to a client, it locates uploader (m.Sender or m.User) in the message cache, so it
  534. // can associate the download bandwidth with the uploader.
  535. func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error {
  536. if s.config.AttachmentCacheDir == "" {
  537. return errHTTPInternalError
  538. }
  539. matches := fileRegex.FindStringSubmatch(r.URL.Path)
  540. if len(matches) != 2 {
  541. return errHTTPInternalErrorInvalidPath
  542. }
  543. messageID := matches[1]
  544. file := filepath.Join(s.config.AttachmentCacheDir, messageID)
  545. stat, err := os.Stat(file)
  546. if err != nil {
  547. return errHTTPNotFound.Fields(log.Context{
  548. "message_id": messageID,
  549. "error_context": "filesystem",
  550. })
  551. }
  552. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  553. w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
  554. if r.Method == http.MethodHead {
  555. return nil
  556. }
  557. // Find message in database, and associate bandwidth to the uploader user
  558. // This is an easy way to
  559. // - avoid abuse (e.g. 1 uploader, 1k downloaders)
  560. // - and also uses the higher bandwidth limits of a paying user
  561. m, err := s.messageCache.Message(messageID)
  562. if err == errMessageNotFound {
  563. if s.config.CacheBatchTimeout > 0 {
  564. // Strange edge case: If we immediately after upload request the file (the web app does this for images),
  565. // and messages are persisted asynchronously, retry fetching from the database
  566. m, err = util.Retry(func() (*message, error) {
  567. return s.messageCache.Message(messageID)
  568. }, s.config.CacheBatchTimeout, 100*time.Millisecond, 300*time.Millisecond, 600*time.Millisecond)
  569. }
  570. if err != nil {
  571. return errHTTPNotFound.Fields(log.Context{
  572. "message_id": messageID,
  573. "error_context": "message_cache",
  574. })
  575. }
  576. } else if err != nil {
  577. return err
  578. }
  579. bandwidthVisitor := v
  580. if s.userManager != nil && m.User != "" {
  581. u, err := s.userManager.UserByID(m.User)
  582. if err != nil {
  583. return err
  584. }
  585. bandwidthVisitor = s.visitor(v.IP(), u)
  586. } else if m.Sender.IsValid() {
  587. bandwidthVisitor = s.visitor(m.Sender, nil)
  588. }
  589. if !bandwidthVisitor.BandwidthAllowed(stat.Size()) {
  590. return errHTTPTooManyRequestsLimitAttachmentBandwidth.With(m)
  591. }
  592. // Actually send file
  593. f, err := os.Open(file)
  594. if err != nil {
  595. return err
  596. }
  597. defer f.Close()
  598. _, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f)
  599. return err
  600. }
  601. func (s *Server) handleMatrixDiscovery(w http.ResponseWriter) error {
  602. if s.config.BaseURL == "" {
  603. return errHTTPInternalErrorMissingBaseURL
  604. }
  605. return writeMatrixDiscoveryResponse(w)
  606. }
  607. func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, error) {
  608. start := time.Now()
  609. t, err := fromContext[*topic](r, contextTopic)
  610. if err != nil {
  611. return nil, err
  612. }
  613. vrate, err := fromContext[*visitor](r, contextRateVisitor)
  614. if err != nil {
  615. return nil, err
  616. }
  617. body, err := util.Peek(r.Body, s.config.MessageLimit)
  618. if err != nil {
  619. return nil, err
  620. }
  621. m := newDefaultMessage(t.ID, "")
  622. cache, firebase, email, unifiedpush, e := s.parsePublishParams(r, m)
  623. if e != nil {
  624. return nil, e.With(t)
  625. }
  626. if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
  627. // UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting (see
  628. // Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
  629. // the subscription as invalid if any 400-499 code (except 429/408) is returned.
  630. // See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
  631. return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
  632. } else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() {
  633. return nil, errHTTPTooManyRequestsLimitMessages.With(t)
  634. } else if email != "" && !vrate.EmailAllowed() {
  635. return nil, errHTTPTooManyRequestsLimitEmails.With(t)
  636. }
  637. if m.PollID != "" {
  638. m = newPollRequestMessage(t.ID, m.PollID)
  639. }
  640. m.Sender = v.IP()
  641. m.User = v.MaybeUserID()
  642. if cache {
  643. m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
  644. }
  645. if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil {
  646. return nil, err
  647. }
  648. if m.Message == "" {
  649. m.Message = emptyMessageBody
  650. }
  651. delayed := m.Time > time.Now().Unix()
  652. ev := logvrm(v, r, m).
  653. Tag(tagPublish).
  654. With(t).
  655. Fields(log.Context{
  656. "message_delayed": delayed,
  657. "message_firebase": firebase,
  658. "message_unifiedpush": unifiedpush,
  659. "message_email": email,
  660. })
  661. if ev.IsTrace() {
  662. ev.Field("message_body", util.MaybeMarshalJSON(m)).Trace("Received message")
  663. } else if ev.IsDebug() {
  664. ev.Debug("Received message")
  665. }
  666. if !delayed {
  667. if err := t.Publish(v, m); err != nil {
  668. return nil, err
  669. }
  670. if s.firebaseClient != nil && firebase {
  671. go s.sendToFirebase(v, m)
  672. }
  673. if s.smtpSender != nil && email != "" {
  674. go s.sendEmail(v, m, email)
  675. }
  676. if s.config.UpstreamBaseURL != "" {
  677. go s.forwardPollRequest(v, m)
  678. }
  679. } else {
  680. logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
  681. }
  682. if cache {
  683. logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
  684. if err := s.messageCache.AddMessage(m); err != nil {
  685. return nil, err
  686. }
  687. }
  688. u := v.User()
  689. if s.userManager != nil && u != nil && u.Tier != nil {
  690. go s.userManager.EnqueueUserStats(u.ID, v.Stats())
  691. }
  692. s.mu.Lock()
  693. s.messages++
  694. s.mu.Unlock()
  695. if unifiedpush {
  696. minc(metricUnifiedPushPublishedSuccess)
  697. }
  698. mset(metricMessagePublishDurationMillis, time.Since(start).Milliseconds())
  699. return m, nil
  700. }
  701. func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
  702. m, err := s.handlePublishInternal(r, v)
  703. if err != nil {
  704. minc(metricMessagesPublishedFailure)
  705. return err
  706. }
  707. minc(metricMessagesPublishedSuccess)
  708. return s.writeJSON(w, m)
  709. }
  710. func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *visitor) error {
  711. _, err := s.handlePublishInternal(r, v)
  712. if err != nil {
  713. minc(metricMessagesPublishedFailure)
  714. minc(metricMatrixPublishedFailure)
  715. if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
  716. topic, err := fromContext[*topic](r, contextTopic)
  717. if err != nil {
  718. return err
  719. }
  720. pushKey, err := fromContext[string](r, contextMatrixPushKey)
  721. if err != nil {
  722. return err
  723. }
  724. if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter {
  725. return writeMatrixResponse(w, pushKey)
  726. }
  727. }
  728. return err
  729. }
  730. minc(metricMessagesPublishedSuccess)
  731. minc(metricMatrixPublishedSuccess)
  732. return writeMatrixSuccess(w)
  733. }
  734. func (s *Server) sendToFirebase(v *visitor, m *message) {
  735. logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase")
  736. if err := s.firebaseClient.Send(v, m); err != nil {
  737. minc(metricFirebasePublishedFailure)
  738. if err == errFirebaseTemporarilyBanned {
  739. logvm(v, m).Tag(tagFirebase).Err(err).Debug("Unable to publish to Firebase: %v", err.Error())
  740. } else {
  741. logvm(v, m).Tag(tagFirebase).Err(err).Warn("Unable to publish to Firebase: %v", err.Error())
  742. }
  743. return
  744. }
  745. minc(metricFirebasePublishedSuccess)
  746. }
  747. func (s *Server) sendEmail(v *visitor, m *message, email string) {
  748. logvm(v, m).Tag(tagEmail).Field("email", email).Debug("Sending email to %s", email)
  749. if err := s.smtpSender.Send(v, m, email); err != nil {
  750. logvm(v, m).Tag(tagEmail).Field("email", email).Err(err).Warn("Unable to send email to %s: %v", email, err.Error())
  751. minc(metricEmailsPublishedFailure)
  752. return
  753. }
  754. minc(metricEmailsPublishedSuccess)
  755. }
  756. func (s *Server) forwardPollRequest(v *visitor, m *message) {
  757. topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
  758. topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
  759. forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
  760. logvm(v, m).Debug("Publishing poll request to %s", forwardURL)
  761. req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
  762. if err != nil {
  763. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  764. return
  765. }
  766. req.Header.Set("X-Poll-ID", m.ID)
  767. var httpClient = &http.Client{
  768. Timeout: time.Second * 10,
  769. }
  770. response, err := httpClient.Do(req)
  771. if err != nil {
  772. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  773. return
  774. } else if response.StatusCode != http.StatusOK {
  775. logvm(v, m).Err(err).Warn("Unable to publish poll request, unexpected HTTP status: %d", response.StatusCode)
  776. return
  777. }
  778. }
  779. func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, firebase bool, email string, unifiedpush bool, err *errHTTP) {
  780. cache = readBoolParam(r, true, "x-cache", "cache")
  781. firebase = readBoolParam(r, true, "x-firebase", "firebase")
  782. m.Title = readParam(r, "x-title", "title", "t")
  783. m.Click = readParam(r, "x-click", "click")
  784. icon := readParam(r, "x-icon", "icon")
  785. filename := readParam(r, "x-filename", "filename", "file", "f")
  786. attach := readParam(r, "x-attach", "attach", "a")
  787. if attach != "" || filename != "" {
  788. m.Attachment = &attachment{}
  789. }
  790. if filename != "" {
  791. m.Attachment.Name = filename
  792. }
  793. if attach != "" {
  794. if !urlRegex.MatchString(attach) {
  795. return false, false, "", false, errHTTPBadRequestAttachmentURLInvalid
  796. }
  797. m.Attachment.URL = attach
  798. if m.Attachment.Name == "" {
  799. u, err := url.Parse(m.Attachment.URL)
  800. if err == nil {
  801. m.Attachment.Name = path.Base(u.Path)
  802. if m.Attachment.Name == "." || m.Attachment.Name == "/" {
  803. m.Attachment.Name = ""
  804. }
  805. }
  806. }
  807. if m.Attachment.Name == "" {
  808. m.Attachment.Name = "attachment"
  809. }
  810. }
  811. if icon != "" {
  812. if !urlRegex.MatchString(icon) {
  813. return false, false, "", false, errHTTPBadRequestIconURLInvalid
  814. }
  815. m.Icon = icon
  816. }
  817. email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
  818. if s.smtpSender == nil && email != "" {
  819. return false, false, "", false, errHTTPBadRequestEmailDisabled
  820. }
  821. messageStr := strings.ReplaceAll(readParam(r, "x-message", "message", "m"), "\\n", "\n")
  822. if messageStr != "" {
  823. m.Message = messageStr
  824. }
  825. var e error
  826. m.Priority, e = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
  827. if e != nil {
  828. return false, false, "", false, errHTTPBadRequestPriorityInvalid
  829. }
  830. m.Tags = readCommaSeparatedParam(r, "x-tags", "tags", "tag", "ta")
  831. delayStr := readParam(r, "x-delay", "delay", "x-at", "at", "x-in", "in")
  832. if delayStr != "" {
  833. if !cache {
  834. return false, false, "", false, errHTTPBadRequestDelayNoCache
  835. }
  836. if email != "" {
  837. return false, false, "", false, errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
  838. }
  839. delay, err := util.ParseFutureTime(delayStr, time.Now())
  840. if err != nil {
  841. return false, false, "", false, errHTTPBadRequestDelayCannotParse
  842. } else if delay.Unix() < time.Now().Add(s.config.MinDelay).Unix() {
  843. return false, false, "", false, errHTTPBadRequestDelayTooSmall
  844. } else if delay.Unix() > time.Now().Add(s.config.MaxDelay).Unix() {
  845. return false, false, "", false, errHTTPBadRequestDelayTooLarge
  846. }
  847. m.Time = delay.Unix()
  848. }
  849. actionsStr := readParam(r, "x-actions", "actions", "action")
  850. if actionsStr != "" {
  851. m.Actions, e = parseActions(actionsStr)
  852. if e != nil {
  853. return false, false, "", false, errHTTPBadRequestActionsInvalid.Wrap(e.Error())
  854. }
  855. }
  856. unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
  857. if unifiedpush {
  858. firebase = false
  859. unifiedpush = true
  860. }
  861. m.PollID = readParam(r, "x-poll-id", "poll-id")
  862. if m.PollID != "" {
  863. unifiedpush = false
  864. cache = false
  865. email = ""
  866. }
  867. return cache, firebase, email, unifiedpush, nil
  868. }
  869. // handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
  870. //
  871. // 1. curl -X POST -H "Poll: 1234" ntfy.sh/...
  872. // If a message is flagged as poll request, the body does not matter and is discarded
  873. // 2. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
  874. // If body is binary, encode as base64, if not do not encode
  875. // 3. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
  876. // Body must be a message, because we attached an external URL
  877. // 4. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
  878. // Body must be attachment, because we passed a filename
  879. // 5. curl -T file.txt ntfy.sh/mytopic
  880. // If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
  881. // 6. curl -T file.txt ntfy.sh/mytopic
  882. // If file.txt is > message limit, treat it as an attachment
  883. func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser, unifiedpush bool) error {
  884. if m.Event == pollRequestEvent { // Case 1
  885. return s.handleBodyDiscard(body)
  886. } else if unifiedpush {
  887. return s.handleBodyAsMessageAutoDetect(m, body) // Case 2
  888. } else if m.Attachment != nil && m.Attachment.URL != "" {
  889. return s.handleBodyAsTextMessage(m, body) // Case 3
  890. } else if m.Attachment != nil && m.Attachment.Name != "" {
  891. return s.handleBodyAsAttachment(r, v, m, body) // Case 4
  892. } else if !body.LimitReached && utf8.Valid(body.PeekedBytes) {
  893. return s.handleBodyAsTextMessage(m, body) // Case 5
  894. }
  895. return s.handleBodyAsAttachment(r, v, m, body) // Case 6
  896. }
  897. func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
  898. _, err := io.Copy(io.Discard, body)
  899. _ = body.Close()
  900. return err
  901. }
  902. func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedReadCloser) error {
  903. if utf8.Valid(body.PeekedBytes) {
  904. m.Message = string(body.PeekedBytes) // Do not trim
  905. } else {
  906. m.Message = base64.StdEncoding.EncodeToString(body.PeekedBytes)
  907. m.Encoding = encodingBase64
  908. }
  909. return nil
  910. }
  911. func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser) error {
  912. if !utf8.Valid(body.PeekedBytes) {
  913. return errHTTPBadRequestMessageNotUTF8.With(m)
  914. }
  915. if len(body.PeekedBytes) > 0 { // Empty body should not override message (publish via GET!)
  916. m.Message = strings.TrimSpace(string(body.PeekedBytes)) // Truncates the message to the peek limit if required
  917. }
  918. if m.Attachment != nil && m.Attachment.Name != "" && m.Message == "" {
  919. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  920. }
  921. return nil
  922. }
  923. func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser) error {
  924. if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
  925. return errHTTPBadRequestAttachmentsDisallowed.With(m)
  926. }
  927. vinfo, err := v.Info()
  928. if err != nil {
  929. return err
  930. }
  931. attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
  932. if m.Time > attachmentExpiry {
  933. return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
  934. }
  935. contentLengthStr := r.Header.Get("Content-Length")
  936. if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
  937. contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
  938. if err == nil && (contentLength > vinfo.Stats.AttachmentTotalSizeRemaining || contentLength > vinfo.Limits.AttachmentFileSizeLimit) {
  939. return errHTTPEntityTooLargeAttachment.With(m).Fields(log.Context{
  940. "message_content_length": contentLength,
  941. "attachment_total_size_remaining": vinfo.Stats.AttachmentTotalSizeRemaining,
  942. "attachment_file_size_limit": vinfo.Limits.AttachmentFileSizeLimit,
  943. })
  944. }
  945. }
  946. if m.Attachment == nil {
  947. m.Attachment = &attachment{}
  948. }
  949. var ext string
  950. m.Attachment.Expires = attachmentExpiry
  951. m.Attachment.Type, ext = util.DetectContentType(body.PeekedBytes, m.Attachment.Name)
  952. m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
  953. if m.Attachment.Name == "" {
  954. m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
  955. }
  956. if m.Message == "" {
  957. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  958. }
  959. limiters := []util.Limiter{
  960. v.BandwidthLimiter(),
  961. util.NewFixedLimiter(vinfo.Limits.AttachmentFileSizeLimit),
  962. util.NewFixedLimiter(vinfo.Stats.AttachmentTotalSizeRemaining),
  963. }
  964. m.Attachment.Size, err = s.fileCache.Write(m.ID, body, limiters...)
  965. if err == util.ErrLimitReached {
  966. return errHTTPEntityTooLargeAttachment.With(m)
  967. } else if err != nil {
  968. return err
  969. }
  970. return nil
  971. }
  972. func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *visitor) error {
  973. encoder := func(msg *message) (string, error) {
  974. var buf bytes.Buffer
  975. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  976. return "", err
  977. }
  978. return buf.String(), nil
  979. }
  980. return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder)
  981. }
  982. func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
  983. encoder := func(msg *message) (string, error) {
  984. var buf bytes.Buffer
  985. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  986. return "", err
  987. }
  988. if msg.Event != messageEvent {
  989. return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
  990. }
  991. return fmt.Sprintf("data: %s\n", buf.String()), nil
  992. }
  993. return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder)
  994. }
  995. func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
  996. encoder := func(msg *message) (string, error) {
  997. if msg.Event == messageEvent { // only handle default events
  998. return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
  999. }
  1000. return "\n", nil // "keepalive" and "open" events just send an empty line
  1001. }
  1002. return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder)
  1003. }
  1004. func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
  1005. logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection opened")
  1006. defer logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection closed")
  1007. if !v.SubscriptionAllowed() {
  1008. return errHTTPTooManyRequestsLimitSubscriptions
  1009. }
  1010. defer v.RemoveSubscription()
  1011. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1012. if err != nil {
  1013. return err
  1014. }
  1015. poll, since, scheduled, filters, rateTopics, err := parseSubscribeParams(r)
  1016. if err != nil {
  1017. return err
  1018. }
  1019. var wlock sync.Mutex
  1020. defer func() {
  1021. // Hack: This is the fix for a horrible data race that I have not been able to figure out in quite some time.
  1022. // It appears to be happening when the Go HTTP code reads from the socket when closing the request (i.e. AFTER
  1023. // this function returns), and causes a data race with the ResponseWriter. Locking wlock here silences the
  1024. // data race detector. See https://github.com/binwiederhier/ntfy/issues/338#issuecomment-1163425889.
  1025. wlock.TryLock()
  1026. }()
  1027. sub := func(v *visitor, msg *message) error {
  1028. if !filters.Pass(msg) {
  1029. return nil
  1030. }
  1031. m, err := encoder(msg)
  1032. if err != nil {
  1033. return err
  1034. }
  1035. wlock.Lock()
  1036. defer wlock.Unlock()
  1037. if _, err := w.Write([]byte(m)); err != nil {
  1038. return err
  1039. }
  1040. if fl, ok := w.(http.Flusher); ok {
  1041. fl.Flush()
  1042. }
  1043. return nil
  1044. }
  1045. if err := s.maybeSetRateVisitors(r, v, topics, rateTopics); err != nil {
  1046. return err
  1047. }
  1048. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1049. w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
  1050. if poll {
  1051. for _, t := range topics {
  1052. t.Keepalive()
  1053. }
  1054. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1055. }
  1056. ctx, cancel := context.WithCancel(context.Background())
  1057. defer cancel()
  1058. subscriberIDs := make([]int, 0)
  1059. for _, t := range topics {
  1060. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1061. }
  1062. defer func() {
  1063. for i, subscriberID := range subscriberIDs {
  1064. topics[i].Unsubscribe(subscriberID) // Order!
  1065. }
  1066. }()
  1067. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1068. return err
  1069. }
  1070. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1071. return err
  1072. }
  1073. for {
  1074. select {
  1075. case <-ctx.Done():
  1076. return nil
  1077. case <-r.Context().Done():
  1078. return nil
  1079. case <-time.After(s.config.KeepaliveInterval):
  1080. ev := logvr(v, r).Tag(tagSubscribe)
  1081. if len(topics) == 1 {
  1082. ev.With(topics[0]).Trace("Sending keepalive message to %s", topics[0].ID)
  1083. } else {
  1084. ev.Trace("Sending keepalive message to %d topics", len(topics))
  1085. }
  1086. v.Keepalive()
  1087. for _, t := range topics {
  1088. t.Keepalive()
  1089. }
  1090. if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
  1091. return err
  1092. }
  1093. }
  1094. }
  1095. }
  1096. func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1097. if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
  1098. return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
  1099. }
  1100. if !v.SubscriptionAllowed() {
  1101. return errHTTPTooManyRequestsLimitSubscriptions
  1102. }
  1103. defer v.RemoveSubscription()
  1104. logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection opened")
  1105. defer logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection closed")
  1106. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1107. if err != nil {
  1108. return err
  1109. }
  1110. poll, since, scheduled, filters, rateTopics, err := parseSubscribeParams(r)
  1111. if err != nil {
  1112. return err
  1113. }
  1114. upgrader := &websocket.Upgrader{
  1115. ReadBufferSize: wsBufferSize,
  1116. WriteBufferSize: wsBufferSize,
  1117. CheckOrigin: func(r *http.Request) bool {
  1118. return true // We're open for business!
  1119. },
  1120. }
  1121. conn, err := upgrader.Upgrade(w, r, nil)
  1122. if err != nil {
  1123. return err
  1124. }
  1125. defer conn.Close()
  1126. // Subscription connections can be canceled externally, see topic.CancelSubscribers
  1127. cancelCtx, cancel := context.WithCancel(context.Background())
  1128. defer cancel()
  1129. // Use errgroup to run WebSocket reader and writer in Go routines
  1130. var wlock sync.Mutex
  1131. g, gctx := errgroup.WithContext(cancelCtx)
  1132. g.Go(func() error {
  1133. pongWait := s.config.KeepaliveInterval + wsPongWait
  1134. conn.SetReadLimit(wsReadLimit)
  1135. if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
  1136. return err
  1137. }
  1138. conn.SetPongHandler(func(appData string) error {
  1139. logvr(v, r).Tag(tagWebsocket).Trace("Received WebSocket pong")
  1140. return conn.SetReadDeadline(time.Now().Add(pongWait))
  1141. })
  1142. for {
  1143. _, _, err := conn.NextReader()
  1144. if err != nil {
  1145. return err
  1146. }
  1147. select {
  1148. case <-gctx.Done():
  1149. return nil
  1150. default:
  1151. }
  1152. }
  1153. })
  1154. g.Go(func() error {
  1155. ping := func() error {
  1156. wlock.Lock()
  1157. defer wlock.Unlock()
  1158. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1159. return err
  1160. }
  1161. logvr(v, r).Tag(tagWebsocket).Trace("Sending WebSocket ping")
  1162. return conn.WriteMessage(websocket.PingMessage, nil)
  1163. }
  1164. for {
  1165. select {
  1166. case <-gctx.Done():
  1167. return nil
  1168. case <-cancelCtx.Done():
  1169. logvr(v, r).Tag(tagWebsocket).Trace("Cancel received, closing subscriber connection")
  1170. conn.Close()
  1171. return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
  1172. case <-time.After(s.config.KeepaliveInterval):
  1173. v.Keepalive()
  1174. for _, t := range topics {
  1175. t.Keepalive()
  1176. }
  1177. if err := ping(); err != nil {
  1178. return err
  1179. }
  1180. }
  1181. }
  1182. })
  1183. sub := func(v *visitor, msg *message) error {
  1184. if !filters.Pass(msg) {
  1185. return nil
  1186. }
  1187. wlock.Lock()
  1188. defer wlock.Unlock()
  1189. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1190. return err
  1191. }
  1192. return conn.WriteJSON(msg)
  1193. }
  1194. if err := s.maybeSetRateVisitors(r, v, topics, rateTopics); err != nil {
  1195. return err
  1196. }
  1197. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1198. if poll {
  1199. for _, t := range topics {
  1200. t.Keepalive()
  1201. }
  1202. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1203. }
  1204. subscriberIDs := make([]int, 0)
  1205. for _, t := range topics {
  1206. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1207. }
  1208. defer func() {
  1209. for i, subscriberID := range subscriberIDs {
  1210. topics[i].Unsubscribe(subscriberID) // Order!
  1211. }
  1212. }()
  1213. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1214. return err
  1215. }
  1216. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1217. return err
  1218. }
  1219. err = g.Wait()
  1220. if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) {
  1221. logvr(v, r).Tag(tagWebsocket).Err(err).Fields(websocketErrorContext(err)).Trace("WebSocket connection closed")
  1222. return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
  1223. }
  1224. return err
  1225. }
  1226. func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, rateTopics []string, err error) {
  1227. poll = readBoolParam(r, false, "x-poll", "poll", "po")
  1228. scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
  1229. since, err = parseSince(r, poll)
  1230. if err != nil {
  1231. return
  1232. }
  1233. filters, err = parseQueryFilters(r)
  1234. if err != nil {
  1235. return
  1236. }
  1237. rateTopics = readCommaSeparatedParam(r, "x-rate-topics", "rate-topics")
  1238. return
  1239. }
  1240. // maybeSetRateVisitors sets the rate visitor on a topic (v.SetRateVisitor), indicating that all messages published
  1241. // to that topic will be rate limited against the rate visitor instead of the publishing visitor.
  1242. //
  1243. // Setting the rate visitor is ony allowed if the `visitor-subscriber-rate-limiting` setting is enabled, AND
  1244. // - auth-file is not set (everything is open by default)
  1245. // - or the topic is reserved, and v.user is the owner
  1246. // - or the topic is not reserved, and v.user has write access
  1247. //
  1248. // Note: This TEMPORARILY also registers all topics starting with "up" (= UnifiedPush). This is to ease the transition
  1249. // until the Android app will send the "Rate-Topics" header.
  1250. func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic, rateTopics []string) error {
  1251. // Bail out if not enabled
  1252. if !s.config.VisitorSubscriberRateLimiting {
  1253. return nil
  1254. }
  1255. // Make a list of topics that we'll actually set the RateVisitor on
  1256. eligibleRateTopics := make([]*topic, 0)
  1257. for _, t := range topics {
  1258. if (strings.HasPrefix(t.ID, unifiedPushTopicPrefix) && len(t.ID) == unifiedPushTopicLength) || util.Contains(rateTopics, t.ID) {
  1259. eligibleRateTopics = append(eligibleRateTopics, t)
  1260. }
  1261. }
  1262. if len(eligibleRateTopics) == 0 {
  1263. return nil
  1264. }
  1265. // If access controls are turned off, v has access to everything, and we can set the rate visitor
  1266. if s.userManager == nil {
  1267. return s.setRateVisitors(r, v, eligibleRateTopics)
  1268. }
  1269. // If access controls are enabled, only set rate visitor if
  1270. // - topic is reserved, and v.user is the owner
  1271. // - topic is not reserved, and v.user has write access
  1272. writableRateTopics := make([]*topic, 0)
  1273. for _, t := range topics {
  1274. ownerUserID, err := s.userManager.ReservationOwner(t.ID)
  1275. if err != nil {
  1276. return err
  1277. }
  1278. if ownerUserID == "" {
  1279. if err := s.userManager.Authorize(v.User(), t.ID, user.PermissionWrite); err == nil {
  1280. writableRateTopics = append(writableRateTopics, t)
  1281. }
  1282. } else if ownerUserID == v.MaybeUserID() {
  1283. writableRateTopics = append(writableRateTopics, t)
  1284. }
  1285. }
  1286. return s.setRateVisitors(r, v, writableRateTopics)
  1287. }
  1288. func (s *Server) setRateVisitors(r *http.Request, v *visitor, rateTopics []*topic) error {
  1289. for _, t := range rateTopics {
  1290. logvr(v, r).
  1291. Tag(tagSubscribe).
  1292. With(t).
  1293. Debug("Setting visitor as rate visitor for topic %s", t.ID)
  1294. t.SetRateVisitor(v)
  1295. }
  1296. return nil
  1297. }
  1298. // sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
  1299. // marker, returning only messages that are newer than the marker.
  1300. func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled bool, v *visitor, sub subscriber) error {
  1301. if since.IsNone() {
  1302. return nil
  1303. }
  1304. messages := make([]*message, 0)
  1305. for _, t := range topics {
  1306. topicMessages, err := s.messageCache.Messages(t.ID, since, scheduled)
  1307. if err != nil {
  1308. return err
  1309. }
  1310. messages = append(messages, topicMessages...)
  1311. }
  1312. sort.Slice(messages, func(i, j int) bool {
  1313. return messages[i].Time < messages[j].Time
  1314. })
  1315. for _, m := range messages {
  1316. if err := sub(v, m); err != nil {
  1317. return err
  1318. }
  1319. }
  1320. return nil
  1321. }
  1322. // parseSince returns a timestamp identifying the time span from which cached messages should be received.
  1323. //
  1324. // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
  1325. // "all" for all messages.
  1326. func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
  1327. since := readParam(r, "x-since", "since", "si")
  1328. // Easy cases (empty, all, none)
  1329. if since == "" {
  1330. if poll {
  1331. return sinceAllMessages, nil
  1332. }
  1333. return sinceNoMessages, nil
  1334. } else if since == "all" {
  1335. return sinceAllMessages, nil
  1336. } else if since == "none" {
  1337. return sinceNoMessages, nil
  1338. }
  1339. // ID, timestamp, duration
  1340. if validMessageID(since) {
  1341. return newSinceID(since), nil
  1342. } else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
  1343. return newSinceTime(s), nil
  1344. } else if d, err := time.ParseDuration(since); err == nil {
  1345. return newSinceTime(time.Now().Add(-1 * d).Unix()), nil
  1346. }
  1347. return sinceNoMessages, errHTTPBadRequestSinceInvalid
  1348. }
  1349. func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  1350. w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE")
  1351. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1352. w.Header().Set("Access-Control-Allow-Headers", "*") // CORS, allow auth via JS // FIXME is this terrible?
  1353. return nil
  1354. }
  1355. func (s *Server) topicFromPath(path string) (*topic, error) {
  1356. parts := strings.Split(path, "/")
  1357. if len(parts) < 2 {
  1358. return nil, errHTTPBadRequestTopicInvalid
  1359. }
  1360. return s.topicFromID(parts[1])
  1361. }
  1362. func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
  1363. parts := strings.Split(path, "/")
  1364. if len(parts) < 2 {
  1365. return nil, "", errHTTPBadRequestTopicInvalid
  1366. }
  1367. topicIDs := util.SplitNoEmpty(parts[1], ",")
  1368. topics, err := s.topicsFromIDs(topicIDs...)
  1369. if err != nil {
  1370. return nil, "", errHTTPBadRequestTopicInvalid
  1371. }
  1372. return topics, parts[1], nil
  1373. }
  1374. func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
  1375. s.mu.Lock()
  1376. defer s.mu.Unlock()
  1377. topics := make([]*topic, 0)
  1378. for _, id := range ids {
  1379. if util.Contains(s.config.DisallowedTopics, id) {
  1380. return nil, errHTTPBadRequestTopicDisallowed
  1381. }
  1382. if _, ok := s.topics[id]; !ok {
  1383. if len(s.topics) >= s.config.TotalTopicLimit {
  1384. return nil, errHTTPTooManyRequestsLimitTotalTopics
  1385. }
  1386. s.topics[id] = newTopic(id)
  1387. }
  1388. topics = append(topics, s.topics[id])
  1389. }
  1390. return topics, nil
  1391. }
  1392. func (s *Server) topicFromID(id string) (*topic, error) {
  1393. topics, err := s.topicsFromIDs(id)
  1394. if err != nil {
  1395. return nil, err
  1396. }
  1397. return topics[0], nil
  1398. }
  1399. func (s *Server) runSMTPServer() error {
  1400. s.smtpServerBackend = newMailBackend(s.config, s.handle)
  1401. s.smtpServer = smtp.NewServer(s.smtpServerBackend)
  1402. s.smtpServer.Addr = s.config.SMTPServerListen
  1403. s.smtpServer.Domain = s.config.SMTPServerDomain
  1404. s.smtpServer.ReadTimeout = 10 * time.Second
  1405. s.smtpServer.WriteTimeout = 10 * time.Second
  1406. s.smtpServer.MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
  1407. s.smtpServer.MaxRecipients = 1
  1408. s.smtpServer.AllowInsecureAuth = true
  1409. return s.smtpServer.ListenAndServe()
  1410. }
  1411. func (s *Server) runManager() {
  1412. for {
  1413. select {
  1414. case <-time.After(s.config.ManagerInterval):
  1415. log.
  1416. Tag(tagManager).
  1417. Timing(s.execManager).
  1418. Debug("Manager finished")
  1419. case <-s.closeChan:
  1420. return
  1421. }
  1422. }
  1423. }
  1424. // runStatsResetter runs once a day (usually midnight UTC) to reset all the visitor's message and
  1425. // email counters. The stats are used to display the counters in the web app, as well as for rate limiting.
  1426. func (s *Server) runStatsResetter() {
  1427. for {
  1428. runAt := util.NextOccurrenceUTC(s.config.VisitorStatsResetTime, time.Now())
  1429. timer := time.NewTimer(time.Until(runAt))
  1430. log.Tag(tagResetter).Debug("Waiting until %v to reset visitor stats", runAt)
  1431. select {
  1432. case <-timer.C:
  1433. log.Tag(tagResetter).Debug("Running stats resetter")
  1434. s.resetStats()
  1435. case <-s.closeChan:
  1436. log.Tag(tagResetter).Debug("Stopping stats resetter")
  1437. timer.Stop()
  1438. return
  1439. }
  1440. }
  1441. }
  1442. func (s *Server) resetStats() {
  1443. log.Info("Resetting all visitor stats (daily task)")
  1444. s.mu.Lock()
  1445. defer s.mu.Unlock() // Includes the database query to avoid races with other processes
  1446. for _, v := range s.visitors {
  1447. v.ResetStats()
  1448. }
  1449. if s.userManager != nil {
  1450. if err := s.userManager.ResetStats(); err != nil {
  1451. log.Tag(tagResetter).Warn("Failed to write to database: %s", err.Error())
  1452. }
  1453. }
  1454. }
  1455. func (s *Server) runFirebaseKeepaliver() {
  1456. if s.firebaseClient == nil {
  1457. return
  1458. }
  1459. v := newVisitor(s.config, s.messageCache, s.userManager, netip.IPv4Unspecified(), nil) // Background process, not a real visitor, uses IP 0.0.0.0
  1460. for {
  1461. select {
  1462. case <-time.After(s.config.FirebaseKeepaliveInterval):
  1463. s.sendToFirebase(v, newKeepaliveMessage(firebaseControlTopic))
  1464. /*
  1465. FIXME: Disable iOS polling entirely for now due to thundering herd problem (see #677)
  1466. To solve this, we'd have to shard the iOS poll topics to spread out the polling evenly.
  1467. Given that it's not really necessary to poll, turning it off for now should not have any impact.
  1468. case <-time.After(s.config.FirebasePollInterval):
  1469. s.sendToFirebase(v, newKeepaliveMessage(firebasePollTopic))
  1470. */
  1471. case <-s.closeChan:
  1472. return
  1473. }
  1474. }
  1475. }
  1476. func (s *Server) runDelayedSender() {
  1477. for {
  1478. select {
  1479. case <-time.After(s.config.DelayedSenderInterval):
  1480. if err := s.sendDelayedMessages(); err != nil {
  1481. log.Tag(tagPublish).Err(err).Warn("Error sending delayed messages")
  1482. }
  1483. case <-s.closeChan:
  1484. return
  1485. }
  1486. }
  1487. }
  1488. func (s *Server) sendDelayedMessages() error {
  1489. messages, err := s.messageCache.MessagesDue()
  1490. if err != nil {
  1491. return err
  1492. }
  1493. for _, m := range messages {
  1494. var u *user.User
  1495. if s.userManager != nil && m.User != "" {
  1496. u, err = s.userManager.UserByID(m.User)
  1497. if err != nil {
  1498. log.With(m).Err(err).Warn("Error sending delayed message")
  1499. continue
  1500. }
  1501. }
  1502. v := s.visitor(m.Sender, u)
  1503. if err := s.sendDelayedMessage(v, m); err != nil {
  1504. logvm(v, m).Err(err).Warn("Error sending delayed message")
  1505. }
  1506. }
  1507. return nil
  1508. }
  1509. func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
  1510. logvm(v, m).Debug("Sending delayed message")
  1511. s.mu.Lock()
  1512. t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
  1513. s.mu.Unlock()
  1514. if ok {
  1515. go func() {
  1516. // We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
  1517. if err := t.Publish(v, m); err != nil {
  1518. logvm(v, m).Err(err).Warn("Unable to publish message")
  1519. }
  1520. }()
  1521. }
  1522. if s.firebaseClient != nil { // Firebase subscribers may not show up in topics map
  1523. go s.sendToFirebase(v, m)
  1524. }
  1525. if s.config.UpstreamBaseURL != "" {
  1526. go s.forwardPollRequest(v, m)
  1527. }
  1528. if err := s.messageCache.MarkPublished(m); err != nil {
  1529. return err
  1530. }
  1531. return nil
  1532. }
  1533. // transformBodyJSON peeks the request body, reads the JSON, and converts it to headers
  1534. // before passing it on to the next handler. This is meant to be used in combination with handlePublish.
  1535. func (s *Server) transformBodyJSON(next handleFunc) handleFunc {
  1536. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1537. m, err := readJSONWithLimit[publishMessage](r.Body, s.config.MessageLimit*2, false) // 2x to account for JSON format overhead
  1538. if err != nil {
  1539. return err
  1540. }
  1541. if !topicRegex.MatchString(m.Topic) {
  1542. return errHTTPBadRequestTopicInvalid
  1543. }
  1544. if m.Message == "" {
  1545. m.Message = emptyMessageBody
  1546. }
  1547. r.URL.Path = "/" + m.Topic
  1548. r.Body = io.NopCloser(strings.NewReader(m.Message))
  1549. if m.Title != "" {
  1550. r.Header.Set("X-Title", m.Title)
  1551. }
  1552. if m.Priority != 0 {
  1553. r.Header.Set("X-Priority", fmt.Sprintf("%d", m.Priority))
  1554. }
  1555. if m.Tags != nil && len(m.Tags) > 0 {
  1556. r.Header.Set("X-Tags", strings.Join(m.Tags, ","))
  1557. }
  1558. if m.Attach != "" {
  1559. r.Header.Set("X-Attach", m.Attach)
  1560. }
  1561. if m.Filename != "" {
  1562. r.Header.Set("X-Filename", m.Filename)
  1563. }
  1564. if m.Click != "" {
  1565. r.Header.Set("X-Click", m.Click)
  1566. }
  1567. if m.Icon != "" {
  1568. r.Header.Set("X-Icon", m.Icon)
  1569. }
  1570. if len(m.Actions) > 0 {
  1571. actionsStr, err := json.Marshal(m.Actions)
  1572. if err != nil {
  1573. return errHTTPBadRequestMessageJSONInvalid
  1574. }
  1575. r.Header.Set("X-Actions", string(actionsStr))
  1576. }
  1577. if m.Email != "" {
  1578. r.Header.Set("X-Email", m.Email)
  1579. }
  1580. if m.Delay != "" {
  1581. r.Header.Set("X-Delay", m.Delay)
  1582. }
  1583. return next(w, r, v)
  1584. }
  1585. }
  1586. func (s *Server) transformMatrixJSON(next handleFunc) handleFunc {
  1587. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1588. newRequest, err := newRequestFromMatrixJSON(r, s.config.BaseURL, s.config.MessageLimit)
  1589. if err != nil {
  1590. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Invalid Matrix request")
  1591. if e, ok := err.(*errMatrixPushkeyRejected); ok {
  1592. return writeMatrixResponse(w, e.rejectedPushKey)
  1593. }
  1594. return err
  1595. }
  1596. if err := next(w, newRequest, v); err != nil {
  1597. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Error handling Matrix request")
  1598. return err
  1599. }
  1600. return nil
  1601. }
  1602. }
  1603. func (s *Server) authorizeTopicWrite(next handleFunc) handleFunc {
  1604. return s.autorizeTopic(next, user.PermissionWrite)
  1605. }
  1606. func (s *Server) authorizeTopicRead(next handleFunc) handleFunc {
  1607. return s.autorizeTopic(next, user.PermissionRead)
  1608. }
  1609. func (s *Server) autorizeTopic(next handleFunc, perm user.Permission) handleFunc {
  1610. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1611. if s.userManager == nil {
  1612. return next(w, r, v)
  1613. }
  1614. topics, _, err := s.topicsFromPath(r.URL.Path)
  1615. if err != nil {
  1616. return err
  1617. }
  1618. u := v.User()
  1619. for _, t := range topics {
  1620. if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
  1621. logvr(v, r).With(t).Err(err).Debug("Access to topic %s not authorized", t.ID)
  1622. return errHTTPForbidden.With(t)
  1623. }
  1624. }
  1625. return next(w, r, v)
  1626. }
  1627. }
  1628. // maybeAuthenticate reads the "Authorization" header and will try to authenticate the user
  1629. // if it is set.
  1630. //
  1631. // - If auth-file is not configured, immediately return an IP-based visitor
  1632. // - If the header is not set or not supported (anything non-Basic and non-Bearer),
  1633. // an IP-based visitor is returned
  1634. // - If the header is set, authenticate will be called to check the username/password (Basic auth),
  1635. // or the token (Bearer auth), and read the user from the database
  1636. //
  1637. // This function will ALWAYS return a visitor, even if an error occurs (e.g. unauthorized), so
  1638. // that subsequent logging calls still have a visitor context.
  1639. func (s *Server) maybeAuthenticate(r *http.Request) (*visitor, error) {
  1640. // Read "Authorization" header value, and exit out early if it's not set
  1641. ip := extractIPAddress(r, s.config.BehindProxy)
  1642. vip := s.visitor(ip, nil)
  1643. if s.userManager == nil {
  1644. return vip, nil
  1645. }
  1646. header, err := readAuthHeader(r)
  1647. if err != nil {
  1648. return vip, err
  1649. } else if !supportedAuthHeader(header) {
  1650. return vip, nil
  1651. }
  1652. // If we're trying to auth, check the rate limiter first
  1653. if !vip.AuthAllowed() {
  1654. return vip, errHTTPTooManyRequestsLimitAuthFailure // Always return visitor, even when error occurs!
  1655. }
  1656. u, err := s.authenticate(r, header)
  1657. if err != nil {
  1658. vip.AuthFailed()
  1659. logr(r).Err(err).Debug("Authentication failed")
  1660. return vip, errHTTPUnauthorized // Always return visitor, even when error occurs!
  1661. }
  1662. // Authentication with user was successful
  1663. return s.visitor(ip, u), nil
  1664. }
  1665. // authenticate a user based on basic auth username/password (Authorization: Basic ...), or token auth (Authorization: Bearer ...).
  1666. // The Authorization header can be passed as a header or the ?auth=... query param. The latter is required only to
  1667. // support the WebSocket JavaScript class, which does not support passing headers during the initial request. The auth
  1668. // query param is effectively doubly base64 encoded. Its format is base64(Basic base64(user:pass)).
  1669. func (s *Server) authenticate(r *http.Request, header string) (user *user.User, err error) {
  1670. if strings.HasPrefix(header, "Bearer") {
  1671. return s.authenticateBearerAuth(r, strings.TrimSpace(strings.TrimPrefix(header, "Bearer")))
  1672. }
  1673. return s.authenticateBasicAuth(r, header)
  1674. }
  1675. // readAuthHeader reads the raw value of the Authorization header, either from the actual HTTP header,
  1676. // or from the ?auth... query parameter
  1677. func readAuthHeader(r *http.Request) (string, error) {
  1678. value := strings.TrimSpace(r.Header.Get("Authorization"))
  1679. queryParam := readQueryParam(r, "authorization", "auth")
  1680. if queryParam != "" {
  1681. a, err := base64.RawURLEncoding.DecodeString(queryParam)
  1682. if err != nil {
  1683. return "", err
  1684. }
  1685. value = strings.TrimSpace(string(a))
  1686. }
  1687. return value, nil
  1688. }
  1689. // supportedAuthHeader returns true only if the Authorization header value starts
  1690. // with "Basic" or "Bearer". In particular, an empty value is not supported, and neither
  1691. // are things like "WebPush", or "vapid" (see #629).
  1692. func supportedAuthHeader(value string) bool {
  1693. value = strings.ToLower(value)
  1694. return strings.HasPrefix(value, "basic ") || strings.HasPrefix(value, "bearer ")
  1695. }
  1696. func (s *Server) authenticateBasicAuth(r *http.Request, value string) (user *user.User, err error) {
  1697. r.Header.Set("Authorization", value)
  1698. username, password, ok := r.BasicAuth()
  1699. if !ok {
  1700. return nil, errors.New("invalid basic auth")
  1701. } else if username == "" {
  1702. return s.authenticateBearerAuth(r, password) // Treat password as token
  1703. }
  1704. return s.userManager.Authenticate(username, password)
  1705. }
  1706. func (s *Server) authenticateBearerAuth(r *http.Request, token string) (*user.User, error) {
  1707. u, err := s.userManager.AuthenticateToken(token)
  1708. if err != nil {
  1709. return nil, err
  1710. }
  1711. ip := extractIPAddress(r, s.config.BehindProxy)
  1712. go s.userManager.EnqueueTokenUpdate(token, &user.TokenUpdate{
  1713. LastAccess: time.Now(),
  1714. LastOrigin: ip,
  1715. })
  1716. return u, nil
  1717. }
  1718. func (s *Server) visitor(ip netip.Addr, user *user.User) *visitor {
  1719. s.mu.Lock()
  1720. defer s.mu.Unlock()
  1721. id := visitorID(ip, user)
  1722. v, exists := s.visitors[id]
  1723. if !exists {
  1724. s.visitors[id] = newVisitor(s.config, s.messageCache, s.userManager, ip, user)
  1725. return s.visitors[id]
  1726. }
  1727. v.Keepalive()
  1728. v.SetUser(user) // Always update with the latest user, may be nil!
  1729. return v
  1730. }
  1731. func (s *Server) writeJSON(w http.ResponseWriter, v any) error {
  1732. w.Header().Set("Content-Type", "application/json")
  1733. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1734. if err := json.NewEncoder(w).Encode(v); err != nil {
  1735. return err
  1736. }
  1737. return nil
  1738. }