added the GetSeed() call and handled more events

This commit is contained in:
hyung-hwan 2024-11-20 00:31:14 +09:00
parent fa336bfb07
commit 188900c1ae
7 changed files with 260 additions and 93 deletions

View File

@ -3,3 +3,20 @@ hodu client <servee> <peer1> [<peer2> ...]
client requests server that it grants access to the list of peers
reserver
## server.json
```
{
"server-addr": "127.0.0.1:9999",
"peer-addrs": [
"127.0.0.1:22",
"127.0.0.1:80"
]
}
```
## create a server
```
curl -X POST --data-binary @server.json http://127.0.0.1:7777/servers
```

View File

@ -11,9 +11,7 @@ func NewClientPeerConn(r *ClientRoute, c net.Conn, id uint32) (*ClientPeerConn)
cpc.conn = c
cpc.conn_id = id
cpc.stop_req.Store(false)
//cpc.server_peer_status_chan = make(chan bool, 16)
//cpc.server_peer_opened_received.Store(false)
//cpc.server_peer_closed_received.Store(false)
cpc.server_peer_eof.Store(false)
return &cpc
}
@ -57,3 +55,16 @@ func (cpc *ClientPeerConn) ReqStop() {
}
}
}
func (cpc* ClientPeerConn) CloseWrite() {
if cpc.server_peer_eof.CompareAndSwap(false, true) {
if cpc.conn != nil {
var conn *net.TCPConn
var ok bool
conn, ok = cpc.conn.(*net.TCPConn)
if ok {
conn.CloseWrite()
}
}
}
}

121
client.go
View File

