implemented an api call at /client-conns/id/routes/id/peers/id

This commit is contained in:
2024-12-01 17:20:16 +09:00
parent 74fb40d44f
commit 96442bb93a
8 changed files with 200 additions and 76 deletions

112
client.go
View File

@ -100,9 +100,12 @@ type ClientPeerConn struct {
conn_id uint32
conn *net.TCPConn
pts_laddr string // server-local addreess of the server-side peer
pts_raddr string // address of the server-side peer
pts_eof atomic.Bool
stop_chan chan bool
stop_req atomic.Bool
server_peer_eof atomic.Bool
}
type GuardedPacketStreamClient struct {
@ -144,6 +147,54 @@ func NewClientRoute(cts *ClientConn, id uint32, addr string, proto ROUTE_PROTO)
return &r
}
func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id uint32, pts_raddr string, pts_laddr string) (*ClientPeerConn, error) {
var ptc *ClientPeerConn
r.ptc_mtx.Lock()
defer r.ptc_mtx.Unlock()
ptc = NewClientPeerConn(r, c, pts_id, pts_raddr, pts_laddr)
r.ptc_map[ptc.conn_id] = ptc
return ptc, nil
}
func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error {
var c *ClientPeerConn
var ok bool
r.ptc_mtx.Lock()
c, ok = r.ptc_map[ptc.conn_id]
if !ok {
r.ptc_mtx.Unlock()
return fmt.Errorf("non-existent peer id - %d", ptc.conn_id)
}
if c != ptc {
r.ptc_mtx.Unlock()
return fmt.Errorf("non-existent peer id - %d", ptc.conn_id)
}
delete(r.ptc_map, ptc.conn_id)
r.ptc_mtx.Unlock()
ptc.ReqStop()
return nil
}
func (r *ClientRoute) FindClientPeerConnById(conn_id uint32) *ClientPeerConn {
var c *ClientPeerConn
var ok bool
r.ptc_mtx.Lock()
defer r.ptc_mtx.Unlock()
c, ok = r.ptc_map[conn_id]
if !ok {
return nil
}
return c
}
func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
var err error
@ -187,7 +238,7 @@ func (r *ClientRoute) ReqStop() {
fmt.Printf("*** Sent stop request to Route..\n")
}
func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) {
func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr string, wg *sync.WaitGroup) {
var err error
var conn net.Conn
var real_conn *net.TCPConn
@ -225,7 +276,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) {
goto peer_aborted
}
ptc, err = r.AddNewClientPeerConn(real_conn, pts_id)
ptc, err = r.AddNewClientPeerConn(real_conn, pts_id, pts_raddr, pts_laddr)
if err != nil {
// TODO: logging
// TODO: make send peer started failure mesage?
@ -233,7 +284,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) {
goto peer_aborted
}
fmt.Printf("STARTED NEW SERVER PEER STAK\n")
err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id))
err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id, real_conn.RemoteAddr().String(), real_conn.LocalAddr().String()))
if err != nil {
fmt.Printf("CLOSING NEW SERVER PEER STAK - %s\n", err.Error())
goto peer_aborted
@ -302,6 +353,7 @@ func (r *ClientRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_d
str, ok = event_data.(string)
if !ok {
// TODO: internal error
fmt.Printf("INTERNAL ERROR. MUST NOT HAPPEN\n")
} else {
var addr *net.TCPAddr
addr, err = net.ResolveTCPAddr("tcp", str)
@ -317,8 +369,17 @@ func (r *ClientRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_d
case PACKET_KIND_PEER_STARTED:
fmt.Printf("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n")
r.ptc_wg.Add(1)
go r.ConnectToPeer(pts_id, &r.ptc_wg)
var ok bool
var pd *PeerDesc
pd, ok = event_data.(*PeerDesc)
if !ok {
// TODO: internal error
fmt.Printf("INTERNAL ERROR. MUST NOT HAPPEN\n")
} else {
r.ptc_wg.Add(1)
go r.ConnectToPeer(pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, &r.ptc_wg)
}
case PACKET_KIND_PEER_ABORTED:
fallthrough
@ -650,7 +711,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil)
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, x.Peer)
if err != nil {
// TODO:
} else {
@ -753,19 +814,6 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P
return r.ReportEvent(pts_id, event_type, event_data)
}
// --------------------------------------------------------------------
func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id uint32) (*ClientPeerConn, error) {
var ptc *ClientPeerConn
r.ptc_mtx.Lock()
defer r.ptc_mtx.Unlock()
ptc = NewClientPeerConn(r, c, pts_id)
r.ptc_map[ptc.conn_id] = ptc
return ptc, nil
}
// --------------------------------------------------------------------
@ -934,6 +982,30 @@ func (c *Client) FindClientRouteById(conn_id uint32, route_id uint32) *ClientRou
return cts.FindClientRouteById(route_id)
}
func (c *Client) FindClientPeerConnById(conn_id uint32, route_id uint32, peer_id uint32) *ClientPeerConn {
var cts *ClientConn
var r* ClientRoute
var ok bool
c.cts_mtx.Lock()
defer c.cts_mtx.Unlock()
cts, ok = c.cts_map[conn_id]
if !ok {
return nil
}
cts.route_mtx.Lock()
defer cts.route_mtx.Unlock()
r, ok = cts.route_map[route_id]
if !ok {
return nil
}
return r.FindClientPeerConnById(peer_id)
}
func (c *Client) ReqStop() {
if c.stop_req.CompareAndSwap(false, true) {
var cts *ClientConn