started add log messages

This commit is contained in:
hyung-hwan 2024-11-21 01:11:01 +09:00
parent e018bc9cab
commit 9d7a843b4c
3 changed files with 57 additions and 32 deletions

View File

@ -824,13 +824,13 @@ bad_request:
* /servers/1 - X get server 1 details update server 1 delete server 1
* /servers/1/xxx -
*/
func (c *Client) RunCtlTask() {
func (c *Client) RunCtlTask(wg *sync.WaitGroup) {
var err error
defer c.wg.Done()
defer wg.Done()
err = c.ctl.ListenAndServe()
if err != http.ErrServerClosed {
if !errors.Is(err, http.ErrServerClosed) {
fmt.Printf ("------------http server error - %s\n", err.Error())
} else {
fmt.Printf ("********* http server ended\n")
@ -952,7 +952,7 @@ func client_main(listen_on string, server_addr string, peer_addrs []string) erro
go c.handle_os_signals()
c.wg.Add(1)
go c.RunCtlTask() // control channel task
go c.RunCtlTask(&c.wg) // control channel task
cc.server_addr = server_addr
cc.peer_addrs = peer_addrs

View File

@ -6,7 +6,6 @@ import "io"
import "os"
import "strings"
const HODU_VERSION uint32 = 0x010000
func main() {
var err error

View File

@ -5,8 +5,10 @@ import "crypto/tls"
import "errors"
import "fmt"
import "io"
import "log"
import "math/rand"
import "net"
import "net/http"
import "os"
import "os/signal"
import "sync"
@ -31,7 +33,9 @@ type Server struct {
wg sync.WaitGroup
stop_req atomic.Bool
l []*net.TCPListener // central listener
ctl *http.Server // control server
l []*net.TCPListener // main listener for grpc
l_wg sync.WaitGroup
cts_mtx sync.Mutex
@ -39,6 +43,7 @@ type Server struct {
cts_wg sync.WaitGroup
gs *grpc.Server
log Logger
UnimplementedHoduServer
}
@ -130,10 +135,8 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err
var ok bool
var start_id uint32
fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n")
r.pts_mtx.Lock()
defer r.pts_mtx.Unlock()
fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\n")
if len(r.pts_map) >= r.pts_limit {
return nil, fmt.Errorf("peer-to-server connection table full")
@ -152,7 +155,6 @@ fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\n")
}
}
fmt.Printf ("Creaing new Server Peer Conn\n")
pts = NewServerPeerConn(r, c, r.pts_last_id)
r.pts_map[pts.conn_id] = pts
r.pts_last_id++
@ -164,6 +166,7 @@ func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) {
r.pts_mtx.Lock()
delete(r.pts_map, pts.conn_id)
r.pts_mtx.Unlock()
r.cts.svr.log.Write(LOG_DEBUG, "Removed server-side peer connection %s", pts.conn.RemoteAddr().String())
}
func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
@ -174,35 +177,30 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
defer wg.Done()
for {
fmt.Printf ("**** Ready to Acept server side peer connection\n")
conn, err = r.l.AcceptTCP()
if err != nil {
if errors.Is(err, net.ErrClosed) {
fmt.Printf("[%s,%d] END OF TASK...[%#v] [%#v]\n", r.cts.caddr.String(), r.id, err, net.ErrClosed)
r.cts.svr.log.Write(LOG_INFO, "[%s,%d] Service-side peer listener closed\n", r.cts.caddr.String(), r.id)
} else {
fmt.Printf("[%s,%d] accept failure - %s\n", r.cts.caddr.String(), r.id, err.Error())
fmt.Printf("[%s,%d] Server-side peer listener error - %s\n", r.cts.caddr.String(), r.id, err.Error())
}
break
}
fmt.Printf ("**** Adding server peer connection server side peer connection\n")
pts, err = r.AddNewServerPeerConn(conn)
if err != nil {
// TODO: logging
fmt.Printf("YYYYYYYY - %s\n", err.Error())
r.cts.svr.log.Write(LOG_ERROR, "[%s,%d] Failed to add new server-side peer %s - %s", r.cts.caddr.String(), r.id, conn.RemoteAddr().String(), err.Error())
conn.Close()
} else {
fmt.Printf("STARTINGNEW SERVER PEER TASK\n")
r.cts.svr.log.Write(LOG_DEBUG, "[%s,%d] Added new server-side peer %s", r.cts.caddr.String(), r.id, conn.RemoteAddr().String())
r.pts_wg.Add(1)
go pts.RunTask(&r.pts_wg)
}
}
r.l.Close() // don't care about double close. it could have been closed in ReqStop
fmt.Printf ("*** wariting for all pts to finish..\n")
r.pts_wg.Wait()
fmt.Printf ("*** waited for all pts to finish..\n")
r.cts.svr.log.Write(LOG_DEBUG, "[%s,%d] All service-side peer handlers completed", r.cts.caddr.String(), r.id)
}
func (r *ServerRoute) ReqStop() {
@ -678,11 +676,11 @@ func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, han
if err != nil {
fmt.Printf("RPC failed with error: %v\n", err)
}
fmt.Printf ("RPC OK\n");
return m, err
}
func NewServer(laddrs []string, tlscfg *tls.Config) (*Server, error) {
func NewServer(laddrs []string, logger Logger, tlscfg *tls.Config) (*Server, error) {
var s Server
var l *net.TCPListener
var laddr *net.TCPAddr
@ -694,6 +692,7 @@ func NewServer(laddrs []string, tlscfg *tls.Config) (*Server, error) {
return nil, fmt.Errorf("no or too many addresses provided")
}
s.log = logger
/* create the specified number of listeners */
s.l = make([]*net.TCPListener, 0)
for _, addr = range laddrs {
@ -749,12 +748,16 @@ func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error {
defer wg.Done();
l = s.l[idx]
fmt.Printf ("serving grpc on %d listener\n", idx)
// it seems to be safe to call a single grpc server on differnt listening sockets multiple times
// TODO: check if this assumption is ok
s.log.Write (LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String())
err = s.gs.Serve(l);
if err != nil {
fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXxx %s\n", err.Error());
if errors.Is(err, net.ErrClosed) {
s.log.Write (LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String())
} else {
s.log.Write (LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error())
}
return err
}
return nil
@ -771,9 +774,10 @@ func (s *Server) RunTask(wg *sync.WaitGroup) {
}
s.l_wg.Wait()
fmt.Printf ("waiting for all client to server conn to complete\n")
s.log.Write(LOG_DEBUG, "All GRPC listeners completed")
s.cts_wg.Wait()
fmt.Printf ("waited for all client to server conn to complete\n")
s.log.Write(LOG_DEBUG, "All CTS handlers completed")
s.ReqStop()
// stop the main grpc server after all the other tasks are finished.
@ -782,6 +786,19 @@ fmt.Printf ("waited for all client to server conn to complete\n")
syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}
func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
var err error
defer wg.Done()
err = s.ctl.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
fmt.Printf ("------------http server error - %s\n", err.Error())
} else {
fmt.Printf ("********* http server ended\n")
}
}
func (s *Server) ReqStop() {
if s.stop_req.CompareAndSwap(false, true) {
var l *net.TCPListener
@ -808,7 +825,6 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*
cts.svr = s
cts.route_map = make(ServerRouteMap)
cts.caddr = addr
//cts.pss = &GuardedPacketStreamServer{pss: pss}
cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss}
cts.stop_req.Store(false)
@ -821,16 +837,15 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*
if ok {
return nil, fmt.Errorf("existing client - %s", addr.String())
}
s.cts_map[addr] = &cts;
fmt.Printf ("ADD total clients %d\n", len(s.cts_map));
s.log.Write(LOG_DEBUG, "Added client connection from %s", addr.String())
return &cts, nil
}
func (s *Server) RemoveClientConn(cts *ClientConn) {
s.cts_mtx.Lock()
delete(s.cts_map, cts.caddr)
fmt.Printf ("REMOVE total clients %d\n", len(s.cts_map));
s.log.Write(LOG_DEBUG, "Removed client connection from %s", cts.caddr.String())
s.cts_mtx.Unlock()
}
@ -890,10 +905,20 @@ BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7
-----END CERTIFICATE-----
`
type serverLogger struct {
log *log.Logger
}
func (log* serverLogger) Write(level LogLevel, fmt string, args ...interface{}) {
log.log.Printf(fmt, args...)
}
func server_main(laddrs []string) error {
var s *Server
var err error
var sl serverLogger
var cert tls.Certificate
cert, err = tls.X509KeyPair([]byte(serverCert), []byte(serverKey))
@ -901,7 +926,8 @@ func server_main(laddrs []string) error {
return fmt.Errorf("ERROR: failed to load key pair - %s\n", err)
}
s, err = NewServer(laddrs, &tls.Config{Certificates: []tls.Certificate{cert}})
sl.log = log.Default()
s, err = NewServer(laddrs, &sl, &tls.Config{Certificates: []tls.Certificate{cert}})
if err != nil {
return fmt.Errorf("ERROR: failed to create new server - %s", err.Error())
}