fixed wrong queue implementation in bulletin.go
This commit is contained in:
242
bulletin.go
242
bulletin.go
@ -4,6 +4,7 @@ import "container/list"
|
||||
import "container/ring"
|
||||
import "errors"
|
||||
import "sync"
|
||||
import "time"
|
||||
|
||||
type BulletinSubscription[T interface{}] struct {
|
||||
C chan T
|
||||
@ -17,22 +18,50 @@ type BulletinSubscriptionList = *list.List
|
||||
type BulletinSubscriptionMap map[string]BulletinSubscriptionList
|
||||
|
||||
type Bulletin[T interface{}] struct {
|
||||
svc Service
|
||||
|
||||
sbsc_map BulletinSubscriptionMap
|
||||
sbsc_list *list.List
|
||||
sbsc_mtx sync.RWMutex
|
||||
closed bool
|
||||
blocked bool
|
||||
|
||||
r_mtx sync.RWMutex
|
||||
r *ring.Ring
|
||||
r_capa int
|
||||
r_full bool
|
||||
r_head *ring.Ring
|
||||
r_tail *ring.Ring
|
||||
r_len int
|
||||
r_cap int
|
||||
r_chan chan struct{}
|
||||
stop_chan chan struct{}
|
||||
}
|
||||
|
||||
func NewBulletin[T interface{}](capa int) *Bulletin[T] {
|
||||
func NewBulletin[T interface{}](svc Service, capa int) *Bulletin[T] {
|
||||
var r *ring.Ring
|
||||
|
||||
r = ring.New(capa)
|
||||
return &Bulletin[T]{
|
||||
sbsc_map: make(BulletinSubscriptionMap, 0),
|
||||
r: ring.New(capa),
|
||||
r_capa: capa,
|
||||
r_full: false,
|
||||
sbsc_list: list.New(),
|
||||
r: r,
|
||||
r_head: r,
|
||||
r_tail: r,
|
||||
r_cap: capa,
|
||||
r_len: 0,
|
||||
r_chan: make(chan struct{}, 1),
|
||||
stop_chan: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bulletin[T]) unsubscribe_list_nolock(sl BulletinSubscriptionList) {
|
||||
var sbsc *BulletinSubscription[T]
|
||||
var e *list.Element
|
||||
|
||||
for e = sl.Front(); e != nil; e = e.Next() {
|
||||
sbsc = e.Value.(*BulletinSubscription[T])
|
||||
sl.Remove(sbsc.node)
|
||||
close(sbsc.C)
|
||||
sbsc.b = nil
|
||||
sbsc.node = nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -41,20 +70,12 @@ func (b *Bulletin[T]) unsubscribe_all_nolock() {
|
||||
var sl BulletinSubscriptionList
|
||||
|
||||
for topic, sl = range b.sbsc_map {
|
||||
var sbsc *BulletinSubscription[T]
|
||||
var e *list.Element
|
||||
|
||||
for e = sl.Front(); e != nil; e = e.Next() {
|
||||
sbsc = e.Value.(*BulletinSubscription[T])
|
||||
close(sbsc.C)
|
||||
sbsc.b = nil
|
||||
sbsc.node = nil
|
||||
}
|
||||
|
||||
b.unsubscribe_list_nolock(sl)
|
||||
delete(b.sbsc_map, topic)
|
||||
}
|
||||
|
||||
b.closed = true
|
||||
b.unsubscribe_list_nolock(b.sbsc_list)
|
||||
b.blocked = true
|
||||
}
|
||||
|
||||
func (b *Bulletin[T]) UnsubscribeAll() {
|
||||
@ -63,33 +84,44 @@ func (b *Bulletin[T]) UnsubscribeAll() {
|
||||
b.sbsc_mtx.Unlock()
|
||||
}
|
||||
|
||||
func (b *Bulletin[T]) Close() {
|
||||
func (b *Bulletin[T]) Block() {
|
||||
b.sbsc_mtx.Lock()
|
||||
if !b.closed {
|
||||
b.unsubscribe_all_nolock()
|
||||
b.closed = true
|
||||
}
|
||||
b.blocked = true
|
||||
b.sbsc_mtx.Unlock()
|
||||
}
|
||||
|
||||
func (b *Bulletin[T]) Unblock() {
|
||||
b.sbsc_mtx.Lock()
|
||||
b.blocked = false
|
||||
b.sbsc_mtx.Unlock()
|
||||
}
|
||||
|
||||
func (b *Bulletin[T]) Subscribe(topic string) (*BulletinSubscription[T], error) {
|
||||
var sbsc BulletinSubscription[T]
|
||||
var sbsc_list BulletinSubscriptionList
|
||||
var ok bool
|
||||
|
||||
if b.closed { return nil, errors.New("closed bulletin") }
|
||||
b.sbsc_mtx.Lock()
|
||||
if b.blocked {
|
||||
b.sbsc_mtx.Unlock()
|
||||
return nil, errors.New("blocked")
|
||||
}
|
||||
|
||||
sbsc.C = make(chan T, 128) // TODO: size?
|
||||
sbsc.b = b
|
||||
sbsc.topic = topic
|
||||
|
||||
b.sbsc_mtx.Lock()
|
||||
sbsc_list, ok = b.sbsc_map[topic]
|
||||
if !ok {
|
||||
sbsc_list = list.New()
|
||||
b.sbsc_map[topic] = sbsc_list
|
||||
if topic == "" {
|
||||
sbsc.node = b.sbsc_list.PushBack(&sbsc)
|
||||
} else {
|
||||
var sbsc_list BulletinSubscriptionList
|
||||
var ok bool
|
||||
|
||||
sbsc_list, ok = b.sbsc_map[topic]
|
||||
if !ok {
|
||||
sbsc_list = list.New()
|
||||
b.sbsc_map[topic] = sbsc_list
|
||||
}
|
||||
sbsc.node = sbsc_list.PushBack(&sbsc)
|
||||
}
|
||||
sbsc.node = sbsc_list.PushBack(&sbsc)
|
||||
b.sbsc_mtx.Unlock()
|
||||
return &sbsc, nil
|
||||
}
|
||||
@ -100,12 +132,19 @@ func (b *Bulletin[T]) Unsubscribe(sbsc *BulletinSubscription[T]) {
|
||||
var ok bool
|
||||
|
||||
b.sbsc_mtx.Lock()
|
||||
sl, ok = b.sbsc_map[sbsc.topic]
|
||||
if ok {
|
||||
sl.Remove(sbsc.node)
|
||||
if sbsc.topic == "" {
|
||||
b.sbsc_list.Remove(sbsc.node)
|
||||
close(sbsc.C)
|
||||
sbsc.node = nil
|
||||
sbsc.b = nil
|
||||
} else {
|
||||
sl, ok = b.sbsc_map[sbsc.topic]
|
||||
if ok {
|
||||
sl.Remove(sbsc.node)
|
||||
close(sbsc.C)
|
||||
sbsc.node = nil
|
||||
sbsc.b = nil
|
||||
}
|
||||
}
|
||||
b.sbsc_mtx.Unlock()
|
||||
}
|
||||
@ -114,59 +153,130 @@ func (b *Bulletin[T]) Unsubscribe(sbsc *BulletinSubscription[T]) {
|
||||
func (b *Bulletin[T]) Publish(topic string, data T) {
|
||||
var sl BulletinSubscriptionList
|
||||
var ok bool
|
||||
var topics [2]string
|
||||
var t string
|
||||
|
||||
if b.closed { return }
|
||||
if topic == "" { return }
|
||||
|
||||
topics[0] = topic
|
||||
topics[1] = ""
|
||||
|
||||
b.sbsc_mtx.Lock()
|
||||
for _, t = range topics {
|
||||
sl, ok = b.sbsc_map[t]
|
||||
if ok {
|
||||
var sbsc *BulletinSubscription[T]
|
||||
var e *list.Element
|
||||
for e = sl.Front(); e != nil; e = e.Next() {
|
||||
sbsc = e.Value.(*BulletinSubscription[T])
|
||||
select {
|
||||
case sbsc.C <- data:
|
||||
// ok. could be written.
|
||||
default:
|
||||
// channel full. discard it
|
||||
}
|
||||
if b.blocked {
|
||||
b.sbsc_mtx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
sl, ok = b.sbsc_map[topic]
|
||||
if ok {
|
||||
var sbsc *BulletinSubscription[T]
|
||||
var e *list.Element
|
||||
for e = sl.Front(); e != nil; e = e.Next() {
|
||||
sbsc = e.Value.(*BulletinSubscription[T])
|
||||
select {
|
||||
case sbsc.C <- data:
|
||||
// ok. could be written.
|
||||
default:
|
||||
// channel full. discard it
|
||||
}
|
||||
}
|
||||
}
|
||||
b.sbsc_mtx.Unlock()
|
||||
}
|
||||
|
||||
func (b *Bulletin[T]) Enqueue(topic string, data T) {
|
||||
func (b *Bulletin[T]) Enqueue(data T) {
|
||||
// hopefuly, it's fater to use a single mutex, a ring buffer, and a notification channel than
|
||||
// to use a channel to pass messages. TODO: performance verification
|
||||
b.r_mtx.Lock()
|
||||
b.r.Value = data // update the value at the current position
|
||||
b.r = b.r.Next() // move the current position
|
||||
if b.blocked {
|
||||
b.r_mtx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if b.r_len < b.r_cap {
|
||||
b.r_len++
|
||||
} else {
|
||||
b.r_head = b.r_head.Next()
|
||||
}
|
||||
b.r_tail.Value = data // update the value at the current position
|
||||
b.r_tail = b.r_tail.Next() // move the current position
|
||||
select {
|
||||
case b.r_chan <- struct{}{}:
|
||||
// write success
|
||||
default:
|
||||
// don't care if not writable
|
||||
}
|
||||
b.r_mtx.Unlock()
|
||||
}
|
||||
|
||||
func (b *Bulletin[T]) Dequeue() {
|
||||
func (b *Bulletin[T]) Dequeue() (T, bool) {
|
||||
var v T
|
||||
var ok bool
|
||||
|
||||
b.r_mtx.Lock()
|
||||
|
||||
if b.r_len > 0 {
|
||||
v = b.r_head.Value.(T) // store the value for returning
|
||||
b.r_head.Value = nil // nullify the value
|
||||
b.r_head = b.r_head.Next() // advance the head position
|
||||
b.r_len--
|
||||
ok = true
|
||||
}
|
||||
|
||||
b.r_mtx.Unlock()
|
||||
return v, ok
|
||||
}
|
||||
|
||||
/*
|
||||
func (b *Bulletin[T]) RunTask(wg *sync.WaitGroup) {
|
||||
var done bool
|
||||
var msg T
|
||||
var ok bool
|
||||
var tmr *time.Timer
|
||||
|
||||
defer wg.Done()
|
||||
|
||||
tmr = time.NewTimer(3 * time.Second)
|
||||
for !done {
|
||||
select {
|
||||
case msg, ok = <- b.C:
|
||||
if !ok { done = true }
|
||||
var msg T
|
||||
var ok bool
|
||||
|
||||
msg, ok = b.Dequeue()
|
||||
if !ok {
|
||||
select {
|
||||
case <-b.stop_chan:
|
||||
// this may break the loop prematurely while there
|
||||
// are messages to read as it uses two different channels:
|
||||
// one for stop, another for notification
|
||||
done = true
|
||||
case <-b.r_chan:
|
||||
// noti received.
|
||||
tmr.Stop()
|
||||
tmr.Reset(3 * time.Second)
|
||||
case <-tmr.C:
|
||||
// try to dequeue again
|
||||
tmr.Reset(3 * time.Second)
|
||||
}
|
||||
} else {
|
||||
// forward msg to all subscribers...
|
||||
var e *list.Element
|
||||
var sbsc *BulletinSubscription[T]
|
||||
|
||||
tmr.Stop()
|
||||
|
||||
b.sbsc_mtx.Lock()
|
||||
for e = b.sbsc_list.Front(); e != nil; e = e.Next() {
|
||||
sbsc = e.Value.(*BulletinSubscription[T])
|
||||
select {
|
||||
case sbsc.C <- msg:
|
||||
// ok. could be written.
|
||||
default:
|
||||
// channel full. discard it
|
||||
}
|
||||
}
|
||||
b.sbsc_mtx.Unlock()
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
tmr.Stop()
|
||||
}
|
||||
|
||||
func (b *Bulletin[T]) ReqStop() {
|
||||
select {
|
||||
case b.stop_chan <- struct{}{}:
|
||||
// write success
|
||||
default:
|
||||
// ignore failure
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user