From c901f88023810830d968d07ab8e927ce978bfa47 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Mon, 2 Dec 2024 02:19:50 +0900 Subject: [PATCH] some incremetal changes --- client-ctl.go | 10 +++++-- client.go | 47 ++++++++++++++++--------------- cmd/main.go | 26 +++++++++--------- hodu.pb.go | 4 +-- hodu_grpc.pb.go | 2 +- server-ctl.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++-- server-peer.go | 8 +++--- server-ws.go | 5 +++- server.go | 35 +++++++++++++----------- 9 files changed, 146 insertions(+), 64 deletions(-) diff --git a/client-ctl.go b/client-ctl.go index 0a4fb77..7b6da65 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -38,6 +38,7 @@ type json_out_client_conn_id struct { type json_out_client_conn struct { Id uint32 `json:"id"` ServerAddr string `json:"server-addr"` + ClientAddr string `json:"client-addr"` Routes []json_out_client_route `json:"routes"` } @@ -123,7 +124,12 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R ServerPeerListenAddr: r.server_peer_listen_addr.String(), }) } - js = append(js, json_out_client_conn{Id: cts.id, ServerAddr: cts.cfg.ServerAddr, Routes: jsp}) + js = append(js, json_out_client_conn{ + Id: cts.id, + ServerAddr: cts.remote_addr, + ClientAddr: cts.local_addr, + Routes: jsp, + }) cts.route_mtx.Unlock() } c.cts_mtx.Unlock() @@ -226,7 +232,7 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt ServerPeerListenAddr: r.server_peer_listen_addr.String(), }) } - js = &json_out_client_conn{Id: cts.id, ServerAddr: cts.cfg.ServerAddr, Routes: jsp} + js = &json_out_client_conn{Id: cts.id, ServerAddr: cts.local_addr, ClientAddr: cts.remote_addr, Routes: jsp} cts.route_mtx.Unlock() status_code = http.StatusOK; w.WriteHeader(status_code) diff --git a/client.go b/client.go index 17b1069..9de3535 100644 --- a/client.go +++ b/client.go @@ -14,6 +14,7 @@ import "time" import "google.golang.org/grpc" import "google.golang.org/grpc/codes" import "google.golang.org/grpc/credentials/insecure" +import "google.golang.org/grpc/peer" import "google.golang.org/grpc/status" type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet] @@ -59,24 +60,26 @@ type Client struct { // client connection to server type ClientConn struct { - cli *Client - cfg ClientConfigActive - id uint32 - lid string + cli *Client + cfg ClientConfigActive + id uint32 + lid string - conn *grpc.ClientConn // grpc connection to the server - hdc HoduClient - psc *GuardedPacketStreamClient // guarded grpc stream + local_addr string + remote_addr string + conn *grpc.ClientConn // grpc connection to the server + hdc HoduClient + psc *GuardedPacketStreamClient // guarded grpc stream - s_seed Seed - c_seed Seed + s_seed Seed + c_seed Seed - route_mtx sync.Mutex - route_map ClientRouteMap - route_wg sync.WaitGroup + route_mtx sync.Mutex + route_map ClientRouteMap + route_wg sync.WaitGroup - stop_req atomic.Bool - stop_chan chan bool + stop_req atomic.Bool + stop_chan chan bool } type ClientRoute struct { @@ -611,19 +614,13 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { var slpctx context.Context var c_seed Seed var s_seed *Seed + var p *peer.Peer + var ok bool var err error defer wg.Done() // arrange to call at the end of this function start_over: -/* - cts.saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cts.cfg.ServerAddr) // TODO: make this interruptable... - if err != nil { - err = fmt.Errorf("unresolavable address %s - %s", cts.saddr, err.Error()) - goto reconnect_to_server - } -*/ - cts.cli.log.Write(cts.lid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr) cts.conn, err = grpc.NewClient(cts.cfg.ServerAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -655,6 +652,12 @@ start_over: goto reconnect_to_server } + p, ok = peer.FromContext(psc.Context()) + if ok { + cts.remote_addr = p.Addr.String() + cts.local_addr = p.LocalAddr.String() + } + cts.cli.log.Write(cts.lid, LOG_INFO, "Got packet stream from server %s", cts.cfg.ServerAddr) cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc} diff --git a/cmd/main.go b/cmd/main.go index 49682e5..126630f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -216,14 +216,14 @@ func main() { la = make([]string, 0) flgs = flag.NewFlagSet("", flag.ContinueOnError) - flgs.Func("rpc-on", "specify a rpc listening address", func(v string) error { - la = append(la, v) - return nil - }) flgs.Func("ctl-on", "specify a listening address for control channel", func(v string) error { ctl_addr = v // TODO: support multiple addrs return nil }) + flgs.Func("rpc-on", "specify a rpc listening address", func(v string) error { + la = append(la, v) + return nil + }) flgs.SetOutput(io.Discard) // prevent usage output err = flgs.Parse(os.Args[2:]) if err != nil { @@ -241,19 +241,19 @@ func main() { goto oops } } else if strings.EqualFold(os.Args[1], "client") { - var la []string - var sa []string + var rpc_addr []string + var ctl_addr[] string - la = make([]string, 0) - sa = make([]string, 0) + ctl_addr = make([]string, 0) + rpc_addr = make([]string, 0) flgs = flag.NewFlagSet("", flag.ContinueOnError) - flgs.Func("rpc-on", "specify a control channel address", func(v string) error { - la = append(la, v) + flgs.Func("ctl-on", "specify a listening address for control channel", func(v string) error { + ctl_addr = append(ctl_addr, v) return nil }) flgs.Func("rpc-server", "specify a rpc server address", func(v string) error { - sa = append(sa, v) + rpc_addr = append(rpc_addr, v) return nil }) flgs.SetOutput(io.Discard) @@ -263,10 +263,10 @@ func main() { goto wrong_usage } - if len(la) != 1 || len(sa) != 1 || flgs.NArg() < 1 { + if len(ctl_addr) != 1 || len(rpc_addr) != 1 || flgs.NArg() < 1 { goto wrong_usage } - err = client_main(la[0], sa[0], flgs.Args()) + err = client_main(ctl_addr[0], rpc_addr[0], flgs.Args()) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: client error - %s\n", err.Error()) goto oops diff --git a/hodu.pb.go b/hodu.pb.go index 4aaaa27..75583c2 100644 --- a/hodu.pb.go +++ b/hodu.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.2 -// protoc v5.28.2 +// protoc-gen-go v1.35.1 +// protoc v3.19.6 // source: hodu.proto package hodu diff --git a/hodu_grpc.pb.go b/hodu_grpc.pb.go index b06b516..c7dc232 100644 --- a/hodu_grpc.pb.go +++ b/hodu_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.5.1 -// - protoc v5.28.2 +// - protoc v3.19.6 // source: hodu.proto package hodu diff --git a/server-ctl.go b/server-ctl.go index 5d2b0fe..0264c70 100644 --- a/server-ctl.go +++ b/server-ctl.go @@ -1,13 +1,80 @@ package hodu +import "encoding/json" import "net/http" -type server_ctl_client_conns struct { + +type json_out_server_conn struct { + Id uint32 `json:"id"` + ServerAddr string `json:"server-addr"` + ClientAddr string `json:"client-addr"` + Routes []json_out_server_route `json:"routes"` +} + +type json_out_server_route struct { + Id uint32 `json:"id"` + ClientPeerAddr string `json:"client-peer-addr"` + ServerPeerListenAddr string `json:"server-peer-listen-addr"` +} + +// ------------------------------------ + +type server_ctl_server_conns struct { s *Server } // ------------------------------------ -func (ctl *server_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.Request) { - w.Write([]byte("hello")) +func (ctl *server_ctl_server_conns) ServeHTTP(w http.ResponseWriter, req *http.Request) { + var s *Server + var status_code int + var err error + var je *json.Encoder + + s = ctl.s + je = json.NewEncoder(w) + + switch req.Method { + case http.MethodGet: + var cts *ServerConn + var js []json_out_server_conn + + js = make([]json_out_server_conn, 0) + s.cts_mtx.Lock() + for _, cts = range s.cts_map { + var r *ServerRoute + var jsp []json_out_server_route + + jsp = make([]json_out_server_route, 0) + cts.route_mtx.Lock() + for _, r = range cts.route_map { + jsp = append(jsp, json_out_server_route{ + Id: r.id, + ClientPeerAddr: r.ptc_addr, + ServerPeerListenAddr: r.laddr.String(), + }) + } + js = append(js, json_out_server_conn{Id: cts.id, ClientAddr: cts.caddr.String(), ServerAddr: cts.local_addr.String(), Routes: jsp}) + cts.route_mtx.Unlock() + } + s.cts_mtx.Unlock() + + status_code = http.StatusOK; w.WriteHeader(status_code) + if err = je.Encode(js); err != nil { goto oops } + + case http.MethodDelete: +// TODO s.ReqStopAllServerConns() + status_code = http.StatusNoContent; w.WriteHeader(status_code) + + default: + status_code = http.StatusBadRequest; w.WriteHeader(status_code) + } + +//done: + s.log.Write("", LOG_DEBUG, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code) // TODO: time taken + return + +oops: + s.log.Write("", LOG_ERROR, "[%s] %s %s - %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error()) + return } diff --git a/server-peer.go b/server-peer.go index 5860853..c319ba5 100644 --- a/server-peer.go +++ b/server-peer.go @@ -55,7 +55,7 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String())) if err != nil { // TODO: include route id and conn id in the error message - fmt.Printf("unable to send start-pts - %s\n", err.Error()) + spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to notify peer started - %s", err.Error()) goto done_without_stop } @@ -88,12 +88,12 @@ wait_for_started: if err != nil { if errors.Is(err, io.EOF) { if pss.Send(MakePeerEofPacket(spc.route.id, spc.conn_id)) != nil { - fmt.Printf("unable to report data - %s\n", err.Error()) + spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to report eof - %s", err.Error()) goto done } goto wait_for_stopped } else { - fmt.Printf("read error - %s\n", err.Error()) + spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to read data - %s", err.Error()) goto done } } @@ -101,7 +101,7 @@ wait_for_started: err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n])) if err != nil { // TODO: include route id and conn id in the error message - fmt.Printf("unable to send data - %s\n", err.Error()) + spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to send data - %s", err.Error()) goto done } } diff --git a/server-ws.go b/server-ws.go index 3a9c3fb..3960c0c 100644 --- a/server-ws.go +++ b/server-ws.go @@ -17,7 +17,6 @@ func server_ws_tty (ws* websocket.Conn) { ws.Write([]byte("it's so wrong. it's awesome\r\n")) ws.Write([]byte("it's so wrong. 동키가 지나간다.it's awesome\r\n")) - for { err = websocket.Message.Receive(ws, &msg) if err != nil { @@ -30,6 +29,10 @@ fmt.Printf ("RECEIVED MESSAGE [%v]\n", msg) } } +func new_server_ctl_ws_tty(s *Server) *server_ctl_ws_tty { + return &server_ctl_ws_tty{s: s, h: websocket.Handler(server_ws_tty)} +} + func (ctl *server_ctl_ws_tty) ServeHTTP(w http.ResponseWriter, req *http.Request) { ctl.h.ServeHTTP(w, req) } diff --git a/server.go b/server.go index e082060..802e18b 100644 --- a/server.go +++ b/server.go @@ -16,8 +16,6 @@ import "google.golang.org/grpc" //import "google.golang.org/grpc/metadata" import "google.golang.org/grpc/peer" import "google.golang.org/grpc/stats" -import "golang.org/x/net/websocket" - const PTS_LIMIT = 8192 @@ -57,7 +55,9 @@ type Server struct { // client connect to the server, the server accept it, and makes a tunnel request type ServerConn struct { svr *Server + id uint32 caddr net.Addr // client address that created this structure + local_addr net.Addr pss *GuardedPacketStreamServer route_mtx sync.Mutex @@ -73,6 +73,7 @@ type ServerRoute struct { cts *ServerConn l *net.TCPListener laddr *net.TCPAddr + ptc_addr string id uint32 pts_mtx sync.Mutex @@ -112,7 +113,7 @@ func (g *GuardedPacketStreamServer) Context() context.Context { // ------------------------------------ -func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { +func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO, ptc_addr string) (*ServerRoute, error) { var r ServerRoute var l *net.TCPListener var laddr *net.TCPAddr @@ -127,6 +128,7 @@ func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO) (*ServerRoute r.id = id r.l = l r.laddr = laddr + r.ptc_addr = ptc_addr r.pts_limit = PTS_LIMIT r.pts_map = make(ServerPeerConnMap) r.pts_last_id = 0 @@ -284,7 +286,7 @@ func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, return nil, nil, err } -func (cts *ServerConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { +func (cts *ServerConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO, ptc_addr string) (*ServerRoute, error) { var r *ServerRoute var err error @@ -293,7 +295,7 @@ func (cts *ServerConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*S cts.route_mtx.Unlock() return nil, fmt.Errorf("existent route id - %d", route_id) } - r, err = NewServerRoute(cts, route_id, proto) + r, err = NewServerRoute(cts, route_id, proto, ptc_addr) if err != nil { cts.route_mtx.Unlock() return nil, err @@ -384,7 +386,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if ok { var r *ServerRoute - r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto) + r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto, x.Route.AddrStr) if err != nil { cts.svr.log.Write("", LOG_ERROR, "Failed to add server route for client %s peer %s", cts.caddr, x.Route.AddrStr) } else { @@ -579,7 +581,7 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { return fmt.Errorf("failed to get peer from packet stream context") } - cts, err = s.AddNewServerConn(p.Addr, strm) + cts, err = s.AddNewServerConn(&p.Addr, &p.LocalAddr, strm) if err != nil { return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error()) } @@ -749,8 +751,8 @@ func NewServer(ctx context.Context, ctl_addr string, laddrs []string, logger Log cwd, _ = os.Getwd() s.ctl_mux.Handle(s.ctl_prefix + "/ui/", http.StripPrefix(s.ctl_prefix, http.FileServer(http.Dir(cwd)))) // TODO: proper directory. it must not use the current working directory... //s.ctl_mux.HandleFunc(s.ctl_prefix + "/ws/tty", websocket.Handler(server_ws_tty).ServeHTTP) - s.ctl_mux.Handle(s.ctl_prefix + "/ws/tty", &server_ctl_ws_tty{s: &s, h: websocket.Handler(server_ws_tty)}) - s.ctl_mux.Handle(s.ctl_prefix + "/server-conns", &server_ctl_client_conns{s: &s}) + s.ctl_mux.Handle(s.ctl_prefix + "/ws/tty", new_server_ctl_ws_tty(&s)) + s.ctl_mux.Handle(s.ctl_prefix + "/server-conns", &server_ctl_server_conns{s: &s}) s.ctl = &http.Server{ Addr: ctl_addr, @@ -868,13 +870,14 @@ func (s *Server) ReqStop() { } } -func (s *Server) AddNewServerConn(addr net.Addr, pss Hodu_PacketStreamServer) (*ServerConn, error) { +func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, pss Hodu_PacketStreamServer) (*ServerConn, error) { var cts ServerConn var ok bool cts.svr = s cts.route_map = make(ServerRouteMap) - cts.caddr = addr + cts.caddr = *remote_addr + cts.local_addr = *local_addr cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss} cts.stop_req.Store(false) @@ -883,12 +886,12 @@ func (s *Server) AddNewServerConn(addr net.Addr, pss Hodu_PacketStreamServer) (* s.cts_mtx.Lock() defer s.cts_mtx.Unlock() - _, ok = s.cts_map[addr] + _, ok = s.cts_map[cts.caddr] if ok { - return nil, fmt.Errorf("existing client - %s", addr.String()) + return nil, fmt.Errorf("existing client - %s", cts.caddr.String()) } - s.cts_map[addr] = &cts - s.log.Write("", LOG_DEBUG, "Added client connection from %s", addr.String()) + s.cts_map[cts.caddr] = &cts + s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.caddr.String()) return &cts, nil } @@ -908,8 +911,8 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) { cts, ok = s.cts_map[addr] if ok { - cts.ReqStop() delete(s.cts_map, cts.caddr) + cts.ReqStop() } }