From 030d62af12a7f94ab0aa015e5b1b5cf71f47843f Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Mon, 10 Mar 2025 09:33:19 +0900 Subject: [PATCH] writing bulletin subscription/publish code --- bulletin.go | 73 ++++++++++++++++++++++++++++++++++++++++++++---- bulletin_test.go | 27 ++++++++++++++++++ 2 files changed, 95 insertions(+), 5 deletions(-) create mode 100644 bulletin_test.go diff --git a/bulletin.go b/bulletin.go index 4138453..25c39b2 100644 --- a/bulletin.go +++ b/bulletin.go @@ -1,21 +1,84 @@ package hodu -type Message struct { +import "container/list" +import "sync" + +type BulletinChan = chan interface{} + +type BulletinSubscription struct { + c chan interface{} + b *Bulletin + topic string + node *list.Element } -type Subscriber struct { -} +type BulletinSubscriptionList = *list.List +type BulletinSubscriptionMap map[string]BulletinSubscriptionList type Bulletin struct { + sbsc_map BulletinSubscriptionMap + sbsc_mtx sync.RWMutex } func NewBulletin() *Bulletin { - return &Bulletin{} + return &Bulletin{ + sbsc_map: make(BulletinSubscriptionMap, 0), + } } -func (b *Bulletin) Subscribe(topic string) { +func (b *Bulletin) Subscribe(topic string) *BulletinSubscription { + var sbsc BulletinSubscription + var sbsc_list BulletinSubscriptionList + var ok bool + sbsc.b = b + sbsc.c = make(chan interface{}) + sbsc.topic = topic + b.sbsc_mtx.Lock() + + sbsc_list, ok = b.sbsc_map[topic] + if !ok { + sbsc_list = list.New() + b.sbsc_map[topic] = sbsc_list + } + + + sbsc.node = sbsc_list.PushBack(&sbsc) + b.sbsc_mtx.Unlock() + return &sbsc +} + +func (b *Bulletin) Unsbsccribe(sbsc *BulletinSubscription) { + if sbsc.b == b { + var sl BulletinSubscriptionList + var ok bool + + b.sbsc_mtx.Lock() + sl, ok = b.sbsc_map[sbsc.topic] + if ok { sl.Remove(sbsc.node) } + b.sbsc_mtx.Unlock() + } } func (b *Bulletin) Publish(topic string, data interface{}) { + var sl BulletinSubscriptionList + var ok bool + + b.sbsc_mtx.Lock() + sl, ok = b.sbsc_map[topic] + if ok { + var sbsc *BulletinSubscription + var e *list.Element + for e = sl.Front(); e != nil; e = e.Next() { + sbsc = e.Value.(*BulletinSubscription) + sbsc.c <- data + } + } + b.sbsc_mtx.Unlock() +} + +func (s *BulletinSubscription) Receive() interface{} { + var x interface{} + x = <- s.c + return x } diff --git a/bulletin_test.go b/bulletin_test.go new file mode 100644 index 0000000..1dfa7fa --- /dev/null +++ b/bulletin_test.go @@ -0,0 +1,27 @@ +package hodu_test + +import "fmt" +import "hodu" +import "testing" + +func TestBulletin(t *testing.T) { + var b *hodu.Bulletin + var s1 *hodu.BulletinSubscription + var s2 *hodu.BulletinSubscription + + b = hodu.NewBulletin() + + s1 = b.Subscribe("t1") + s2 = b.Subscribe("t2") + + go func() { + fmt.Printf ("s1: %+v\n", s1.Receive()) + }() + + go func() { + fmt.Printf ("s2: %+v\n", s2.Receive()) + }() + + b.Publish("t1", "donkey") +} +