ソースを参照

Stats resetter at midnight UTC

binwiederhier 3 年 前
コミット
3dd8dd4288
10 ファイル変更180 行追加59 行削除
  1. 7 2
      server/config.go
  2. 42 12
      server/server.go
  3. 61 27
      server/server_test.go
  4. 7 8
      server/types.go
  5. 13 2
      server/visitor.go
  6. 17 6
      user/manager.go
  7. 12 0
      util/time.go
  8. 20 0
      util/time_test.go
  9. 0 1
      web/public/config.js
  10. 1 1
      web/src/components/Login.js

+ 7 - 2
server/config.go

@@ -51,6 +51,11 @@ const (
 	DefaultVisitorAttachmentDailyBandwidthLimit = 500 * 1024 * 1024 // 500 MB
 )
 
+var (
+	// DefaultVisitorStatsResetTime defines the time at which visitor stats are reset (wall clock only)
+	DefaultVisitorStatsResetTime = time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC)
+)
+
 // Config is the main config struct for the application. Use New to instantiate a default config struct.
 type Config struct {
 	BaseURL                              string
@@ -103,12 +108,11 @@ type Config struct {
 	VisitorEmailLimitReplenish           time.Duration
 	VisitorAccountCreateLimitBurst       int
 	VisitorAccountCreateLimitReplenish   time.Duration
+	VisitorStatsResetTime                time.Time // Time of the day at which to reset visitor stats
 	BehindProxy                          bool
 	EnableWeb                            bool
 	EnableSignup                         bool // Enable creation of accounts via API and UI
 	EnableLogin                          bool
-	EnableEmailConfirm                   bool
-	EnablePasswordReset                  bool
 	EnablePayments                       bool
 	EnableReservations                   bool   // Allow users with role "user" to own/reserve topics
 	Version                              string // injected by App
@@ -155,6 +159,7 @@ func NewConfig() *Config {
 		VisitorEmailLimitReplenish:           DefaultVisitorEmailLimitReplenish,
 		VisitorAccountCreateLimitBurst:       DefaultVisitorAccountCreateLimitBurst,
 		VisitorAccountCreateLimitReplenish:   DefaultVisitorAccountCreateLimitReplenish,
+		VisitorStatsResetTime:                DefaultVisitorStatsResetTime,
 		BehindProxy:                          false,
 		EnableWeb:                            true,
 		Version:                              "",

+ 42 - 12
server/server.go

@@ -37,9 +37,9 @@ import (
 /*
 	TODO
 		Limits & rate limiting:
+			users without tier: should the stats be persisted? are they meaningful?
+				-> test that the visitor is based on the IP address!
 			login/account endpoints
-		reset daily Limits for users
-			- set last_stats_reset in migration
 		update last_seen when API is accessed
 		Make sure account endpoints make sense for admins
 
@@ -55,6 +55,7 @@ import (
 		Tests:
 		- Change tier from higher to lower tier (delete reservations)
 		- Message rate limiting and reset tests
+		- test that the visitor is based on the IP address when a user has no tier
 		Docs:
 		- "expires" field in message
 		- server.yml: enable-X flags
@@ -266,6 +267,7 @@ func (s *Server) Run() error {
 	}
 	s.mu.Unlock()
 	go s.runManager()
+	go s.runStatsResetter()
 	go s.runDelayedSender()
 	go s.runFirebaseKeepaliver()
 
@@ -450,14 +452,13 @@ func (s *Server) handleWebConfig(w http.ResponseWriter, _ *http.Request, _ *visi
 		appRoot = "/app"
 	}
 	response := &apiConfigResponse{
-		BaseURL:             "", // Will translate to window.location.origin
-		AppRoot:             appRoot,
-		EnableLogin:         s.config.EnableLogin,
-		EnableSignup:        s.config.EnableSignup,
-		EnablePasswordReset: s.config.EnablePasswordReset,
-		EnablePayments:      s.config.EnablePayments,
-		EnableReservations:  s.config.EnableReservations,
-		DisallowedTopics:    disallowedTopics,
+		BaseURL:            "", // Will translate to window.location.origin
+		AppRoot:            appRoot,
+		EnableLogin:        s.config.EnableLogin,
+		EnableSignup:       s.config.EnableSignup,
+		EnablePayments:     s.config.EnablePayments,
+		EnableReservations: s.config.EnableReservations,
+		DisallowedTopics:   disallowedTopics,
 	}
 	b, err := json.MarshalIndent(response, "", "  ")
 	if err != nil {
@@ -563,7 +564,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
 			go s.sendToFirebase(v, m)
 		}
 		if s.smtpSender != nil && email != "" {
-			v.IncrEmails()
+			v.IncrementEmails()
 			go s.sendEmail(v, m, email)
 		}
 		if s.config.UpstreamBaseURL != "" {
@@ -578,7 +579,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
 			return nil, err
 		}
 	}
-	v.IncrMessages()
+	v.IncrementMessages()
 	if s.userManager != nil && v.user != nil {
 		s.userManager.EnqueueStats(v.user)
 	}
@@ -1334,6 +1335,35 @@ func (s *Server) runManager() {
 	}
 }
 
+func (s *Server) runStatsResetter() {
+	for {
+		runAt := util.NextOccurrenceUTC(s.config.VisitorStatsResetTime, time.Now())
+		timer := time.NewTimer(time.Until(runAt))
+		log.Debug("Stats resetter: Waiting until %v to reset visitor stats", runAt)
+		select {
+		case <-timer.C:
+			s.resetStats()
+		case <-s.closeChan:
+			timer.Stop()
+			return
+		}
+	}
+}
+
+func (s *Server) resetStats() {
+	log.Info("Resetting all visitor stats (daily task)")
+	s.mu.Lock()
+	defer s.mu.Unlock() // Includes the database query to avoid races with other processes
+	for _, v := range s.visitors {
+		v.ResetStats()
+	}
+	if s.userManager != nil {
+		if err := s.userManager.ResetStats(); err != nil {
+			log.Warn("Failed to write to database: %s", err.Error())
+		}
+	}
+}
+
 func (s *Server) runFirebaseKeepaliver() {
 	if s.firebaseClient == nil {
 		return

+ 61 - 27
server/server_test.go

@@ -622,22 +622,20 @@ func TestServer_SubscribeWithQueryFilters(t *testing.T) {
 }
 
 func TestServer_Auth_Success_Admin(t *testing.T) {
-	c := newTestConfig(t)
-	c.AuthFile = filepath.Join(t.TempDir(), "user.db")
+	c := newTestConfigWithAuthFile(t)
 	s := newTestServer(t, c)
 
 	require.Nil(t, s.userManager.AddUser("phil", "phil", user.RoleAdmin, "unit-test"))
 
 	response := request(t, s, "GET", "/mytopic/auth", "", map[string]string{
-		"Authorization": basicAuth("phil:phil"),
+		"Authorization": util.BasicAuth("phil", "phil"),
 	})
 	require.Equal(t, 200, response.Code)
 	require.Equal(t, `{"success":true}`+"\n", response.Body.String())
 }
 
 func TestServer_Auth_Success_User(t *testing.T) {
-	c := newTestConfig(t)
-	c.AuthFile = filepath.Join(t.TempDir(), "user.db")
+	c := newTestConfigWithAuthFile(t)
 	c.AuthDefault = user.PermissionDenyAll
 	s := newTestServer(t, c)
 
@@ -645,14 +643,13 @@ func TestServer_Auth_Success_User(t *testing.T) {
 	require.Nil(t, s.userManager.AllowAccess("", "ben", "mytopic", true, true))
 
 	response := request(t, s, "GET", "/mytopic/auth", "", map[string]string{
-		"Authorization": basicAuth("ben:ben"),
+		"Authorization": util.BasicAuth("ben", "ben"),
 	})
 	require.Equal(t, 200, response.Code)
 }
 
 func TestServer_Auth_Success_User_MultipleTopics(t *testing.T) {
-	c := newTestConfig(t)
-	c.AuthFile = filepath.Join(t.TempDir(), "user.db")
+	c := newTestConfigWithAuthFile(t)
 	c.AuthDefault = user.PermissionDenyAll
 	s := newTestServer(t, c)
 
@@ -661,12 +658,12 @@ func TestServer_Auth_Success_User_MultipleTopics(t *testing.T) {
 	require.Nil(t, s.userManager.AllowAccess("", "ben", "anothertopic", true, true))
 
 	response := request(t, s, "GET", "/mytopic,anothertopic/auth", "", map[string]string{
-		"Authorization": basicAuth("ben:ben"),
+		"Authorization": util.BasicAuth("ben", "ben"),
 	})
 	require.Equal(t, 200, response.Code)
 
 	response = request(t, s, "GET", "/mytopic,anothertopic,NOT-THIS-ONE/auth", "", map[string]string{
-		"Authorization": basicAuth("ben:ben"),
+		"Authorization": util.BasicAuth("ben", "ben"),
 	})
 	require.Equal(t, 403, response.Code)
 }
@@ -680,14 +677,13 @@ func TestServer_Auth_Fail_InvalidPass(t *testing.T) {
 	require.Nil(t, s.userManager.AddUser("phil", "phil", user.RoleAdmin, "unit-test"))
 
 	response := request(t, s, "GET", "/mytopic/auth", "", map[string]string{
-		"Authorization": basicAuth("phil:INVALID"),
+		"Authorization": util.BasicAuth("phil", "INVALID"),
 	})
 	require.Equal(t, 401, response.Code)
 }
 
 func TestServer_Auth_Fail_Unauthorized(t *testing.T) {
-	c := newTestConfig(t)
-	c.AuthFile = filepath.Join(t.TempDir(), "user.db")
+	c := newTestConfigWithAuthFile(t)
 	c.AuthDefault = user.PermissionDenyAll
 	s := newTestServer(t, c)
 
@@ -695,14 +691,13 @@ func TestServer_Auth_Fail_Unauthorized(t *testing.T) {
 	require.Nil(t, s.userManager.AllowAccess("", "ben", "sometopic", true, true)) // Not mytopic!
 
 	response := request(t, s, "GET", "/mytopic/auth", "", map[string]string{
-		"Authorization": basicAuth("ben:ben"),
+		"Authorization": util.BasicAuth("ben", "ben"),
 	})
 	require.Equal(t, 403, response.Code)
 }
 
 func TestServer_Auth_Fail_CannotPublish(t *testing.T) {
-	c := newTestConfig(t)
-	c.AuthFile = filepath.Join(t.TempDir(), "user.db")
+	c := newTestConfigWithAuthFile(t)
 	c.AuthDefault = user.PermissionReadWrite // Open by default
 	s := newTestServer(t, c)
 
@@ -720,7 +715,7 @@ func TestServer_Auth_Fail_CannotPublish(t *testing.T) {
 	require.Equal(t, 403, response.Code) // Cannot write as anonymous
 
 	response = request(t, s, "PUT", "/announcements", "test", map[string]string{
-		"Authorization": basicAuth("phil:phil"),
+		"Authorization": util.BasicAuth("phil", "phil"),
 	})
 	require.Equal(t, 200, response.Code)
 
@@ -732,22 +727,64 @@ func TestServer_Auth_Fail_CannotPublish(t *testing.T) {
 }
 
 func TestServer_Auth_ViaQuery(t *testing.T) {
-	c := newTestConfig(t)
-	c.AuthFile = filepath.Join(t.TempDir(), "user.db")
+	c := newTestConfigWithAuthFile(t)
 	c.AuthDefault = user.PermissionDenyAll
 	s := newTestServer(t, c)
 
 	require.Nil(t, s.userManager.AddUser("ben", "some pass", user.RoleAdmin, "unit-test"))
 
-	u := fmt.Sprintf("/mytopic/json?poll=1&auth=%s", base64.RawURLEncoding.EncodeToString([]byte(basicAuth("ben:some pass"))))
+	u := fmt.Sprintf("/mytopic/json?poll=1&auth=%s", base64.RawURLEncoding.EncodeToString([]byte(util.BasicAuth("ben", "some pass"))))
 	response := request(t, s, "GET", u, "", nil)
 	require.Equal(t, 200, response.Code)
 
-	u = fmt.Sprintf("/mytopic/json?poll=1&auth=%s", base64.RawURLEncoding.EncodeToString([]byte(basicAuth("ben:WRONNNGGGG"))))
+	u = fmt.Sprintf("/mytopic/json?poll=1&auth=%s", base64.RawURLEncoding.EncodeToString([]byte(util.BasicAuth("ben", "WRONNNGGGG"))))
 	response = request(t, s, "GET", u, "", nil)
 	require.Equal(t, 401, response.Code)
 }
 
+func TestServer_StatsResetter(t *testing.T) {
+	c := newTestConfigWithAuthFile(t)
+	c.AuthDefault = user.PermissionDenyAll
+	c.VisitorStatsResetTime = time.Now().Add(time.Second)
+	s := newTestServer(t, c)
+	go s.runStatsResetter()
+
+	require.Nil(t, s.userManager.AddUser("phil", "phil", user.RoleUser, "unit-test"))
+	require.Nil(t, s.userManager.AllowAccess("", "phil", "mytopic", true, true))
+
+	for i := 0; i < 5; i++ {
+		response := request(t, s, "PUT", "/mytopic", "test", map[string]string{
+			"Authorization": util.BasicAuth("phil", "phil"),
+		})
+		require.Equal(t, 200, response.Code)
+	}
+
+	response := request(t, s, "GET", "/v1/account", "", map[string]string{
+		"Authorization": util.BasicAuth("phil", "phil"),
+	})
+	require.Equal(t, 200, response.Code)
+
+	// User stats show 10 messages
+	response = request(t, s, "GET", "/v1/account", "", map[string]string{
+		"Authorization": util.BasicAuth("phil", "phil"),
+	})
+	require.Equal(t, 200, response.Code)
+	account, err := util.UnmarshalJSON[apiAccountResponse](io.NopCloser(response.Body))
+	require.Nil(t, err)
+	require.Equal(t, int64(5), account.Stats.Messages)
+
+	// Start stats resetter
+	time.Sleep(1200 * time.Millisecond)
+
+	// User stats show 0 messages now!
+	response = request(t, s, "GET", "/v1/account", "", nil)
+	require.Equal(t, 200, response.Code)
+	account, err = util.UnmarshalJSON[apiAccountResponse](io.NopCloser(response.Body))
+	require.Nil(t, err)
+	require.Equal(t, int64(0), account.Stats.Messages)
+
+}
+
 type testMailer struct {
 	count int
 	mu    sync.Mutex
@@ -1478,12 +1515,13 @@ func TestServer_PublishAttachmentAccountStats(t *testing.T) {
 	// User stats
 	response = request(t, s, "GET", "/v1/account", "", nil)
 	require.Equal(t, 200, response.Code)
-	var account *apiAccountResponse
-	require.Nil(t, json.NewDecoder(strings.NewReader(response.Body.String())).Decode(&account))
+	account, err := util.UnmarshalJSON[apiAccountResponse](io.NopCloser(response.Body))
+	require.Nil(t, err)
 	require.Equal(t, int64(5000), account.Limits.AttachmentFileSize)
 	require.Equal(t, int64(6000), account.Limits.AttachmentTotalSize)
 	require.Equal(t, int64(4999), account.Stats.AttachmentTotalSize)
 	require.Equal(t, int64(1001), account.Stats.AttachmentTotalSizeRemaining)
+	require.Equal(t, int64(1), account.Stats.Messages)
 }
 
 func TestServer_Visitor_XForwardedFor_None(t *testing.T) {
@@ -1644,10 +1682,6 @@ func toHTTPError(t *testing.T, s string) *errHTTP {
 	return &e
 }
 
-func basicAuth(s string) string {
-	return fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(s)))
-}
-
 func readAll(t *testing.T, rc io.ReadCloser) string {
 	b, err := io.ReadAll(rc)
 	if err != nil {

+ 7 - 8
server/types.go

@@ -287,12 +287,11 @@ type apiAccountAccessRequest struct {
 }
 
 type apiConfigResponse struct {
-	BaseURL             string   `json:"base_url"`
-	AppRoot             string   `json:"app_root"`
-	EnableLogin         bool     `json:"enable_login"`
-	EnableSignup        bool     `json:"enable_signup"`
-	EnablePasswordReset bool     `json:"enable_password_reset"`
-	EnablePayments      bool     `json:"enable_payments"`
-	EnableReservations  bool     `json:"enable_reservations"`
-	DisallowedTopics    []string `json:"disallowed_topics"`
+	BaseURL            string   `json:"base_url"`
+	AppRoot            string   `json:"app_root"`
+	EnableLogin        bool     `json:"enable_login"`
+	EnableSignup       bool     `json:"enable_signup"`
+	EnablePayments     bool     `json:"enable_payments"`
+	EnableReservations bool     `json:"enable_reservations"`
+	DisallowedTopics   []string `json:"disallowed_topics"`
 }

+ 13 - 2
server/visitor.go

@@ -182,7 +182,7 @@ func (v *visitor) Stale() bool {
 	return time.Since(v.seen) > visitorExpungeAfter
 }
 
-func (v *visitor) IncrMessages() {
+func (v *visitor) IncrementMessages() {
 	v.mu.Lock()
 	defer v.mu.Unlock()
 	v.messages++
@@ -191,7 +191,7 @@ func (v *visitor) IncrMessages() {
 	}
 }
 
-func (v *visitor) IncrEmails() {
+func (v *visitor) IncrementEmails() {
 	v.mu.Lock()
 	defer v.mu.Unlock()
 	v.emails++
@@ -200,6 +200,17 @@ func (v *visitor) IncrEmails() {
 	}
 }
 
+func (v *visitor) ResetStats() {
+	v.mu.Lock()
+	defer v.mu.Unlock()
+	v.messages = 0
+	v.emails = 0
+	if v.user != nil {
+		v.user.Stats.Messages = 0
+		v.user.Stats.Emails = 0
+	}
+}
+
 func (v *visitor) Limits() *visitorLimits {
 	limits := &visitorLimits{}
 	if v.user != nil && v.user.Tier != nil {

+ 17 - 6
user/manager.go

@@ -59,7 +59,6 @@ const (
 			created_by TEXT NOT NULL,
 			created_at INT NOT NULL,
 			last_seen INT NOT NULL,
-			last_stats_reset INT NOT NULL DEFAULT (0),
 		    FOREIGN KEY (tier_id) REFERENCES tier (id)
 		);
 		CREATE UNIQUE INDEX idx_user ON user (user);
@@ -128,11 +127,12 @@ const (
 				ELSE 2
 			END, user
 	`
-	updateUserPassQuery  = `UPDATE user SET pass = ? WHERE user = ?`
-	updateUserRoleQuery  = `UPDATE user SET role = ? WHERE user = ?`
-	updateUserPrefsQuery = `UPDATE user SET prefs = ? WHERE user = ?`
-	updateUserStatsQuery = `UPDATE user SET stats_messages = ?, stats_emails = ? WHERE user = ?`
-	deleteUserQuery      = `DELETE FROM user WHERE user = ?`
+	updateUserPassQuery          = `UPDATE user SET pass = ? WHERE user = ?`
+	updateUserRoleQuery          = `UPDATE user SET role = ? WHERE user = ?`
+	updateUserPrefsQuery         = `UPDATE user SET prefs = ? WHERE user = ?`
+	updateUserStatsQuery         = `UPDATE user SET stats_messages = ?, stats_emails = ? WHERE user = ?`
+	updateUserStatsResetAllQuery = `UPDATE user SET stats_messages = 0, stats_emails = 0`
+	deleteUserQuery              = `DELETE FROM user WHERE user = ?`
 
 	upsertUserAccessQuery = `
 		INSERT INTO user_access (user_id, topic, read, write, owner_user_id) 
@@ -394,6 +394,17 @@ func (a *Manager) ChangeSettings(user *User) error {
 	return nil
 }
 
+// ResetStats resets all user stats in the user database. This touches all users.
+func (a *Manager) ResetStats() error {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+	if _, err := a.db.Exec(updateUserStatsResetAllQuery); err != nil {
+		return err
+	}
+	a.statsQueue = make(map[string]*User)
+	return nil
+}
+
 // EnqueueStats adds the user to a queue which writes out user stats (messages, emails, ..) in
 // batches at a regular interval
 func (a *Manager) EnqueueStats(user *User) {

+ 12 - 0
util/time.go

@@ -14,6 +14,18 @@ var (
 	durationStrRegex  = regexp.MustCompile(`(?i)^(\d+)\s*(d|days?|h|hours?|m|mins?|minutes?|s|secs?|seconds?)$`)
 )
 
+// NextOccurrenceUTC takes a time of day (e.g. 9:00am), and returns the next occurrence
+// of that time from the current time (in UTC).
+func NextOccurrenceUTC(timeOfDay, base time.Time) time.Time {
+	hour, minute, seconds := timeOfDay.Clock()
+	now := base.UTC()
+	next := time.Date(now.Year(), now.Month(), now.Day(), hour, minute, seconds, 0, time.UTC)
+	if next.Before(now) {
+		next = next.AddDate(0, 0, 1)
+	}
+	return next
+}
+
 // ParseFutureTime parses a date/time string to a time.Time. It supports unix timestamps, durations
 // and natural language dates
 func ParseFutureTime(s string, now time.Time) (time.Time, error) {

+ 20 - 0
util/time_test.go

@@ -11,6 +11,26 @@ var (
 	base = time.Date(2021, 12, 10, 10, 17, 23, 0, time.UTC)
 )
 
+func TestNextOccurrenceUTC_NextDate(t *testing.T) {
+	loc, err := time.LoadLocation("America/New_York")
+	require.Nil(t, err)
+
+	timeOfDay := time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC) // Run at midnight UTC
+	nowInFairfieldCT := time.Date(2023, time.January, 10, 22, 19, 12, 0, loc)
+	nextRunTme := NextOccurrenceUTC(timeOfDay, nowInFairfieldCT)
+	require.Equal(t, time.Date(2023, time.January, 12, 0, 0, 0, 0, time.UTC), nextRunTme)
+}
+
+func TestNextOccurrenceUTC_SameDay(t *testing.T) {
+	loc, err := time.LoadLocation("America/New_York")
+	require.Nil(t, err)
+
+	timeOfDay := time.Date(0, 0, 0, 4, 0, 0, 0, time.UTC) // Run at 4am UTC
+	nowInFairfieldCT := time.Date(2023, time.January, 10, 22, 19, 12, 0, loc)
+	nextRunTme := NextOccurrenceUTC(timeOfDay, nowInFairfieldCT)
+	require.Equal(t, time.Date(2023, time.January, 11, 4, 0, 0, 0, time.UTC), nextRunTme)
+}
+
 func TestParseFutureTime_11am_FutureTime(t *testing.T) {
 	d, err := ParseFutureTime("11am", base)
 	require.Nil(t, err)

+ 0 - 1
web/public/config.js

@@ -10,7 +10,6 @@ var config = {
     app_root: "/app",
     enable_login: true,
     enable_signup: true,
-    enable_password_reset: false,
     enable_payments: true,
     enable_reservations: true,
     disallowed_topics: ["docs", "static", "file", "app", "account", "settings", "pricing", "signup", "login", "reset-password"]

+ 1 - 1
web/src/components/Login.js

@@ -112,7 +112,7 @@ const Login = () => {
                     </Box>
                 }
                 <Box sx={{width: "100%"}}>
-                    {config.enable_password_reset && <div style={{float: "left"}}><NavLink to={routes.resetPassword} variant="body1">{t("Reset password")}</NavLink></div>}
+                    {/* This is where the password reset link would go */}
                     {config.enable_signup && <div style={{float: "right"}}><NavLink to={routes.signup} variant="body1">{t("login_link_signup")}</NavLink></div>}
                 </Box>
             </Box>