server.go 83 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244
  1. package server
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "embed"
  7. "encoding/base64"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "net"
  13. "net/http"
  14. "net/http/pprof"
  15. "net/netip"
  16. "net/url"
  17. "os"
  18. "path"
  19. "path/filepath"
  20. "regexp"
  21. "sort"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "text/template"
  26. "time"
  27. "unicode/utf8"
  28. "github.com/emersion/go-smtp"
  29. "github.com/gorilla/websocket"
  30. "github.com/prometheus/client_golang/prometheus/promhttp"
  31. "golang.org/x/sync/errgroup"
  32. "gopkg.in/yaml.v2"
  33. "heckel.io/ntfy/v2/log"
  34. "heckel.io/ntfy/v2/payments"
  35. "heckel.io/ntfy/v2/user"
  36. "heckel.io/ntfy/v2/util"
  37. "heckel.io/ntfy/v2/util/sprig"
  38. )
  39. // Server is the main server, providing the UI and API for ntfy
  40. type Server struct {
  41. config *Config
  42. httpServer *http.Server
  43. httpsServer *http.Server
  44. httpMetricsServer *http.Server
  45. httpProfileServer *http.Server
  46. unixListener net.Listener
  47. smtpServer *smtp.Server
  48. smtpServerBackend *smtpBackend
  49. smtpSender mailer
  50. topics map[string]*topic
  51. visitors map[string]*visitor // ip:<ip> or user:<user>
  52. firebaseClient *firebaseClient
  53. messages int64 // Total number of messages (persisted if messageCache enabled)
  54. messagesHistory []int64 // Last n values of the messages counter, used to determine rate
  55. userManager *user.Manager // Might be nil!
  56. messageCache *messageCache // Database that stores the messages
  57. webPush *webPushStore // Database that stores web push subscriptions
  58. fileCache *fileCache // File system based cache that stores attachments
  59. stripe stripeAPI // Stripe API, can be replaced with a mock
  60. priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
  61. metricsHandler http.Handler // Handles /metrics if enable-metrics set, and listen-metrics-http not set
  62. closeChan chan bool
  63. mu sync.RWMutex
  64. }
  65. // handleFunc extends the normal http.HandlerFunc to be able to easily return errors
  66. type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
  67. var (
  68. // If changed, don't forget to update Android App and auth_sqlite.go
  69. topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
  70. topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
  71. externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
  72. jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
  73. ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
  74. rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
  75. wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
  76. authPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/auth$`)
  77. publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/(publish|send|trigger)$`)
  78. updatePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/[-_A-Za-z0-9]{1,64}$`)
  79. clearPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/[-_A-Za-z0-9]{1,64}/(read|clear)$`)
  80. sequenceIDRegex = topicRegex
  81. webConfigPath = "/config.js"
  82. webManifestPath = "/manifest.webmanifest"
  83. accountPath = "/account"
  84. matrixPushPath = "/_matrix/push/v1/notify"
  85. metricsPath = "/metrics"
  86. apiHealthPath = "/v1/health"
  87. apiConfigPath = "/v1/config"
  88. apiStatsPath = "/v1/stats"
  89. apiWebPushPath = "/v1/webpush"
  90. apiTiersPath = "/v1/tiers"
  91. apiUsersPath = "/v1/users"
  92. apiUsersAccessPath = "/v1/users/access"
  93. apiAccountPath = "/v1/account"
  94. apiAccountTokenPath = "/v1/account/token"
  95. apiAccountPasswordPath = "/v1/account/password"
  96. apiAccountSettingsPath = "/v1/account/settings"
  97. apiAccountSubscriptionPath = "/v1/account/subscription"
  98. apiAccountReservationPath = "/v1/account/reservation"
  99. apiAccountPhonePath = "/v1/account/phone"
  100. apiAccountPhoneVerifyPath = "/v1/account/phone/verify"
  101. apiAccountBillingPortalPath = "/v1/account/billing/portal"
  102. apiAccountBillingWebhookPath = "/v1/account/billing/webhook"
  103. apiAccountBillingSubscriptionPath = "/v1/account/billing/subscription"
  104. apiAccountBillingSubscriptionCheckoutSuccessTemplate = "/v1/account/billing/subscription/success/{CHECKOUT_SESSION_ID}"
  105. apiAccountBillingSubscriptionCheckoutSuccessRegex = regexp.MustCompile(`/v1/account/billing/subscription/success/(.+)$`)
  106. apiAccountReservationSingleRegex = regexp.MustCompile(`/v1/account/reservation/([-_A-Za-z0-9]{1,64})$`)
  107. staticRegex = regexp.MustCompile(`^/(static/.+|app.html|sw.js|sw.js.map)$`)
  108. docsRegex = regexp.MustCompile(`^/docs(|/.*)$`)
  109. fileRegex = regexp.MustCompile(`^/file/([-_A-Za-z0-9]{1,64})(?:\.[A-Za-z0-9]{1,16})?$`)
  110. urlRegex = regexp.MustCompile(`^https?://`)
  111. phoneNumberRegex = regexp.MustCompile(`^\+\d{1,100}$`)
  112. //go:embed site
  113. webFs embed.FS
  114. webFsCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: webFs}
  115. webSiteDir = "/site"
  116. webAppIndex = "/app.html" // React app
  117. //go:embed docs
  118. docsStaticFs embed.FS
  119. docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
  120. //go:embed templates
  121. templatesFs embed.FS // Contains template config files (e.g. grafana.yml, github.yml, ...)
  122. templatesDir = "templates"
  123. // templateDisallowedRegex tests a template for disallowed expressions. While not really dangerous, they
  124. // are not useful, and seem potentially troublesome.
  125. templateDisallowedRegex = regexp.MustCompile(`(?m)\{\{-?\s*(call|template|define)\b`)
  126. templateNameRegex = regexp.MustCompile(`^[-_A-Za-z0-9]+$`)
  127. )
  128. const (
  129. firebaseControlTopic = "~control" // See Android if changed
  130. firebasePollTopic = "~poll" // See iOS if changed (DISABLED for now)
  131. emptyMessageBody = "triggered" // Used when a message body is empty
  132. newMessageBody = "New message" // Used in poll requests as generic message
  133. defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
  134. encodingBase64 = "base64" // Used mainly for binary UnifiedPush messages
  135. jsonBodyBytesLimit = 131072 // Max number of bytes for a request bodys (unless MessageLimit is higher)
  136. unifiedPushTopicPrefix = "up" // Temporarily, we rate limit all "up*" topics based on the subscriber
  137. unifiedPushTopicLength = 14 // Length of UnifiedPush topics, including the "up" part
  138. messagesHistoryMax = 10 // Number of message count values to keep in memory
  139. templateMaxExecutionTime = 100 * time.Millisecond // Maximum time a template can take to execute, used to prevent DoS attacks
  140. templateMaxOutputBytes = 1024 * 1024 // Maximum number of bytes a template can output, used to prevent DoS attacks
  141. templateFileExtension = ".yml" // Template files must end with this extension
  142. )
  143. // WebSocket constants
  144. const (
  145. wsWriteWait = 2 * time.Second
  146. wsBufferSize = 1024
  147. wsReadLimit = 64 // We only ever receive PINGs
  148. wsPongWait = 15 * time.Second
  149. )
  150. // New instantiates a new Server. It creates the cache and adds a Firebase
  151. // subscriber (if configured).
  152. func New(conf *Config) (*Server, error) {
  153. var mailer mailer
  154. if conf.SMTPSenderAddr != "" {
  155. mailer = &smtpSender{config: conf}
  156. }
  157. var stripe stripeAPI
  158. if payments.Available && conf.StripeSecretKey != "" {
  159. stripe = newStripeAPI()
  160. }
  161. messageCache, err := createMessageCache(conf)
  162. if err != nil {
  163. return nil, err
  164. }
  165. var webPush *webPushStore
  166. if conf.WebPushPublicKey != "" {
  167. webPush, err = newWebPushStore(conf.WebPushFile, conf.WebPushStartupQueries)
  168. if err != nil {
  169. return nil, err
  170. }
  171. }
  172. topics, err := messageCache.Topics()
  173. if err != nil {
  174. return nil, err
  175. }
  176. messages, err := messageCache.Stats()
  177. if err != nil {
  178. return nil, err
  179. }
  180. var fileCache *fileCache
  181. if conf.AttachmentCacheDir != "" {
  182. fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit)
  183. if err != nil {
  184. return nil, err
  185. }
  186. }
  187. var userManager *user.Manager
  188. if conf.AuthFile != "" {
  189. authConfig := &user.Config{
  190. Filename: conf.AuthFile,
  191. StartupQueries: conf.AuthStartupQueries,
  192. DefaultAccess: conf.AuthDefault,
  193. ProvisionEnabled: true, // Enable provisioning of users and access
  194. Users: conf.AuthUsers,
  195. Access: conf.AuthAccess,
  196. Tokens: conf.AuthTokens,
  197. BcryptCost: conf.AuthBcryptCost,
  198. QueueWriterInterval: conf.AuthStatsQueueWriterInterval,
  199. }
  200. userManager, err = user.NewManager(authConfig)
  201. if err != nil {
  202. return nil, err
  203. }
  204. }
  205. var firebaseClient *firebaseClient
  206. if conf.FirebaseKeyFile != "" {
  207. sender, err := newFirebaseSender(conf.FirebaseKeyFile)
  208. if err != nil {
  209. return nil, err
  210. }
  211. // This awkward logic is required because Go is weird about nil types and interfaces.
  212. // See issue #641, and https://go.dev/play/p/uur1flrv1t3 for an example
  213. var auther user.Auther
  214. if userManager != nil {
  215. auther = userManager
  216. }
  217. firebaseClient = newFirebaseClient(sender, auther)
  218. }
  219. s := &Server{
  220. config: conf,
  221. messageCache: messageCache,
  222. webPush: webPush,
  223. fileCache: fileCache,
  224. firebaseClient: firebaseClient,
  225. smtpSender: mailer,
  226. topics: topics,
  227. userManager: userManager,
  228. messages: messages,
  229. messagesHistory: []int64{messages},
  230. visitors: make(map[string]*visitor),
  231. stripe: stripe,
  232. }
  233. s.priceCache = util.NewLookupCache(s.fetchStripePrices, conf.StripePriceCacheDuration)
  234. return s, nil
  235. }
  236. func createMessageCache(conf *Config) (*messageCache, error) {
  237. if conf.CacheDuration == 0 {
  238. return newNopCache()
  239. } else if conf.CacheFile != "" {
  240. return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheDuration, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
  241. }
  242. return newMemCache()
  243. }
  244. // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
  245. // a manager go routine to print stats and prune messages.
  246. func (s *Server) Run() error {
  247. var listenStr string
  248. if s.config.ListenHTTP != "" {
  249. listenStr += fmt.Sprintf(" %s[http]", s.config.ListenHTTP)
  250. }
  251. if s.config.ListenHTTPS != "" {
  252. listenStr += fmt.Sprintf(" %s[https]", s.config.ListenHTTPS)
  253. }
  254. if s.config.ListenUnix != "" {
  255. listenStr += fmt.Sprintf(" %s[unix]", s.config.ListenUnix)
  256. }
  257. if s.config.SMTPServerListen != "" {
  258. listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
  259. }
  260. if s.config.MetricsListenHTTP != "" {
  261. listenStr += fmt.Sprintf(" %s[http/metrics]", s.config.MetricsListenHTTP)
  262. }
  263. if s.config.ProfileListenHTTP != "" {
  264. listenStr += fmt.Sprintf(" %s[http/profile]", s.config.ProfileListenHTTP)
  265. }
  266. log.Tag(tagStartup).Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.BuildVersion, log.CurrentLevel().String())
  267. if log.IsFile() {
  268. fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s\n", listenStr, s.config.BuildVersion)
  269. fmt.Fprintf(os.Stderr, "Logs are written to %s\n", log.File())
  270. }
  271. mux := http.NewServeMux()
  272. mux.HandleFunc("/", s.handle)
  273. errChan := make(chan error)
  274. s.mu.Lock()
  275. s.closeChan = make(chan bool)
  276. if s.config.ListenHTTP != "" {
  277. s.httpServer = &http.Server{Addr: s.config.ListenHTTP, Handler: mux}
  278. go func() {
  279. errChan <- s.httpServer.ListenAndServe()
  280. }()
  281. }
  282. if s.config.ListenHTTPS != "" {
  283. s.httpsServer = &http.Server{Addr: s.config.ListenHTTPS, Handler: mux}
  284. go func() {
  285. errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
  286. }()
  287. }
  288. if s.config.ListenUnix != "" {
  289. go func() {
  290. var err error
  291. s.mu.Lock()
  292. os.Remove(s.config.ListenUnix)
  293. s.unixListener, err = net.Listen("unix", s.config.ListenUnix)
  294. if err != nil {
  295. s.mu.Unlock()
  296. errChan <- err
  297. return
  298. }
  299. defer s.unixListener.Close()
  300. if s.config.ListenUnixMode > 0 {
  301. if err := os.Chmod(s.config.ListenUnix, s.config.ListenUnixMode); err != nil {
  302. s.mu.Unlock()
  303. errChan <- err
  304. return
  305. }
  306. }
  307. s.mu.Unlock()
  308. httpServer := &http.Server{Handler: mux}
  309. errChan <- httpServer.Serve(s.unixListener)
  310. }()
  311. }
  312. if s.config.MetricsListenHTTP != "" {
  313. initMetrics()
  314. s.httpMetricsServer = &http.Server{Addr: s.config.MetricsListenHTTP, Handler: promhttp.Handler()}
  315. go func() {
  316. errChan <- s.httpMetricsServer.ListenAndServe()
  317. }()
  318. } else if s.config.EnableMetrics {
  319. initMetrics()
  320. s.metricsHandler = promhttp.Handler()
  321. }
  322. if s.config.ProfileListenHTTP != "" {
  323. profileMux := http.NewServeMux()
  324. profileMux.HandleFunc("/debug/pprof/", pprof.Index)
  325. profileMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
  326. profileMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
  327. profileMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
  328. profileMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
  329. s.httpProfileServer = &http.Server{Addr: s.config.ProfileListenHTTP, Handler: profileMux}
  330. go func() {
  331. errChan <- s.httpProfileServer.ListenAndServe()
  332. }()
  333. }
  334. if s.config.SMTPServerListen != "" {
  335. go func() {
  336. errChan <- s.runSMTPServer()
  337. }()
  338. }
  339. s.mu.Unlock()
  340. go s.runManager()
  341. go s.runStatsResetter()
  342. go s.runDelayedSender()
  343. go s.runFirebaseKeepaliver()
  344. return <-errChan
  345. }
  346. // Stop stops HTTP (+HTTPS) server and all managers
  347. func (s *Server) Stop() {
  348. s.mu.Lock()
  349. defer s.mu.Unlock()
  350. if s.httpServer != nil {
  351. s.httpServer.Close()
  352. }
  353. if s.httpsServer != nil {
  354. s.httpsServer.Close()
  355. }
  356. if s.unixListener != nil {
  357. s.unixListener.Close()
  358. }
  359. if s.smtpServer != nil {
  360. s.smtpServer.Close()
  361. }
  362. s.closeDatabases()
  363. close(s.closeChan)
  364. }
  365. func (s *Server) closeDatabases() {
  366. if s.userManager != nil {
  367. s.userManager.Close()
  368. }
  369. s.messageCache.Close()
  370. if s.webPush != nil {
  371. s.webPush.Close()
  372. }
  373. }
  374. // handle is the main entry point for all HTTP requests
  375. func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
  376. v, err := s.maybeAuthenticate(r) // Note: Always returns v, even when error is returned
  377. if err != nil {
  378. s.handleError(w, r, v, err)
  379. return
  380. }
  381. ev := logvr(v, r)
  382. if ev.IsTrace() {
  383. ev.Field("http_request", renderHTTPRequest(r)).Trace("HTTP request started")
  384. } else if logvr(v, r).IsDebug() {
  385. ev.Debug("HTTP request started")
  386. }
  387. logvr(v, r).
  388. Timing(func() {
  389. if err := s.handleInternal(w, r, v); err != nil {
  390. s.handleError(w, r, v, err)
  391. return
  392. }
  393. if metricHTTPRequests != nil {
  394. metricHTTPRequests.WithLabelValues("200", "20000", r.Method).Inc()
  395. }
  396. }).
  397. Debug("HTTP request finished")
  398. }
  399. func (s *Server) handleError(w http.ResponseWriter, r *http.Request, v *visitor, err error) {
  400. httpErr, ok := err.(*errHTTP)
  401. if !ok {
  402. httpErr = errHTTPInternalError
  403. }
  404. if metricHTTPRequests != nil {
  405. metricHTTPRequests.WithLabelValues(fmt.Sprintf("%d", httpErr.HTTPCode), fmt.Sprintf("%d", httpErr.Code), r.Method).Inc()
  406. }
  407. isRateLimiting := util.Contains(rateLimitingErrorCodes, httpErr.HTTPCode)
  408. isNormalError := strings.Contains(err.Error(), "i/o timeout") || util.Contains(normalErrorCodes, httpErr.HTTPCode)
  409. ev := logvr(v, r).Err(err)
  410. if websocket.IsWebSocketUpgrade(r) {
  411. ev.Tag(tagWebsocket).Fields(websocketErrorContext(err))
  412. if isNormalError {
  413. ev.Debug("WebSocket error (this error is okay, it happens a lot): %s", err.Error())
  414. } else {
  415. ev.Info("WebSocket error: %s", err.Error())
  416. }
  417. w.WriteHeader(httpErr.HTTPCode)
  418. return // Do not attempt to write any body to upgraded connection
  419. }
  420. if isNormalError {
  421. ev.Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  422. } else {
  423. ev.Info("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  424. }
  425. if isRateLimiting && s.config.StripeSecretKey != "" {
  426. u := v.User()
  427. if u == nil || u.Tier == nil {
  428. httpErr = httpErr.Wrap("increase your limits with a paid plan, see %s", s.config.BaseURL)
  429. }
  430. }
  431. w.Header().Set("Content-Type", "application/json")
  432. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  433. w.WriteHeader(httpErr.HTTPCode)
  434. io.WriteString(w, httpErr.JSON()+"\n")
  435. }
  436. func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visitor) error {
  437. if r.Method == http.MethodGet && r.URL.Path == "/" && s.config.WebRoot == "/" {
  438. return s.ensureWebEnabled(s.handleRoot)(w, r, v)
  439. } else if r.Method == http.MethodHead && r.URL.Path == "/" {
  440. return s.ensureWebEnabled(s.handleEmpty)(w, r, v)
  441. } else if r.Method == http.MethodGet && r.URL.Path == apiHealthPath {
  442. return s.handleHealth(w, r, v)
  443. } else if r.Method == http.MethodGet && r.URL.Path == apiConfigPath {
  444. return s.handleConfig(w, r, v)
  445. } else if r.Method == http.MethodGet && r.URL.Path == webConfigPath {
  446. return s.ensureWebEnabled(s.handleWebConfig)(w, r, v)
  447. } else if r.Method == http.MethodGet && r.URL.Path == webManifestPath {
  448. return s.ensureWebPushEnabled(s.handleWebManifest)(w, r, v)
  449. } else if r.Method == http.MethodGet && r.URL.Path == apiUsersPath {
  450. return s.ensureAdmin(s.handleUsersGet)(w, r, v)
  451. } else if r.Method == http.MethodPost && r.URL.Path == apiUsersPath {
  452. return s.ensureAdmin(s.handleUsersAdd)(w, r, v)
  453. } else if r.Method == http.MethodPut && r.URL.Path == apiUsersPath {
  454. return s.ensureAdmin(s.handleUsersUpdate)(w, r, v)
  455. } else if r.Method == http.MethodDelete && r.URL.Path == apiUsersPath {
  456. return s.ensureAdmin(s.handleUsersDelete)(w, r, v)
  457. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == apiUsersAccessPath {
  458. return s.ensureAdmin(s.handleAccessAllow)(w, r, v)
  459. } else if r.Method == http.MethodDelete && r.URL.Path == apiUsersAccessPath {
  460. return s.ensureAdmin(s.handleAccessReset)(w, r, v)
  461. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPath {
  462. return s.ensureUserManager(s.handleAccountCreate)(w, r, v)
  463. } else if r.Method == http.MethodGet && r.URL.Path == apiAccountPath {
  464. return s.handleAccountGet(w, r, v) // Allowed by anonymous
  465. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPath {
  466. return s.ensureUser(s.withAccountSync(s.handleAccountDelete))(w, r, v)
  467. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPasswordPath {
  468. return s.ensureUser(s.handleAccountPasswordChange)(w, r, v)
  469. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountTokenPath {
  470. return s.ensureUser(s.withAccountSync(s.handleAccountTokenCreate))(w, r, v)
  471. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountTokenPath {
  472. return s.ensureUser(s.withAccountSync(s.handleAccountTokenUpdate))(w, r, v)
  473. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountTokenPath {
  474. return s.ensureUser(s.withAccountSync(s.handleAccountTokenDelete))(w, r, v)
  475. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSettingsPath {
  476. return s.ensureUser(s.withAccountSync(s.handleAccountSettingsChange))(w, r, v)
  477. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountSubscriptionPath {
  478. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionAdd))(w, r, v)
  479. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSubscriptionPath {
  480. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionChange))(w, r, v)
  481. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountSubscriptionPath {
  482. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionDelete))(w, r, v)
  483. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountReservationPath {
  484. return s.ensureUser(s.withAccountSync(s.handleAccountReservationAdd))(w, r, v)
  485. } else if r.Method == http.MethodDelete && apiAccountReservationSingleRegex.MatchString(r.URL.Path) {
  486. return s.ensureUser(s.withAccountSync(s.handleAccountReservationDelete))(w, r, v)
  487. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingSubscriptionPath {
  488. return s.ensurePaymentsEnabled(s.ensureUser(s.handleAccountBillingSubscriptionCreate))(w, r, v) // Account sync via incoming Stripe webhook
  489. } else if r.Method == http.MethodGet && apiAccountBillingSubscriptionCheckoutSuccessRegex.MatchString(r.URL.Path) {
  490. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingSubscriptionCreateSuccess))(w, r, v) // No user context!
  491. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountBillingSubscriptionPath {
  492. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionUpdate))(w, r, v) // Account sync via incoming Stripe webhook
  493. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountBillingSubscriptionPath {
  494. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionDelete))(w, r, v) // Account sync via incoming Stripe webhook
  495. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingPortalPath {
  496. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingPortalSessionCreate))(w, r, v)
  497. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingWebhookPath {
  498. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingWebhook))(w, r, v) // This request comes from Stripe!
  499. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhoneVerifyPath {
  500. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberVerify)))(w, r, v)
  501. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhonePath {
  502. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberAdd)))(w, r, v)
  503. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPhonePath {
  504. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberDelete)))(w, r, v)
  505. } else if r.Method == http.MethodPost && apiWebPushPath == r.URL.Path {
  506. return s.ensureWebPushEnabled(s.limitRequests(s.handleWebPushUpdate))(w, r, v)
  507. } else if r.Method == http.MethodDelete && apiWebPushPath == r.URL.Path {
  508. return s.ensureWebPushEnabled(s.limitRequests(s.handleWebPushDelete))(w, r, v)
  509. } else if r.Method == http.MethodGet && r.URL.Path == apiStatsPath {
  510. return s.handleStats(w, r, v)
  511. } else if r.Method == http.MethodGet && r.URL.Path == apiTiersPath {
  512. return s.ensurePaymentsEnabled(s.handleBillingTiersGet)(w, r, v)
  513. } else if r.Method == http.MethodGet && r.URL.Path == matrixPushPath {
  514. return s.handleMatrixDiscovery(w)
  515. } else if r.Method == http.MethodGet && r.URL.Path == metricsPath && s.metricsHandler != nil {
  516. return s.handleMetrics(w, r, v)
  517. } else if r.Method == http.MethodGet && staticRegex.MatchString(r.URL.Path) {
  518. return s.ensureWebEnabled(s.handleStatic)(w, r, v)
  519. } else if r.Method == http.MethodGet && docsRegex.MatchString(r.URL.Path) {
  520. return s.ensureWebEnabled(s.handleDocs)(w, r, v)
  521. } else if (r.Method == http.MethodGet || r.Method == http.MethodHead) && fileRegex.MatchString(r.URL.Path) && s.config.AttachmentCacheDir != "" {
  522. return s.limitRequests(s.handleFile)(w, r, v)
  523. } else if r.Method == http.MethodOptions {
  524. return s.limitRequests(s.handleOptions)(w, r, v) // Should work even if the web app is not enabled, see #598
  525. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
  526. return s.transformBodyJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish)))(w, r, v)
  527. } else if r.Method == http.MethodPost && r.URL.Path == matrixPushPath {
  528. return s.transformMatrixJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublishMatrix)))(w, r, v)
  529. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && (topicPathRegex.MatchString(r.URL.Path) || updatePathRegex.MatchString(r.URL.Path)) {
  530. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  531. } else if r.Method == http.MethodDelete && updatePathRegex.MatchString(r.URL.Path) {
  532. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handleDelete))(w, r, v)
  533. } else if r.Method == http.MethodPut && clearPathRegex.MatchString(r.URL.Path) {
  534. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handleClear))(w, r, v)
  535. } else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
  536. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  537. } else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) {
  538. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeJSON))(w, r, v)
  539. } else if r.Method == http.MethodGet && ssePathRegex.MatchString(r.URL.Path) {
  540. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeSSE))(w, r, v)
  541. } else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
  542. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeRaw))(w, r, v)
  543. } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) {
  544. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeWS))(w, r, v)
  545. } else if r.Method == http.MethodGet && authPathRegex.MatchString(r.URL.Path) {
  546. return s.limitRequests(s.authorizeTopicRead(s.handleTopicAuth))(w, r, v)
  547. } else if r.Method == http.MethodGet && (topicPathRegex.MatchString(r.URL.Path) || externalTopicPathRegex.MatchString(r.URL.Path)) {
  548. return s.ensureWebEnabled(s.handleTopic)(w, r, v)
  549. }
  550. return errHTTPNotFound
  551. }
  552. func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request, v *visitor) error {
  553. r.URL.Path = webAppIndex
  554. return s.handleStatic(w, r, v)
  555. }
  556. func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request, v *visitor) error {
  557. unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too!
  558. if unifiedpush {
  559. w.Header().Set("Content-Type", "application/json")
  560. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  561. _, err := io.WriteString(w, `{"unifiedpush":{"version":1}}`+"\n")
  562. return err
  563. }
  564. r.URL.Path = webAppIndex
  565. return s.handleStatic(w, r, v)
  566. }
  567. func (s *Server) handleEmpty(_ http.ResponseWriter, _ *http.Request, _ *visitor) error {
  568. return nil
  569. }
  570. func (s *Server) handleTopicAuth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  571. return s.writeJSON(w, newSuccessResponse())
  572. }
  573. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  574. response := &apiHealthResponse{
  575. Healthy: true,
  576. }
  577. return s.writeJSON(w, response)
  578. }
  579. func (s *Server) handleConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  580. w.Header().Set("Cache-Control", "no-cache")
  581. return s.writeJSON(w, s.configResponse())
  582. }
  583. func (s *Server) handleWebConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  584. b, err := json.MarshalIndent(s.configResponse(), "", " ")
  585. if err != nil {
  586. return err
  587. }
  588. w.Header().Set("Content-Type", "text/javascript")
  589. w.Header().Set("Cache-Control", "no-cache")
  590. _, err = io.WriteString(w, fmt.Sprintf("// Generated server configuration\nvar config = %s;\n", string(b)))
  591. return err
  592. }
  593. func (s *Server) configResponse() *apiConfigResponse {
  594. return &apiConfigResponse{
  595. BaseURL: "", // Will translate to window.location.origin
  596. AppRoot: s.config.WebRoot,
  597. EnableLogin: s.config.EnableLogin,
  598. RequireLogin: s.config.RequireLogin,
  599. EnableSignup: s.config.EnableSignup,
  600. EnablePayments: s.config.StripeSecretKey != "",
  601. EnableCalls: s.config.TwilioAccount != "",
  602. EnableEmails: s.config.SMTPSenderFrom != "",
  603. EnableReservations: s.config.EnableReservations,
  604. EnableWebPush: s.config.WebPushPublicKey != "",
  605. BillingContact: s.config.BillingContact,
  606. WebPushPublicKey: s.config.WebPushPublicKey,
  607. DisallowedTopics: s.config.DisallowedTopics,
  608. ConfigHash: s.config.Hash(),
  609. }
  610. }
  611. // handleWebManifest serves the web app manifest for the progressive web app (PWA)
  612. func (s *Server) handleWebManifest(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  613. response := &webManifestResponse{
  614. Name: "ntfy web",
  615. Description: "ntfy lets you send push notifications via scripts from any computer or phone",
  616. ShortName: "ntfy",
  617. Scope: "/",
  618. StartURL: s.config.WebRoot,
  619. Display: "standalone",
  620. BackgroundColor: "#ffffff",
  621. ThemeColor: "#317f6f",
  622. Icons: []*webManifestIcon{
  623. {SRC: "/static/images/pwa-192x192.png", Sizes: "192x192", Type: "image/png"},
  624. {SRC: "/static/images/pwa-512x512.png", Sizes: "512x512", Type: "image/png"},
  625. },
  626. }
  627. return s.writeJSONWithContentType(w, response, "application/manifest+json")
  628. }
  629. // handleMetrics returns Prometheus metrics. This endpoint is only called if enable-metrics is set,
  630. // and listen-metrics-http is not set.
  631. func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  632. s.metricsHandler.ServeHTTP(w, r)
  633. return nil
  634. }
  635. // handleStatic returns all static resources (excluding the docs), including the web app
  636. func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  637. r.URL.Path = webSiteDir + r.URL.Path
  638. util.Gzip(http.FileServer(http.FS(webFsCached))).ServeHTTP(w, r)
  639. return nil
  640. }
  641. // handleDocs returns static resources related to the docs
  642. func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  643. util.Gzip(http.FileServer(http.FS(docsStaticCached))).ServeHTTP(w, r)
  644. return nil
  645. }
  646. // handleStats returns the publicly available server stats
  647. func (s *Server) handleStats(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  648. s.mu.RLock()
  649. messages, n, rate := s.messages, len(s.messagesHistory), float64(0)
  650. if n > 1 {
  651. rate = float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds())
  652. }
  653. s.mu.RUnlock()
  654. response := &apiStatsResponse{
  655. Messages: messages,
  656. MessagesRate: rate,
  657. }
  658. return s.writeJSON(w, response)
  659. }
  660. // handleFile processes the download of attachment files. The method handles GET and HEAD requests against a file.
  661. // Before streaming the file to a client, it locates uploader (m.Sender or m.User) in the message cache, so it
  662. // can associate the download bandwidth with the uploader.
  663. func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error {
  664. if s.config.AttachmentCacheDir == "" {
  665. return errHTTPInternalError
  666. }
  667. matches := fileRegex.FindStringSubmatch(r.URL.Path)
  668. if len(matches) != 2 {
  669. return errHTTPInternalErrorInvalidPath
  670. }
  671. messageID := matches[1]
  672. file := filepath.Join(s.config.AttachmentCacheDir, messageID)
  673. stat, err := os.Stat(file)
  674. if err != nil {
  675. return errHTTPNotFound.Fields(log.Context{
  676. "message_id": messageID,
  677. "error_context": "filesystem",
  678. })
  679. }
  680. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  681. w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
  682. if r.Method == http.MethodHead {
  683. return nil
  684. }
  685. // Find message in database, and associate bandwidth to the uploader user
  686. // This is an easy way to
  687. // - avoid abuse (e.g. 1 uploader, 1k downloaders)
  688. // - and also uses the higher bandwidth limits of a paying user
  689. m, err := s.messageCache.Message(messageID)
  690. if errors.Is(err, errMessageNotFound) {
  691. if s.config.CacheBatchTimeout > 0 {
  692. // Strange edge case: If we immediately after upload request the file (the web app does this for images),
  693. // and messages are persisted asynchronously, retry fetching from the database
  694. m, err = util.Retry(func() (*message, error) {
  695. return s.messageCache.Message(messageID)
  696. }, s.config.CacheBatchTimeout, 100*time.Millisecond, 300*time.Millisecond, 600*time.Millisecond)
  697. }
  698. if err != nil {
  699. return errHTTPNotFound.Fields(log.Context{
  700. "message_id": messageID,
  701. "error_context": "message_cache",
  702. })
  703. }
  704. } else if err != nil {
  705. return err
  706. }
  707. bandwidthVisitor := v
  708. if s.userManager != nil && m.User != "" {
  709. u, err := s.userManager.UserByID(m.User)
  710. if err != nil {
  711. return err
  712. }
  713. bandwidthVisitor = s.visitor(v.IP(), u)
  714. } else if m.Sender.IsValid() {
  715. bandwidthVisitor = s.visitor(m.Sender, nil)
  716. }
  717. if !bandwidthVisitor.BandwidthAllowed(stat.Size()) {
  718. return errHTTPTooManyRequestsLimitAttachmentBandwidth.With(m)
  719. }
  720. // Actually send file
  721. f, err := os.Open(file)
  722. if err != nil {
  723. return err
  724. }
  725. defer f.Close()
  726. if m.Attachment.Name != "" {
  727. w.Header().Set("Content-Disposition", "attachment; filename="+strconv.Quote(m.Attachment.Name))
  728. }
  729. _, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f)
  730. return err
  731. }
  732. func (s *Server) handleMatrixDiscovery(w http.ResponseWriter) error {
  733. if s.config.BaseURL == "" {
  734. return errHTTPInternalErrorMissingBaseURL
  735. }
  736. return writeMatrixDiscoveryResponse(w)
  737. }
  738. func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, error) {
  739. start := time.Now()
  740. t, err := fromContext[*topic](r, contextTopic)
  741. if err != nil {
  742. return nil, err
  743. }
  744. vrate, err := fromContext[*visitor](r, contextRateVisitor)
  745. if err != nil {
  746. return nil, err
  747. }
  748. body, err := util.Peek(r.Body, s.config.MessageSizeLimit)
  749. if err != nil {
  750. return nil, err
  751. }
  752. m := newDefaultMessage(t.ID, "")
  753. cache, firebase, email, call, template, unifiedpush, e := s.parsePublishParams(r, m)
  754. if e != nil {
  755. return nil, e.With(t)
  756. }
  757. if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
  758. // UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting.
  759. // The 5xx response is because some app servers (in particular Mastodon) will remove
  760. // the subscription as invalid if any 400-499 code (except 429/408) is returned.
  761. // See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
  762. return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
  763. } else if !util.ContainsIP(s.config.VisitorRequestExemptPrefixes, v.ip) && !vrate.MessageAllowed() {
  764. return nil, errHTTPTooManyRequestsLimitMessages.With(t)
  765. } else if email != "" && !vrate.EmailAllowed() {
  766. return nil, errHTTPTooManyRequestsLimitEmails.With(t)
  767. } else if call != "" {
  768. var httpErr *errHTTP
  769. call, httpErr = s.convertPhoneNumber(v.User(), call)
  770. if httpErr != nil {
  771. return nil, httpErr.With(t)
  772. } else if !vrate.CallAllowed() {
  773. return nil, errHTTPTooManyRequestsLimitCalls.With(t)
  774. }
  775. }
  776. if m.PollID != "" {
  777. m = newPollRequestMessage(t.ID, m.PollID)
  778. }
  779. m.Sender = v.IP()
  780. m.User = v.MaybeUserID()
  781. if cache {
  782. m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
  783. }
  784. if err := s.handlePublishBody(r, v, m, body, template, unifiedpush); err != nil {
  785. return nil, err
  786. }
  787. if m.Message == "" {
  788. m.Message = emptyMessageBody
  789. }
  790. delayed := m.Time > time.Now().Unix()
  791. ev := logvrm(v, r, m).
  792. Tag(tagPublish).
  793. With(t).
  794. Fields(log.Context{
  795. "message_delayed": delayed,
  796. "message_firebase": firebase,
  797. "message_unifiedpush": unifiedpush,
  798. "message_email": email,
  799. "message_call": call,
  800. })
  801. if ev.IsTrace() {
  802. ev.Field("message_body", util.MaybeMarshalJSON(m)).Trace("Received message")
  803. } else if ev.IsDebug() {
  804. ev.Debug("Received message")
  805. }
  806. if !delayed {
  807. if err := t.Publish(v, m); err != nil {
  808. return nil, err
  809. }
  810. if s.firebaseClient != nil && firebase {
  811. go s.sendToFirebase(v, m)
  812. }
  813. if s.smtpSender != nil && email != "" {
  814. go s.sendEmail(v, m, email)
  815. }
  816. if s.config.TwilioAccount != "" && call != "" {
  817. go s.callPhone(v, r, m, call)
  818. }
  819. if s.config.UpstreamBaseURL != "" && !unifiedpush { // UP messages are not sent to upstream
  820. go s.forwardPollRequest(v, m)
  821. }
  822. if s.config.WebPushPublicKey != "" {
  823. go s.publishToWebPushEndpoints(v, m)
  824. }
  825. } else {
  826. logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
  827. }
  828. if cache {
  829. // Delete any existing scheduled message with the same sequence ID
  830. if err := s.messageCache.DeleteScheduledBySequenceID(t.ID, m.SequenceID); err != nil {
  831. return nil, err
  832. }
  833. logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
  834. if err := s.messageCache.AddMessage(m); err != nil {
  835. return nil, err
  836. }
  837. }
  838. u := v.User()
  839. if s.userManager != nil && u != nil && u.Tier != nil {
  840. go s.userManager.EnqueueUserStats(u.ID, v.Stats())
  841. }
  842. s.mu.Lock()
  843. s.messages++
  844. s.mu.Unlock()
  845. if unifiedpush {
  846. minc(metricUnifiedPushPublishedSuccess)
  847. }
  848. mset(metricMessagePublishDurationMillis, time.Since(start).Milliseconds())
  849. return m, nil
  850. }
  851. func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
  852. m, err := s.handlePublishInternal(r, v)
  853. if err != nil {
  854. minc(metricMessagesPublishedFailure)
  855. return err
  856. }
  857. minc(metricMessagesPublishedSuccess)
  858. return s.writeJSON(w, m.forJSON())
  859. }
  860. func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *visitor) error {
  861. _, err := s.handlePublishInternal(r, v)
  862. if err != nil {
  863. minc(metricMessagesPublishedFailure)
  864. minc(metricMatrixPublishedFailure)
  865. if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
  866. topic, err := fromContext[*topic](r, contextTopic)
  867. if err != nil {
  868. return err
  869. }
  870. pushKey, err := fromContext[string](r, contextMatrixPushKey)
  871. if err != nil {
  872. return err
  873. }
  874. if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter {
  875. return writeMatrixResponse(w, pushKey)
  876. }
  877. }
  878. return err
  879. }
  880. minc(metricMessagesPublishedSuccess)
  881. minc(metricMatrixPublishedSuccess)
  882. return writeMatrixSuccess(w)
  883. }
  884. func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request, v *visitor) error {
  885. return s.handleActionMessage(w, r, v, messageDeleteEvent)
  886. }
  887. func (s *Server) handleClear(w http.ResponseWriter, r *http.Request, v *visitor) error {
  888. return s.handleActionMessage(w, r, v, messageClearEvent)
  889. }
  890. func (s *Server) handleActionMessage(w http.ResponseWriter, r *http.Request, v *visitor, event string) error {
  891. t, err := fromContext[*topic](r, contextTopic)
  892. if err != nil {
  893. return err
  894. }
  895. vrate, err := fromContext[*visitor](r, contextRateVisitor)
  896. if err != nil {
  897. return err
  898. }
  899. if !util.ContainsIP(s.config.VisitorRequestExemptPrefixes, v.ip) && !vrate.MessageAllowed() {
  900. return errHTTPTooManyRequestsLimitMessages.With(t)
  901. }
  902. sequenceID, e := s.sequenceIDFromPath(r.URL.Path)
  903. if e != nil {
  904. return e.With(t)
  905. }
  906. // Create an action message with the given event type
  907. m := newActionMessage(event, t.ID, sequenceID)
  908. m.Sender = v.IP()
  909. m.User = v.MaybeUserID()
  910. m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
  911. // Publish to subscribers
  912. if err := t.Publish(v, m); err != nil {
  913. return err
  914. }
  915. // Send to Firebase for Android clients
  916. if s.firebaseClient != nil {
  917. go s.sendToFirebase(v, m)
  918. }
  919. // Send to web push endpoints
  920. if s.config.WebPushPublicKey != "" {
  921. go s.publishToWebPushEndpoints(v, m)
  922. }
  923. // Delete any existing scheduled message with the same sequence ID
  924. if err := s.messageCache.DeleteScheduledBySequenceID(t.ID, sequenceID); err != nil {
  925. return err
  926. }
  927. // Add to message cache
  928. if err := s.messageCache.AddMessage(m); err != nil {
  929. return err
  930. }
  931. logvrm(v, r, m).Tag(tagPublish).Debug("Published %s for sequence ID %s", event, sequenceID)
  932. s.mu.Lock()
  933. s.messages++
  934. s.mu.Unlock()
  935. return s.writeJSON(w, m.forJSON())
  936. }
  937. func (s *Server) sendToFirebase(v *visitor, m *message) {
  938. logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase")
  939. if err := s.firebaseClient.Send(v, m); err != nil {
  940. minc(metricFirebasePublishedFailure)
  941. if errors.Is(err, errFirebaseTemporarilyBanned) {
  942. logvm(v, m).Tag(tagFirebase).Err(err).Debug("Unable to publish to Firebase: %v", err.Error())
  943. } else {
  944. logvm(v, m).Tag(tagFirebase).Err(err).Warn("Unable to publish to Firebase: %v", err.Error())
  945. }
  946. return
  947. }
  948. minc(metricFirebasePublishedSuccess)
  949. }
  950. func (s *Server) sendEmail(v *visitor, m *message, email string) {
  951. logvm(v, m).Tag(tagEmail).Field("email", email).Debug("Sending email to %s", email)
  952. if err := s.smtpSender.Send(v, m, email); err != nil {
  953. logvm(v, m).Tag(tagEmail).Field("email", email).Err(err).Warn("Unable to send email to %s: %v", email, err.Error())
  954. minc(metricEmailsPublishedFailure)
  955. return
  956. }
  957. minc(metricEmailsPublishedSuccess)
  958. }
  959. func (s *Server) forwardPollRequest(v *visitor, m *message) {
  960. topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
  961. topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
  962. forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
  963. logvm(v, m).Debug("Publishing poll request to %s", forwardURL)
  964. req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
  965. if err != nil {
  966. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  967. return
  968. }
  969. req.Header.Set("User-Agent", "ntfy/"+s.config.BuildVersion)
  970. req.Header.Set("X-Poll-ID", m.ID)
  971. if s.config.UpstreamAccessToken != "" {
  972. req.Header.Set("Authorization", util.BearerAuth(s.config.UpstreamAccessToken))
  973. }
  974. var httpClient = &http.Client{
  975. Timeout: time.Second * 10,
  976. }
  977. response, err := httpClient.Do(req)
  978. if err != nil {
  979. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  980. return
  981. } else if response.StatusCode != http.StatusOK {
  982. if response.StatusCode == http.StatusTooManyRequests {
  983. logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s; you may solve this by sending fewer daily messages, or by configuring upstream-access-token (assuming you have an account with higher rate limits) ", s.config.UpstreamBaseURL, response.Status)
  984. } else {
  985. logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s", s.config.UpstreamBaseURL, response.Status)
  986. }
  987. return
  988. }
  989. }
  990. func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, firebase bool, email, call string, template templateMode, unifiedpush bool, err *errHTTP) {
  991. if r.Method != http.MethodGet && updatePathRegex.MatchString(r.URL.Path) {
  992. pathSequenceID, err := s.sequenceIDFromPath(r.URL.Path)
  993. if err != nil {
  994. return false, false, "", "", "", false, err
  995. }
  996. m.SequenceID = pathSequenceID
  997. } else {
  998. sequenceID := readParam(r, "x-sequence-id", "sequence-id", "sid")
  999. if sequenceID != "" {
  1000. if sequenceIDRegex.MatchString(sequenceID) {
  1001. m.SequenceID = sequenceID
  1002. } else {
  1003. return false, false, "", "", "", false, errHTTPBadRequestSequenceIDInvalid
  1004. }
  1005. } else {
  1006. m.SequenceID = m.ID
  1007. }
  1008. }
  1009. cache = readBoolParam(r, true, "x-cache", "cache")
  1010. firebase = readBoolParam(r, true, "x-firebase", "firebase")
  1011. m.Title = readParam(r, "x-title", "title", "t")
  1012. m.Click = readParam(r, "x-click", "click")
  1013. icon := readParam(r, "x-icon", "icon")
  1014. filename := readParam(r, "x-filename", "filename", "file", "f")
  1015. attach := readParam(r, "x-attach", "attach", "a")
  1016. if attach != "" || filename != "" {
  1017. m.Attachment = &attachment{}
  1018. }
  1019. if filename != "" {
  1020. m.Attachment.Name = filename
  1021. }
  1022. if attach != "" {
  1023. if !urlRegex.MatchString(attach) {
  1024. return false, false, "", "", "", false, errHTTPBadRequestAttachmentURLInvalid
  1025. }
  1026. m.Attachment.URL = attach
  1027. if m.Attachment.Name == "" {
  1028. u, err := url.Parse(m.Attachment.URL)
  1029. if err == nil {
  1030. m.Attachment.Name = path.Base(u.Path)
  1031. if m.Attachment.Name == "." || m.Attachment.Name == "/" {
  1032. m.Attachment.Name = ""
  1033. }
  1034. }
  1035. }
  1036. if m.Attachment.Name == "" {
  1037. m.Attachment.Name = "attachment"
  1038. }
  1039. }
  1040. if icon != "" {
  1041. if !urlRegex.MatchString(icon) {
  1042. return false, false, "", "", "", false, errHTTPBadRequestIconURLInvalid
  1043. }
  1044. m.Icon = icon
  1045. }
  1046. email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
  1047. if s.smtpSender == nil && email != "" {
  1048. return false, false, "", "", "", false, errHTTPBadRequestEmailDisabled
  1049. }
  1050. call = readParam(r, "x-call", "call")
  1051. if call != "" && (s.config.TwilioAccount == "" || s.userManager == nil) {
  1052. return false, false, "", "", "", false, errHTTPBadRequestPhoneCallsDisabled
  1053. } else if call != "" && !isBoolValue(call) && !phoneNumberRegex.MatchString(call) {
  1054. return false, false, "", "", "", false, errHTTPBadRequestPhoneNumberInvalid
  1055. }
  1056. template = templateMode(readParam(r, "x-template", "template", "tpl"))
  1057. messageStr := readParam(r, "x-message", "message", "m")
  1058. if !template.InlineMode() {
  1059. // Convert "\n" to literal newline everything but inline mode
  1060. messageStr = strings.ReplaceAll(messageStr, "\\n", "\n")
  1061. }
  1062. if messageStr != "" {
  1063. m.Message = messageStr
  1064. }
  1065. var e error
  1066. m.Priority, e = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
  1067. if e != nil {
  1068. return false, false, "", "", "", false, errHTTPBadRequestPriorityInvalid
  1069. }
  1070. m.Tags = readCommaSeparatedParam(r, "x-tags", "tags", "tag", "ta")
  1071. delayStr := readParam(r, "x-delay", "delay", "x-at", "at", "x-in", "in")
  1072. if delayStr != "" {
  1073. if !cache {
  1074. return false, false, "", "", "", false, errHTTPBadRequestDelayNoCache
  1075. }
  1076. if email != "" {
  1077. return false, false, "", "", "", false, errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
  1078. }
  1079. if call != "" {
  1080. return false, false, "", "", "", false, errHTTPBadRequestDelayNoCall // we cannot store the phone number (yet)
  1081. }
  1082. delay, err := util.ParseFutureTime(delayStr, time.Now())
  1083. if err != nil {
  1084. return false, false, "", "", "", false, errHTTPBadRequestDelayCannotParse
  1085. } else if delay.Unix() < time.Now().Add(s.config.MessageDelayMin).Unix() {
  1086. return false, false, "", "", "", false, errHTTPBadRequestDelayTooSmall
  1087. } else if delay.Unix() > time.Now().Add(s.config.MessageDelayMax).Unix() {
  1088. return false, false, "", "", "", false, errHTTPBadRequestDelayTooLarge
  1089. }
  1090. m.Time = delay.Unix()
  1091. }
  1092. actionsStr := readParam(r, "x-actions", "actions", "action")
  1093. if actionsStr != "" {
  1094. m.Actions, e = parseActions(actionsStr)
  1095. if e != nil {
  1096. return false, false, "", "", "", false, errHTTPBadRequestActionsInvalid.Wrap("%s", e.Error())
  1097. }
  1098. }
  1099. contentType, markdown := readParam(r, "content-type", "content_type"), readBoolParam(r, false, "x-markdown", "markdown", "md")
  1100. if markdown || strings.ToLower(contentType) == "text/markdown" {
  1101. m.ContentType = "text/markdown"
  1102. }
  1103. unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
  1104. contentEncoding := readParam(r, "content-encoding")
  1105. if unifiedpush || contentEncoding == "aes128gcm" {
  1106. firebase = false
  1107. unifiedpush = true
  1108. }
  1109. m.PollID = readParam(r, "x-poll-id", "poll-id")
  1110. if m.PollID != "" {
  1111. unifiedpush = false
  1112. cache = false
  1113. email = ""
  1114. }
  1115. return cache, firebase, email, call, template, unifiedpush, nil
  1116. }
  1117. // handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
  1118. //
  1119. // 1. curl -X POST -H "Poll: 1234" ntfy.sh/...
  1120. // If a message is flagged as poll request, the body does not matter and is discarded
  1121. // 2. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
  1122. // If UnifiedPush is enabled, encode as base64 if body is binary, and do not trim
  1123. // 3. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
  1124. // Body must be a message, because we attached an external URL
  1125. // 4. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
  1126. // Body must be attachment, because we passed a filename
  1127. // 5. curl -H "Template: yes" -T file.txt ntfy.sh/mytopic
  1128. // If templating is enabled, read up to 32k and treat message body as JSON
  1129. // 6. curl -T file.txt ntfy.sh/mytopic
  1130. // If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
  1131. // 7. curl -T file.txt ntfy.sh/mytopic
  1132. // In all other cases, mostly if file.txt is > message limit, treat it as an attachment
  1133. func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser, template templateMode, unifiedpush bool) error {
  1134. if m.Event == pollRequestEvent { // Case 1
  1135. return s.handleBodyDiscard(body)
  1136. } else if unifiedpush {
  1137. return s.handleBodyAsMessageAutoDetect(m, body) // Case 2
  1138. } else if m.Attachment != nil && m.Attachment.URL != "" {
  1139. return s.handleBodyAsTextMessage(m, body) // Case 3
  1140. } else if m.Attachment != nil && m.Attachment.Name != "" {
  1141. return s.handleBodyAsAttachment(r, v, m, body) // Case 4
  1142. } else if template.Enabled() {
  1143. return s.handleBodyAsTemplatedTextMessage(m, template, body) // Case 5
  1144. } else if !body.LimitReached && utf8.Valid(body.PeekedBytes) {
  1145. return s.handleBodyAsTextMessage(m, body) // Case 6
  1146. }
  1147. return s.handleBodyAsAttachment(r, v, m, body) // Case 7
  1148. }
  1149. func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
  1150. _, err := io.Copy(io.Discard, body)
  1151. _ = body.Close()
  1152. return err
  1153. }
  1154. func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedReadCloser) error {
  1155. if utf8.Valid(body.PeekedBytes) {
  1156. m.Message = string(body.PeekedBytes) // Do not trim
  1157. } else {
  1158. m.Message = base64.StdEncoding.EncodeToString(body.PeekedBytes)
  1159. m.Encoding = encodingBase64
  1160. }
  1161. return nil
  1162. }
  1163. func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser) error {
  1164. if !utf8.Valid(body.PeekedBytes) {
  1165. return errHTTPBadRequestMessageNotUTF8.With(m)
  1166. }
  1167. if len(body.PeekedBytes) > 0 { // Empty body should not override message (publish via GET!)
  1168. m.Message = strings.TrimSpace(string(body.PeekedBytes)) // Truncates the message to the peek limit if required
  1169. }
  1170. if m.Attachment != nil && m.Attachment.Name != "" && m.Message == "" {
  1171. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1172. }
  1173. return nil
  1174. }
  1175. func (s *Server) handleBodyAsTemplatedTextMessage(m *message, template templateMode, body *util.PeekedReadCloser) error {
  1176. body, err := util.Peek(body, max(s.config.MessageSizeLimit, jsonBodyBytesLimit))
  1177. if err != nil {
  1178. return err
  1179. } else if body.LimitReached {
  1180. return errHTTPEntityTooLargeJSONBody
  1181. }
  1182. peekedBody := strings.TrimSpace(string(body.PeekedBytes))
  1183. if template.FileMode() {
  1184. if err := s.renderTemplateFromFile(m, template.FileName(), peekedBody); err != nil {
  1185. return err
  1186. }
  1187. } else {
  1188. if err := s.renderTemplateFromParams(m, peekedBody); err != nil {
  1189. return err
  1190. }
  1191. }
  1192. if len(m.Title) > s.config.MessageSizeLimit || len(m.Message) > s.config.MessageSizeLimit {
  1193. return errHTTPBadRequestTemplateMessageTooLarge
  1194. }
  1195. return nil
  1196. }
  1197. // renderTemplateFromFile transforms the JSON message body according to a template from the filesystem.
  1198. // The template file must be in the templates directory, or in the configured template directory.
  1199. func (s *Server) renderTemplateFromFile(m *message, templateName, peekedBody string) error {
  1200. if !templateNameRegex.MatchString(templateName) {
  1201. return errHTTPBadRequestTemplateFileNotFound
  1202. }
  1203. templateContent, _ := templatesFs.ReadFile(filepath.Join(templatesDir, templateName+templateFileExtension)) // Read from the embedded filesystem first
  1204. if s.config.TemplateDir != "" {
  1205. if b, _ := os.ReadFile(filepath.Join(s.config.TemplateDir, templateName+templateFileExtension)); len(b) > 0 {
  1206. templateContent = b
  1207. }
  1208. }
  1209. if len(templateContent) == 0 {
  1210. return errHTTPBadRequestTemplateFileNotFound
  1211. }
  1212. var tpl templateFile
  1213. if err := yaml.Unmarshal(templateContent, &tpl); err != nil {
  1214. return errHTTPBadRequestTemplateFileInvalid
  1215. }
  1216. var err error
  1217. if tpl.Message != nil {
  1218. if m.Message, err = s.renderTemplate(*tpl.Message, peekedBody); err != nil {
  1219. return err
  1220. }
  1221. }
  1222. if tpl.Title != nil {
  1223. if m.Title, err = s.renderTemplate(*tpl.Title, peekedBody); err != nil {
  1224. return err
  1225. }
  1226. }
  1227. return nil
  1228. }
  1229. // renderTemplateFromParams transforms the JSON message body according to the inline template in the
  1230. // message and title parameters.
  1231. func (s *Server) renderTemplateFromParams(m *message, peekedBody string) error {
  1232. var err error
  1233. if m.Message, err = s.renderTemplate(m.Message, peekedBody); err != nil {
  1234. return err
  1235. }
  1236. if m.Title, err = s.renderTemplate(m.Title, peekedBody); err != nil {
  1237. return err
  1238. }
  1239. return nil
  1240. }
  1241. // renderTemplate renders a template with the given JSON source data.
  1242. func (s *Server) renderTemplate(tpl string, source string) (string, error) {
  1243. if templateDisallowedRegex.MatchString(tpl) {
  1244. return "", errHTTPBadRequestTemplateDisallowedFunctionCalls
  1245. }
  1246. var data any
  1247. if err := json.Unmarshal([]byte(source), &data); err != nil {
  1248. return "", errHTTPBadRequestTemplateMessageNotJSON
  1249. }
  1250. t, err := template.New("").Funcs(sprig.TxtFuncMap()).Parse(tpl)
  1251. if err != nil {
  1252. return "", errHTTPBadRequestTemplateInvalid.Wrap("%s", err.Error())
  1253. }
  1254. var buf bytes.Buffer
  1255. limitWriter := util.NewLimitWriter(util.NewTimeoutWriter(&buf, templateMaxExecutionTime), util.NewFixedLimiter(templateMaxOutputBytes))
  1256. if err := t.Execute(limitWriter, data); err != nil {
  1257. return "", errHTTPBadRequestTemplateExecuteFailed.Wrap("%s", err.Error())
  1258. }
  1259. return strings.TrimSpace(strings.ReplaceAll(buf.String(), "\\n", "\n")), nil // replace any remaining "\n" (those outside of template curly braces) with newlines
  1260. }
  1261. func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser) error {
  1262. if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
  1263. return errHTTPBadRequestAttachmentsDisallowed.With(m)
  1264. }
  1265. vinfo, err := v.Info()
  1266. if err != nil {
  1267. return err
  1268. }
  1269. attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
  1270. if m.Time > attachmentExpiry {
  1271. return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
  1272. }
  1273. contentLengthStr := r.Header.Get("Content-Length")
  1274. if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
  1275. contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
  1276. if err == nil && (contentLength > vinfo.Stats.AttachmentTotalSizeRemaining || contentLength > vinfo.Limits.AttachmentFileSizeLimit) {
  1277. return errHTTPEntityTooLargeAttachment.With(m).Fields(log.Context{
  1278. "message_content_length": contentLength,
  1279. "attachment_total_size_remaining": vinfo.Stats.AttachmentTotalSizeRemaining,
  1280. "attachment_file_size_limit": vinfo.Limits.AttachmentFileSizeLimit,
  1281. })
  1282. }
  1283. }
  1284. if m.Attachment == nil {
  1285. m.Attachment = &attachment{}
  1286. }
  1287. var ext string
  1288. m.Attachment.Expires = attachmentExpiry
  1289. m.Attachment.Type, ext = util.DetectContentType(body.PeekedBytes, m.Attachment.Name)
  1290. m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
  1291. if m.Attachment.Name == "" {
  1292. m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
  1293. }
  1294. if m.Message == "" {
  1295. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1296. }
  1297. limiters := []util.Limiter{
  1298. v.BandwidthLimiter(),
  1299. util.NewFixedLimiter(vinfo.Limits.AttachmentFileSizeLimit),
  1300. util.NewFixedLimiter(vinfo.Stats.AttachmentTotalSizeRemaining),
  1301. }
  1302. m.Attachment.Size, err = s.fileCache.Write(m.ID, body, limiters...)
  1303. if errors.Is(err, util.ErrLimitReached) {
  1304. return errHTTPEntityTooLargeAttachment.With(m)
  1305. } else if err != nil {
  1306. return err
  1307. }
  1308. return nil
  1309. }
  1310. func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1311. encoder := func(msg *message) (string, error) {
  1312. var buf bytes.Buffer
  1313. if err := json.NewEncoder(&buf).Encode(msg.forJSON()); err != nil {
  1314. return "", err
  1315. }
  1316. return buf.String(), nil
  1317. }
  1318. return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder)
  1319. }
  1320. func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1321. encoder := func(msg *message) (string, error) {
  1322. var buf bytes.Buffer
  1323. if err := json.NewEncoder(&buf).Encode(msg.forJSON()); err != nil {
  1324. return "", err
  1325. }
  1326. if msg.Event != messageEvent && msg.Event != messageDeleteEvent && msg.Event != messageClearEvent {
  1327. return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
  1328. }
  1329. return fmt.Sprintf("data: %s\n", buf.String()), nil
  1330. }
  1331. return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder)
  1332. }
  1333. func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1334. encoder := func(msg *message) (string, error) {
  1335. if msg.Event == messageEvent { // only handle default events
  1336. return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
  1337. }
  1338. return "\n", nil // "keepalive" and "open" events just send an empty line
  1339. }
  1340. return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder)
  1341. }
  1342. func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
  1343. logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection opened")
  1344. defer logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection closed")
  1345. if !v.SubscriptionAllowed() {
  1346. return errHTTPTooManyRequestsLimitSubscriptions
  1347. }
  1348. defer v.RemoveSubscription()
  1349. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1350. if err != nil {
  1351. return err
  1352. }
  1353. poll, since, scheduled, filters, err := parseSubscribeParams(r)
  1354. if err != nil {
  1355. return err
  1356. }
  1357. var wlock sync.Mutex
  1358. defer func() {
  1359. // Hack: This is the fix for a horrible data race that I have not been able to figure out in quite some time.
  1360. // It appears to be happening when the Go HTTP code reads from the socket when closing the request (i.e. AFTER
  1361. // this function returns), and causes a data race with the ResponseWriter. Locking wlock here silences the
  1362. // data race detector. See https://github.com/binwiederhier/ntfy/issues/338#issuecomment-1163425889.
  1363. wlock.TryLock()
  1364. }()
  1365. sub := func(v *visitor, msg *message) error {
  1366. if !filters.Pass(msg) {
  1367. return nil
  1368. }
  1369. m, err := encoder(msg)
  1370. if err != nil {
  1371. return err
  1372. }
  1373. wlock.Lock()
  1374. defer wlock.Unlock()
  1375. if _, err := w.Write([]byte(m)); err != nil {
  1376. return err
  1377. }
  1378. if fl, ok := w.(http.Flusher); ok {
  1379. fl.Flush()
  1380. }
  1381. return nil
  1382. }
  1383. if err := s.maybeSetRateVisitors(r, v, topics); err != nil {
  1384. return err
  1385. }
  1386. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1387. w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
  1388. if poll {
  1389. for _, t := range topics {
  1390. t.Keepalive()
  1391. }
  1392. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1393. }
  1394. ctx, cancel := context.WithCancel(context.Background())
  1395. defer cancel()
  1396. subscriberIDs := make([]int, 0)
  1397. for _, t := range topics {
  1398. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1399. }
  1400. defer func() {
  1401. for i, subscriberID := range subscriberIDs {
  1402. topics[i].Unsubscribe(subscriberID) // Order!
  1403. }
  1404. }()
  1405. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1406. return err
  1407. }
  1408. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1409. return err
  1410. }
  1411. for {
  1412. select {
  1413. case <-ctx.Done():
  1414. return nil
  1415. case <-r.Context().Done():
  1416. return nil
  1417. case <-time.After(s.config.KeepaliveInterval):
  1418. ev := logvr(v, r).Tag(tagSubscribe)
  1419. if len(topics) == 1 {
  1420. ev.With(topics[0]).Trace("Sending keepalive message to %s", topics[0].ID)
  1421. } else {
  1422. ev.Trace("Sending keepalive message to %d topics", len(topics))
  1423. }
  1424. v.Keepalive()
  1425. for _, t := range topics {
  1426. t.Keepalive()
  1427. }
  1428. if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
  1429. return err
  1430. }
  1431. }
  1432. }
  1433. }
  1434. func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1435. if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
  1436. return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
  1437. }
  1438. if !v.SubscriptionAllowed() {
  1439. return errHTTPTooManyRequestsLimitSubscriptions
  1440. }
  1441. defer v.RemoveSubscription()
  1442. logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection opened")
  1443. defer logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection closed")
  1444. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1445. if err != nil {
  1446. return err
  1447. }
  1448. poll, since, scheduled, filters, err := parseSubscribeParams(r)
  1449. if err != nil {
  1450. return err
  1451. }
  1452. upgrader := &websocket.Upgrader{
  1453. ReadBufferSize: wsBufferSize,
  1454. WriteBufferSize: wsBufferSize,
  1455. CheckOrigin: func(r *http.Request) bool {
  1456. return true // We're open for business!
  1457. },
  1458. }
  1459. conn, err := upgrader.Upgrade(w, r, nil)
  1460. if err != nil {
  1461. return err
  1462. }
  1463. defer conn.Close()
  1464. // Subscription connections can be canceled externally, see topic.CancelSubscribersExceptUser
  1465. cancelCtx, cancel := context.WithCancel(context.Background())
  1466. defer cancel()
  1467. // Use errgroup to run WebSocket reader and writer in Go routines
  1468. var wlock sync.Mutex
  1469. g, gctx := errgroup.WithContext(cancelCtx)
  1470. g.Go(func() error {
  1471. pongWait := s.config.KeepaliveInterval + wsPongWait
  1472. conn.SetReadLimit(wsReadLimit)
  1473. if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
  1474. return err
  1475. }
  1476. conn.SetPongHandler(func(appData string) error {
  1477. logvr(v, r).Tag(tagWebsocket).Trace("Received WebSocket pong")
  1478. return conn.SetReadDeadline(time.Now().Add(pongWait))
  1479. })
  1480. for {
  1481. _, _, err := conn.NextReader()
  1482. if err != nil {
  1483. return err
  1484. }
  1485. select {
  1486. case <-gctx.Done():
  1487. return nil
  1488. default:
  1489. }
  1490. }
  1491. })
  1492. g.Go(func() error {
  1493. ping := func() error {
  1494. wlock.Lock()
  1495. defer wlock.Unlock()
  1496. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1497. return err
  1498. }
  1499. logvr(v, r).Tag(tagWebsocket).Trace("Sending WebSocket ping")
  1500. return conn.WriteMessage(websocket.PingMessage, nil)
  1501. }
  1502. for {
  1503. select {
  1504. case <-gctx.Done():
  1505. return nil
  1506. case <-cancelCtx.Done():
  1507. logvr(v, r).Tag(tagWebsocket).Trace("Cancel received, closing subscriber connection")
  1508. conn.Close()
  1509. return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
  1510. case <-time.After(s.config.KeepaliveInterval):
  1511. v.Keepalive()
  1512. for _, t := range topics {
  1513. t.Keepalive()
  1514. }
  1515. if err := ping(); err != nil {
  1516. return err
  1517. }
  1518. }
  1519. }
  1520. })
  1521. sub := func(v *visitor, msg *message) error {
  1522. if !filters.Pass(msg) {
  1523. return nil
  1524. }
  1525. wlock.Lock()
  1526. defer wlock.Unlock()
  1527. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1528. return err
  1529. }
  1530. return conn.WriteJSON(msg)
  1531. }
  1532. if err := s.maybeSetRateVisitors(r, v, topics); err != nil {
  1533. return err
  1534. }
  1535. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1536. if poll {
  1537. for _, t := range topics {
  1538. t.Keepalive()
  1539. }
  1540. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1541. }
  1542. subscriberIDs := make([]int, 0)
  1543. for _, t := range topics {
  1544. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1545. }
  1546. defer func() {
  1547. for i, subscriberID := range subscriberIDs {
  1548. topics[i].Unsubscribe(subscriberID) // Order!
  1549. }
  1550. }()
  1551. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1552. return err
  1553. }
  1554. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1555. return err
  1556. }
  1557. err = g.Wait()
  1558. if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) {
  1559. logvr(v, r).Tag(tagWebsocket).Err(err).Fields(websocketErrorContext(err)).Trace("WebSocket connection closed")
  1560. return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
  1561. }
  1562. return err
  1563. }
  1564. func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, err error) {
  1565. poll = readBoolParam(r, false, "x-poll", "poll", "po")
  1566. scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
  1567. since, err = parseSince(r, poll)
  1568. if err != nil {
  1569. return
  1570. }
  1571. filters, err = parseQueryFilters(r)
  1572. if err != nil {
  1573. return
  1574. }
  1575. return
  1576. }
  1577. // maybeSetRateVisitors sets the rate visitor on a topic (v.SetRateVisitor), indicating that all messages published
  1578. // to that topic will be rate limited against the rate visitor instead of the publishing visitor.
  1579. //
  1580. // Setting the rate visitor is ony allowed if the `visitor-subscriber-rate-limiting` setting is enabled, AND
  1581. // - auth-file is not set (everything is open by default)
  1582. // - or the topic is reserved, and v.user is the owner
  1583. // - or the topic is not reserved, and v.user has write access
  1584. //
  1585. // This only applies to UnifiedPush topics ("up...").
  1586. func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic) error {
  1587. // Bail out if not enabled
  1588. if !s.config.VisitorSubscriberRateLimiting {
  1589. return nil
  1590. }
  1591. // Make a list of topics that we'll actually set the RateVisitor on
  1592. eligibleRateTopics := make([]*topic, 0)
  1593. for _, t := range topics {
  1594. if strings.HasPrefix(t.ID, unifiedPushTopicPrefix) && len(t.ID) == unifiedPushTopicLength {
  1595. eligibleRateTopics = append(eligibleRateTopics, t)
  1596. }
  1597. }
  1598. if len(eligibleRateTopics) == 0 {
  1599. return nil
  1600. }
  1601. // If access controls are turned off, v has access to everything, and we can set the rate visitor
  1602. if s.userManager == nil {
  1603. return s.setRateVisitors(r, v, eligibleRateTopics)
  1604. }
  1605. // If access controls are enabled, only set rate visitor if
  1606. // - topic is reserved, and v.user is the owner
  1607. // - topic is not reserved, and v.user has write access
  1608. writableRateTopics := make([]*topic, 0)
  1609. for _, t := range topics {
  1610. if !util.Contains(eligibleRateTopics, t) {
  1611. continue
  1612. }
  1613. ownerUserID, err := s.userManager.ReservationOwner(t.ID)
  1614. if err != nil {
  1615. return err
  1616. }
  1617. if ownerUserID == "" {
  1618. if err := s.userManager.Authorize(v.User(), t.ID, user.PermissionWrite); err == nil {
  1619. writableRateTopics = append(writableRateTopics, t)
  1620. }
  1621. } else if ownerUserID == v.MaybeUserID() {
  1622. writableRateTopics = append(writableRateTopics, t)
  1623. }
  1624. }
  1625. return s.setRateVisitors(r, v, writableRateTopics)
  1626. }
  1627. func (s *Server) setRateVisitors(r *http.Request, v *visitor, rateTopics []*topic) error {
  1628. for _, t := range rateTopics {
  1629. logvr(v, r).
  1630. Tag(tagSubscribe).
  1631. With(t).
  1632. Debug("Setting visitor as rate visitor for topic %s", t.ID)
  1633. t.SetRateVisitor(v)
  1634. }
  1635. return nil
  1636. }
  1637. // sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
  1638. // marker, returning only messages that are newer than the marker.
  1639. func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled bool, v *visitor, sub subscriber) error {
  1640. if since.IsNone() {
  1641. return nil
  1642. }
  1643. messages := make([]*message, 0)
  1644. for _, t := range topics {
  1645. topicMessages, err := s.messageCache.Messages(t.ID, since, scheduled)
  1646. if err != nil {
  1647. return err
  1648. }
  1649. messages = append(messages, topicMessages...)
  1650. }
  1651. sort.Slice(messages, func(i, j int) bool {
  1652. return messages[i].Time < messages[j].Time
  1653. })
  1654. for _, m := range messages {
  1655. if err := sub(v, m); err != nil {
  1656. return err
  1657. }
  1658. }
  1659. return nil
  1660. }
  1661. // parseSince returns a timestamp identifying the time span from which cached messages should be received.
  1662. //
  1663. // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h),
  1664. // "all" for all messages, or "latest" for the most recent message for a topic
  1665. func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
  1666. since := readParam(r, "x-since", "since", "si")
  1667. // Easy cases (empty, all, none)
  1668. if since == "" {
  1669. if poll {
  1670. return sinceAllMessages, nil
  1671. }
  1672. return sinceNoMessages, nil
  1673. } else if since == "all" {
  1674. return sinceAllMessages, nil
  1675. } else if since == "latest" {
  1676. return sinceLatestMessage, nil
  1677. } else if since == "none" {
  1678. return sinceNoMessages, nil
  1679. }
  1680. // ID, timestamp, duration
  1681. if validMessageID(since) {
  1682. return newSinceID(since), nil
  1683. } else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
  1684. return newSinceTime(s), nil
  1685. } else if d, err := time.ParseDuration(since); err == nil {
  1686. return newSinceTime(time.Now().Add(-1 * d).Unix()), nil
  1687. }
  1688. return sinceNoMessages, errHTTPBadRequestSinceInvalid
  1689. }
  1690. func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  1691. w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE")
  1692. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1693. w.Header().Set("Access-Control-Allow-Headers", "*") // CORS, allow auth via JS // FIXME is this terrible?
  1694. return nil
  1695. }
  1696. // topicFromPath returns the topic from a root path (e.g. /mytopic), creating it if it doesn't exist.
  1697. func (s *Server) topicFromPath(path string) (*topic, error) {
  1698. parts := strings.Split(path, "/")
  1699. if len(parts) < 2 {
  1700. return nil, errHTTPBadRequestTopicInvalid
  1701. }
  1702. return s.topicFromID(parts[1])
  1703. }
  1704. // topicsFromPath returns the topic from a root path (e.g. /mytopic,mytopic2), creating it if it doesn't exist.
  1705. func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
  1706. parts := strings.Split(path, "/")
  1707. if len(parts) < 2 {
  1708. return nil, "", errHTTPBadRequestTopicInvalid
  1709. }
  1710. topicIDs := util.SplitNoEmpty(parts[1], ",")
  1711. topics, err := s.topicsFromIDs(topicIDs...)
  1712. if err != nil {
  1713. return nil, "", errHTTPBadRequestTopicInvalid
  1714. }
  1715. return topics, parts[1], nil
  1716. }
  1717. // sequenceIDFromPath returns the sequence ID from a path like /mytopic/sequenceIdHere
  1718. func (s *Server) sequenceIDFromPath(path string) (string, *errHTTP) {
  1719. parts := strings.Split(path, "/")
  1720. if len(parts) < 3 {
  1721. return "", errHTTPBadRequestSequenceIDInvalid
  1722. }
  1723. return parts[2], nil
  1724. }
  1725. // topicsFromIDs returns the topics with the given IDs, creating them if they don't exist.
  1726. func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
  1727. s.mu.Lock()
  1728. defer s.mu.Unlock()
  1729. topics := make([]*topic, 0)
  1730. for _, id := range ids {
  1731. if util.Contains(s.config.DisallowedTopics, id) {
  1732. return nil, errHTTPBadRequestTopicDisallowed
  1733. }
  1734. if _, ok := s.topics[id]; !ok {
  1735. if len(s.topics) >= s.config.TotalTopicLimit {
  1736. return nil, errHTTPTooManyRequestsLimitTotalTopics
  1737. }
  1738. s.topics[id] = newTopic(id)
  1739. }
  1740. topics = append(topics, s.topics[id])
  1741. }
  1742. return topics, nil
  1743. }
  1744. // topicFromID returns the topic with the given ID, creating it if it doesn't exist.
  1745. func (s *Server) topicFromID(id string) (*topic, error) {
  1746. topics, err := s.topicsFromIDs(id)
  1747. if err != nil {
  1748. return nil, err
  1749. }
  1750. return topics[0], nil
  1751. }
  1752. // topicsFromPattern returns a list of topics matching the given pattern, but it does not create them.
  1753. func (s *Server) topicsFromPattern(pattern string) ([]*topic, error) {
  1754. s.mu.RLock()
  1755. defer s.mu.RUnlock()
  1756. patternRegexp, err := regexp.Compile("^" + strings.ReplaceAll(pattern, "*", ".*") + "$")
  1757. if err != nil {
  1758. return nil, err
  1759. }
  1760. topics := make([]*topic, 0)
  1761. for _, t := range s.topics {
  1762. if patternRegexp.MatchString(t.ID) {
  1763. topics = append(topics, t)
  1764. }
  1765. }
  1766. return topics, nil
  1767. }
  1768. func (s *Server) runSMTPServer() error {
  1769. s.smtpServerBackend = newMailBackend(s.config, s.handle)
  1770. s.smtpServer = smtp.NewServer(s.smtpServerBackend)
  1771. s.smtpServer.Addr = s.config.SMTPServerListen
  1772. s.smtpServer.Domain = s.config.SMTPServerDomain
  1773. s.smtpServer.ReadTimeout = 10 * time.Second
  1774. s.smtpServer.WriteTimeout = 10 * time.Second
  1775. s.smtpServer.MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
  1776. s.smtpServer.MaxRecipients = 1
  1777. s.smtpServer.AllowInsecureAuth = true
  1778. return s.smtpServer.ListenAndServe()
  1779. }
  1780. func (s *Server) runManager() {
  1781. for {
  1782. select {
  1783. case <-time.After(s.config.ManagerInterval):
  1784. log.
  1785. Tag(tagManager).
  1786. Timing(s.execManager).
  1787. Debug("Manager finished")
  1788. case <-s.closeChan:
  1789. return
  1790. }
  1791. }
  1792. }
  1793. // runStatsResetter runs once a day (usually midnight UTC) to reset all the visitor's message and
  1794. // email counters. The stats are used to display the counters in the web app, as well as for rate limiting.
  1795. func (s *Server) runStatsResetter() {
  1796. for {
  1797. runAt := util.NextOccurrenceUTC(s.config.VisitorStatsResetTime, time.Now())
  1798. timer := time.NewTimer(time.Until(runAt))
  1799. log.Tag(tagResetter).Debug("Waiting until %v to reset visitor stats", runAt)
  1800. select {
  1801. case <-timer.C:
  1802. log.Tag(tagResetter).Debug("Running stats resetter")
  1803. s.resetStats()
  1804. case <-s.closeChan:
  1805. log.Tag(tagResetter).Debug("Stopping stats resetter")
  1806. timer.Stop()
  1807. return
  1808. }
  1809. }
  1810. }
  1811. func (s *Server) resetStats() {
  1812. log.Info("Resetting all visitor stats (daily task)")
  1813. s.mu.Lock()
  1814. defer s.mu.Unlock() // Includes the database query to avoid races with other processes
  1815. for _, v := range s.visitors {
  1816. v.ResetStats()
  1817. }
  1818. if s.userManager != nil {
  1819. if err := s.userManager.ResetStats(); err != nil {
  1820. log.Tag(tagResetter).Warn("Failed to write to database: %s", err.Error())
  1821. }
  1822. }
  1823. }
  1824. func (s *Server) runFirebaseKeepaliver() {
  1825. if s.firebaseClient == nil {
  1826. return
  1827. }
  1828. v := newVisitor(s.config, s.messageCache, s.userManager, netip.IPv4Unspecified(), nil) // Background process, not a real visitor, uses IP 0.0.0.0
  1829. for {
  1830. select {
  1831. case <-time.After(s.config.FirebaseKeepaliveInterval):
  1832. s.sendToFirebase(v, newKeepaliveMessage(firebaseControlTopic))
  1833. /*
  1834. FIXME: Disable iOS polling entirely for now due to thundering herd problem (see #677)
  1835. To solve this, we'd have to shard the iOS poll topics to spread out the polling evenly.
  1836. Given that it's not really necessary to poll, turning it off for now should not have any impact.
  1837. case <-time.After(s.config.FirebasePollInterval):
  1838. s.sendToFirebase(v, newKeepaliveMessage(firebasePollTopic))
  1839. */
  1840. case <-s.closeChan:
  1841. return
  1842. }
  1843. }
  1844. }
  1845. func (s *Server) runDelayedSender() {
  1846. for {
  1847. select {
  1848. case <-time.After(s.config.DelayedSenderInterval):
  1849. if err := s.sendDelayedMessages(); err != nil {
  1850. log.Tag(tagPublish).Err(err).Warn("Error sending delayed messages")
  1851. }
  1852. case <-s.closeChan:
  1853. return
  1854. }
  1855. }
  1856. }
  1857. func (s *Server) sendDelayedMessages() error {
  1858. messages, err := s.messageCache.MessagesDue()
  1859. if err != nil {
  1860. return err
  1861. }
  1862. for _, m := range messages {
  1863. var u *user.User
  1864. if s.userManager != nil && m.User != "" {
  1865. u, err = s.userManager.UserByID(m.User)
  1866. if err != nil {
  1867. log.With(m).Err(err).Warn("Error sending delayed message")
  1868. continue
  1869. }
  1870. }
  1871. v := s.visitor(m.Sender, u)
  1872. if err := s.sendDelayedMessage(v, m); err != nil {
  1873. logvm(v, m).Err(err).Warn("Error sending delayed message")
  1874. }
  1875. }
  1876. return nil
  1877. }
  1878. func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
  1879. logvm(v, m).Debug("Sending delayed message")
  1880. s.mu.RLock()
  1881. t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
  1882. s.mu.RUnlock()
  1883. if ok {
  1884. go func() {
  1885. // We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
  1886. if err := t.Publish(v, m); err != nil {
  1887. logvm(v, m).Err(err).Warn("Unable to publish message")
  1888. }
  1889. }()
  1890. }
  1891. if s.firebaseClient != nil { // Firebase subscribers may not show up in topics map
  1892. go s.sendToFirebase(v, m)
  1893. }
  1894. if s.config.UpstreamBaseURL != "" {
  1895. go s.forwardPollRequest(v, m)
  1896. }
  1897. if s.config.WebPushPublicKey != "" {
  1898. go s.publishToWebPushEndpoints(v, m)
  1899. }
  1900. if err := s.messageCache.MarkPublished(m); err != nil {
  1901. return err
  1902. }
  1903. return nil
  1904. }
  1905. // transformBodyJSON peeks the request body, reads the JSON, and converts it to headers
  1906. // before passing it on to the next handler. This is meant to be used in combination with handlePublish.
  1907. func (s *Server) transformBodyJSON(next handleFunc) handleFunc {
  1908. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1909. m, err := readJSONWithLimit[publishMessage](r.Body, s.config.MessageSizeLimit*2, false) // 2x to account for JSON format overhead
  1910. if err != nil {
  1911. return err
  1912. }
  1913. if !topicRegex.MatchString(m.Topic) {
  1914. return errHTTPBadRequestTopicInvalid
  1915. }
  1916. if m.Message == "" {
  1917. m.Message = emptyMessageBody
  1918. }
  1919. r.URL.Path = "/" + m.Topic
  1920. r.Body = io.NopCloser(strings.NewReader(m.Message))
  1921. if m.Title != "" {
  1922. r.Header.Set("X-Title", m.Title)
  1923. }
  1924. if m.Priority != 0 {
  1925. r.Header.Set("X-Priority", fmt.Sprintf("%d", m.Priority))
  1926. }
  1927. if len(m.Tags) > 0 {
  1928. r.Header.Set("X-Tags", strings.Join(m.Tags, ","))
  1929. }
  1930. if m.Attach != "" {
  1931. r.Header.Set("X-Attach", m.Attach)
  1932. }
  1933. if m.Filename != "" {
  1934. r.Header.Set("X-Filename", m.Filename)
  1935. }
  1936. if m.Click != "" {
  1937. r.Header.Set("X-Click", m.Click)
  1938. }
  1939. if m.Icon != "" {
  1940. r.Header.Set("X-Icon", m.Icon)
  1941. }
  1942. if m.Markdown {
  1943. r.Header.Set("X-Markdown", "yes")
  1944. }
  1945. if len(m.Actions) > 0 {
  1946. actionsStr, err := json.Marshal(m.Actions)
  1947. if err != nil {
  1948. return errHTTPBadRequestMessageJSONInvalid
  1949. }
  1950. r.Header.Set("X-Actions", string(actionsStr))
  1951. }
  1952. if m.Email != "" {
  1953. r.Header.Set("X-Email", m.Email)
  1954. }
  1955. if m.Delay != "" {
  1956. r.Header.Set("X-Delay", m.Delay)
  1957. }
  1958. if m.Call != "" {
  1959. r.Header.Set("X-Call", m.Call)
  1960. }
  1961. if m.Cache != "" {
  1962. r.Header.Set("X-Cache", m.Cache)
  1963. }
  1964. if m.Firebase != "" {
  1965. r.Header.Set("X-Firebase", m.Firebase)
  1966. }
  1967. if m.SequenceID != "" {
  1968. r.Header.Set("X-Sequence-ID", m.SequenceID)
  1969. }
  1970. return next(w, r, v)
  1971. }
  1972. }
  1973. func (s *Server) transformMatrixJSON(next handleFunc) handleFunc {
  1974. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1975. newRequest, err := newRequestFromMatrixJSON(r, s.config.BaseURL, s.config.MessageSizeLimit)
  1976. if err != nil {
  1977. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Invalid Matrix request")
  1978. if e, ok := err.(*errMatrixPushkeyRejected); ok {
  1979. return writeMatrixResponse(w, e.rejectedPushKey)
  1980. }
  1981. return err
  1982. }
  1983. if err := next(w, newRequest, v); err != nil {
  1984. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Error handling Matrix request")
  1985. return err
  1986. }
  1987. return nil
  1988. }
  1989. }
  1990. func (s *Server) authorizeTopicWrite(next handleFunc) handleFunc {
  1991. return s.authorizeTopic(next, user.PermissionWrite)
  1992. }
  1993. func (s *Server) authorizeTopicRead(next handleFunc) handleFunc {
  1994. return s.authorizeTopic(next, user.PermissionRead)
  1995. }
  1996. func (s *Server) authorizeTopic(next handleFunc, perm user.Permission) handleFunc {
  1997. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1998. if s.userManager == nil {
  1999. return next(w, r, v)
  2000. }
  2001. topics, _, err := s.topicsFromPath(r.URL.Path)
  2002. if err != nil {
  2003. return err
  2004. }
  2005. u := v.User()
  2006. for _, t := range topics {
  2007. if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
  2008. logvr(v, r).With(t).Err(err).Debug("Access to topic %s not authorized", t.ID)
  2009. return errHTTPForbidden.With(t)
  2010. }
  2011. }
  2012. return next(w, r, v)
  2013. }
  2014. }
  2015. // maybeAuthenticate reads the "Authorization" header and will try to authenticate the user
  2016. // if it is set.
  2017. //
  2018. // - If auth-file is not configured, immediately return an IP-based visitor
  2019. // - If the header is not set or not supported (anything non-Basic and non-Bearer),
  2020. // an IP-based visitor is returned
  2021. // - If the header is set, authenticate will be called to check the username/password (Basic auth),
  2022. // or the token (Bearer auth), and read the user from the database
  2023. //
  2024. // This function will ALWAYS return a visitor, even if an error occurs (e.g. unauthorized), so
  2025. // that subsequent logging calls still have a visitor context.
  2026. func (s *Server) maybeAuthenticate(r *http.Request) (*visitor, error) {
  2027. // Read the "Authorization" header value and exit out early if it's not set
  2028. ip := extractIPAddress(r, s.config.BehindProxy, s.config.ProxyForwardedHeader, s.config.ProxyTrustedPrefixes)
  2029. vip := s.visitor(ip, nil)
  2030. if s.userManager == nil {
  2031. return vip, nil
  2032. }
  2033. header, err := readAuthHeader(r)
  2034. if err != nil {
  2035. return vip, err
  2036. } else if !supportedAuthHeader(header) {
  2037. return vip, nil
  2038. }
  2039. // If we're trying to auth, check the rate limiter first
  2040. if !vip.AuthAllowed() {
  2041. return vip, errHTTPTooManyRequestsLimitAuthFailure // Always return visitor, even when error occurs!
  2042. }
  2043. u, err := s.authenticate(r, header)
  2044. if err != nil {
  2045. vip.AuthFailed()
  2046. logr(r).Err(err).Debug("Authentication failed")
  2047. return vip, errHTTPUnauthorized // Always return visitor, even when error occurs!
  2048. }
  2049. // Authentication with user was successful
  2050. return s.visitor(ip, u), nil
  2051. }
  2052. // authenticate a user based on basic auth username/password (Authorization: Basic ...), or token auth (Authorization: Bearer ...).
  2053. // The Authorization header can be passed as a header or the ?auth=... query param. The latter is required only to
  2054. // support the WebSocket JavaScript class, which does not support passing headers during the initial request. The auth
  2055. // query param is effectively doubly base64 encoded. Its format is base64(Basic base64(user:pass)).
  2056. func (s *Server) authenticate(r *http.Request, header string) (user *user.User, err error) {
  2057. if strings.HasPrefix(header, "Bearer") {
  2058. return s.authenticateBearerAuth(r, strings.TrimSpace(strings.TrimPrefix(header, "Bearer")))
  2059. }
  2060. return s.authenticateBasicAuth(r, header)
  2061. }
  2062. // readAuthHeader reads the raw value of the Authorization header, either from the actual HTTP header,
  2063. // or from the ?auth... query parameter
  2064. func readAuthHeader(r *http.Request) (string, error) {
  2065. value := strings.TrimSpace(r.Header.Get("Authorization"))
  2066. queryParam := readQueryParam(r, "authorization", "auth")
  2067. if queryParam != "" {
  2068. a, err := base64.RawURLEncoding.DecodeString(queryParam)
  2069. if err != nil {
  2070. return "", err
  2071. }
  2072. value = strings.TrimSpace(string(a))
  2073. }
  2074. return value, nil
  2075. }
  2076. // supportedAuthHeader returns true only if the Authorization header value starts
  2077. // with "Basic" or "Bearer". In particular, an empty value is not supported, and neither
  2078. // are things like "WebPush", or "vapid" (see #629).
  2079. func supportedAuthHeader(value string) bool {
  2080. value = strings.ToLower(value)
  2081. return strings.HasPrefix(value, "basic ") || strings.HasPrefix(value, "bearer ")
  2082. }
  2083. func (s *Server) authenticateBasicAuth(r *http.Request, value string) (user *user.User, err error) {
  2084. r.Header.Set("Authorization", value)
  2085. username, password, ok := r.BasicAuth()
  2086. if !ok {
  2087. return nil, errors.New("invalid basic auth")
  2088. } else if username == "" {
  2089. return s.authenticateBearerAuth(r, password) // Treat password as token
  2090. }
  2091. return s.userManager.Authenticate(username, password)
  2092. }
  2093. func (s *Server) authenticateBearerAuth(r *http.Request, token string) (*user.User, error) {
  2094. u, err := s.userManager.AuthenticateToken(token)
  2095. if err != nil {
  2096. return nil, err
  2097. }
  2098. ip := extractIPAddress(r, s.config.BehindProxy, s.config.ProxyForwardedHeader, s.config.ProxyTrustedPrefixes)
  2099. go s.userManager.EnqueueTokenUpdate(token, &user.TokenUpdate{
  2100. LastAccess: time.Now(),
  2101. LastOrigin: ip,
  2102. })
  2103. return u, nil
  2104. }
  2105. func (s *Server) visitor(ip netip.Addr, user *user.User) *visitor {
  2106. s.mu.Lock()
  2107. defer s.mu.Unlock()
  2108. id := visitorID(ip, user, s.config)
  2109. v, exists := s.visitors[id]
  2110. if !exists {
  2111. s.visitors[id] = newVisitor(s.config, s.messageCache, s.userManager, ip, user)
  2112. return s.visitors[id]
  2113. }
  2114. v.Keepalive()
  2115. v.SetUser(user) // Always update with the latest user, may be nil!
  2116. return v
  2117. }
  2118. func (s *Server) writeJSON(w http.ResponseWriter, v any) error {
  2119. return s.writeJSONWithContentType(w, v, "application/json")
  2120. }
  2121. func (s *Server) writeJSONWithContentType(w http.ResponseWriter, v any, contentType string) error {
  2122. w.Header().Set("Content-Type", contentType)
  2123. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  2124. if err := json.NewEncoder(w).Encode(v); err != nil {
  2125. return err
  2126. }
  2127. return nil
  2128. }
  2129. func (s *Server) updateAndWriteStats(messagesCount int64) {
  2130. s.mu.Lock()
  2131. s.messagesHistory = append(s.messagesHistory, messagesCount)
  2132. if len(s.messagesHistory) > messagesHistoryMax {
  2133. s.messagesHistory = s.messagesHistory[1:]
  2134. }
  2135. s.mu.Unlock()
  2136. go func() {
  2137. if err := s.messageCache.UpdateStats(messagesCount); err != nil {
  2138. log.Tag(tagManager).Err(err).Warn("Cannot write messages stats")
  2139. }
  2140. }()
  2141. }