diff --git a/README.md b/README.md index bb50c11..23266ba 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,7 @@ -hodu client [ ...] - - client requests server that it grants access to the list of peers - reserver - +## normal operation +- ./hodu server --listen-on=0.0.0.0:9999 --listen-on=0.0.0.0:8888 +- ./hodu client --listen-on=127.0.0.1:7777 --server=127.0.0.1:9999 192.168.1.130:8000 ## server.json ``` @@ -16,7 +14,8 @@ hodu client [ ...] } ``` -## create a server +## client control channel + ``` curl -X POST --data-binary @server.json http://127.0.0.1:7777/servers ``` diff --git a/client.go b/client.go index 12dff9b..9e212a1 100644 --- a/client.go +++ b/client.go @@ -47,6 +47,8 @@ type Client struct { wg sync.WaitGroup stop_req atomic.Bool stop_chan chan bool + + log Logger } type ClientPeerConn struct { @@ -143,7 +145,7 @@ func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_P r.stop_req.Store(false) r.stop_chan = make(chan bool, 1) - return &r; + return &r } func (r *ClientRoute) RunTask(wg *sync.WaitGroup) { @@ -172,7 +174,7 @@ func (r *ClientRoute) ReqStop() { r.stop_chan <- true } -fmt.Printf ("*** Sent stop request to Route..\n"); +fmt.Printf ("*** Sent stop request to Route..\n") } func (r* ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) { @@ -195,7 +197,7 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) { r.ptc_mtx.Unlock() d.LocalAddr = nil // TOOD: use this if local address is specified - conn, err = d.DialContext(ctx, "tcp", r.peer_addr.String()); + conn, err = d.DialContext(ctx, "tcp", r.peer_addr.String()) r.ptc_mtx.Lock() delete(r.ptc_cancel_map, pts_id) @@ -373,7 +375,7 @@ func (cts *ServerConn) RemoveClientRoute (route_id uint32) error { cts.route_mtx.Unlock() r.ReqStop() // TODO: make this unblocking or blocking? - return nil; + return nil } func (cts *ServerConn) RemoveClientRoutes () { @@ -402,7 +404,7 @@ func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error { var err error for i, v = range peer_addrs { - addr, err = net.ResolveTCPAddr(NET_TYPE_TCP, v) + addr, err = net.ResolveTCPAddr(NET_TYPE_TCP, v) // Make this interruptable if err != nil { return fmt.Errorf("unable to resovle %s - %s", v, err.Error()) } @@ -426,7 +428,7 @@ func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error { } } - return nil; + return nil } func (cts *ServerConn) ReqStop() { @@ -442,7 +444,7 @@ func (cts *ServerConn) ReqStop() { // TODO: notify the server.. send term command??? cts.stop_chan <- true } -fmt.Printf ("*** Sent stop request to ServerConn..\n"); +fmt.Printf ("*** Sent stop request to ServerConn..\n") } func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { @@ -534,7 +536,7 @@ fmt.Printf("[%v]\n", cts.route_map) var ok bool x, ok = pkt.U.(*Packet_Route) if ok { - fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr); + fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr) err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) if err != nil { // TODO: @@ -712,7 +714,7 @@ func (r *ClientRoute) AddNewClientPeerConn (c *net.TCPConn, pts_id uint32) (*Cli // -------------------------------------------------------------------- -func NewClient(ctx context.Context, listen_on string, tlscfg *tls.Config) *Client { +func NewClient(ctx context.Context, listen_on string, logger Logger, tlscfg *tls.Config) *Client { var c Client c.ctx, c.ctx_cancel = context.WithCancel(ctx) @@ -721,6 +723,7 @@ func NewClient(ctx context.Context, listen_on string, tlscfg *tls.Config) *Clien c.cts_map = make(ServerConnMap) // TODO: make it configurable... c.stop_req.Store(false) c.stop_chan = make(chan bool, 1) + c.log = logger c.ctl = &http.Server{ Addr: listen_on, @@ -744,15 +747,15 @@ func (c *Client) AddNewServerConn(addr *net.TCPAddr, cfg *ClientConfig) (*Server return nil, fmt.Errorf("existing server - %s", addr.String()) } - c.cts_map[addr] = cts; -fmt.Printf ("ADD total servers %d\n", len(c.cts_map)); + c.cts_map[addr] = cts +fmt.Printf ("ADD total servers %d\n", len(c.cts_map)) return cts, nil } func (c *Client) RemoveServerConn(cts *ServerConn) { c.cts_mtx.Lock() delete(c.cts_map, cts.saddr) -fmt.Printf ("REMOVE total servers %d\n", len(c.cts_map)); +fmt.Printf ("REMOVE total servers %d\n", len(c.cts_map)) c.cts_mtx.Unlock() } @@ -770,7 +773,7 @@ func (c *Client) ReqStop() { c.stop_chan <- true c.ctx_cancel() } -fmt.Printf ("*** Sent stop request to client..\n"); +fmt.Printf ("*** Sent stop request to client..\n") } func (c *Client) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -866,7 +869,7 @@ func (c *Client) StartService(data interface{}) { return } - saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.ServerAddr) + saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.ServerAddr) // TODO: make this interruptable... if err != nil { fmt.Printf("unable to resolve %s - %s", cfg.ServerAddr, err.Error()) return @@ -899,3 +902,7 @@ func (c *Client) StopServices() { func (c *Client) WaitForTermination() { c.wg.Wait() } + +func (c *Client) WriteLog (id string, level LogLevel, fmt string, args ...interface{}) { + c.log.Write(id, level, fmt, args...) +} diff --git a/cmd/main.go b/cmd/main.go index 1a18ad1..f374e7a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -47,12 +47,10 @@ type serverLogger struct { log *log.Logger } - -func (log* serverLogger) Write(level hodu.LogLevel, fmt string, args ...interface{}) { +func (log* serverLogger) Write(id string, level hodu.LogLevel, fmt string, args ...interface{}) { log.log.Printf(fmt, args...) } - // -------------------------------------------------------------------- type signal_handler struct { svc hodu.Service @@ -63,7 +61,9 @@ func (sh *signal_handler) RunTask(wg *sync.WaitGroup) { var sigterm_chan chan os.Signal var sig os.Signal - defer wg.Done() + if wg != nil { + defer wg.Done() + } sighup_chan = make(chan os.Signal, 1) sigterm_chan = make(chan os.Signal, 1) @@ -76,22 +76,18 @@ chan_loop: select { case <-sighup_chan: // TODO: - //svc.ReqReload() + //sh.svc.ReqReload() case sig = <-sigterm_chan: - // TODO: get timeout value from config - //c.Shutdown(fmt.Sprintf("termination by signal %s", sig), 3*time.Second) sh.svc.StopServices() - //log.Debugf("termination by signal %s", sig) fmt.Printf("termination by signal %s\n", sig) break chan_loop } } - + //signal.Reset(syscall.SIGHUP) //signal.Reset(syscall.SIGTERM) signal.Stop(sighup_chan) signal.Stop(sigterm_chan) -fmt.Printf("end of signal handler\n") } func (sh *signal_handler) StartService(data interface{}) { @@ -111,11 +107,13 @@ func (sh *signal_handler) WaitForTermination() { // sh.wg.Wait() } +func (sh *signal_handler) WriteLog(id string, level hodu.LogLevel, fmt string, args ...interface{}) { + sh.svc.WriteLog(id, level, fmt, args...) +} + func server_main(laddrs []string) error { var s *hodu.Server var err error - - var sl serverLogger var cert tls.Certificate cert, err = tls.X509KeyPair([]byte(rootCert), []byte(rootKey)) @@ -123,8 +121,7 @@ func server_main(laddrs []string) error { return fmt.Errorf("ERROR: failed to load key pair - %s\n", err) } - sl.log = log.Default() - s, err = hodu.NewServer(laddrs, &sl, &tls.Config{Certificates: []tls.Certificate{cert}}) + s, err = hodu.NewServer(laddrs, &serverLogger{log: log.Default}, &tls.Config{Certificates: []tls.Certificate{cert}}) if err != nil { return fmt.Errorf("ERROR: failed to create new server - %s", err.Error()) } @@ -155,14 +152,14 @@ func client_main(listen_on string, server_addr string, peer_addrs []string) erro InsecureSkipVerify: true, } - c = hodu.NewClient(context.Background(), listen_on, tlscfg) + c = hodu.NewClient(context.Background(), listen_on, &serverLogger{log: log.Default}, tlscfg) cc.ServerAddr = server_addr cc.PeerAddrs = peer_addrs c.StartService(&cc) - c.StartCtlService() - c.StartExtService(&signal_handler{svc:c}, nil) + c.StartCtlService() // control channel + c.StartExtService(&signal_handler{svc:c}, nil) // signal handler task c.WaitForTermination() return nil diff --git a/hodu.go b/hodu.go index 14b47c3..72318bb 100644 --- a/hodu.go +++ b/hodu.go @@ -14,14 +14,13 @@ const ( ) type Logger interface { - Write (level LogLevel, fmt string, args ...interface{}) + Write (id string, level LogLevel, fmt string, args ...interface{}) } type Service interface { - RunTask (wg *sync.WaitGroup) // blocking. run the actual task loop + RunTask (wg *sync.WaitGroup) // blocking. run the actual task loop. it must call wg.Done() upon exit from itself. StartService(data interface{}) // non-blocking. spin up a service. it may be invokded multiple times for multiple instances StopServices() // non-blocking. send stop request to all services spun up WaitForTermination() // blocking. must wait until all services are stopped + WriteLog(id string, level LogLevel, fmt string, args ...interface{}) } - -type ExtTask func(svc Service, wg *sync.WaitGroup) diff --git a/s-peer.go b/s-peer.go index e6d8363..8c3cf5c 100644 --- a/s-peer.go +++ b/s-peer.go @@ -102,7 +102,7 @@ wait_for_started: if err != nil { // TODO: include route id and conn id in the error message fmt.Printf("unable to send data - %s\n", err.Error()) - goto done; + goto done } } diff --git a/server.go b/server.go index 8e6212d..6e73229 100644 --- a/server.go +++ b/server.go @@ -110,7 +110,7 @@ func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute var laddr *net.TCPAddr var err error - l, laddr, err = cts.make_route_listener(proto); + l, laddr, err = cts.make_route_listener(proto) if err != nil { return nil, err } @@ -124,7 +124,7 @@ func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute r.pts_last_id = 0 r.stop_req.Store(false) - return &r, nil; + return &r, nil } func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, error) { @@ -163,7 +163,7 @@ func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) { r.pts_mtx.Lock() delete(r.pts_map, pts.conn_id) r.pts_mtx.Unlock() - r.cts.svr.log.Write(LOG_DEBUG, "Removed server-side peer connection %s", pts.conn.RemoteAddr().String()) + r.cts.svr.log.Write("", LOG_DEBUG, "Removed server-side peer connection %s", pts.conn.RemoteAddr().String()) } func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { @@ -177,7 +177,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { conn, err = r.l.AcceptTCP() if err != nil { if errors.Is(err, net.ErrClosed) { - r.cts.svr.log.Write(LOG_INFO, "[%s,%d] Service-side peer listener closed\n", r.cts.caddr.String(), r.id) + r.cts.svr.log.Write("", LOG_INFO, "[%s,%d] Service-side peer listener closed\n", r.cts.caddr.String(), r.id) } else { fmt.Printf("[%s,%d] Server-side peer listener error - %s\n", r.cts.caddr.String(), r.id, err.Error()) } @@ -186,10 +186,10 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { pts, err = r.AddNewServerPeerConn(conn) if err != nil { - r.cts.svr.log.Write(LOG_ERROR, "[%s,%d] Failed to add new server-side peer %s - %s", r.cts.caddr.String(), r.id, conn.RemoteAddr().String(), err.Error()) + r.cts.svr.log.Write("", LOG_ERROR, "[%s,%d] Failed to add new server-side peer %s - %s", r.cts.caddr.String(), r.id, conn.RemoteAddr().String(), err.Error()) conn.Close() } else { - r.cts.svr.log.Write(LOG_DEBUG, "[%s,%d] Added new server-side peer %s", r.cts.caddr.String(), r.id, conn.RemoteAddr().String()) + r.cts.svr.log.Write("", LOG_DEBUG, "[%s,%d] Added new server-side peer %s", r.cts.caddr.String(), r.id, conn.RemoteAddr().String()) r.pts_wg.Add(1) go pts.RunTask(&r.pts_wg) } @@ -197,7 +197,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { r.l.Close() // don't care about double close. it could have been closed in ReqStop r.pts_wg.Wait() - r.cts.svr.log.Write(LOG_DEBUG, "[%s,%d] All service-side peer handlers completed", r.cts.caddr.String(), r.id) + r.cts.svr.log.Write("", LOG_DEBUG, "[%s,%d] All service-side peer handlers completed", r.cts.caddr.String(), r.id) } func (r *ServerRoute) ReqStop() { @@ -210,7 +210,7 @@ func (r *ServerRoute) ReqStop() { pts.ReqStop() } - r.l.Close(); + r.l.Close() } fmt.Printf ("requiested to stopp route taak..\n") } @@ -222,10 +222,10 @@ func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_ r.pts_mtx.Lock() spc, ok = r.pts_map[pts_id] if !ok { - r.pts_mtx.Unlock(); + r.pts_mtx.Unlock() return fmt.Errorf("non-existent peer id - %u", pts_id) } - r.pts_mtx.Unlock(); + r.pts_mtx.Unlock() return spc.ReportEvent(event_type, event_data) } @@ -285,7 +285,7 @@ func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*S cts.route_mtx.Unlock() return nil, err } - cts.route_map[route_id] = r; + cts.route_map[route_id] = r cts.route_mtx.Unlock() cts.route_wg.Add(1) @@ -307,7 +307,7 @@ func (cts *ClientConn) RemoveServerRoute (route_id uint32) error { cts.route_mtx.Unlock() r.ReqStop() // TODO: make this unblocking or blocking? - return nil; + return nil } func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { @@ -372,7 +372,7 @@ fmt.Printf ("grpd stream ended\n") var ok bool x, ok = pkt.U.(*Packet_Route) if ok { - err = cts.RemoveServerRoute(x.Route.RouteId); // TODO: this must be unblocking. otherwide, other route_map will get blocked... + err = cts.RemoveServerRoute(x.Route.RouteId) // TODO: this must be unblocking. otherwide, other route_map will get blocked... if err != nil { // TODO: Send Error Response... } else { @@ -557,7 +557,7 @@ func (cc *ConnCatcher) HandleRPC(ctx context.Context, s stats.RPCStats) { } func (cc *ConnCatcher) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { - return ctx; + return ctx //return context.TODO() } @@ -583,7 +583,7 @@ if ok { fmt.Printf("**** client connected - [%s]\n", addr) case *stats.ConnEnd: fmt.Printf("**** client disconnected - [%s]\n", addr) - cc.server.RemoveClientConnByAddr(p.Addr); + cc.server.RemoveClientConnByAddr(p.Addr) } } @@ -661,7 +661,7 @@ func NewServer(laddrs []string, logger Logger, tlscfg *tls.Config) (*Server, err /* create the specified number of listeners */ s.l = make([]*net.TCPListener, 0) for _, addr = range laddrs { - laddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, addr) + laddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, addr) // Make this interruptable??? if err != nil { goto oops } @@ -711,17 +711,17 @@ func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error { var l *net.TCPListener var err error - defer wg.Done(); + defer wg.Done() l = s.l[idx] // it seems to be safe to call a single grpc server on differnt listening sockets multiple times - s.log.Write (LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String()) - err = s.gs.Serve(l); + s.log.Write ("", LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String()) + err = s.gs.Serve(l) if err != nil { if errors.Is(err, net.ErrClosed) { - s.log.Write (LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String()) + s.log.Write ("", LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String()) } else { - s.log.Write (LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error()) + s.log.Write ("", LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error()) } return err } @@ -740,9 +740,9 @@ func (s *Server) RunTask(wg *sync.WaitGroup) { } s.l_wg.Wait() - s.log.Write(LOG_DEBUG, "All GRPC listeners completed") + s.log.Write("", LOG_DEBUG, "", "All GRPC listeners completed") s.cts_wg.Wait() - s.log.Write(LOG_DEBUG, "All CTS handlers completed") + s.log.Write("", LOG_DEBUG, "", "All CTS handlers completed") s.ReqStop() @@ -801,15 +801,15 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (* if ok { return nil, fmt.Errorf("existing client - %s", addr.String()) } - s.cts_map[addr] = &cts; - s.log.Write(LOG_DEBUG, "Added client connection from %s", addr.String()) + s.cts_map[addr] = &cts + s.log.Write("", LOG_DEBUG, "Added client connection from %s", addr.String()) return &cts, nil } func (s *Server) RemoveClientConn(cts *ClientConn) { s.cts_mtx.Lock() delete(s.cts_map, cts.caddr) - s.log.Write(LOG_DEBUG, "Removed client connection from %s", cts.caddr.String()) + s.log.Write("", LOG_DEBUG, "Removed client connection from %s", cts.caddr.String()) s.cts_mtx.Unlock() } @@ -864,3 +864,7 @@ func (s *Server) StopServices() { func (s *Server) WaitForTermination() { s.wg.Wait() } + +func (s *Server) WriteLog (id string, level LogLevel, fmt string, args ...interface{}) { + s.log.Write(id, level, fmt, args...) +}