|
|
@@ -27,13 +27,16 @@ import (
|
|
|
|
|
|
// Server is the main server, providing the UI and API for ntfy
|
|
|
type Server struct {
|
|
|
- config *Config
|
|
|
- topics map[string]*topic
|
|
|
- visitors map[string]*visitor
|
|
|
- firebase subscriber
|
|
|
- messages int64
|
|
|
- cache cache
|
|
|
- mu sync.Mutex
|
|
|
+ config *Config
|
|
|
+ httpServer *http.Server
|
|
|
+ httpsServer *http.Server
|
|
|
+ topics map[string]*topic
|
|
|
+ visitors map[string]*visitor
|
|
|
+ firebase subscriber
|
|
|
+ messages int64
|
|
|
+ cache cache
|
|
|
+ closeChan chan bool
|
|
|
+ mu sync.Mutex
|
|
|
}
|
|
|
|
|
|
// errHTTP is a generic HTTP error for any non-200 HTTP error
|
|
|
@@ -198,17 +201,35 @@ func (s *Server) Run() error {
|
|
|
log.Printf("Listening on %s", listenStr)
|
|
|
http.HandleFunc("/", s.handle)
|
|
|
errChan := make(chan error)
|
|
|
+ s.mu.Lock()
|
|
|
+ s.closeChan = make(chan bool)
|
|
|
+ s.httpServer = &http.Server{Addr: s.config.ListenHTTP}
|
|
|
go func() {
|
|
|
- errChan <- http.ListenAndServe(s.config.ListenHTTP, nil)
|
|
|
+ errChan <- s.httpServer.ListenAndServe()
|
|
|
}()
|
|
|
if s.config.ListenHTTPS != "" {
|
|
|
+ s.httpsServer = &http.Server{Addr: s.config.ListenHTTP}
|
|
|
go func() {
|
|
|
- errChan <- http.ListenAndServeTLS(s.config.ListenHTTPS, s.config.CertFile, s.config.KeyFile, nil)
|
|
|
+ errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
|
|
|
}()
|
|
|
}
|
|
|
+ s.mu.Unlock()
|
|
|
return <-errChan
|
|
|
}
|
|
|
|
|
|
+// Stop stops HTTP (+HTTPS) server and all managers
|
|
|
+func (s *Server) Stop() {
|
|
|
+ s.mu.Lock()
|
|
|
+ defer s.mu.Unlock()
|
|
|
+ if s.httpServer != nil {
|
|
|
+ s.httpServer.Close()
|
|
|
+ }
|
|
|
+ if s.httpsServer != nil {
|
|
|
+ s.httpsServer.Close()
|
|
|
+ }
|
|
|
+ close(s.closeChan)
|
|
|
+}
|
|
|
+
|
|
|
func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
|
|
|
if err := s.handleInternal(w, r); err != nil {
|
|
|
if e, ok := err.(*errHTTP); ok {
|
|
|
@@ -635,21 +656,25 @@ func (s *Server) updateStatsAndPrune() {
|
|
|
}
|
|
|
|
|
|
func (s *Server) runManager() {
|
|
|
- func() {
|
|
|
- ticker := time.NewTicker(s.config.ManagerInterval)
|
|
|
- for {
|
|
|
- <-ticker.C
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-time.After(s.config.ManagerInterval):
|
|
|
s.updateStatsAndPrune()
|
|
|
+ case <-s.closeChan:
|
|
|
+ return
|
|
|
}
|
|
|
- }()
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (s *Server) runAtSender() {
|
|
|
- ticker := time.NewTicker(s.config.AtSenderInterval)
|
|
|
for {
|
|
|
- <-ticker.C
|
|
|
- if err := s.sendDelayedMessages(); err != nil {
|
|
|
- log.Printf("error sending scheduled messages: %s", err.Error())
|
|
|
+ select {
|
|
|
+ case <-time.After(s.config.AtSenderInterval):
|
|
|
+ if err := s.sendDelayedMessages(); err != nil {
|
|
|
+ log.Printf("error sending scheduled messages: %s", err.Error())
|
|
|
+ }
|
|
|
+ case <-s.closeChan:
|
|
|
+ return
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -658,14 +683,18 @@ func (s *Server) runFirebaseKeepliver() {
|
|
|
if s.firebase == nil {
|
|
|
return
|
|
|
}
|
|
|
- ticker := time.NewTicker(s.config.FirebaseKeepaliveInterval)
|
|
|
for {
|
|
|
- <-ticker.C
|
|
|
- if err := s.firebase(newKeepaliveMessage(firebaseControlTopic)); err != nil {
|
|
|
- log.Printf("error sending Firebase keepalive message: %s", err.Error())
|
|
|
+ select {
|
|
|
+ case <-time.After(s.config.FirebaseKeepaliveInterval):
|
|
|
+ if err := s.firebase(newKeepaliveMessage(firebaseControlTopic)); err != nil {
|
|
|
+ log.Printf("error sending Firebase keepalive message: %s", err.Error())
|
|
|
+ }
|
|
|
+ case <-s.closeChan:
|
|
|
+ return
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
func (s *Server) sendDelayedMessages() error {
|
|
|
s.mu.Lock()
|
|
|
defer s.mu.Unlock()
|