cleaned up quite some log messages on the server side

This commit is contained in:
hyung-hwan 2024-12-03 20:28:04 +09:00
parent d167308879
commit d4f00d63f9
6 changed files with 176 additions and 159 deletions

View File

@ -1,6 +1,5 @@
package hodu
import "fmt"
import "net"
import "sync"
@ -25,32 +24,31 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error {
defer wg.Done()
fmt.Printf("CONNECTION ESTABLISHED TO PEER... ABOUT TO READ DATA...\n")
for {
n, err = cpc.conn.Read(buf[:])
if err != nil {
// TODO: add proper log header
cpc.route.cts.cli.log.Write("", LOG_ERROR, "Unable to read from the client-side peer %s - %s", cpc.conn.RemoteAddr().String(), err.Error())
cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR,
"Failed to read from the client-side peer(%d,%d,%s,%s) - %s",
cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error())
break
}
err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.id, cpc.conn_id, buf[0:n]))
if err != nil {
// TODO: add proper log header
cpc.route.cts.cli.log.Write("", LOG_ERROR, "Unable to write to server - %s", err.Error())
cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR,
"Failed to write peer(%d,%d,%s,%s) data to server - %s",
cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error())
break
}
}
cpc.route.cts.psc.Send(MakePeerStoppedPacket(cpc.route.id, cpc.conn_id)) // nothing much to do upon failure. no error check here
cpc.route.cts.psc.Send(MakePeerStoppedPacket(cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String())) // nothing much to do upon failure. no error check here
cpc.ReqStop()
cpc.route.RemoveClientPeerConn(cpc)
cpc.route.RemoveClientPeerConn(cpc)
return nil
}
func (cpc *ClientPeerConn) ReqStop() {
// TODO: because of connect delay in Start, cpc.p may not be yet ready. handle this case...
if cpc.stop_req.CompareAndSwap(false, true) {
if cpc.conn != nil {
cpc.conn.Close()

View File

@ -231,10 +231,10 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
// most useful works are triggered by ReportEvent() and done by ConnectToPeer()
defer wg.Done()
r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-start for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr)
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Sending route_start for route(%d,%s) to %s", r.id, r.peer_addr, r.cts.remote_addr)
err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, r.peer_addr))
if err != nil {
r.cts.cli.log.Write("", LOG_DEBUG, "Failed to Send route-start for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr)
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Failed to send route_start for route(%d,%s) to %s", r.id, r.peer_addr, r.cts.remote_addr)
goto done
}
@ -250,10 +250,9 @@ done:
r.ReqStop()
r.ptc_wg.Wait() // wait for all peer tasks are finished
r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-stop for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr)
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Sending route_stop for route(%d,%s) to %s", r.id, r.peer_addr, r.cts.remote_addr)
r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr))
r.cts.RemoveClientRoute(r)
fmt.Printf("*** End fo Client Route Task\n")
}
func (r *ClientRoute) ReqStop() {
@ -264,7 +263,6 @@ func (r *ClientRoute) ReqStop() {
}
r.stop_chan <- true
}
fmt.Printf("*** Sent stop request to Route..\n")
}
func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr string, wg *sync.WaitGroup) {
@ -312,7 +310,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr s
fmt.Printf("YYYYYYYY - %s\n", err.Error())
goto peer_aborted
}
fmt.Printf("STARTED NEW SERVER PEER STAK\n")
err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id, real_conn.RemoteAddr().String(), real_conn.LocalAddr().String()))
if err != nil {
fmt.Printf("CLOSING NEW SERVER PEER STAK - %s\n", err.Error())

View File

@ -31,9 +31,10 @@ func MakePeerStartedPacket(route_id uint32, peer_id uint32, remote_addr string,
}
}
func MakePeerStoppedPacket(route_id uint32, peer_id uint32) *Packet {
func MakePeerStoppedPacket(route_id uint32, peer_id uint32, remote_addr string, local_addr string) *Packet {
return &Packet{Kind: PACKET_KIND_PEER_STOPPED,
U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}}}
U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id, RemoteAddrStr: remote_addr, LocalAddrStr: local_addr}},
}
}
func MakePeerAbortedPacket(route_id uint32, peer_id uint32) *Packet {

View File

@ -56,13 +56,13 @@ func (ctl *server_ctl_server_conns) ServeHTTP(w http.ResponseWriter, req *http.R
jsp = append(jsp, json_out_server_route{
Id: r.id,
ClientPeerAddr: r.ptc_addr,
ServerPeerListenAddr: r.svcaddr.String(),
ServerPeerListenAddr: r.svc_addr.String(),
})
}
js = append(js, json_out_server_conn{
Id: cts.id,
ClientAddr: cts.raddr.String(),
ServerAddr: cts.laddr.String(),
ClientAddr: cts.remote_addr.String(),
ServerAddr: cts.local_addr.String(),
Routes: jsp,
})
cts.route_mtx.Unlock()
@ -131,13 +131,13 @@ func (ctl *server_ctl_server_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
jsp = append(jsp, json_out_server_route{
Id: r.id,
ClientPeerAddr: r.ptc_addr,
ServerPeerListenAddr: r.svcaddr.String(),
ServerPeerListenAddr: r.svc_addr.String(),
})
}
js = &json_out_server_conn{
Id: cts.id,
ClientAddr: cts.raddr.String(),
ServerAddr: cts.laddr.String(),
ClientAddr: cts.remote_addr.String(),
ServerAddr: cts.local_addr.String(),
Routes: jsp,
}
cts.route_mtx.Unlock()

View File

@ -1,7 +1,6 @@
package hodu
import "errors"
import "fmt"
import "io"
import "net"
import "sync"
@ -37,7 +36,6 @@ func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) *ServerPeerCon
spc.client_peer_started.Store(false)
spc.client_peer_stopped.Store(false)
spc.client_peer_eof.Store(false)
fmt.Printf("~~~~~~~~~~~~~~~ NEW SERVER PEER CONNECTION ADDED %p\n", &spc)
return &spc
}
@ -54,8 +52,9 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) {
pss = spc.route.cts.pss
err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String()))
if err != nil {
// TODO: include route id and conn id in the error message
spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to notify peer started - %s", err.Error())
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
"Failed to send peer_started event(%d,%d,%s,%s) to client - %s",
spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error())
goto done_without_stop
}
@ -88,27 +87,31 @@ wait_for_started:
if err != nil {
if errors.Is(err, io.EOF) {
if pss.Send(MakePeerEofPacket(spc.route.id, spc.conn_id)) != nil {
spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to report eof - %s", err.Error())
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
"Failed to send peer_eof event(%d,%d,%s,%s) to client - %s",
spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error())
goto done
}
goto wait_for_stopped
} else {
spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to read data - %s", err.Error())
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
"Failed to read data from peer(%d,%d,%s,%s) - %s",
spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error())
goto done
}
}
err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n]))
if err != nil {
// TODO: include route id and conn id in the error message
spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to send data - %s", err.Error())
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
"Failed to send data from peer(%d,%d,%s,%s) to client - %s",
spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error())
goto done
}
}
wait_for_stopped:
for {
fmt.Printf ("******************* Waiting for peer Stop\n")
select {
case status = <-spc.client_peer_status_chan: // something not right... may use a different channel for closing...
goto done
@ -116,20 +119,16 @@ fmt.Printf ("******************* Waiting for peer Stop\n")
goto done
}
}
fmt.Printf ("******************* Sending peer stopped\n")
if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil {
fmt.Printf("unable to report data - %s\n", err.Error())
goto done
}
done:
if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil {
fmt.Printf("unable to report data - %s\n", err.Error())
if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String())) != nil {
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
"Failed to send peer_stopped(%d,%d,%s,%s) to client - %s",
spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error())
// nothing much to do about the failure of sending this
}
done_without_stop:
fmt.Printf("SPC really ending..................\n")
spc.ReqStop()
spc.route.RemoveServerPeerConn(spc)
}
@ -149,42 +148,54 @@ func (spc *ServerPeerConn) ReqStop() {
}
}
func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data []byte) error {
func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data interface{}) error {
switch event_type {
case PACKET_KIND_PEER_STARTED:
fmt.Printf("******************* AAAAAAAAAAAAAAAAAAAaaa\n")
if spc.client_peer_started.CompareAndSwap(false, true) {
spc.client_peer_status_chan <- true
}
case PACKET_KIND_PEER_STOPPED:
fmt.Printf("******************* BBBBBBBBBBBBBBBBBBBBBBBB\n")
// this event needs to close on the server-side peer connection.
// sending false to the client_peer_status_chan isn't good enough to break
// the Recv loop in RunTask().
spc.ReqStop()
case PACKET_KIND_PEER_EOF:
fmt.Printf("******************* BBBBBBBBBBBBBBBBBBBBBBBB CLIENT PEER EOF\n")
// the client-side peer is not supposed to send data any more
if spc.client_peer_eof.CompareAndSwap(false, true) {
spc.conn.CloseWrite()
}
case PACKET_KIND_PEER_DATA:
fmt.Printf("******************* CCCCCCCCCCCCCCCCCCCCCCCccc\n")
if spc.client_peer_eof.Load() == false {
var err error
var ok bool
var data []byte
_, err = spc.conn.Write(event_data)
if err != nil {
// TODO: logging
fmt.Printf ("WARNING - failed to write data from %s to %s\n", spc.route.cts.raddr, spc.conn.RemoteAddr().String())
data, ok = event_data.([]byte)
if ok {
var err error
_, err = spc.conn.Write(data)
if err != nil {
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
"Failed to write data from %s to peer(%d,%d,%s) - %s",
spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), err.Error())
spc.ReqStop()
}
} else {
// this must not happen.
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
"internal server error - invalid data in peer_data event from %s to peer(%d,%d,%s)",
spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String())
spc.ReqStop()
}
} else {
// protocol error. the client must not relay more data from the client-side peer after EOF.
fmt.Printf("WARNING - broken client - redundant data from %s to %s\n", spc.route.cts.raddr, spc.conn.RemoteAddr().String())
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
"internal client error - redundant data from %s to (%d,%d,%s)",
spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String())
spc.ReqStop()
}
default:

