|
|
@@ -11,11 +11,11 @@ import (
|
|
|
// topic represents a channel to which subscribers can subscribe, and publishers
|
|
|
// can publish a message
|
|
|
type topic struct {
|
|
|
- ID string
|
|
|
- subscribers map[int]*topicSubscriber
|
|
|
- lastVisitor *visitor
|
|
|
- lastVisitorExpires time.Time
|
|
|
- mu sync.Mutex
|
|
|
+ ID string
|
|
|
+ subscribers map[int]*topicSubscriber
|
|
|
+ vRate *visitor
|
|
|
+ vRateExpires time.Time
|
|
|
+ mu sync.Mutex
|
|
|
}
|
|
|
|
|
|
type topicSubscriber struct {
|
|
|
@@ -49,9 +49,9 @@ func (t *topic) Subscribe(s subscriber, visitor *visitor, cancel func(), subscri
|
|
|
}
|
|
|
|
|
|
// if no subscriber is already handling the rate limit
|
|
|
- if t.lastVisitor == nil && subscriberRateLimit {
|
|
|
- t.lastVisitor = visitor
|
|
|
- t.lastVisitorExpires = time.Time{}
|
|
|
+ if t.vRate == nil && subscriberRateLimit {
|
|
|
+ t.vRate = visitor
|
|
|
+ t.vRateExpires = time.Time{}
|
|
|
}
|
|
|
|
|
|
return subscriberID
|
|
|
@@ -61,16 +61,16 @@ func (t *topic) Stale() bool {
|
|
|
t.mu.Lock()
|
|
|
defer t.mu.Unlock()
|
|
|
// if Time is initialized (not the zero value) and the expiry time has passed
|
|
|
- if !t.lastVisitorExpires.IsZero() && t.lastVisitorExpires.Before(time.Now()) {
|
|
|
- t.lastVisitor = nil
|
|
|
+ if !t.vRateExpires.IsZero() && t.vRateExpires.Before(time.Now()) {
|
|
|
+ t.vRate = nil
|
|
|
}
|
|
|
- return len(t.subscribers) == 0 && t.lastVisitor == nil
|
|
|
+ return len(t.subscribers) == 0 && t.vRate == nil
|
|
|
}
|
|
|
|
|
|
func (t *topic) Billee() *visitor {
|
|
|
t.mu.Lock()
|
|
|
defer t.mu.Unlock()
|
|
|
- return t.lastVisitor
|
|
|
+ return t.vRate
|
|
|
}
|
|
|
|
|
|
// Unsubscribe removes the subscription from the list of subscribers
|
|
|
@@ -84,16 +84,16 @@ func (t *topic) Unsubscribe(id int) {
|
|
|
// look for an active subscriber (in random order) that wants to handle the rate limit
|
|
|
for _, v := range t.subscribers {
|
|
|
if v.subscriberRateLimit {
|
|
|
- t.lastVisitor = v.visitor
|
|
|
- t.lastVisitorExpires = time.Time{}
|
|
|
+ t.vRate = v.visitor
|
|
|
+ t.vRateExpires = time.Time{}
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// if no active subscriber is found, count it towards the leaving subscriber
|
|
|
if deletingSub.subscriberRateLimit {
|
|
|
- t.lastVisitor = deletingSub.visitor
|
|
|
- t.lastVisitorExpires = time.Now().Add(subscriberBilledValidity)
|
|
|
+ t.vRate = deletingSub.visitor
|
|
|
+ t.vRateExpires = time.Now().Add(subscriberBilledValidity)
|
|
|
}
|
|
|
}
|
|
|
|