@ -59,26 +59,30 @@ type ClientPeerConn struct {
remot_conn_id uint32
addr string // peer address
stop_req atomic.Bool
stop_chan chan bool
stop_req atomic.Bool
server_peer_eof atomic.Bool
}
// client connection to server
type ServerConn struct {
cli *Client
cfg *ClientConfig
saddr *net.TCPAddr // server address that is connected to
cli *Client
cfg *ClientConfig
saddr *net.TCPAddr // server address that is connected to
conn *grpc.ClientConn // grpc connection to the server
hdc HoduClient
psc *GuardedPacketStreamClient // guarded grpc stream
psc_mtx sync.Mutex
conn *grpc.ClientConn // grpc connection to the server
hdc HoduClient
psc *GuardedPacketStreamClient // guarded grpc stream
route_mtx sync.Mutex
s_seed Seed
c_seed Seed
route_mtx sync.Mutex
route_map ClientRouteMap
route_wg sync.WaitGroup
route_wg sync.WaitGroup
stop_req atomic.Bool
stop_req atomic.Bool
stop_chan chan bool
}
@ -194,7 +198,7 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32) {
return
}
ptc, err = r.AddNewClientPeerConn(conn)
ptc, err = r.AddNewClientPeerConn(conn, pts_id)
if err != nil {
// TODO: logging
// TODO: make send peer started failure mesage?
@ -214,14 +218,60 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32) {
go ptc.RunTask(&r.ptc_wg)
}
func (r* ClientRoute) DisconnectFromPeer(pts_id uint32) error {
var ptc *ClientPeerConn
var ok bool
r.ptc_mtx.Lock()
ptc, ok = r.ptc_map[pts_id]
if !ok {
r.ptc_mtx.Unlock()
return fmt.Errorf("non-existent connection id - %u", pts_id)
}
r.ptc_mtx.Unlock()
ptc.ReqStop()
return nil
}
func (r* ClientRoute) CloseWriteToPeer(pts_id uint32) error {
var ptc *ClientPeerConn
var ok bool
r.ptc_mtx.Lock()
ptc, ok = r.ptc_map[pts_id]
if !ok {
r.ptc_mtx.Unlock()
return fmt.Errorf("non-existent connection id - %u", pts_id)
}
r.ptc_mtx.Unlock()
ptc.CloseWrite()
return nil
}
func (r* ClientRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
var err error
switch event_type {
case PACKET_KIND_PEER_STARTED:
fmt.Printf ("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n")
r.ConnectToPeer(pts_id)
// TODO:
// case PACKET_KIND_PEER_STOPPED:
// r.DisconnectFromPeer(pts_id)
case PACKET_KIND_PEER_STOPPED:
fmt.Printf ("GOT PEER STOPPED . DISCONNECTION FROM CLIENT_SIDE PEER\n")
err = r.DisconnectFromPeer(pts_id)
if err != nil {
// TODO:
}
case PACKET_KIND_PEER_EOF:
fmt.Printf ("GOT PEER EOF. REMEMBER EOF\n")
err = r.CloseWriteToPeer(pts_id)
if err != nil {
// TODO:
}
case PACKET_KIND_PEER_DATA:
var ptc *ClientPeerConn
@ -367,6 +417,8 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
var hdc HoduClient
var psc PacketStreamClient
var slpctx context.Context
var c_seed Seed
var s_seed *Seed
var err error
defer wg.Done() // arrange to call at the end of this function
@ -383,6 +435,19 @@ fmt.Printf ("Connecting GRPC to [%s]\n", cts.saddr.String())
}
hdc = NewHoduClient(conn)
// seed exchange is for furture expansion of the protocol
// there is nothing to do much about it for now.
c_seed.Version = HODU_VERSION
c_seed.Flags = 0
s_seed, err = hdc.GetSeed(cts.cli.ctx, &c_seed)
if err != nil {
fmt.Printf("ERROR: unable to get seed from %s - %s\n", cts.cfg.server_addr, err.Error())
goto reconnect_to_server
}
cts.s_seed = *s_seed
cts.c_seed = c_seed
psc, err = hdc.PacketStream(cts.cli.ctx)
if err != nil {
fmt.Printf ("ERROR: unable to get grpc packet stream - %s\n", err.Error())
@ -494,6 +559,21 @@ fmt.Printf("[%v]\n", cts.route_map)
// TODO
}
case PACKET_KIND_PEER_EOF:
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_EOF, nil)
if err != nil {
// TODO:
} else {
// TODO:
}
} else {
// TODO
}
case PACKET_KIND_PEER_DATA:
// the connection from the client to a peer has been established
fmt.Printf ("**** GOT PEER DATA\n")
@ -561,14 +641,15 @@ func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type P
}
// --------------------------------------------------------------------
func (r *ClientRoute) AddNewClientPeerConn (c net.Conn) (*ClientPeerConn, error) {
func (r *ClientRoute) AddNewClientPeerConn (c net.Conn, pts_id uint32) (*ClientPeerConn, error) {
var ptc *ClientPeerConn
var ok bool
var start_id uint32
//var ok bool
//var start_id uint32
r.ptc_mtx.Lock()
defer r.ptc_mtx.Unlock()
/*
if len(r.ptc_map) >= r.ptc_limit {
return nil, fmt.Errorf("peer-to-client connection table full")
}
@ -587,8 +668,10 @@ func (r *ClientRoute) AddNewClientPeerConn (c net.Conn) (*ClientPeerConn, error)
}
ptc = NewClientPeerConn(r, c, r.ptc_last_id)
*/
ptc = NewClientPeerConn(r, c, pts_id)
r.ptc_map[ptc.conn_id] = ptc
r.ptc_last_id++
//r.ptc_last_id++
return ptc, nil
}

View File

@ -5,9 +5,15 @@ option go_package = "./main";
//package hodu; // no idea if it's still important...
service Hodu {
rpc GetSeed (Seed) returns (Seed) {}
rpc PacketStream (stream Packet) returns (stream Packet) {}
}
message Seed {
uint32 Version = 1;
uint64 Flags = 2;
}
enum ROUTE_PROTO {
TCP = 0;
TCP4 = 1;

View File

@ -6,6 +6,8 @@ import "io"
import "os"
import "strings"
const HODU_VERSION uint32 = 0x010000
func main() {
var err error
var flgs *flag.FlagSet

113
s-peer.go
View File

@ -13,11 +13,14 @@ type ServerPeerConn struct {
conn_id uint32
cts *ClientConn
conn *net.TCPConn
stop_req atomic.Bool
stop_chan chan bool
stop_req atomic.Bool
client_peer_status_chan chan bool
client_peer_opened_received atomic.Bool
client_peer_closed_received atomic.Bool
client_peer_started atomic.Bool
client_peer_stopped atomic.Bool
client_peer_eof atomic.Bool
}
func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerConn) {
@ -26,12 +29,15 @@ func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerCo
spc.route = r
spc.conn = c
spc.conn_id = id
spc.stop_req.Store(false)
spc.stop_chan = make(chan bool, 1)
spc.client_peer_status_chan = make(chan bool, 16)
spc.client_peer_opened_received.Store(false)
spc.client_peer_closed_received.Store(false)
spc.stop_chan = make(chan bool, 8)
spc.stop_req.Store(false)
spc.client_peer_status_chan = make(chan bool, 8)
spc.client_peer_started.Store(false)
spc.client_peer_stopped.Store(false)
spc.client_peer_eof.Store(false)
fmt.Printf ("~~~~~~~~~~~~~~~ NEW SERVER PEER CONNECTION ADDED %p\n", &spc)
return &spc
}
@ -50,7 +56,7 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) {
if err != nil {
// TODO: include route id and conn id in the error message
fmt.Printf("unable to send start-pts - %s\n", err.Error())
goto done
goto done_without_stop
}
tmr = time.NewTimer(2 * time.Second) // TODO: make this configurable...
@ -80,15 +86,16 @@ wait_for_started:
for {
n, err = spc.conn.Read(buf[:])
if err != nil {
if !errors.Is(err, io.EOF) {
if errors.Is(err, io.EOF) {
if pss.Send(MakePeerEofPacket(spc.route.id, spc.conn_id)) != nil {
fmt.Printf("unable to report data - %s\n", err.Error())
goto done
}
goto wait_for_stopped
} else {
fmt.Printf("read error - %s\n", err.Error())
goto done
}
if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil {
fmt.Printf("unable to report data - %s\n", err.Error())
goto done
}
goto wait_for_stopped
}
err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n]))
@ -100,41 +107,45 @@ wait_for_started:
}
wait_for_stopped:
//if spc.client_peer_open {
for {
select {
case status = <- spc.client_peer_status_chan: // something not right... may use a different channel for closing...
goto done
case <- spc.stop_chan:
goto done
}
for {
fmt.Printf ("******************* Waiting for peer Stop\n")
select {
case status = <- spc.client_peer_status_chan: // something not right... may use a different channel for closing...
goto done
case <- spc.stop_chan:
goto done
}
//}
}
fmt.Printf ("******************* Sending peer stopped\n")
if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil {
fmt.Printf("unable to report data - %s\n", err.Error())
goto done
}
done:
// TODO: inform the client to close peer connection..
if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil {
fmt.Printf("unable to report data - %s\n", err.Error())
// nothing much to do about the failure of sending this
}
done_without_stop:
fmt.Printf("SPC really ending..................\n")
spc.ReqStop()
spc.route.RemoveServerPeerConn(spc)
//spc.cts.wg.Done()
}
func (spc *ServerPeerConn) ReqStop() {
if spc.stop_req.CompareAndSwap(false, true) {
var pss *GuardedPacketStreamServer
var err error
pss = spc.route.cts.pss
spc.stop_chan <- true
if spc.client_peer_opened_received.CompareAndSwap(false, true) {
if spc.client_peer_started.CompareAndSwap(false, true) {
spc.client_peer_status_chan <- false
}
spc.conn.Close()
err = pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id))
if err != nil {
// TODO: print warning
fmt.Printf ("WARNING - failed to report event to %s - %s\n", spc.route.cts.caddr, err.Error())
if spc.client_peer_stopped.CompareAndSwap(false, true) {
spc.client_peer_status_chan <- false
}
spc.conn.Close() // to abort the main Recv() loop
}
}
@ -143,28 +154,37 @@ func (spc *ServerPeerConn) ReportEvent (event_type PACKET_KIND, event_data []byt
switch event_type {
case PACKET_KIND_PEER_STARTED:
fmt.Printf("******************* AAAAAAAAAAAAAAAAAAAaaa\n")
if spc.client_peer_opened_received.CompareAndSwap(false, true) {
if spc.client_peer_started.CompareAndSwap(false, true) {
spc.client_peer_status_chan <- true
}
case PACKET_KIND_PEER_STOPPED:
fmt.Printf("******************* BBBBBBBBBBBBBBBBBBBBBBBB\n")
//if spc.client_peer_closed_received.CompareAndSwap(false, true) {
// spc.client_peer_status_chan <- false
//}
// this event needs to close on the server-side peer connection.
// sending false to the client_peer_status_chan isn't good enough to break
// the Recv loop in RunTask().
spc.ReqStop()
case PACKET_KIND_PEER_EOF:
fmt.Printf("******************* BBBBBBBBBBBBBBBBBBBBBBBB CLIENT PEER EOF\n")
// the client-side peer is not supposed to send data any more
if spc.client_peer_eof.CompareAndSwap(false, true) {
spc.conn.CloseWrite()
}
case PACKET_KIND_PEER_DATA:
fmt.Printf("******************* CCCCCCCCCCCCCCCCCCCCCCCccc\n")
var err error
if spc.client_peer_eof.Load() == false {
var err error
_, err = spc.conn.Write(event_data)
if err != nil {
// TODO: logging
fmt.Printf ("WARNING - failed to write data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String())
_, err = spc.conn.Write(event_data)
if err != nil {
// TODO: logging
fmt.Printf ("WARNING - failed to write data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String())
}
} else {
// protocol error. the client must not relay more data from the client-side peer after EOF.
fmt.Printf ("WARNING - broken client - redundant data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String())
}
default:
@ -173,6 +193,3 @@ fmt.Printf("******************* CCCCCCCCCCCCCCCCCCCCCCCccc\n")
}
return nil
}

View File

@ -12,7 +12,7 @@ import "os/signal"
import "sync"
import "sync/atomic"
import "syscall"
import "time"
//import "time"
import "google.golang.org/grpc"
//import "google.golang.org/grpc/metadata"
@ -39,38 +39,38 @@ type Server struct {
cts_wg sync.WaitGroup
gs *grpc.Server
UnimplementedHoduServer
}
// client connection to server.
// client connect to the server, the server accept it, and makes a tunnel request
type ClientConn struct {
svr *Server
caddr net.Addr // client address that created this structure
pss *GuardedPacketStreamServer
svr *Server
caddr net.Addr // client address that created this structure
pss *GuardedPacketStreamServer
cw_mtx sync.Mutex
route_mtx sync.Mutex
route_map ServerRouteMap
route_wg sync.WaitGroup
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
}
type ServerRoute struct {
cts *ClientConn
l *net.TCPListener
laddr *net.TCPAddr
id uint32
l *net.TCPListener
laddr *net.TCPAddr
id uint32
pts_mtx sync.Mutex
pts_map ServerPeerConnMap
pts_limit int
pts_last_id uint32
pts_wg sync.WaitGroup
stop_req atomic.Bool
pts_wg sync.WaitGroup
stop_req atomic.Bool
}
type GuardedPacketStreamServer struct {
@ -330,11 +330,13 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P
return r.ReportEvent(pts_id, event_type, event_data)
}
func (cts *ClientConn) receive_from_stream () {
func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) {
var pkt *Packet
var err error
//for {
defer wg.Done()
for {
pkt, err = cts.pss.Recv()
if errors.Is(err, io.EOF) {
// return will close stream from server side
@ -439,7 +441,7 @@ fmt.Printf ("grpd stream ended\n")
// TODO
}
}
//}
}
done:
fmt.Printf ("************ stream receiver finished....\n")
@ -451,11 +453,19 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
defer wg.Done()
strm = cts.pss
ctx = strm.Context()
//go cts.receive_from_stream()
// it looks like the only proper way to interrupt the blocking Recv
// call on the grpc streaming server is exit from the service handler
// which is this function invoked from PacketStream().
// there is no cancel function or whatever that can interrupt it.
// so start the Recv() loop in a separte goroutine and let this
// function be the channel waiter only.
// increment on the wait group is for the caller to wait for
// these detached goroutines to finish.
wg.Add(1)
go cts.receive_from_stream(wg)
for {
// exit if context is done
@ -468,11 +478,10 @@ fmt.Printf("grpd server done - %s\n", ctx.Err().Error())
case <- cts.stop_chan:
goto done
default:
//default:
// no other case is ready.
// without the default case, the select construct would block
}
cts.receive_from_stream()
}
done:
@ -528,6 +537,22 @@ chan_loop:
// --------------------------------------------------------------------
func (s *Server) GetSeed (ctx context.Context, c_seed *Seed) (*Seed, error) {
var s_seed Seed
// seed exchange is for furture expansion of the protocol
// there is nothing to do much about it for now.
s_seed.Version = HODU_VERSION
s_seed.Flags = 0
// we create no ClientConn structure associated with the connection
// at this phase for the server. it doesn't track the client version and
// features. we delegate protocol selection solely to the client.
return &s_seed, nil
}
func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
var ctx context.Context
var p *peer.Peer
@ -604,12 +629,12 @@ type wrappedStream struct {
}
func (w *wrappedStream) RecvMsg(m any) error {
fmt.Printf("Receive a message (Type: %T) at %s\n", m, time.Now().Format(time.RFC3339))
//fmt.Printf("Receive a message (Type: %T) at %s\n", m, time.Now().Format(time.RFC3339))
return w.ServerStream.RecvMsg(m)
}
func (w *wrappedStream) SendMsg(m any) error {
fmt.Printf("Send a message (Type: %T) at %v\n", m, time.Now().Format(time.RFC3339))
//fmt.Printf("Send a message (Type: %T) at %v\n", m, time.Now().Format(time.RFC3339))
return w.ServerStream.SendMsg(m)
}
@ -744,8 +769,14 @@ func (s *Server) RunTask(wg *sync.WaitGroup) {
}
s.l_wg.Wait()
fmt.Printf ("waiting for all client to server conn to complete\n")
s.cts_wg.Wait()
fmt.Printf ("waited for all client to server conn to complete\n")
s.ReqStop()
// stop the main grpc server after all the other tasks are finished.
s.gs.Stop()
syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}
@ -755,7 +786,7 @@ func (s *Server) ReqStop() {
var cts *ClientConn
//s.gs.GracefulStop()
s.gs.Stop()
//s.gs.Stop()
for _, l = range s.l {
l.Close()
}