223
server.go
View File

@ -43,7 +43,7 @@ type Server struct {
rpc []*net.TCPListener // main listener for grpc
rpc_wg sync.WaitGroup
gs *grpc.Server
rpc_svr *grpc.Server
cts_mtx sync.Mutex
cts_map ServerConnMap
@ -58,27 +58,27 @@ type Server struct {
// connection from client.
// client connect to the server, the server accept it, and makes a tunnel request
type ServerConn struct {
svr *Server
id uint32
sid string // for logging
svr *Server
id uint32
sid string // for logging
raddr net.Addr // client address that created this structure
laddr net.Addr // local address that the client is connected to
pss *GuardedPacketStreamServer
remote_addr net.Addr // client address that created this structure
local_addr net.Addr // local address that the client is connected to
pss *GuardedPacketStreamServer
route_mtx sync.Mutex
route_map ServerRouteMap
route_wg sync.WaitGroup
route_mtx sync.Mutex
route_map ServerRouteMap
route_wg sync.WaitGroup
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
}
type ServerRoute struct {
cts *ServerConn
l *net.TCPListener
svcaddr *net.TCPAddr
svc_addr *net.TCPAddr // listening address
ptc_addr string
id uint32
@ -125,7 +125,7 @@ func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO, ptc_addr stri
var svcaddr *net.TCPAddr
var err error
l, svcaddr, err = cts.make_route_listener(proto)
l, svcaddr, err = cts.make_route_listener(id, proto)
if err != nil {
return nil, err
}
@ -133,7 +133,7 @@ func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO, ptc_addr stri
r.cts = cts
r.id = id
r.l = l
r.svcaddr = svcaddr
r.svc_addr = svcaddr
r.ptc_addr = ptc_addr
r.pts_limit = PTS_LIMIT
r.pts_map = make(ServerPeerConnMap)
@ -179,35 +179,33 @@ func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) {
r.pts_mtx.Lock()
delete(r.pts_map, pts.conn_id)
r.pts_mtx.Unlock()
r.cts.svr.log.Write("", LOG_DEBUG, "Removed server-side peer connection %s", pts.conn.RemoteAddr().String())
r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.id)
}
func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
var err error
var conn *net.TCPConn
var pts *ServerPeerConn
var log_id string
defer wg.Done()
log_id = fmt.Sprintf("%s,%d", r.cts.raddr.String(), r.id)
for {
conn, err = r.l.AcceptTCP()
if err != nil {
if errors.Is(err, net.ErrClosed) {
r.cts.svr.log.Write(log_id, LOG_INFO, "Server-side peer listener closed")
r.cts.svr.log.Write(r.cts.sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.id)
} else {
r.cts.svr.log.Write(log_id, LOG_INFO, "Server-side peer listener error - %s", err.Error())
r.cts.svr.log.Write(r.cts.sid, LOG_INFO, "Server-side peer listener error on route(%d) - %s", r.id, err.Error())
}
break
}
pts, err = r.AddNewServerPeerConn(conn)
if err != nil {
r.cts.svr.log.Write(log_id, LOG_ERROR, "Failed to add new server-side peer %s - %s", conn.RemoteAddr().String(), err.Error())
r.cts.svr.log.Write(r.cts.sid, LOG_ERROR, "Failed to add new server-side peer %s to route(%d) - %s", r.id, conn.RemoteAddr().String(), r.id, err.Error())
conn.Close()
} else {
r.cts.svr.log.Write(log_id, LOG_DEBUG, "Added new server-side peer %s", conn.RemoteAddr().String())
r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Added new server-side peer %s to route(%d)", conn.RemoteAddr().String(), r.id)
r.pts_wg.Add(1)
go pts.RunTask(&r.pts_wg)
}
@ -216,14 +214,12 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
r.ReqStop()
r.pts_wg.Wait()
r.cts.svr.log.Write(log_id, LOG_DEBUG, "All service-side peer handlers completed")
r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "All service-side peer handlers completed on route(%d)", r.id)
r.cts.RemoveServerRoute(r) // final phase...
}
func (r *ServerRoute) ReqStop() {
fmt.Printf("requesting to stop route taak..\n")
if r.stop_req.CompareAndSwap(false, true) {
var pts *ServerPeerConn
@ -233,10 +229,9 @@ func (r *ServerRoute) ReqStop() {
r.l.Close()
}
fmt.Printf("requiested to stopp route taak..\n")
}
func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data interface{}) error {
var spc *ServerPeerConn
var ok bool
@ -252,7 +247,7 @@ func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_d
}
// ------------------------------------
func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) {
func (cts *ServerConn) make_route_listener(id uint32, proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) {
var l *net.TCPListener
var err error
var svcaddr *net.TCPAddr
@ -276,12 +271,11 @@ func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener,
if err == nil {
l, err = net.ListenTCP(nw, svcaddr) // make the binding address configurable. support multiple binding addresses???
if err == nil {
fmt.Printf("listening .... on ... %d\n", port)
cts.svr.log.Write(cts.sid, LOG_DEBUG, "Route(%d) listening on %d", id, port)
return l, svcaddr, nil
}
}
// TODO: implement max retries..
tries++
if tries >= 1000 {
err = fmt.Errorf("unable to allocate port")
@ -352,7 +346,7 @@ func (cts *ServerConn) RemoveServerRouteById(route_id uint32) (*ServerRoute, err
return r, nil
}
func (cts *ServerConn) ReportEvent(route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
func (cts *ServerConn) ReportEvent(route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data interface{}) error {
var r *ServerRoute
var ok bool
@ -376,11 +370,11 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
for {
pkt, err = cts.pss.Recv()
if errors.Is(err, io.EOF) {
cts.svr.log.Write("", LOG_INFO, "GRPC stream closed for client %s", cts.raddr)
cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream closed for client %s", cts.remote_addr)
goto done
}
if err != nil {
cts.svr.log.Write("", LOG_ERROR, "GRPC stream error for client %s - %s", cts.raddr, err.Error())
cts.svr.log.Write(cts.sid, LOG_ERROR, "RPC stream error for client %s - %s", cts.remote_addr, err.Error())
goto done
}
@ -394,18 +388,18 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto, x.Route.AddrStr)
if err != nil {
cts.svr.log.Write("", LOG_ERROR, "Failed to add server route for client %s peer %s", cts.raddr, x.Route.AddrStr)
cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to add route for client %s peer %s", cts.remote_addr, x.Route.AddrStr)
} else {
cts.svr.log.Write("", LOG_INFO, "Added server route(id=%d) for client %s peer %s to cts(id=%d)", r.id, cts.raddr, x.Route.AddrStr, cts.id)
err = cts.pss.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.svcaddr.String()))
cts.svr.log.Write(cts.sid, LOG_INFO, "Added route(%d) for client %s peer %s to cts(%d)", r.id, cts.remote_addr, x.Route.AddrStr, cts.id)
err = cts.pss.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.svc_addr.String()))
if err != nil {
r.ReqStop()
cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route started for peer %s", cts.raddr, x.Route.AddrStr)
cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to inform client %s of route started for peer %s", cts.remote_addr, x.Route.AddrStr)
goto done
}
}
} else {
cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.raddr)
cts.svr.log.Write(cts.sid, LOG_INFO, "Received invalid packet from %s", cts.remote_addr)
// TODO: need to abort this client?
}
@ -418,19 +412,18 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
r, err = cts.RemoveServerRouteById(x.Route.RouteId)
if err != nil {
cts.svr.log.Write("", LOG_ERROR, "Failed to delete server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.raddr, x.Route.AddrStr)
cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to delete route(%d) for client %s peer %s", x.Route.RouteId, cts.remote_addr, x.Route.AddrStr)
} else {
cts.svr.log.Write("", LOG_ERROR, "Deleted server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.raddr, x.Route.AddrStr)
cts.svr.log.Write(cts.sid, LOG_ERROR, "Deleted route(%d) for client %s peer %s", x.Route.RouteId, cts.remote_addr, x.Route.AddrStr)
err = cts.pss.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto))
if err != nil {
r.ReqStop()
cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route(id=%d) stopped for peer %s", cts.raddr, x.Route.RouteId, x.Route.AddrStr)
cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to inform client %s of route(%d) stopped for peer %s", cts.remote_addr, x.Route.RouteId, x.Route.AddrStr)
goto done
}
}
} else {
cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.raddr)
// TODO: need to abort this client?
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid route_stop event from %s", cts.remote_addr)
}
case PACKET_KIND_PEER_STARTED:
@ -439,15 +432,19 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil)
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, x.Peer)
if err != nil {
// TODO:
fmt.Printf("Failed to report PEER_STARTED Event")
cts.svr.log.Write(cts.sid, LOG_ERROR,
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
// TODO:
cts.svr.log.Write(cts.sid, LOG_DEBUG,
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
// TODO
// invalid event data
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr)
}
case PACKET_KIND_PEER_ABORTED:
@ -460,13 +457,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
if ok {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil)
if err != nil {
// TODO:
fmt.Printf("Failed to report PEER_STOPPED Event")
cts.svr.log.Write(cts.sid, LOG_ERROR,
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
// TODO:
cts.svr.log.Write(cts.sid, LOG_DEBUG,
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
// TODO
// invalid event data
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr)
}
case PACKET_KIND_PEER_DATA:
@ -477,19 +478,23 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
if ok {
err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data)
if err != nil {
// TODO:
fmt.Printf("Failed to report PEER_DATA Event")
cts.svr.log.Write(cts.sid, LOG_ERROR,
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
cts.remote_addr, x.Data.RouteId, x.Data.PeerId, err.Error())
} else {
// TODO:
cts.svr.log.Write(cts.sid, LOG_DEBUG,
"Handled peer_data event from %s for peer(%d,%d)",
cts.remote_addr, x.Data.RouteId, x.Data.PeerId)
}
} else {
// TODO
// invalid event data
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr)
}
}
}
done:
fmt.Printf("************ stream receiver finished....\n")
cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream receiver ended")
}
func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
@ -517,7 +522,7 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
// or continue
select {
case <-ctx.Done(): // the stream context is done
fmt.Printf("grpc server done - %s\n", ctx.Err().Error())
cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream done - %s", ctx.Err().Error())
goto done
case <- cts.stop_chan:
@ -533,10 +538,8 @@ fmt.Printf("grpc server done - %s\n", ctx.Err().Error())
}
done:
fmt.Printf("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n")
cts.ReqStop() // just in case
cts.route_wg.Wait()
fmt.Printf("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n")
}
func (cts *ServerConn) ReqStop() {
@ -618,7 +621,6 @@ func (cc *ConnCatcher) TagConn(ctx context.Context, info *stats.ConnTagInfo) con
}
func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) {
// fmt.Println(ctx.Value("user_id")) // Returns nil, can't access the value
var p *peer.Peer
var ok bool
var addr string
@ -629,44 +631,46 @@ func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) {
} else {
addr = p.Addr.String()
}
/*
md,ok:=metadata.FromIncomingContext(ctx)
fmt.Printf("%+v%+v\n",md,ok)
if ok {
}*/
/*
md,ok:=metadata.FromIncomingContext(ctx)
if ok {
}*/
switch cs.(type) {
case *stats.ConnBegin:
fmt.Printf("**** client connected - [%s]\n", addr)
cc.server.log.Write("", LOG_INFO, "Client connected - %s", addr)
case *stats.ConnEnd:
fmt.Printf("**** client disconnected - [%s]\n", addr)
cc.server.log.Write("", LOG_INFO, "Client disconnected - %s", addr)
cc.server.RemoveServerConnByAddr(p.Addr)
}
}
// wrappedStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and
// SendMsg method call.
// ------------------------------------
type wrappedStream struct {
grpc.ServerStream
}
func (w *wrappedStream) RecvMsg(m any) error {
//fmt.Printf("Receive a message (Type: %T) at %s\n", m, time.Now().Format(time.RFC3339))
return w.ServerStream.RecvMsg(m)
func (w *wrappedStream) RecvMsg(msg interface{}) error {
return w.ServerStream.RecvMsg(msg)
}
func (w *wrappedStream) SendMsg(m any) error {
//fmt.Printf("Send a message (Type: %T) at %v\n", m, time.Now().Format(time.RFC3339))
return w.ServerStream.SendMsg(m)
func (w *wrappedStream) SendMsg(msg interface{}) error {
return w.ServerStream.SendMsg(msg)
}
func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s}
}
func streamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
var err error
// authentication (token verification)
/*
md, ok := metadata.FromIncomingContext(ss.Context())
md, ok = metadata.FromIncomingContext(ss.Context())
if !ok {
return errMissingMetadata
}
@ -675,17 +679,20 @@ func streamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo,
}
*/
err := handler(srv, newWrappedStream(ss))
err = handler(srv, newWrappedStream(ss))
if err != nil {
fmt.Printf("RPC failed with error: %v\n", err)
// TODO: LOGGING
}
return err
}
func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
var v interface{}
var err error
// authentication (token verification)
/*
md, ok := metadata.FromIncomingContext(ctx)
md, ok = metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMissingMetadata
}
@ -693,12 +700,14 @@ func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, han
// return nil, errInvalidToken
}
*/
m, err := handler(ctx, req)
v, err = handler(ctx, req)
if err != nil {
fmt.Printf("RPC failed with error: %v\n", err)
//fmt.Printf("RPC failed with error: %v\n", err)
// TODO: Logging?
}
return m, err
return v, err
}
func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logger Logger, tlscfg *tls.Config) (*Server, error) {
@ -746,12 +755,12 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg
}
gs = grpc.NewServer(grpc.Creds(creds))
*/
s.gs = grpc.NewServer(
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor),
s.rpc_svr = grpc.NewServer(
//grpc.UnaryInterceptor(unaryInterceptor),
//grpc.StreamInterceptor(streamInterceptor),
grpc.StatsHandler(&ConnCatcher{server: &s}),
) // TODO: have this outside the server struct?
RegisterHoduServer(s.gs, &s)
)
RegisterHoduServer(s.rpc_svr, &s)
s.ctl_prefix = "" // TODO:
@ -776,7 +785,7 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg
return &s, nil
oops:
/* TODO: check if gs needs to be closed... */
// TODO: check if rpc_svr needs to be closed. probably not. closing the listen may be good enough
if gl != nil {
gl.Close()
}
@ -796,13 +805,13 @@ func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error {
l = s.rpc[idx]
// it seems to be safe to call a single grpc server on differnt listening sockets multiple times
s.log.Write("", LOG_ERROR, "Starting GRPC server on %s", l.Addr().String())
err = s.gs.Serve(l)
s.log.Write("", LOG_ERROR, "Starting RPC server on %s", l.Addr().String())
err = s.rpc_svr.Serve(l)
if err != nil {
if errors.Is(err, net.ErrClosed) {
s.log.Write("", LOG_ERROR, "GRPC server on %s closed", l.Addr().String())
s.log.Write("", LOG_ERROR, "RPC server on %s closed", l.Addr().String())
} else {
s.log.Write("", LOG_ERROR, "Error from GRPC server on %s - %s", l.Addr().String(), err.Error())
s.log.Write("", LOG_ERROR, "Error from RPC server on %s - %s", l.Addr().String(), err.Error())
}
return err
}
@ -834,13 +843,13 @@ task_loop:
s.ReqStop()
s.rpc_wg.Wait()
s.log.Write("", LOG_DEBUG, "All GRPC listeners completed")
s.log.Write("", LOG_DEBUG, "All RPC listeners completed")
s.cts_wg.Wait()
s.log.Write("", LOG_DEBUG, "All CTS handlers completed")
// stop the main grpc server after all the other tasks are finished.
s.gs.Stop()
s.rpc_svr.Stop()
}
func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
@ -883,7 +892,7 @@ func (s *Server) ReqStop() {
l.Close()
}
s.cts_mtx.Lock() // TODO: this mya create dead-lock. check possibility of dead lock???
s.cts_mtx.Lock()
for _, cts = range s.cts_map {
cts.ReqStop() // request to stop connections from/to peer held in the cts structure
}
@ -901,8 +910,8 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
cts.svr = s
cts.route_map = make(ServerRouteMap)
cts.raddr = *remote_addr
cts.laddr = *local_addr
cts.remote_addr = *remote_addr
cts.local_addr = *local_addr
cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss}
cts.stop_req.Store(false)
@ -920,13 +929,13 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
cts.id = id
cts.sid = fmt.Sprintf("%d", id) // id in string used for logging
_, ok = s.cts_map_by_addr[cts.raddr]
_, ok = s.cts_map_by_addr[cts.remote_addr]
if ok {
return nil, fmt.Errorf("existing client - %s", cts.raddr.String())
return nil, fmt.Errorf("existing client - %s", cts.remote_addr.String())
}
s.cts_map_by_addr[cts.raddr] = &cts
s.cts_map_by_addr[cts.remote_addr] = &cts
s.cts_map[id] = &cts;
s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.raddr.String())
s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.remote_addr.String())
return &cts, nil
}
@ -958,7 +967,7 @@ func (s *Server) RemoveServerConn(cts *ServerConn) error {
}
delete(s.cts_map, cts.id)
delete(s.cts_map_by_addr, cts.raddr)
delete(s.cts_map_by_addr, cts.remote_addr)
s.cts_mtx.Unlock()
cts.ReqStop()
@ -977,7 +986,7 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) error {
return fmt.Errorf("non-existent connection address - %s", addr.String())
}
delete(s.cts_map, cts.id)
delete(s.cts_map_by_addr, cts.raddr)
delete(s.cts_map_by_addr, cts.remote_addr)
s.cts_mtx.Unlock()
cts.ReqStop()