Parcourir la source

Merge pull request #502 from binwiederhier/async-message-cache

Batch message INSERTs
Philipp C. Heckel il y a 3 ans
Parent
commit
f4daa4508f

+ 6 - 0
cmd/serve.go

@@ -44,6 +44,8 @@ var flagsServe = append(
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"firebase_key_file", "F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"cache_file", "C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}),
 	altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-duration", Aliases: []string{"cache_duration", "b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: server.DefaultCacheDuration, Usage: "buffer messages for this time to allow `since` requests"}),
+	altsrc.NewIntFlag(&cli.IntFlag{Name: "cache-batch-size", Aliases: []string{"cache_batch_size"}, EnvVars: []string{"NTFY_BATCH_SIZE"}, Usage: "max size of messages to batch together when writing to message cache (if zero, writes are synchronous)"}),
+	altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-batch-timeout", Aliases: []string{"cache_batch_timeout"}, EnvVars: []string{"NTFY_CACHE_BATCH_TIMEOUT"}, Usage: "timeout for batched async writes to the message cache (if zero, writes are synchronous)"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-startup-queries", Aliases: []string{"cache_startup_queries"}, EnvVars: []string{"NTFY_CACHE_STARTUP_QUERIES"}, Usage: "queries run when the cache database is initialized"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-file", Aliases: []string{"auth_file", "H"}, EnvVars: []string{"NTFY_AUTH_FILE"}, Usage: "auth database file used for access control"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-default-access", Aliases: []string{"auth_default_access", "p"}, EnvVars: []string{"NTFY_AUTH_DEFAULT_ACCESS"}, Value: "read-write", Usage: "default permissions if no matching entries in the auth database are found"}),
@@ -110,6 +112,8 @@ func execServe(c *cli.Context) error {
 	cacheFile := c.String("cache-file")
 	cacheDuration := c.Duration("cache-duration")
 	cacheStartupQueries := c.String("cache-startup-queries")
+	cacheBatchSize := c.Int("cache-batch-size")
+	cacheBatchTimeout := c.Duration("cache-batch-timeout")
 	authFile := c.String("auth-file")
 	authDefaultAccess := c.String("auth-default-access")
 	attachmentCacheDir := c.String("attachment-cache-dir")
@@ -233,6 +237,8 @@ func execServe(c *cli.Context) error {
 	conf.CacheFile = cacheFile
 	conf.CacheDuration = cacheDuration
 	conf.CacheStartupQueries = cacheStartupQueries
+	conf.CacheBatchSize = cacheBatchSize
+	conf.CacheBatchTimeout = cacheBatchTimeout
 	conf.AuthFile = authFile
 	conf.AuthDefaultRead = authDefaultRead
 	conf.AuthDefaultWrite = authDefaultWrite

+ 13 - 1
docs/config.md

@@ -825,19 +825,27 @@ out [this discussion on Reddit](https://www.reddit.com/r/golang/comments/r9u4ee/
 
 Depending on *how you run it*, here are a few limits that are relevant:
 
-### WAL for message cache
+### Message cache
 By default, the [message cache](#message-cache) (defined by `cache-file`) uses the SQLite default settings, which means it
 syncs to disk on every write. For personal servers, this is perfectly adequate. For larger installations, such as ntfy.sh,
 the [write-ahead log (WAL)](https://sqlite.org/wal.html) should be enabled, and the sync mode should be adjusted. 
 See [this article](https://phiresky.github.io/blog/2020/sqlite-performance-tuning/) for details.
 
+In addition to that, for very high load servers (such as ntfy.sh), it may be beneficial to write messages to the cache
+in batches, and asynchronously. This can be enabled with the `cache-batch-size` and `cache-batch-timeout`. If you start
+seeing `database locked` messages in the logs, you should probably enable that.
+
 Here's how ntfy.sh has been tuned in the `server.yml` file:
 
 ``` yaml
+cache-batch-size: 25
+cache-batch-timeout: "1s"
 cache-startup-queries: |
     pragma journal_mode = WAL;
     pragma synchronous = normal;
     pragma temp_store = memory;
+    pragma busy_timeout = 15000;
+    vacuum;
 ```
 
 ### For systemd services
@@ -990,6 +998,8 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`).
 | `cache-file`                               | `NTFY_CACHE_FILE`                               | *filename*                                          | -                 | If set, messages are cached in a local SQLite database instead of only in-memory. This allows for service restarts without losing messages in support of the since= parameter. See [message cache](#message-cache).             |
 | `cache-duration`                           | `NTFY_CACHE_DURATION`                           | *duration*                                          | 12h               | Duration for which messages will be buffered before they are deleted. This is required to support the `since=...` and `poll=1` parameter. Set this to `0` to disable the cache entirely.                                        |
 | `cache-startup-queries`                    | `NTFY_CACHE_STARTUP_QUERIES`                    | *string (SQL queries)*                              | -                 | SQL queries to run during database startup; this is useful for tuning and [enabling WAL mode](#wal-for-message-cache)                                                                                                           |
+| `cache-batch-size`                         | `NTFY_CACHE_BATCH_SIZE`                         | *int*                                               | 0                 | Max size of messages to batch together when writing to message cache (if zero, writes are synchronous)                                                                                                                          |
+| `cache-batch-timeout`                      | `NTFY_CACHE_BATCH_TIMEOUT`                      | *duration*                                          | 0s                | Timeout for batched async writes to the message cache (if zero, writes are synchronous)                                                                                                                                         |
 | `auth-file`                                | `NTFY_AUTH_FILE`                                | *filename*                                          | -                 | Auth database file used for access control. If set, enables authentication and access control. See [access control](#access-control).                                                                                           |
 | `auth-default-access`                      | `NTFY_AUTH_DEFAULT_ACCESS`                      | `read-write`, `read-only`, `write-only`, `deny-all` | `read-write`      | Default permissions if no matching entries in the auth database are found. Default is `read-write`.                                                                                                                             |
 | `behind-proxy`                             | `NTFY_BEHIND_PROXY`                             | *bool*                                              | false             | If set, the X-Forwarded-For header is used to determine the visitor IP address instead of the remote address of the connection.                                                                                                 |
@@ -1054,6 +1064,8 @@ OPTIONS:
    --behind-proxy, --behind_proxy, -P                                                                  if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting) (default: false) [$NTFY_BEHIND_PROXY]
    --cache-duration since, --cache_duration since, -b since                                            buffer messages for this time to allow since requests (default: 12h0m0s) [$NTFY_CACHE_DURATION]
    --cache-file value, --cache_file value, -C value                                                    cache file used for message caching [$NTFY_CACHE_FILE]
+   --cache-batch-size value, --cache_batch_size value                                                  max size of messages to batch together when writing to message cache (if zero, writes are synchronous) (default: 0) [$NTFY_BATCH_SIZE]
+   --cache-batch-timeout value, --cache_batch_timeout value                                            timeout for batched async writes to the message cache (if zero, writes are synchronous) (default: 0s) [$NTFY_CACHE_BATCH_TIMEOUT]   
    --cache-startup-queries value, --cache_startup_queries value                                        queries run when the cache database is initialized [$NTFY_CACHE_STARTUP_QUERIES]
    --cert-file value, --cert_file value, -E value                                                      certificate file, if listen-https is set [$NTFY_CERT_FILE]
    --config value, -c value                                                                            config file (default: /etc/ntfy/server.yml) [$NTFY_CONFIG_FILE]

+ 5 - 0
docs/releases.md

@@ -14,6 +14,10 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
 
 ## ntfy server v1.30.0 (UNRELREASED)
 
+**Features:**
+
+* High-load servers: Allow asynchronous batch-writing of messages to cache via `cache-batch-*` options ([#498](https://github.com/binwiederhier/ntfy/issues/498)/[#502](https://github.com/binwiederhier/ntfy/pull/502))   
+
 **Documentation:**
 
 * GitHub Actions example ([#492](https://github.com/binwiederhier/ntfy/pull/492), thanks to [@ksurl](https://github.com/ksurl))
@@ -22,6 +26,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
 **Other things:**
 
 * Put ntfy.sh docs on GitHub pages to reduce AWS outbound traffic cost ([#491](https://github.com/binwiederhier/ntfy/issues/491))
+* The ntfy.sh server hardware was upgraded to a bigger box. If you'd like to help out carrying the server cost, **[sponsorships and donations](https://github.com/sponsors/binwiederhier)** 💸 would be very much appreciated
 
 ## ntfy server v1.29.0
 Released November 12, 2022

+ 4 - 0
server/config.go

@@ -61,6 +61,8 @@ type Config struct {
 	CacheFile                            string
 	CacheDuration                        time.Duration
 	CacheStartupQueries                  string
+	CacheBatchSize                       int
+	CacheBatchTimeout                    time.Duration
 	AuthFile                             string
 	AuthDefaultRead                      bool
 	AuthDefaultWrite                     bool
@@ -114,6 +116,8 @@ func NewConfig() *Config {
 		FirebaseKeyFile:                      "",
 		CacheFile:                            "",
 		CacheDuration:                        DefaultCacheDuration,
+		CacheBatchSize:                       0,
+		CacheBatchTimeout:                    0,
 		AuthFile:                             "",
 		AuthDefaultRead:                      true,
 		AuthDefaultWrite:                     true,

+ 69 - 13
server/message_cache.go

@@ -44,6 +44,7 @@ const (
 			published INT NOT NULL
 		);
 		CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
+		CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
 		CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
 		COMMIT;
 	`
@@ -92,7 +93,7 @@ const (
 
 // Schema management queries
 const (
-	currentSchemaVersion          = 8
+	currentSchemaVersion          = 9
 	createSchemaVersionTableQuery = `
 		CREATE TABLE IF NOT EXISTS schemaVersion (
 			id INT PRIMARY KEY,
@@ -185,15 +186,21 @@ const (
 	migrate7To8AlterMessagesTableQuery = `
 		ALTER TABLE messages ADD COLUMN icon TEXT NOT NULL DEFAULT('');
 	`
+
+	// 8 -> 9
+	migrate8To9AlterMessagesTableQuery = `
+		CREATE INDEX IF NOT EXISTS idx_time ON messages (time);	
+	`
 )
 
 type messageCache struct {
-	db  *sql.DB
-	nop bool
+	db    *sql.DB
+	queue *util.BatchingQueue[*message]
+	nop   bool
 }
 
 // newSqliteCache creates a SQLite file-backed cache
-func newSqliteCache(filename, startupQueries string, nop bool) (*messageCache, error) {
+func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout time.Duration, nop bool) (*messageCache, error) {
 	db, err := sql.Open("sqlite3", filename)
 	if err != nil {
 		return nil, err
@@ -201,21 +208,28 @@ func newSqliteCache(filename, startupQueries string, nop bool) (*messageCache, e
 	if err := setupCacheDB(db, startupQueries); err != nil {
 		return nil, err
 	}
-	return &messageCache{
-		db:  db,
-		nop: nop,
-	}, nil
+	var queue *util.BatchingQueue[*message]
+	if batchSize > 0 || batchTimeout > 0 {
+		queue = util.NewBatchingQueue[*message](batchSize, batchTimeout)
+	}
+	cache := &messageCache{
+		db:    db,
+		queue: queue,
+		nop:   nop,
+	}
+	go cache.processMessageBatches()
+	return cache, nil
 }
 
 // newMemCache creates an in-memory cache
 func newMemCache() (*messageCache, error) {
-	return newSqliteCache(createMemoryFilename(), "", false)
+	return newSqliteCache(createMemoryFilename(), "", 0, 0, false)
 }
 
 // newNopCache creates an in-memory cache that discards all messages;
 // it is always empty and can be used if caching is entirely disabled
 func newNopCache() (*messageCache, error) {
-	return newSqliteCache(createMemoryFilename(), "", true)
+	return newSqliteCache(createMemoryFilename(), "", 0, 0, true)
 }
 
 // createMemoryFilename creates a unique memory filename to use for the SQLite backend.
@@ -228,14 +242,23 @@ func createMemoryFilename() string {
 	return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10))
 }
 
+// AddMessage stores a message to the message cache synchronously, or queues it to be stored at a later date asyncronously.
+// The message is queued only if "batchSize" or "batchTimeout" are passed to the constructor.
 func (c *messageCache) AddMessage(m *message) error {
+	if c.queue != nil {
+		c.queue.Enqueue(m)
+		return nil
+	}
 	return c.addMessages([]*message{m})
 }
 
+// addMessages synchronously stores a match of messages. If the database is locked, the transaction waits until
+// SQLite's busy_timeout is exceeded before erroring out.
 func (c *messageCache) addMessages(ms []*message) error {
 	if c.nop {
 		return nil
 	}
+	start := time.Now()
 	tx, err := c.db.Begin()
 	if err != nil {
 		return err
@@ -289,7 +312,12 @@ func (c *messageCache) addMessages(ms []*message) error {
 			return err
 		}
 	}
-	return tx.Commit()
+	if err := tx.Commit(); err != nil {
+		log.Error("Cache: Writing %d message(s) failed (took %v)", len(ms), time.Since(start))
+		return err
+	}
+	log.Debug("Cache: Wrote %d message(s) in %v", len(ms), time.Since(start))
+	return nil
 }
 
 func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
@@ -395,8 +423,12 @@ func (c *messageCache) Topics() (map[string]*topic, error) {
 }
 
 func (c *messageCache) Prune(olderThan time.Time) error {
-	_, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
-	return err
+	start := time.Now()
+	if _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix()); err != nil {
+		log.Warn("Cache: Pruning failed (after %v): %s", time.Since(start), err.Error())
+	}
+	log.Debug("Cache: Pruning successful (took %v)", time.Since(start))
+	return nil
 }
 
 func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) {
@@ -417,6 +449,17 @@ func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) {
 	return size, nil
 }
 
+func (c *messageCache) processMessageBatches() {
+	if c.queue == nil {
+		return
+	}
+	for messages := range c.queue.Dequeue() {
+		if err := c.addMessages(messages); err != nil {
+			log.Error("Cache: %s", err.Error())
+		}
+	}
+}
+
 func readMessages(rows *sql.Rows) ([]*message, error) {
 	defer rows.Close()
 	messages := make([]*message, 0)
@@ -542,6 +585,8 @@ func setupCacheDB(db *sql.DB, startupQueries string) error {
 		return migrateFrom6(db)
 	} else if schemaVersion == 7 {
 		return migrateFrom7(db)
+	} else if schemaVersion == 8 {
+		return migrateFrom8(db)
 	}
 	return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
 }
@@ -647,5 +692,16 @@ func migrateFrom7(db *sql.DB) error {
 	if _, err := db.Exec(updateSchemaVersion, 8); err != nil {
 		return err
 	}
+	return migrateFrom8(db)
+}
+
+func migrateFrom8(db *sql.DB) error {
+	log.Info("Migrating cache database schema: from 8 to 9")
+	if _, err := db.Exec(migrate8To9AlterMessagesTableQuery); err != nil {
+		return err
+	}
+	if _, err := db.Exec(updateSchemaVersion, 9); err != nil {
+		return err
+	}
 	return nil // Update this when a new version is added
 }

+ 5 - 5
server/message_cache_test.go

@@ -450,7 +450,7 @@ func TestSqliteCache_StartupQueries_WAL(t *testing.T) {
 	startupQueries := `pragma journal_mode = WAL; 
 pragma synchronous = normal; 
 pragma temp_store = memory;`
-	db, err := newSqliteCache(filename, startupQueries, false)
+	db, err := newSqliteCache(filename, startupQueries, 0, 0, false)
 	require.Nil(t, err)
 	require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
 	require.FileExists(t, filename)
@@ -461,7 +461,7 @@ pragma temp_store = memory;`
 func TestSqliteCache_StartupQueries_None(t *testing.T) {
 	filename := newSqliteTestCacheFile(t)
 	startupQueries := ""
-	db, err := newSqliteCache(filename, startupQueries, false)
+	db, err := newSqliteCache(filename, startupQueries, 0, 0, false)
 	require.Nil(t, err)
 	require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
 	require.FileExists(t, filename)
@@ -472,7 +472,7 @@ func TestSqliteCache_StartupQueries_None(t *testing.T) {
 func TestSqliteCache_StartupQueries_Fail(t *testing.T) {
 	filename := newSqliteTestCacheFile(t)
 	startupQueries := `xx error`
-	_, err := newSqliteCache(filename, startupQueries, false)
+	_, err := newSqliteCache(filename, startupQueries, 0, 0, false)
 	require.Error(t, err)
 }
 
@@ -501,7 +501,7 @@ func TestMemCache_NopCache(t *testing.T) {
 }
 
 func newSqliteTestCache(t *testing.T) *messageCache {
-	c, err := newSqliteCache(newSqliteTestCacheFile(t), "", false)
+	c, err := newSqliteCache(newSqliteTestCacheFile(t), "", 0, 0, false)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -513,7 +513,7 @@ func newSqliteTestCacheFile(t *testing.T) string {
 }
 
 func newSqliteTestCacheFromFile(t *testing.T, filename, startupQueries string) *messageCache {
-	c, err := newSqliteCache(filename, startupQueries, false)
+	c, err := newSqliteCache(filename, startupQueries, 0, 0, false)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 2 - 1
server/server.go

@@ -159,7 +159,7 @@ func createMessageCache(conf *Config) (*messageCache, error) {
 	if conf.CacheDuration == 0 {
 		return newNopCache()
 	} else if conf.CacheFile != "" {
-		return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, false)
+		return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
 	}
 	return newMemCache()
 }
@@ -491,6 +491,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
 		log.Debug("%s Message delayed, will process later", logMessagePrefix(v, m))
 	}
 	if cache {
+		log.Debug("%s Adding message to cache", logMessagePrefix(v, m))
 		if err := s.messageCache.AddMessage(m); err != nil {
 			return nil, err
 		}

+ 8 - 0
server/server.yml

@@ -53,6 +53,12 @@
 #       pragma journal_mode = WAL;
 #       pragma synchronous = normal;
 #       pragma temp_store = memory;
+#       pragma busy_timeout = 15000;
+#       vacuum;
+#
+# The "cache-batch-size" and "cache-batch-timeout" parameter allow enabling async batch writing
+# of messages. If set, messages will be queued and written to the database in batches of the given
+# size, or after the given timeout. This is only required for high volume servers.
 #
 # Debian/RPM package users:
 #   Use /var/cache/ntfy/cache.db as cache file to avoid permission issues. The package
@@ -65,6 +71,8 @@
 # cache-file: <filename>
 # cache-duration: "12h"
 # cache-startup-queries:
+# cache-batch-size: 0
+# cache-batch-timeout: "0ms"
 
 # If set, access to the ntfy server and API can be controlled on a granular level using
 # the 'ntfy user' and 'ntfy access' commands. See the --help pages for details, or check the docs.

+ 86 - 0
util/batching_queue.go

@@ -0,0 +1,86 @@
+package util
+
+import (
+	"sync"
+	"time"
+)
+
+// BatchingQueue is a queue that creates batches of the enqueued elements based on a
+// max batch size and a batch timeout.
+//
+// Example:
+//
+//	q := NewBatchingQueue[int](2, 500 * time.Millisecond)
+//	go func() {
+//	  for batch := range q.Dequeue() {
+//	    fmt.Println(batch)
+//	  }
+//	}()
+//	q.Enqueue(1)
+//	q.Enqueue(2)
+//	q.Enqueue(3)
+//	time.Sleep(time.Second)
+//
+// This example will emit batch [1, 2] immediately (because the batch size is 2), and
+// a batch [3] after 500ms.
+type BatchingQueue[T any] struct {
+	batchSize int
+	timeout   time.Duration
+	in        []T
+	out       chan []T
+	mu        sync.Mutex
+}
+
+// NewBatchingQueue creates a new BatchingQueue
+func NewBatchingQueue[T any](batchSize int, timeout time.Duration) *BatchingQueue[T] {
+	q := &BatchingQueue[T]{
+		batchSize: batchSize,
+		timeout:   timeout,
+		in:        make([]T, 0),
+		out:       make(chan []T),
+	}
+	go q.timeoutTicker()
+	return q
+}
+
+// Enqueue enqueues an element to the queue. If the configured batch size is reached,
+// the batch will be emitted immediately.
+func (q *BatchingQueue[T]) Enqueue(element T) {
+	q.mu.Lock()
+	q.in = append(q.in, element)
+	var elements []T
+	if len(q.in) == q.batchSize {
+		elements = q.dequeueAll()
+	}
+	q.mu.Unlock()
+	if len(elements) > 0 {
+		q.out <- elements
+	}
+}
+
+// Dequeue returns a channel emitting batches of elements
+func (q *BatchingQueue[T]) Dequeue() <-chan []T {
+	return q.out
+}
+
+func (q *BatchingQueue[T]) dequeueAll() []T {
+	elements := make([]T, len(q.in))
+	copy(elements, q.in)
+	q.in = q.in[:0]
+	return elements
+}
+
+func (q *BatchingQueue[T]) timeoutTicker() {
+	if q.timeout == 0 {
+		return
+	}
+	ticker := time.NewTicker(q.timeout)
+	for range ticker.C {
+		q.mu.Lock()
+		elements := q.dequeueAll()
+		q.mu.Unlock()
+		if len(elements) > 0 {
+			q.out <- elements
+		}
+	}
+}

+ 58 - 0
util/batching_queue_test.go

@@ -0,0 +1,58 @@
+package util_test
+
+import (
+	"github.com/stretchr/testify/require"
+	"heckel.io/ntfy/util"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestBatchingQueue_InfTimeout(t *testing.T) {
+	q := util.NewBatchingQueue[int](25, 1*time.Hour)
+	batches, total := make([][]int, 0), 0
+	var mu sync.Mutex
+	go func() {
+		for batch := range q.Dequeue() {
+			mu.Lock()
+			batches = append(batches, batch)
+			total += len(batch)
+			mu.Unlock()
+		}
+	}()
+	for i := 0; i < 101; i++ {
+		go q.Enqueue(i)
+	}
+	time.Sleep(time.Second)
+	mu.Lock()
+	require.Equal(t, 100, total) // One is missing, stuck in the last batch!
+	require.Equal(t, 4, len(batches))
+	mu.Unlock()
+}
+
+func TestBatchingQueue_WithTimeout(t *testing.T) {
+	q := util.NewBatchingQueue[int](25, 100*time.Millisecond)
+	batches, total := make([][]int, 0), 0
+	var mu sync.Mutex
+	go func() {
+		for batch := range q.Dequeue() {
+			mu.Lock()
+			batches = append(batches, batch)
+			total += len(batch)
+			mu.Unlock()
+		}
+	}()
+	for i := 0; i < 101; i++ {
+		go func(i int) {
+			time.Sleep(time.Duration(rand.Intn(700)) * time.Millisecond)
+			q.Enqueue(i)
+		}(i)
+	}
+	time.Sleep(time.Second)
+	mu.Lock()
+	require.Equal(t, 101, total)
+	require.True(t, len(batches) > 4) // 101/25
+	require.True(t, len(batches) < 21)
+	mu.Unlock()
+}