From f02536bf24a1b97c355164c7280a405ed15b1eef Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Tue, 12 Nov 2024 22:59:37 +0900 Subject: [PATCH] added some experimental code using grpc --- Makefile | 5 + README.md | 5 + c-peer.go | 58 ++++ client.go | 535 ++++++++++++++++++++++++++++++++++++ frame.go | 70 +++++ go.mod | 16 ++ go.sum | 16 ++ hodu.proto | 54 ++++ main.go | 42 +++ main.proto | 44 +++ packet.go | 53 ++++ s-peer.go | 147 ++++++++++ server.go | 790 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 13 files changed, 1835 insertions(+) create mode 100644 Makefile create mode 100644 README.md create mode 100644 c-peer.go create mode 100644 client.go create mode 100644 frame.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 hodu.proto create mode 100644 main.go create mode 100644 main.proto create mode 100644 packet.go create mode 100644 s-peer.go create mode 100644 server.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6ef2945 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +all: + protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + hodu.proto + go build -x -o hodu diff --git a/README.md b/README.md new file mode 100644 index 0000000..55aeb8a --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ + +hodu client [ ...] + + client requests server that it grants access to the list of peers + reserver diff --git a/c-peer.go b/c-peer.go new file mode 100644 index 0000000..be62e23 --- /dev/null +++ b/c-peer.go @@ -0,0 +1,58 @@ +package main + +import "fmt" +import "net" + +func NewClientPeerConn(r *ClientRoute, c *net.TCPConn, id uint32) (*ClientPeerConn) { + var cpc ClientPeerConn + + cpc.route = r + cpc.conn = c + cpc.conn_id = id + cpc.stop_req.Store(false) + //cpc.server_peer_status_chan = make(chan bool, 16) + //cpc.server_peer_opened_received.Store(false) + //cpc.server_peer_closed_received.Store(false) + + return &cpc +} + +func (cpc *ClientPeerConn) RunTask() error { + //var conn *net.TCPConn + //var addr *net.TCPAddr + var err error + var buf [4096]byte + var n int + + + fmt.Printf("CONNECTION ESTABLISHED TO PEER... ABOUT TO READ DATA...\n") + for { + n, err = cpc.conn.Read(buf[:]) + if err != nil { + fmt.Printf("unable to read from the client-side peer %s - %s\n", cpc.addr, err.Error()) + break + } + +// TODO: guarded call.. + err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.id, cpc.conn_id, buf[0:n])) + if err != nil { + fmt.Printf("unable to write data to server - %s\n", err.Error()) + break + } + } + +//done: + cpc.ReqStop() + //cpc.c.RemoveClientPeerConn(cpc) + //cpc.c.wg.Done() + return nil +} + +func (cpc *ClientPeerConn) ReqStop() { + // TODO: because of connect delay in Start, cpc.p may not be yet ready. handle this case... + if cpc.stop_req.CompareAndSwap(false, true) { + if cpc.conn != nil { + cpc.conn.Close() + } + } +} diff --git a/client.go b/client.go new file mode 100644 index 0000000..44fada9 --- /dev/null +++ b/client.go @@ -0,0 +1,535 @@ +package main + + +//import "bufio" +import "context" +import "crypto/tls" +import "crypto/x509" +//import "encoding/binary" +import "fmt" +import "io" +import "log" +import "net" +//import "os" +import "sync" +import "sync/atomic" +//import "syscall" +//import "time" + +//import "github.com/google/uuid" +import "google.golang.org/grpc" +import "google.golang.org/grpc/credentials/insecure" + +const PTC_LIMIT = 8192 + +type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet] + +type ServerConnMap = map[*net.TCPAddr]*ServerConn +type ClientPeerConnMap = map[uint32]*ClientPeerConn +type ClientRouteMap = map[uint32]*ClientRoute + +// -------------------------------------------------------------------- +type ClientConfig struct { + server_addr string + peer_addrs []string +} + +type Client struct { + cfg *ClientConfig + tlscfg *tls.Config + saddr *net.TCPAddr + + sc *grpc.ClientConn // main control connection to the server + sg HoduClient + psc PacketStreamClient + psc_mtx sync.Mutex + + cts_mtx sync.Mutex + cts_map ServerConnMap + wg sync.WaitGroup + stop_req atomic.Bool +} + + +type ClientPeerConn struct { + route *ClientRoute + conn_id uint32 + conn *net.TCPConn + remot_conn_id uint32 + + addr string // peer address + stop_req atomic.Bool +} + +// client connection to server +type ServerConn struct { + cli *Client + saddr *net.TCPAddr // server address that is connected to + psc Hodu_PacketStreamClient + + route_mtx sync.Mutex + routes ClientRouteMap + //route_wg sync.WaitGroup + + //cw_mtx sync.Mutex + + wg sync.WaitGroup + stop_req atomic.Bool + greeted bool +} + +type ClientRoute struct { + cts *ServerConn + id uint32 + peer_addr *net.TCPAddr + proto ROUTE_PROTO + + ptc_mtx sync.Mutex + ptc_map ClientPeerConnMap + ptc_limit int + ptc_last_id uint32 + ptc_wg sync.WaitGroup +} + + +// -------------------------------------------------------------------- +func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute { + var r ClientRoute + + r.cts = cts + r.id = id + r.ptc_limit = PTC_LIMIT + r.ptc_map = make(ClientPeerConnMap) + r.ptc_last_id = 0 + r.proto = proto + r.peer_addr = addr + + return &r; +} + +func (r *ClientRoute) RunTask() { + // this task on the route object isn't actually necessary. +} + +func (r *ClientRoute) StopTask() { + // TODO: + fmt.Printf ("ClientRoute StopTask not implemented yet\n") + // TOOD: stop all peer connection jobs +} + +func (r* ClientRoute) ConnectToPeer(pts_id uint32) { + var err error + var conn *net.TCPConn + var ptc *ClientPeerConn + +// MAKE thesse into a separte go rountine... so it doesn't block + conn, err = net.DialTCP("tcp", nil, r.peer_addr); + if err != nil { + fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error()) + return + } + + ptc, err = r.AddNewClientPeerConn(conn) + if err != nil { + // TODO: logging + fmt.Printf("YYYYYYYY - %s\n", err.Error()) + conn.Close() + return + } + fmt.Printf("STARTED NEW SERVER PEER STAK\n") + + r.ptc_wg.Add(1) + go ptc.RunTask() + r.ptc_wg.Wait() + conn.Close() // don't care about double close. it could have been closed in StopTask +} + +func (r* ClientRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error { + switch event_type { + case PACKET_KIND_PEER_STARTED: + go r.ConnectToPeer(pts_id) + + // TODO: other types + } + + return nil +} + +// -------------------------------------------------------------------- + +func (cts *ServerConn) AddNewClientRoute(route_id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) (*ClientRoute, error) { + var r *ClientRoute + + cts.route_mtx.Lock() + if cts.routes[route_id] != nil { + cts.route_mtx.Unlock() + return nil, fmt.Errorf ("existent route id - %d", route_id) + } + r = NewClientRoute(cts, route_id, addr, proto) + cts.routes[route_id] = r + cts.route_mtx.Unlock() + +fmt.Printf ("added client route.... %d -> %d\n", route_id, len(cts.routes)) + go r.RunTask() + return r, nil +} + +func (cts *ServerConn) RemoveClientRoute (route_id uint32) error { + var r *ClientRoute + var ok bool + + cts.route_mtx.Lock() + r, ok = cts.routes[route_id] + if (!ok) { + cts.route_mtx.Unlock() + return fmt.Errorf ("non-existent route id - %d", route_id) + } + delete(cts.routes, route_id) + cts.route_mtx.Unlock() + + r.StopTask() // TODO: make this unblocking or blocking? + return nil; +} + +func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error { + var i int + var v string + var addr *net.TCPAddr + var proto ROUTE_PROTO + var r *ClientRoute + var err error + + for i, v = range peer_addrs { + addr, err = net.ResolveTCPAddr(NET_TYPE_TCP, v) + if err != nil { + return fmt.Errorf("unable to resovle %s - %s", v, err.Error()) + } + + if addr.IP.To4() != nil { + proto = ROUTE_PROTO_TCP4 + } else { + proto = ROUTE_PROTO_TCP6 + } + + _, err = cts.AddNewClientRoute(uint32(i), addr, proto) + if err != nil { + return fmt.Errorf("unable to add client route for %s", addr) + } + } + + for _, r = range cts.routes { + err = cts.cli.psc.Send(MakeRouteStartPacket(r.id, r.proto, addr.String())) + if err != nil { + return fmt.Errorf("unable to send route-start packet - %s", err.Error()) + } + } + + return nil; +} + +func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { + var r *ClientRoute + var ok bool + + cts.route_mtx.Lock() + r, ok = cts.routes[route_id] + if (!ok) { + cts.route_mtx.Unlock() + return fmt.Errorf ("non-existent route id - %d", route_id) + } + cts.route_mtx.Unlock() + + return r.ReportEvent(pts_id, event_type, event_data) +} +// -------------------------------------------------------------------- + +func (r *ClientRoute) AddNewClientPeerConn (c* net.TCPConn) (*ClientPeerConn, error) { + var ptc *ClientPeerConn + var ok bool + var start_id uint32 + + r.ptc_mtx.Lock() + defer r.ptc_mtx.Unlock() + + if len(r.ptc_map) >= r.ptc_limit { + return nil, fmt.Errorf("peer-to-client connection table full") + } + + start_id = r.ptc_last_id + for { + _, ok = r.ptc_map[r.ptc_last_id] + if !ok { + break + } + r.ptc_last_id++ + if r.ptc_last_id == start_id { + // unlikely to happen but it cycled through the whole range. + return nil, fmt.Errorf("failed to assign peer-to-table connection id") + } + } + + ptc = NewClientPeerConn(r, c, r.ptc_last_id) + r.ptc_map[ptc.conn_id] = ptc + r.ptc_last_id++ + + return ptc, nil +} +// -------------------------------------------------------------------- + +func (c *Client) AddNewServerConn(addr *net.TCPAddr, psc Hodu_PacketStreamClient) (*ServerConn, error) { + var cts ServerConn + var ok bool + + cts.cli = c + cts.routes = make(ClientRouteMap) + cts.saddr = addr + cts.psc = psc + + cts.stop_req.Store(false) + cts.greeted = false + + c.cts_mtx.Lock() + defer c.cts_mtx.Unlock() + + _, ok = c.cts_map[addr] + if ok { + return nil, fmt.Errorf("existing server - %s", addr.String()) + } + + c.cts_map[addr] = &cts; +fmt.Printf ("ADD total servers %d\n", len(c.cts_map)); + return &cts, nil +} + +func (c *Client) RemoveServerConn(cts *ServerConn) { + c.cts_mtx.Lock() + delete(c.cts_map, cts.saddr) +fmt.Printf ("REMOVE total servers %d\n", len(c.cts_map)); + c.cts_mtx.Unlock() +} + +// -------------------------------------------------------------------- +func NewClient(cfg *ClientConfig, tlscfg *tls.Config) (*Client, error) { + var c Client + var saddr *net.TCPAddr + var err error + + if len(cfg.peer_addrs) < 0 || len(cfg.peer_addrs) > int(^uint16(0)) { // TODO: change this check... not really right... + return nil, fmt.Errorf("no peer addresses or too many peer addresses") + } + + saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.server_addr) + if err != nil { + return nil, fmt.Errorf("unable to resolve %s - %s", cfg.server_addr, err.Error()) + } + + c.cfg = cfg + c.tlscfg = tlscfg + c.saddr = saddr + c.cts_map = make(ServerConnMap) // TODO: make it configurable... + c.stop_req.Store(false) + + return &c, nil +} + +func (c *Client) RunTask(ctx context.Context) { + var conn *grpc.ClientConn + var cts *ServerConn + var err error + +// TODO: HANDLE connection timeout.. + // ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second) + conn, err = grpc.NewClient(c.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + // TODO: logging + fmt.Printf("ERROR - unable to connect to %s - %s", c.cfg.server_addr, err.Error()) + return + } + + c.sc = conn + c.sg = NewHoduClient(conn) + + c.psc, err = c.sg.PacketStream(ctx) // TODO: accept external context and use it.L + if err != nil { + conn.Close() + fmt.Printf ("failed to get the packet stream - %s", err.Error()) + return + } + + cts, err = c.AddNewServerConn(c.saddr, c.psc) + if err != nil { + conn.Close() + fmt.Printf ("failed to register connection to server - %s", err.Error()) + return + } + + err = cts.AddClientRoutes(c.cfg.peer_addrs) + if err != nil { + conn.Close() + fmt.Printf("unable to make client routes - %s", err.Error()) + return + } + + for { + var pkt *Packet + + select { + case <-ctx.Done(): + fmt.Printf("context doine... error - %s\n", ctx.Err().Error()) + default: + // no other case is ready. + // without the default case, the select construct would block + } + + pkt, err = c.psc.Recv() + if err == io.EOF { + // return will close stream from server side + fmt.Printf("server disconnected\n") + break + } + if err != nil { + fmt.Printf("server receive error - %s\n", err.Error()) + break + } + + switch pkt.Kind { + case PACKET_KIND_ROUTE_STARTED: + var x *Packet_Route + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr); + err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_ROUTE_STOPPED: + var x *Packet_Route + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_PEER_STARTED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_STOPPED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_DATA: + // the connection from the client to a peer has been established + var x *Packet_Data + var ok bool + x, ok = pkt.U.(*Packet_Data) + if ok { + err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + } + } + +//done: + c.ReqStop() // just in case... + c.wg.Wait() + c.sc.Close() +} + +func (c *Client) ReqStop() { + if c.stop_req.CompareAndSwap(false, true) { + // TODO: notify the server.. send term command??? + c.sc.Close() + } +} + +// -------------------------------------------------------------------- + +const rootCert = `-----BEGIN CERTIFICATE----- +MIIB+TCCAZ+gAwIBAgIJAL05LKXo6PrrMAoGCCqGSM49BAMCMFkxCzAJBgNVBAYT +AkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRn +aXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEyMDgxNDAxMTNa +Fw0yNTEyMDUxNDAxMTNaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0 +YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMM +CWxvY2FsaG9zdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHGaaHVod0hLOR4d +66xIrtS2TmEmjSFjt+DIEcb6sM9RTKS8TZcdBnEqq8YT7m2sKbV+TEq9Nn7d9pHz +pWG2heWjUDBOMB0GA1UdDgQWBBR0fqrecDJ44D/fiYJiOeBzfoqEijAfBgNVHSME +GDAWgBR0fqrecDJ44D/fiYJiOeBzfoqEijAMBgNVHRMEBTADAQH/MAoGCCqGSM49 +BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 +1xOP035EcraRmWPe7tO0LpXgMxlh2VItpc2uc2w= +-----END CERTIFICATE----- +` + +func client_main(server_addr string, peer_addrs []string) error { + var c *Client + var err error + var cert_pool *x509.CertPool + var tlscfg *tls.Config + var cc ClientConfig + var wg sync.WaitGroup + + cert_pool = x509.NewCertPool() + ok := cert_pool.AppendCertsFromPEM([]byte(rootCert)) + if !ok { + log.Fatal("failed to parse root certificate") + } + tlscfg = &tls.Config{RootCAs: cert_pool, ServerName: "localhost", InsecureSkipVerify: true} + + cc.server_addr = server_addr + cc.peer_addrs = peer_addrs + c, err = NewClient(&cc, tlscfg) + if err != nil { + fmt.Printf("failed create client - %s\n", err.Error()) + return err + } + + wg.Add(1) + go c.RunTask(context.Background()); + + wg.Wait(); + return nil +} diff --git a/frame.go b/frame.go new file mode 100644 index 0000000..ea04d67 --- /dev/null +++ b/frame.go @@ -0,0 +1,70 @@ +package main + +import "time" + +const NET_TYPE_TCP string = "tcp" + +const FRAME_CODE_CLIENT_HELLO uint8 = 1 +const FRAME_CODE_SERVER_HELLO uint8 = 2 +const FRAME_CODE_SERVER_PEER_OPEN uint8 = 3 +const FRAME_CODE_SERVER_PEER_DATA uint8 = 4 +const FRAME_CODE_SERVER_PEER_CLOSE uint8 = 5 +const FRAME_CODE_SERVER_PEER_ERROR uint8 = 6 +const FRAME_CODE_CLIENT_PEER_OPENED uint8 = 7 +const FRAME_CODE_CLIENT_PEER_DATA uint8 = 8 +const FRAME_CODE_CLIENT_PEER_CLOSED uint8 = 9 +const FRAME_CODE_CLIENT_PEER_ERROR uint8 = 10 + +const FRAME_OPTION_PORT uint8 = 1 +const FRAME_OPTION_REQNET4 uint8 = 2 +const FRAME_OPTION_REQNET6 uint8 = 3 + +type FrameHeader struct { + Len uint16 // length of whole packet including the header + Code uint8 + ChanId uint8 + ConnId uint16 +} + +type FrameOptionHeader struct { + Len uint8 + Code uint8 +} + +type FreameOptionRetnet4Data struct { + addr [4]byte + pfxlen uint8 +} + +type FreameOptionRetnet6Data struct { + addr [16]byte + pfxlen uint8 +} + +type FrameClientHelloData struct { + AuthKey [64]byte // TODO: How to send this securely??? + ReqChan uint8 + /* this can be followed by variable options */ +} + +type FrameServerHelloData struct { + AuthCode uint8 +} + +func read_chan_with_tmout(chan_bool chan bool, tmout time.Duration) (bool, bool) { + var tmr *time.Timer + var b bool + var timed bool + + tmr = time.NewTimer(tmout) + + select { + case <-tmr.C: + timed = true + b = false + case b = <-chan_bool: + tmr.Stop() + timed = false + } + return timed, b +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..09caa43 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module hodu + +go 1.22.0 + +require ( + github.com/google/uuid v1.6.0 + google.golang.org/grpc v1.67.1 + google.golang.org/protobuf v1.34.2 +) + +require ( + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9246f02 --- /dev/null +++ b/go.sum @@ -0,0 +1,16 @@ +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= diff --git a/hodu.proto b/hodu.proto new file mode 100644 index 0000000..effb602 --- /dev/null +++ b/hodu.proto @@ -0,0 +1,54 @@ +syntax = "proto3"; + +option go_package = "./main"; + +//package hodu; // no idea if it's still important... + +service Hodu { + rpc PacketStream (stream Packet) returns (stream Packet) {} +} + +enum ROUTE_PROTO { + TCP = 0; + TCP4 = 1; + TCP6 = 2; +}; + +message RouteDesc { + uint32 RouteId = 1; + ROUTE_PROTO Proto = 2; + string AddrStr = 3; +}; + +message PeerDesc { + uint32 RouteId = 1; + uint32 PeerId = 2; +}; + +message PeerData { + uint32 RouteId = 1; + uint32 PeerId = 2; + bytes Data = 3; +}; + +enum PACKET_KIND { + ERROR = 0; // generic error response + OK = 1; // generic success response + ROUTE_START = 2; + ROUTE_STOP = 3; + ROUTE_STARTED = 4; + ROUTE_STOPPED = 5; + PEER_STARTED = 6; + PEER_STOPPED = 7; + PEER_DATA = 8; +}; + +message Packet { + PACKET_KIND Kind = 1; + + oneof U { + RouteDesc Route = 2; + PeerDesc Peer = 3; + PeerData Data = 4; + }; +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..f796f82 --- /dev/null +++ b/main.go @@ -0,0 +1,42 @@ +package main + +import "fmt" +import "os" +import "strings" + +func main() { + var err error + + if len(os.Args) < 2 { + goto wrong_usage + } + + if strings.EqualFold(os.Args[1], "server") { + if len(os.Args) < 3 { + goto wrong_usage + } + err = server_main(os.Args[2:]) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: serer error - %s\n", err.Error()) + os.Exit(1) + } + } else if strings.EqualFold(os.Args[1], "client") { + if len(os.Args) < 4 { + goto wrong_usage + } + err = client_main(os.Args[2], os.Args[3:]) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: client error - %s\n", err.Error()) + os.Exit(1) + } + } else { + goto wrong_usage + } + + os.Exit(0) + +wrong_usage: + fmt.Fprintf(os.Stderr, "USAGE: %s server listen-addr:listen-port\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s client target-addr:target-port peer-addr:peer-port\n", os.Args[0]) + os.Exit(1) +} diff --git a/main.proto b/main.proto new file mode 100644 index 0000000..adcc6ae --- /dev/null +++ b/main.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; +package main; + +option go_package = "."; + +service Control { + rpc Open (OpenReq) returns (OpenRes); + rpc Close (CloseReq) returns (CloseRes); + rpc Channel (stream Frame) returns (stream Frame); +} + +message OpenReq { + string abc = 1; +} + +message OpenRes { + string def = 1; +} + +message CloseReq { + string abc = 1; +} + +message CloseRes { + string def = 1; +} + +message Data { + uint32 chan_id = 1; + uint32 conn_id = 2; + bytes data = 3; +} + +message Error { + uint32 code = 1; +} + +message Frame { + oneof frame { + Data data = 1; + Error error = 2; + } +} + diff --git a/packet.go b/packet.go new file mode 100644 index 0000000..6fe27c6 --- /dev/null +++ b/packet.go @@ -0,0 +1,53 @@ +package main + + +func MakeRouteStartPacket(route_id uint32, proto ROUTE_PROTO, addr string) *Packet { + return &Packet{ + Kind: PACKET_KIND_ROUTE_START, + U: &Packet_Route{Route: &RouteDesc{RouteId: route_id, Proto: proto, AddrStr: addr}}} +} + +func MakeRouteStartedPacket(route_id uint32, proto ROUTE_PROTO, addr string) *Packet { + // the connection from a peer to the server has been established + return &Packet{Kind: PACKET_KIND_ROUTE_STARTED, + U: &Packet_Route{Route: &RouteDesc{RouteId: route_id, Proto: proto, AddrStr: addr}}} +} + +func MakeRouteStoppedPacket(route_id uint32, proto ROUTE_PROTO) *Packet { + // the connection from a peer to the server has been established + return &Packet{Kind: PACKET_KIND_ROUTE_STOPPED, + U: &Packet_Route{Route: &RouteDesc{RouteId: route_id, Proto: proto}}} +} + + +func MakePeerStartedPacket(route_id uint32, pts_id uint32) *Packet { + // the connection from a peer to the server has been established + return &Packet{Kind: PACKET_KIND_PEER_STARTED, + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}}, + } +} + +func MakePeerStoppedPacket(route_id uint32, pts_id uint32) *Packet { + return &Packet{Kind: PACKET_KIND_PEER_STOPPED, + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}, + }} +} + +func MakePtcStartedPacket(route_id uint32, pts_id uint32) *Packet { + // the connection from the client to a peer has been established + return &Packet{Kind: PACKET_KIND_PEER_STARTED, + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}}, + } +} + +func MakePtcStoppedPacket(route_id uint32, pts_id uint32) *Packet { + return &Packet{Kind: PACKET_KIND_PEER_STOPPED, + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}, + }} +} + +func MakePeerDataPacket(route_id uint32, pts_id uint32, data []byte) *Packet { + return &Packet{Kind: PACKET_KIND_PEER_DATA, + U: &Packet_Data{Data: &PeerData{RouteId: route_id, PeerId: pts_id, Data: data}, + }} +} diff --git a/s-peer.go b/s-peer.go new file mode 100644 index 0000000..5da0778 --- /dev/null +++ b/s-peer.go @@ -0,0 +1,147 @@ +package main + +import "fmt" +import "net" +import "sync/atomic" +import "time" + +type ServerPeerConn struct { + route *ServerRoute + conn_id uint32 + cts *ClientConn + conn *net.TCPConn + stop_req atomic.Bool + client_peer_status_chan chan bool + client_peer_opened_received atomic.Bool + client_peer_closed_received atomic.Bool +} + +func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerConn) { + var spc ServerPeerConn + + spc.route = r + spc.conn = c + spc.conn_id = id + spc.stop_req.Store(false) + spc.client_peer_status_chan = make(chan bool, 16) + spc.client_peer_opened_received.Store(false) + spc.client_peer_closed_received.Store(false) + + return &spc +} + +func (spc *ServerPeerConn) RunTask() error { + var pss Hodu_PacketStreamServer + var n int + var buf [4096]byte + var tmr *time.Timer + var status bool + var err error = nil + + pss = spc.route.cts.pss +//TODO: this needs to be guarded + err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id)) + if err != nil { + // TODO: include route id and conn id in the error message + err = fmt.Errorf("unable to send start-pts - %s\n", err.Error()) + goto done + } + + tmr = time.NewTimer(2 * time.Second) // TODO: make this configurable... +wait_for_started: + for { + select { + case status = <- spc.client_peer_status_chan: + if status { + break wait_for_started + } else { + // the socket must have been closed too. + goto done + } + + case <- tmr.C: + // connection failure, not in time + tmr.Stop() + goto done + + /*case <- spc->ctx->Done(): + tmr.Stop() + goto done*/ + } + } + tmr.Stop() + + for { + n, err = spc.conn.Read(buf[:]) + if err != nil { + fmt.Printf("read error - %s\n", err.Error()) + break + } + +// TODO: this needs to be guarded + err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n])) + if err != nil { + // TODO: include route id and conn id in the error message + err = fmt.Errorf("unable to send data - %s\n", err.Error()) + goto done; + } + } + +done: + fmt.Printf("spc really ending..................\n") + spc.ReqStop() + spc.route.RemoveServerPeerConn(spc) + //spc.cts.wg.Done() + return err +} + +func (spc *ServerPeerConn) ReqStop() { + if spc.stop_req.CompareAndSwap(false, true) { + var pss Hodu_PacketStreamServer + var err error + + pss = spc.route.cts.pss + + if spc.client_peer_opened_received.CompareAndSwap(false, true) { + spc.client_peer_status_chan <- false + } + spc.conn.Close() + err = pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) + if err != nil { + // TODO: print warning + fmt.Printf ("WARNING - failed to report event to %s - %s\n", spc.route.cts.caddr, err.Error()) + } + } +} + +func (spc *ServerPeerConn) ReportEvent (event_type PACKET_KIND, event_data []byte) error { + + switch event_type { + case PACKET_KIND_PEER_STARTED: + if spc.client_peer_opened_received.CompareAndSwap(false, true) { + spc.client_peer_status_chan <- true + } + + case PACKET_KIND_PEER_STOPPED: + if spc.client_peer_closed_received.CompareAndSwap(false, true) { + spc.client_peer_status_chan <- false + } + + case PACKET_KIND_PEER_DATA: + var err error + + _, err = spc.conn.Write(event_data) + if err != nil { + // TODO: logging + fmt.Printf ("WARNING - failed to write data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String()) + } + + default: + // ignore all other events + // TODO: produce warning in debug mode + } + return nil +} + + + diff --git a/server.go b/server.go new file mode 100644 index 0000000..e315a2b --- /dev/null +++ b/server.go @@ -0,0 +1,790 @@ +package main + +//import "bufio" +//import "bytes" +import "context" +import "crypto/tls" +import "fmt" +import "io" +import "math/rand" +import "net" +import "os" +import "os/signal" +import "sync" +import "sync/atomic" +import "syscall" +import "time" + +import "google.golang.org/grpc" +import "google.golang.org/grpc/metadata" +import "google.golang.org/grpc/peer" +import "google.golang.org/grpc/stats" + +const PTS_LIMIT = 8192 +//const CTS_LIMIT = 2048 + +type ClientConnMap = map[net.Addr]*ClientConn +type ServerPeerConnMap = map[uint32]*ServerPeerConn +type ServerRouteMap = map[uint32]*ServerRoute + +type Server struct { + tlscfg *tls.Config + l []*net.TCPListener // central listener + l_wg sync.WaitGroup + + cts_mtx sync.Mutex + cts_map ClientConnMap + wg sync.WaitGroup + stop_req atomic.Bool + + // grpc stuffs + gs *grpc.Server + UnimplementedHoduServer +} + +// client connection to server. +// client connect to the server, the server accept it, and makes a tunnel request +type ClientConn struct { + svr *Server + caddr net.Addr // client address that created this structure + pss Hodu_PacketStreamServer + + cw_mtx sync.Mutex + route_mtx sync.Mutex + routes ServerRouteMap + //route_wg sync.WaitGroup + + wg sync.WaitGroup + stop_req atomic.Bool + greeted bool +} + +type ServerRoute struct { + cts *ClientConn + l *net.TCPListener + laddr *net.TCPAddr + id uint32 + + pts_mtx sync.Mutex + pts_map ServerPeerConnMap + pts_limit int + pts_last_id uint32 + pts_wg sync.WaitGroup +} + +// ------------------------------------ + +func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, error) { + var pts *ServerPeerConn + var ok bool + var start_id uint32 + + r.pts_mtx.Lock() + defer r.pts_mtx.Unlock() + + if len(r.pts_map) >= r.pts_limit { + return nil, fmt.Errorf("peer-to-server connection table full") + } + + start_id = r.pts_last_id + for { + _, ok = r.pts_map[r.pts_last_id] + if !ok { + break + } + r.pts_last_id++ + if r.pts_last_id == start_id { + // unlikely to happen but it cycled through the whole range. + return nil, fmt.Errorf("failed to assign peer-to-server connection id") + } + } + + pts = NewServerPeerConn(r, c, r.pts_last_id) + r.pts_map[pts.conn_id] = pts + r.pts_last_id++ + + return pts, nil +} + +func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) { + r.pts_mtx.Lock() + delete(r.pts_map, pts.conn_id) + r.pts_mtx.Unlock() +} + +// ------------------------------------ +func (r *ServerRoute) RunTask() { + var err error + var conn *net.TCPConn + var pts *ServerPeerConn + + for { + conn, err = r.l.AcceptTCP() + if err != nil { + // TODO: logging + fmt.Printf("[%s,%d] accept failure - %s\n", r.cts.caddr.String(), r.id, err.Error()) + break + } + + pts, err = r.AddNewServerPeerConn(conn) + if err != nil { + // TODO: logging + fmt.Printf("YYYYYYYY - %s\n", err.Error()) + conn.Close() + } else { + fmt.Printf("STARTED NEW SERVER PEER STAK\n") + r.pts_wg.Add(1) + go pts.RunTask() + } + } + + r.l.Close() // don't care about double close. it could have been closed in StopTask + r.pts_wg.Wait() + +// cts.l_wg.Done() +// TODO:inform that the job is done? +} + +func (r *ServerRoute) StopTask() { +fmt.Printf ("stoppping stak..\n") + // TODO: all pts stop... + r.l.Close(); +// TODO: wait?? +} + +func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error { + var spc *ServerPeerConn + var ok bool + + r.pts_mtx.Lock() + spc, ok = r.pts_map[pts_id] + if !ok { + return fmt.Errorf("non-existent peer id - %u", pts_id) + } + r.pts_mtx.Unlock(); + + return spc.ReportEvent(event_type, event_data) +} +// ------------------------------------ + +func (cts *ClientConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) { + var l *net.TCPListener + var err error + var laddr *net.TCPAddr + var port int + var tries int = 0 + var nw string + + switch proto { + case ROUTE_PROTO_TCP: + nw = "tcp" + case ROUTE_PROTO_TCP4: + nw = "tcp4" + case ROUTE_PROTO_TCP6: + nw = "tcp6" + } + + for { + port = rand.Intn(65535-32000+1) + 32000 + + laddr, err = net.ResolveTCPAddr(nw, fmt.Sprintf(":%d", port)) + if err == nil { + l, err = net.ListenTCP(nw, laddr) // make the binding address configurable. support multiple binding addresses??? + if err == nil { + fmt.Printf("listening .... on ... %d\n", port) + return l, laddr, nil + } + } + + // TODO: implement max retries.. + tries++ + if tries >= 1000 { + err = fmt.Errorf("unable to allocate port") + break + } + } + + return nil, nil, err +} + +func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { + var r ServerRoute + var l *net.TCPListener + var laddr *net.TCPAddr + var err error + + l, laddr, err = cts.make_route_listener(proto); + if err != nil { + return nil, err + } + + r.cts = cts + r.id = id + r.l = l + r.laddr = laddr + r.pts_limit = PTS_LIMIT + r.pts_map = make(ServerPeerConnMap) + r.pts_last_id = 0 + + return &r, nil; +} + +func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { + var r *ServerRoute + var err error + + cts.route_mtx.Lock() + if cts.routes[route_id] != nil { + cts.route_mtx.Unlock() + return nil, fmt.Errorf ("existent route id - %d", route_id) + } + r, err = NewServerRoute(cts, route_id, proto) + if err != nil { + cts.route_mtx.Unlock() + return nil, err + } + cts.routes[route_id] = r; + cts.route_mtx.Unlock() + + go r.RunTask() + return r, nil +} + +func (cts *ClientConn) RemoveServerRoute (route_id uint32) error { + var r *ServerRoute + var ok bool + + cts.route_mtx.Lock() + r, ok = cts.routes[route_id] + if (!ok) { + cts.route_mtx.Unlock() + return fmt.Errorf ("non-existent route id - %d", route_id) + } + delete(cts.routes, route_id) + cts.route_mtx.Unlock() + + r.StopTask() // TODO: make this unblocking or blocking? + return nil; +} + +func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { + var r *ServerRoute + var ok bool + + cts.route_mtx.Lock() + r, ok = cts.routes[route_id] + if (!ok) { + cts.route_mtx.Unlock() + return fmt.Errorf ("non-existent route id - %d", route_id) + } + cts.route_mtx.Unlock() + + return r.ReportEvent(pts_id, event_type, event_data) +} + +func (cts *ClientConn) ReqStop() { + if cts.stop_req.CompareAndSwap(false, true) { + var r *ServerRoute + + for _, r = range cts.routes { + r.StopTask() + } + + //cts.c.Close() // close the accepted connection from the client + } +} + +// ------------------------------------ + +func handle_os_signals(s *Server, exit_chan chan<- bool) { + var ( + sighup_chan chan os.Signal + sigterm_chan chan os.Signal + sig os.Signal + ) + + sighup_chan = make(chan os.Signal, 1) + sigterm_chan = make(chan os.Signal, 1) + + signal.Notify(sighup_chan, syscall.SIGHUP) + signal.Notify(sigterm_chan, syscall.SIGTERM, os.Interrupt) + +chan_loop: + for { + select { + case <-sighup_chan: + // TODO: + //s.RefreshConfig() + case sig = <-sigterm_chan: + // TODO: get timeout value from config + //s.Shutdown(fmt.Sprintf("termination by signal %s", sig), 3*time.Second) + s.ReqStop() + //log.Debugf("termination by signal %s", sig) + fmt.Printf("termination by signal %s\n", sig) + exit_chan <- true + break chan_loop + } + } +} + +// -------------------------------------------------------------------- + +func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { + var ctx context.Context + var p *peer.Peer + var ok bool + var pkt *Packet + var err error + var cts *ClientConn + + ctx = strm.Context() + p, ok = peer.FromContext(ctx) + if (!ok) { + return fmt.Errorf("failed to get peer from packet stream context") + } + + cts, err = s.AddNewClientConn(p.Addr, strm) + if err != nil { + return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error()) + } + + + for { + // exit if context is done + // or continue + select { + case <-ctx.Done(): + return ctx.Err() + default: + // no other case is ready. + // without the default case, the select construct would block + } + + pkt, err = strm.Recv() + if err == io.EOF { + // return will close stream from server side + return nil + } + if err != nil { + //log.Printf("receive error %v", err) + continue + } + + switch pkt.Kind { + case PACKET_KIND_ROUTE_START: + var x *Packet_Route + //var t *ServerRoute + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + var r* ServerRoute + fmt.Printf ("ADDED SERVER ROUTE FOR CLEINT PEER %s\n", x.Route.AddrStr) + r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto) + if err != nil { + // TODO: Send Error Response... + } else { + err = strm.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.laddr.String())) + if err != nil { + // TODO: + } + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_ROUTE_STOP: + var x *Packet_Route + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + err = cts.RemoveServerRoute(x.Route.RouteId); // TODO: this must be unblocking. otherwide, other routes will get blocked... + if err != nil { + // TODO: Send Error Response... + } else { + err = strm.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto)) + if err != nil { + // TODO: + } + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_PEER_STARTED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_STOPPED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_DATA: + // the connection from the client to a peer has been established + var x *Packet_Data + var ok bool + x, ok = pkt.U.(*Packet_Data) + if ok { + err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + } + } +} + +// ------------------------------------ + +type ConnCatcher struct { + server *Server +} + +func (cc *ConnCatcher) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + return ctx +} + +func (cc *ConnCatcher) HandleRPC(ctx context.Context, s stats.RPCStats) { +} + +func (cc *ConnCatcher) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + return ctx; + //return context.TODO() +} + +func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) { +// fmt.Println(ctx.Value("user_id")) // Returns nil, can't access the value + var p *peer.Peer + var ok bool + var addr string + + p, ok = peer.FromContext(ctx) + if (!ok) { + addr = "" + } else { + addr = p.Addr.String() + } + +md,ok:=metadata.FromIncomingContext(ctx) +fmt.Printf("%+v%+v\n",md,ok) +if ok { +} + switch cs.(type) { + case *stats.ConnBegin: + fmt.Printf("**** client connected - [%s]\n", addr) + case *stats.ConnEnd: + fmt.Printf("**** client disconnected - [%s]\n", addr) + cc.server.RemoveClientConnByAddr(p.Addr); + } +} + +// wrappedStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and +// SendMsg method call. +type wrappedStream struct { + grpc.ServerStream +} + +func (w *wrappedStream) RecvMsg(m any) error { + fmt.Printf("Receive a message (Type: %T) at %s\n", m, time.Now().Format(time.RFC3339)) + return w.ServerStream.RecvMsg(m) +} + +func (w *wrappedStream) SendMsg(m any) error { + fmt.Printf("Send a message (Type: %T) at %v\n", m, time.Now().Format(time.RFC3339)) + return w.ServerStream.SendMsg(m) +} + +func newWrappedStream(s grpc.ServerStream) grpc.ServerStream { + return &wrappedStream{s} +} + +func streamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // authentication (token verification) +/* + md, ok := metadata.FromIncomingContext(ss.Context()) + if !ok { + return errMissingMetadata + } + if !valid(md["authorization"]) { + return errInvalidToken + } +*/ + + err := handler(srv, newWrappedStream(ss)) + if err != nil { + fmt.Printf("RPC failed with error: %v\n", err) + } + return err +} + +func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + // authentication (token verification) +/* + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, errMissingMetadata + } + if !valid(md["authorization"]) { +// return nil, errInvalidToken + } +*/ + m, err := handler(ctx, req) + if err != nil { + fmt.Printf("RPC failed with error: %v\n", err) + } +fmt.Printf ("RPC OK\n"); + return m, err +} + +func NewServer(laddrs []string, tlscfg *tls.Config) (*Server, error) { + var s Server + var l *net.TCPListener + var laddr *net.TCPAddr + var err error + var addr string + var gl *net.TCPListener + + if len(laddrs) <= 0 { + return nil, fmt.Errorf("no or too many addresses provided") + } + + /* create the specified number of listeners */ + s.l = make([]*net.TCPListener, 0) + for _, addr = range laddrs { + laddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, addr) + if err != nil { + goto oops + } + + l, err = net.ListenTCP(NET_TYPE_TCP, laddr) + if err != nil { + goto oops + } + + s.l = append(s.l, l) + } + + s.tlscfg = tlscfg + s.cts_map = make(ClientConnMap) // TODO: make it configurable... + s.stop_req.Store(false) +/* + creds, err := credentials.NewServerTLSFromFile(data.Path("x509/server_cert.pem"), data.Path("x509/server_key.pem")) + if err != nil { + log.Fatalf("failed to create credentials: %v", err) + } + gs = grpc.NewServer(grpc.Creds(creds)) +*/ + s.gs = grpc.NewServer( + grpc.UnaryInterceptor(unaryInterceptor), + grpc.StreamInterceptor(streamInterceptor), + grpc.StatsHandler(&ConnCatcher{ server: &s }), + ) // TODO: have this outside the server struct? + RegisterHoduServer (s.gs, &s) + + return &s, nil + +oops: +/* TODO: check if gs needs to be closed... */ + if gl != nil { + gl.Close() + } + + for _, l = range s.l { + l.Close() + } + s.l = make([]*net.TCPListener, 0) + return nil, err +} + +func (s *Server) run_grpc_server(idx int) error { + var l *net.TCPListener + var err error + + l = s.l[idx] + fmt.Printf ("serving grpc on %d listener\n", idx) + // it seems to be safe to call a single grpc server on differnt listening sockets multiple times + // TODO: check if this assumption is ok + err = s.gs.Serve(l); + if err != nil { + fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXxx %s\n", err.Error()); + } + + s.l_wg.Done(); + return nil +} + +func (s *Server) MainLoop() error { + var idx int + + for idx, _ = range s.l { + s.l_wg.Add(1) + go s.run_grpc_server(idx) + } + + s.l_wg.Wait(); + s.ReqStop() + s.wg.Wait() + + return nil +} + +func (s *Server) ReqStop() { + if s.stop_req.CompareAndSwap(false, true) { + var l *net.TCPListener + var cts *ClientConn + + //s.gs.GracefulStop() + s.gs.Stop() + for _, l = range s.l { + l.Close() + } + + s.cts_mtx.Lock() // TODO: this mya create dead-lock. check possibility of dead lock??? + for _, cts = range s.cts_map { + cts.ReqStop() // request to stop connections from/to peer held in the cts structure + } + s.cts_mtx.Unlock() + } +} + +func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*ClientConn, error) { + var cts ClientConn + var ok bool + + cts.svr = s + cts.routes = make(ServerRouteMap) + cts.caddr = addr + cts.pss = pss + + cts.stop_req.Store(false) + cts.greeted = false + + s.cts_mtx.Lock() + defer s.cts_mtx.Unlock() + + _, ok = s.cts_map[addr] + if ok { + return nil, fmt.Errorf("existing client - %s", addr.String()) + } + + s.cts_map[addr] = &cts; +fmt.Printf ("ADD total clients %d\n", len(s.cts_map)); + return &cts, nil +} + +func (s *Server) RemoveClientConn(cts *ClientConn) { + s.cts_mtx.Lock() + delete(s.cts_map, cts.caddr) +fmt.Printf ("REMOVE total clients %d\n", len(s.cts_map)); + s.cts_mtx.Unlock() +} + +func (s *Server) RemoveClientConnByAddr(addr net.Addr) { + var cts *ClientConn + var ok bool + + s.cts_mtx.Lock() + defer s.cts_mtx.Unlock() + + cts, ok = s.cts_map[addr] + if ok { + cts.ReqStop() + delete(s.cts_map, cts.caddr) + } +} + +func (s *Server) FindClientConnByAddr (addr net.Addr) *ClientConn { + var cts *ClientConn + var ok bool + + s.cts_mtx.Lock() + defer s.cts_mtx.Unlock() + + cts, ok = s.cts_map[addr] + if !ok { + return nil + } + + return cts +} + +// -------------------------------------------------------------------- + +const serverKey = `-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIHg+g2unjA5BkDtXSN9ShN7kbPlbCcqcYdDu+QeV8XWuoAoGCCqGSM49 +AwEHoUQDQgAEcZpodWh3SEs5Hh3rrEiu1LZOYSaNIWO34MgRxvqwz1FMpLxNlx0G +cSqrxhPubawptX5MSr02ft32kfOlYbaF5Q== +-----END EC PRIVATE KEY----- +` + +const serverCert = `-----BEGIN CERTIFICATE----- +MIIB+TCCAZ+gAwIBAgIJAL05LKXo6PrrMAoGCCqGSM49BAMCMFkxCzAJBgNVBAYT +AkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRn +aXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEyMDgxNDAxMTNa +Fw0yNTEyMDUxNDAxMTNaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0 +YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMM +CWxvY2FsaG9zdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHGaaHVod0hLOR4d +66xIrtS2TmEmjSFjt+DIEcb6sM9RTKS8TZcdBnEqq8YT7m2sKbV+TEq9Nn7d9pHz +pWG2heWjUDBOMB0GA1UdDgQWBBR0fqrecDJ44D/fiYJiOeBzfoqEijAfBgNVHSME +GDAWgBR0fqrecDJ44D/fiYJiOeBzfoqEijAMBgNVHRMEBTADAQH/MAoGCCqGSM49 +BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 +1xOP035EcraRmWPe7tO0LpXgMxlh2VItpc2uc2w= +-----END CERTIFICATE----- +` + +func server_main(laddrs []string) error { + var s *Server + var err error + var exit_chan chan bool + var cert tls.Certificate + + cert, err = tls.X509KeyPair([]byte(serverCert), []byte(serverKey)) + if err != nil { + return fmt.Errorf("ERROR: failed to load key pair - %s\n", err) + } + + s, err = NewServer(laddrs, &tls.Config{Certificates: []tls.Certificate{cert}}) + if err != nil { + return fmt.Errorf("ERROR: failed to create new server - %s", err.Error()) + } + + exit_chan = make(chan bool, 1) + go handle_os_signals(s, exit_chan) + err = s.MainLoop() // this is blocking. ReqStop() will be called from a signal handler + if err != nil { + return err + } + + <-exit_chan // wait until the term signal handler almost reaches the end + return nil +}