commit f02536bf24a1b97c355164c7280a405ed15b1eef Author: hyung-hwan Date: Tue Nov 12 22:59:37 2024 +0900 added some experimental code using grpc diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6ef2945 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +all: + protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + hodu.proto + go build -x -o hodu diff --git a/README.md b/README.md new file mode 100644 index 0000000..55aeb8a --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ + +hodu client [ ...] + + client requests server that it grants access to the list of peers + reserver diff --git a/c-peer.go b/c-peer.go new file mode 100644 index 0000000..be62e23 --- /dev/null +++ b/c-peer.go @@ -0,0 +1,58 @@ +package main + +import "fmt" +import "net" + +func NewClientPeerConn(r *ClientRoute, c *net.TCPConn, id uint32) (*ClientPeerConn) { + var cpc ClientPeerConn + + cpc.route = r + cpc.conn = c + cpc.conn_id = id + cpc.stop_req.Store(false) + //cpc.server_peer_status_chan = make(chan bool, 16) + //cpc.server_peer_opened_received.Store(false) + //cpc.server_peer_closed_received.Store(false) + + return &cpc +} + +func (cpc *ClientPeerConn) RunTask() error { + //var conn *net.TCPConn + //var addr *net.TCPAddr + var err error + var buf [4096]byte + var n int + + + fmt.Printf("CONNECTION ESTABLISHED TO PEER... ABOUT TO READ DATA...\n") + for { + n, err = cpc.conn.Read(buf[:]) + if err != nil { + fmt.Printf("unable to read from the client-side peer %s - %s\n", cpc.addr, err.Error()) + break + } + +// TODO: guarded call.. + err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.id, cpc.conn_id, buf[0:n])) + if err != nil { + fmt.Printf("unable to write data to server - %s\n", err.Error()) + break + } + } + +//done: + cpc.ReqStop() + //cpc.c.RemoveClientPeerConn(cpc) + //cpc.c.wg.Done() + return nil +} + +func (cpc *ClientPeerConn) ReqStop() { + // TODO: because of connect delay in Start, cpc.p may not be yet ready. handle this case... + if cpc.stop_req.CompareAndSwap(false, true) { + if cpc.conn != nil { + cpc.conn.Close() + } + } +} diff --git a/client.go b/client.go new file mode 100644 index 0000000..44fada9 --- /dev/null +++ b/client.go @@ -0,0 +1,535 @@ +package main + + +//import "bufio" +import "context" +import "crypto/tls" +import "crypto/x509" +//import "encoding/binary" +import "fmt" +import "io" +import "log" +import "net" +//import "os" +import "sync" +import "sync/atomic" +//import "syscall" +//import "time" + +//import "github.com/google/uuid" +import "google.golang.org/grpc" +import "google.golang.org/grpc/credentials/insecure" + +const PTC_LIMIT = 8192 + +type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet] + +type ServerConnMap = map[*net.TCPAddr]*ServerConn +type ClientPeerConnMap = map[uint32]*ClientPeerConn +type ClientRouteMap = map[uint32]*ClientRoute + +// -------------------------------------------------------------------- +type ClientConfig struct { + server_addr string + peer_addrs []string +} + +type Client struct { + cfg *ClientConfig + tlscfg *tls.Config + saddr *net.TCPAddr + + sc *grpc.ClientConn // main control connection to the server + sg HoduClient + psc PacketStreamClient + psc_mtx sync.Mutex + + cts_mtx sync.Mutex + cts_map ServerConnMap + wg sync.WaitGroup + stop_req atomic.Bool +} + + +type ClientPeerConn struct { + route *ClientRoute + conn_id uint32 + conn *net.TCPConn + remot_conn_id uint32 + + addr string // peer address + stop_req atomic.Bool +} + +// client connection to server +type ServerConn struct { + cli *Client + saddr *net.TCPAddr // server address that is connected to + psc Hodu_PacketStreamClient + + route_mtx sync.Mutex + routes ClientRouteMap + //route_wg sync.WaitGroup + + //cw_mtx sync.Mutex + + wg sync.WaitGroup + stop_req atomic.Bool + greeted bool +} + +type ClientRoute struct { + cts *ServerConn + id uint32 + peer_addr *net.TCPAddr + proto ROUTE_PROTO + + ptc_mtx sync.Mutex + ptc_map ClientPeerConnMap + ptc_limit int + ptc_last_id uint32 + ptc_wg sync.WaitGroup +} + + +// -------------------------------------------------------------------- +func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute { + var r ClientRoute + + r.cts = cts + r.id = id + r.ptc_limit = PTC_LIMIT + r.ptc_map = make(ClientPeerConnMap) + r.ptc_last_id = 0 + r.proto = proto + r.peer_addr = addr + + return &r; +} + +func (r *ClientRoute) RunTask() { + // this task on the route object isn't actually necessary. +} + +func (r *ClientRoute) StopTask() { + // TODO: + fmt.Printf ("ClientRoute StopTask not implemented yet\n") + // TOOD: stop all peer connection jobs +} + +func (r* ClientRoute) ConnectToPeer(pts_id uint32) { + var err error + var conn *net.TCPConn + var ptc *ClientPeerConn + +// MAKE thesse into a separte go rountine... so it doesn't block + conn, err = net.DialTCP("tcp", nil, r.peer_addr); + if err != nil { + fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error()) + return + } + + ptc, err = r.AddNewClientPeerConn(conn) + if err != nil { + // TODO: logging + fmt.Printf("YYYYYYYY - %s\n", err.Error()) + conn.Close() + return + } + fmt.Printf("STARTED NEW SERVER PEER STAK\n") + + r.ptc_wg.Add(1) + go ptc.RunTask() + r.ptc_wg.Wait() + conn.Close() // don't care about double close. it could have been closed in StopTask +} + +func (r* ClientRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error { + switch event_type { + case PACKET_KIND_PEER_STARTED: + go r.ConnectToPeer(pts_id) + + // TODO: other types + } + + return nil +} + +// -------------------------------------------------------------------- + +func (cts *ServerConn) AddNewClientRoute(route_id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) (*ClientRoute, error) { + var r *ClientRoute + + cts.route_mtx.Lock() + if cts.routes[route_id] != nil { + cts.route_mtx.Unlock() + return nil, fmt.Errorf ("existent route id - %d", route_id) + } + r = NewClientRoute(cts, route_id, addr, proto) + cts.routes[route_id] = r + cts.route_mtx.Unlock() + +fmt.Printf ("added client route.... %d -> %d\n", route_id, len(cts.routes)) + go r.RunTask() + return r, nil +} + +func (cts *ServerConn) RemoveClientRoute (route_id uint32) error { + var r *ClientRoute + var ok bool + + cts.route_mtx.Lock() + r, ok = cts.routes[route_id] + if (!ok) { + cts.route_mtx.Unlock() + return fmt.Errorf ("non-existent route id - %d", route_id) + } + delete(cts.routes, route_id) + cts.route_mtx.Unlock() + + r.StopTask() // TODO: make this unblocking or blocking? + return nil; +} + +func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error { + var i int + var v string + var addr *net.TCPAddr + var proto ROUTE_PROTO + var r *ClientRoute + var err error + + for i, v = range peer_addrs { + addr, err = net.ResolveTCPAddr(NET_TYPE_TCP, v) + if err != nil { + return fmt.Errorf("unable to resovle %s - %s", v, err.Error()) + } + + if addr.IP.To4() != nil { + proto = ROUTE_PROTO_TCP4 + } else { + proto = ROUTE_PROTO_TCP6 + } + + _, err = cts.AddNewClientRoute(uint32(i), addr, proto) + if err != nil { + return fmt.Errorf("unable to add client route for %s", addr) + } + } + + for _, r = range cts.routes { + err = cts.cli.psc.Send(MakeRouteStartPacket(r.id, r.proto, addr.String())) + if err != nil { + return fmt.Errorf("unable to send route-start packet - %s", err.Error()) + } + } + + return nil; +} + +func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { + var r *ClientRoute + var ok bool + + cts.route_mtx.Lock() + r, ok = cts.routes[route_id] + if (!ok) { + cts.route_mtx.Unlock() + return fmt.Errorf ("non-existent route id - %d", route_id) + } + cts.route_mtx.Unlock() + + return r.ReportEvent(pts_id, event_type, event_data) +} +// -------------------------------------------------------------------- + +func (r *ClientRoute) AddNewClientPeerConn (c* net.TCPConn) (*ClientPeerConn, error) { + var ptc *ClientPeerConn + var ok bool + var start_id uint32 + + r.ptc_mtx.Lock() + defer r.ptc_mtx.Unlock() + + if len(r.ptc_map) >= r.ptc_limit { + return nil, fmt.Errorf("peer-to-client connection table full") + } + + start_id = r.ptc_last_id + for { + _, ok = r.ptc_map[r.ptc_last_id] + if !ok { + break + } + r.ptc_last_id++ + if r.ptc_last_id == start_id { + // unlikely to happen but it cycled through the whole range. + return nil, fmt.Errorf("failed to assign peer-to-table connection id") + } + } + + ptc = NewClientPeerConn(r, c, r.ptc_last_id) + r.ptc_map[ptc.conn_id] = ptc + r.ptc_last_id++ + + return ptc, nil +} +// -------------------------------------------------------------------- + +func (c *Client) AddNewServerConn(addr *net.TCPAddr, psc Hodu_PacketStreamClient) (*ServerConn, error) { + var cts ServerConn + var ok bool + + cts.cli = c + cts.routes = make(ClientRouteMap) + cts.saddr = addr + cts.psc = psc + + cts.stop_req.Store(false) + cts.greeted = false + + c.cts_mtx.Lock() + defer c.cts_mtx.Unlock() + + _, ok = c.cts_map[addr] + if ok { + return nil, fmt.Errorf("existing server - %s", addr.String()) + } + + c.cts_map[addr] = &cts; +fmt.Printf ("ADD total servers %d\n", len(c.cts_map)); + return &cts, nil +} + +func (c *Client) RemoveServerConn(cts *ServerConn) { + c.cts_mtx.Lock() + delete(c.cts_map, cts.saddr) +fmt.Printf ("REMOVE total servers %d\n", len(c.cts_map)); + c.cts_mtx.Unlock() +} + +// -------------------------------------------------------------------- +func NewClient(cfg *ClientConfig, tlscfg *tls.Config) (*Client, error) { + var c Client + var saddr *net.TCPAddr + var err error + + if len(cfg.peer_addrs) < 0 || len(cfg.peer_addrs) > int(^uint16(0)) { // TODO: change this check... not really right... + return nil, fmt.Errorf("no peer addresses or too many peer addresses") + } + + saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.server_addr) + if err != nil { + return nil, fmt.Errorf("unable to resolve %s - %s", cfg.server_addr, err.Error()) + } + + c.cfg = cfg + c.tlscfg = tlscfg + c.saddr = saddr + c.cts_map = make(ServerConnMap) // TODO: make it configurable... + c.stop_req.Store(false) + + return &c, nil +} + +func (c *Client) RunTask(ctx context.Context) { + var conn *grpc.ClientConn + var cts *ServerConn + var err error + +// TODO: HANDLE connection timeout.. + // ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second) + conn, err = grpc.NewClient(c.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + // TODO: logging + fmt.Printf("ERROR - unable to connect to %s - %s", c.cfg.server_addr, err.Error()) + return + } + + c.sc = conn + c.sg = NewHoduClient(conn) + + c.psc, err = c.sg.PacketStream(ctx) // TODO: accept external context and use it.L + if err != nil { + conn.Close() + fmt.Printf ("failed to get the packet stream - %s", err.Error()) + return + } + + cts, err = c.AddNewServerConn(c.saddr, c.psc) + if err != nil { + conn.Close() + fmt.Printf ("failed to register connection to server - %s", err.Error()) + return + } + + err = cts.AddClientRoutes(c.cfg.peer_addrs) + if err != nil { + conn.Close() + fmt.Printf("unable to make client routes - %s", err.Error()) + return + } + + for { + var pkt *Packet + + select { + case <-ctx.Done(): + fmt.Printf("context doine... error - %s\n", ctx.Err().Error()) + default: + // no other case is ready. + // without the default case, the select construct would block + } + + pkt, err = c.psc.Recv() + if err == io.EOF { + // return will close stream from server side + fmt.Printf("server disconnected\n") + break + } + if err != nil { + fmt.Printf("server receive error - %s\n", err.Error()) + break + } + + switch pkt.Kind { + case PACKET_KIND_ROUTE_STARTED: + var x *Packet_Route + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr); + err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_ROUTE_STOPPED: + var x *Packet_Route + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_PEER_STARTED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_STOPPED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_DATA: + // the connection from the client to a peer has been established + var x *Packet_Data + var ok bool + x, ok = pkt.U.(*Packet_Data) + if ok { + err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + } + } + +//done: + c.ReqStop() // just in case... + c.wg.Wait() + c.sc.Close() +} + +func (c *Client) ReqStop() { + if c.stop_req.CompareAndSwap(false, true) { + // TODO: notify the server.. send term command??? + c.sc.Close() + } +} + +// -------------------------------------------------------------------- + +const rootCert = `-----BEGIN CERTIFICATE----- +MIIB+TCCAZ+gAwIBAgIJAL05LKXo6PrrMAoGCCqGSM49BAMCMFkxCzAJBgNVBAYT +AkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRn +aXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEyMDgxNDAxMTNa +Fw0yNTEyMDUxNDAxMTNaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0 +YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMM +CWxvY2FsaG9zdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHGaaHVod0hLOR4d +66xIrtS2TmEmjSFjt+DIEcb6sM9RTKS8TZcdBnEqq8YT7m2sKbV+TEq9Nn7d9pHz +pWG2heWjUDBOMB0GA1UdDgQWBBR0fqrecDJ44D/fiYJiOeBzfoqEijAfBgNVHSME +GDAWgBR0fqrecDJ44D/fiYJiOeBzfoqEijAMBgNVHRMEBTADAQH/MAoGCCqGSM49 +BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 +1xOP035EcraRmWPe7tO0LpXgMxlh2VItpc2uc2w= +-----END CERTIFICATE----- +` + +func client_main(server_addr string, peer_addrs []string) error { + var c *Client + var err error + var cert_pool *x509.CertPool + var tlscfg *tls.Config + var cc ClientConfig + var wg sync.WaitGroup + + cert_pool = x509.NewCertPool() + ok := cert_pool.AppendCertsFromPEM([]byte(rootCert)) + if !ok { + log.Fatal("failed to parse root certificate") + } + tlscfg = &tls.Config{RootCAs: cert_pool, ServerName: "localhost", InsecureSkipVerify: true} + + cc.server_addr = server_addr + cc.peer_addrs = peer_addrs + c, err = NewClient(&cc, tlscfg) + if err != nil { + fmt.Printf("failed create client - %s\n", err.Error()) + return err + } + + wg.Add(1) + go c.RunTask(context.Background()); + + wg.Wait(); + return nil +} diff --git a/frame.go b/frame.go new file mode 100644 index 0000000..ea04d67 --- /dev/null +++ b/frame.go @@ -0,0 +1,70 @@ +package main + +import "time" + +const NET_TYPE_TCP string = "tcp" + +const FRAME_CODE_CLIENT_HELLO uint8 = 1 +const FRAME_CODE_SERVER_HELLO uint8 = 2 +const FRAME_CODE_SERVER_PEER_OPEN uint8 = 3 +const FRAME_CODE_SERVER_PEER_DATA uint8 = 4 +const FRAME_CODE_SERVER_PEER_CLOSE uint8 = 5 +const FRAME_CODE_SERVER_PEER_ERROR uint8 = 6 +const FRAME_CODE_CLIENT_PEER_OPENED uint8 = 7 +const FRAME_CODE_CLIENT_PEER_DATA uint8 = 8 +const FRAME_CODE_CLIENT_PEER_CLOSED uint8 = 9 +const FRAME_CODE_CLIENT_PEER_ERROR uint8 = 10 + +const FRAME_OPTION_PORT uint8 = 1 +const FRAME_OPTION_REQNET4 uint8 = 2 +const FRAME_OPTION_REQNET6 uint8 = 3 + +type FrameHeader struct { + Len uint16 // length of whole packet including the header + Code uint8 + ChanId uint8 + ConnId uint16 +} + +type FrameOptionHeader struct { + Len uint8 + Code uint8 +} + +type FreameOptionRetnet4Data struct { + addr [4]byte + pfxlen uint8 +} + +type FreameOptionRetnet6Data struct { + addr [16]byte + pfxlen uint8 +} + +type FrameClientHelloData struct { + AuthKey [64]byte // TODO: How to send this securely??? + ReqChan uint8 + /* this can be followed by variable options */ +} + +type FrameServerHelloData struct { + AuthCode uint8 +} + +func read_chan_with_tmout(chan_bool chan bool, tmout time.Duration) (bool, bool) { + var tmr *time.Timer + var b bool + var timed bool + + tmr = time.NewTimer(tmout) + + select { + case <-tmr.C: + timed = true + b = false + case b = <-chan_bool: + tmr.Stop() + timed = false + } + return timed, b +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..09caa43 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module hodu + +go 1.22.0 + +require ( + github.com/google/uuid v1.6.0 + google.golang.org/grpc v1.67.1 + google.golang.org/protobuf v1.34.2 +) + +require ( + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9246f02 --- /dev/null +++ b/go.sum @@ -0,0 +1,16 @@ +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= diff --git a/hodu.proto b/hodu.proto new file mode 100644 index 0000000..effb602 --- /dev/null +++ b/hodu.proto @@ -0,0 +1,54 @@ +syntax = "proto3"; + +option go_package = "./main"; + +//package hodu; // no idea if it's still important... + +service Hodu { + rpc PacketStream (stream Packet) returns (stream Packet) {} +} + +enum ROUTE_PROTO { + TCP = 0; + TCP4 = 1; + TCP6 = 2; +}; + +message RouteDesc { + uint32 RouteId = 1; + ROUTE_PROTO Proto = 2; + string AddrStr = 3; +}; + +message PeerDesc { + uint32 RouteId = 1; + uint32 PeerId = 2; +}; + +message PeerData { + uint32 RouteId = 1; + uint32 PeerId = 2; + bytes Data = 3; +}; + +enum PACKET_KIND { + ERROR = 0; // generic error response + OK = 1; // generic success response + ROUTE_START = 2; + ROUTE_STOP = 3; + ROUTE_STARTED = 4; + ROUTE_STOPPED = 5; + PEER_STARTED = 6; + PEER_STOPPED = 7; + PEER_DATA = 8; +}; + +message Packet { + PACKET_KIND Kind = 1; + + oneof U { + RouteDesc Route = 2; + PeerDesc Peer = 3; + PeerData Data = 4; + }; +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..f796f82 --- /dev/null +++ b/main.go @@ -0,0 +1,42 @@ +package main + +import "fmt" +import "os" +import "strings" + +func main() { + var err error + + if len(os.Args) < 2 { + goto wrong_usage + } + + if strings.EqualFold(os.Args[1], "server") { + if len(os.Args) < 3 { + goto wrong_usage + } + err = server_main(os.Args[2:]) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: serer error - %s\n", err.Error()) + os.Exit(1) + } + } else if strings.EqualFold(os.Args[1], "client") { + if len(os.Args) < 4 { + goto wrong_usage + } + err = client_main(os.Args[2], os.Args[3:]) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: client error - %s\n", err.Error()) + os.Exit(1) + } + } else { + goto wrong_usage + } + + os.Exit(0) + +wrong_usage: + fmt.Fprintf(os.Stderr, "USAGE: %s server listen-addr:listen-port\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s client target-addr:target-port peer-addr:peer-port\n", os.Args[0]) + os.Exit(1) +} diff --git a/main.proto b/main.proto new file mode 100644 index 0000000..adcc6ae --- /dev/null +++ b/main.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; +package main; + +option go_package = "."; + +service Control { + rpc Open (OpenReq) returns (OpenRes); + rpc Close (CloseReq) returns (CloseRes); + rpc Channel (stream Frame) returns (stream Frame); +} + +message OpenReq { + string abc = 1; +} + +message OpenRes { + string def = 1; +} + +message CloseReq { + string abc = 1; +} + +message CloseRes { + string def = 1; +} + +message Data { + uint32 chan_id = 1; + uint32 conn_id = 2; + bytes data = 3; +} + +message Error { + uint32 code = 1; +} + +message Frame { + oneof frame { + Data data = 1; + Error error = 2; + } +} + diff --git a/packet.go b/packet.go new file mode 100644 index 0000000..6fe27c6 --- /dev/null +++ b/packet.go @@ -0,0 +1,53 @@ +package main + + +func MakeRouteStartPacket(route_id uint32, proto ROUTE_PROTO, addr string) *Packet { + return &Packet{ + Kind: PACKET_KIND_ROUTE_START, + U: &Packet_Route{Route: &RouteDesc{RouteId: route_id, Proto: proto, AddrStr: addr}}} +} + +func MakeRouteStartedPacket(route_id uint32, proto ROUTE_PROTO, addr string) *Packet { + // the connection from a peer to the server has been established + return &Packet{Kind: PACKET_KIND_ROUTE_STARTED, + U: &Packet_Route{Route: &RouteDesc{RouteId: route_id, Proto: proto, AddrStr: addr}}} +} + +func MakeRouteStoppedPacket(route_id uint32, proto ROUTE_PROTO) *Packet { + // the connection from a peer to the server has been established + return &Packet{Kind: PACKET_KIND_ROUTE_STOPPED, + U: &Packet_Route{Route: &RouteDesc{RouteId: route_id, Proto: proto}}} +} + + +func MakePeerStartedPacket(route_id uint32, pts_id uint32) *Packet { + // the connection from a peer to the server has been established + return &Packet{Kind: PACKET_KIND_PEER_STARTED, + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}}, + } +} + +func MakePeerStoppedPacket(route_id uint32, pts_id uint32) *Packet { + return &Packet{Kind: PACKET_KIND_PEER_STOPPED, + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}, + }} +} + +func MakePtcStartedPacket(route_id uint32, pts_id uint32) *Packet { + // the connection from the client to a peer has been established + return &Packet{Kind: PACKET_KIND_PEER_STARTED, + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}}, + } +} + +func MakePtcStoppedPacket(route_id uint32, pts_id uint32) *Packet { + return &Packet{Kind: PACKET_KIND_PEER_STOPPED, + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}, + }} +} + +func MakePeerDataPacket(route_id uint32, pts_id uint32, data []byte) *Packet { + return &Packet{Kind: PACKET_KIND_PEER_DATA, + U: &Packet_Data{Data: &PeerData{RouteId: route_id, PeerId: pts_id, Data: data}, + }} +} diff --git a/s-peer.go b/s-peer.go new file mode 100644 index 0000000..5da0778 --- /dev/null +++ b/s-peer.go @@ -0,0 +1,147 @@ +package main + +import "fmt" +import "net" +import "sync/atomic" +import "time" + +type ServerPeerConn struct { + route *ServerRoute + conn_id uint32 + cts *ClientConn + conn *net.TCPConn + stop_req atomic.Bool + client_peer_status_chan chan bool + client_peer_opened_received atomic.Bool + client_peer_closed_received atomic.Bool +} + +func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerConn) { + var spc ServerPeerConn + + spc.route = r + spc.conn = c + spc.conn_id = id + spc.stop_req.Store(false) + spc.client_peer_status_chan = make(chan bool, 16) + spc.client_peer_opened_received.Store(false) + spc.client_peer_closed_received.Store(false) + + return &spc +} + +func (spc *ServerPeerConn) RunTask() error { + var pss Hodu_PacketStreamServer + var n int + var buf [4096]byte + var tmr *time.Timer + var status bool + var err error = nil + + pss = spc.route.cts.pss +//TODO: this needs to be guarded + err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id)) + if err != nil { + // TODO: include route id and conn id in the error message + err = fmt.Errorf("unable to send start-pts - %s\n", err.Error()) + goto done + } + + tmr = time.NewTimer(2 * time.Second) // TODO: make this configurable... +wait_for_started: + for { + select { + case status = <- spc.client_peer_status_chan: + if status { + break wait_for_started + } else { + // the socket must have been closed too. + goto done + } + + case <- tmr.C: + // connection failure, not in time + tmr.Stop() + goto done + + /*case <- spc->ctx->Done(): + tmr.Stop() + goto done*/ + } + } + tmr.Stop() + + for { + n, err = spc.conn.Read(buf[:]) + if err != nil { + fmt.Printf("read error - %s\n", err.Error()) + break + } + +// TODO: this needs to be guarded + err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n])) + if err != nil { + // TODO: include route id and conn id in the error message + err = fmt.Errorf("unable to send data - %s\n", err.Error()) + goto done; + } + } + +done: + fmt.Printf("spc really ending..................\n") + spc.ReqStop() + spc.route.RemoveServerPeerConn(spc) + //spc.cts.wg.Done() + return err +} + +func (spc *ServerPeerConn) ReqStop() { + if spc.stop_req.CompareAndSwap(false, true) { + var pss Hodu_PacketStreamServer + var err error + + pss = spc.route.cts.pss + + if spc.client_peer_opened_received.CompareAndSwap(false, true) { + spc.client_peer_status_chan <- false + } + spc.conn.Close() + err = pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) + if err != nil { + // TODO: print warning + fmt.Printf ("WARNING - failed to report event to %s - %s\n", spc.route.cts.caddr, err.Error()) + } + } +} + +func (spc *ServerPeerConn) ReportEvent (event_type PACKET_KIND, event_data []byte) error { + + switch event_type { + case PACKET_KIND_PEER_STARTED: + if spc.client_peer_opened_received.CompareAndSwap(false, true) { + spc.client_peer_status_chan <- true + } + + case PACKET_KIND_PEER_STOPPED: + if spc.client_peer_closed_received.CompareAndSwap(false, true) { + spc.client_peer_status_chan <- false + } + + case PACKET_KIND_PEER_DATA: + var err error + + _, err = spc.conn.Write(event_data) + if err != nil { + // TODO: logging + fmt.Printf ("WARNING - failed to write data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String()) + } + + default: + // ignore all other events + // TODO: produce warning in debug mode + } + return nil +} + + + diff --git a/server.go b/server.go new file mode 100644 index 0000000..e315a2b --- /dev/null +++ b/server.go @@ -0,0 +1,790 @@ +package main + +//import "bufio" +//import "bytes" +import "context" +import "crypto/tls" +import "fmt" +import "io" +import "math/rand" +import "net" +import "os" +import "os/signal" +import "sync" +import "sync/atomic" +import "syscall" +import "time" + +import "google.golang.org/grpc" +import "google.golang.org/grpc/metadata" +import "google.golang.org/grpc/peer" +import "google.golang.org/grpc/stats" + +const PTS_LIMIT = 8192 +//const CTS_LIMIT = 2048 + +type ClientConnMap = map[net.Addr]*ClientConn +type ServerPeerConnMap = map[uint32]*ServerPeerConn +type ServerRouteMap = map[uint32]*ServerRoute + +type Server struct { + tlscfg *tls.Config + l []*net.TCPListener // central listener + l_wg sync.WaitGroup + + cts_mtx sync.Mutex + cts_map ClientConnMap + wg sync.WaitGroup + stop_req atomic.Bool + + // grpc stuffs + gs *grpc.Server + UnimplementedHoduServer +} + +// client connection to server. +// client connect to the server, the server accept it, and makes a tunnel request +type ClientConn struct { + svr *Server + caddr net.Addr // client address that created this structure + pss Hodu_PacketStreamServer + + cw_mtx sync.Mutex + route_mtx sync.Mutex + routes ServerRouteMap + //route_wg sync.WaitGroup + + wg sync.WaitGroup + stop_req atomic.Bool + greeted bool +} + +type ServerRoute struct { + cts *ClientConn + l *net.TCPListener + laddr *net.TCPAddr + id uint32 + + pts_mtx sync.Mutex + pts_map ServerPeerConnMap + pts_limit int + pts_last_id uint32 + pts_wg sync.WaitGroup +} + +// ------------------------------------ + +func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, error) { + var pts *ServerPeerConn + var ok bool + var start_id uint32 + + r.pts_mtx.Lock() + defer r.pts_mtx.Unlock() + + if len(r.pts_map) >= r.pts_limit { + return nil, fmt.Errorf("peer-to-server connection table full") + } + + start_id = r.pts_last_id + for { + _, ok = r.pts_map[r.pts_last_id] + if !ok { + break + } + r.pts_last_id++ + if r.pts_last_id == start_id { + // unlikely to happen but it cycled through the whole range. + return nil, fmt.Errorf("failed to assign peer-to-server connection id") + } + } + + pts = NewServerPeerConn(r, c, r.pts_last_id) + r.pts_map[pts.conn_id] = pts + r.pts_last_id++ + + return pts, nil +} + +func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) { + r.pts_mtx.Lock() + delete(r.pts_map, pts.conn_id) + r.pts_mtx.Unlock() +} + +// ------------------------------------ +func (r *ServerRoute) RunTask() { + var err error + var conn *net.TCPConn + var pts *ServerPeerConn + + for { + conn, err = r.l.AcceptTCP() + if err != nil { + // TODO: logging + fmt.Printf("[%s,%d] accept failure - %s\n", r.cts.caddr.String(), r.id, err.Error()) + break + } + + pts, err = r.AddNewServerPeerConn(conn) + if err != nil { + // TODO: logging + fmt.Printf("YYYYYYYY - %s\n", err.Error()) + conn.Close() + } else { + fmt.Printf("STARTED NEW SERVER PEER STAK\n") + r.pts_wg.Add(1) + go pts.RunTask() + } + } + + r.l.Close() // don't care about double close. it could have been closed in StopTask + r.pts_wg.Wait() + +// cts.l_wg.Done() +// TODO:inform that the job is done? +} + +func (r *ServerRoute) StopTask() { +fmt.Printf ("stoppping stak..\n") + // TODO: all pts stop... + r.l.Close(); +// TODO: wait?? +} + +func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error { + var spc *ServerPeerConn + var ok bool + + r.pts_mtx.Lock() + spc, ok = r.pts_map[pts_id] + if !ok { + return fmt.Errorf("non-existent peer id - %u", pts_id) + } + r.pts_mtx.Unlock(); + + return spc.ReportEvent(event_type, event_data) +} +// ------------------------------------ + +func (cts *ClientConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) { + var l *net.TCPListener + var err error + var laddr *net.TCPAddr + var port int + var tries int = 0 + var nw string + + switch proto { + case ROUTE_PROTO_TCP: + nw = "tcp" + case ROUTE_PROTO_TCP4: + nw = "tcp4" + case ROUTE_PROTO_TCP6: + nw = "tcp6" + } + + for { + port = rand.Intn(65535-32000+1) + 32000 + + laddr, err = net.ResolveTCPAddr(nw, fmt.Sprintf(":%d", port)) + if err == nil { + l, err = net.ListenTCP(nw, laddr) // make the binding address configurable. support multiple binding addresses??? + if err == nil { + fmt.Printf("listening .... on ... %d\n", port) + return l, laddr, nil + } + } + + // TODO: implement max retries.. + tries++ + if tries >= 1000 { + err = fmt.Errorf("unable to allocate port") + break + } + } + + return nil, nil, err +} + +func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { + var r ServerRoute + var l *net.TCPListener + var laddr *net.TCPAddr + var err error + + l, laddr, err = cts.make_route_listener(proto); + if err != nil { + return nil, err + } + + r.cts = cts + r.id = id + r.l = l + r.laddr = laddr + r.pts_limit = PTS_LIMIT + r.pts_map = make(ServerPeerConnMap) + r.pts_last_id = 0 + + return &r, nil; +} + +func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { + var r *ServerRoute + var err error + + cts.route_mtx.Lock() + if cts.routes[route_id] != nil { + cts.route_mtx.Unlock() + return nil, fmt.Errorf ("existent route id - %d", route_id) + } + r, err = NewServerRoute(cts, route_id, proto) + if err != nil { + cts.route_mtx.Unlock() + return nil, err + } + cts.routes[route_id] = r; + cts.route_mtx.Unlock() + + go r.RunTask() + return r, nil +} + +func (cts *ClientConn) RemoveServerRoute (route_id uint32) error { + var r *ServerRoute + var ok bool + + cts.route_mtx.Lock() + r, ok = cts.routes[route_id] + if (!ok) { + cts.route_mtx.Unlock() + return fmt.Errorf ("non-existent route id - %d", route_id) + } + delete(cts.routes, route_id) + cts.route_mtx.Unlock() + + r.StopTask() // TODO: make this unblocking or blocking? + return nil; +} + +func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { + var r *ServerRoute + var ok bool + + cts.route_mtx.Lock() + r, ok = cts.routes[route_id] + if (!ok) { + cts.route_mtx.Unlock() + return fmt.Errorf ("non-existent route id - %d", route_id) + } + cts.route_mtx.Unlock() + + return r.ReportEvent(pts_id, event_type, event_data) +} + +func (cts *ClientConn) ReqStop() { + if cts.stop_req.CompareAndSwap(false, true) { + var r *ServerRoute + + for _, r = range cts.routes { + r.StopTask() + } + + //cts.c.Close() // close the accepted connection from the client + } +} + +// ------------------------------------ + +func handle_os_signals(s *Server, exit_chan chan<- bool) { + var ( + sighup_chan chan os.Signal + sigterm_chan chan os.Signal + sig os.Signal + ) + + sighup_chan = make(chan os.Signal, 1) + sigterm_chan = make(chan os.Signal, 1) + + signal.Notify(sighup_chan, syscall.SIGHUP) + signal.Notify(sigterm_chan, syscall.SIGTERM, os.Interrupt) + +chan_loop: + for { + select { + case <-sighup_chan: + // TODO: + //s.RefreshConfig() + case sig = <-sigterm_chan: + // TODO: get timeout value from config + //s.Shutdown(fmt.Sprintf("termination by signal %s", sig), 3*time.Second) + s.ReqStop() + //log.Debugf("termination by signal %s", sig) + fmt.Printf("termination by signal %s\n", sig) + exit_chan <- true + break chan_loop + } + } +} + +// -------------------------------------------------------------------- + +func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { + var ctx context.Context + var p *peer.Peer + var ok bool + var pkt *Packet + var err error + var cts *ClientConn + + ctx = strm.Context() + p, ok = peer.FromContext(ctx) + if (!ok) { + return fmt.Errorf("failed to get peer from packet stream context") + } + + cts, err = s.AddNewClientConn(p.Addr, strm) + if err != nil { + return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error()) + } + + + for { + // exit if context is done + // or continue + select { + case <-ctx.Done(): + return ctx.Err() + default: + // no other case is ready. + // without the default case, the select construct would block + } + + pkt, err = strm.Recv() + if err == io.EOF { + // return will close stream from server side + return nil + } + if err != nil { + //log.Printf("receive error %v", err) + continue + } + + switch pkt.Kind { + case PACKET_KIND_ROUTE_START: + var x *Packet_Route + //var t *ServerRoute + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + var r* ServerRoute + fmt.Printf ("ADDED SERVER ROUTE FOR CLEINT PEER %s\n", x.Route.AddrStr) + r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto) + if err != nil { + // TODO: Send Error Response... + } else { + err = strm.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.laddr.String())) + if err != nil { + // TODO: + } + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_ROUTE_STOP: + var x *Packet_Route + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + err = cts.RemoveServerRoute(x.Route.RouteId); // TODO: this must be unblocking. otherwide, other routes will get blocked... + if err != nil { + // TODO: Send Error Response... + } else { + err = strm.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto)) + if err != nil { + // TODO: + } + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_PEER_STARTED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_STOPPED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_DATA: + // the connection from the client to a peer has been established + var x *Packet_Data + var ok bool + x, ok = pkt.U.(*Packet_Data) + if ok { + err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + } + } +} + +// ------------------------------------ + +type ConnCatcher struct { + server *Server +} + +func (cc *ConnCatcher) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + return ctx +} + +func (cc *ConnCatcher) HandleRPC(ctx context.Context, s stats.RPCStats) { +} + +func (cc *ConnCatcher) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + return ctx; + //return context.TODO() +} + +func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) { +// fmt.Println(ctx.Value("user_id")) // Returns nil, can't access the value + var p *peer.Peer + var ok bool + var addr string + + p, ok = peer.FromContext(ctx) + if (!ok) { + addr = "" + } else { + addr = p.Addr.String() + } + +md,ok:=metadata.FromIncomingContext(ctx) +fmt.Printf("%+v%+v\n",md,ok) +if ok { +} + switch cs.(type) { + case *stats.ConnBegin: + fmt.Printf("**** client connected - [%s]\n", addr) + case *stats.ConnEnd: + fmt.Printf("**** client disconnected - [%s]\n", addr) + cc.server.RemoveClientConnByAddr(p.Addr); + } +} + +// wrappedStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and +// SendMsg method call. +type wrappedStream struct { + grpc.ServerStream +} + +func (w *wrappedStream) RecvMsg(m any) error { + fmt.Printf("Receive a message (Type: %T) at %s\n", m, time.Now().Format(time.RFC3339)) + return w.ServerStream.RecvMsg(m) +} + +func (w *wrappedStream) SendMsg(m any) error { + fmt.Printf("Send a message (Type: %T) at %v\n", m, time.Now().Format(time.RFC3339)) + return w.ServerStream.SendMsg(m) +} + +func newWrappedStream(s grpc.ServerStream) grpc.ServerStream { + return &wrappedStream{s} +} + +func streamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // authentication (token verification) +/* + md, ok := metadata.FromIncomingContext(ss.Context()) + if !ok { + return errMissingMetadata + } + if !valid(md["authorization"]) { + return errInvalidToken + } +*/ + + err := handler(srv, newWrappedStream(ss)) + if err != nil { + fmt.Printf("RPC failed with error: %v\n", err) + } + return err +} + +func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + // authentication (token verification) +/* + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, errMissingMetadata + } + if !valid(md["authorization"]) { +// return nil, errInvalidToken + } +*/ + m, err := handler(ctx, req) + if err != nil { + fmt.Printf("RPC failed with error: %v\n", err) + } +fmt.Printf ("RPC OK\n"); + return m, err +} + +func NewServer(laddrs []string, tlscfg *tls.Config) (*Server, error) { + var s Server + var l *net.TCPListener + var laddr *net.TCPAddr + var err error + var addr string + var gl *net.TCPListener + + if len(laddrs) <= 0 { + return nil, fmt.Errorf("no or too many addresses provided") + } + + /* create the specified number of listeners */ + s.l = make([]*net.TCPListener, 0) + for _, addr = range laddrs { + laddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, addr) + if err != nil { + goto oops + } + + l, err = net.ListenTCP(NET_TYPE_TCP, laddr) + if err != nil { + goto oops + } + + s.l = append(s.l, l) + } + + s.tlscfg = tlscfg + s.cts_map = make(ClientConnMap) // TODO: make it configurable... + s.stop_req.Store(false) +/* + creds, err := credentials.NewServerTLSFromFile(data.Path("x509/server_cert.pem"), data.Path("x509/server_key.pem")) + if err != nil { + log.Fatalf("failed to create credentials: %v", err) + } + gs = grpc.NewServer(grpc.Creds(creds)) +*/ + s.gs = grpc.NewServer( + grpc.UnaryInterceptor(unaryInterceptor), + grpc.StreamInterceptor(streamInterceptor), + grpc.StatsHandler(&ConnCatcher{ server: &s }), + ) // TODO: have this outside the server struct? + RegisterHoduServer (s.gs, &s) + + return &s, nil + +oops: +/* TODO: check if gs needs to be closed... */ + if gl != nil { + gl.Close() + } + + for _, l = range s.l { + l.Close() + } + s.l = make([]*net.TCPListener, 0) + return nil, err +} + +func (s *Server) run_grpc_server(idx int) error { + var l *net.TCPListener + var err error + + l = s.l[idx] + fmt.Printf ("serving grpc on %d listener\n", idx) + // it seems to be safe to call a single grpc server on differnt listening sockets multiple times + // TODO: check if this assumption is ok + err = s.gs.Serve(l); + if err != nil { + fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXxx %s\n", err.Error()); + } + + s.l_wg.Done(); + return nil +} + +func (s *Server) MainLoop() error { + var idx int + + for idx, _ = range s.l { + s.l_wg.Add(1) + go s.run_grpc_server(idx) + } + + s.l_wg.Wait(); + s.ReqStop() + s.wg.Wait() + + return nil +} + +func (s *Server) ReqStop() { + if s.stop_req.CompareAndSwap(false, true) { + var l *net.TCPListener + var cts *ClientConn + + //s.gs.GracefulStop() + s.gs.Stop() + for _, l = range s.l { + l.Close() + } + + s.cts_mtx.Lock() // TODO: this mya create dead-lock. check possibility of dead lock??? + for _, cts = range s.cts_map { + cts.ReqStop() // request to stop connections from/to peer held in the cts structure + } + s.cts_mtx.Unlock() + } +} + +func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*ClientConn, error) { + var cts ClientConn + var ok bool + + cts.svr = s + cts.routes = make(ServerRouteMap) + cts.caddr = addr + cts.pss = pss + + cts.stop_req.Store(false) + cts.greeted = false + + s.cts_mtx.Lock() + defer s.cts_mtx.Unlock() + + _, ok = s.cts_map[addr] + if ok { + return nil, fmt.Errorf("existing client - %s", addr.String()) + } + + s.cts_map[addr] = &cts; +fmt.Printf ("ADD total clients %d\n", len(s.cts_map)); + return &cts, nil +} + +func (s *Server) RemoveClientConn(cts *ClientConn) { + s.cts_mtx.Lock() + delete(s.cts_map, cts.caddr) +fmt.Printf ("REMOVE total clients %d\n", len(s.cts_map)); + s.cts_mtx.Unlock() +} + +func (s *Server) RemoveClientConnByAddr(addr net.Addr) { + var cts *ClientConn + var ok bool + + s.cts_mtx.Lock() + defer s.cts_mtx.Unlock() + + cts, ok = s.cts_map[addr] + if ok { + cts.ReqStop() + delete(s.cts_map, cts.caddr) + } +} + +func (s *Server) FindClientConnByAddr (addr net.Addr) *ClientConn { + var cts *ClientConn + var ok bool + + s.cts_mtx.Lock() + defer s.cts_mtx.Unlock() + + cts, ok = s.cts_map[addr] + if !ok { + return nil + } + + return cts +} + +// -------------------------------------------------------------------- + +const serverKey = `-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIHg+g2unjA5BkDtXSN9ShN7kbPlbCcqcYdDu+QeV8XWuoAoGCCqGSM49 +AwEHoUQDQgAEcZpodWh3SEs5Hh3rrEiu1LZOYSaNIWO34MgRxvqwz1FMpLxNlx0G +cSqrxhPubawptX5MSr02ft32kfOlYbaF5Q== +-----END EC PRIVATE KEY----- +` + +const serverCert = `-----BEGIN CERTIFICATE----- +MIIB+TCCAZ+gAwIBAgIJAL05LKXo6PrrMAoGCCqGSM49BAMCMFkxCzAJBgNVBAYT +AkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRn +aXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEyMDgxNDAxMTNa +Fw0yNTEyMDUxNDAxMTNaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0 +YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMM +CWxvY2FsaG9zdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHGaaHVod0hLOR4d +66xIrtS2TmEmjSFjt+DIEcb6sM9RTKS8TZcdBnEqq8YT7m2sKbV+TEq9Nn7d9pHz +pWG2heWjUDBOMB0GA1UdDgQWBBR0fqrecDJ44D/fiYJiOeBzfoqEijAfBgNVHSME +GDAWgBR0fqrecDJ44D/fiYJiOeBzfoqEijAMBgNVHRMEBTADAQH/MAoGCCqGSM49 +BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 +1xOP035EcraRmWPe7tO0LpXgMxlh2VItpc2uc2w= +-----END CERTIFICATE----- +` + +func server_main(laddrs []string) error { + var s *Server + var err error + var exit_chan chan bool + var cert tls.Certificate + + cert, err = tls.X509KeyPair([]byte(serverCert), []byte(serverKey)) + if err != nil { + return fmt.Errorf("ERROR: failed to load key pair - %s\n", err) + } + + s, err = NewServer(laddrs, &tls.Config{Certificates: []tls.Certificate{cert}}) + if err != nil { + return fmt.Errorf("ERROR: failed to create new server - %s", err.Error()) + } + + exit_chan = make(chan bool, 1) + go handle_os_signals(s, exit_chan) + err = s.MainLoop() // this is blocking. ReqStop() will be called from a signal handler + if err != nil { + return err + } + + <-exit_chan // wait until the term signal handler almost reaches the end + return nil +}