diff --git a/Makefile b/Makefile index 1b27d3e..2fbb7df 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,14 @@ SRCS=\ - c-peer.go \ client.go \ + client-ctl.go \ + client-peer.go \ frame.go \ hodu.go \ hodu.pb.go \ hodu_grpc.pb.go \ packet.go \ - s-peer.go \ server.go \ + server-peer.go \ cmd/main.go all: hodu diff --git a/client-ctl.go b/client-ctl.go new file mode 100644 index 0000000..2e38d58 --- /dev/null +++ b/client-ctl.go @@ -0,0 +1,88 @@ +package hodu + +import "encoding/json" +import "fmt" +import "net/http" + + +/* + * POST GET PUT DELETE + * /servers - create new server list all servers bulk update delete all servers + * /servers/1 - X get server 1 details update server 1 delete server 1 + * /servers/1/xxx - + */ + + + +type client_ctl_servers struct { + c *Client +} + +type client_ctl_servers_id struct { + c *Client +} + +type client_ctl_clients struct { + c *Client +} + +type client_ctl_clients_id struct { + c *Client +} + + +func (ctl *client_ctl_servers) ServeHTTP(w http.ResponseWriter, req *http.Request) { + var c *Client + var err error + var ptn string + + c = ctl.c + + _, ptn = c.mux.Handler(req); + fmt.Printf("%s %s %s [%s]\n", ptn, req.Method, req.URL.String(), req.PathValue("id")) + + switch req.Method { + case http.MethodGet: + goto bad_request // TODO: + + case http.MethodPost: + var s ClientCtlParamServer + var cc ClientConfig + err = json.NewDecoder(req.Body).Decode(&s) + if err != nil { + fmt.Printf ("failed to decode body - %s\n", err.Error()) + goto bad_request + } + cc.ServerAddr = s.ServerAddr + cc.PeerAddrs = s.PeerAddrs + c.StartService(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine? + w.WriteHeader(http.StatusCreated) + + case http.MethodPut: + goto bad_request // TODO: + + case http.MethodDelete: + var cts *ClientConn + c.cts_mtx.Lock() + for _, cts = range c.cts_map { cts.ReqStop() } + c.cts_mtx.Unlock() + } + + return + +bad_request: + w.WriteHeader(http.StatusBadRequest) + return + +} + + +func (ctl *client_ctl_servers_id) ServeHTTP(w http.ResponseWriter, req *http.Request) { +} + + +func (ctl *client_ctl_clients) ServeHTTP(w http.ResponseWriter, req *http.Request) { +} + +func (ctl *client_ctl_clients_id) ServeHTTP(w http.ResponseWriter, req *http.Request) { +} diff --git a/c-peer.go b/client-peer.go similarity index 95% rename from c-peer.go rename to client-peer.go index 2c95720..69204ec 100644 --- a/c-peer.go +++ b/client-peer.go @@ -4,7 +4,7 @@ import "fmt" import "net" import "sync" -func NewClientPeerConn(r *ClientRoute, c *net.TCPConn, id uint32) (*ClientPeerConn) { +func NewClientPeerConn(r *ClientRoute, c *net.TCPConn, id uint32) *ClientPeerConn { var cpc ClientPeerConn cpc.route = r @@ -56,7 +56,7 @@ func (cpc *ClientPeerConn) ReqStop() { } } -func (cpc* ClientPeerConn) CloseWrite() { +func (cpc *ClientPeerConn) CloseWrite() { if cpc.server_peer_eof.CompareAndSwap(false, true) { if cpc.conn != nil { cpc.conn.CloseWrite() diff --git a/client.go b/client.go index 26e3140..0a709d2 100644 --- a/client.go +++ b/client.go @@ -1,29 +1,25 @@ package hodu - -//import "bufio" import "context" import "crypto/tls" -import "encoding/json" import "errors" import "fmt" +import "math/rand" import "net" import "net/http" import "sync" import "sync/atomic" import "time" -//import "github.com/google/uuid" import "google.golang.org/grpc" import "google.golang.org/grpc/codes" import "google.golang.org/grpc/credentials/insecure" import "google.golang.org/grpc/status" -const PTC_LIMIT = 8192 - type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet] -type ServerConnMap = map[net.Addr]*ServerConn +type ClientConnMap = map[net.Addr]*ClientConn +type ClientConnMapById = map[uint32]*ClientConn type ClientPeerConnMap = map[uint32]*ClientPeerConn type ClientRouteMap = map[uint32]*ClientRoute type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc @@ -43,13 +39,15 @@ type Client struct { ctl *http.Server // control server cts_mtx sync.Mutex - cts_map ServerConnMap + cts_map ClientConnMap + cts_map_by_id ClientConnMapById wg sync.WaitGroup stop_req atomic.Bool stop_chan chan bool log Logger + mux *http.ServeMux } type ClientPeerConn struct { @@ -66,10 +64,12 @@ type ClientPeerConn struct { } // client connection to server -type ServerConn struct { +type ClientConn struct { cli *Client cfg *ClientConfig saddr *net.TCPAddr // server address that is connected to + id uint32 + lid string conn *grpc.ClientConn // grpc connection to the server hdc HoduClient @@ -87,7 +87,7 @@ type ServerConn struct { } type ClientRoute struct { - cts *ServerConn + cts *ClientConn id uint32 peer_addr *net.TCPAddr proto ROUTE_PROTO @@ -130,7 +130,7 @@ func (g *GuardedPacketStreamClient) Context() context.Context { }*/ // -------------------------------------------------------------------- -func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute { +func NewClientRoute(cts *ClientConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute { var r ClientRoute r.cts = cts @@ -152,16 +152,17 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) { // most useful works are triggered by ReportEvent() and done by ConnectToPeer() defer wg.Done() + r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-start for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String()) err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, r.peer_addr.String())) if err != nil { - //return fmt.Errorf("unable to send route-start packet - %s", err.Error()) - goto done; + r.cts.cli.log.Write("", LOG_DEBUG, "Failed to Send route-start for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String()) + goto done } main_loop: for { select { - case <- r.stop_chan: + case <-r.stop_chan: break main_loop } } @@ -170,9 +171,10 @@ done: r.ReqStop() r.ptc_wg.Wait() // wait for all peer tasks are finished + r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-stop for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String()) r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr.String())) r.cts.RemoveClientRoute(r) -fmt.Printf ("*** End fo Client Roue Task\n") +fmt.Printf("*** End fo Client Roue Task\n") } func (r *ClientRoute) ReqStop() { @@ -183,10 +185,10 @@ func (r *ClientRoute) ReqStop() { } r.stop_chan <- true } -fmt.Printf ("*** Sent stop request to Route..\n") +fmt.Printf("*** Sent stop request to Route..\n") } -func (r* ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) { +func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) { var err error var conn net.Conn var real_conn *net.TCPConn @@ -214,7 +216,7 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) { if err != nil { // TODO: make send peer started failure mesage? - fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error()) + fmt.Printf("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error()) goto peer_aborted } @@ -252,7 +254,7 @@ peer_aborted: } } -func (r* ClientRoute) DisconnectFromPeer(pts_id uint32) error { +func (r *ClientRoute) DisconnectFromPeer(pts_id uint32) error { var ptc *ClientPeerConn var cancel context.CancelFunc var ok bool @@ -260,7 +262,7 @@ func (r* ClientRoute) DisconnectFromPeer(pts_id uint32) error { r.ptc_mtx.Lock() cancel, ok = r.ptc_cancel_map[pts_id] if ok { -fmt.Printf ("~~~~~~~~~~~~~~~~ cancelling.....\n") +fmt.Printf("~~~~~~~~~~~~~~~~ cancelling.....\n") cancel() } @@ -275,7 +277,7 @@ fmt.Printf ("~~~~~~~~~~~~~~~~ cancelling.....\n") return nil } -func (r* ClientRoute) CloseWriteToPeer(pts_id uint32) error { +func (r *ClientRoute) CloseWriteToPeer(pts_id uint32) error { var ptc *ClientPeerConn var ok bool @@ -291,27 +293,26 @@ func (r* ClientRoute) CloseWriteToPeer(pts_id uint32) error { return nil } - -func (r* ClientRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error { +func (r *ClientRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data []byte) error { var err error switch event_type { case PACKET_KIND_PEER_STARTED: -fmt.Printf ("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n") +fmt.Printf("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n") r.ptc_wg.Add(1) go r.ConnectToPeer(pts_id, &r.ptc_wg) case PACKET_KIND_PEER_ABORTED: fallthrough case PACKET_KIND_PEER_STOPPED: -fmt.Printf ("GOT PEER STOPPED . DISCONNECTION FROM CLIENT_SIDE PEER\n") +fmt.Printf("GOT PEER STOPPED . DISCONNECTION FROM CLIENT_SIDE PEER\n") err = r.DisconnectFromPeer(pts_id) if err != nil { // TODO: } case PACKET_KIND_PEER_EOF: -fmt.Printf ("GOT PEER EOF. REMEMBER EOF\n") +fmt.Printf("GOT PEER EOF. REMEMBER EOF\n") err = r.CloseWriteToPeer(pts_id) if err != nil { // TODO: @@ -336,8 +337,8 @@ fmt.Printf ("GOT PEER EOF. REMEMBER EOF\n") } // -------------------------------------------------------------------- -func NewServerConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ServerConn { - var cts ServerConn +func NewClientConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ClientConn { + var cts ClientConn cts.cli = c cts.route_map = make(ClientRouteMap) @@ -352,37 +353,37 @@ func NewServerConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ServerConn return &cts } -func (cts *ServerConn) AddNewClientRoute(route_id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) (*ClientRoute, error) { +func (cts *ClientConn) AddNewClientRoute(route_id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) (*ClientRoute, error) { var r *ClientRoute cts.route_mtx.Lock() if cts.route_map[route_id] != nil { cts.route_mtx.Unlock() - return nil, fmt.Errorf ("existent route id - %d", route_id) + return nil, fmt.Errorf("existent route id - %d", route_id) } r = NewClientRoute(cts, route_id, addr, proto) cts.route_map[route_id] = r cts.route_mtx.Unlock() -fmt.Printf ("added client route.... %d -> %d\n", route_id, len(cts.route_map)) +fmt.Printf("added client route.... %d -> %d\n", route_id, len(cts.route_map)) cts.route_wg.Add(1) go r.RunTask(&cts.route_wg) return r, nil } -func (cts *ServerConn) RemoveClientRoute(route *ClientRoute) error { +func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error { var r *ClientRoute var ok bool cts.route_mtx.Lock() r, ok = cts.route_map[route.id] - if (!ok) { + if !ok { cts.route_mtx.Unlock() - return fmt.Errorf ("non-existent route id - %d", route.id) + return fmt.Errorf("non-existent route id - %d", route.id) } if r != route { cts.route_mtx.Unlock() - return fmt.Errorf ("non-existent route id - %d", route.id) + return fmt.Errorf("non-existent route id - %d", route.id) } delete(cts.route_map, route.id) cts.route_mtx.Unlock() @@ -391,15 +392,15 @@ func (cts *ServerConn) RemoveClientRoute(route *ClientRoute) error { return nil } -func (cts *ServerConn) RemoveClientRouteById(route_id uint32) error { +func (cts *ClientConn) RemoveClientRouteById(route_id uint32) error { var r *ClientRoute var ok bool cts.route_mtx.Lock() r, ok = cts.route_map[route_id] - if (!ok) { + if !ok { cts.route_mtx.Unlock() - return fmt.Errorf ("non-existent route id - %d", route_id) + return fmt.Errorf("non-existent route id - %d", route_id) } delete(cts.route_map, route_id) cts.route_mtx.Unlock() @@ -408,7 +409,7 @@ func (cts *ServerConn) RemoveClientRouteById(route_id uint32) error { return nil } -func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error { +func (cts *ClientConn) AddClientRoutes(peer_addrs []string) error { var i int var v string var addr *net.TCPAddr @@ -436,9 +437,9 @@ func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error { return nil } -func (cts *ServerConn) disconnect_from_server() { +func (cts *ClientConn) disconnect_from_server() { if cts.conn != nil { - var r* ClientRoute + var r *ClientRoute cts.route_mtx.Lock() for _, r = range cts.route_map { @@ -456,14 +457,14 @@ func (cts *ServerConn) disconnect_from_server() { } } -func (cts *ServerConn) ReqStop() { +func (cts *ClientConn) ReqStop() { if cts.stop_req.CompareAndSwap(false, true) { cts.disconnect_from_server() cts.stop_chan <- true } } -func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { +func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { var psc PacketStreamClient var slpctx context.Context var c_seed Seed @@ -473,12 +474,10 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { defer wg.Done() // arrange to call at the end of this function start_over: - cts.cli.log.Write ("", LOG_DEBUG, "Total number of server connections = %d", len(cts.cli.cts_map)) - - cts.cli.log.Write("", LOG_INFO, "Connecting to server %s", cts.saddr.String()) + cts.cli.log.Write(cts.lid, LOG_INFO, "Connecting to server %s", cts.saddr.String()) cts.conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - cts.cli.log.Write("", LOG_ERROR, "Failed to make client to server %s - %s", cts.saddr.String(), err.Error()) + cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to make client to server %s - %s", cts.saddr.String(), err.Error()) goto reconnect_to_server } cts.hdc = NewHoduClient(cts.conn) @@ -492,21 +491,21 @@ start_over: c_seed.Flags = 0 s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed) if err != nil { - cts.cli.log.Write("", LOG_ERROR, "Failed to get seed from server %s - %s", cts.saddr.String(), err.Error()) + cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.saddr.String(), err.Error()) goto reconnect_to_server } cts.s_seed = *s_seed cts.c_seed = c_seed - cts.cli.log.Write("", LOG_INFO, "Got seed from server %s - ver=%#x", cts.saddr.String(), cts.s_seed.Version) + cts.cli.log.Write(cts.lid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.saddr.String(), cts.s_seed.Version) psc, err = cts.hdc.PacketStream(cts.cli.ctx) if err != nil { - cts.cli.log.Write("", LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.saddr.String(), err.Error()) + cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.saddr.String(), err.Error()) goto reconnect_to_server } - cts.cli.log.Write("", LOG_INFO, "Got packet stream from server %s", cts.saddr.String()) + cts.cli.log.Write(cts.lid, LOG_INFO, "Got packet stream from server %s", cts.saddr.String()) cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc} @@ -514,7 +513,7 @@ start_over: // let's add routes to the client-side peers. err = cts.AddClientRoutes(cts.cfg.PeerAddrs) if err != nil { - cts.cli.log.Write("", LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.saddr.String(), cts.cfg.PeerAddrs, err.Error()) + cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.saddr.String(), cts.cfg.PeerAddrs, err.Error()) goto done } @@ -541,7 +540,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) { goto reconnect_to_server } else { - cts.cli.log.Write("", LOG_INFO, "Failed to receive packet form server %s - %s", cts.saddr.String(), err.Error()) + cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.saddr.String(), err.Error()) goto reconnect_to_server } } @@ -635,7 +634,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) if ok { err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) if err != nil { - fmt.Printf ("failed to report event - %s\n", err.Error()) + fmt.Printf("failed to report event - %s\n", err.Error()) // TODO: } else { // TODO: @@ -652,7 +651,7 @@ done: cts.ReqStop() wait_for_termination: cts.route_wg.Wait() // wait until all route tasks are finished - cts.cli.RemoveServerConn(cts) + cts.cli.RemoveClientConn(cts) return reconnect_to_server: @@ -668,19 +667,19 @@ reconnect_to_server: // this signal indicates that ReqStop() has been called // so jumt to the waiting label goto wait_for_termination - case <- slpctx.Done(): + case <-slpctx.Done(): // do nothing } goto start_over // and reconnect } -func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { +func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { var r *ClientRoute var ok bool cts.route_mtx.Lock() r, ok = cts.route_map[route_id] - if (!ok) { + if !ok { cts.route_mtx.Unlock() return fmt.Errorf ("non-existent route id - %d", route_id) } @@ -690,7 +689,7 @@ func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type P } // -------------------------------------------------------------------- -func (r *ClientRoute) AddNewClientPeerConn (c *net.TCPConn, pts_id uint32) (*ClientPeerConn, error) { +func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id uint32) (*ClientPeerConn, error) { var ptc *ClientPeerConn r.ptc_mtx.Lock() @@ -704,31 +703,39 @@ func (r *ClientRoute) AddNewClientPeerConn (c *net.TCPConn, pts_id uint32) (*Cli // -------------------------------------------------------------------- - func NewClient(ctx context.Context, listen_on string, logger Logger, tlscfg *tls.Config) *Client { var c Client c.ctx, c.ctx_cancel = context.WithCancel(ctx) c.tlscfg = tlscfg c.ext_svcs = make([]Service, 0, 1) - c.cts_map = make(ServerConnMap) // TODO: make it configurable... + c.cts_map = make(ClientConnMap) + c.cts_map_by_id = make(ClientConnMapById) c.stop_req.Store(false) c.stop_chan = make(chan bool, 8) c.log = logger + c.mux = http.NewServeMux() + c.mux.Handle("/servers", &client_ctl_servers{c: &c}) + c.mux.Handle("/servers/{id}", &client_ctl_servers_id{c: &c}) + c.mux.Handle("/clients", &client_ctl_clients{c: &c}) + c.mux.Handle("/clients/{id}", &client_ctl_clients_id{c: &c}) + c.ctl = &http.Server{ Addr: listen_on, - Handler: &c, + Handler: c.mux, + // TODO: more settings } return &c } -func (c *Client) AddNewServerConn(addr *net.TCPAddr, cfg *ClientConfig) (*ServerConn, error) { - var cts *ServerConn +func (c *Client) AddNewClientConn(addr *net.TCPAddr, cfg *ClientConfig) (*ClientConn, error) { + var cts *ClientConn var ok bool + var id uint32 - cts = NewServerConn(c, addr, cfg) + cts = NewClientConn(c, addr, cfg) c.cts_mtx.Lock() defer c.cts_mtx.Unlock() @@ -738,23 +745,34 @@ func (c *Client) AddNewServerConn(addr *net.TCPAddr, cfg *ClientConfig) (*Server return nil, fmt.Errorf("existing server - %s", addr.String()) } + id = rand.Uint32() + for { + _, ok = c.cts_map_by_id[id] + if !ok { break } + id++ + } + cts.id = id + cts.lid = fmt.Sprintf("%d", id) + c.cts_map[addr] = cts -fmt.Printf ("ADD total servers %d\n", len(c.cts_map)) + c.cts_map_by_id[id] = cts +fmt.Printf("ADD total servers %d\n", len(c.cts_map)) return cts, nil } -func (c *Client) RemoveServerConn(cts *ServerConn) { +func (c *Client) RemoveClientConn(cts *ClientConn) { c.cts_mtx.Lock() delete(c.cts_map, cts.saddr) -fmt.Printf ("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.saddr, len(c.cts_map)) + delete(c.cts_map_by_id, cts.id) +fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.saddr, len(c.cts_map)) c.cts_mtx.Unlock() } func (c *Client) ReqStop() { if c.stop_req.CompareAndSwap(false, true) { - var cts *ServerConn + var cts *ClientConn - if (c.ctl != nil) { + if c.ctl != nil { c.ctl.Shutdown(c.ctx) // to break c.ctl.ListenAndServe() } @@ -765,67 +783,18 @@ func (c *Client) ReqStop() { c.stop_chan <- true c.ctx_cancel() } -fmt.Printf ("*** Sent stop request to client..\n") } -func (c *Client) ServeHTTP(w http.ResponseWriter, req *http.Request) { - var err error - - // command handler for the control channel - if req.URL.String() == "/servers" { - switch req.Method { - case http.MethodGet: - goto bad_request // TODO: - - case http.MethodPost: - var s ClientCtlParamServer - var cc ClientConfig - err = json.NewDecoder(req.Body).Decode(&s) - if err != nil { - fmt.Printf ("failed to decode body - %s\n", err.Error()) - goto bad_request - } - cc.ServerAddr = s.ServerAddr - cc.PeerAddrs = s.PeerAddrs - c.StartService(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine? - w.WriteHeader(http.StatusCreated) - - case http.MethodPut: - goto bad_request // TODO: - - case http.MethodDelete: - var cts *ServerConn - for _, cts = range c.cts_map { - cts.ReqStop() - } - } - } else { - goto bad_request - } - fmt.Printf ("[%s][%s][%s]\n", req.RequestURI, req.URL.String(), req.Method) - return - -bad_request: - w.WriteHeader(http.StatusBadRequest) - return -} - -/* - * POST GET PUT DELETE - * /servers - create new server list all servers bulk update delete all servers - * /servers/1 - X get server 1 details update server 1 delete server 1 - * /servers/1/xxx - - */ func (c *Client) RunCtlTask(wg *sync.WaitGroup) { var err error defer wg.Done() err = c.ctl.ListenAndServe() - if !errors.Is(err, http.ErrServerClosed) { - fmt.Printf ("------------http server error - %s\n", err.Error()) + if errors.Is(err, http.ErrServerClosed) { + c.log.Write("", LOG_DEBUG, "Control channel closed") } else { - fmt.Printf ("********* http server ended\n") + c.log.Write("", LOG_ERROR, "Control channel error - %s", err.Error()) } } @@ -845,7 +814,7 @@ func (c *Client) RunTask(wg *sync.WaitGroup) { // RunTask - supposed to be detached as a go routine func (c *Client) StartService(data interface{}) { var saddr *net.TCPAddr - var cts *ServerConn + var cts *ClientConn var err error var cfg *ClientConfig var ok bool @@ -867,7 +836,7 @@ func (c *Client) StartService(data interface{}) { return } - cts, err = c.AddNewServerConn(saddr, cfg) + cts, err = c.AddNewClientConn(saddr, cfg) if err != nil { fmt.Printf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error()) return @@ -895,6 +864,6 @@ func (c *Client) WaitForTermination() { c.wg.Wait() } -func (c *Client) WriteLog (id string, level LogLevel, fmtstr string, args ...interface{}) { +func (c *Client) WriteLog(id string, level LogLevel, fmtstr string, args ...interface{}) { c.log.Write(id, level, fmtstr, args...) } diff --git a/go.mod b/go.mod index d31ecbf..09caa43 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module hodu -go 1.21.0 +go 1.22.0 require ( github.com/google/uuid v1.6.0 diff --git a/packet.go b/packet.go index f38868d..3e66a92 100644 --- a/packet.go +++ b/packet.go @@ -1,6 +1,5 @@ package hodu - func MakeRouteStartPacket(route_id uint32, proto ROUTE_PROTO, addr string) *Packet { return &Packet{ Kind: PACKET_KIND_ROUTE_START, @@ -25,7 +24,6 @@ func MakeRouteStoppedPacket(route_id uint32, proto ROUTE_PROTO) *Packet { U: &Packet_Route{Route: &RouteDesc{RouteId: route_id, Proto: proto}}} } - func MakePeerStartedPacket(route_id uint32, peer_id uint32) *Packet { // the connection from a peer to the server has been established return &Packet{Kind: PACKET_KIND_PEER_STARTED, @@ -35,24 +33,20 @@ func MakePeerStartedPacket(route_id uint32, peer_id uint32) *Packet { func MakePeerStoppedPacket(route_id uint32, peer_id uint32) *Packet { return &Packet{Kind: PACKET_KIND_PEER_STOPPED, - U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}, - }} + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}}} } func MakePeerAbortedPacket(route_id uint32, peer_id uint32) *Packet { return &Packet{Kind: PACKET_KIND_PEER_ABORTED, - U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}, - }} + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}}} } func MakePeerEofPacket(route_id uint32, peer_id uint32) *Packet { return &Packet{Kind: PACKET_KIND_PEER_EOF, - U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}, - }} + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}}} } func MakePeerDataPacket(route_id uint32, peer_id uint32, data []byte) *Packet { return &Packet{Kind: PACKET_KIND_PEER_DATA, - U: &Packet_Data{Data: &PeerData{RouteId: route_id, PeerId: peer_id, Data: data}, - }} + U: &Packet_Data{Data: &PeerData{RouteId: route_id, PeerId: peer_id, Data: data}}} } diff --git a/s-peer.go b/server-peer.go similarity index 90% rename from s-peer.go rename to server-peer.go index 8c3cf5c..b43afc1 100644 --- a/s-peer.go +++ b/server-peer.go @@ -23,7 +23,7 @@ type ServerPeerConn struct { client_peer_eof atomic.Bool } -func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerConn) { +func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) *ServerPeerConn { var spc ServerPeerConn spc.route = r @@ -37,7 +37,7 @@ func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerCo spc.client_peer_started.Store(false) spc.client_peer_stopped.Store(false) spc.client_peer_eof.Store(false) -fmt.Printf ("~~~~~~~~~~~~~~~ NEW SERVER PEER CONNECTION ADDED %p\n", &spc) +fmt.Printf("~~~~~~~~~~~~~~~ NEW SERVER PEER CONNECTION ADDED %p\n", &spc) return &spc } @@ -76,7 +76,7 @@ wait_for_started: tmr.Stop() goto done - case <- spc.stop_chan: + case <-spc.stop_chan: tmr.Stop() goto done } @@ -110,9 +110,9 @@ wait_for_stopped: for { fmt.Printf ("******************* Waiting for peer Stop\n") select { - case status = <- spc.client_peer_status_chan: // something not right... may use a different channel for closing... + case status = <-spc.client_peer_status_chan: // something not right... may use a different channel for closing... goto done - case <- spc.stop_chan: + case <-spc.stop_chan: goto done } } @@ -149,7 +149,7 @@ func (spc *ServerPeerConn) ReqStop() { } } -func (spc *ServerPeerConn) ReportEvent (event_type PACKET_KIND, event_data []byte) error { +func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data []byte) error { switch event_type { case PACKET_KIND_PEER_STARTED: @@ -184,7 +184,7 @@ fmt.Printf("******************* CCCCCCCCCCCCCCCCCCCCCCCccc\n") } } else { // protocol error. the client must not relay more data from the client-side peer after EOF. - fmt.Printf ("WARNING - broken client - redundant data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String()) + fmt.Printf("WARNING - broken client - redundant data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String()) } default: diff --git a/server.go b/server.go index 326aefd..49be7bd 100644 --- a/server.go +++ b/server.go @@ -19,7 +19,7 @@ import "google.golang.org/grpc/stats" const PTS_LIMIT = 8192 -type ClientConnMap = map[net.Addr]*ClientConn +type ServerConnMap = map[net.Addr]*ServerConn type ServerPeerConnMap = map[uint32]*ServerPeerConn type ServerRouteMap = map[uint32]*ServerRoute @@ -39,7 +39,7 @@ type Server struct { l_wg sync.WaitGroup cts_mtx sync.Mutex - cts_map ClientConnMap + cts_map ServerConnMap cts_wg sync.WaitGroup gs *grpc.Server @@ -48,9 +48,9 @@ type Server struct { UnimplementedHoduServer } -// client connection to server. +// connection from client. // client connect to the server, the server accept it, and makes a tunnel request -type ClientConn struct { +type ServerConn struct { svr *Server caddr net.Addr // client address that created this structure pss *GuardedPacketStreamServer @@ -65,7 +65,7 @@ type ClientConn struct { } type ServerRoute struct { - cts *ClientConn + cts *ServerConn l *net.TCPListener laddr *net.TCPAddr id uint32 @@ -107,7 +107,7 @@ func (g *GuardedPacketStreamServer) Context() context.Context { // ------------------------------------ -func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { +func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { var r ServerRoute var l *net.TCPListener var laddr *net.TCPAddr @@ -209,7 +209,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { } func (r *ServerRoute) ReqStop() { - fmt.Printf ("requesting to stop route taak..\n") + fmt.Printf("requesting to stop route taak..\n") if r.stop_req.CompareAndSwap(false, true) { var pts *ServerPeerConn @@ -220,10 +220,10 @@ func (r *ServerRoute) ReqStop() { r.l.Close() } - fmt.Printf ("requiested to stopp route taak..\n") + fmt.Printf("requiested to stopp route taak..\n") } -func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error { +func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data []byte) error { var spc *ServerPeerConn var ok bool @@ -239,7 +239,7 @@ func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_ } // ------------------------------------ -func (cts *ClientConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) { +func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) { var l *net.TCPListener var err error var laddr *net.TCPAddr @@ -279,14 +279,14 @@ func (cts *ClientConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, return nil, nil, err } -func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { +func (cts *ServerConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { var r *ServerRoute var err error cts.route_mtx.Lock() if cts.route_map[route_id] != nil { cts.route_mtx.Unlock() - return nil, fmt.Errorf ("existent route id - %d", route_id) + return nil, fmt.Errorf("existent route id - %d", route_id) } r, err = NewServerRoute(cts, route_id, proto) if err != nil { @@ -301,19 +301,19 @@ func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*S return r, nil } -func (cts *ClientConn) RemoveServerRoute (route* ServerRoute) error { +func (cts *ServerConn) RemoveServerRoute(route *ServerRoute) error { var r *ServerRoute var ok bool cts.route_mtx.Lock() r, ok = cts.route_map[route.id] - if (!ok) { + if !ok { cts.route_mtx.Unlock() - return fmt.Errorf ("non-existent route id - %d", route.id) + return fmt.Errorf("non-existent route id - %d", route.id) } - if (r != route) { + if r != route { cts.route_mtx.Unlock() - return fmt.Errorf ("non-existent route - %d", route.id) + return fmt.Errorf("non-existent route - %d", route.id) } delete(cts.route_map, route.id) cts.route_mtx.Unlock() @@ -322,15 +322,15 @@ func (cts *ClientConn) RemoveServerRoute (route* ServerRoute) error { return nil } -func (cts *ClientConn) RemoveServerRouteById (route_id uint32) (*ServerRoute, error) { +func (cts *ServerConn) RemoveServerRouteById(route_id uint32) (*ServerRoute, error) { var r *ServerRoute var ok bool cts.route_mtx.Lock() r, ok = cts.route_map[route_id] - if (!ok) { + if !ok { cts.route_mtx.Unlock() - return nil, fmt.Errorf ("non-existent route id - %d", route_id) + return nil, fmt.Errorf("non-existent route id - %d", route_id) } delete(cts.route_map, route_id) cts.route_mtx.Unlock() @@ -339,7 +339,7 @@ func (cts *ClientConn) RemoveServerRouteById (route_id uint32) (*ServerRoute, er return r, nil } -func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { +func (cts *ServerConn) ReportEvent(route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { var r *ServerRoute var ok bool @@ -347,14 +347,14 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P r, ok = cts.route_map[route_id] if (!ok) { cts.route_mtx.Unlock() - return fmt.Errorf ("non-existent route id - %d", route_id) + return fmt.Errorf("non-existent route id - %d", route_id) } cts.route_mtx.Unlock() return r.ReportEvent(pts_id, event_type, event_data) } -func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) { +func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { var pkt *Packet var err error @@ -377,7 +377,7 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) { var ok bool x, ok = pkt.U.(*Packet_Route) if ok { - var r* ServerRoute + var r *ServerRoute r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto) if err != nil { @@ -401,7 +401,7 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) { var ok bool x, ok = pkt.U.(*Packet_Route) if ok { - var r* ServerRoute + var r *ServerRoute r, err = cts.RemoveServerRouteById(x.Route.RouteId) if err != nil { @@ -429,7 +429,7 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) { err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil) if err != nil { // TODO: - fmt.Printf ("Failed to report PEER_STARTED Event") + fmt.Printf("Failed to report PEER_STARTED Event") } else { // TODO: } @@ -448,7 +448,7 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) { err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) if err != nil { // TODO: - fmt.Printf ("Failed to report PEER_STOPPED Event") + fmt.Printf("Failed to report PEER_STOPPED Event") } else { // TODO: } @@ -465,7 +465,7 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) { err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) if err != nil { // TODO: - fmt.Printf ("Failed to report PEER_DATA Event") + fmt.Printf("Failed to report PEER_DATA Event") } else { // TODO: } @@ -476,10 +476,10 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) { } done: - fmt.Printf ("************ stream receiver finished....\n") + fmt.Printf("************ stream receiver finished....\n") } -func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { +func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { var strm *GuardedPacketStreamServer var ctx context.Context @@ -520,13 +520,13 @@ fmt.Printf("grpc server done - %s\n", ctx.Err().Error()) } done: -fmt.Printf ("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n") +fmt.Printf("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n") cts.ReqStop() // just in case cts.route_wg.Wait() -fmt.Printf ("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n") +fmt.Printf("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n") } -func (cts *ClientConn) ReqStop() { +func (cts *ServerConn) ReqStop() { if cts.stop_req.CompareAndSwap(false, true) { var r *ServerRoute @@ -538,14 +538,14 @@ func (cts *ClientConn) ReqStop() { // the grpc server. while the global grpc server is closed in // ReqStop() for Server, the individuation connection is closed // by returing from the grpc handler goroutine. See the comment - // RunTask() for ClientConn. + // RunTask() for ServerConn. cts.stop_chan <- true } } // -------------------------------------------------------------------- -func (s *Server) GetSeed (ctx context.Context, c_seed *Seed) (*Seed, error) { +func (s *Server) GetSeed(ctx context.Context, c_seed *Seed) (*Seed, error) { var s_seed Seed // seed exchange is for furture expansion of the protocol @@ -554,7 +554,7 @@ func (s *Server) GetSeed (ctx context.Context, c_seed *Seed) (*Seed, error) { s_seed.Version = HODU_VERSION s_seed.Flags = 0 - // we create no ClientConn structure associated with the connection + // we create no ServerConn structure associated with the connection // at this phase for the server. it doesn't track the client version and // features. we delegate protocol selection solely to the client. @@ -566,15 +566,15 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { var p *peer.Peer var ok bool var err error - var cts *ClientConn + var cts *ServerConn ctx = strm.Context() p, ok = peer.FromContext(ctx) - if (!ok) { + if !ok { return fmt.Errorf("failed to get peer from packet stream context") } - cts, err = s.AddNewClientConn(p.Addr, strm) + cts, err = s.AddNewServerConn(p.Addr, strm) if err != nil { return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error()) } @@ -593,7 +593,7 @@ type ConnCatcher struct { } func (cc *ConnCatcher) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { - return ctx + return ctx } func (cc *ConnCatcher) HandleRPC(ctx context.Context, s stats.RPCStats) { @@ -601,7 +601,7 @@ func (cc *ConnCatcher) HandleRPC(ctx context.Context, s stats.RPCStats) { func (cc *ConnCatcher) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { return ctx - //return context.TODO() + //return context.TODO() } func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) { @@ -611,7 +611,7 @@ func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) { var addr string p, ok = peer.FromContext(ctx) - if (!ok) { + if !ok { addr = "" } else { addr = p.Addr.String() @@ -626,8 +626,8 @@ if ok { fmt.Printf("**** client connected - [%s]\n", addr) case *stats.ConnEnd: fmt.Printf("**** client disconnected - [%s]\n", addr) - cc.server.RemoveClientConnByAddr(p.Addr) - } + cc.server.RemoveServerConnByAddr(p.Addr) + } } // wrappedStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and @@ -720,7 +720,7 @@ func NewServer(ctx context.Context, laddrs []string, logger Logger, tlscfg *tls. s.tlscfg = tlscfg s.ext_svcs = make([]Service, 0, 1) - s.cts_map = make(ClientConnMap) // TODO: make it configurable... + s.cts_map = make(ServerConnMap) // TODO: make it configurable... s.stop_chan = make(chan bool, 8) s.stop_req.Store(false) /* @@ -733,9 +733,9 @@ func NewServer(ctx context.Context, laddrs []string, logger Logger, tlscfg *tls. s.gs = grpc.NewServer( grpc.UnaryInterceptor(unaryInterceptor), grpc.StreamInterceptor(streamInterceptor), - grpc.StatsHandler(&ConnCatcher{ server: &s }), + grpc.StatsHandler(&ConnCatcher{server: &s}), ) // TODO: have this outside the server struct? - RegisterHoduServer (s.gs, &s) + RegisterHoduServer(s.gs, &s) return &s, nil @@ -760,13 +760,13 @@ func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error { l = s.l[idx] // it seems to be safe to call a single grpc server on differnt listening sockets multiple times - s.log.Write ("", LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String()) + s.log.Write("", LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String()) err = s.gs.Serve(l) if err != nil { if errors.Is(err, net.ErrClosed) { - s.log.Write ("", LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String()) + s.log.Write("", LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String()) } else { - s.log.Write ("", LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error()) + s.log.Write("", LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error()) } return err } @@ -814,18 +814,18 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) { err = s.ctl.ListenAndServe() if errors.Is(err, http.ErrServerClosed) { - fmt.Printf ("------------http server error - %s\n", err.Error()) + fmt.Printf("------------http server error - %s\n", err.Error()) } else { - fmt.Printf ("********* http server ended\n") + fmt.Printf("********* http server ended\n") } } func (s *Server) ReqStop() { if s.stop_req.CompareAndSwap(false, true) { var l *net.TCPListener - var cts *ClientConn + var cts *ServerConn - if (s.ctl != nil) { + if s.ctl != nil { // shutdown the control server if ever started. s.ctl.Shutdown(s.ctx) } @@ -847,8 +847,8 @@ func (s *Server) ReqStop() { } } -func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*ClientConn, error) { - var cts ClientConn +func (s *Server) AddNewServerConn(addr net.Addr, pss Hodu_PacketStreamServer) (*ServerConn, error) { + var cts ServerConn var ok bool cts.svr = s @@ -871,15 +871,15 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (* return &cts, nil } -func (s *Server) RemoveClientConn(cts *ClientConn) { +func (s *Server) RemoveServerConn(cts *ServerConn) { s.cts_mtx.Lock() delete(s.cts_map, cts.caddr) s.log.Write("", LOG_DEBUG, "Removed client connection from %s", cts.caddr.String()) s.cts_mtx.Unlock() } -func (s *Server) RemoveClientConnByAddr(addr net.Addr) { - var cts *ClientConn +func (s *Server) RemoveServerConnByAddr(addr net.Addr) { + var cts *ServerConn var ok bool s.cts_mtx.Lock() @@ -892,8 +892,8 @@ func (s *Server) RemoveClientConnByAddr(addr net.Addr) { } } -func (s *Server) FindClientConnByAddr (addr net.Addr) *ClientConn { - var cts *ClientConn +func (s *Server) FindServerConnByAddr(addr net.Addr) *ServerConn { + var cts *ServerConn var ok bool s.cts_mtx.Lock() @@ -930,6 +930,6 @@ func (s *Server) WaitForTermination() { s.wg.Wait() } -func (s *Server) WriteLog (id string, level LogLevel, fmtstr string, args ...interface{}) { +func (s *Server) WriteLog(id string, level LogLevel, fmtstr string, args ...interface{}) { s.log.Write(id, level, fmtstr, args...) }