diff --git a/client.go b/client.go index 0f89384..0bbaab3 100644 --- a/client.go +++ b/client.go @@ -824,13 +824,13 @@ bad_request: * /servers/1 - X get server 1 details update server 1 delete server 1 * /servers/1/xxx - */ -func (c *Client) RunCtlTask() { +func (c *Client) RunCtlTask(wg *sync.WaitGroup) { var err error - defer c.wg.Done() + defer wg.Done() err = c.ctl.ListenAndServe() - if err != http.ErrServerClosed { + if !errors.Is(err, http.ErrServerClosed) { fmt.Printf ("------------http server error - %s\n", err.Error()) } else { fmt.Printf ("********* http server ended\n") @@ -952,7 +952,7 @@ func client_main(listen_on string, server_addr string, peer_addrs []string) erro go c.handle_os_signals() c.wg.Add(1) - go c.RunCtlTask() // control channel task + go c.RunCtlTask(&c.wg) // control channel task cc.server_addr = server_addr cc.peer_addrs = peer_addrs diff --git a/main.go b/main.go index 7c19ce0..dc70d8c 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,6 @@ import "io" import "os" import "strings" -const HODU_VERSION uint32 = 0x010000 func main() { var err error diff --git a/server.go b/server.go index f7bca85..cd1667d 100644 --- a/server.go +++ b/server.go @@ -5,8 +5,10 @@ import "crypto/tls" import "errors" import "fmt" import "io" +import "log" import "math/rand" import "net" +import "net/http" import "os" import "os/signal" import "sync" @@ -31,7 +33,9 @@ type Server struct { wg sync.WaitGroup stop_req atomic.Bool - l []*net.TCPListener // central listener + ctl *http.Server // control server + + l []*net.TCPListener // main listener for grpc l_wg sync.WaitGroup cts_mtx sync.Mutex @@ -39,6 +43,7 @@ type Server struct { cts_wg sync.WaitGroup gs *grpc.Server + log Logger UnimplementedHoduServer } @@ -60,7 +65,7 @@ type ClientConn struct { } type ServerRoute struct { - cts *ClientConn + cts *ClientConn l *net.TCPListener laddr *net.TCPAddr id uint32 @@ -130,10 +135,8 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err var ok bool var start_id uint32 -fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n") r.pts_mtx.Lock() defer r.pts_mtx.Unlock() -fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\n") if len(r.pts_map) >= r.pts_limit { return nil, fmt.Errorf("peer-to-server connection table full") @@ -152,7 +155,6 @@ fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\n") } } -fmt.Printf ("Creaing new Server Peer Conn\n") pts = NewServerPeerConn(r, c, r.pts_last_id) r.pts_map[pts.conn_id] = pts r.pts_last_id++ @@ -164,6 +166,7 @@ func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) { r.pts_mtx.Lock() delete(r.pts_map, pts.conn_id) r.pts_mtx.Unlock() + r.cts.svr.log.Write(LOG_DEBUG, "Removed server-side peer connection %s", pts.conn.RemoteAddr().String()) } func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { @@ -174,35 +177,30 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { defer wg.Done() for { -fmt.Printf ("**** Ready to Acept server side peer connection\n") conn, err = r.l.AcceptTCP() if err != nil { if errors.Is(err, net.ErrClosed) { - fmt.Printf("[%s,%d] END OF TASK...[%#v] [%#v]\n", r.cts.caddr.String(), r.id, err, net.ErrClosed) + r.cts.svr.log.Write(LOG_INFO, "[%s,%d] Service-side peer listener closed\n", r.cts.caddr.String(), r.id) } else { - fmt.Printf("[%s,%d] accept failure - %s\n", r.cts.caddr.String(), r.id, err.Error()) + fmt.Printf("[%s,%d] Server-side peer listener error - %s\n", r.cts.caddr.String(), r.id, err.Error()) } break } -fmt.Printf ("**** Adding server peer connection server side peer connection\n") pts, err = r.AddNewServerPeerConn(conn) if err != nil { - // TODO: logging - fmt.Printf("YYYYYYYY - %s\n", err.Error()) + r.cts.svr.log.Write(LOG_ERROR, "[%s,%d] Failed to add new server-side peer %s - %s", r.cts.caddr.String(), r.id, conn.RemoteAddr().String(), err.Error()) conn.Close() } else { -fmt.Printf("STARTINGNEW SERVER PEER TASK\n") + r.cts.svr.log.Write(LOG_DEBUG, "[%s,%d] Added new server-side peer %s", r.cts.caddr.String(), r.id, conn.RemoteAddr().String()) r.pts_wg.Add(1) go pts.RunTask(&r.pts_wg) } } - r.l.Close() // don't care about double close. it could have been closed in ReqStop -fmt.Printf ("*** wariting for all pts to finish..\n") r.pts_wg.Wait() -fmt.Printf ("*** waited for all pts to finish..\n") + r.cts.svr.log.Write(LOG_DEBUG, "[%s,%d] All service-side peer handlers completed", r.cts.caddr.String(), r.id) } func (r *ServerRoute) ReqStop() { @@ -678,11 +676,11 @@ func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, han if err != nil { fmt.Printf("RPC failed with error: %v\n", err) } -fmt.Printf ("RPC OK\n"); + return m, err } -func NewServer(laddrs []string, tlscfg *tls.Config) (*Server, error) { +func NewServer(laddrs []string, logger Logger, tlscfg *tls.Config) (*Server, error) { var s Server var l *net.TCPListener var laddr *net.TCPAddr @@ -694,6 +692,7 @@ func NewServer(laddrs []string, tlscfg *tls.Config) (*Server, error) { return nil, fmt.Errorf("no or too many addresses provided") } + s.log = logger /* create the specified number of listeners */ s.l = make([]*net.TCPListener, 0) for _, addr = range laddrs { @@ -749,12 +748,16 @@ func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error { defer wg.Done(); l = s.l[idx] - fmt.Printf ("serving grpc on %d listener\n", idx) // it seems to be safe to call a single grpc server on differnt listening sockets multiple times - // TODO: check if this assumption is ok + s.log.Write (LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String()) err = s.gs.Serve(l); if err != nil { - fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXxx %s\n", err.Error()); + if errors.Is(err, net.ErrClosed) { + s.log.Write (LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String()) + } else { + s.log.Write (LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error()) + } + return err } return nil @@ -771,9 +774,10 @@ func (s *Server) RunTask(wg *sync.WaitGroup) { } s.l_wg.Wait() -fmt.Printf ("waiting for all client to server conn to complete\n") + s.log.Write(LOG_DEBUG, "All GRPC listeners completed") s.cts_wg.Wait() -fmt.Printf ("waited for all client to server conn to complete\n") + s.log.Write(LOG_DEBUG, "All CTS handlers completed") + s.ReqStop() // stop the main grpc server after all the other tasks are finished. @@ -782,6 +786,19 @@ fmt.Printf ("waited for all client to server conn to complete\n") syscall.Kill(syscall.Getpid(), syscall.SIGTERM) } +func (s *Server) RunCtlTask(wg *sync.WaitGroup) { + var err error + + defer wg.Done() + + err = s.ctl.ListenAndServe() + if errors.Is(err, http.ErrServerClosed) { + fmt.Printf ("------------http server error - %s\n", err.Error()) + } else { + fmt.Printf ("********* http server ended\n") + } +} + func (s *Server) ReqStop() { if s.stop_req.CompareAndSwap(false, true) { var l *net.TCPListener @@ -808,7 +825,6 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (* cts.svr = s cts.route_map = make(ServerRouteMap) cts.caddr = addr - //cts.pss = &GuardedPacketStreamServer{pss: pss} cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss} cts.stop_req.Store(false) @@ -821,16 +837,15 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (* if ok { return nil, fmt.Errorf("existing client - %s", addr.String()) } - s.cts_map[addr] = &cts; -fmt.Printf ("ADD total clients %d\n", len(s.cts_map)); + s.log.Write(LOG_DEBUG, "Added client connection from %s", addr.String()) return &cts, nil } func (s *Server) RemoveClientConn(cts *ClientConn) { s.cts_mtx.Lock() delete(s.cts_map, cts.caddr) -fmt.Printf ("REMOVE total clients %d\n", len(s.cts_map)); + s.log.Write(LOG_DEBUG, "Removed client connection from %s", cts.caddr.String()) s.cts_mtx.Unlock() } @@ -890,10 +905,20 @@ BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 -----END CERTIFICATE----- ` +type serverLogger struct { + log *log.Logger +} + + +func (log* serverLogger) Write(level LogLevel, fmt string, args ...interface{}) { + log.log.Printf(fmt, args...) +} + func server_main(laddrs []string) error { var s *Server var err error + var sl serverLogger var cert tls.Certificate cert, err = tls.X509KeyPair([]byte(serverCert), []byte(serverKey)) @@ -901,7 +926,8 @@ func server_main(laddrs []string) error { return fmt.Errorf("ERROR: failed to load key pair - %s\n", err) } - s, err = NewServer(laddrs, &tls.Config{Certificates: []tls.Certificate{cert}}) + sl.log = log.Default() + s, err = NewServer(laddrs, &sl, &tls.Config{Certificates: []tls.Certificate{cert}}) if err != nil { return fmt.Errorf("ERROR: failed to create new server - %s", err.Error()) }