server.go 85 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301
  1. package server
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/sha256"
  6. "embed"
  7. "encoding/base64"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "net"
  13. "net/http"
  14. "net/http/pprof"
  15. "net/netip"
  16. "net/url"
  17. "os"
  18. "path"
  19. "path/filepath"
  20. "regexp"
  21. "sort"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "text/template"
  26. "time"
  27. "unicode/utf8"
  28. "github.com/emersion/go-smtp"
  29. "github.com/gorilla/websocket"
  30. "github.com/prometheus/client_golang/prometheus/promhttp"
  31. "golang.org/x/sync/errgroup"
  32. "gopkg.in/yaml.v2"
  33. "heckel.io/ntfy/v2/log"
  34. "heckel.io/ntfy/v2/payments"
  35. "heckel.io/ntfy/v2/user"
  36. "heckel.io/ntfy/v2/util"
  37. "heckel.io/ntfy/v2/util/sprig"
  38. )
  39. // Server is the main server, providing the UI and API for ntfy
  40. type Server struct {
  41. config *Config
  42. httpServer *http.Server
  43. httpsServer *http.Server
  44. httpMetricsServer *http.Server
  45. httpProfileServer *http.Server
  46. unixListener net.Listener
  47. smtpServer *smtp.Server
  48. smtpServerBackend *smtpBackend
  49. smtpSender mailer
  50. topics map[string]*topic
  51. visitors map[string]*visitor // ip:<ip> or user:<user>
  52. firebaseClient *firebaseClient
  53. messages int64 // Total number of messages (persisted if messageCache enabled)
  54. messagesHistory []int64 // Last n values of the messages counter, used to determine rate
  55. userManager *user.Manager // Might be nil!
  56. messageCache *messageCache // Database that stores the messages
  57. webPush *webPushStore // Database that stores web push subscriptions
  58. fileCache *fileCache // File system based cache that stores attachments
  59. stripe stripeAPI // Stripe API, can be replaced with a mock
  60. priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
  61. metricsHandler http.Handler // Handles /metrics if enable-metrics set, and listen-metrics-http not set
  62. closeChan chan bool
  63. mu sync.RWMutex
  64. }
  65. // handleFunc extends the normal http.HandlerFunc to be able to easily return errors
  66. type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
  67. var (
  68. // If changed, don't forget to update Android App and auth_sqlite.go
  69. topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
  70. topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
  71. externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
  72. jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
  73. ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
  74. rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
  75. wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
  76. authPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/auth$`)
  77. publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/(publish|send|trigger)$`)
  78. updatePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/[-_A-Za-z0-9]{1,64}$`)
  79. clearPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/[-_A-Za-z0-9]{1,64}/(read|clear)$`)
  80. sequenceIDRegex = topicRegex
  81. webConfigPath = "/config.js"
  82. webManifestPath = "/manifest.webmanifest"
  83. accountPath = "/account"
  84. matrixPushPath = "/_matrix/push/v1/notify"
  85. metricsPath = "/metrics"
  86. apiHealthPath = "/v1/health"
  87. apiVersionPath = "/v1/version"
  88. apiConfigPath = "/v1/config"
  89. apiStatsPath = "/v1/stats"
  90. apiWebPushPath = "/v1/webpush"
  91. apiTiersPath = "/v1/tiers"
  92. apiUsersPath = "/v1/users"
  93. apiUsersAccessPath = "/v1/users/access"
  94. apiAccountPath = "/v1/account"
  95. apiAccountTokenPath = "/v1/account/token"
  96. apiAccountPasswordPath = "/v1/account/password"
  97. apiAccountSettingsPath = "/v1/account/settings"
  98. apiAccountSubscriptionPath = "/v1/account/subscription"
  99. apiAccountReservationPath = "/v1/account/reservation"
  100. apiAccountPhonePath = "/v1/account/phone"
  101. apiAccountPhoneVerifyPath = "/v1/account/phone/verify"
  102. apiAccountBillingPortalPath = "/v1/account/billing/portal"
  103. apiAccountBillingWebhookPath = "/v1/account/billing/webhook"
  104. apiAccountBillingSubscriptionPath = "/v1/account/billing/subscription"
  105. apiAccountBillingSubscriptionCheckoutSuccessTemplate = "/v1/account/billing/subscription/success/{CHECKOUT_SESSION_ID}"
  106. apiAccountBillingSubscriptionCheckoutSuccessRegex = regexp.MustCompile(`/v1/account/billing/subscription/success/(.+)$`)
  107. apiAccountReservationSingleRegex = regexp.MustCompile(`/v1/account/reservation/([-_A-Za-z0-9]{1,64})$`)
  108. staticRegex = regexp.MustCompile(`^/(static/.+|app.html|sw.js|sw.js.map)$`)
  109. docsRegex = regexp.MustCompile(`^/docs(|/.*)$`)
  110. fileRegex = regexp.MustCompile(`^/file/([-_A-Za-z0-9]{1,64})(?:\.[A-Za-z0-9]{1,16})?$`)
  111. urlRegex = regexp.MustCompile(`^https?://`)
  112. phoneNumberRegex = regexp.MustCompile(`^\+\d{1,100}$`)
  113. //go:embed site
  114. webFs embed.FS
  115. webFsCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: webFs}
  116. webSiteDir = "/site"
  117. webAppIndex = "/app.html" // React app
  118. //go:embed docs
  119. docsStaticFs embed.FS
  120. docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
  121. //go:embed templates
  122. templatesFs embed.FS // Contains template config files (e.g. grafana.yml, github.yml, ...)
  123. templatesDir = "templates"
  124. // templateDisallowedRegex tests a template for disallowed expressions. While not really dangerous, they
  125. // are not useful, and seem potentially troublesome.
  126. templateDisallowedRegex = regexp.MustCompile(`(?m)\{\{-?\s*(call|template|define)\b`)
  127. templateNameRegex = regexp.MustCompile(`^[-_A-Za-z0-9]+$`)
  128. )
  129. const (
  130. firebaseControlTopic = "~control" // See Android if changed
  131. firebasePollTopic = "~poll" // See iOS if changed (DISABLED for now)
  132. emptyMessageBody = "triggered" // Used when a message body is empty
  133. newMessageBody = "New message" // Used in poll requests as generic message
  134. defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
  135. encodingBase64 = "base64" // Used mainly for binary UnifiedPush messages
  136. jsonBodyBytesLimit = 131072 // Max number of bytes for a request bodys (unless MessageLimit is higher)
  137. unifiedPushTopicPrefix = "up" // Temporarily, we rate limit all "up*" topics based on the subscriber
  138. unifiedPushTopicLength = 14 // Length of UnifiedPush topics, including the "up" part
  139. messagesHistoryMax = 10 // Number of message count values to keep in memory
  140. templateMaxExecutionTime = 100 * time.Millisecond // Maximum time a template can take to execute, used to prevent DoS attacks
  141. templateMaxOutputBytes = 1024 * 1024 // Maximum number of bytes a template can output, used to prevent DoS attacks
  142. templateFileExtension = ".yml" // Template files must end with this extension
  143. )
  144. // WebSocket constants
  145. const (
  146. wsWriteWait = 2 * time.Second
  147. wsBufferSize = 1024
  148. wsReadLimit = 64 // We only ever receive PINGs
  149. wsPongWait = 15 * time.Second
  150. )
  151. // New instantiates a new Server. It creates the cache and adds a Firebase
  152. // subscriber (if configured).
  153. func New(conf *Config) (*Server, error) {
  154. var mailer mailer
  155. if conf.SMTPSenderAddr != "" {
  156. mailer = &smtpSender{config: conf}
  157. }
  158. var stripe stripeAPI
  159. if payments.Available && conf.StripeSecretKey != "" {
  160. stripe = newStripeAPI()
  161. }
  162. messageCache, err := createMessageCache(conf)
  163. if err != nil {
  164. return nil, err
  165. }
  166. var webPush *webPushStore
  167. if conf.WebPushPublicKey != "" {
  168. webPush, err = newWebPushStore(conf.WebPushFile, conf.WebPushStartupQueries)
  169. if err != nil {
  170. return nil, err
  171. }
  172. }
  173. topics, err := messageCache.Topics()
  174. if err != nil {
  175. return nil, err
  176. }
  177. messages, err := messageCache.Stats()
  178. if err != nil {
  179. return nil, err
  180. }
  181. var fileCache *fileCache
  182. if conf.AttachmentCacheDir != "" {
  183. fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit)
  184. if err != nil {
  185. return nil, err
  186. }
  187. }
  188. var userManager *user.Manager
  189. if conf.AuthFile != "" {
  190. authConfig := &user.Config{
  191. Filename: conf.AuthFile,
  192. StartupQueries: conf.AuthStartupQueries,
  193. DefaultAccess: conf.AuthDefault,
  194. ProvisionEnabled: true, // Enable provisioning of users and access
  195. Users: conf.AuthUsers,
  196. Access: conf.AuthAccess,
  197. Tokens: conf.AuthTokens,
  198. BcryptCost: conf.AuthBcryptCost,
  199. QueueWriterInterval: conf.AuthStatsQueueWriterInterval,
  200. }
  201. userManager, err = user.NewManager(authConfig)
  202. if err != nil {
  203. return nil, err
  204. }
  205. }
  206. var firebaseClient *firebaseClient
  207. if conf.FirebaseKeyFile != "" {
  208. sender, err := newFirebaseSender(conf.FirebaseKeyFile)
  209. if err != nil {
  210. return nil, err
  211. }
  212. // This awkward logic is required because Go is weird about nil types and interfaces.
  213. // See issue #641, and https://go.dev/play/p/uur1flrv1t3 for an example
  214. var auther user.Auther
  215. if userManager != nil {
  216. auther = userManager
  217. }
  218. firebaseClient = newFirebaseClient(sender, auther)
  219. }
  220. s := &Server{
  221. config: conf,
  222. messageCache: messageCache,
  223. webPush: webPush,
  224. fileCache: fileCache,
  225. firebaseClient: firebaseClient,
  226. smtpSender: mailer,
  227. topics: topics,
  228. userManager: userManager,
  229. messages: messages,
  230. messagesHistory: []int64{messages},
  231. visitors: make(map[string]*visitor),
  232. stripe: stripe,
  233. }
  234. s.priceCache = util.NewLookupCache(s.fetchStripePrices, conf.StripePriceCacheDuration)
  235. return s, nil
  236. }
  237. func createMessageCache(conf *Config) (*messageCache, error) {
  238. if conf.CacheDuration == 0 {
  239. return newNopCache()
  240. } else if conf.CacheFile != "" {
  241. return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheDuration, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
  242. }
  243. return newMemCache()
  244. }
  245. // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
  246. // a manager go routine to print stats and prune messages.
  247. func (s *Server) Run() error {
  248. var listenStr string
  249. if s.config.ListenHTTP != "" {
  250. listenStr += fmt.Sprintf(" %s[http]", s.config.ListenHTTP)
  251. }
  252. if s.config.ListenHTTPS != "" {
  253. listenStr += fmt.Sprintf(" %s[https]", s.config.ListenHTTPS)
  254. }
  255. if s.config.ListenUnix != "" {
  256. listenStr += fmt.Sprintf(" %s[unix]", s.config.ListenUnix)
  257. }
  258. if s.config.SMTPServerListen != "" {
  259. listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
  260. }
  261. if s.config.MetricsListenHTTP != "" {
  262. listenStr += fmt.Sprintf(" %s[http/metrics]", s.config.MetricsListenHTTP)
  263. }
  264. if s.config.ProfileListenHTTP != "" {
  265. listenStr += fmt.Sprintf(" %s[http/profile]", s.config.ProfileListenHTTP)
  266. }
  267. log.Tag(tagStartup).Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.BuildVersion, log.CurrentLevel().String())
  268. if log.IsFile() {
  269. fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s\n", listenStr, s.config.BuildVersion)
  270. fmt.Fprintf(os.Stderr, "Logs are written to %s\n", log.File())
  271. }
  272. mux := http.NewServeMux()
  273. mux.HandleFunc("/", s.handle)
  274. errChan := make(chan error)
  275. s.mu.Lock()
  276. s.closeChan = make(chan bool)
  277. if s.config.ListenHTTP != "" {
  278. s.httpServer = &http.Server{Addr: s.config.ListenHTTP, Handler: mux}
  279. go func() {
  280. errChan <- s.httpServer.ListenAndServe()
  281. }()
  282. }
  283. if s.config.ListenHTTPS != "" {
  284. s.httpsServer = &http.Server{Addr: s.config.ListenHTTPS, Handler: mux}
  285. go func() {
  286. errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
  287. }()
  288. }
  289. if s.config.ListenUnix != "" {
  290. go func() {
  291. var err error
  292. s.mu.Lock()
  293. os.Remove(s.config.ListenUnix)
  294. s.unixListener, err = net.Listen("unix", s.config.ListenUnix)
  295. if err != nil {
  296. s.mu.Unlock()
  297. errChan <- err
  298. return
  299. }
  300. defer s.unixListener.Close()
  301. if s.config.ListenUnixMode > 0 {
  302. if err := os.Chmod(s.config.ListenUnix, s.config.ListenUnixMode); err != nil {
  303. s.mu.Unlock()
  304. errChan <- err
  305. return
  306. }
  307. }
  308. s.mu.Unlock()
  309. httpServer := &http.Server{Handler: mux}
  310. errChan <- httpServer.Serve(s.unixListener)
  311. }()
  312. }
  313. if s.config.MetricsListenHTTP != "" {
  314. initMetrics()
  315. s.httpMetricsServer = &http.Server{Addr: s.config.MetricsListenHTTP, Handler: promhttp.Handler()}
  316. go func() {
  317. errChan <- s.httpMetricsServer.ListenAndServe()
  318. }()
  319. } else if s.config.EnableMetrics {
  320. initMetrics()
  321. s.metricsHandler = promhttp.Handler()
  322. }
  323. if s.config.ProfileListenHTTP != "" {
  324. profileMux := http.NewServeMux()
  325. profileMux.HandleFunc("/debug/pprof/", pprof.Index)
  326. profileMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
  327. profileMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
  328. profileMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
  329. profileMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
  330. s.httpProfileServer = &http.Server{Addr: s.config.ProfileListenHTTP, Handler: profileMux}
  331. go func() {
  332. errChan <- s.httpProfileServer.ListenAndServe()
  333. }()
  334. }
  335. if s.config.SMTPServerListen != "" {
  336. go func() {
  337. errChan <- s.runSMTPServer()
  338. }()
  339. }
  340. s.mu.Unlock()
  341. go s.runManager()
  342. go s.runStatsResetter()
  343. go s.runDelayedSender()
  344. go s.runFirebaseKeepaliver()
  345. return <-errChan
  346. }
  347. // Stop stops HTTP (+HTTPS) server and all managers
  348. func (s *Server) Stop() {
  349. s.mu.Lock()
  350. defer s.mu.Unlock()
  351. if s.httpServer != nil {
  352. s.httpServer.Close()
  353. }
  354. if s.httpsServer != nil {
  355. s.httpsServer.Close()
  356. }
  357. if s.unixListener != nil {
  358. s.unixListener.Close()
  359. }
  360. if s.smtpServer != nil {
  361. s.smtpServer.Close()
  362. }
  363. s.closeDatabases()
  364. close(s.closeChan)
  365. }
  366. func (s *Server) closeDatabases() {
  367. if s.userManager != nil {
  368. s.userManager.Close()
  369. }
  370. s.messageCache.Close()
  371. if s.webPush != nil {
  372. s.webPush.Close()
  373. }
  374. }
  375. // handle is the main entry point for all HTTP requests
  376. func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
  377. v, err := s.maybeAuthenticate(r) // Note: Always returns v, even when error is returned
  378. if err != nil {
  379. s.handleError(w, r, v, err)
  380. return
  381. }
  382. ev := logvr(v, r)
  383. if ev.IsTrace() {
  384. ev.Field("http_request", renderHTTPRequest(r)).Trace("HTTP request started")
  385. } else if logvr(v, r).IsDebug() {
  386. ev.Debug("HTTP request started")
  387. }
  388. logvr(v, r).
  389. Timing(func() {
  390. if err := s.handleInternal(w, r, v); err != nil {
  391. s.handleError(w, r, v, err)
  392. return
  393. }
  394. if metricHTTPRequests != nil {
  395. metricHTTPRequests.WithLabelValues("200", "20000", r.Method).Inc()
  396. }
  397. }).
  398. Debug("HTTP request finished")
  399. }
  400. func (s *Server) handleError(w http.ResponseWriter, r *http.Request, v *visitor, err error) {
  401. httpErr, ok := err.(*errHTTP)
  402. if !ok {
  403. httpErr = errHTTPInternalError
  404. }
  405. if metricHTTPRequests != nil {
  406. metricHTTPRequests.WithLabelValues(fmt.Sprintf("%d", httpErr.HTTPCode), fmt.Sprintf("%d", httpErr.Code), r.Method).Inc()
  407. }
  408. isRateLimiting := util.Contains(rateLimitingErrorCodes, httpErr.HTTPCode)
  409. isNormalError := strings.Contains(err.Error(), "i/o timeout") || util.Contains(normalErrorCodes, httpErr.HTTPCode)
  410. ev := logvr(v, r).Err(err)
  411. if websocket.IsWebSocketUpgrade(r) {
  412. ev.Tag(tagWebsocket).Fields(websocketErrorContext(err))
  413. if isNormalError {
  414. ev.Debug("WebSocket error (this error is okay, it happens a lot): %s", err.Error())
  415. } else {
  416. ev.Info("WebSocket error: %s", err.Error())
  417. }
  418. // Write error response only if the connection was not hijacked yet. Bytes written to hijacked
  419. // connections are WebSocket frames, not HTTP, and will cause "http: response.WriteHeader on hijacked
  420. // connection" log spam.
  421. var postUpgradeErr *errWebSocketPostUpgrade
  422. if !errors.As(err, &postUpgradeErr) {
  423. w.WriteHeader(httpErr.HTTPCode)
  424. }
  425. return
  426. }
  427. if isNormalError {
  428. ev.Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  429. } else {
  430. ev.Info("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
  431. }
  432. if isRateLimiting && s.config.StripeSecretKey != "" {
  433. u := v.User()
  434. if u == nil || u.Tier == nil {
  435. httpErr = httpErr.Wrap("increase your limits with a paid plan, see %s", s.config.BaseURL)
  436. }
  437. }
  438. w.Header().Set("Content-Type", "application/json")
  439. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  440. w.WriteHeader(httpErr.HTTPCode)
  441. io.WriteString(w, httpErr.JSON()+"\n")
  442. }
  443. func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visitor) error {
  444. if r.Method == http.MethodGet && r.URL.Path == "/" && s.config.WebRoot == "/" {
  445. return s.ensureWebEnabled(s.handleRoot)(w, r, v)
  446. } else if r.Method == http.MethodHead && r.URL.Path == "/" {
  447. return s.ensureWebEnabled(s.handleEmpty)(w, r, v)
  448. } else if r.Method == http.MethodGet && r.URL.Path == apiHealthPath {
  449. return s.handleHealth(w, r, v)
  450. } else if r.Method == http.MethodGet && r.URL.Path == apiVersionPath {
  451. return s.ensureAdmin(s.handleVersion)(w, r, v)
  452. } else if r.Method == http.MethodGet && r.URL.Path == apiConfigPath {
  453. return s.handleConfig(w, r, v)
  454. } else if r.Method == http.MethodGet && r.URL.Path == webConfigPath {
  455. return s.ensureWebEnabled(s.handleWebConfig)(w, r, v)
  456. } else if r.Method == http.MethodGet && r.URL.Path == webManifestPath {
  457. return s.ensureWebPushEnabled(s.handleWebManifest)(w, r, v)
  458. } else if r.Method == http.MethodGet && r.URL.Path == apiUsersPath {
  459. return s.ensureAdmin(s.handleUsersGet)(w, r, v)
  460. } else if r.Method == http.MethodPost && r.URL.Path == apiUsersPath {
  461. return s.ensureAdmin(s.handleUsersAdd)(w, r, v)
  462. } else if r.Method == http.MethodPut && r.URL.Path == apiUsersPath {
  463. return s.ensureAdmin(s.handleUsersUpdate)(w, r, v)
  464. } else if r.Method == http.MethodDelete && r.URL.Path == apiUsersPath {
  465. return s.ensureAdmin(s.handleUsersDelete)(w, r, v)
  466. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == apiUsersAccessPath {
  467. return s.ensureAdmin(s.handleAccessAllow)(w, r, v)
  468. } else if r.Method == http.MethodDelete && r.URL.Path == apiUsersAccessPath {
  469. return s.ensureAdmin(s.handleAccessReset)(w, r, v)
  470. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPath {
  471. return s.ensureUserManager(s.handleAccountCreate)(w, r, v)
  472. } else if r.Method == http.MethodGet && r.URL.Path == apiAccountPath {
  473. return s.handleAccountGet(w, r, v) // Allowed by anonymous
  474. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPath {
  475. return s.ensureUser(s.withAccountSync(s.handleAccountDelete))(w, r, v)
  476. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountPasswordPath {
  477. return s.ensureUser(s.handleAccountPasswordChange)(w, r, v)
  478. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountTokenPath {
  479. return s.ensureUser(s.withAccountSync(s.handleAccountTokenCreate))(w, r, v)
  480. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountTokenPath {
  481. return s.ensureUser(s.withAccountSync(s.handleAccountTokenUpdate))(w, r, v)
  482. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountTokenPath {
  483. return s.ensureUser(s.withAccountSync(s.handleAccountTokenDelete))(w, r, v)
  484. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSettingsPath {
  485. return s.ensureUser(s.withAccountSync(s.handleAccountSettingsChange))(w, r, v)
  486. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountSubscriptionPath {
  487. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionAdd))(w, r, v)
  488. } else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSubscriptionPath {
  489. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionChange))(w, r, v)
  490. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountSubscriptionPath {
  491. return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionDelete))(w, r, v)
  492. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountReservationPath {
  493. return s.ensureUser(s.withAccountSync(s.handleAccountReservationAdd))(w, r, v)
  494. } else if r.Method == http.MethodDelete && apiAccountReservationSingleRegex.MatchString(r.URL.Path) {
  495. return s.ensureUser(s.withAccountSync(s.handleAccountReservationDelete))(w, r, v)
  496. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingSubscriptionPath {
  497. return s.ensurePaymentsEnabled(s.ensureUser(s.handleAccountBillingSubscriptionCreate))(w, r, v) // Account sync via incoming Stripe webhook
  498. } else if r.Method == http.MethodGet && apiAccountBillingSubscriptionCheckoutSuccessRegex.MatchString(r.URL.Path) {
  499. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingSubscriptionCreateSuccess))(w, r, v) // No user context!
  500. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountBillingSubscriptionPath {
  501. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionUpdate))(w, r, v) // Account sync via incoming Stripe webhook
  502. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountBillingSubscriptionPath {
  503. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionDelete))(w, r, v) // Account sync via incoming Stripe webhook
  504. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingPortalPath {
  505. return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingPortalSessionCreate))(w, r, v)
  506. } else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingWebhookPath {
  507. return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingWebhook))(w, r, v) // This request comes from Stripe!
  508. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhoneVerifyPath {
  509. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberVerify)))(w, r, v)
  510. } else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhonePath {
  511. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberAdd)))(w, r, v)
  512. } else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPhonePath {
  513. return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberDelete)))(w, r, v)
  514. } else if r.Method == http.MethodPost && apiWebPushPath == r.URL.Path {
  515. return s.ensureWebPushEnabled(s.limitRequests(s.handleWebPushUpdate))(w, r, v)
  516. } else if r.Method == http.MethodDelete && apiWebPushPath == r.URL.Path {
  517. return s.ensureWebPushEnabled(s.limitRequests(s.handleWebPushDelete))(w, r, v)
  518. } else if r.Method == http.MethodGet && r.URL.Path == apiStatsPath {
  519. return s.handleStats(w, r, v)
  520. } else if r.Method == http.MethodGet && r.URL.Path == apiTiersPath {
  521. return s.ensurePaymentsEnabled(s.handleBillingTiersGet)(w, r, v)
  522. } else if r.Method == http.MethodGet && r.URL.Path == matrixPushPath {
  523. return s.handleMatrixDiscovery(w)
  524. } else if r.Method == http.MethodGet && r.URL.Path == metricsPath && s.metricsHandler != nil {
  525. return s.handleMetrics(w, r, v)
  526. } else if r.Method == http.MethodGet && staticRegex.MatchString(r.URL.Path) {
  527. return s.ensureWebEnabled(s.handleStatic)(w, r, v)
  528. } else if r.Method == http.MethodGet && docsRegex.MatchString(r.URL.Path) {
  529. return s.ensureWebEnabled(s.handleDocs)(w, r, v)
  530. } else if (r.Method == http.MethodGet || r.Method == http.MethodHead) && fileRegex.MatchString(r.URL.Path) && s.config.AttachmentCacheDir != "" {
  531. return s.limitRequests(s.handleFile)(w, r, v)
  532. } else if r.Method == http.MethodOptions {
  533. return s.limitRequests(s.handleOptions)(w, r, v) // Should work even if the web app is not enabled, see #598
  534. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
  535. return s.transformBodyJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish)))(w, r, v)
  536. } else if r.Method == http.MethodPost && r.URL.Path == matrixPushPath {
  537. return s.transformMatrixJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublishMatrix)))(w, r, v)
  538. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && (topicPathRegex.MatchString(r.URL.Path) || updatePathRegex.MatchString(r.URL.Path)) {
  539. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  540. } else if r.Method == http.MethodDelete && updatePathRegex.MatchString(r.URL.Path) {
  541. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handleDelete))(w, r, v)
  542. } else if r.Method == http.MethodPut && clearPathRegex.MatchString(r.URL.Path) {
  543. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handleClear))(w, r, v)
  544. } else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
  545. return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
  546. } else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) {
  547. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeJSON))(w, r, v)
  548. } else if r.Method == http.MethodGet && ssePathRegex.MatchString(r.URL.Path) {
  549. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeSSE))(w, r, v)
  550. } else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
  551. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeRaw))(w, r, v)
  552. } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) {
  553. return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeWS))(w, r, v)
  554. } else if r.Method == http.MethodGet && authPathRegex.MatchString(r.URL.Path) {
  555. return s.limitRequests(s.authorizeTopicRead(s.handleTopicAuth))(w, r, v)
  556. } else if r.Method == http.MethodGet && (topicPathRegex.MatchString(r.URL.Path) || externalTopicPathRegex.MatchString(r.URL.Path)) {
  557. return s.ensureWebEnabled(s.handleTopic)(w, r, v)
  558. }
  559. return errHTTPNotFound
  560. }
  561. func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request, v *visitor) error {
  562. r.URL.Path = webAppIndex
  563. return s.handleStatic(w, r, v)
  564. }
  565. func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request, v *visitor) error {
  566. unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too!
  567. if unifiedpush {
  568. w.Header().Set("Content-Type", "application/json")
  569. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  570. _, err := io.WriteString(w, `{"unifiedpush":{"version":1}}`+"\n")
  571. return err
  572. }
  573. r.URL.Path = webAppIndex
  574. return s.handleStatic(w, r, v)
  575. }
  576. func (s *Server) handleEmpty(_ http.ResponseWriter, _ *http.Request, _ *visitor) error {
  577. return nil
  578. }
  579. func (s *Server) handleTopicAuth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  580. return s.writeJSON(w, newSuccessResponse())
  581. }
  582. func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  583. response := &apiHealthResponse{
  584. Healthy: true,
  585. }
  586. return s.writeJSON(w, response)
  587. }
  588. func (s *Server) handleConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  589. w.Header().Set("Cache-Control", "no-cache")
  590. return s.writeJSON(w, s.configResponse())
  591. }
  592. func (s *Server) handleWebConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  593. b, err := json.MarshalIndent(s.configResponse(), "", " ")
  594. if err != nil {
  595. return err
  596. }
  597. w.Header().Set("Content-Type", "text/javascript")
  598. w.Header().Set("Cache-Control", "no-cache")
  599. _, err = io.WriteString(w, fmt.Sprintf("// Generated server configuration\nvar config = %s;\n", string(b)))
  600. return err
  601. }
  602. func (s *Server) configResponse() *apiConfigResponse {
  603. return &apiConfigResponse{
  604. BaseURL: "", // Will translate to window.location.origin
  605. AppRoot: s.config.WebRoot,
  606. EnableLogin: s.config.EnableLogin,
  607. RequireLogin: s.config.RequireLogin,
  608. EnableSignup: s.config.EnableSignup,
  609. EnablePayments: s.config.StripeSecretKey != "",
  610. EnableCalls: s.config.TwilioAccount != "",
  611. EnableEmails: s.config.SMTPSenderFrom != "",
  612. EnableReservations: s.config.EnableReservations,
  613. EnableWebPush: s.config.WebPushPublicKey != "",
  614. BillingContact: s.config.BillingContact,
  615. WebPushPublicKey: s.config.WebPushPublicKey,
  616. DisallowedTopics: s.config.DisallowedTopics,
  617. ConfigHash: s.config.Hash(),
  618. }
  619. }
  620. // handleWebManifest serves the web app manifest for the progressive web app (PWA)
  621. func (s *Server) handleWebManifest(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  622. response := &webManifestResponse{
  623. Name: "ntfy",
  624. Description: "ntfy lets you send push notifications via scripts from any computer or phone",
  625. ShortName: "ntfy",
  626. Scope: "/",
  627. StartURL: s.config.WebRoot,
  628. Display: "standalone",
  629. BackgroundColor: "#ffffff",
  630. ThemeColor: "#317f6f",
  631. Icons: []*webManifestIcon{
  632. {SRC: "/static/images/pwa-192x192.png", Sizes: "192x192", Type: "image/png"},
  633. {SRC: "/static/images/pwa-512x512.png", Sizes: "512x512", Type: "image/png"},
  634. },
  635. }
  636. return s.writeJSONWithContentType(w, response, "application/manifest+json")
  637. }
  638. // handleMetrics returns Prometheus metrics. This endpoint is only called if enable-metrics is set,
  639. // and listen-metrics-http is not set.
  640. func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  641. s.metricsHandler.ServeHTTP(w, r)
  642. return nil
  643. }
  644. // handleStatic returns all static resources (excluding the docs), including the web app
  645. func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  646. r.URL.Path = webSiteDir + r.URL.Path
  647. util.Gzip(http.FileServer(http.FS(webFsCached))).ServeHTTP(w, r)
  648. return nil
  649. }
  650. // handleDocs returns static resources related to the docs
  651. func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request, _ *visitor) error {
  652. util.Gzip(http.FileServer(http.FS(docsStaticCached))).ServeHTTP(w, r)
  653. return nil
  654. }
  655. // handleStats returns the publicly available server stats
  656. func (s *Server) handleStats(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  657. s.mu.RLock()
  658. messages, n, rate := s.messages, len(s.messagesHistory), float64(0)
  659. if n > 1 {
  660. rate = float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds())
  661. }
  662. s.mu.RUnlock()
  663. response := &apiStatsResponse{
  664. Messages: messages,
  665. MessagesRate: rate,
  666. }
  667. return s.writeJSON(w, response)
  668. }
  669. // handleFile processes the download of attachment files. The method handles GET and HEAD requests against a file.
  670. // Before streaming the file to a client, it locates uploader (m.Sender or m.User) in the message cache, so it
  671. // can associate the download bandwidth with the uploader.
  672. func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error {
  673. if s.config.AttachmentCacheDir == "" {
  674. return errHTTPInternalError
  675. }
  676. matches := fileRegex.FindStringSubmatch(r.URL.Path)
  677. if len(matches) != 2 {
  678. return errHTTPInternalErrorInvalidPath
  679. }
  680. messageID := matches[1]
  681. file := filepath.Join(s.config.AttachmentCacheDir, messageID)
  682. stat, err := os.Stat(file)
  683. if err != nil {
  684. return errHTTPNotFound.Fields(log.Context{
  685. "message_id": messageID,
  686. "error_context": "filesystem",
  687. })
  688. }
  689. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  690. w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
  691. if r.Method == http.MethodHead {
  692. return nil
  693. }
  694. // Find message in database, and associate bandwidth to the uploader user
  695. // This is an easy way to
  696. // - avoid abuse (e.g. 1 uploader, 1k downloaders)
  697. // - and also uses the higher bandwidth limits of a paying user
  698. m, err := s.messageCache.Message(messageID)
  699. if errors.Is(err, errMessageNotFound) {
  700. if s.config.CacheBatchTimeout > 0 {
  701. // Strange edge case: If we immediately after upload request the file (the web app does this for images),
  702. // and messages are persisted asynchronously, retry fetching from the database
  703. m, err = util.Retry(func() (*message, error) {
  704. return s.messageCache.Message(messageID)
  705. }, s.config.CacheBatchTimeout, 100*time.Millisecond, 300*time.Millisecond, 600*time.Millisecond)
  706. }
  707. if err != nil {
  708. return errHTTPNotFound.Fields(log.Context{
  709. "message_id": messageID,
  710. "error_context": "message_cache",
  711. })
  712. }
  713. } else if err != nil {
  714. return err
  715. }
  716. bandwidthVisitor := v
  717. if s.userManager != nil && m.User != "" {
  718. u, err := s.userManager.UserByID(m.User)
  719. if err != nil {
  720. return err
  721. }
  722. bandwidthVisitor = s.visitor(v.IP(), u)
  723. } else if m.Sender.IsValid() {
  724. bandwidthVisitor = s.visitor(m.Sender, nil)
  725. }
  726. if !bandwidthVisitor.BandwidthAllowed(stat.Size()) {
  727. return errHTTPTooManyRequestsLimitAttachmentBandwidth.With(m)
  728. }
  729. // Actually send file
  730. f, err := os.Open(file)
  731. if err != nil {
  732. return err
  733. }
  734. defer f.Close()
  735. if m.Attachment.Name != "" {
  736. w.Header().Set("Content-Disposition", "attachment; filename="+strconv.Quote(m.Attachment.Name))
  737. }
  738. _, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f)
  739. return err
  740. }
  741. func (s *Server) handleMatrixDiscovery(w http.ResponseWriter) error {
  742. if s.config.BaseURL == "" {
  743. return errHTTPInternalErrorMissingBaseURL
  744. }
  745. return writeMatrixDiscoveryResponse(w)
  746. }
  747. func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, error) {
  748. start := time.Now()
  749. t, err := fromContext[*topic](r, contextTopic)
  750. if err != nil {
  751. return nil, err
  752. }
  753. vrate, err := fromContext[*visitor](r, contextRateVisitor)
  754. if err != nil {
  755. return nil, err
  756. }
  757. body, err := util.Peek(r.Body, s.config.MessageSizeLimit)
  758. if err != nil {
  759. return nil, err
  760. }
  761. m := newDefaultMessage(t.ID, "")
  762. cache, firebase, email, call, template, unifiedpush, priorityStr, e := s.parsePublishParams(r, m)
  763. if e != nil {
  764. return nil, e.With(t)
  765. }
  766. if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
  767. // UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting.
  768. // The 5xx response is because some app servers (in particular Mastodon) will remove
  769. // the subscription as invalid if any 400-499 code (except 429/408) is returned.
  770. // See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
  771. return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
  772. } else if !util.ContainsIP(s.config.VisitorRequestExemptPrefixes, v.ip) && !vrate.MessageAllowed() {
  773. return nil, errHTTPTooManyRequestsLimitMessages.With(t)
  774. } else if email != "" && !vrate.EmailAllowed() {
  775. return nil, errHTTPTooManyRequestsLimitEmails.With(t)
  776. } else if call != "" {
  777. var httpErr *errHTTP
  778. call, httpErr = s.convertPhoneNumber(v.User(), call)
  779. if httpErr != nil {
  780. return nil, httpErr.With(t)
  781. } else if !vrate.CallAllowed() {
  782. return nil, errHTTPTooManyRequestsLimitCalls.With(t)
  783. }
  784. }
  785. if m.PollID != "" {
  786. m = newPollRequestMessage(t.ID, m.PollID)
  787. }
  788. m.Sender = v.IP()
  789. m.User = v.MaybeUserID()
  790. if cache {
  791. m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
  792. }
  793. if err := s.handlePublishBody(r, v, m, body, template, unifiedpush, priorityStr); err != nil {
  794. return nil, err
  795. }
  796. if m.Message == "" {
  797. m.Message = emptyMessageBody
  798. }
  799. delayed := m.Time > time.Now().Unix()
  800. ev := logvrm(v, r, m).
  801. Tag(tagPublish).
  802. With(t).
  803. Fields(log.Context{
  804. "message_delayed": delayed,
  805. "message_firebase": firebase,
  806. "message_unifiedpush": unifiedpush,
  807. "message_email": email,
  808. "message_call": call,
  809. })
  810. if ev.IsTrace() {
  811. ev.Field("message_body", util.MaybeMarshalJSON(m)).Trace("Received message")
  812. } else if ev.IsDebug() {
  813. ev.Debug("Received message")
  814. }
  815. if !delayed {
  816. if err := t.Publish(v, m); err != nil {
  817. return nil, err
  818. }
  819. if s.firebaseClient != nil && firebase {
  820. go s.sendToFirebase(v, m)
  821. }
  822. if s.smtpSender != nil && email != "" {
  823. go s.sendEmail(v, m, email)
  824. }
  825. if s.config.TwilioAccount != "" && call != "" {
  826. go s.callPhone(v, r, m, call)
  827. }
  828. if s.config.UpstreamBaseURL != "" && !unifiedpush { // UP messages are not sent to upstream
  829. go s.forwardPollRequest(v, m)
  830. }
  831. if s.config.WebPushPublicKey != "" {
  832. go s.publishToWebPushEndpoints(v, m)
  833. }
  834. } else {
  835. logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
  836. }
  837. if cache {
  838. // Delete any existing scheduled message with the same sequence ID
  839. deletedIDs, err := s.messageCache.DeleteScheduledBySequenceID(t.ID, m.SequenceID)
  840. if err != nil {
  841. return nil, err
  842. }
  843. // Delete attachment files for deleted scheduled messages
  844. if s.fileCache != nil && len(deletedIDs) > 0 {
  845. if err := s.fileCache.Remove(deletedIDs...); err != nil {
  846. logvrm(v, r, m).Tag(tagPublish).Err(err).Warn("Error removing attachments for deleted scheduled messages")
  847. }
  848. }
  849. logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
  850. if err := s.messageCache.AddMessage(m); err != nil {
  851. return nil, err
  852. }
  853. }
  854. u := v.User()
  855. if s.userManager != nil && u != nil && u.Tier != nil {
  856. go s.userManager.EnqueueUserStats(u.ID, v.Stats())
  857. }
  858. s.mu.Lock()
  859. s.messages++
  860. s.mu.Unlock()
  861. if unifiedpush {
  862. minc(metricUnifiedPushPublishedSuccess)
  863. }
  864. mset(metricMessagePublishDurationMillis, time.Since(start).Milliseconds())
  865. return m, nil
  866. }
  867. func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
  868. m, err := s.handlePublishInternal(r, v)
  869. if err != nil {
  870. minc(metricMessagesPublishedFailure)
  871. return err
  872. }
  873. minc(metricMessagesPublishedSuccess)
  874. return s.writeJSON(w, m.forJSON())
  875. }
  876. func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *visitor) error {
  877. _, err := s.handlePublishInternal(r, v)
  878. if err != nil {
  879. minc(metricMessagesPublishedFailure)
  880. minc(metricMatrixPublishedFailure)
  881. if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
  882. topic, err := fromContext[*topic](r, contextTopic)
  883. if err != nil {
  884. return err
  885. }
  886. pushKey, err := fromContext[string](r, contextMatrixPushKey)
  887. if err != nil {
  888. return err
  889. }
  890. if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter {
  891. return writeMatrixResponse(w, pushKey)
  892. }
  893. }
  894. return err
  895. }
  896. minc(metricMessagesPublishedSuccess)
  897. minc(metricMatrixPublishedSuccess)
  898. return writeMatrixSuccess(w)
  899. }
  900. func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request, v *visitor) error {
  901. return s.handleActionMessage(w, r, v, messageDeleteEvent)
  902. }
  903. func (s *Server) handleClear(w http.ResponseWriter, r *http.Request, v *visitor) error {
  904. return s.handleActionMessage(w, r, v, messageClearEvent)
  905. }
  906. func (s *Server) handleActionMessage(w http.ResponseWriter, r *http.Request, v *visitor, event string) error {
  907. t, err := fromContext[*topic](r, contextTopic)
  908. if err != nil {
  909. return err
  910. }
  911. vrate, err := fromContext[*visitor](r, contextRateVisitor)
  912. if err != nil {
  913. return err
  914. }
  915. if !util.ContainsIP(s.config.VisitorRequestExemptPrefixes, v.ip) && !vrate.MessageAllowed() {
  916. return errHTTPTooManyRequestsLimitMessages.With(t)
  917. }
  918. sequenceID, e := s.sequenceIDFromPath(r.URL.Path)
  919. if e != nil {
  920. return e.With(t)
  921. }
  922. // Create an action message with the given event type
  923. m := newActionMessage(event, t.ID, sequenceID)
  924. m.Sender = v.IP()
  925. m.User = v.MaybeUserID()
  926. m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
  927. // Publish to subscribers
  928. if err := t.Publish(v, m); err != nil {
  929. return err
  930. }
  931. // Send to Firebase for Android clients
  932. if s.firebaseClient != nil {
  933. go s.sendToFirebase(v, m)
  934. }
  935. // Send to web push endpoints
  936. if s.config.WebPushPublicKey != "" {
  937. go s.publishToWebPushEndpoints(v, m)
  938. }
  939. if event == messageDeleteEvent {
  940. // Delete any existing scheduled message with the same sequence ID
  941. deletedIDs, err := s.messageCache.DeleteScheduledBySequenceID(t.ID, sequenceID)
  942. if err != nil {
  943. return err
  944. }
  945. // Delete attachment files for deleted scheduled messages
  946. if s.fileCache != nil && len(deletedIDs) > 0 {
  947. if err := s.fileCache.Remove(deletedIDs...); err != nil {
  948. logvrm(v, r, m).Tag(tagPublish).Err(err).Warn("Error removing attachments for deleted scheduled messages")
  949. }
  950. }
  951. }
  952. // Add to message cache
  953. if err := s.messageCache.AddMessage(m); err != nil {
  954. return err
  955. }
  956. logvrm(v, r, m).Tag(tagPublish).Debug("Published %s for sequence ID %s", event, sequenceID)
  957. s.mu.Lock()
  958. s.messages++
  959. s.mu.Unlock()
  960. return s.writeJSON(w, m.forJSON())
  961. }
  962. func (s *Server) sendToFirebase(v *visitor, m *message) {
  963. logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase")
  964. if err := s.firebaseClient.Send(v, m); err != nil {
  965. minc(metricFirebasePublishedFailure)
  966. if errors.Is(err, errFirebaseTemporarilyBanned) {
  967. logvm(v, m).Tag(tagFirebase).Err(err).Debug("Unable to publish to Firebase: %v", err.Error())
  968. } else {
  969. logvm(v, m).Tag(tagFirebase).Err(err).Warn("Unable to publish to Firebase: %v", err.Error())
  970. }
  971. return
  972. }
  973. minc(metricFirebasePublishedSuccess)
  974. }
  975. func (s *Server) sendEmail(v *visitor, m *message, email string) {
  976. logvm(v, m).Tag(tagEmail).Field("email", email).Debug("Sending email to %s", email)
  977. if err := s.smtpSender.Send(v, m, email); err != nil {
  978. logvm(v, m).Tag(tagEmail).Field("email", email).Err(err).Warn("Unable to send email to %s: %v", email, err.Error())
  979. minc(metricEmailsPublishedFailure)
  980. return
  981. }
  982. minc(metricEmailsPublishedSuccess)
  983. }
  984. func (s *Server) forwardPollRequest(v *visitor, m *message) {
  985. topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
  986. topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
  987. forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
  988. logvm(v, m).Debug("Publishing poll request to %s", forwardURL)
  989. req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
  990. if err != nil {
  991. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  992. return
  993. }
  994. req.Header.Set("User-Agent", "ntfy/"+s.config.BuildVersion)
  995. req.Header.Set("X-Poll-ID", m.ID)
  996. if s.config.UpstreamAccessToken != "" {
  997. req.Header.Set("Authorization", util.BearerAuth(s.config.UpstreamAccessToken))
  998. }
  999. var httpClient = &http.Client{
  1000. Timeout: time.Second * 10,
  1001. }
  1002. response, err := httpClient.Do(req)
  1003. if err != nil {
  1004. logvm(v, m).Err(err).Warn("Unable to publish poll request")
  1005. return
  1006. } else if response.StatusCode != http.StatusOK {
  1007. if response.StatusCode == http.StatusTooManyRequests {
  1008. logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s; you may solve this by sending fewer daily messages, or by configuring upstream-access-token (assuming you have an account with higher rate limits) ", s.config.UpstreamBaseURL, response.Status)
  1009. } else {
  1010. logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s", s.config.UpstreamBaseURL, response.Status)
  1011. }
  1012. return
  1013. }
  1014. }
  1015. func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, firebase bool, email, call string, template templateMode, unifiedpush bool, priorityStr string, err *errHTTP) {
  1016. if r.Method != http.MethodGet && updatePathRegex.MatchString(r.URL.Path) {
  1017. pathSequenceID, err := s.sequenceIDFromPath(r.URL.Path)
  1018. if err != nil {
  1019. return false, false, "", "", "", false, "", err
  1020. }
  1021. m.SequenceID = pathSequenceID
  1022. } else {
  1023. sequenceID := readParam(r, "x-sequence-id", "sequence-id", "sid")
  1024. if sequenceID != "" {
  1025. if sequenceIDRegex.MatchString(sequenceID) {
  1026. m.SequenceID = sequenceID
  1027. } else {
  1028. return false, false, "", "", "", false, "", errHTTPBadRequestSequenceIDInvalid
  1029. }
  1030. } else {
  1031. m.SequenceID = m.ID
  1032. }
  1033. }
  1034. cache = readBoolParam(r, true, "x-cache", "cache")
  1035. firebase = readBoolParam(r, true, "x-firebase", "firebase")
  1036. m.Title = readParam(r, "x-title", "title", "t")
  1037. m.Click = readParam(r, "x-click", "click")
  1038. icon := readParam(r, "x-icon", "icon")
  1039. filename := readParam(r, "x-filename", "filename", "file", "f")
  1040. attach := readParam(r, "x-attach", "attach", "a")
  1041. if attach != "" || filename != "" {
  1042. m.Attachment = &attachment{}
  1043. }
  1044. if filename != "" {
  1045. m.Attachment.Name = filename
  1046. }
  1047. if attach != "" {
  1048. if !urlRegex.MatchString(attach) {
  1049. return false, false, "", "", "", false, "", errHTTPBadRequestAttachmentURLInvalid
  1050. }
  1051. m.Attachment.URL = attach
  1052. if m.Attachment.Name == "" {
  1053. u, err := url.Parse(m.Attachment.URL)
  1054. if err == nil {
  1055. m.Attachment.Name = path.Base(u.Path)
  1056. if m.Attachment.Name == "." || m.Attachment.Name == "/" {
  1057. m.Attachment.Name = ""
  1058. }
  1059. }
  1060. }
  1061. if m.Attachment.Name == "" {
  1062. m.Attachment.Name = "attachment"
  1063. }
  1064. }
  1065. if icon != "" {
  1066. if !urlRegex.MatchString(icon) {
  1067. return false, false, "", "", "", false, "", errHTTPBadRequestIconURLInvalid
  1068. }
  1069. m.Icon = icon
  1070. }
  1071. email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
  1072. if s.smtpSender == nil && email != "" {
  1073. return false, false, "", "", "", false, "", errHTTPBadRequestEmailDisabled
  1074. }
  1075. call = readParam(r, "x-call", "call")
  1076. if call != "" && (s.config.TwilioAccount == "" || s.userManager == nil) {
  1077. return false, false, "", "", "", false, "", errHTTPBadRequestPhoneCallsDisabled
  1078. } else if call != "" && !isBoolValue(call) && !phoneNumberRegex.MatchString(call) {
  1079. return false, false, "", "", "", false, "", errHTTPBadRequestPhoneNumberInvalid
  1080. }
  1081. template = templateMode(readParam(r, "x-template", "template", "tpl"))
  1082. messageStr := readParam(r, "x-message", "message", "m")
  1083. if !template.InlineMode() {
  1084. // Convert "\n" to literal newline everything but inline mode
  1085. messageStr = strings.ReplaceAll(messageStr, "\\n", "\n")
  1086. }
  1087. if messageStr != "" {
  1088. m.Message = messageStr
  1089. }
  1090. var e error
  1091. priorityStr = readParam(r, "x-priority", "priority", "prio", "p")
  1092. if !template.Enabled() {
  1093. m.Priority, e = util.ParsePriority(priorityStr)
  1094. if e != nil {
  1095. return false, false, "", "", "", false, "", errHTTPBadRequestPriorityInvalid
  1096. }
  1097. priorityStr = "" // Clear since it's already parsed
  1098. }
  1099. m.Tags = readCommaSeparatedParam(r, "x-tags", "tags", "tag", "ta")
  1100. delayStr := readParam(r, "x-delay", "delay", "x-at", "at", "x-in", "in")
  1101. if delayStr != "" {
  1102. if !cache {
  1103. return false, false, "", "", "", false, "", errHTTPBadRequestDelayNoCache
  1104. }
  1105. if email != "" {
  1106. return false, false, "", "", "", false, "", errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
  1107. }
  1108. if call != "" {
  1109. return false, false, "", "", "", false, "", errHTTPBadRequestDelayNoCall // we cannot store the phone number (yet)
  1110. }
  1111. delay, err := util.ParseFutureTime(delayStr, time.Now())
  1112. if err != nil {
  1113. return false, false, "", "", "", false, "", errHTTPBadRequestDelayCannotParse
  1114. } else if delay.Unix() < time.Now().Add(s.config.MessageDelayMin).Unix() {
  1115. return false, false, "", "", "", false, "", errHTTPBadRequestDelayTooSmall
  1116. } else if delay.Unix() > time.Now().Add(s.config.MessageDelayMax).Unix() {
  1117. return false, false, "", "", "", false, "", errHTTPBadRequestDelayTooLarge
  1118. }
  1119. m.Time = delay.Unix()
  1120. }
  1121. actionsStr := readParam(r, "x-actions", "actions", "action")
  1122. if actionsStr != "" {
  1123. m.Actions, e = parseActions(actionsStr)
  1124. if e != nil {
  1125. return false, false, "", "", "", false, "", errHTTPBadRequestActionsInvalid.Wrap("%s", e.Error())
  1126. }
  1127. }
  1128. contentType, markdown := readParam(r, "content-type", "content_type"), readBoolParam(r, false, "x-markdown", "markdown", "md")
  1129. if markdown || strings.ToLower(contentType) == "text/markdown" {
  1130. m.ContentType = "text/markdown"
  1131. }
  1132. unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
  1133. contentEncoding := readParam(r, "content-encoding")
  1134. if unifiedpush || contentEncoding == "aes128gcm" {
  1135. firebase = false
  1136. unifiedpush = true
  1137. }
  1138. m.PollID = readParam(r, "x-poll-id", "poll-id")
  1139. if m.PollID != "" {
  1140. unifiedpush = false
  1141. cache = false
  1142. email = ""
  1143. }
  1144. return cache, firebase, email, call, template, unifiedpush, priorityStr, nil
  1145. }
  1146. // handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
  1147. //
  1148. // 1. curl -X POST -H "Poll: 1234" ntfy.sh/...
  1149. // If a message is flagged as poll request, the body does not matter and is discarded
  1150. // 2. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
  1151. // If UnifiedPush is enabled, encode as base64 if body is binary, and do not trim
  1152. // 3. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
  1153. // Body must be a message, because we attached an external URL
  1154. // 4. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
  1155. // Body must be attachment, because we passed a filename
  1156. // 5. curl -H "Template: yes" -T file.txt ntfy.sh/mytopic
  1157. // If templating is enabled, read up to 32k and treat message body as JSON
  1158. // 6. curl -T file.txt ntfy.sh/mytopic
  1159. // If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
  1160. // 7. curl -T file.txt ntfy.sh/mytopic
  1161. // In all other cases, mostly if file.txt is > message limit, treat it as an attachment
  1162. func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser, template templateMode, unifiedpush bool, priorityStr string) error {
  1163. if m.Event == pollRequestEvent { // Case 1
  1164. return s.handleBodyDiscard(body)
  1165. } else if unifiedpush {
  1166. return s.handleBodyAsMessageAutoDetect(m, body) // Case 2
  1167. } else if m.Attachment != nil && m.Attachment.URL != "" {
  1168. return s.handleBodyAsTextMessage(m, body) // Case 3
  1169. } else if m.Attachment != nil && m.Attachment.Name != "" {
  1170. return s.handleBodyAsAttachment(r, v, m, body) // Case 4
  1171. } else if template.Enabled() {
  1172. return s.handleBodyAsTemplatedTextMessage(m, template, body, priorityStr) // Case 5
  1173. } else if !body.LimitReached && utf8.Valid(body.PeekedBytes) {
  1174. return s.handleBodyAsTextMessage(m, body) // Case 6
  1175. }
  1176. return s.handleBodyAsAttachment(r, v, m, body) // Case 7
  1177. }
  1178. func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
  1179. _, err := io.Copy(io.Discard, body)
  1180. _ = body.Close()
  1181. return err
  1182. }
  1183. func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedReadCloser) error {
  1184. if utf8.Valid(body.PeekedBytes) {
  1185. m.Message = string(body.PeekedBytes) // Do not trim
  1186. } else {
  1187. m.Message = base64.StdEncoding.EncodeToString(body.PeekedBytes)
  1188. m.Encoding = encodingBase64
  1189. }
  1190. return nil
  1191. }
  1192. func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser) error {
  1193. if !utf8.Valid(body.PeekedBytes) {
  1194. return errHTTPBadRequestMessageNotUTF8.With(m)
  1195. }
  1196. if len(body.PeekedBytes) > 0 { // Empty body should not override message (publish via GET!)
  1197. m.Message = strings.TrimSpace(string(body.PeekedBytes)) // Truncates the message to the peek limit if required
  1198. }
  1199. if m.Attachment != nil && m.Attachment.Name != "" && m.Message == "" {
  1200. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1201. }
  1202. return nil
  1203. }
  1204. func (s *Server) handleBodyAsTemplatedTextMessage(m *message, template templateMode, body *util.PeekedReadCloser, priorityStr string) error {
  1205. body, err := util.Peek(body, max(s.config.MessageSizeLimit, jsonBodyBytesLimit))
  1206. if err != nil {
  1207. return err
  1208. } else if body.LimitReached {
  1209. return errHTTPEntityTooLargeJSONBody
  1210. }
  1211. peekedBody := strings.TrimSpace(string(body.PeekedBytes))
  1212. if template.FileMode() {
  1213. if err := s.renderTemplateFromFile(m, template.FileName(), peekedBody); err != nil {
  1214. return err
  1215. }
  1216. } else {
  1217. if err := s.renderTemplateFromParams(m, peekedBody, priorityStr); err != nil {
  1218. return err
  1219. }
  1220. }
  1221. if len(m.Title) > s.config.MessageSizeLimit || len(m.Message) > s.config.MessageSizeLimit {
  1222. return errHTTPBadRequestTemplateMessageTooLarge
  1223. }
  1224. return nil
  1225. }
  1226. // renderTemplateFromFile transforms the JSON message body according to a template from the filesystem.
  1227. // The template file must be in the templates directory, or in the configured template directory.
  1228. func (s *Server) renderTemplateFromFile(m *message, templateName, peekedBody string) error {
  1229. if !templateNameRegex.MatchString(templateName) {
  1230. return errHTTPBadRequestTemplateFileNotFound
  1231. }
  1232. templateContent, _ := templatesFs.ReadFile(filepath.Join(templatesDir, templateName+templateFileExtension)) // Read from the embedded filesystem first
  1233. if s.config.TemplateDir != "" {
  1234. if b, _ := os.ReadFile(filepath.Join(s.config.TemplateDir, templateName+templateFileExtension)); len(b) > 0 {
  1235. templateContent = b
  1236. }
  1237. }
  1238. if len(templateContent) == 0 {
  1239. return errHTTPBadRequestTemplateFileNotFound
  1240. }
  1241. var tpl templateFile
  1242. if err := yaml.Unmarshal(templateContent, &tpl); err != nil {
  1243. return errHTTPBadRequestTemplateFileInvalid
  1244. }
  1245. var err error
  1246. if tpl.Message != nil {
  1247. if m.Message, err = s.renderTemplate(templateName+" (message)", *tpl.Message, peekedBody); err != nil {
  1248. return err
  1249. }
  1250. }
  1251. if tpl.Title != nil {
  1252. if m.Title, err = s.renderTemplate(templateName+" (title)", *tpl.Title, peekedBody); err != nil {
  1253. return err
  1254. }
  1255. }
  1256. if tpl.Priority != nil {
  1257. renderedPriority, err := s.renderTemplate(templateName+" (priority)", *tpl.Priority, peekedBody)
  1258. if err != nil {
  1259. return err
  1260. }
  1261. if m.Priority, err = util.ParsePriority(renderedPriority); err != nil {
  1262. return errHTTPBadRequestPriorityInvalid
  1263. }
  1264. }
  1265. return nil
  1266. }
  1267. // renderTemplateFromParams transforms the JSON message body according to the inline template in the
  1268. // message, title, and priority parameters.
  1269. func (s *Server) renderTemplateFromParams(m *message, peekedBody string, priorityStr string) error {
  1270. var err error
  1271. if m.Message, err = s.renderTemplate("priority query parameter", m.Message, peekedBody); err != nil {
  1272. return err
  1273. }
  1274. if m.Title, err = s.renderTemplate("title query parameter", m.Title, peekedBody); err != nil {
  1275. return err
  1276. }
  1277. if priorityStr != "" {
  1278. renderedPriority, err := s.renderTemplate("priority query parameter", priorityStr, peekedBody)
  1279. if err != nil {
  1280. return err
  1281. }
  1282. if m.Priority, err = util.ParsePriority(renderedPriority); err != nil {
  1283. return errHTTPBadRequestPriorityInvalid
  1284. }
  1285. }
  1286. return nil
  1287. }
  1288. // renderTemplate renders a template with the given JSON source data.
  1289. func (s *Server) renderTemplate(name, tpl, source string) (string, error) {
  1290. if templateDisallowedRegex.MatchString(tpl) {
  1291. return "", errHTTPBadRequestTemplateDisallowedFunctionCalls
  1292. }
  1293. var data any
  1294. if err := json.Unmarshal([]byte(source), &data); err != nil {
  1295. return "", errHTTPBadRequestTemplateMessageNotJSON
  1296. }
  1297. t, err := template.New("").Funcs(sprig.TxtFuncMap()).Parse(tpl)
  1298. if err != nil {
  1299. return "", errHTTPBadRequestTemplateInvalid.Wrap("%s", err.Error())
  1300. }
  1301. var buf bytes.Buffer
  1302. limitWriter := util.NewLimitWriter(util.NewTimeoutWriter(&buf, templateMaxExecutionTime), util.NewFixedLimiter(templateMaxOutputBytes))
  1303. if err := t.Execute(limitWriter, data); err != nil {
  1304. return "", errHTTPBadRequestTemplateExecuteFailed.Wrap("template %s: %s", name, err.Error())
  1305. }
  1306. return strings.TrimSpace(strings.ReplaceAll(buf.String(), "\\n", "\n")), nil // replace any remaining "\n" (those outside of template curly braces) with newlines
  1307. }
  1308. func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser) error {
  1309. if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
  1310. return errHTTPBadRequestAttachmentsDisallowed.With(m)
  1311. }
  1312. vinfo, err := v.Info()
  1313. if err != nil {
  1314. return err
  1315. }
  1316. attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
  1317. if m.Time > attachmentExpiry {
  1318. return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
  1319. }
  1320. contentLengthStr := r.Header.Get("Content-Length")
  1321. if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
  1322. contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
  1323. if err == nil && (contentLength > vinfo.Stats.AttachmentTotalSizeRemaining || contentLength > vinfo.Limits.AttachmentFileSizeLimit) {
  1324. return errHTTPEntityTooLargeAttachment.With(m).Fields(log.Context{
  1325. "message_content_length": contentLength,
  1326. "attachment_total_size_remaining": vinfo.Stats.AttachmentTotalSizeRemaining,
  1327. "attachment_file_size_limit": vinfo.Limits.AttachmentFileSizeLimit,
  1328. })
  1329. }
  1330. }
  1331. if m.Attachment == nil {
  1332. m.Attachment = &attachment{}
  1333. }
  1334. var ext string
  1335. m.Attachment.Expires = attachmentExpiry
  1336. m.Attachment.Type, ext = util.DetectContentType(body.PeekedBytes, m.Attachment.Name)
  1337. m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
  1338. if m.Attachment.Name == "" {
  1339. m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
  1340. }
  1341. if m.Message == "" {
  1342. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  1343. }
  1344. limiters := []util.Limiter{
  1345. v.BandwidthLimiter(),
  1346. util.NewFixedLimiter(vinfo.Limits.AttachmentFileSizeLimit),
  1347. util.NewFixedLimiter(vinfo.Stats.AttachmentTotalSizeRemaining),
  1348. }
  1349. m.Attachment.Size, err = s.fileCache.Write(m.ID, body, limiters...)
  1350. if errors.Is(err, util.ErrLimitReached) {
  1351. return errHTTPEntityTooLargeAttachment.With(m)
  1352. } else if err != nil {
  1353. return err
  1354. }
  1355. return nil
  1356. }
  1357. func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1358. encoder := func(msg *message) (string, error) {
  1359. var buf bytes.Buffer
  1360. if err := json.NewEncoder(&buf).Encode(msg.forJSON()); err != nil {
  1361. return "", err
  1362. }
  1363. return buf.String(), nil
  1364. }
  1365. return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder)
  1366. }
  1367. func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1368. encoder := func(msg *message) (string, error) {
  1369. var buf bytes.Buffer
  1370. if err := json.NewEncoder(&buf).Encode(msg.forJSON()); err != nil {
  1371. return "", err
  1372. }
  1373. if msg.Event != messageEvent && msg.Event != messageDeleteEvent && msg.Event != messageClearEvent {
  1374. return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
  1375. }
  1376. return fmt.Sprintf("data: %s\n", buf.String()), nil
  1377. }
  1378. return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder)
  1379. }
  1380. func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1381. encoder := func(msg *message) (string, error) {
  1382. if msg.Event == messageEvent { // only handle default events
  1383. return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
  1384. }
  1385. return "\n", nil // "keepalive" and "open" events just send an empty line
  1386. }
  1387. return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder)
  1388. }
  1389. func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
  1390. logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection opened")
  1391. defer logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection closed")
  1392. if !v.SubscriptionAllowed() {
  1393. return errHTTPTooManyRequestsLimitSubscriptions
  1394. }
  1395. defer v.RemoveSubscription()
  1396. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1397. if err != nil {
  1398. return err
  1399. }
  1400. poll, since, scheduled, filters, err := parseSubscribeParams(r)
  1401. if err != nil {
  1402. return err
  1403. }
  1404. var wlock sync.Mutex
  1405. var closed bool
  1406. defer func() {
  1407. // This blocks until any in-flight sub() call finishes writing/flushing the response writer,
  1408. // then marks the connection as closed so future sub() calls are no-ops. This prevents a panic
  1409. // from writing to a response writer that has been cleaned up after the handler returns.
  1410. // See https://github.com/binwiederhier/ntfy/issues/338#issuecomment-1163425889
  1411. // and https://github.com/binwiederhier/ntfy/pull/1598.
  1412. wlock.Lock()
  1413. closed = true
  1414. wlock.Unlock()
  1415. }()
  1416. sub := func(v *visitor, msg *message) error {
  1417. if !filters.Pass(msg) {
  1418. return nil
  1419. }
  1420. m, err := encoder(msg)
  1421. if err != nil {
  1422. return err
  1423. }
  1424. wlock.Lock()
  1425. defer wlock.Unlock()
  1426. if closed {
  1427. return nil
  1428. }
  1429. if _, err := w.Write([]byte(m)); err != nil {
  1430. return err
  1431. }
  1432. if fl, ok := w.(http.Flusher); ok {
  1433. fl.Flush()
  1434. }
  1435. return nil
  1436. }
  1437. if err := s.maybeSetRateVisitors(r, v, topics); err != nil {
  1438. return err
  1439. }
  1440. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1441. w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
  1442. if poll {
  1443. for _, t := range topics {
  1444. t.Keepalive()
  1445. }
  1446. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1447. }
  1448. ctx, cancel := context.WithCancel(context.Background())
  1449. defer cancel()
  1450. subscriberIDs := make([]int, 0)
  1451. for _, t := range topics {
  1452. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1453. }
  1454. defer func() {
  1455. for i, subscriberID := range subscriberIDs {
  1456. topics[i].Unsubscribe(subscriberID) // Order!
  1457. }
  1458. }()
  1459. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1460. return err
  1461. }
  1462. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1463. return err
  1464. }
  1465. for {
  1466. select {
  1467. case <-ctx.Done():
  1468. return nil
  1469. case <-r.Context().Done():
  1470. return nil
  1471. case <-time.After(s.config.KeepaliveInterval):
  1472. ev := logvr(v, r).Tag(tagSubscribe)
  1473. if len(topics) == 1 {
  1474. ev.With(topics[0]).Trace("Sending keepalive message to %s", topics[0].ID)
  1475. } else {
  1476. ev.Trace("Sending keepalive message to %d topics", len(topics))
  1477. }
  1478. v.Keepalive()
  1479. for _, t := range topics {
  1480. t.Keepalive()
  1481. }
  1482. if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
  1483. return err
  1484. }
  1485. }
  1486. }
  1487. }
  1488. func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1489. if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
  1490. return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
  1491. }
  1492. if !v.SubscriptionAllowed() {
  1493. return errHTTPTooManyRequestsLimitSubscriptions
  1494. }
  1495. defer v.RemoveSubscription()
  1496. logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection opened")
  1497. defer logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection closed")
  1498. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  1499. if err != nil {
  1500. return err
  1501. }
  1502. poll, since, scheduled, filters, err := parseSubscribeParams(r)
  1503. if err != nil {
  1504. return err
  1505. }
  1506. upgrader := &websocket.Upgrader{
  1507. ReadBufferSize: wsBufferSize,
  1508. WriteBufferSize: wsBufferSize,
  1509. CheckOrigin: func(r *http.Request) bool {
  1510. return true // We're open for business!
  1511. },
  1512. }
  1513. conn, err := upgrader.Upgrade(w, r, nil)
  1514. if err != nil {
  1515. return err
  1516. }
  1517. defer conn.Close()
  1518. // Subscription connections can be canceled externally, see topic.CancelSubscribersExceptUser
  1519. cancelCtx, cancel := context.WithCancel(context.Background())
  1520. defer cancel()
  1521. // Use errgroup to run WebSocket reader and writer in Go routines
  1522. var wlock sync.Mutex
  1523. g, gctx := errgroup.WithContext(cancelCtx)
  1524. g.Go(func() error {
  1525. pongWait := s.config.KeepaliveInterval + wsPongWait
  1526. conn.SetReadLimit(wsReadLimit)
  1527. if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
  1528. return err
  1529. }
  1530. conn.SetPongHandler(func(appData string) error {
  1531. logvr(v, r).Tag(tagWebsocket).Trace("Received WebSocket pong")
  1532. return conn.SetReadDeadline(time.Now().Add(pongWait))
  1533. })
  1534. for {
  1535. _, _, err := conn.NextReader()
  1536. if err != nil {
  1537. return err
  1538. }
  1539. select {
  1540. case <-gctx.Done():
  1541. return nil
  1542. default:
  1543. }
  1544. }
  1545. })
  1546. g.Go(func() error {
  1547. ping := func() error {
  1548. wlock.Lock()
  1549. defer wlock.Unlock()
  1550. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1551. return err
  1552. }
  1553. logvr(v, r).Tag(tagWebsocket).Trace("Sending WebSocket ping")
  1554. return conn.WriteMessage(websocket.PingMessage, nil)
  1555. }
  1556. for {
  1557. select {
  1558. case <-gctx.Done():
  1559. return nil
  1560. case <-cancelCtx.Done():
  1561. logvr(v, r).Tag(tagWebsocket).Trace("Cancel received, closing subscriber connection")
  1562. conn.Close()
  1563. return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
  1564. case <-time.After(s.config.KeepaliveInterval):
  1565. v.Keepalive()
  1566. for _, t := range topics {
  1567. t.Keepalive()
  1568. }
  1569. if err := ping(); err != nil {
  1570. return err
  1571. }
  1572. }
  1573. }
  1574. })
  1575. sub := func(v *visitor, msg *message) error {
  1576. if !filters.Pass(msg) {
  1577. return nil
  1578. }
  1579. wlock.Lock()
  1580. defer wlock.Unlock()
  1581. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  1582. return err
  1583. }
  1584. return conn.WriteJSON(msg)
  1585. }
  1586. if err := s.maybeSetRateVisitors(r, v, topics); err != nil {
  1587. return err
  1588. }
  1589. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1590. if poll {
  1591. for _, t := range topics {
  1592. t.Keepalive()
  1593. }
  1594. return s.sendOldMessages(topics, since, scheduled, v, sub)
  1595. }
  1596. subscriberIDs := make([]int, 0)
  1597. for _, t := range topics {
  1598. subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
  1599. }
  1600. defer func() {
  1601. for i, subscriberID := range subscriberIDs {
  1602. topics[i].Unsubscribe(subscriberID) // Order!
  1603. }
  1604. }()
  1605. if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
  1606. return err
  1607. }
  1608. if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
  1609. return err
  1610. }
  1611. err = g.Wait()
  1612. if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) {
  1613. logvr(v, r).Tag(tagWebsocket).Err(err).Fields(websocketErrorContext(err)).Trace("WebSocket connection closed")
  1614. return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
  1615. }
  1616. if err != nil {
  1617. return &errWebSocketPostUpgrade{err}
  1618. }
  1619. return nil
  1620. }
  1621. func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, err error) {
  1622. poll = readBoolParam(r, false, "x-poll", "poll", "po")
  1623. scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
  1624. since, err = parseSince(r, poll)
  1625. if err != nil {
  1626. return
  1627. }
  1628. filters, err = parseQueryFilters(r)
  1629. if err != nil {
  1630. return
  1631. }
  1632. return
  1633. }
  1634. // maybeSetRateVisitors sets the rate visitor on a topic (v.SetRateVisitor), indicating that all messages published
  1635. // to that topic will be rate limited against the rate visitor instead of the publishing visitor.
  1636. //
  1637. // Setting the rate visitor is ony allowed if the `visitor-subscriber-rate-limiting` setting is enabled, AND
  1638. // - auth-file is not set (everything is open by default)
  1639. // - or the topic is reserved, and v.user is the owner
  1640. // - or the topic is not reserved, and v.user has write access
  1641. //
  1642. // This only applies to UnifiedPush topics ("up...").
  1643. func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic) error {
  1644. // Bail out if not enabled
  1645. if !s.config.VisitorSubscriberRateLimiting {
  1646. return nil
  1647. }
  1648. // Make a list of topics that we'll actually set the RateVisitor on
  1649. eligibleRateTopics := make([]*topic, 0)
  1650. for _, t := range topics {
  1651. if strings.HasPrefix(t.ID, unifiedPushTopicPrefix) && len(t.ID) == unifiedPushTopicLength {
  1652. eligibleRateTopics = append(eligibleRateTopics, t)
  1653. }
  1654. }
  1655. if len(eligibleRateTopics) == 0 {
  1656. return nil
  1657. }
  1658. // If access controls are turned off, v has access to everything, and we can set the rate visitor
  1659. if s.userManager == nil {
  1660. return s.setRateVisitors(r, v, eligibleRateTopics)
  1661. }
  1662. // If access controls are enabled, only set rate visitor if
  1663. // - topic is reserved, and v.user is the owner
  1664. // - topic is not reserved, and v.user has write access
  1665. writableRateTopics := make([]*topic, 0)
  1666. for _, t := range topics {
  1667. if !util.Contains(eligibleRateTopics, t) {
  1668. continue
  1669. }
  1670. ownerUserID, err := s.userManager.ReservationOwner(t.ID)
  1671. if err != nil {
  1672. return err
  1673. }
  1674. if ownerUserID == "" {
  1675. if err := s.userManager.Authorize(v.User(), t.ID, user.PermissionWrite); err == nil {
  1676. writableRateTopics = append(writableRateTopics, t)
  1677. }
  1678. } else if ownerUserID == v.MaybeUserID() {
  1679. writableRateTopics = append(writableRateTopics, t)
  1680. }
  1681. }
  1682. return s.setRateVisitors(r, v, writableRateTopics)
  1683. }
  1684. func (s *Server) setRateVisitors(r *http.Request, v *visitor, rateTopics []*topic) error {
  1685. for _, t := range rateTopics {
  1686. logvr(v, r).
  1687. Tag(tagSubscribe).
  1688. With(t).
  1689. Debug("Setting visitor as rate visitor for topic %s", t.ID)
  1690. t.SetRateVisitor(v)
  1691. }
  1692. return nil
  1693. }
  1694. // sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
  1695. // marker, returning only messages that are newer than the marker.
  1696. func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled bool, v *visitor, sub subscriber) error {
  1697. if since.IsNone() {
  1698. return nil
  1699. }
  1700. messages := make([]*message, 0)
  1701. for _, t := range topics {
  1702. topicMessages, err := s.messageCache.Messages(t.ID, since, scheduled)
  1703. if err != nil {
  1704. return err
  1705. }
  1706. messages = append(messages, topicMessages...)
  1707. }
  1708. sort.Slice(messages, func(i, j int) bool {
  1709. return messages[i].Time < messages[j].Time
  1710. })
  1711. for _, m := range messages {
  1712. if err := sub(v, m); err != nil {
  1713. return err
  1714. }
  1715. }
  1716. return nil
  1717. }
  1718. // parseSince returns a timestamp identifying the time span from which cached messages should be received.
  1719. //
  1720. // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h),
  1721. // "all" for all messages, or "latest" for the most recent message for a topic
  1722. func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
  1723. since := readParam(r, "x-since", "since", "si")
  1724. // Easy cases (empty, all, none)
  1725. if since == "" {
  1726. if poll {
  1727. return sinceAllMessages, nil
  1728. }
  1729. return sinceNoMessages, nil
  1730. } else if since == "all" {
  1731. return sinceAllMessages, nil
  1732. } else if since == "latest" {
  1733. return sinceLatestMessage, nil
  1734. } else if since == "none" {
  1735. return sinceNoMessages, nil
  1736. }
  1737. // ID, timestamp, duration
  1738. if validMessageID(since) {
  1739. return newSinceID(since), nil
  1740. } else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
  1741. return newSinceTime(s), nil
  1742. } else if d, err := time.ParseDuration(since); err == nil {
  1743. return newSinceTime(time.Now().Add(-1 * d).Unix()), nil
  1744. }
  1745. return sinceNoMessages, errHTTPBadRequestSinceInvalid
  1746. }
  1747. func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
  1748. w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE")
  1749. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  1750. w.Header().Set("Access-Control-Allow-Headers", "*") // CORS, allow auth via JS // FIXME is this terrible?
  1751. return nil
  1752. }
  1753. // topicFromPath returns the topic from a root path (e.g. /mytopic), creating it if it doesn't exist.
  1754. func (s *Server) topicFromPath(path string) (*topic, error) {
  1755. parts := strings.Split(path, "/")
  1756. if len(parts) < 2 {
  1757. return nil, errHTTPBadRequestTopicInvalid
  1758. }
  1759. return s.topicFromID(parts[1])
  1760. }
  1761. // topicsFromPath returns the topic from a root path (e.g. /mytopic,mytopic2), creating it if it doesn't exist.
  1762. func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
  1763. parts := strings.Split(path, "/")
  1764. if len(parts) < 2 {
  1765. return nil, "", errHTTPBadRequestTopicInvalid
  1766. }
  1767. topicIDs := util.SplitNoEmpty(parts[1], ",")
  1768. topics, err := s.topicsFromIDs(topicIDs...)
  1769. if err != nil {
  1770. return nil, "", errHTTPBadRequestTopicInvalid
  1771. }
  1772. return topics, parts[1], nil
  1773. }
  1774. // sequenceIDFromPath returns the sequence ID from a path like /mytopic/sequenceIdHere
  1775. func (s *Server) sequenceIDFromPath(path string) (string, *errHTTP) {
  1776. parts := strings.Split(path, "/")
  1777. if len(parts) < 3 {
  1778. return "", errHTTPBadRequestSequenceIDInvalid
  1779. }
  1780. return parts[2], nil
  1781. }
  1782. // topicsFromIDs returns the topics with the given IDs, creating them if they don't exist.
  1783. func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
  1784. s.mu.Lock()
  1785. defer s.mu.Unlock()
  1786. topics := make([]*topic, 0)
  1787. for _, id := range ids {
  1788. if util.Contains(s.config.DisallowedTopics, id) {
  1789. return nil, errHTTPBadRequestTopicDisallowed
  1790. }
  1791. if _, ok := s.topics[id]; !ok {
  1792. if len(s.topics) >= s.config.TotalTopicLimit {
  1793. return nil, errHTTPTooManyRequestsLimitTotalTopics
  1794. }
  1795. s.topics[id] = newTopic(id)
  1796. }
  1797. topics = append(topics, s.topics[id])
  1798. }
  1799. return topics, nil
  1800. }
  1801. // topicFromID returns the topic with the given ID, creating it if it doesn't exist.
  1802. func (s *Server) topicFromID(id string) (*topic, error) {
  1803. topics, err := s.topicsFromIDs(id)
  1804. if err != nil {
  1805. return nil, err
  1806. }
  1807. return topics[0], nil
  1808. }
  1809. // topicsFromPattern returns a list of topics matching the given pattern, but it does not create them.
  1810. func (s *Server) topicsFromPattern(pattern string) ([]*topic, error) {
  1811. s.mu.RLock()
  1812. defer s.mu.RUnlock()
  1813. patternRegexp, err := regexp.Compile("^" + strings.ReplaceAll(pattern, "*", ".*") + "$")
  1814. if err != nil {
  1815. return nil, err
  1816. }
  1817. topics := make([]*topic, 0)
  1818. for _, t := range s.topics {
  1819. if patternRegexp.MatchString(t.ID) {
  1820. topics = append(topics, t)
  1821. }
  1822. }
  1823. return topics, nil
  1824. }
  1825. func (s *Server) runSMTPServer() error {
  1826. s.smtpServerBackend = newMailBackend(s.config, s.handle)
  1827. s.smtpServer = smtp.NewServer(s.smtpServerBackend)
  1828. s.smtpServer.Addr = s.config.SMTPServerListen
  1829. s.smtpServer.Domain = s.config.SMTPServerDomain
  1830. s.smtpServer.ReadTimeout = 10 * time.Second
  1831. s.smtpServer.WriteTimeout = 10 * time.Second
  1832. s.smtpServer.MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
  1833. s.smtpServer.MaxRecipients = 1
  1834. s.smtpServer.AllowInsecureAuth = true
  1835. return s.smtpServer.ListenAndServe()
  1836. }
  1837. func (s *Server) runManager() {
  1838. for {
  1839. select {
  1840. case <-time.After(s.config.ManagerInterval):
  1841. log.
  1842. Tag(tagManager).
  1843. Timing(s.execManager).
  1844. Debug("Manager finished")
  1845. case <-s.closeChan:
  1846. return
  1847. }
  1848. }
  1849. }
  1850. // runStatsResetter runs once a day (usually midnight UTC) to reset all the visitor's message and
  1851. // email counters. The stats are used to display the counters in the web app, as well as for rate limiting.
  1852. func (s *Server) runStatsResetter() {
  1853. for {
  1854. runAt := util.NextOccurrenceUTC(s.config.VisitorStatsResetTime, time.Now())
  1855. timer := time.NewTimer(time.Until(runAt))
  1856. log.Tag(tagResetter).Debug("Waiting until %v to reset visitor stats", runAt)
  1857. select {
  1858. case <-timer.C:
  1859. log.Tag(tagResetter).Debug("Running stats resetter")
  1860. s.resetStats()
  1861. case <-s.closeChan:
  1862. log.Tag(tagResetter).Debug("Stopping stats resetter")
  1863. timer.Stop()
  1864. return
  1865. }
  1866. }
  1867. }
  1868. func (s *Server) resetStats() {
  1869. log.Info("Resetting all visitor stats (daily task)")
  1870. s.mu.Lock()
  1871. defer s.mu.Unlock() // Includes the database query to avoid races with other processes
  1872. for _, v := range s.visitors {
  1873. v.ResetStats()
  1874. }
  1875. if s.userManager != nil {
  1876. if err := s.userManager.ResetStats(); err != nil {
  1877. log.Tag(tagResetter).Warn("Failed to write to database: %s", err.Error())
  1878. }
  1879. }
  1880. }
  1881. func (s *Server) runFirebaseKeepaliver() {
  1882. if s.firebaseClient == nil {
  1883. return
  1884. }
  1885. v := newVisitor(s.config, s.messageCache, s.userManager, netip.IPv4Unspecified(), nil) // Background process, not a real visitor, uses IP 0.0.0.0
  1886. for {
  1887. select {
  1888. case <-time.After(s.config.FirebaseKeepaliveInterval):
  1889. s.sendToFirebase(v, newKeepaliveMessage(firebaseControlTopic))
  1890. /*
  1891. FIXME: Disable iOS polling entirely for now due to thundering herd problem (see #677)
  1892. To solve this, we'd have to shard the iOS poll topics to spread out the polling evenly.
  1893. Given that it's not really necessary to poll, turning it off for now should not have any impact.
  1894. case <-time.After(s.config.FirebasePollInterval):
  1895. s.sendToFirebase(v, newKeepaliveMessage(firebasePollTopic))
  1896. */
  1897. case <-s.closeChan:
  1898. return
  1899. }
  1900. }
  1901. }
  1902. func (s *Server) runDelayedSender() {
  1903. for {
  1904. select {
  1905. case <-time.After(s.config.DelayedSenderInterval):
  1906. if err := s.sendDelayedMessages(); err != nil {
  1907. log.Tag(tagPublish).Err(err).Warn("Error sending delayed messages")
  1908. }
  1909. case <-s.closeChan:
  1910. return
  1911. }
  1912. }
  1913. }
  1914. func (s *Server) sendDelayedMessages() error {
  1915. messages, err := s.messageCache.MessagesDue()
  1916. if err != nil {
  1917. return err
  1918. }
  1919. for _, m := range messages {
  1920. var u *user.User
  1921. if s.userManager != nil && m.User != "" {
  1922. u, err = s.userManager.UserByID(m.User)
  1923. if err != nil {
  1924. log.With(m).Err(err).Warn("Error sending delayed message")
  1925. continue
  1926. }
  1927. }
  1928. v := s.visitor(m.Sender, u)
  1929. if err := s.sendDelayedMessage(v, m); err != nil {
  1930. logvm(v, m).Err(err).Warn("Error sending delayed message")
  1931. }
  1932. }
  1933. return nil
  1934. }
  1935. func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
  1936. logvm(v, m).Debug("Sending delayed message")
  1937. s.mu.RLock()
  1938. t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
  1939. s.mu.RUnlock()
  1940. if ok {
  1941. go func() {
  1942. // We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
  1943. if err := t.Publish(v, m); err != nil {
  1944. logvm(v, m).Err(err).Warn("Unable to publish message")
  1945. }
  1946. }()
  1947. }
  1948. if s.firebaseClient != nil { // Firebase subscribers may not show up in topics map
  1949. go s.sendToFirebase(v, m)
  1950. }
  1951. if s.config.UpstreamBaseURL != "" {
  1952. go s.forwardPollRequest(v, m)
  1953. }
  1954. if s.config.WebPushPublicKey != "" {
  1955. go s.publishToWebPushEndpoints(v, m)
  1956. }
  1957. if err := s.messageCache.MarkPublished(m); err != nil {
  1958. return err
  1959. }
  1960. return nil
  1961. }
  1962. // transformBodyJSON peeks the request body, reads the JSON, and converts it to headers
  1963. // before passing it on to the next handler. This is meant to be used in combination with handlePublish.
  1964. func (s *Server) transformBodyJSON(next handleFunc) handleFunc {
  1965. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1966. m, err := readJSONWithLimit[publishMessage](r.Body, s.config.MessageSizeLimit*2, false) // 2x to account for JSON format overhead
  1967. if err != nil {
  1968. return err
  1969. }
  1970. if !topicRegex.MatchString(m.Topic) {
  1971. return errHTTPBadRequestTopicInvalid
  1972. }
  1973. if m.Message == "" {
  1974. m.Message = emptyMessageBody
  1975. }
  1976. r.URL.Path = "/" + m.Topic
  1977. r.Body = io.NopCloser(strings.NewReader(m.Message))
  1978. if m.Title != "" {
  1979. r.Header.Set("X-Title", m.Title)
  1980. }
  1981. if m.Priority != 0 {
  1982. r.Header.Set("X-Priority", fmt.Sprintf("%d", m.Priority))
  1983. }
  1984. if len(m.Tags) > 0 {
  1985. r.Header.Set("X-Tags", strings.Join(m.Tags, ","))
  1986. }
  1987. if m.Attach != "" {
  1988. r.Header.Set("X-Attach", m.Attach)
  1989. }
  1990. if m.Filename != "" {
  1991. r.Header.Set("X-Filename", m.Filename)
  1992. }
  1993. if m.Click != "" {
  1994. r.Header.Set("X-Click", m.Click)
  1995. }
  1996. if m.Icon != "" {
  1997. r.Header.Set("X-Icon", m.Icon)
  1998. }
  1999. if m.Markdown {
  2000. r.Header.Set("X-Markdown", "yes")
  2001. }
  2002. if len(m.Actions) > 0 {
  2003. actionsStr, err := json.Marshal(m.Actions)
  2004. if err != nil {
  2005. return errHTTPBadRequestMessageJSONInvalid
  2006. }
  2007. r.Header.Set("X-Actions", string(actionsStr))
  2008. }
  2009. if m.Email != "" {
  2010. r.Header.Set("X-Email", m.Email)
  2011. }
  2012. if m.Delay != "" {
  2013. r.Header.Set("X-Delay", m.Delay)
  2014. }
  2015. if m.Call != "" {
  2016. r.Header.Set("X-Call", m.Call)
  2017. }
  2018. if m.Cache != "" {
  2019. r.Header.Set("X-Cache", m.Cache)
  2020. }
  2021. if m.Firebase != "" {
  2022. r.Header.Set("X-Firebase", m.Firebase)
  2023. }
  2024. if m.SequenceID != "" {
  2025. r.Header.Set("X-Sequence-ID", m.SequenceID)
  2026. }
  2027. return next(w, r, v)
  2028. }
  2029. }
  2030. func (s *Server) transformMatrixJSON(next handleFunc) handleFunc {
  2031. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  2032. newRequest, err := newRequestFromMatrixJSON(r, s.config.BaseURL, s.config.MessageSizeLimit)
  2033. if err != nil {
  2034. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Invalid Matrix request")
  2035. if e, ok := err.(*errMatrixPushkeyRejected); ok {
  2036. return writeMatrixResponse(w, e.rejectedPushKey)
  2037. }
  2038. return err
  2039. }
  2040. if err := next(w, newRequest, v); err != nil {
  2041. logvr(v, r).Tag(tagMatrix).Err(err).Debug("Error handling Matrix request")
  2042. return err
  2043. }
  2044. return nil
  2045. }
  2046. }
  2047. func (s *Server) authorizeTopicWrite(next handleFunc) handleFunc {
  2048. return s.authorizeTopic(next, user.PermissionWrite)
  2049. }
  2050. func (s *Server) authorizeTopicRead(next handleFunc) handleFunc {
  2051. return s.authorizeTopic(next, user.PermissionRead)
  2052. }
  2053. func (s *Server) authorizeTopic(next handleFunc, perm user.Permission) handleFunc {
  2054. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  2055. if s.userManager == nil {
  2056. return next(w, r, v)
  2057. }
  2058. topics, _, err := s.topicsFromPath(r.URL.Path)
  2059. if err != nil {
  2060. return err
  2061. }
  2062. u := v.User()
  2063. for _, t := range topics {
  2064. if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
  2065. logvr(v, r).With(t).Err(err).Debug("Access to topic %s not authorized", t.ID)
  2066. return errHTTPForbidden.With(t)
  2067. }
  2068. }
  2069. return next(w, r, v)
  2070. }
  2071. }
  2072. // maybeAuthenticate reads the "Authorization" header and will try to authenticate the user
  2073. // if it is set.
  2074. //
  2075. // - If auth-file is not configured, immediately return an IP-based visitor
  2076. // - If the header is not set or not supported (anything non-Basic and non-Bearer),
  2077. // an IP-based visitor is returned
  2078. // - If the header is set, authenticate will be called to check the username/password (Basic auth),
  2079. // or the token (Bearer auth), and read the user from the database
  2080. //
  2081. // This function will ALWAYS return a visitor, even if an error occurs (e.g. unauthorized), so
  2082. // that subsequent logging calls still have a visitor context.
  2083. func (s *Server) maybeAuthenticate(r *http.Request) (*visitor, error) {
  2084. // Read the "Authorization" header value and exit out early if it's not set
  2085. ip := extractIPAddress(r, s.config.BehindProxy, s.config.ProxyForwardedHeader, s.config.ProxyTrustedPrefixes)
  2086. vip := s.visitor(ip, nil)
  2087. if s.userManager == nil {
  2088. return vip, nil
  2089. }
  2090. header, err := readAuthHeader(r)
  2091. if err != nil {
  2092. return vip, err
  2093. } else if !supportedAuthHeader(header) {
  2094. return vip, nil
  2095. }
  2096. // If we're trying to auth, check the rate limiter first
  2097. if !vip.AuthAllowed() {
  2098. return vip, errHTTPTooManyRequestsLimitAuthFailure // Always return visitor, even when error occurs!
  2099. }
  2100. u, err := s.authenticate(r, header)
  2101. if err != nil {
  2102. vip.AuthFailed()
  2103. logr(r).Err(err).Debug("Authentication failed")
  2104. return vip, errHTTPUnauthorized // Always return visitor, even when error occurs!
  2105. }
  2106. // Authentication with user was successful
  2107. return s.visitor(ip, u), nil
  2108. }
  2109. // authenticate a user based on basic auth username/password (Authorization: Basic ...), or token auth (Authorization: Bearer ...).
  2110. // The Authorization header can be passed as a header or the ?auth=... query param. The latter is required only to
  2111. // support the WebSocket JavaScript class, which does not support passing headers during the initial request. The auth
  2112. // query param is effectively doubly base64 encoded. Its format is base64(Basic base64(user:pass)).
  2113. func (s *Server) authenticate(r *http.Request, header string) (user *user.User, err error) {
  2114. if strings.HasPrefix(header, "Bearer") {
  2115. return s.authenticateBearerAuth(r, strings.TrimSpace(strings.TrimPrefix(header, "Bearer")))
  2116. }
  2117. return s.authenticateBasicAuth(r, header)
  2118. }
  2119. // readAuthHeader reads the raw value of the Authorization header, either from the actual HTTP header,
  2120. // or from the ?auth... query parameter
  2121. func readAuthHeader(r *http.Request) (string, error) {
  2122. value := strings.TrimSpace(r.Header.Get("Authorization"))
  2123. queryParam := readQueryParam(r, "authorization", "auth")
  2124. if queryParam != "" {
  2125. a, err := base64.RawURLEncoding.DecodeString(queryParam)
  2126. if err != nil {
  2127. return "", err
  2128. }
  2129. value = strings.TrimSpace(string(a))
  2130. }
  2131. return value, nil
  2132. }
  2133. // supportedAuthHeader returns true only if the Authorization header value starts
  2134. // with "Basic" or "Bearer". In particular, an empty value is not supported, and neither
  2135. // are things like "WebPush", or "vapid" (see #629).
  2136. func supportedAuthHeader(value string) bool {
  2137. value = strings.ToLower(value)
  2138. return strings.HasPrefix(value, "basic ") || strings.HasPrefix(value, "bearer ")
  2139. }
  2140. func (s *Server) authenticateBasicAuth(r *http.Request, value string) (user *user.User, err error) {
  2141. r.Header.Set("Authorization", value)
  2142. username, password, ok := r.BasicAuth()
  2143. if !ok {
  2144. return nil, errors.New("invalid basic auth")
  2145. } else if username == "" {
  2146. return s.authenticateBearerAuth(r, password) // Treat password as token
  2147. }
  2148. return s.userManager.Authenticate(username, password)
  2149. }
  2150. func (s *Server) authenticateBearerAuth(r *http.Request, token string) (*user.User, error) {
  2151. u, err := s.userManager.AuthenticateToken(token)
  2152. if err != nil {
  2153. return nil, err
  2154. }
  2155. ip := extractIPAddress(r, s.config.BehindProxy, s.config.ProxyForwardedHeader, s.config.ProxyTrustedPrefixes)
  2156. go s.userManager.EnqueueTokenUpdate(token, &user.TokenUpdate{
  2157. LastAccess: time.Now(),
  2158. LastOrigin: ip,
  2159. })
  2160. return u, nil
  2161. }
  2162. func (s *Server) visitor(ip netip.Addr, user *user.User) *visitor {
  2163. s.mu.Lock()
  2164. defer s.mu.Unlock()
  2165. id := visitorID(ip, user, s.config)
  2166. v, exists := s.visitors[id]
  2167. if !exists {
  2168. s.visitors[id] = newVisitor(s.config, s.messageCache, s.userManager, ip, user)
  2169. return s.visitors[id]
  2170. }
  2171. v.Keepalive()
  2172. v.SetUser(user) // Always update with the latest user, may be nil!
  2173. return v
  2174. }
  2175. func (s *Server) writeJSON(w http.ResponseWriter, v any) error {
  2176. return s.writeJSONWithContentType(w, v, "application/json")
  2177. }
  2178. func (s *Server) writeJSONWithContentType(w http.ResponseWriter, v any, contentType string) error {
  2179. w.Header().Set("Content-Type", contentType)
  2180. w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
  2181. if err := json.NewEncoder(w).Encode(v); err != nil {
  2182. return err
  2183. }
  2184. return nil
  2185. }
  2186. func (s *Server) updateAndWriteStats(messagesCount int64) {
  2187. s.mu.Lock()
  2188. s.messagesHistory = append(s.messagesHistory, messagesCount)
  2189. if len(s.messagesHistory) > messagesHistoryMax {
  2190. s.messagesHistory = s.messagesHistory[1:]
  2191. }
  2192. s.mu.Unlock()
  2193. go func() {
  2194. if err := s.messageCache.UpdateStats(messagesCount); err != nil {
  2195. log.Tag(tagManager).Err(err).Warn("Cannot write messages stats")
  2196. }
  2197. }()
  2198. }