server.go 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173
  1. package server
  2. import (
  3. "bytes"
  4. "context"
  5. "embed"
  6. "encoding/base64"
  7. "encoding/json"
  8. "errors"
  9. firebase "firebase.google.com/go"
  10. "firebase.google.com/go/messaging"
  11. "fmt"
  12. "github.com/emersion/go-smtp"
  13. "github.com/gorilla/websocket"
  14. "golang.org/x/sync/errgroup"
  15. "google.golang.org/api/option"
  16. "heckel.io/ntfy/auth"
  17. "heckel.io/ntfy/util"
  18. "html/template"
  19. "io"
  20. "log"
  21. "net"
  22. "net/http"
  23. "net/http/httptest"
  24. "net/url"
  25. "os"
  26. "path"
  27. "path/filepath"
  28. "regexp"
  29. "strconv"
  30. "strings"
  31. "sync"
  32. "time"
  33. "unicode/utf8"
  34. )
  35. // Server is the main server, providing the UI and API for ntfy
  36. type Server struct {
  37. config *Config
  38. httpServer *http.Server
  39. httpsServer *http.Server
  40. unixListener net.Listener
  41. smtpServer *smtp.Server
  42. smtpBackend *smtpBackend
  43. topics map[string]*topic
  44. visitors map[string]*visitor
  45. firebase subscriber
  46. mailer mailer
  47. messages int64
  48. auth auth.Auther
  49. cache cache
  50. fileCache *fileCache
  51. closeChan chan bool
  52. mu sync.Mutex
  53. }
  54. type indexPage struct {
  55. Topic string
  56. CacheDuration time.Duration
  57. }
  58. // handleFunc extends the normal http.HandlerFunc to be able to easily return errors
  59. type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
  60. var (
  61. topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
  62. topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
  63. jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
  64. ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
  65. rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
  66. wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
  67. publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/(publish|send|trigger)$`)
  68. staticRegex = regexp.MustCompile(`^/static/.+`)
  69. docsRegex = regexp.MustCompile(`^/docs(|/.*)$`)
  70. fileRegex = regexp.MustCompile(`^/file/([-_A-Za-z0-9]{1,64})(?:\.[A-Za-z0-9]{1,16})?$`)
  71. disallowedTopics = []string{"docs", "static", "file"}
  72. attachURLRegex = regexp.MustCompile(`^https?://`)
  73. templateFnMap = template.FuncMap{
  74. "durationToHuman": util.DurationToHuman,
  75. }
  76. //go:embed "index.gohtml"
  77. indexSource string
  78. indexTemplate = template.Must(template.New("index").Funcs(templateFnMap).Parse(indexSource))
  79. //go:embed "example.html"
  80. exampleSource string
  81. //go:embed static
  82. webStaticFs embed.FS
  83. webStaticFsCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: webStaticFs}
  84. //go:embed docs
  85. docsStaticFs embed.FS
  86. docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
  87. )
  88. const (
  89. firebaseControlTopic = "~control" // See Android if changed
  90. emptyMessageBody = "triggered" // Used if message body is empty
  91. defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
  92. encodingBase64 = "base64"
  93. )
  94. // WebSocket constants
  95. const (
  96. wsWriteWait = 2 * time.Second
  97. wsBufferSize = 1024
  98. wsReadLimit = 64 // We only ever receive PINGs
  99. wsPongWait = 15 * time.Second
  100. )
  101. // New instantiates a new Server. It creates the cache and adds a Firebase
  102. // subscriber (if configured).
  103. func New(conf *Config) (*Server, error) {
  104. var firebaseSubscriber subscriber
  105. if conf.FirebaseKeyFile != "" {
  106. var err error
  107. firebaseSubscriber, err = createFirebaseSubscriber(conf)
  108. if err != nil {
  109. return nil, err
  110. }
  111. }
  112. var mailer mailer
  113. if conf.SMTPSenderAddr != "" {
  114. mailer = &smtpSender{config: conf}
  115. }
  116. cache, err := createCache(conf)
  117. if err != nil {
  118. return nil, err
  119. }
  120. topics, err := cache.Topics()
  121. if err != nil {
  122. return nil, err
  123. }
  124. var fileCache *fileCache
  125. if conf.AttachmentCacheDir != "" {
  126. fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, conf.AttachmentFileSizeLimit)
  127. if err != nil {
  128. return nil, err
  129. }
  130. }
  131. var auther auth.Auther
  132. if conf.AuthFile != "" {
  133. auther, err = auth.NewSQLiteAuth(conf.AuthFile, conf.AuthDefaultRead, conf.AuthDefaultWrite)
  134. if err != nil {
  135. return nil, err
  136. }
  137. }
  138. return &Server{
  139. config: conf,
  140. cache: cache,
  141. fileCache: fileCache,
  142. firebase: firebaseSubscriber,
  143. mailer: mailer,
  144. topics: topics,
  145. auth: auther,
  146. visitors: make(map[string]*visitor),
  147. }, nil
  148. }
  149. func createCache(conf *Config) (cache, error) {
  150. if conf.CacheDuration == 0 {
  151. return newNopCache(), nil
  152. } else if conf.CacheFile != "" {
  153. return newSqliteCache(conf.CacheFile)
  154. }
  155. return newMemCache(), nil
  156. }
  157. func createFirebaseSubscriber(conf *Config) (subscriber, error) {
  158. fb, err := firebase.NewApp(context.Background(), nil, option.WithCredentialsFile(conf.FirebaseKeyFile))
  159. if err != nil {
  160. return nil, err
  161. }
  162. msg, err := fb.Messaging(context.Background())
  163. if err != nil {
  164. return nil, err
  165. }
  166. return func(m *message) error {
  167. var data map[string]string // Matches https://ntfy.sh/docs/subscribe/api/#json-message-format
  168. switch m.Event {
  169. case keepaliveEvent, openEvent:
  170. data = map[string]string{
  171. "id": m.ID,
  172. "time": fmt.Sprintf("%d", m.Time),
  173. "event": m.Event,
  174. "topic": m.Topic,
  175. }
  176. case messageEvent:
  177. data = map[string]string{
  178. "id": m.ID,
  179. "time": fmt.Sprintf("%d", m.Time),
  180. "event": m.Event,
  181. "topic": m.Topic,
  182. "priority": fmt.Sprintf("%d", m.Priority),
  183. "tags": strings.Join(m.Tags, ","),
  184. "click": m.Click,
  185. "title": m.Title,
  186. "message": m.Message,
  187. "encoding": m.Encoding,
  188. }
  189. if m.Attachment != nil {
  190. data["attachment_name"] = m.Attachment.Name
  191. data["attachment_type"] = m.Attachment.Type
  192. data["attachment_size"] = fmt.Sprintf("%d", m.Attachment.Size)
  193. data["attachment_expires"] = fmt.Sprintf("%d", m.Attachment.Expires)
  194. data["attachment_url"] = m.Attachment.URL
  195. }
  196. }
  197. var androidConfig *messaging.AndroidConfig
  198. if m.Priority >= 4 {
  199. androidConfig = &messaging.AndroidConfig{
  200. Priority: "high",
  201. }
  202. }
  203. _, err := msg.Send(context.Background(), maybeTruncateFCMMessage(&messaging.Message{
  204. Topic: m.Topic,
  205. Data: data,
  206. Android: androidConfig,
  207. }))
  208. return err
  209. }, nil
  210. }
  211. // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
  212. // a manager go routine to print stats and prune messages.
  213. func (s *Server) Run() error {
  214. var listenStr string
  215. if s.config.ListenHTTP != "" {
  216. listenStr += fmt.Sprintf(" %s[http]", s.config.ListenHTTP)
  217. }
  218. if s.config.ListenHTTPS != "" {
  219. listenStr += fmt.Sprintf(" %s[https]", s.config.ListenHTTPS)
  220. }
  221. if s.config.ListenUnix != "" {
  222. listenStr += fmt.Sprintf(" %s[unix]", s.config.ListenUnix)
  223. }
  224. if s.config.SMTPServerListen != "" {
  225. listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
  226. }
  227. log.Printf("Listening on%s", listenStr)
  228. mux := http.NewServeMux()
  229. mux.HandleFunc("/", s.handle)
  230. errChan := make(chan error)
  231. s.mu.Lock()
  232. s.closeChan = make(chan bool)
  233. if s.config.ListenHTTP != "" {
  234. s.httpServer = &http.Server{Addr: s.config.ListenHTTP, Handler: mux}
  235. go func() {
  236. errChan <- s.httpServer.ListenAndServe()
  237. }()
  238. }
  239. if s.config.ListenHTTPS != "" {
  240. s.httpsServer = &http.Server{Addr: s.config.ListenHTTPS, Handler: mux}
  241. go func() {
  242. errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
  243. }()
  244. }
  245. if s.config.ListenUnix != "" {
  246. go func() {
  247. var err error
  248. s.mu.Lock()
  249. os.Remove(s.config.ListenUnix)
  250. s.unixListener, err = net.Listen("unix", s.config.ListenUnix)
  251. if err != nil {
  252. errChan <- err
  253. return
  254. }
  255. s.mu.Unlock()
  256. httpServer := &http.Server{Handler: mux}
  257. errChan <- httpServer.Serve(s.unixListener)
  258. }()
  259. }
  260. if s.config.SMTPServerListen != "" {
  261. go func() {
  262. errChan <- s.runSMTPServer()
  263. }()
  264. }
  265. s.mu.Unlock()
  266. go s.runManager()
  267. go s.runAtSender()
  268. go s.runFirebaseKeepaliver()
  269. return <-errChan
  270. }
  271. // Stop stops HTTP (+HTTPS) server and all managers
  272. func (s *Server) Stop() {
  273. s.mu.Lock()
  274. defer s.mu.Unlock()
  275. if s.httpServer != nil {
  276. s.httpServer.Close()
  277. }
  278. if s.httpsServer != nil {
  279. s.httpsServer.Close()
  280. }
  281. if s.unixListener != nil {
  282. s.unixListener.Close()
  283. }
  284. if s.smtpServer != nil {
  285. s.smtpServer.Close()
  286. }
  287. close(s.closeChan)
  288. }
  289. func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
  290. if err := s.handleInternal(w, r); err != nil {
  291. if websocket.IsWebSocketUpgrade(r) {
  292. log.Printf("[%s] WS %s %s - %s", r.RemoteAddr, r.Method, r.URL.Path, err.Error())
  293. return // Do not attempt to write to upgraded connection
  294. }
  295. httpErr, ok := err.(*errHTTP)
  296. if !ok {
  297. httpErr = errHTTPInternalError
  298. }
  299. log.Printf("[%s] HTTP %s %s - %d - %d - %s", r.RemoteAddr, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error())
  300. w.Header().Set("Content-Type", "application/json")
  301. w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
  302. w.WriteHeader(httpErr.HTTPCode)
  303. io.WriteString(w, httpErr.JSON()+"\n")
  304. }
  305. }
  306. func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error {
  307. v := s.visitor(r)
  308. if r.Method == http.MethodGet && r.URL.Path == "/" {
  309. return s.handleHome(w, r)
  310. } else if r.Method == http.MethodGet && r.URL.Path == "/example.html" {
  311. return s.handleExample(w, r)
  312. } else if r.Method == http.MethodHead && r.URL.Path == "/" {
  313. return s.handleEmpty(w, r)
  314. } else if r.Method == http.MethodGet && staticRegex.MatchString(r.URL.Path) {
  315. return s.handleStatic(w, r)
  316. } else if r.Method == http.MethodGet && docsRegex.MatchString(r.URL.Path) {
  317. return s.handleDocs(w, r)
  318. } else if r.Method == http.MethodGet && fileRegex.MatchString(r.URL.Path) && s.config.AttachmentCacheDir != "" {
  319. return s.limitRequests(s.handleFile)(w, r, v)
  320. } else if r.Method == http.MethodOptions {
  321. return s.handleOptions(w, r)
  322. } else if r.Method == http.MethodGet && topicPathRegex.MatchString(r.URL.Path) {
  323. return s.handleTopic(w, r)
  324. } else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicPathRegex.MatchString(r.URL.Path) {
  325. return s.limitRequests(s.authWrite(s.handlePublish))(w, r, v)
  326. } else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
  327. return s.limitRequests(s.authWrite(s.handlePublish))(w, r, v)
  328. } else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) {
  329. return s.limitRequests(s.authRead(s.handleSubscribeJSON))(w, r, v)
  330. } else if r.Method == http.MethodGet && ssePathRegex.MatchString(r.URL.Path) {
  331. return s.limitRequests(s.authRead(s.handleSubscribeSSE))(w, r, v)
  332. } else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
  333. return s.limitRequests(s.authRead(s.handleSubscribeRaw))(w, r, v)
  334. } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) {
  335. return s.limitRequests(s.authRead(s.handleSubscribeWS))(w, r, v)
  336. }
  337. return errHTTPNotFound
  338. }
  339. func (s *Server) handleHome(w http.ResponseWriter, r *http.Request) error {
  340. return indexTemplate.Execute(w, &indexPage{
  341. Topic: r.URL.Path[1:],
  342. CacheDuration: s.config.CacheDuration,
  343. })
  344. }
  345. func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request) error {
  346. unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too!
  347. if unifiedpush {
  348. w.Header().Set("Content-Type", "application/json")
  349. w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
  350. _, err := io.WriteString(w, `{"unifiedpush":{"version":1}}`+"\n")
  351. return err
  352. }
  353. return s.handleHome(w, r)
  354. }
  355. func (s *Server) handleEmpty(_ http.ResponseWriter, _ *http.Request) error {
  356. return nil
  357. }
  358. func (s *Server) handleExample(w http.ResponseWriter, _ *http.Request) error {
  359. _, err := io.WriteString(w, exampleSource)
  360. return err
  361. }
  362. func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request) error {
  363. http.FileServer(http.FS(webStaticFsCached)).ServeHTTP(w, r)
  364. return nil
  365. }
  366. func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request) error {
  367. http.FileServer(http.FS(docsStaticCached)).ServeHTTP(w, r)
  368. return nil
  369. }
  370. func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error {
  371. if s.config.AttachmentCacheDir == "" {
  372. return errHTTPInternalError
  373. }
  374. matches := fileRegex.FindStringSubmatch(r.URL.Path)
  375. if len(matches) != 2 {
  376. return errHTTPInternalErrorInvalidFilePath
  377. }
  378. messageID := matches[1]
  379. file := filepath.Join(s.config.AttachmentCacheDir, messageID)
  380. stat, err := os.Stat(file)
  381. if err != nil {
  382. return errHTTPNotFound
  383. }
  384. if err := v.BandwidthLimiter().Allow(stat.Size()); err != nil {
  385. return errHTTPTooManyRequestsAttachmentBandwidthLimit
  386. }
  387. w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
  388. f, err := os.Open(file)
  389. if err != nil {
  390. return err
  391. }
  392. defer f.Close()
  393. _, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f)
  394. return err
  395. }
  396. func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
  397. t, err := s.topicFromPath(r.URL.Path)
  398. if err != nil {
  399. return err
  400. }
  401. body, err := util.Peak(r.Body, s.config.MessageLimit)
  402. if err != nil {
  403. return err
  404. }
  405. m := newDefaultMessage(t.ID, "")
  406. cache, firebase, email, unifiedpush, err := s.parsePublishParams(r, v, m)
  407. if err != nil {
  408. return err
  409. }
  410. if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil {
  411. return err
  412. }
  413. if m.Message == "" {
  414. m.Message = emptyMessageBody
  415. }
  416. delayed := m.Time > time.Now().Unix()
  417. if !delayed {
  418. if err := t.Publish(m); err != nil {
  419. return err
  420. }
  421. }
  422. if s.firebase != nil && firebase && !delayed {
  423. go func() {
  424. if err := s.firebase(m); err != nil {
  425. log.Printf("Unable to publish to Firebase: %v", err.Error())
  426. }
  427. }()
  428. }
  429. if s.mailer != nil && email != "" && !delayed {
  430. go func() {
  431. if err := s.mailer.Send(v.ip, email, m); err != nil {
  432. log.Printf("Unable to send email: %v", err.Error())
  433. }
  434. }()
  435. }
  436. if cache {
  437. if err := s.cache.AddMessage(m); err != nil {
  438. return err
  439. }
  440. }
  441. w.Header().Set("Content-Type", "application/json")
  442. w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
  443. if err := json.NewEncoder(w).Encode(m); err != nil {
  444. return err
  445. }
  446. s.mu.Lock()
  447. s.messages++
  448. s.mu.Unlock()
  449. return nil
  450. }
  451. func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (cache bool, firebase bool, email string, unifiedpush bool, err error) {
  452. cache = readBoolParam(r, true, "x-cache", "cache")
  453. firebase = readBoolParam(r, true, "x-firebase", "firebase")
  454. m.Title = readParam(r, "x-title", "title", "t")
  455. m.Click = readParam(r, "x-click", "click")
  456. filename := readParam(r, "x-filename", "filename", "file", "f")
  457. attach := readParam(r, "x-attach", "attach", "a")
  458. if attach != "" || filename != "" {
  459. m.Attachment = &attachment{}
  460. }
  461. if filename != "" {
  462. m.Attachment.Name = filename
  463. }
  464. if attach != "" {
  465. if !attachURLRegex.MatchString(attach) {
  466. return false, false, "", false, errHTTPBadRequestAttachmentURLInvalid
  467. }
  468. m.Attachment.URL = attach
  469. if m.Attachment.Name == "" {
  470. u, err := url.Parse(m.Attachment.URL)
  471. if err == nil {
  472. m.Attachment.Name = path.Base(u.Path)
  473. if m.Attachment.Name == "." || m.Attachment.Name == "/" {
  474. m.Attachment.Name = ""
  475. }
  476. }
  477. }
  478. if m.Attachment.Name == "" {
  479. m.Attachment.Name = "attachment"
  480. }
  481. }
  482. email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
  483. if email != "" {
  484. if err := v.EmailAllowed(); err != nil {
  485. return false, false, "", false, errHTTPTooManyRequestsLimitEmails
  486. }
  487. }
  488. if s.mailer == nil && email != "" {
  489. return false, false, "", false, errHTTPBadRequestEmailDisabled
  490. }
  491. messageStr := readParam(r, "x-message", "message", "m")
  492. if messageStr != "" {
  493. m.Message = messageStr
  494. }
  495. m.Priority, err = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
  496. if err != nil {
  497. return false, false, "", false, errHTTPBadRequestPriorityInvalid
  498. }
  499. tagsStr := readParam(r, "x-tags", "tags", "tag", "ta")
  500. if tagsStr != "" {
  501. m.Tags = make([]string, 0)
  502. for _, s := range util.SplitNoEmpty(tagsStr, ",") {
  503. m.Tags = append(m.Tags, strings.TrimSpace(s))
  504. }
  505. }
  506. delayStr := readParam(r, "x-delay", "delay", "x-at", "at", "x-in", "in")
  507. if delayStr != "" {
  508. if !cache {
  509. return false, false, "", false, errHTTPBadRequestDelayNoCache
  510. }
  511. if email != "" {
  512. return false, false, "", false, errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
  513. }
  514. delay, err := util.ParseFutureTime(delayStr, time.Now())
  515. if err != nil {
  516. return false, false, "", false, errHTTPBadRequestDelayCannotParse
  517. } else if delay.Unix() < time.Now().Add(s.config.MinDelay).Unix() {
  518. return false, false, "", false, errHTTPBadRequestDelayTooSmall
  519. } else if delay.Unix() > time.Now().Add(s.config.MaxDelay).Unix() {
  520. return false, false, "", false, errHTTPBadRequestDelayTooLarge
  521. }
  522. m.Time = delay.Unix()
  523. }
  524. unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
  525. if unifiedpush {
  526. firebase = false
  527. unifiedpush = true
  528. }
  529. return cache, firebase, email, unifiedpush, nil
  530. }
  531. // handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
  532. //
  533. // 1. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
  534. // If body is binary, encode as base64, if not do not encode
  535. // 2. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
  536. // Body must be a message, because we attached an external URL
  537. // 3. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
  538. // Body must be attachment, because we passed a filename
  539. // 4. curl -T file.txt ntfy.sh/mytopic
  540. // If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
  541. // 5. curl -T file.txt ntfy.sh/mytopic
  542. // If file.txt is > message limit, treat it as an attachment
  543. func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeakedReadCloser, unifiedpush bool) error {
  544. if unifiedpush {
  545. return s.handleBodyAsMessageAutoDetect(m, body) // Case 1
  546. } else if m.Attachment != nil && m.Attachment.URL != "" {
  547. return s.handleBodyAsTextMessage(m, body) // Case 2
  548. } else if m.Attachment != nil && m.Attachment.Name != "" {
  549. return s.handleBodyAsAttachment(r, v, m, body) // Case 3
  550. } else if !body.LimitReached && utf8.Valid(body.PeakedBytes) {
  551. return s.handleBodyAsTextMessage(m, body) // Case 4
  552. }
  553. return s.handleBodyAsAttachment(r, v, m, body) // Case 5
  554. }
  555. func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeakedReadCloser) error {
  556. if utf8.Valid(body.PeakedBytes) {
  557. m.Message = string(body.PeakedBytes) // Do not trim
  558. } else {
  559. m.Message = base64.StdEncoding.EncodeToString(body.PeakedBytes)
  560. m.Encoding = encodingBase64
  561. }
  562. return nil
  563. }
  564. func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeakedReadCloser) error {
  565. if !utf8.Valid(body.PeakedBytes) {
  566. return errHTTPBadRequestMessageNotUTF8
  567. }
  568. if len(body.PeakedBytes) > 0 { // Empty body should not override message (publish via GET!)
  569. m.Message = strings.TrimSpace(string(body.PeakedBytes)) // Truncates the message to the peak limit if required
  570. }
  571. if m.Attachment != nil && m.Attachment.Name != "" && m.Message == "" {
  572. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  573. }
  574. return nil
  575. }
  576. func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeakedReadCloser) error {
  577. if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
  578. return errHTTPBadRequestAttachmentsDisallowed
  579. } else if m.Time > time.Now().Add(s.config.AttachmentExpiryDuration).Unix() {
  580. return errHTTPBadRequestAttachmentsExpiryBeforeDelivery
  581. }
  582. visitorAttachmentsSize, err := s.cache.AttachmentsSize(v.ip)
  583. if err != nil {
  584. return err
  585. }
  586. remainingVisitorAttachmentSize := s.config.VisitorAttachmentTotalSizeLimit - visitorAttachmentsSize
  587. contentLengthStr := r.Header.Get("Content-Length")
  588. if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
  589. contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
  590. if err == nil && (contentLength > remainingVisitorAttachmentSize || contentLength > s.config.AttachmentFileSizeLimit) {
  591. return errHTTPBadRequestAttachmentTooLarge
  592. }
  593. }
  594. if m.Attachment == nil {
  595. m.Attachment = &attachment{}
  596. }
  597. var ext string
  598. m.Attachment.Owner = v.ip // Important for attachment rate limiting
  599. m.Attachment.Expires = time.Now().Add(s.config.AttachmentExpiryDuration).Unix()
  600. m.Attachment.Type, ext = util.DetectContentType(body.PeakedBytes, m.Attachment.Name)
  601. m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
  602. if m.Attachment.Name == "" {
  603. m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
  604. }
  605. if m.Message == "" {
  606. m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
  607. }
  608. m.Attachment.Size, err = s.fileCache.Write(m.ID, body, v.BandwidthLimiter(), util.NewFixedLimiter(remainingVisitorAttachmentSize))
  609. if err == util.ErrLimitReached {
  610. return errHTTPBadRequestAttachmentTooLarge
  611. } else if err != nil {
  612. return err
  613. }
  614. return nil
  615. }
  616. func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *visitor) error {
  617. encoder := func(msg *message) (string, error) {
  618. var buf bytes.Buffer
  619. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  620. return "", err
  621. }
  622. return buf.String(), nil
  623. }
  624. return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder)
  625. }
  626. func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
  627. encoder := func(msg *message) (string, error) {
  628. var buf bytes.Buffer
  629. if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
  630. return "", err
  631. }
  632. if msg.Event != messageEvent {
  633. return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
  634. }
  635. return fmt.Sprintf("data: %s\n", buf.String()), nil
  636. }
  637. return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder)
  638. }
  639. func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
  640. encoder := func(msg *message) (string, error) {
  641. if msg.Event == messageEvent { // only handle default events
  642. return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
  643. }
  644. return "\n", nil // "keepalive" and "open" events just send an empty line
  645. }
  646. return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder)
  647. }
  648. func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
  649. if err := v.SubscriptionAllowed(); err != nil {
  650. return errHTTPTooManyRequestsLimitSubscriptions
  651. }
  652. defer v.RemoveSubscription()
  653. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  654. if err != nil {
  655. return err
  656. }
  657. poll, since, scheduled, filters, err := parseSubscribeParams(r)
  658. if err != nil {
  659. return err
  660. }
  661. var wlock sync.Mutex
  662. sub := func(msg *message) error {
  663. if !filters.Pass(msg) {
  664. return nil
  665. }
  666. m, err := encoder(msg)
  667. if err != nil {
  668. return err
  669. }
  670. wlock.Lock()
  671. defer wlock.Unlock()
  672. if _, err := w.Write([]byte(m)); err != nil {
  673. return err
  674. }
  675. if fl, ok := w.(http.Flusher); ok {
  676. fl.Flush()
  677. }
  678. return nil
  679. }
  680. w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
  681. w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
  682. if poll {
  683. return s.sendOldMessages(topics, since, scheduled, sub)
  684. }
  685. subscriberIDs := make([]int, 0)
  686. for _, t := range topics {
  687. subscriberIDs = append(subscriberIDs, t.Subscribe(sub))
  688. }
  689. defer func() {
  690. for i, subscriberID := range subscriberIDs {
  691. topics[i].Unsubscribe(subscriberID) // Order!
  692. }
  693. }()
  694. if err := sub(newOpenMessage(topicsStr)); err != nil { // Send out open message
  695. return err
  696. }
  697. if err := s.sendOldMessages(topics, since, scheduled, sub); err != nil {
  698. return err
  699. }
  700. for {
  701. select {
  702. case <-r.Context().Done():
  703. return nil
  704. case <-time.After(s.config.KeepaliveInterval):
  705. v.Keepalive()
  706. if err := sub(newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
  707. return err
  708. }
  709. }
  710. }
  711. }
  712. func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error {
  713. if r.Header.Get("Upgrade") != "websocket" {
  714. return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
  715. }
  716. if err := v.SubscriptionAllowed(); err != nil {
  717. return errHTTPTooManyRequestsLimitSubscriptions
  718. }
  719. defer v.RemoveSubscription()
  720. topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
  721. if err != nil {
  722. return err
  723. }
  724. poll, since, scheduled, filters, err := parseSubscribeParams(r)
  725. if err != nil {
  726. return err
  727. }
  728. upgrader := &websocket.Upgrader{
  729. ReadBufferSize: wsBufferSize,
  730. WriteBufferSize: wsBufferSize,
  731. CheckOrigin: func(r *http.Request) bool {
  732. return true // We're open for business!
  733. },
  734. }
  735. conn, err := upgrader.Upgrade(w, r, nil)
  736. if err != nil {
  737. return err
  738. }
  739. defer conn.Close()
  740. var wlock sync.Mutex
  741. g, ctx := errgroup.WithContext(context.Background())
  742. g.Go(func() error {
  743. pongWait := s.config.KeepaliveInterval + wsPongWait
  744. conn.SetReadLimit(wsReadLimit)
  745. if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
  746. return err
  747. }
  748. conn.SetPongHandler(func(appData string) error {
  749. return conn.SetReadDeadline(time.Now().Add(pongWait))
  750. })
  751. for {
  752. _, _, err := conn.NextReader()
  753. if err != nil {
  754. return err
  755. }
  756. }
  757. })
  758. g.Go(func() error {
  759. ping := func() error {
  760. wlock.Lock()
  761. defer wlock.Unlock()
  762. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  763. return err
  764. }
  765. return conn.WriteMessage(websocket.PingMessage, nil)
  766. }
  767. for {
  768. select {
  769. case <-ctx.Done():
  770. return nil
  771. case <-time.After(s.config.KeepaliveInterval):
  772. v.Keepalive()
  773. if err := ping(); err != nil {
  774. return err
  775. }
  776. }
  777. }
  778. })
  779. sub := func(msg *message) error {
  780. if !filters.Pass(msg) {
  781. return nil
  782. }
  783. wlock.Lock()
  784. defer wlock.Unlock()
  785. if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
  786. return err
  787. }
  788. return conn.WriteJSON(msg)
  789. }
  790. w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
  791. if poll {
  792. return s.sendOldMessages(topics, since, scheduled, sub)
  793. }
  794. subscriberIDs := make([]int, 0)
  795. for _, t := range topics {
  796. subscriberIDs = append(subscriberIDs, t.Subscribe(sub))
  797. }
  798. defer func() {
  799. for i, subscriberID := range subscriberIDs {
  800. topics[i].Unsubscribe(subscriberID) // Order!
  801. }
  802. }()
  803. if err := sub(newOpenMessage(topicsStr)); err != nil { // Send out open message
  804. return err
  805. }
  806. if err := s.sendOldMessages(topics, since, scheduled, sub); err != nil {
  807. return err
  808. }
  809. err = g.Wait()
  810. if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
  811. return nil // Normal closures are not errors
  812. }
  813. return err
  814. }
  815. func parseSubscribeParams(r *http.Request) (poll bool, since sinceTime, scheduled bool, filters *queryFilter, err error) {
  816. poll = readBoolParam(r, false, "x-poll", "poll", "po")
  817. scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
  818. since, err = parseSince(r, poll)
  819. if err != nil {
  820. return
  821. }
  822. filters, err = parseQueryFilters(r)
  823. if err != nil {
  824. return
  825. }
  826. return
  827. }
  828. func (s *Server) sendOldMessages(topics []*topic, since sinceTime, scheduled bool, sub subscriber) error {
  829. if since.IsNone() {
  830. return nil
  831. }
  832. for _, t := range topics {
  833. messages, err := s.cache.Messages(t.ID, since, scheduled)
  834. if err != nil {
  835. return err
  836. }
  837. for _, m := range messages {
  838. if err := sub(m); err != nil {
  839. return err
  840. }
  841. }
  842. }
  843. return nil
  844. }
  845. // parseSince returns a timestamp identifying the time span from which cached messages should be received.
  846. //
  847. // Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
  848. // "all" for all messages.
  849. func parseSince(r *http.Request, poll bool) (sinceTime, error) {
  850. since := readParam(r, "x-since", "since", "si")
  851. if since == "" {
  852. if poll {
  853. return sinceAllMessages, nil
  854. }
  855. return sinceNoMessages, nil
  856. }
  857. if since == "all" {
  858. return sinceAllMessages, nil
  859. } else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
  860. return sinceTime(time.Unix(s, 0)), nil
  861. } else if d, err := time.ParseDuration(since); err == nil {
  862. return sinceTime(time.Now().Add(-1 * d)), nil
  863. }
  864. return sinceNoMessages, errHTTPBadRequestSinceInvalid
  865. }
  866. func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request) error {
  867. w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
  868. w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST")
  869. return nil
  870. }
  871. func (s *Server) topicFromPath(path string) (*topic, error) {
  872. parts := strings.Split(path, "/")
  873. if len(parts) < 2 {
  874. return nil, errHTTPBadRequestTopicInvalid
  875. }
  876. topics, err := s.topicsFromIDs(parts[1])
  877. if err != nil {
  878. return nil, err
  879. }
  880. return topics[0], nil
  881. }
  882. func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
  883. parts := strings.Split(path, "/")
  884. if len(parts) < 2 {
  885. return nil, "", errHTTPBadRequestTopicInvalid
  886. }
  887. topicIDs := util.SplitNoEmpty(parts[1], ",")
  888. topics, err := s.topicsFromIDs(topicIDs...)
  889. if err != nil {
  890. return nil, "", errHTTPBadRequestTopicInvalid
  891. }
  892. return topics, parts[1], nil
  893. }
  894. func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
  895. s.mu.Lock()
  896. defer s.mu.Unlock()
  897. topics := make([]*topic, 0)
  898. for _, id := range ids {
  899. if util.InStringList(disallowedTopics, id) {
  900. return nil, errHTTPBadRequestTopicDisallowed
  901. }
  902. if _, ok := s.topics[id]; !ok {
  903. if len(s.topics) >= s.config.TotalTopicLimit {
  904. return nil, errHTTPTooManyRequestsLimitTotalTopics
  905. }
  906. s.topics[id] = newTopic(id)
  907. }
  908. topics = append(topics, s.topics[id])
  909. }
  910. return topics, nil
  911. }
  912. func (s *Server) updateStatsAndPrune() {
  913. s.mu.Lock()
  914. defer s.mu.Unlock()
  915. // Expire visitors from rate visitors map
  916. for ip, v := range s.visitors {
  917. if v.Stale() {
  918. delete(s.visitors, ip)
  919. }
  920. }
  921. // Delete expired attachments
  922. if s.fileCache != nil {
  923. ids, err := s.cache.AttachmentsExpired()
  924. if err == nil {
  925. if err := s.fileCache.Remove(ids...); err != nil {
  926. log.Printf("error while deleting attachments: %s", err.Error())
  927. }
  928. } else {
  929. log.Printf("error retrieving expired attachments: %s", err.Error())
  930. }
  931. }
  932. // Prune message cache
  933. olderThan := time.Now().Add(-1 * s.config.CacheDuration)
  934. if err := s.cache.Prune(olderThan); err != nil {
  935. log.Printf("error pruning cache: %s", err.Error())
  936. }
  937. // Prune old topics, remove subscriptions without subscribers
  938. var subscribers, messages int
  939. for _, t := range s.topics {
  940. subs := t.Subscribers()
  941. msgs, err := s.cache.MessageCount(t.ID)
  942. if err != nil {
  943. log.Printf("cannot get stats for topic %s: %s", t.ID, err.Error())
  944. continue
  945. }
  946. if msgs == 0 && subs == 0 {
  947. delete(s.topics, t.ID)
  948. continue
  949. }
  950. subscribers += subs
  951. messages += msgs
  952. }
  953. // Mail stats
  954. var mailSuccess, mailFailure int64
  955. if s.smtpBackend != nil {
  956. mailSuccess, mailFailure = s.smtpBackend.Counts()
  957. }
  958. // Print stats
  959. log.Printf("Stats: %d message(s) published, %d in cache, %d successful mails, %d failed, %d topic(s) active, %d subscriber(s), %d visitor(s)",
  960. s.messages, messages, mailSuccess, mailFailure, len(s.topics), subscribers, len(s.visitors))
  961. }
  962. func (s *Server) runSMTPServer() error {
  963. sub := func(m *message) error {
  964. url := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
  965. req, err := http.NewRequest("PUT", url, strings.NewReader(m.Message))
  966. if err != nil {
  967. return err
  968. }
  969. if m.Title != "" {
  970. req.Header.Set("Title", m.Title)
  971. }
  972. rr := httptest.NewRecorder()
  973. s.handle(rr, req)
  974. if rr.Code != http.StatusOK {
  975. return errors.New("error: " + rr.Body.String())
  976. }
  977. return nil
  978. }
  979. s.smtpBackend = newMailBackend(s.config, sub)
  980. s.smtpServer = smtp.NewServer(s.smtpBackend)
  981. s.smtpServer.Addr = s.config.SMTPServerListen
  982. s.smtpServer.Domain = s.config.SMTPServerDomain
  983. s.smtpServer.ReadTimeout = 10 * time.Second
  984. s.smtpServer.WriteTimeout = 10 * time.Second
  985. s.smtpServer.MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
  986. s.smtpServer.MaxRecipients = 1
  987. s.smtpServer.AllowInsecureAuth = true
  988. return s.smtpServer.ListenAndServe()
  989. }
  990. func (s *Server) runManager() {
  991. for {
  992. select {
  993. case <-time.After(s.config.ManagerInterval):
  994. s.updateStatsAndPrune()
  995. case <-s.closeChan:
  996. return
  997. }
  998. }
  999. }
  1000. func (s *Server) runAtSender() {
  1001. for {
  1002. select {
  1003. case <-time.After(s.config.AtSenderInterval):
  1004. if err := s.sendDelayedMessages(); err != nil {
  1005. log.Printf("error sending scheduled messages: %s", err.Error())
  1006. }
  1007. case <-s.closeChan:
  1008. return
  1009. }
  1010. }
  1011. }
  1012. func (s *Server) runFirebaseKeepaliver() {
  1013. if s.firebase == nil {
  1014. return
  1015. }
  1016. for {
  1017. select {
  1018. case <-time.After(s.config.FirebaseKeepaliveInterval):
  1019. if err := s.firebase(newKeepaliveMessage(firebaseControlTopic)); err != nil {
  1020. log.Printf("error sending Firebase keepalive message: %s", err.Error())
  1021. }
  1022. case <-s.closeChan:
  1023. return
  1024. }
  1025. }
  1026. }
  1027. func (s *Server) sendDelayedMessages() error {
  1028. s.mu.Lock()
  1029. defer s.mu.Unlock()
  1030. messages, err := s.cache.MessagesDue()
  1031. if err != nil {
  1032. return err
  1033. }
  1034. for _, m := range messages {
  1035. t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
  1036. if ok {
  1037. if err := t.Publish(m); err != nil {
  1038. log.Printf("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error())
  1039. }
  1040. }
  1041. if s.firebase != nil { // Firebase subscribers may not show up in topics map
  1042. if err := s.firebase(m); err != nil {
  1043. log.Printf("unable to publish to Firebase: %v", err.Error())
  1044. }
  1045. }
  1046. if err := s.cache.MarkPublished(m); err != nil {
  1047. return err
  1048. }
  1049. }
  1050. return nil
  1051. }
  1052. func (s *Server) limitRequests(next handleFunc) handleFunc {
  1053. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1054. if err := v.RequestAllowed(); err != nil {
  1055. return errHTTPTooManyRequestsLimitRequests
  1056. }
  1057. return next(w, r, v)
  1058. }
  1059. }
  1060. func (s *Server) authWrite(next handleFunc) handleFunc {
  1061. return s.withAuth(next, auth.PermissionWrite)
  1062. }
  1063. func (s *Server) authRead(next handleFunc) handleFunc {
  1064. return s.withAuth(next, auth.PermissionRead)
  1065. }
  1066. func (s *Server) withAuth(next handleFunc, perm auth.Permission) handleFunc {
  1067. return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
  1068. if s.auth == nil {
  1069. return next(w, r, v)
  1070. }
  1071. t, err := s.topicFromPath(r.URL.Path)
  1072. if err != nil {
  1073. return err
  1074. }
  1075. var user *auth.User // may stay nil if no auth header!
  1076. username, password, ok := r.BasicAuth()
  1077. if ok {
  1078. if user, err = s.auth.Authenticate(username, password); err != nil {
  1079. log.Printf("authentication failed: %s", err.Error())
  1080. return errHTTPUnauthorized
  1081. }
  1082. }
  1083. if err := s.auth.Authorize(user, t.ID, perm); err != nil {
  1084. log.Printf("unauthorized: %s", err.Error())
  1085. return errHTTPUnauthorized
  1086. }
  1087. return next(w, r, v)
  1088. }
  1089. }
  1090. // visitor creates or retrieves a rate.Limiter for the given visitor.
  1091. // This function was taken from https://www.alexedwards.net/blog/how-to-rate-limit-http-requests (MIT).
  1092. func (s *Server) visitor(r *http.Request) *visitor {
  1093. s.mu.Lock()
  1094. defer s.mu.Unlock()
  1095. remoteAddr := r.RemoteAddr
  1096. ip, _, err := net.SplitHostPort(remoteAddr)
  1097. if err != nil {
  1098. ip = remoteAddr // This should not happen in real life; only in tests.
  1099. }
  1100. if s.config.BehindProxy && r.Header.Get("X-Forwarded-For") != "" {
  1101. ip = r.Header.Get("X-Forwarded-For")
  1102. }
  1103. v, exists := s.visitors[ip]
  1104. if !exists {
  1105. s.visitors[ip] = newVisitor(s.config, ip)
  1106. return s.visitors[ip]
  1107. }
  1108. v.Keepalive()
  1109. return v
  1110. }