server.go 68 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889
  1. package server
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "embed"
  7. "encoding/base64"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "github.com/emersion/go-smtp"
  12. "github.com/gorilla/websocket"
  13. "github.com/prometheus/client_golang/prometheus/promhttp"
  14. "golang.org/x/sync/errgroup"
  15. "heckel.io/ntfy/log"
  16. "heckel.io/ntfy/user"
  17. "heckel.io/ntfy/util"
  18. "io"
  19. "net"
  20. "net/http"
  21. "net/http/pprof"
  22. "net/netip"
  23. "net/url"
  24. "os"
  25. "path"
  26. "path/filepath"
  27. "regexp"
  28. "sort"
  29. "strconv"
  30. "strings"
  31. "sync"
  32. "time"
  33. "unicode/utf8"
  34. )
  35. // Server is the main server, providing the UI and API for ntfy
  36. type Server struct {
  37. config *Config
  38. httpServer *http.Server
  39. httpsServer *http.Server
  40. httpMetricsServer *http.Server
  41. httpProfileServer *http.Server
  42. unixListener net.Listener
  43. smtpServer *smtp.Server
  44. smtpServerBackend *smtpBackend
  45. smtpSender mailer
  46. topics map[string]*topic
  47. visitors map[string]*visitor // ip:<ip> or user:<user>
  48. firebaseClient *firebaseClient
  49. messages int64 // Total number of messages (persisted if messageCache enabled)
  50. messagesHistory []int64 // Last n values of the messages counter, used to determine rate
  51. userManager *user.Manager // Might be nil!
  52. messageCache *messageCache // Database that stores the messages
  53. fileCache *fileCache // File system based cache that stores attachments
  54. stripe stripeAPI // Stripe API, can be replaced with a mock
  55. priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
  56. metricsHandler http.Handler // Handles /metrics if enable-metrics set, and listen-metrics-http not set
  57. closeChan chan bool
  58. mu sync.RWMutex
  59. }
  60. // handleFunc extends the normal http.HandlerFunc to be able to easily return errors
  61. type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
  62. var (
  63. // If changed, don't forget to update Android App and auth_sqlite.go
  64. topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
  65. topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
  66. externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
  67. jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
  68. ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
  69. rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
  70. wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
  71. authPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/auth$`)
  72. publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/(publish|send|trigger)$`)
  73. webConfigPath = "/config.js"
  74. accountPath = "/account"
  75. matrixPushPath = "/_matrix/push/v1/notify"
  76. metricsPath = "/metrics"
  77. apiHealthPath = "/v1/health"
  78. apiStatsPath = "/v1/stats"
  79. apiTiersPath = "/v1/tiers"
  80. apiAccountPath = "/v1/account"
  81. apiAccountTokenPath = "/v1/account/token"
  82. apiAccountPasswordPath = "/v1/account/password"
  83. apiAccountSettingsPath = "/v1/account/settings"
  84. apiAccountSubscriptionPath = "/v1/account/subscription"
  85. apiAccountReservationPath = "/v1/account/reservation"
  86. apiAccountPhonePath = "/v1/account/phone"
  87. apiAccountBillingPortalPath = "/v1/account/billing/portal"
  88. apiAccountBillingWebhookPath = "/v1/account/billing/webhook"
  89. apiAccountBillingSubscriptionPath = "/v1/account/billing/subscription"
  90. apiAccountBillingSubscriptionCheckoutSuccessTemplate = "/v1/account/billing/subscription/success/{CHECKOUT_SESSION_ID}"
  91. apiAccountBillingSubscriptionCheckoutSuccessRegex = regexp.MustCompile(`/v1/account/billing/subscription/success/(.+)$`)
  92. apiAccountReservationSingleRegex = regexp.MustCompile(`/v1/account/reservation/([-_A-Za-z0-9]{1,64})$`)
  93. staticRegex = regexp.MustCompile(`^/static/.+`)
  94. docsRegex = regexp.MustCompile(`^/docs(|/.*)$`)
  95. fileRegex = regexp.MustCompile(`^/file/([-_A-Za-z0-9]{1,64})(?:\.[A-Za-z0-9]{1,16})?$`)
  96. urlRegex = regexp.MustCompile(`^https?://`)
  97. phoneNumberRegex = regexp.MustCompile(`^\+\d{1,100}$`)
  98. //go:embed site
  99. webFs embed.FS
  100. webFsCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: webFs}
  101. webSiteDir = "/site"
  102. webAppIndex = "/app.html" // React app
  103. //go:embed docs
  104. docsStaticFs embed.FS
  105. docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
  106. )
  107. const (
  108. firebaseControlTopic = "~control" // See Android if changed
  109. firebasePollTopic = "~poll" // See iOS if changed
  110. emptyMessageBody = "triggered" // Used if message body is empty
  111. newMessageBody = "New message" // Used in poll requests as generic message
  112. defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
  113. encodingBase64 = "base64" // Used mainly for binary UnifiedPush messages
  114. jsonBodyBytesLimit = 16384 // Max number of bytes for a JSON request body
  115. unifiedPushTopicPrefix = "up" // Temporarily, we rate limit all "up*" topics based on the subscriber
  116. unifiedPushTopicLength = 14 // Length of UnifiedPush topics, including the "up" part
  117. messagesHistoryMax = 10 // Number of message count values to keep in memory
  118. )
  119. // WebSocket constants
  120. const (
  121. wsWriteWait = 2 * time.Second
  122. wsBufferSize = 1024
  123. wsReadLimit = 64 // We only ever receive PINGs
  124. wsPongWait = 15 * time.Second
  125. )
  126. // New instantiates a new Server. It creates the cache and adds a Firebase
  127. // subscriber (if configured).
  128. func New(conf *Config) (*Server, error) {
  129. var mailer mailer
  130. if conf.SMTPSenderAddr != "" {
  131. mailer = &smtpSender{config: conf}
  132. }
  133. var stripe stripeAPI
  134. if conf.StripeSecretKey != "" {
  135. stripe = newStripeAPI()
  136. }
  137. messageCache, err := createMessageCache(conf)
  138. if err != nil {
  139. return nil, err
  140. }
  141. topics, err := messageCache.Topics()
  142. if err != nil {
  143. return nil, err
  144. }
  145. messages, err := messageCache.Stats()
  146. if err != nil {
  147. return nil, err
  148. }
  149. var fileCache *fileCache
  150. if conf.AttachmentCacheDir != "" {
  151. fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit)
  152. if err != nil {
  153. return nil, err
  154. }
  155. }
  156. var userManager *user.Manager
  157. if conf.AuthFile != "" {
  158. userManager, err = user.NewManager(conf.AuthFile, conf.AuthStartupQueries, conf.AuthDefault, conf.AuthBcryptCost, conf.AuthStatsQueueWriterInterval)
  159. if err != nil {
  160. return nil, err
  161. }
  162. }
  163. var firebaseClient *firebaseClient
  164. if conf.FirebaseKeyFile != "" {
  165. sender, err := newFirebaseSender(conf.FirebaseKeyFile)
  166. if err != nil {
  167. return nil, err
  168. }
  169. // This awkward logic is required because Go is weird about nil types and interfaces.
  170. // See issue #641, and https://go.dev/play/p/uur1flrv1t3 for an example
  171. var auther user.Auther
  172. if userManager != nil {
  173. auther = userManager
  174. }
  175. firebaseClient = newFirebaseClient(sender, auther)
  176. }
  177. s := &Server{
  178. config: conf,
  179. messageCache: messageCache,
  180. fileCache: fileCache,
  181. firebaseClient: firebaseClient,
  182. smtpSender: mailer,
  183. topics: topics,
  184. userManager: userManager,
  185. messages: messages,
  186. messagesHistory: []int64{messages},
  187. visitors: make(map[string]*visitor),
  188. stripe: stripe,
  189. }
  190. s.priceCache = util.NewLookupCache(s.fetchStripePrices, conf.StripePriceCacheDuration)
  191. return s, nil
  192. }
  193. func createMessageCache(conf *Config) (*messageCache, error) {
  194. if conf.CacheDuration == 0 {
  195. return newNopCache()
  196. } else if conf.CacheFile != "" {
  197. return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheDuration, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
  198. }
  199. return newMemCache()
  200. }
  201. // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
  202. // a manager go routine to print stats and prune messages.
  203. func (s *Server) Run() error {
  204. var listenStr string
  205. if s.config.ListenHTTP != "" {
  206. listenStr += fmt.Sprintf(" %s[http]", s.config.ListenHTTP)
  207. }
  208. if s.config.ListenHTTPS != "" {
  209. listenStr += fmt.Sprintf(" %s[https]", s.config.ListenHTTPS)
  210. }
  211. if s.config.ListenUnix != "" {
  212. listenStr += fmt.Sprintf(" %s[unix]", s.config.ListenUnix)
  213. }
  214. if s.config.SMTPServerListen != "" {
  215. listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
  216. }
  217. if s.config.MetricsListenHTTP != "" {
  218. listenStr += fmt.Sprintf(" %s[http/metrics]", s.config.MetricsListenHTTP)
  219. }
  220. if s.config.ProfileListenHTTP != "" {
  221. listenStr += fmt.Sprintf(" %s[http/profile]", s.config.ProfileListenHTTP)
  222. }
  223. log.Tag(tagStartup).Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String())
  224. if log.IsFile() {
  225. fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s\n", listenStr, s.config.Version)
  226. fmt.Fprintf(os.Stderr, "Logs are written to %s\n", log.File())
  227. }
  228. mux := http.NewServeMux()
  229. mux.HandleFunc("/", s.handle)
  230. errChan := make(chan error)
  231. s.mu.Lock()
  232. s.closeChan = make(chan bool)
  233. if s.config.ListenHTTP != "" {
  234. s.httpServer = &http.Server{Addr: s.config.ListenHTTP, Handler: mux}
  235. go func() {
  236. errChan <- s.httpServer.ListenAndServe()
  237. }()
  238. }
  239. if s.config.ListenHTTPS != "" {
  240. s.httpsServer = &http.Server{Addr: s.config.ListenHTTPS, Handler: mux}
  241. go func() {
  242. errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
  243. }()
  244. }
  245. if s.config.ListenUnix != "" {
  246. go func() {
  247. var err error
  248. s.mu.Lock()
  249. os.Remove(s.config.ListenUnix)
  250. s.unixListener, err = net.Listen("unix", s.config.ListenUnix)
  251. if err != nil {
  252. s.mu.Unlock()
  253. errChan <- err
  254. return
  255. }
  256. defer s.unixListener.Close()
  257. if s.config.ListenUnixMode > 0 {
  258. if err := os.Chmod(s.config.ListenUnix, s.config.ListenUnixMode); err != nil {
  259. s.mu.Unlock()
  260. errChan <- err
  261. return
  262. }
  263. }
  264. s.mu.Unlock()
  265. httpServer := &http.Server{Handler: mux}
  266. errChan <- httpServer.Serve(s.unixListener)
  267. }()
  268. }
  269. if s.config.MetricsListenHTTP != "" {
  270. initMetrics()
  271. s.httpMetricsServer = &http.Server{Addr: s.config.MetricsListenHTTP, Handler: promhttp.Handler()}
  272. go func() {
  273. errChan <- s.httpMetricsServer.ListenAndServe()
  274. }()
  275. } else if s.config.EnableMetrics {
  276. initMetrics()
  277. s.metricsHandler = promhttp.Handler()
  278. }
  279. if s.config.ProfileListenHTTP != "" {
  280. profileMux := http.NewServeMux()
  281. profileMux.HandleFunc("/debug/pprof/", pprof.Index)
  282. profileMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
  283. profileMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
  284. profileMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
  285. profileMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
  286. s.httpProfileServer = &http.Server{Addr: s.config.ProfileListenHTTP, Handler: profileMux}
  287. go func() {
  288. errChan <- s.httpProfileServer.ListenAndServe()
  289. }()
  290. }
  291. if s.config.SMTPServerListen != "" {
  292. go func() {
  293. errChan <- s.runSMTPServer()
  294. }()
  295. }
  296. s.mu.Unlock()
  297. go s.runManager()
  298. go s.runStatsResetter()
  299. go s.runDelayedSender()
  300. go s.runFirebaseKeepaliver()
  301. return <-errChan
  302. }
  303. // Stop stops HTTP (+HTTPS) server and all managers
  304. func (s *Server) Stop() {
  305. s.mu.Lock()
  306. defer s.mu.Unlock()
  307. if s.httpServer != nil {
  308. s.httpServer.Close()
  309. }
  310. if s.httpsServer != nil {
  311. s.httpsServer.Close()
  312. }
  313. if s.unixListener != nil {
  314. s.unixListener.Close()
  315. }
  316. if s.smtpServer != nil {
  317. s.smtpServer.Close()
  318. }
  319. s.closeDatabases()
  320. close(s.closeChan)
  321. }
  322. func (s *Server) closeDatabases() {
  323. if s.userManager != nil {
  324. s.userManager.Close()
  325. }
  326. s.messageCache.Close()
  327. }
  328. // handle is the main entry point for all HTTP requests
  329. func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
  330. v, err := s.maybeAuthenticate(r) // Note: Always returns v, even when error is returned
  331. if err != nil {
  332. s.handleError(w, r, v, err)
  333. return
  334. }
  335. ev := logvr(v, r)
  336. if ev.IsTrace() {
  337. ev.Field("http_request", renderHTTPRequest(r)).Trace("HTTP request started")
  338. } else if logvr(v, r).IsDebug() {
  339. ev.Debug("HTTP request started")
  340. }
  341. logvr(v, r).
  342. Timing(func() {
  343. if err := s.handleInternal(w, r, v); err != nil {
  344. s.handleError(w, r, v, err)
  345. return
  346. }
  347. if metricHTTPRequests != nil {
  348. metricHTTPRequests.WithLabelValues("200", "20000", r.Method).Inc()
  349. }
  350. }).
  351. Debug("HTTP request finished")
  352. }
  353. func (s *Server) handleError(w http.ResponseWriter, r *http.Request, v *visitor, err error) {
  354. httpErr, ok := err.(*errHTTP)
  355. if !ok {
  356. httpErr = errHTTPInternalError
  357. }
  358. if metricHTTPRequests != nil {
  359. metricHTTPRequests.WithLabelValues(fmt.Sprintf("%d", httpErr.HTTPCode), fmt.Sprintf("%d", httpErr.Code), r.Method).Inc()
  360. }
  361. isRateLimiting := util.Contains(rateLimitingErrorCodes, httpErr.HTTPCode)
  362. isNormalError := strings.Contains(err.Error(), "i/o timeout") || util.Contains(normalErrorCodes, httpErr.HTTPCode)
  363. ev := logvr(v, r).Err(err)
  364. if websocket.IsWebSocketUpgrade(r) {
  365. ev.Tag(tagWebsocket).Fields(websocketErrorContext(err))
  366. if isNormalError {
  367. ev.Debug("WebSocket error (this error is okay, it happens a lot): %s", err.Error())
  368. } else {
  369. ev.Info("WebSocket error: %s", err.Error())
  370. }
  371. return // Do not attempt to write to upgraded connection
  372. }
  373. if isNormalError {
  374. ev.Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  375. } else {
  376. ev.Info("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  377. }
  378. if isRateLimiting && s.config.StripeSecretKey != "" {
  379. u := v.User()
  380. if u == nil || u.Tier == nil {
  381. httpErr = httpErr.Wrap("increase your limits with a paid plan, see %s", s.config.BaseURL)
  382. }
  383. }
  384. w.Header().Set("Content-Type", "application/json")
  385. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  386. w.WriteHeader(httpErr.HTTPCode)
  387. io.WriteString(w, httpErr.JSON()+"\n")
  388. }
  389. func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visitor) error {
  390. if r.Method == http.MethodGet && r.URL.Path == "/" && s.config.WebRoot == "/" {
  391. return s.ensureWebEnabled(s.handleRoot)(w, r, v)
  392. } else if r.Method == http.MethodHead && r.URL.Path == "/" {
  393. return s.ensureWebEnabled(s.handleEmpty)(w, r, v)
  394. } else if r.Method == http.MethodGet && r.URL.Path == apiHealthPath {
  395. return s.handleHealth(w, r, v)
  396. } else if r.Method == http.MethodGet && r.URL.Path == webConfigPath {
  397. return s.ensureWebEnabled(s.handleWebConfig)(w, r, v)
  398. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPath {
  399. return s.ensureUserManager(s.handleAccountCreate)(w, r, v)
  400. } else if r.Method == http.MethodGet && r.URL.Path == apiAccountPath {
  401. return s.handleAccountGet(w, r, v) // Allowed by anonymous
  402. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPath {
  403. return s.ensureUser(s.withAccountSync(s.handleAccountDelete))(w, r, v)
  404. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPasswordPath {
  405. return s.ensureUser(s.handleAccountPasswordChange)(w, r, v)
  406. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountTokenPath {
  407. return s.ensureUser(s.withAccountSync(s.handleAccountTokenCreate))(w, r, v)
  408. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountTokenPath {
  409. return s.ensureUser(s.withAccountSync(s.handleAccountTokenUpdate))(w, r, v)
  410. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountTokenPath {
  411. return s.ensureUser(s.withAccountSync(s.handleAccountTokenDelete))(w, r, v)
  412. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSettingsPath {
  413. return s.ensureUser(s.withAccountSync(s.handleAccountSettingsChange))(w, r, v)
  414. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountSubscriptionPath {
  415. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionAdd))(w, r, v)
  416. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSubscriptionPath {
  417. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionChange))(w, r, v)
  418. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountSubscriptionPath {
  419. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionDelete))(w, r, v)
  420. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountReservationPath {
  421. return s.ensureUser(s.withAccountSync(s.handleAccountReservationAdd))(w, r, v)
  422. } else if r.Method == http.MethodDelete && apiAccountReservationSingleRegex.MatchString(r.URL.Path) {
  423. return s.ensureUser(s.withAccountSync(s.handleAccountReservationDelete))(w, r, v)
  424. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingSubscriptionPath {
  425. return s.ensurePaymentsEnabled(s.ensureUser(s.handleAccountBillingSubscriptionCreate))(w, r, v) // Account sync via incoming Stripe webhook
  426. } else if r.Method == http.MethodGet && apiAccountBillingSubscriptionCheckoutSuccessRegex.MatchString(r.URL.Path) {
  427. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingSubscriptionCreateSuccess))(w, r, v) // No user context!
  428. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountBillingSubscriptionPath {
  429. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionUpdate))(w, r, v) // Account sync via incoming Stripe webhook
  430. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountBillingSubscriptionPath {
  431. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionDelete))(w, r, v) // Account sync via incoming Stripe webhook
  432. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingPortalPath {
  433. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingPortalSessionCreate))(w, r, v)
  434. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingWebhookPath {
  435. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingWebhook))(w, r, v) // This request comes from Stripe!
  436. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhonePath {
  437. return s.ensureUser(s.withAccountSync(s.handleAccountPhoneNumberAdd))(w, r, v)
  438. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPhonePath {
  439. return s.ensureUser(s.withAccountSync(s.handleAccountPhoneNumberVerify))(w, r, v)
  440. } else if r.Method == http.MethodGet && r.URL.Path == apiStatsPath {
  441. return s.handleStats(w, r, v)
  442. } else if r.Method == http.MethodGet && r.URL.Path == apiTiersPath {
  443. return s.ensurePaymentsEnabled(s.handleBillingTiersGet)(w, r, v)
  444. } else if r.Method == http.MethodGet && r.URL.Path == matrixPushPath {
  445. return s.handleMatrixDiscovery(w)
  446. } else if r.Method == http.MethodGet && r.URL.Path == metricsPath && s.metricsHandler != nil {
  447. return s.handleMetrics(w, r, v)
  448. } else if r.Method == http.MethodGet && staticRegex.MatchString(r.URL.Path) {
  449. return s.ensureWebEnabled(s.handleStatic)(w, r, v)
  450. } else if r.Method == http.MethodGet && docsRegex.MatchString(r.URL.Path) {
  451. return s.ensureWebEnabled(s.handleDocs)(w, r, v)
  452. } else if (r.Method == http.MethodGet || r.Method == http.MethodHead) && fileRegex.MatchString(r.URL.Path) && s.config.AttachmentCacheDir != "" {
  453. return s.limitRequests(s.handleFile)(w, r, v)
  454. } else if r.Method == http.MethodOptions {
  455. return s.limitRequests(s.handleOptions)(w, r, v) // Should work even if the web app is not enabled, see #598
  456. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
  457. return s.transformBodyJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish)))(w, r, v)
  458. } else if r.Method == http.MethodPost && r.URL.Path == matrixPushPath {
  459. return s.transformMatrixJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublishMatrix)))(w, r, v)
  460. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicPathRegex.MatchString(r.URL.Path) {
  461. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  462. } else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
  463. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  464. } else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) {
  465. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeJSON))(w, r, v)
  466. } else if r.Method == http.MethodGet && ssePathRegex.MatchString(r.URL.Path) {
  467. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeSSE))(w, r, v)
  468. } else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
  469. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeRaw))(w, r, v)
  470. } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) {
  471. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeWS))(w, r, v)
  472. } else if r.Method == http.MethodGet && authPathRegex.MatchString(r.URL.Path) {
  473. return s.limitRequests(s.authorizeTopicRead(s.handleTopicAuth))(w, r, v)
  474. } else if r.Method == http.MethodGet && (topicPathRegex.MatchString(r.URL.Path) || externalTopicPathRegex.MatchString(r.URL.Path)) {
  475. return s.ensureWebEnabled(s.handleTopic)(w, r, v)
  476. }
  477. return errHTTPNotFound
  478. }
  479. func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request, v *visitor) error {
  480. r.URL.Path = webAppIndex
  481. return s.handleStatic(w, r, v)
  482. }
  483. func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request, v *visitor) error {
  484. unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too!
  485. if unifiedpush {
  486. w.Header().Set("Content-Type", "application/json")
  487. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  488. _, err := io.WriteString(w, `{"unifiedpush":{"version":1}}`+"\n")
  489. return err
  490. }
  491. r.URL.Path = webAppIndex
  492. return s.handleStatic(w, r, v)
  493. }
  494. func (s *Server) handleEmpty(_ http.ResponseWriter, _ *http.Request, _ *visitor) error {
  495. return nil
  496. }
  497. func (s *Server) handleTopicAuth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  498. return s.writeJSON(w, newSuccessResponse())
  499. }
  500. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  501. response := &apiHealthResponse{
  502. Healthy: true,
  503. }
  504. return s.writeJSON(w, response)
  505. }
  506. func (s *Server) handleWebConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  507. response := &apiConfigResponse{
  508. BaseURL: "", // Will translate to window.location.origin
  509. AppRoot: s.config.WebRoot,
  510. EnableLogin: s.config.EnableLogin,
  511. EnableSignup: s.config.EnableSignup,
  512. EnablePayments: s.config.StripeSecretKey != "",
  513. EnableSMS: s.config.TwilioAccount != "",
  514. EnableCalls: s.config.TwilioAccount != "",
  515. EnableReservations: s.config.EnableReservations,
  516. BillingContact: s.config.BillingContact,
  517. DisallowedTopics: s.config.DisallowedTopics,
  518. }
  519. b, err := json.MarshalIndent(response, "", " ")
  520. if err != nil {
  521. return err
  522. }
  523. w.Header().Set("Content-Type", "text/javascript")
  524. _, err = io.WriteString(w, fmt.Sprintf("// Generated server configuration\nvar config = %s;\n", string(b)))
  525. return err
  526. }
  527. // handleMetrics returns Prometheus metrics. This endpoint is only called if enable-metrics is set,
  528. // and listen-metrics-http is not set.
  529. func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  530. s.metricsHandler.ServeHTTP(w, r)
  531. return nil
  532. }
  533. // handleStatic returns all static resources (excluding the docs), including the web app
  534. func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  535. r.URL.Path = webSiteDir + r.URL.Path
  536. util.Gzip(http.FileServer(http.FS(webFsCached))).ServeHTTP(w, r)
  537. return nil
  538. }
  539. // handleDocs returns static resources related to the docs
  540. func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  541. util.Gzip(http.FileServer(http.FS(docsStaticCached))).ServeHTTP(w, r)
  542. return nil
  543. }
  544. // handleStats returns the publicly available server stats
  545. func (s *Server) handleStats(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  546. s.mu.RLock()
  547. messages, n, rate := s.messages, len(s.messagesHistory), float64(0)
  548. if n > 1 {
  549. rate = float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds())
  550. }
  551. s.mu.RUnlock()
  552. response := &apiStatsResponse{
  553. Messages: messages,
  554. MessagesRate: rate,
  555. }
  556. return s.writeJSON(w, response)
  557. }
  558. // handleFile processes the download of attachment files. The method handles GET and HEAD requests against a file.
  559. // Before streaming the file to a client, it locates uploader (m.Sender or m.User) in the message cache, so it
  560. // can associate the download bandwidth with the uploader.
  561. func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error {
  562. if s.config.AttachmentCacheDir == "" {
  563. return errHTTPInternalError
  564. }
  565. matches := fileRegex.FindStringSubmatch(r.URL.Path)
  566. if len(matches) != 2 {
  567. return errHTTPInternalErrorInvalidPath
  568. }
  569. messageID := matches[1]
  570. file := filepath.Join(s.config.AttachmentCacheDir, messageID)
  571. stat, err := os.Stat(file)
  572. if err != nil {
  573. return errHTTPNotFound.Fields(log.Context{
  574. "message_id": messageID,
  575. "error_context": "filesystem",
  576. })
  577. }
  578. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  579. w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
  580. if r.Method == http.MethodHead {
  581. return nil
  582. }
  583. // Find message in database, and associate bandwidth to the uploader user
  584. // This is an easy way to
  585. // - avoid abuse (e.g. 1 uploader, 1k downloaders)
  586. // - and also uses the higher bandwidth limits of a paying user
  587. m, err := s.messageCache.Message(messageID)
  588. if err == errMessageNotFound {
  589. if s.config.CacheBatchTimeout > 0 {
  590. // Strange edge case: If we immediately after upload request the file (the web app does this for images),
  591. // and messages are persisted asynchronously, retry fetching from the database
  592. m, err = util.Retry(func() (*message, error) {
  593. return s.messageCache.Message(messageID)
  594. }, s.config.CacheBatchTimeout, 100*time.Millisecond, 300*time.Millisecond, 600*time.Millisecond)
  595. }
  596. if err != nil {
  597. return errHTTPNotFound.Fields(log.Context{
  598. "message_id": messageID,
  599. "error_context": "message_cache",
  600. })
  601. }
  602. } else if err != nil {
  603. return err
  604. }
  605. bandwidthVisitor := v
  606. if s.userManager != nil && m.User != "" {
  607. u, err := s.userManager.UserByID(m.User)
  608. if err != nil {
  609. return err
  610. }
  611. bandwidthVisitor = s.visitor(v.IP(), u)
  612. } else if m.Sender.IsValid() {
  613. bandwidthVisitor = s.visitor(m.Sender, nil)
  614. }
  615. if !bandwidthVisitor.BandwidthAllowed(stat.Size()) {
  616. return errHTTPTooManyRequestsLimitAttachmentBandwidth.With(m)
  617. }
  618. // Actually send file
  619. f, err := os.Open(file)
  620. if err != nil {
  621. return err
  622. }
  623. defer f.Close()
  624. _, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f)
  625. return err
  626. }
  627. func (s *Server) handleMatrixDiscovery(w http.ResponseWriter) error {
  628. if s.config.BaseURL == "" {
  629. return errHTTPInternalErrorMissingBaseURL
  630. }
  631. return writeMatrixDiscoveryResponse(w)
  632. }
  633. func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, error) {
  634. start := time.Now()
  635. t, err := fromContext[*topic](r, contextTopic)
  636. if err != nil {
  637. return nil, err
  638. }
  639. vrate, err := fromContext[*visitor](r, contextRateVisitor)
  640. if err != nil {
  641. return nil, err
  642. }
  643. body, err := util.Peek(r.Body, s.config.MessageLimit)
  644. if err != nil {
  645. return nil, err
  646. }
  647. m := newDefaultMessage(t.ID, "")
  648. cache, firebase, email, sms, call, unifiedpush, e := s.parsePublishParams(r, m)
  649. if e != nil {
  650. return nil, e.With(t)
  651. }
  652. if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
  653. // UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting (see
  654. // Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
  655. // the subscription as invalid if any 400-499 code (except 429/408) is returned.
  656. // See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
  657. return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
  658. } else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() {
  659. return nil, errHTTPTooManyRequestsLimitMessages.With(t)
  660. } else if email != "" && !vrate.EmailAllowed() {
  661. return nil, errHTTPTooManyRequestsLimitEmails.With(t)
  662. } else if sms != "" && !vrate.SMSAllowed() {
  663. return nil, errHTTPTooManyRequestsLimitSMS.With(t)
  664. } else if call != "" && !vrate.CallAllowed() {
  665. return nil, errHTTPTooManyRequestsLimitCalls.With(t)
  666. }
  667. if m.PollID != "" {
  668. m = newPollRequestMessage(t.ID, m.PollID)
  669. }
  670. m.Sender = v.IP()
  671. m.User = v.MaybeUserID()
  672. if cache {
  673. m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
  674. }
  675. if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil {
  676. return nil, err
  677. }
  678. if m.Message == "" {
  679. m.Message = emptyMessageBody
  680. }
  681. delayed := m.Time > time.Now().Unix()
  682. ev := logvrm(v, r, m).
  683. Tag(tagPublish).
  684. With(t).
  685. Fields(log.Context{
  686. "message_delayed": delayed,
  687. "message_firebase": firebase,
  688. "message_unifiedpush": unifiedpush,
  689. "message_email": email,
  690. })
  691. if ev.IsTrace() {
  692. ev.Field("message_body", util.MaybeMarshalJSON(m)).Trace("Received message")
  693. } else if ev.IsDebug() {
  694. ev.Debug("Received message")
  695. }
  696. if !delayed {
  697. if err := t.Publish(v, m); err != nil {
  698. return nil, err
  699. }
  700. if s.firebaseClient != nil && firebase {
  701. go s.sendToFirebase(v, m)
  702. }
  703. if s.smtpSender != nil && email != "" {
  704. go s.sendEmail(v, m, email)
  705. }
  706. if s.config.TwilioAccount != "" && sms != "" {
  707. go s.sendSMS(v, r, m, sms)
  708. }
  709. if s.config.TwilioAccount != "" && call != "" {
  710. go s.callPhone(v, r, m, call)
  711. }
  712. if s.config.UpstreamBaseURL != "" {
  713. go s.forwardPollRequest(v, m)
  714. }
  715. } else {
  716. logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
  717. }
  718. if cache {
  719. logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
  720. if err := s.messageCache.AddMessage(m); err != nil {
  721. return nil, err
  722. }
  723. }
  724. u := v.User()
  725. if s.userManager != nil && u != nil && u.Tier != nil {
  726. go s.userManager.EnqueueUserStats(u.ID, v.Stats())
  727. }
  728. s.mu.Lock()
  729. s.messages++
  730. s.mu.Unlock()
  731. if unifiedpush {
  732. minc(metricUnifiedPushPublishedSuccess)
  733. }
  734. mset(metricMessagePublishDurationMillis, time.Since(start).Milliseconds())
  735. return m, nil
  736. }
  737. func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
  738. m, err := s.handlePublishInternal(r, v)
  739. if err != nil {
  740. minc(metricMessagesPublishedFailure)
  741. return err
  742. }
  743. minc(metricMessagesPublishedSuccess)
  744. return s.writeJSON(w, m)
  745. }
  746. func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *visitor) error {
  747. _, err := s.handlePublishInternal(r, v)
  748. if err != nil {
  749. minc(metricMessagesPublishedFailure)
  750. minc(metricMatrixPublishedFailure)
  751. if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
  752. topic, err := fromContext[*topic](r, contextTopic)
  753. if err != nil {
  754. return err
  755. }
  756. pushKey, err := fromContext[string](r, contextMatrixPushKey)
  757. if err != nil {
  758. return err
  759. }
  760. if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter {
  761. return writeMatrixResponse(w, pushKey)
  762. }
  763. }
  764. return err
  765. }
  766. minc(metricMessagesPublishedSuccess)
  767. minc(metricMatrixPublishedSuccess)
  768. return writeMatrixSuccess(w)
  769. }
  770. func (s *Server) sendToFirebase(v *visitor, m *message) {
  771. logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase")
  772. if err := s.firebaseClient.Send(v, m); err != nil {
  773. minc(metricFirebasePublishedFailure)
  774. if err == errFirebaseTemporarilyBanned {
  775. logvm(v, m).Tag(tagFirebase).Err(err).Debug("Unable to publish to Firebase: %v", err.Error())
  776. } else {
  777. logvm(v, m).Tag(tagFirebase).Err(err).Warn("Unable to publish to Firebase: %v", err.Error())
  778. }
  779. return
  780. }
  781. minc(metricFirebasePublishedSuccess)
  782. }
  783. func (s *Server) sendEmail(v *visitor, m *message, email string) {
  784. logvm(v, m).Tag(tagEmail).Field("email", email).Debug("Sending email to %s", email)
  785. if err := s.smtpSender.Send(v, m, email); err != nil {
  786. logvm(v, m).Tag(tagEmail).Field("email", email).Err(err).Warn("Unable to send email to %s: %v", email, err.Error())
  787. minc(metricEmailsPublishedFailure)
  788. return
  789. }
  790. minc(metricEmailsPublishedSuccess)
  791. }
  792. func (s *Server) forwardPollRequest(v *visitor, m *message) {
  793. topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
  794. topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
  795. forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
  796. logvm(v, m).Debug("Publishing poll request to %s", forwardURL)
  797. req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
  798. if err != nil {
  799. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  800. return
  801. }
  802. req.Header.Set("X-Poll-ID", m.ID)
  803. var httpClient = &http.Client{
  804. Timeout: time.Second * 10,
  805. }
  806. response, err := httpClient.Do(req)
  807. if err != nil {
  808. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  809. return
  810. } else if response.StatusCode != http.StatusOK {
  811. logvm(v, m).Err(err).Warn("Unable to publish poll request, unexpected HTTP status: %d", response.StatusCode)
  812. return
  813. }
  814. }
  815. func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, firebase bool, email, sms, call string, unifiedpush bool, err *errHTTP) {
  816. cache = readBoolParam(r, true, "x-cache", "cache")
  817. firebase = readBoolParam(r, true, "x-firebase", "firebase")
  818. m.Title = maybeDecodeHeader(readParam(r, "x-title", "title", "t"))
  819. m.Click = readParam(r, "x-click", "click")
  820. icon := readParam(r, "x-icon", "icon")
  821. filename := readParam(r, "x-filename", "filename", "file", "f")
  822. attach := readParam(r, "x-attach", "attach", "a")
  823. if attach != "" || filename != "" {
  824. m.Attachment = &attachment{}
  825. }
  826. if filename != "" {
  827. m.Attachment.Name = filename
  828. }
  829. if attach != "" {
  830. if !urlRegex.MatchString(attach) {
  831. return false, false, "", "", "", false, errHTTPBadRequestAttachmentURLInvalid
  832. }
  833. m.Attachment.URL = attach
  834. if m.Attachment.Name == "" {
  835. u, err := url.Parse(m.Attachment.URL)
  836. if err == nil {
  837. m.Attachment.Name = path.Base(u.Path)
  838. if m.Attachment.Name == "." || m.Attachment.Name == "/" {
  839. m.Attachment.Name = ""
  840. }
  841. }
  842. }
  843. if m.Attachment.Name == "" {
  844. m.Attachment.Name = "attachment"
  845. }
  846. }
  847. if icon != "" {
  848. if !urlRegex.MatchString(icon) {
  849. return false, false, "", "", "", false, errHTTPBadRequestIconURLInvalid
  850. }
  851. m.Icon = icon
  852. }
  853. email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
  854. if s.smtpSender == nil && email != "" {
  855. return false, false, "", "", "", false, errHTTPBadRequestEmailDisabled
  856. }
  857. sms = readParam(r, "x-sms", "sms")
  858. if sms != "" && s.config.TwilioAccount == "" {
  859. return false, false, "", "", "", false, errHTTPBadRequestTwilioDisabled
  860. } else if sms != "" && !phoneNumberRegex.MatchString(sms) {
  861. return false, false, "", "", "", false, errHTTPBadRequestPhoneNumberInvalid
  862. }
  863. call = readParam(r, "x-call", "call")
  864. if call != "" && s.config.TwilioAccount == "" {
  865. return false, false, "", "", "", false, errHTTPBadRequestTwilioDisabled
  866. } else if call != "" && !phoneNumberRegex.MatchString(call) {
  867. return false, false, "", "", "", false, errHTTPBadRequestPhoneNumberInvalid
  868. }
  869. messageStr := strings.ReplaceAll(readParam(r, "x-message", "message", "m"), "\\n", "\n")
  870. if messageStr != "" {
  871. m.Message = maybeDecodeHeader(messageStr)
  872. }
  873. var e error
  874. m.Priority, e = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
  875. if e != nil {
  876. return false, false, "", "", "", false, errHTTPBadRequestPriorityInvalid
  877. }
  878. m.Tags = readCommaSeparatedParam(r, "x-tags", "tags", "tag", "ta")
  879. for i, t := range m.Tags {
  880. m.Tags[i] = maybeDecodeHeader(t)
  881. }
  882. delayStr := readParam(r, "x-delay", "delay", "x-at", "at", "x-in", "in")
  883. if delayStr != "" {
  884. if !cache {
  885. return false, false, "", "", "", false, errHTTPBadRequestDelayNoCache
  886. }
  887. if email != "" {
  888. return false, false, "", "", "", false, errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
  889. }
  890. delay, err := util.ParseFutureTime(delayStr, time.Now())
  891. if err != nil {
  892. return false, false, "", "", "", false, errHTTPBadRequestDelayCannotParse
  893. } else if delay.Unix() < time.Now().Add(s.config.MinDelay).Unix() {
  894. return false, false, "", "", "", false, errHTTPBadRequestDelayTooSmall
  895. } else if delay.Unix() > time.Now().Add(s.config.MaxDelay).Unix() {
  896. return false, false, "", "", "", false, errHTTPBadRequestDelayTooLarge
  897. }
  898. m.Time = delay.Unix()
  899. }
  900. actionsStr := readParam(r, "x-actions", "actions", "action")
  901. if actionsStr != "" {
  902. m.Actions, e = parseActions(actionsStr)
  903. if e != nil {
  904. return false, false, "", "", "", false, errHTTPBadRequestActionsInvalid.Wrap(e.Error())
  905. }
  906. }
  907. unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
  908. if unifiedpush {
  909. firebase = false
  910. unifiedpush = true
  911. }
  912. m.PollID = readParam(r, "x-poll-id", "poll-id")
  913. if m.PollID != "" {
  914. unifiedpush = false
  915. cache = false
  916. email = ""
  917. }
  918. return cache, firebase, email, sms, call, unifiedpush, nil
  919. }
  920. // handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
  921. //
  922. // 1. curl -X POST -H "Poll: 1234" ntfy.sh/...
  923. // If a message is flagged as poll request, the body does not matter and is discarded
  924. // 2. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
  925. // If body is binary, encode as base64, if not do not encode
  926. // 3. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
  927. // Body must be a message, because we attached an external URL
  928. // 4. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
  929. // Body must be attachment, because we passed a filename
  930. // 5. curl -T file.txt ntfy.sh/mytopic
  931. // If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
  932. // 6. curl -T file.txt ntfy.sh/mytopic
  933. // If file.txt is > message limit, treat it as an attachment
  934. func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser, unifiedpush bool) error {
  935. if m.Event == pollRequestEvent { // Case 1
  936. return s.handleBodyDiscard(body)
  937. } else if unifiedpush {
  938. return s.handleBodyAsMessageAutoDetect(m, body) // Case 2
  939. } else if m.Attachment != nil && m.Attachment.URL != "" {
  940. return s.handleBodyAsTextMessage(m, body) // Case 3
  941. } else if m.Attachment != nil && m.Attachment.Name != "" {
  942. return s.handleBodyAsAttachment(r, v, m, body) // Case 4
  943. } else if !body.LimitReached && utf8.Valid(body.PeekedBytes) {
  944. return s.handleBodyAsTextMessage(m, body) // Case 5
  945. }
  946. return s.handleBodyAsAttachment(r, v, m, body) // Case 6
  947. }
  948. func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
  949. _, err := io.Copy(io.Discard, body)
  950. _ = body.Close()
  951. return err
  952. }
  953. func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedReadCloser) error {
  954. if utf8.Valid(body.PeekedBytes) {
  955. m.Message = string(body.PeekedBytes) // Do not trim
  956. } else {
  957. m.Message = base64.StdEncoding.EncodeToString(body.PeekedBytes)
  958. m.Encoding = encodingBase64
  959. }
  960. return nil
  961. }
  962. func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser) error {
  963. if !utf8.Valid(body.PeekedBytes) {
  964. return errHTTPBadRequestMessageNotUTF8.With(m)
  965. }
  966. if len(body.PeekedBytes) > 0 { // Empty body should not override message (publish via GET!)
  967. m.Message = strings.TrimSpace(string(body.PeekedBytes)) // Truncates the message to the peek limit if required
  968. }
  969. if m.Attachment != nil && m.Attachment.Name != "" && m.Message == "" {
  970. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  971. }
  972. return nil
  973. }
  974. func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser) error {
  975. if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
  976. return errHTTPBadRequestAttachmentsDisallowed.With(m)
  977. }
  978. vinfo, err := v.Info()
  979. if err != nil {
  980. return err
  981. }
  982. attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
  983. if m.Time > attachmentExpiry {
  984. return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
  985. }
  986. contentLengthStr := r.Header.Get("Content-Length")
  987. if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
  988. contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
  989. if err == nil && (contentLength > vinfo.Stats.AttachmentTotalSizeRemaining || contentLength > vinfo.Limits.AttachmentFileSizeLimit) {
  990. return errHTTPEntityTooLargeAttachment.With(m).Fields(log.Context{
  991. "message_content_length": contentLength,
  992. "attachment_total_size_remaining": vinfo.Stats.AttachmentTotalSizeRemaining,
  993. "attachment_file_size_limit": vinfo.Limits.AttachmentFileSizeLimit,
  994. })
  995. }
  996. }
  997. if m.Attachment == nil {
  998. m.Attachment = &attachment{}
  999. }
  1000. var ext string
  1001. m.Attachment.Expires = attachmentExpiry
  1002. m.Attachment.Type, ext = util.DetectContentType(body.PeekedBytes, m.Attachment.Name)
  1003. m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
  1004. if m.Attachment.Name == "" {
  1005. m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
  1006. }
  1007. if m.Message == "" {
  1008. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1009. }
  1010. limiters := []util.Limiter{
  1011. v.BandwidthLimiter(),
  1012. util.NewFixedLimiter(vinfo.Limits.AttachmentFileSizeLimit),
  1013. util.NewFixedLimiter(vinfo.Stats.AttachmentTotalSizeRemaining),
  1014. }
  1015. m.Attachment.Size, err = s.fileCache.Write(m.ID, body, limiters...)
  1016. if err == util.ErrLimitReached {
  1017. return errHTTPEntityTooLargeAttachment.With(m)
  1018. } else if err != nil {
  1019. return err
  1020. }
  1021. return nil
  1022. }
  1023. func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1024. encoder := func(msg *message) (string, error) {
  1025. var buf bytes.Buffer
  1026. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  1027. return "", err
  1028. }
  1029. return buf.String(), nil
  1030. }
  1031. return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder)
  1032. }
  1033. func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1034. encoder := func(msg *message) (string, error) {
  1035. var buf bytes.Buffer
  1036. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  1037. return "", err
  1038. }
  1039. if msg.Event != messageEvent {
  1040. return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
  1041. }
  1042. return fmt.Sprintf("data: %s\n", buf.String()), nil
  1043. }
  1044. return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder)
  1045. }
  1046. func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1047. encoder := func(msg *message) (string, error) {
  1048. if msg.Event == messageEvent { // only handle default events
  1049. return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
  1050. }
  1051. return "\n", nil // "keepalive" and "open" events just send an empty line
  1052. }
  1053. return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder)
  1054. }
  1055. func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
  1056. logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection opened")
  1057. defer logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection closed")
  1058. if !v.SubscriptionAllowed() {
  1059. return errHTTPTooManyRequestsLimitSubscriptions
  1060. }
  1061. defer v.RemoveSubscription()
  1062. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1063. if err != nil {
  1064. return err
  1065. }
  1066. poll, since, scheduled, filters, rateTopics, err := parseSubscribeParams(r)
  1067. if err != nil {
  1068. return err
  1069. }
  1070. var wlock sync.Mutex
  1071. defer func() {
  1072. // Hack: This is the fix for a horrible data race that I have not been able to figure out in quite some time.
  1073. // It appears to be happening when the Go HTTP code reads from the socket when closing the request (i.e. AFTER
  1074. // this function returns), and causes a data race with the ResponseWriter. Locking wlock here silences the
  1075. // data race detector. See https://github.com/binwiederhier/ntfy/issues/338#issuecomment-1163425889.
  1076. wlock.TryLock()
  1077. }()
  1078. sub := func(v *visitor, msg *message) error {
  1079. if !filters.Pass(msg) {
  1080. return nil
  1081. }
  1082. m, err := encoder(msg)
  1083. if err != nil {
  1084. return err
  1085. }
  1086. wlock.Lock()
  1087. defer wlock.Unlock()
  1088. if _, err := w.Write([]byte(m)); err != nil {
  1089. return err
  1090. }
  1091. if fl, ok := w.(http.Flusher); ok {
  1092. fl.Flush()
  1093. }
  1094. return nil
  1095. }
  1096. if err := s.maybeSetRateVisitors(r, v, topics, rateTopics); err != nil {
  1097. return err
  1098. }
  1099. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1100. w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
  1101. if poll {
  1102. for _, t := range topics {
  1103. t.Keepalive()
  1104. }
  1105. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1106. }
  1107. ctx, cancel := context.WithCancel(context.Background())
  1108. defer cancel()
  1109. subscriberIDs := make([]int, 0)
  1110. for _, t := range topics {
  1111. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1112. }
  1113. defer func() {
  1114. for i, subscriberID := range subscriberIDs {
  1115. topics[i].Unsubscribe(subscriberID) // Order!
  1116. }
  1117. }()
  1118. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1119. return err
  1120. }
  1121. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1122. return err
  1123. }
  1124. for {
  1125. select {
  1126. case <-ctx.Done():
  1127. return nil
  1128. case <-r.Context().Done():
  1129. return nil
  1130. case <-time.After(s.config.KeepaliveInterval):
  1131. ev := logvr(v, r).Tag(tagSubscribe)
  1132. if len(topics) == 1 {
  1133. ev.With(topics[0]).Trace("Sending keepalive message to %s", topics[0].ID)
  1134. } else {
  1135. ev.Trace("Sending keepalive message to %d topics", len(topics))
  1136. }
  1137. v.Keepalive()
  1138. for _, t := range topics {
  1139. t.Keepalive()
  1140. }
  1141. if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
  1142. return err
  1143. }
  1144. }
  1145. }
  1146. }
  1147. func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1148. if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
  1149. return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
  1150. }
  1151. if !v.SubscriptionAllowed() {
  1152. return errHTTPTooManyRequestsLimitSubscriptions
  1153. }
  1154. defer v.RemoveSubscription()
  1155. logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection opened")
  1156. defer logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection closed")
  1157. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1158. if err != nil {
  1159. return err
  1160. }
  1161. poll, since, scheduled, filters, rateTopics, err := parseSubscribeParams(r)
  1162. if err != nil {
  1163. return err
  1164. }
  1165. upgrader := &websocket.Upgrader{
  1166. ReadBufferSize: wsBufferSize,
  1167. WriteBufferSize: wsBufferSize,
  1168. CheckOrigin: func(r *http.Request) bool {
  1169. return true // We're open for business!
  1170. },
  1171. }
  1172. conn, err := upgrader.Upgrade(w, r, nil)
  1173. if err != nil {
  1174. return err
  1175. }
  1176. defer conn.Close()
  1177. // Subscription connections can be canceled externally, see topic.CancelSubscribers
  1178. cancelCtx, cancel := context.WithCancel(context.Background())
  1179. defer cancel()
  1180. // Use errgroup to run WebSocket reader and writer in Go routines
  1181. var wlock sync.Mutex
  1182. g, gctx := errgroup.WithContext(cancelCtx)
  1183. g.Go(func() error {
  1184. pongWait := s.config.KeepaliveInterval + wsPongWait
  1185. conn.SetReadLimit(wsReadLimit)
  1186. if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
  1187. return err
  1188. }
  1189. conn.SetPongHandler(func(appData string) error {
  1190. logvr(v, r).Tag(tagWebsocket).Trace("Received WebSocket pong")
  1191. return conn.SetReadDeadline(time.Now().Add(pongWait))
  1192. })
  1193. for {
  1194. _, _, err := conn.NextReader()
  1195. if err != nil {
  1196. return err
  1197. }
  1198. select {
  1199. case <-gctx.Done():
  1200. return nil
  1201. default:
  1202. }
  1203. }
  1204. })
  1205. g.Go(func() error {
  1206. ping := func() error {
  1207. wlock.Lock()
  1208. defer wlock.Unlock()
  1209. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1210. return err
  1211. }
  1212. logvr(v, r).Tag(tagWebsocket).Trace("Sending WebSocket ping")
  1213. return conn.WriteMessage(websocket.PingMessage, nil)
  1214. }
  1215. for {
  1216. select {
  1217. case <-gctx.Done():
  1218. return nil
  1219. case <-cancelCtx.Done():
  1220. logvr(v, r).Tag(tagWebsocket).Trace("Cancel received, closing subscriber connection")
  1221. conn.Close()
  1222. return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
  1223. case <-time.After(s.config.KeepaliveInterval):
  1224. v.Keepalive()
  1225. for _, t := range topics {
  1226. t.Keepalive()
  1227. }
  1228. if err := ping(); err != nil {
  1229. return err
  1230. }
  1231. }
  1232. }
  1233. })
  1234. sub := func(v *visitor, msg *message) error {
  1235. if !filters.Pass(msg) {
  1236. return nil
  1237. }
  1238. wlock.Lock()
  1239. defer wlock.Unlock()
  1240. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1241. return err
  1242. }
  1243. return conn.WriteJSON(msg)
  1244. }
  1245. if err := s.maybeSetRateVisitors(r, v, topics, rateTopics); err != nil {
  1246. return err
  1247. }
  1248. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1249. if poll {
  1250. for _, t := range topics {
  1251. t.Keepalive()
  1252. }
  1253. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1254. }
  1255. subscriberIDs := make([]int, 0)
  1256. for _, t := range topics {
  1257. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1258. }
  1259. defer func() {
  1260. for i, subscriberID := range subscriberIDs {
  1261. topics[i].Unsubscribe(subscriberID) // Order!
  1262. }
  1263. }()
  1264. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1265. return err
  1266. }
  1267. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1268. return err
  1269. }
  1270. err = g.Wait()
  1271. if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) {
  1272. logvr(v, r).Tag(tagWebsocket).Err(err).Fields(websocketErrorContext(err)).Trace("WebSocket connection closed")
  1273. return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
  1274. }
  1275. return err
  1276. }
  1277. func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, rateTopics []string, err error) {
  1278. poll = readBoolParam(r, false, "x-poll", "poll", "po")
  1279. scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
  1280. since, err = parseSince(r, poll)
  1281. if err != nil {
  1282. return
  1283. }
  1284. filters, err = parseQueryFilters(r)
  1285. if err != nil {
  1286. return
  1287. }
  1288. rateTopics = readCommaSeparatedParam(r, "x-rate-topics", "rate-topics")
  1289. return
  1290. }
  1291. // maybeSetRateVisitors sets the rate visitor on a topic (v.SetRateVisitor), indicating that all messages published
  1292. // to that topic will be rate limited against the rate visitor instead of the publishing visitor.
  1293. //
  1294. // Setting the rate visitor is ony allowed if the `visitor-subscriber-rate-limiting` setting is enabled, AND
  1295. // - auth-file is not set (everything is open by default)
  1296. // - or the topic is reserved, and v.user is the owner
  1297. // - or the topic is not reserved, and v.user has write access
  1298. //
  1299. // Note: This TEMPORARILY also registers all topics starting with "up" (= UnifiedPush). This is to ease the transition
  1300. // until the Android app will send the "Rate-Topics" header.
  1301. func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic, rateTopics []string) error {
  1302. // Bail out if not enabled
  1303. if !s.config.VisitorSubscriberRateLimiting {
  1304. return nil
  1305. }
  1306. // Make a list of topics that we'll actually set the RateVisitor on
  1307. eligibleRateTopics := make([]*topic, 0)
  1308. for _, t := range topics {
  1309. if (strings.HasPrefix(t.ID, unifiedPushTopicPrefix) && len(t.ID) == unifiedPushTopicLength) || util.Contains(rateTopics, t.ID) {
  1310. eligibleRateTopics = append(eligibleRateTopics, t)
  1311. }
  1312. }
  1313. if len(eligibleRateTopics) == 0 {
  1314. return nil
  1315. }
  1316. // If access controls are turned off, v has access to everything, and we can set the rate visitor
  1317. if s.userManager == nil {
  1318. return s.setRateVisitors(r, v, eligibleRateTopics)
  1319. }
  1320. // If access controls are enabled, only set rate visitor if
  1321. // - topic is reserved, and v.user is the owner
  1322. // - topic is not reserved, and v.user has write access
  1323. writableRateTopics := make([]*topic, 0)
  1324. for _, t := range topics {
  1325. ownerUserID, err := s.userManager.ReservationOwner(t.ID)
  1326. if err != nil {
  1327. return err
  1328. }
  1329. if ownerUserID == "" {
  1330. if err := s.userManager.Authorize(v.User(), t.ID, user.PermissionWrite); err == nil {
  1331. writableRateTopics = append(writableRateTopics, t)
  1332. }
  1333. } else if ownerUserID == v.MaybeUserID() {
  1334. writableRateTopics = append(writableRateTopics, t)
  1335. }
  1336. }
  1337. return s.setRateVisitors(r, v, writableRateTopics)
  1338. }
  1339. func (s *Server) setRateVisitors(r *http.Request, v *visitor, rateTopics []*topic) error {
  1340. for _, t := range rateTopics {
  1341. logvr(v, r).
  1342. Tag(tagSubscribe).
  1343. With(t).
  1344. Debug("Setting visitor as rate visitor for topic %s", t.ID)
  1345. t.SetRateVisitor(v)
  1346. }
  1347. return nil
  1348. }
  1349. // sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
  1350. // marker, returning only messages that are newer than the marker.
  1351. func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled bool, v *visitor, sub subscriber) error {
  1352. if since.IsNone() {
  1353. return nil
  1354. }
  1355. messages := make([]*message, 0)
  1356. for _, t := range topics {
  1357. topicMessages, err := s.messageCache.Messages(t.ID, since, scheduled)
  1358. if err != nil {
  1359. return err
  1360. }
  1361. messages = append(messages, topicMessages...)
  1362. }
  1363. sort.Slice(messages, func(i, j int) bool {
  1364. return messages[i].Time < messages[j].Time
  1365. })
  1366. for _, m := range messages {
  1367. if err := sub(v, m); err != nil {
  1368. return err
  1369. }
  1370. }
  1371. return nil
  1372. }
  1373. // parseSince returns a timestamp identifying the time span from which cached messages should be received.
  1374. //
  1375. // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
  1376. // "all" for all messages.
  1377. func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
  1378. since := readParam(r, "x-since", "since", "si")
  1379. // Easy cases (empty, all, none)
  1380. if since == "" {
  1381. if poll {
  1382. return sinceAllMessages, nil
  1383. }
  1384. return sinceNoMessages, nil
  1385. } else if since == "all" {
  1386. return sinceAllMessages, nil
  1387. } else if since == "none" {
  1388. return sinceNoMessages, nil
  1389. }
  1390. // ID, timestamp, duration
  1391. if validMessageID(since) {
  1392. return newSinceID(since), nil
  1393. } else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
  1394. return newSinceTime(s), nil
  1395. } else if d, err := time.ParseDuration(since); err == nil {
  1396. return newSinceTime(time.Now().Add(-1 * d).Unix()), nil
  1397. }
  1398. return sinceNoMessages, errHTTPBadRequestSinceInvalid
  1399. }
  1400. func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  1401. w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE")
  1402. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1403. w.Header().Set("Access-Control-Allow-Headers", "*") // CORS, allow auth via JS // FIXME is this terrible?
  1404. return nil
  1405. }
  1406. func (s *Server) topicFromPath(path string) (*topic, error) {
  1407. parts := strings.Split(path, "/")
  1408. if len(parts) < 2 {
  1409. return nil, errHTTPBadRequestTopicInvalid
  1410. }
  1411. return s.topicFromID(parts[1])
  1412. }
  1413. func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
  1414. parts := strings.Split(path, "/")
  1415. if len(parts) < 2 {
  1416. return nil, "", errHTTPBadRequestTopicInvalid
  1417. }
  1418. topicIDs := util.SplitNoEmpty(parts[1], ",")
  1419. topics, err := s.topicsFromIDs(topicIDs...)
  1420. if err != nil {
  1421. return nil, "", errHTTPBadRequestTopicInvalid
  1422. }
  1423. return topics, parts[1], nil
  1424. }
  1425. func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
  1426. s.mu.Lock()
  1427. defer s.mu.Unlock()
  1428. topics := make([]*topic, 0)
  1429. for _, id := range ids {
  1430. if util.Contains(s.config.DisallowedTopics, id) {
  1431. return nil, errHTTPBadRequestTopicDisallowed
  1432. }
  1433. if _, ok := s.topics[id]; !ok {
  1434. if len(s.topics) >= s.config.TotalTopicLimit {
  1435. return nil, errHTTPTooManyRequestsLimitTotalTopics
  1436. }
  1437. s.topics[id] = newTopic(id)
  1438. }
  1439. topics = append(topics, s.topics[id])
  1440. }
  1441. return topics, nil
  1442. }
  1443. func (s *Server) topicFromID(id string) (*topic, error) {
  1444. topics, err := s.topicsFromIDs(id)
  1445. if err != nil {
  1446. return nil, err
  1447. }
  1448. return topics[0], nil
  1449. }
  1450. func (s *Server) runSMTPServer() error {
  1451. s.smtpServerBackend = newMailBackend(s.config, s.handle)
  1452. s.smtpServer = smtp.NewServer(s.smtpServerBackend)
  1453. s.smtpServer.Addr = s.config.SMTPServerListen
  1454. s.smtpServer.Domain = s.config.SMTPServerDomain
  1455. s.smtpServer.ReadTimeout = 10 * time.Second
  1456. s.smtpServer.WriteTimeout = 10 * time.Second
  1457. s.smtpServer.MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
  1458. s.smtpServer.MaxRecipients = 1
  1459. s.smtpServer.AllowInsecureAuth = true
  1460. return s.smtpServer.ListenAndServe()
  1461. }
  1462. func (s *Server) runManager() {
  1463. for {
  1464. select {
  1465. case <-time.After(s.config.ManagerInterval):
  1466. log.
  1467. Tag(tagManager).
  1468. Timing(s.execManager).
  1469. Debug("Manager finished")
  1470. case <-s.closeChan:
  1471. return
  1472. }
  1473. }
  1474. }
  1475. // runStatsResetter runs once a day (usually midnight UTC) to reset all the visitor's message and
  1476. // email counters. The stats are used to display the counters in the web app, as well as for rate limiting.
  1477. func (s *Server) runStatsResetter() {
  1478. for {
  1479. runAt := util.NextOccurrenceUTC(s.config.VisitorStatsResetTime, time.Now())
  1480. timer := time.NewTimer(time.Until(runAt))
  1481. log.Tag(tagResetter).Debug("Waiting until %v to reset visitor stats", runAt)
  1482. select {
  1483. case <-timer.C:
  1484. log.Tag(tagResetter).Debug("Running stats resetter")
  1485. s.resetStats()
  1486. case <-s.closeChan:
  1487. log.Tag(tagResetter).Debug("Stopping stats resetter")
  1488. timer.Stop()
  1489. return
  1490. }
  1491. }
  1492. }
  1493. func (s *Server) resetStats() {
  1494. log.Info("Resetting all visitor stats (daily task)")
  1495. s.mu.Lock()
  1496. defer s.mu.Unlock() // Includes the database query to avoid races with other processes
  1497. for _, v := range s.visitors {
  1498. v.ResetStats()
  1499. }
  1500. if s.userManager != nil {
  1501. if err := s.userManager.ResetStats(); err != nil {
  1502. log.Tag(tagResetter).Warn("Failed to write to database: %s", err.Error())
  1503. }
  1504. }
  1505. }
  1506. func (s *Server) runFirebaseKeepaliver() {
  1507. if s.firebaseClient == nil {
  1508. return
  1509. }
  1510. v := newVisitor(s.config, s.messageCache, s.userManager, netip.IPv4Unspecified(), nil) // Background process, not a real visitor, uses IP 0.0.0.0
  1511. for {
  1512. select {
  1513. case <-time.After(s.config.FirebaseKeepaliveInterval):
  1514. s.sendToFirebase(v, newKeepaliveMessage(firebaseControlTopic))
  1515. /*
  1516. FIXME: Disable iOS polling entirely for now due to thundering herd problem (see #677)
  1517. To solve this, we'd have to shard the iOS poll topics to spread out the polling evenly.
  1518. Given that it's not really necessary to poll, turning it off for now should not have any impact.
  1519. case <-time.After(s.config.FirebasePollInterval):
  1520. s.sendToFirebase(v, newKeepaliveMessage(firebasePollTopic))
  1521. */
  1522. case <-s.closeChan:
  1523. return
  1524. }
  1525. }
  1526. }
  1527. func (s *Server) runDelayedSender() {
  1528. for {
  1529. select {
  1530. case <-time.After(s.config.DelayedSenderInterval):
  1531. if err := s.sendDelayedMessages(); err != nil {
  1532. log.Tag(tagPublish).Err(err).Warn("Error sending delayed messages")
  1533. }
  1534. case <-s.closeChan:
  1535. return
  1536. }
  1537. }
  1538. }
  1539. func (s *Server) sendDelayedMessages() error {
  1540. messages, err := s.messageCache.MessagesDue()
  1541. if err != nil {
  1542. return err
  1543. }
  1544. for _, m := range messages {
  1545. var u *user.User
  1546. if s.userManager != nil && m.User != "" {
  1547. u, err = s.userManager.UserByID(m.User)
  1548. if err != nil {
  1549. log.With(m).Err(err).Warn("Error sending delayed message")
  1550. continue
  1551. }
  1552. }
  1553. v := s.visitor(m.Sender, u)
  1554. if err := s.sendDelayedMessage(v, m); err != nil {
  1555. logvm(v, m).Err(err).Warn("Error sending delayed message")
  1556. }
  1557. }
  1558. return nil
  1559. }
  1560. func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
  1561. logvm(v, m).Debug("Sending delayed message")
  1562. s.mu.RLock()
  1563. t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
  1564. s.mu.RUnlock()
  1565. if ok {
  1566. go func() {
  1567. // We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
  1568. if err := t.Publish(v, m); err != nil {
  1569. logvm(v, m).Err(err).Warn("Unable to publish message")
  1570. }
  1571. }()
  1572. }
  1573. if s.firebaseClient != nil { // Firebase subscribers may not show up in topics map
  1574. go s.sendToFirebase(v, m)
  1575. }
  1576. if s.config.UpstreamBaseURL != "" {
  1577. go s.forwardPollRequest(v, m)
  1578. }
  1579. if err := s.messageCache.MarkPublished(m); err != nil {
  1580. return err
  1581. }
  1582. return nil
  1583. }
  1584. // transformBodyJSON peeks the request body, reads the JSON, and converts it to headers
  1585. // before passing it on to the next handler. This is meant to be used in combination with handlePublish.
  1586. func (s *Server) transformBodyJSON(next handleFunc) handleFunc {
  1587. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1588. m, err := readJSONWithLimit[publishMessage](r.Body, s.config.MessageLimit*2, false) // 2x to account for JSON format overhead
  1589. if err != nil {
  1590. return err
  1591. }
  1592. if !topicRegex.MatchString(m.Topic) {
  1593. return errHTTPBadRequestTopicInvalid
  1594. }
  1595. if m.Message == "" {
  1596. m.Message = emptyMessageBody
  1597. }
  1598. r.URL.Path = "/" + m.Topic
  1599. r.Body = io.NopCloser(strings.NewReader(m.Message))
  1600. if m.Title != "" {
  1601. r.Header.Set("X-Title", m.Title)
  1602. }
  1603. if m.Priority != 0 {
  1604. r.Header.Set("X-Priority", fmt.Sprintf("%d", m.Priority))
  1605. }
  1606. if m.Tags != nil && len(m.Tags) > 0 {
  1607. r.Header.Set("X-Tags", strings.Join(m.Tags, ","))
  1608. }
  1609. if m.Attach != "" {
  1610. r.Header.Set("X-Attach", m.Attach)
  1611. }
  1612. if m.Filename != "" {
  1613. r.Header.Set("X-Filename", m.Filename)
  1614. }
  1615. if m.Click != "" {
  1616. r.Header.Set("X-Click", m.Click)
  1617. }
  1618. if m.Icon != "" {
  1619. r.Header.Set("X-Icon", m.Icon)
  1620. }
  1621. if len(m.Actions) > 0 {
  1622. actionsStr, err := json.Marshal(m.Actions)
  1623. if err != nil {
  1624. return errHTTPBadRequestMessageJSONInvalid
  1625. }
  1626. r.Header.Set("X-Actions", string(actionsStr))
  1627. }
  1628. if m.Email != "" {
  1629. r.Header.Set("X-Email", m.Email)
  1630. }
  1631. if m.Delay != "" {
  1632. r.Header.Set("X-Delay", m.Delay)
  1633. }
  1634. return next(w, r, v)
  1635. }
  1636. }
  1637. func (s *Server) transformMatrixJSON(next handleFunc) handleFunc {
  1638. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1639. newRequest, err := newRequestFromMatrixJSON(r, s.config.BaseURL, s.config.MessageLimit)
  1640. if err != nil {
  1641. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Invalid Matrix request")
  1642. if e, ok := err.(*errMatrixPushkeyRejected); ok {
  1643. return writeMatrixResponse(w, e.rejectedPushKey)
  1644. }
  1645. return err
  1646. }
  1647. if err := next(w, newRequest, v); err != nil {
  1648. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Error handling Matrix request")
  1649. return err
  1650. }
  1651. return nil
  1652. }
  1653. }
  1654. func (s *Server) authorizeTopicWrite(next handleFunc) handleFunc {
  1655. return s.autorizeTopic(next, user.PermissionWrite)
  1656. }
  1657. func (s *Server) authorizeTopicRead(next handleFunc) handleFunc {
  1658. return s.autorizeTopic(next, user.PermissionRead)
  1659. }
  1660. func (s *Server) autorizeTopic(next handleFunc, perm user.Permission) handleFunc {
  1661. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1662. if s.userManager == nil {
  1663. return next(w, r, v)
  1664. }
  1665. topics, _, err := s.topicsFromPath(r.URL.Path)
  1666. if err != nil {
  1667. return err
  1668. }
  1669. u := v.User()
  1670. for _, t := range topics {
  1671. if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
  1672. logvr(v, r).With(t).Err(err).Debug("Access to topic %s not authorized", t.ID)
  1673. return errHTTPForbidden.With(t)
  1674. }
  1675. }
  1676. return next(w, r, v)
  1677. }
  1678. }
  1679. // maybeAuthenticate reads the "Authorization" header and will try to authenticate the user
  1680. // if it is set.
  1681. //
  1682. // - If auth-file is not configured, immediately return an IP-based visitor
  1683. // - If the header is not set or not supported (anything non-Basic and non-Bearer),
  1684. // an IP-based visitor is returned
  1685. // - If the header is set, authenticate will be called to check the username/password (Basic auth),
  1686. // or the token (Bearer auth), and read the user from the database
  1687. //
  1688. // This function will ALWAYS return a visitor, even if an error occurs (e.g. unauthorized), so
  1689. // that subsequent logging calls still have a visitor context.
  1690. func (s *Server) maybeAuthenticate(r *http.Request) (*visitor, error) {
  1691. // Read "Authorization" header value, and exit out early if it's not set
  1692. ip := extractIPAddress(r, s.config.BehindProxy)
  1693. vip := s.visitor(ip, nil)
  1694. if s.userManager == nil {
  1695. return vip, nil
  1696. }
  1697. header, err := readAuthHeader(r)
  1698. if err != nil {
  1699. return vip, err
  1700. } else if !supportedAuthHeader(header) {
  1701. return vip, nil
  1702. }
  1703. // If we're trying to auth, check the rate limiter first
  1704. if !vip.AuthAllowed() {
  1705. return vip, errHTTPTooManyRequestsLimitAuthFailure // Always return visitor, even when error occurs!
  1706. }
  1707. u, err := s.authenticate(r, header)
  1708. if err != nil {
  1709. vip.AuthFailed()
  1710. logr(r).Err(err).Debug("Authentication failed")
  1711. return vip, errHTTPUnauthorized // Always return visitor, even when error occurs!
  1712. }
  1713. // Authentication with user was successful
  1714. return s.visitor(ip, u), nil
  1715. }
  1716. // authenticate a user based on basic auth username/password (Authorization: Basic ...), or token auth (Authorization: Bearer ...).
  1717. // The Authorization header can be passed as a header or the ?auth=... query param. The latter is required only to
  1718. // support the WebSocket JavaScript class, which does not support passing headers during the initial request. The auth
  1719. // query param is effectively doubly base64 encoded. Its format is base64(Basic base64(user:pass)).
  1720. func (s *Server) authenticate(r *http.Request, header string) (user *user.User, err error) {
  1721. if strings.HasPrefix(header, "Bearer") {
  1722. return s.authenticateBearerAuth(r, strings.TrimSpace(strings.TrimPrefix(header, "Bearer")))
  1723. }
  1724. return s.authenticateBasicAuth(r, header)
  1725. }
  1726. // readAuthHeader reads the raw value of the Authorization header, either from the actual HTTP header,
  1727. // or from the ?auth... query parameter
  1728. func readAuthHeader(r *http.Request) (string, error) {
  1729. value := strings.TrimSpace(r.Header.Get("Authorization"))
  1730. queryParam := readQueryParam(r, "authorization", "auth")
  1731. if queryParam != "" {
  1732. a, err := base64.RawURLEncoding.DecodeString(queryParam)
  1733. if err != nil {
  1734. return "", err
  1735. }
  1736. value = strings.TrimSpace(string(a))
  1737. }
  1738. return value, nil
  1739. }
  1740. // supportedAuthHeader returns true only if the Authorization header value starts
  1741. // with "Basic" or "Bearer". In particular, an empty value is not supported, and neither
  1742. // are things like "WebPush", or "vapid" (see #629).
  1743. func supportedAuthHeader(value string) bool {
  1744. value = strings.ToLower(value)
  1745. return strings.HasPrefix(value, "basic ") || strings.HasPrefix(value, "bearer ")
  1746. }
  1747. func (s *Server) authenticateBasicAuth(r *http.Request, value string) (user *user.User, err error) {
  1748. r.Header.Set("Authorization", value)
  1749. username, password, ok := r.BasicAuth()
  1750. if !ok {
  1751. return nil, errors.New("invalid basic auth")
  1752. } else if username == "" {
  1753. return s.authenticateBearerAuth(r, password) // Treat password as token
  1754. }
  1755. return s.userManager.Authenticate(username, password)
  1756. }
  1757. func (s *Server) authenticateBearerAuth(r *http.Request, token string) (*user.User, error) {
  1758. u, err := s.userManager.AuthenticateToken(token)
  1759. if err != nil {
  1760. return nil, err
  1761. }
  1762. ip := extractIPAddress(r, s.config.BehindProxy)
  1763. go s.userManager.EnqueueTokenUpdate(token, &user.TokenUpdate{
  1764. LastAccess: time.Now(),
  1765. LastOrigin: ip,
  1766. })
  1767. return u, nil
  1768. }
  1769. func (s *Server) visitor(ip netip.Addr, user *user.User) *visitor {
  1770. s.mu.Lock()
  1771. defer s.mu.Unlock()
  1772. id := visitorID(ip, user)
  1773. v, exists := s.visitors[id]
  1774. if !exists {
  1775. s.visitors[id] = newVisitor(s.config, s.messageCache, s.userManager, ip, user)
  1776. return s.visitors[id]
  1777. }
  1778. v.Keepalive()
  1779. v.SetUser(user) // Always update with the latest user, may be nil!
  1780. return v
  1781. }
  1782. func (s *Server) writeJSON(w http.ResponseWriter, v any) error {
  1783. w.Header().Set("Content-Type", "application/json")
  1784. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1785. if err := json.NewEncoder(w).Encode(v); err != nil {
  1786. return err
  1787. }
  1788. return nil
  1789. }
  1790. func (s *Server) updateAndWriteStats(messagesCount int64) {
  1791. s.mu.Lock()
  1792. s.messagesHistory = append(s.messagesHistory, messagesCount)
  1793. if len(s.messagesHistory) > messagesHistoryMax {
  1794. s.messagesHistory = s.messagesHistory[1:]
  1795. }
  1796. s.mu.Unlock()
  1797. go func() {
  1798. if err := s.messageCache.UpdateStats(messagesCount); err != nil {
  1799. log.Tag(tagManager).Err(err).Warn("Cannot write messages stats")
  1800. }
  1801. }()
  1802. }