hodu/bulletin.go
hyung-hwan 09593fd678 fixed some potential concurrency issues in the client side.
enhanded route_started log messages
renamed service to svc for some items
added two more fields to denote requested service address/network to ServerRoute
2025-03-11 21:12:05 +09:00

173 lines
3.2 KiB
Go

package hodu
import "container/list"
import "container/ring"
import "errors"
import "sync"
type BulletinSubscription[T interface{}] struct {
C chan T
b *Bulletin[T]
topic string
node *list.Element
}
type BulletinSubscriptionList = *list.List
type BulletinSubscriptionMap map[string]BulletinSubscriptionList
type Bulletin[T interface{}] struct {
sbsc_map BulletinSubscriptionMap
sbsc_mtx sync.RWMutex
closed bool
r_mtx sync.RWMutex
r *ring.Ring
r_capa int
r_full bool
}
func NewBulletin[T interface{}](capa int) *Bulletin[T] {
return &Bulletin[T]{
sbsc_map: make(BulletinSubscriptionMap, 0),
r: ring.New(capa),
r_capa: capa,
r_full: false,
}
}
func (b *Bulletin[T]) unsubscribe_all_nolock() {
var topic string
var sl BulletinSubscriptionList
for topic, sl = range b.sbsc_map {
var sbsc *BulletinSubscription[T]
var e *list.Element
for e = sl.Front(); e != nil; e = e.Next() {
sbsc = e.Value.(*BulletinSubscription[T])
close(sbsc.C)
sbsc.b = nil
sbsc.node = nil
}
delete(b.sbsc_map, topic)
}
b.closed = true
}
func (b *Bulletin[T]) UnsubscribeAll() {
b.sbsc_mtx.Lock()
b.unsubscribe_all_nolock()
b.sbsc_mtx.Unlock()
}
func (b *Bulletin[T]) Close() {
b.sbsc_mtx.Lock()
if !b.closed {
b.unsubscribe_all_nolock()
b.closed = true
}
b.sbsc_mtx.Unlock()
}
func (b *Bulletin[T]) Subscribe(topic string) (*BulletinSubscription[T], error) {
var sbsc BulletinSubscription[T]
var sbsc_list BulletinSubscriptionList
var ok bool
if b.closed { return nil, errors.New("closed bulletin") }
sbsc.C = make(chan T, 128) // TODO: size?
sbsc.b = b
sbsc.topic = topic
b.sbsc_mtx.Lock()
sbsc_list, ok = b.sbsc_map[topic]
if !ok {
sbsc_list = list.New()
b.sbsc_map[topic] = sbsc_list
}
sbsc.node = sbsc_list.PushBack(&sbsc)
b.sbsc_mtx.Unlock()
return &sbsc, nil
}
func (b *Bulletin[T]) Unsubscribe(sbsc *BulletinSubscription[T]) {
if sbsc.b == b && sbsc.node != nil {
var sl BulletinSubscriptionList
var ok bool
b.sbsc_mtx.Lock()
sl, ok = b.sbsc_map[sbsc.topic]
if ok {
sl.Remove(sbsc.node)
close(sbsc.C)
sbsc.node = nil
sbsc.b = nil
}
b.sbsc_mtx.Unlock()
}
}
func (b *Bulletin[T]) Publish(topic string, data T) {
var sl BulletinSubscriptionList
var ok bool
var topics [2]string
var t string
if b.closed { return }
if topic == "" { return }
topics[0] = topic
topics[1] = ""
b.sbsc_mtx.Lock()
for _, t = range topics {
sl, ok = b.sbsc_map[t]
if ok {
var sbsc *BulletinSubscription[T]
var e *list.Element
for e = sl.Front(); e != nil; e = e.Next() {
sbsc = e.Value.(*BulletinSubscription[T])
select {
case sbsc.C <- data:
// ok. could be written.
default:
// channel full. discard it
}
}
}
}
b.sbsc_mtx.Unlock()
}
func (b *Bulletin[T]) Enqueue(topic string, data T) {
b.r_mtx.Lock()
b.r.Value = data // update the value at the current position
b.r = b.r.Next() // move the current position
b.r_mtx.Unlock()
}
func (b *Bulletin[T]) Dequeue() {
b.r_mtx.Lock()
b.r_mtx.Unlock()
}
/*
func (b *Bulletin[T]) RunTask(wg *sync.WaitGroup) {
var done bool
var msg T
var ok bool
defer wg.Done()
for !done {
select {
case msg, ok = <- b.C:
if !ok { done = true }
}
}
}*/