batching_queue.go 989 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package util
  2. import (
  3. "sync"
  4. "time"
  5. )
  // BatchingQueue collects pushed elements and hands them to a consumer in
  // batches. A batch is emitted as soon as batchSize elements have accumulated,
  // and additionally on every timeout tick when at least one element is pending.
  //
  // All methods are safe for concurrent use. A BatchingQueue must be created
  // with NewBatchingQueue and must not be copied after first use (it contains a
  // sync.Mutex).
  type BatchingQueue[T any] struct {
  	batchSize int
  	timeout   time.Duration
  	out       chan []T      // unbuffered: a send blocks until a consumer receives
  	done      chan struct{} // closed by Close to stop the flusher and unblock senders
  	closeOnce sync.Once     // makes Close idempotent

  	mu sync.Mutex // guards in
  	in []T        // elements pushed since the last emitted batch
  }

  // NewBatchingQueue returns a queue that emits a batch whenever batchSize
  // elements are pending, and flushes whatever is pending every timeout.
  //
  // A non-positive batchSize disables size-based flushing; a non-positive
  // timeout disables time-based flushing. Call Close when the queue is no longer
  // needed to release the background goroutine and its ticker.
  func NewBatchingQueue[T any](batchSize int, timeout time.Duration) *BatchingQueue[T] {
  	q := &BatchingQueue[T]{
  		batchSize: batchSize,
  		timeout:   timeout,
  		out:       make(chan []T),
  		done:      make(chan struct{}),
  	}
  	// time.NewTicker panics on a non-positive interval, so only start the
  	// periodic flusher when there is a usable timeout.
  	if timeout > 0 {
  		go q.flushLoop()
  	}
  	return q
  }

  // Push appends element to the pending batch. If that fills the batch, Push
  // blocks until a consumer receives the batch from Pop or the queue is closed.
  func (c *BatchingQueue[T]) Push(element T) {
  	c.mu.Lock()
  	c.in = append(c.in, element)
  	var batch []T
  	// Decide and drain under the same lock acquisition: otherwise the ticker
  	// goroutine can drain in between and this call would emit an empty batch.
  	if c.batchSize > 0 && len(c.in) >= c.batchSize {
  		batch = c.takeLocked()
  	}
  	c.mu.Unlock()

  	// Send outside the lock so a slow consumer does not block other pushers.
  	c.emit(batch)
  }

  // Pop returns the receive-only channel on which batches are delivered. The
  // channel is never closed, not even by Close, because concurrent Push calls
  // may still be sending on it.
  func (c *BatchingQueue[T]) Pop() <-chan []T {
  	return c.out
  }

  // Close stops the periodic flusher and unblocks any sender waiting for a
  // consumer. Batches that were not yet received may be dropped. Close is
  // idempotent and safe to call concurrently with other methods.
  func (c *BatchingQueue[T]) Close() {
  	c.closeOnce.Do(func() { close(c.done) })
  }

  // flushLoop emits the pending elements on every tick until Close is called.
  func (c *BatchingQueue[T]) flushLoop() {
  	ticker := time.NewTicker(c.timeout)
  	defer ticker.Stop()
  	for {
  		select {
  		case <-ticker.C:
  			c.emit(c.popAll())
  		case <-c.done:
  			return
  		}
  	}
  }

  // emit delivers a non-empty batch to the consumer, giving up if the queue is
  // closed while waiting. Empty batches are never sent.
  func (c *BatchingQueue[T]) emit(batch []T) {
  	if len(batch) == 0 {
  		return
  	}
  	select {
  	case c.out <- batch:
  	case <-c.done:
  	}
  }

  // popAll removes and returns all pending elements, or nil if there are none.
  func (c *BatchingQueue[T]) popAll() []T {
  	c.mu.Lock()
  	defer c.mu.Unlock()
  	return c.takeLocked()
  }

  // takeLocked detaches and returns the pending elements. c.mu must be held.
  func (c *BatchingQueue[T]) takeLocked() []T {
  	if len(c.in) == 0 {
  		return nil
  	}
  	batch := c.in
  	// Hand the old backing array to the caller and start a fresh one, so the
  	// queue neither aliases the returned batch nor retains stale elements.
  	c.in = make([]T, 0, len(batch))
  	return batch
  }