package hodu

import "context"
import "crypto/tls"
import "errors"
import "fmt"
import "log"
//import "math/rand"
import "net"
import "net/http"
import "sync"
import "sync/atomic"
import "time"

import "google.golang.org/grpc"
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/credentials"
import "google.golang.org/grpc/credentials/insecure"
import "google.golang.org/grpc/peer"
import "google.golang.org/grpc/status"

type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]

type ClientConnMap = map[ConnId]*ClientConn
type ClientRouteMap = map[RouteId]*ClientRoute
type ClientPeerConnMap = map[PeerId]*ClientPeerConn
type ClientPeerCancelFuncMap = map[PeerId]context.CancelFunc

// --------------------------------------------------------------------

type ClientRouteConfig struct {
	PeerAddr string
	PeerName string
	Option RouteOption
	ServiceAddr string // server-peer-service-addr
	ServiceNet string // server-peer-service-net
}

type ClientConfig struct {
	ServerAddrs []string
	//PeerAddrs []string
	Routes []ClientRouteConfig
	ServerSeedTmout time.Duration
	ServerAuthority string // http2 :authority header
}

type ClientConfigActive struct {
	Id ConnId
	Index int
	ClientConfig
}

type Client struct {
	ctx context.Context
	ctx_cancel context.CancelFunc
	ctltlscfg *tls.Config
	rpctlscfg *tls.Config

	ext_mtx sync.Mutex
	ext_svcs []Service

	ctl_addr []string
	ctl_prefix string
	ctl_mux *http.ServeMux
	ctl []*http.Server // control servers

	ptc_tmout time.Duration // timeout to connect to a client-side peer
	ptc_limit int // global maximum number of peers
	cts_limit int
	cts_mtx sync.Mutex
	cts_next_id ConnId
	cts_map ClientConnMap

	wg sync.WaitGroup
	stop_req atomic.Bool
	stop_chan chan bool

	log Logger

	stats struct {
		conns atomic.Int64
		routes atomic.Int64
		peers atomic.Int64
	}
}

// client connection to a server
type ClientConn struct {
	cli *Client
	cfg ClientConfigActive
	id ConnId
	sid string // id rendered as a string

	local_addr string
	remote_addr string
	conn *grpc.ClientConn // grpc connection to the server
	hdc HoduClient
	psc *GuardedPacketStreamClient // guarded grpc stream

	s_seed Seed
	c_seed Seed

	route_mtx sync.Mutex
	route_next_id RouteId
	route_map ClientRouteMap
	route_wg sync.WaitGroup

	stop_req atomic.Bool
	stop_chan chan bool
}

type ClientRoute struct {
	cts *ClientConn
	id RouteId
	peer_addr string
	peer_name string
	peer_option RouteOption

	server_peer_listen_addr *net.TCPAddr // actual server-side service address
	server_peer_addr string // desired server-side service address
	server_peer_net string
	server_peer_option RouteOption

	ptc_mtx sync.Mutex
	ptc_map ClientPeerConnMap
	ptc_cancel_map ClientPeerCancelFuncMap
	ptc_wg sync.WaitGroup

	stop_req atomic.Bool
	stop_chan chan bool
}

type ClientPeerConn struct {
	route *ClientRoute
	conn_id PeerId
	conn *net.TCPConn

	pts_laddr string // server-local address of the server-side peer
	pts_raddr string // address of the server-side peer
	pts_eof atomic.Bool

	stop_chan chan bool
	stop_req atomic.Bool
}

type GuardedPacketStreamClient struct {
	mtx sync.Mutex
	//psc Hodu_PacketStreamClient
	Hodu_PacketStreamClient
}

// ------------------------------------

func (g *GuardedPacketStreamClient) Send(data *Packet) error {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	//return g.psc.Send(data)
	return g.Hodu_PacketStreamClient.Send(data)
}

/*func (g *GuardedPacketStreamClient) Recv() (*Packet, error) {
	return g.psc.Recv()
}

func (g *GuardedPacketStreamClient) Context() context.Context {
	return g.psc.Context()
}*/

// --------------------------------------------------------------------

func NewClientRoute(cts *ClientConn, id RouteId, client_peer_addr string, client_peer_name string, server_peer_svc_addr string, server_peer_svc_net string, server_peer_option RouteOption) *ClientRoute {
	var r ClientRoute

	r.cts = cts
	r.id = id
	r.ptc_map = make(ClientPeerConnMap)
	r.ptc_cancel_map = make(ClientPeerCancelFuncMap)
	r.peer_addr = client_peer_addr // client-side peer
	r.peer_name = client_peer_name
	// if client_peer_addr is a domain name, it can't be determined whether it's tcp4 or tcp6
	r.peer_option = string_to_route_option(tcp_addr_str_class(client_peer_addr))
	r.server_peer_addr = server_peer_svc_addr
	r.server_peer_net = server_peer_svc_net // permitted network for the server-side peer
	r.server_peer_option = server_peer_option
	r.stop_req.Store(false)
	r.stop_chan = make(chan bool, 8)

	return &r
}

func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id PeerId, pts_raddr string, pts_laddr string) (*ClientPeerConn, error) {
	var ptc *ClientPeerConn

	r.ptc_mtx.Lock()
	ptc = NewClientPeerConn(r, c, pts_id, pts_raddr, pts_laddr)
	r.ptc_map[ptc.conn_id] = ptc
	r.cts.cli.stats.peers.Add(1)
	r.ptc_mtx.Unlock()

	r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
	return ptc, nil
}

func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error {
	var c *ClientPeerConn
	var ok bool

	r.ptc_mtx.Lock()
	c, ok = r.ptc_map[ptc.conn_id]
	if !ok {
		r.ptc_mtx.Unlock()
		return fmt.Errorf("non-existent peer id - %d", ptc.conn_id)
	}
	if c != ptc {
		r.ptc_mtx.Unlock()
		return fmt.Errorf("conflicting peer id - %d", ptc.conn_id)
	}
	delete(r.ptc_map, ptc.conn_id)
	r.cts.cli.stats.peers.Add(-1)
	r.ptc_mtx.Unlock()

	r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
	ptc.ReqStop()
	return nil
}

/*func (r *ClientRoute) RemoveAllClientPeerConns() {
	var c *ClientPeerConn

	r.ptc_mtx.Lock()
	defer r.ptc_mtx.Unlock()

	for _, c = range r.ptc_map {
		delete(r.ptc_map, c.conn_id)
		r.cts.cli.stats.peers.Add(-1)
		c.ReqStop()
	}
}*/

func (r *ClientRoute) ReqStopAllClientPeerConns() {
	var c *ClientPeerConn

	r.ptc_mtx.Lock()
	defer r.ptc_mtx.Unlock()

	for _, c = range r.ptc_map {
		c.ReqStop()
	}
}

func (r *ClientRoute) FindClientPeerConnById(conn_id PeerId) *ClientPeerConn {
	var c *ClientPeerConn
	var ok bool

	r.ptc_mtx.Lock()
	defer r.ptc_mtx.Unlock()

	c, ok = r.ptc_map[conn_id]
	if !ok {
		return nil
	}

	return c
}

func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
	var err error

	// this task on the route object isn't actually necessary.
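	// it sends route_start on entry, blocks until a stop is requested, and sends route_stop on the way out.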
	// most of the useful work is triggered by ReportEvent() and done by ConnectToPeer().
	defer wg.Done()

	err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.server_peer_option, r.peer_addr, r.peer_name, r.server_peer_addr, r.server_peer_net))
	if err != nil {
		r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
			"Failed to send route_start for route(%d,%s,%v,%v) to %s",
			r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr)
		goto done
	} else {
		r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
			"Sent route_start for route(%d,%s,%v,%v) to %s",
			r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr)
	}

main_loop:
	for {
		select {
		case <-r.stop_chan:
			break main_loop
		}
	}

done:
	r.ReqStop()
	r.ptc_wg.Wait() // wait for all peer tasks to finish

	err = r.cts.psc.Send(MakeRouteStopPacket(r.id, r.server_peer_option, r.peer_addr, r.peer_name, r.server_peer_addr, r.server_peer_net))
	if err != nil {
		r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
			"Failed to send route_stop for route(%d,%s,%v,%v) to %s - %s",
			r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr, err.Error())
	} else {
		r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
			"Sent route_stop for route(%d,%s,%v,%v) to %s",
			r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr)
	}

	r.cts.RemoveClientRoute(r)
}

func (r *ClientRoute) ReqStop() {
	if r.stop_req.CompareAndSwap(false, true) {
		var ptc *ClientPeerConn
		for _, ptc = range r.ptc_map {
			ptc.ReqStop()
		}
		r.stop_chan <- true
	}
}

func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts_raddr string, pts_laddr string, wg *sync.WaitGroup) {
	var err error
	var conn net.Conn
	var real_conn *net.TCPConn
	var real_conn_raddr string
	var real_conn_laddr string
	var ptc *ClientPeerConn
	var d net.Dialer
	var ctx context.Context
	var cancel context.CancelFunc
	var tmout time.Duration
	var ok bool

	// TODO: handle TTY
	// if route_option & RouteOption(ROUTE_OPTION_TTY) is set, it must create a pseudo-tty instead of connecting to a tcp address.
	//
	defer wg.Done()

	tmout = time.Duration(r.cts.cli.ptc_tmout)
	if tmout <= 0 {
		tmout = 10 * time.Second
	}
	ctx, cancel = context.WithTimeout(r.cts.cli.ctx, tmout)
	r.ptc_mtx.Lock()
	r.ptc_cancel_map[pts_id] = cancel
	r.ptc_mtx.Unlock()

	d.LocalAddr = nil // TODO: use this if a local address is specified
	conn, err = d.DialContext(ctx, "tcp", r.peer_addr)

	r.ptc_mtx.Lock()
	delete(r.ptc_cancel_map, pts_id)
	r.ptc_mtx.Unlock()

	if err != nil {
		r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
			"Failed to connect to %s for route(%d,%d,%s,%s) - %s",
			r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error())
		goto peer_aborted
	}

	real_conn, ok = conn.(*net.TCPConn)
	if !ok {
		// err is nil at this point; don't dereference it in the log message.
		r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
			"Failed to get connection information to %s for route(%d,%d,%s,%s)",
			r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr)
		goto peer_aborted
	}

	real_conn_raddr = real_conn.RemoteAddr().String()
	real_conn_laddr = real_conn.LocalAddr().String()
	ptc, err = r.AddNewClientPeerConn(real_conn, pts_id, pts_raddr, pts_laddr)
	if err != nil {
		r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
			"Failed to add client peer %s for route(%d,%d,%s,%s) - %s",
			r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error())
		goto peer_aborted
	}

	// ptc.conn_id is equal to pts_id as assigned in r.AddNewClientPeerConn()
	err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id, real_conn_raddr, real_conn_laddr))
	if err != nil {
		r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
			"Failed to send peer_start(%d,%d,%s,%s) for route(%d,%d,%s,%s) - %s",
			r.id, ptc.conn_id,
			real_conn_raddr, real_conn_laddr, r.id, pts_id, pts_raddr, pts_laddr, err.Error())
		goto peer_aborted
	}

	wg.Add(1)
	go ptc.RunTask(wg)
	return

peer_aborted:
	// real_conn_raddr and real_conn_laddr may be empty depending on where the jump to this label was made from.
	err = r.cts.psc.Send(MakePeerAbortedPacket(r.id, pts_id, real_conn_raddr, real_conn_laddr))
	if err != nil {
		r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
			"Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s",
			r.id, pts_id, r.id, pts_id, pts_raddr, pts_laddr, err.Error())
	}
	if conn != nil {
		conn.Close()
	}
}

func (r *ClientRoute) DisconnectFromPeer(ptc *ClientPeerConn) error {
	var p *ClientPeerConn
	var cancel context.CancelFunc
	var ok bool

	r.ptc_mtx.Lock()
	p, ok = r.ptc_map[ptc.conn_id]
	if ok && p == ptc {
		cancel, ok = r.ptc_cancel_map[ptc.conn_id]
		if ok {
			cancel()
		}
	}
	r.ptc_mtx.Unlock()

	ptc.ReqStop()
	return nil
}

func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_data interface{}) error {
	var err error

	switch event_type {
	case PACKET_KIND_ROUTE_STARTED:
		var ok bool
		var rd *RouteDesc
		rd, ok = event_data.(*RouteDesc)
		if !ok {
			r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
				"Protocol error - invalid data in route_started event(%d)", r.id)
			r.ReqStop()
		} else {
			var addr *net.TCPAddr
			addr, err = net.ResolveTCPAddr("tcp", rd.TargetAddrStr)
			if err != nil {
				r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
					"Protocol error - invalid service address(%s) for server peer in route_started event(%d)",
					rd.TargetAddrStr, r.id)
				r.ReqStop()
			} else {
				r.server_peer_listen_addr = addr
				r.server_peer_net = rd.ServiceNetStr
			}
		}

	case PACKET_KIND_ROUTE_STOPPED:
		// NOTE:
		// this event can be sent by the server in response to a failed ROUTE_START or a successful ROUTE_STOP.
		// in case of a failed ROUTE_START, r.ReqStop() may trigger another ROUTE_STOP to be sent to the server,
		// but the server must be able to handle it as a request for an invalid route.
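		// whether or not the payload is valid, this route is finished on the server side, so a stop is requested in both branches below.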
		var ok bool
		_, ok = event_data.(*RouteDesc)
		if !ok {
			r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
				"Protocol error - invalid data in route_stopped event(%d)", r.id)
			r.ReqStop()
		} else {
			r.ReqStop()
		}

	case PACKET_KIND_PEER_STARTED:
		var ok bool
		var pd *PeerDesc

		pd, ok = event_data.(*PeerDesc)
		if !ok {
			r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
				"Protocol error - invalid data in peer_started event(%d,%d)", r.id, pts_id)
			r.ReqStop()
		} else {
			if r.cts.cli.ptc_limit > 0 && int(r.cts.cli.stats.peers.Load()) >= r.cts.cli.ptc_limit {
				r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
					"Refusing to connect to peer(%s) for route(%d,%d) - allowed max %d",
					r.peer_addr, r.id, pts_id, r.cts.cli.ptc_limit)

				err = r.cts.psc.Send(MakePeerAbortedPacket(r.id, pts_id, "", ""))
				if err != nil {
					r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
						"Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s",
						r.id, pts_id, r.id, pts_id, "", "", err.Error())
				}
			} else {
				r.ptc_wg.Add(1)
				go r.ConnectToPeer(pts_id, r.peer_option, pd.RemoteAddrStr, pd.LocalAddrStr, &r.ptc_wg)
			}
		}

	case PACKET_KIND_PEER_ABORTED:
		var ptc *ClientPeerConn

		ptc = r.FindClientPeerConnById(pts_id)
		if ptc != nil {
			var ok bool
			var pd *PeerDesc

			pd, ok = event_data.(*PeerDesc)
			if !ok {
				r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
					"Protocol error - invalid data in peer_aborted event(%d,%d)", r.id, pts_id)
				r.ReqStop()
			} else {
				err = r.DisconnectFromPeer(ptc)
				if err != nil {
					r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
						"Failed to disconnect from peer(%d,%d,%s,%s) - %s",
						r.id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error())
					ptc.ReqStop()
				}
			}
		}

	case PACKET_KIND_PEER_STOPPED:
		var ptc *ClientPeerConn

		ptc = r.FindClientPeerConnById(pts_id)
		if ptc != nil {
			var ok bool
			var pd *PeerDesc

			pd, ok = event_data.(*PeerDesc)
			if !ok {
				r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
					"Protocol error - invalid data in peer_stopped event(%d,%d)", r.id, pts_id)
				ptc.ReqStop()
			} else {
				err = r.DisconnectFromPeer(ptc)
				if err != nil {
					r.cts.cli.log.Write(r.cts.sid, LOG_WARN,
						"Failed to disconnect from peer(%d,%d,%s,%s) - %s",
						r.id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error())
					ptc.ReqStop()
				}
			}
		}

	case PACKET_KIND_PEER_EOF:
		var ptc *ClientPeerConn

		ptc = r.FindClientPeerConnById(pts_id)
		if ptc != nil {
			var ok bool

			_, ok = event_data.(*PeerDesc)
			if !ok {
				r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
					"Protocol error - invalid data in peer_eof event(%d,%d)", r.id, pts_id)
				ptc.ReqStop()
			} else {
				ptc.CloseWrite()
			}
		}

	case PACKET_KIND_PEER_DATA:
		var ptc *ClientPeerConn

		ptc = r.FindClientPeerConnById(pts_id)
		if ptc != nil {
			var ok bool
			var data []byte

			data, ok = event_data.([]byte)
			if !ok {
				r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
					"Protocol error - invalid data in peer_data event(%d,%d)", r.id, pts_id)
				ptc.ReqStop()
			} else {
				_, err = ptc.conn.Write(data)
				if err != nil {
					r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
						"Failed to write to peer(%d,%d,%s,%s) - %s",
						r.id, pts_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String(), err.Error())
					ptc.ReqStop()
				}
			}
		}

	default:
		// ignore all others
	}

	return nil
}

// --------------------------------------------------------------------

func NewClientConn(c *Client, cfg *ClientConfig) *ClientConn {
	var cts ClientConn

	cts.cli = c
	cts.route_map = make(ClientRouteMap)
	cts.route_next_id = 0
	cts.cfg.ClientConfig = *cfg
	cts.stop_req.Store(false)
	cts.stop_chan = make(chan bool, 8)

	// the actual connection to the server is established in the main task function.
	// the cts.conn, cts.hdc, cts.psc fields are left unassigned here.
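	// the caller is expected to start cts.RunTask() in a separate goroutine; see Client.start_service().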
	return &cts
}

func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, error) {
	var r *ClientRoute
	var start_id RouteId

	cts.route_mtx.Lock()
	//start_id = RouteId(rand.Uint64())
	start_id = cts.route_next_id
	for {
		var ok bool
		_, ok = cts.route_map[cts.route_next_id]
		if !ok {
			break
		}
		cts.route_next_id++
		if cts.route_next_id == start_id {
			cts.route_mtx.Unlock()
			return nil, fmt.Errorf("unable to assign id")
		}
	}
	r = NewClientRoute(cts, cts.route_next_id, rc.PeerAddr, rc.PeerName, rc.ServiceAddr, rc.ServiceNet, rc.Option)
	cts.route_map[r.id] = r
	cts.route_next_id++
	cts.cli.stats.routes.Add(1)
	cts.route_mtx.Unlock()

	cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%d) %s", cts.id, r.id, r.peer_addr)

	cts.route_wg.Add(1)
	go r.RunTask(&cts.route_wg)
	return r, nil
}

func (cts *ClientConn) ReqStopAllClientRoutes() {
	var r *ClientRoute

	cts.route_mtx.Lock()
	defer cts.route_mtx.Unlock()

	for _, r = range cts.route_map {
		r.ReqStop()
	}
}

/*
func (cts *ClientConn) RemoveAllClientRoutes() {
	var r *ClientRoute

	cts.route_mtx.Lock()
	defer cts.route_mtx.Unlock()

	for _, r = range cts.route_map {
		delete(cts.route_map, r.id)
		cts.cli.stats.routes.Add(-1)
		r.ReqStop()
	}
}*/

func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
	var r *ClientRoute
	var ok bool

	cts.route_mtx.Lock()
	r, ok = cts.route_map[route.id]
	if !ok {
		cts.route_mtx.Unlock()
		return fmt.Errorf("non-existent route id - %d", route.id)
	}
	if r != route {
		cts.route_mtx.Unlock()
		return fmt.Errorf("conflicting route id - %d", route.id)
	}
	delete(cts.route_map, route.id)
	cts.cli.stats.routes.Add(-1)
	cts.route_mtx.Unlock()

	cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", route.id, route.peer_addr)

	r.ReqStop()
	return nil
}

func (cts *ClientConn) RemoveClientRouteById(route_id RouteId) error {
	var r *ClientRoute
	var ok bool

	cts.route_mtx.Lock()
	r, ok = cts.route_map[route_id]
	if !ok {
		cts.route_mtx.Unlock()
		return fmt.Errorf("non-existent route id - %d", route_id)
	}
	delete(cts.route_map, route_id)
	cts.cli.stats.routes.Add(-1)
	cts.route_mtx.Unlock()

	cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.id, r.peer_addr)

	r.ReqStop()
	return nil
}

func (cts *ClientConn) FindClientRouteById(route_id RouteId) *ClientRoute {
	var r *ClientRoute
	var ok bool

	cts.route_mtx.Lock()
	r, ok = cts.route_map[route_id]
	if !ok {
		cts.route_mtx.Unlock()
		return nil
	}
	cts.route_mtx.Unlock()
	return r
}

func (cts *ClientConn) AddClientRouteConfig(route *ClientRouteConfig) {
	cts.route_mtx.Lock()
	cts.cfg.Routes = append(cts.cfg.Routes, *route)
	cts.route_mtx.Unlock()
}

func (cts *ClientConn) AddClientRoutes(routes []ClientRouteConfig) error {
	var v ClientRouteConfig
	var err error

	for _, v = range routes {
		_, err = cts.AddNewClientRoute(&v)
		if err != nil {
			return fmt.Errorf("unable to add client route for %v - %s", v, err.Error())
		}
	}

	return nil
}

func (cts *ClientConn) disconnect_from_server() {
	if cts.conn != nil {
		var r *ClientRoute

		cts.route_mtx.Lock()
		for _, r = range cts.route_map {
			r.ReqStop()
		}
		cts.route_mtx.Unlock()

		cts.conn.Close()
		// don't reset cts.conn to nil here.
		// if this function is called from RunTask() for reconnection,
		// cts.conn is set to a new value immediately after the start_over label there.
		// if it's called from ReqStop(), there is nothing more to care about.
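		// clear the cached addresses; they are repopulated from the new packet stream's peer information in RunTask().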
		cts.local_addr = ""
		cts.remote_addr = ""
	}
}

func (cts *ClientConn) ReqStop() {
	if cts.stop_req.CompareAndSwap(false, true) {
		cts.disconnect_from_server()
		cts.stop_chan <- true
	}
}

func timed_interceptor(tmout time.Duration) grpc.UnaryClientInterceptor {
	// The client calls GetSeed() as the first call to the server.
	// To simulate a kind of connect timeout and to detect an unresponsive server,
	// place a unary interceptor that wraps the GetSeed() call in a new context with a timeout.
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		var cancel context.CancelFunc
		if tmout > 0 && method == Hodu_GetSeed_FullMethodName {
			ctx, cancel = context.WithTimeout(ctx, tmout)
			defer cancel()
		}
		return invoker(ctx, method, req, reply, cc, opts...)
	}
}

func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
	var psc PacketStreamClient
	var slpctx context.Context
	var c_seed Seed
	var s_seed *Seed
	var p *peer.Peer
	var ok bool
	var err error
	var opts []grpc.DialOption

	defer wg.Done() // arrange to call wg.Done() at the end of this function

start_over:
	cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
	cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
	if cts.cli.rpctlscfg == nil {
		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
		if cts.cfg.ServerAuthority != "" {
			opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority))
		}
	} else {
		opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg)))
		// set the http2 :authority header to the tls server name if defined.
		if cts.cfg.ServerAuthority != "" {
			opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority))
		} else if cts.cli.rpctlscfg.ServerName != "" {
			opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName))
		}
	}
	if cts.cfg.ServerSeedTmout > 0 {
		opts = append(opts, grpc.WithUnaryInterceptor(timed_interceptor(cts.cfg.ServerSeedTmout)))
	}

	cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...)
	if err != nil {
		cts.cli.log.Write(cts.sid, LOG_ERROR,
			"Failed to make client to server[%d] %s - %s",
			cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
		goto reconnect_to_server
	}
	cts.hdc = NewHoduClient(cts.conn)

	// the seed exchange is for future expansion of the protocol.
	// there is not much to do about it for now.
	c_seed.Version = HODU_RPC_VERSION
	c_seed.Flags = 0
	s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed)
	if err != nil {
		cts.cli.log.Write(cts.sid, LOG_ERROR,
			"Failed to get seed from server[%d] %s - %s",
			cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
		goto reconnect_to_server
	}
	cts.s_seed = *s_seed
	cts.c_seed = c_seed
	cts.route_next_id = 0 // reset this whenever a new connection is made. the number of routes must be zero.
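	// route ids are only meaningful within a single server connection, so numbering restarts from 0 after every reconnect.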
	cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version)

	psc, err = cts.hdc.PacketStream(cts.cli.ctx)
	if err != nil {
		cts.cli.log.Write(cts.sid, LOG_ERROR,
			"Failed to get packet stream from server[%d] %s - %s",
			cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
		goto reconnect_to_server
	}

	p, ok = peer.FromContext(psc.Context())
	if ok {
		cts.remote_addr = p.Addr.String()
		cts.local_addr = p.LocalAddr.String()
	}

	cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])

	cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}

	if len(cts.cfg.Routes) > 0 {
		// the connection structure to the server is ready.
		// add routes to the client-side peers if any are configured.
		err = cts.AddClientRoutes(cts.cfg.Routes)
		if err != nil {
			cts.cli.log.Write(cts.sid, LOG_ERROR,
				"Failed to add routes to server[%d] %s for %v - %s",
				cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.Routes, err.Error())
			goto done
		}
	}

	for {
		var pkt *Packet

		select {
		case <-cts.cli.ctx.Done():
			// need to log cts.cli.ctx.Err().Error()?
			goto done

		case <-cts.stop_chan:
			goto done

		default:
			// no other case is ready. run the code below the select.
			// without the default case, the select construct would block.
		}

		pkt, err = psc.Recv()
		if err != nil {
			if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) {
				goto reconnect_to_server
			} else {
				cts.cli.log.Write(cts.sid, LOG_INFO,
					"Failed to receive packet from %s - %s", cts.remote_addr, err.Error())
				goto reconnect_to_server
			}
		}

		switch pkt.Kind {
		case PACKET_KIND_ROUTE_STARTED:
			// the server side managed to set up the route the client requested.
			var x *Packet_Route
			var ok bool
			x, ok = pkt.U.(*Packet_Route)
			if ok {
				err = cts.ReportEvent(RouteId(x.Route.RouteId), 0, pkt.Kind, x.Route)
				if err != nil {
					cts.cli.log.Write(cts.sid, LOG_ERROR,
						"Failed to handle route_started event(%d,%s) from %s - %s",
						x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
				} else {
					cts.cli.log.Write(cts.sid, LOG_DEBUG,
						"Handled route_started event(%d,%s) from %s",
						x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr)
				}
			} else {
				cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid route_started event from %s", cts.remote_addr)
			}

		case PACKET_KIND_ROUTE_STOPPED:
			var x *Packet_Route
			var ok bool
			x, ok = pkt.U.(*Packet_Route)
			if ok {
				err = cts.ReportEvent(RouteId(x.Route.RouteId), 0, pkt.Kind, x.Route)
				if err != nil {
					cts.cli.log.Write(cts.sid, LOG_ERROR,
						"Failed to handle route_stopped event(%d,%s) from %s - %s",
						x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
				} else {
					cts.cli.log.Write(cts.sid, LOG_DEBUG,
						"Handled route_stopped event(%d,%s) from %s",
						x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr)
				}
			} else {
				cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid route_stopped event from %s", cts.remote_addr)
			}

		case PACKET_KIND_PEER_STARTED:
			// a peer connected to the server-side endpoint of the route.
			// the client must now connect to its own client-side peer.
			var x *Packet_Peer
			var ok bool
			x, ok = pkt.U.(*Packet_Peer)
			if ok {
				err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STARTED, x.Peer)
				if err != nil {
					cts.cli.log.Write(cts.sid, LOG_ERROR,
						"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
						cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
				} else {
					cts.cli.log.Write(cts.sid, LOG_DEBUG,
						"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
						cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
				}
			} else {
				cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr)
			}

		// PACKET_KIND_PEER_ABORTED is never sent by the server to the client.
		// the code here doesn't handle the event.

		case PACKET_KIND_PEER_STOPPED:
			// the server-side peer connection has been closed. tear down the client-side peer connection.
			var x *Packet_Peer
			var ok bool
			x, ok = pkt.U.(*Packet_Peer)
			if ok {
				err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STOPPED, x.Peer)
				if err != nil {
					cts.cli.log.Write(cts.sid, LOG_ERROR,
						"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
						cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
				} else {
					cts.cli.log.Write(cts.sid, LOG_DEBUG,
						"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
						cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
				}
			} else {
				cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr)
			}

		case PACKET_KIND_PEER_EOF:
			var x *Packet_Peer
			var ok bool
			x, ok = pkt.U.(*Packet_Peer)
			if ok {
				err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_EOF, x.Peer)
				if err != nil {
					cts.cli.log.Write(cts.sid, LOG_ERROR,
						"Failed to handle peer_eof event from %s for peer(%d,%d,%s,%s) - %s",
						cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
				} else {
					cts.cli.log.Write(cts.sid, LOG_DEBUG,
						"Handled peer_eof event from %s for peer(%d,%d,%s,%s)",
						cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
				}
			} else {
				cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_eof event from %s", cts.remote_addr)
			}

		case PACKET_KIND_PEER_DATA:
			// data arrived from the server-side peer. write it to the client-side peer connection.
			var x *Packet_Data
			var ok bool
			x, ok = pkt.U.(*Packet_Data)
			if ok {
				err = cts.ReportEvent(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), PACKET_KIND_PEER_DATA, x.Data.Data)
				if err != nil {
					cts.cli.log.Write(cts.sid, LOG_ERROR,
						"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
						cts.remote_addr, x.Data.RouteId, x.Data.PeerId, err.Error())
				} else {
					cts.cli.log.Write(cts.sid, LOG_DEBUG,
						"Handled peer_data event from %s for peer(%d,%d)",
						cts.remote_addr, x.Data.RouteId, x.Data.PeerId)
				}
			} else {
				cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr)
			}

		default:
			// do nothing. ignore the rest.
		}
	}

done:
	cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
	//cts.RemoveClientRoutes() // this isn't needed as each route task removes itself from cts upon termination
	cts.ReqStop()

wait_for_termination:
	cts.route_wg.Wait() // wait until all route tasks are finished
	cts.cli.RemoveClientConn(cts)
	return

reconnect_to_server:
	cts.disconnect_from_server()

	// wait for 2 seconds before attempting to reconnect
	slpctx, _ = context.WithTimeout(cts.cli.ctx, 2 * time.Second)
	select {
	case <-cts.cli.ctx.Done():
		// need to log cts.cli.ctx.Err().Error()?
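		// the whole client is shutting down; don't attempt to reconnect.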
		goto done

	case <-cts.stop_chan:
		// this signal indicates that ReqStop() has been called,
		// so jump to the waiting label.
		goto wait_for_termination

	case <-slpctx.Done():
		// do nothing
	}

	goto start_over // and reconnect
}

func (cts *ClientConn) ReportEvent(route_id RouteId, pts_id PeerId, event_type PACKET_KIND, event_data interface{}) error {
	var r *ClientRoute
	var ok bool

	cts.route_mtx.Lock()
	r, ok = cts.route_map[route_id]
	if !ok {
		cts.route_mtx.Unlock()
		return fmt.Errorf("non-existent route id - %d", route_id)
	}
	cts.route_mtx.Unlock()

	return r.ReportEvent(pts_id, event_type, event_data)
}

// --------------------------------------------------------------------

type client_ctl_log_writer struct {
	cli *Client
}

func (hlw *client_ctl_log_writer) Write(p []byte) (n int, err error) {
	// the standard http.Server always requires a *log.Logger.
	// use this io.Writer to create a logger to pass to the http server.
	hlw.cli.log.Write("", LOG_INFO, string(p))
	return len(p), nil
}

func NewClient(ctx context.Context, logger Logger, ctl_addrs []string, ctl_prefix string, ctltlscfg *tls.Config, rpctlscfg *tls.Config, rpc_max int, peer_max int, peer_conn_tmout time.Duration) *Client {
	var c Client
	var i int
	var hs_log *log.Logger

	c.ctx, c.ctx_cancel = context.WithCancel(ctx)
	c.ctltlscfg = ctltlscfg
	c.rpctlscfg = rpctlscfg
	c.ext_svcs = make([]Service, 0, 1)
	c.ptc_tmout = peer_conn_tmout
	c.ptc_limit = peer_max
	c.cts_limit = rpc_max
	c.cts_next_id = 0
	c.cts_map = make(ClientConnMap)
	c.stop_req.Store(false)
	c.stop_chan = make(chan bool, 8)
	c.log = logger

	c.ctl_prefix = ctl_prefix
	c.ctl_mux = http.NewServeMux()
	c.ctl_mux.Handle(c.ctl_prefix + "/client-conns", &client_ctl_client_conns{c: &c})
	c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}", &client_ctl_client_conns_id{c: &c})
	c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}/routes", &client_ctl_client_conns_id_routes{c: &c})
	c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}", &client_ctl_client_conns_id_routes_id{c: &c})
	c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}/peers", &client_ctl_client_conns_id_routes_id_peers{c: &c})
	c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}/peers/{peer_id}", &client_ctl_client_conns_id_routes_id_peers_id{c: &c})
	c.ctl_mux.Handle(c.ctl_prefix + "/stats", &client_ctl_stats{c: &c})

	c.ctl_addr = make([]string, len(ctl_addrs))
	c.ctl = make([]*http.Server, len(ctl_addrs))
	copy(c.ctl_addr, ctl_addrs)

	hs_log = log.New(&client_ctl_log_writer{cli: &c}, "", 0)

	for i = 0; i < len(ctl_addrs); i++ {
		c.ctl[i] = &http.Server{
			Addr: ctl_addrs[i],
			Handler: c.ctl_mux,
			TLSConfig: c.ctltlscfg,
			ErrorLog: hs_log,
			// TODO: more settings
		}
	}

	c.stats.conns.Store(0)
	c.stats.routes.Store(0)
	c.stats.peers.Store(0)

	return &c
}

func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
	var cts *ClientConn
	var ok bool
	var start_id ConnId

	if len(cfg.ServerAddrs) <= 0 {
		return nil, fmt.Errorf("no server rpc address specified")
	}

	cts = NewClientConn(c, cfg)

	c.cts_mtx.Lock()

	if c.cts_limit > 0 && len(c.cts_map) >= c.cts_limit {
		c.cts_mtx.Unlock()
		return nil, fmt.Errorf("too many connections - %d", c.cts_limit)
	}

	//start_id = rand.Uint64()
	//start_id = ConnId(monotonic_time() / 1000)
	start_id = c.cts_next_id
	for {
		_, ok = c.cts_map[c.cts_next_id]
		if !ok {
			break
		}
		c.cts_next_id++
		if c.cts_next_id == start_id {
			c.cts_mtx.Unlock() // must unlock before returning; locking again here would deadlock
			return nil, fmt.Errorf("unable to assign id")
		}
	}

	cts.id = c.cts_next_id
	cts.cfg.Id = cts.id // store it again in the active configuration for easy access via the control channel
	cts.sid = fmt.Sprintf("%d", cts.id) // id as a string, used for logging
	c.cts_map[cts.id] = cts
	c.cts_next_id++
	c.stats.conns.Add(1)
	c.cts_mtx.Unlock()

	c.log.Write("", LOG_INFO, "Added client connection(%d) to %v", cts.id, cfg.ServerAddrs)
	return cts, nil
}

func (c *Client) ReqStopAllClientConns() {
	var cts *ClientConn

	c.cts_mtx.Lock()
	defer c.cts_mtx.Unlock()

	for _, cts = range c.cts_map {
		cts.ReqStop()
	}
}

/*
func (c *Client) RemoveAllClientConns() {
	var cts *ClientConn

	c.cts_mtx.Lock()
	defer c.cts_mtx.Unlock()

	for _, cts = range c.cts_map {
		delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
		delete(c.cts_map, cts.id)
		c.stats.conns.Store(int64(len(c.cts_map)))
		cts.ReqStop()
	}
}
*/

func (c *Client) RemoveClientConn(cts *ClientConn) error {
	var conn *ClientConn
	var ok bool

	c.cts_mtx.Lock()
	conn, ok = c.cts_map[cts.id]
	if !ok {
		c.cts_mtx.Unlock()
		return fmt.Errorf("non-existent connection id - %d", cts.id)
	}
	if conn != cts {
		c.cts_mtx.Unlock()
		return fmt.Errorf("conflicting connection id - %d", cts.id)
	}
	delete(c.cts_map, cts.id)
	c.stats.conns.Store(int64(len(c.cts_map)))
	c.cts_mtx.Unlock()

	c.log.Write("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs)

	cts.ReqStop()
	return nil
}

func (c *Client) RemoveClientConnById(conn_id ConnId) error {
	var cts *ClientConn
	var ok bool

	c.cts_mtx.Lock()
	cts, ok = c.cts_map[conn_id]
	if !ok {
		c.cts_mtx.Unlock()
		return fmt.Errorf("non-existent connection id - %d", conn_id)
	}

	// NOTE: removal by id doesn't perform an identity check.
	delete(c.cts_map, cts.id)
	c.stats.conns.Store(int64(len(c.cts_map)))
	c.cts_mtx.Unlock()

	c.log.Write("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs)

	cts.ReqStop()
	return nil
}

func (c *Client) FindClientConnById(id ConnId) *ClientConn {
	var cts *ClientConn
	var ok bool

	c.cts_mtx.Lock()
	defer c.cts_mtx.Unlock()

	cts, ok = c.cts_map[id]
	if !ok {
		return nil
	}

	return cts
}

func (c *Client) FindClientRouteById(conn_id ConnId, route_id RouteId) *ClientRoute {
	var cts *ClientConn
	var ok bool

	c.cts_mtx.Lock()
	defer c.cts_mtx.Unlock()

	cts, ok = c.cts_map[conn_id]
	if !ok {
		return nil
	}

	return cts.FindClientRouteById(route_id)
}

func (c *Client) FindClientPeerConnById(conn_id ConnId, route_id RouteId, peer_id PeerId) *ClientPeerConn {
	var cts *ClientConn
	var r *ClientRoute
	var ok bool

	c.cts_mtx.Lock()
	defer c.cts_mtx.Unlock()

	cts, ok = c.cts_map[conn_id]
	if !ok {
		return nil
	}

	cts.route_mtx.Lock()
	defer cts.route_mtx.Unlock()

	r, ok = cts.route_map[route_id]
	if !ok {
		return nil
	}

	return r.FindClientPeerConnById(peer_id)
}

func (c *Client) ReqStop() {
	if c.stop_req.CompareAndSwap(false, true) {
		var cts *ClientConn
		var ctl *http.Server

		for _, ctl = range c.ctl {
			ctl.Shutdown(c.ctx) // to break out of Serve()/ServeTLS() in RunCtlTask()
		}

		for _, cts = range c.cts_map {
			cts.ReqStop()
		}

		c.stop_chan <- true
		c.ctx_cancel()
	}
}

func (c *Client) RunCtlTask(wg *sync.WaitGroup) {
	var err error
	var ctl *http.Server
	var idx int
	var l_wg sync.WaitGroup

	defer wg.Done()

	for idx, ctl = range c.ctl {
		l_wg.Add(1)
		go func(i int, cs *http.Server) {
			var l net.Listener

			c.log.Write("", LOG_INFO, "Control channel[%d] started on %s", i, c.ctl_addr[i])

			// defeat the hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
			// by creating the listener explicitly.
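			// net.Listen() below is handed the network class derived from the configured address by tcp_addr_str_class(), so tcp4/tcp6 control addresses are honored.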
			// err = cs.ListenAndServe()
			// err = cs.ListenAndServeTLS("", "") // c.ctltlscfg must provide a certificate and a key
			//
			// cs.shuttingDown(), as the name indicates, is not exposed by net/http,
			// so I have to use my own indicator to check if it has been shut down.
			//
			if c.stop_req.Load() == false {
				// this guard has a flaw in that the stop request can be made
				// between the check above and net.Listen() below.
				l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
				if err == nil {
					if c.stop_req.Load() == false {
						// check it again to make the guard slightly more stable,
						// although it's still possible that the stop request is made after Listen().
						if c.ctltlscfg == nil {
							err = cs.Serve(l)
						} else {
							err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
						}
					} else {
						err = fmt.Errorf("stop requested")
					}
					l.Close()
				}
			} else {
				err = fmt.Errorf("stop requested")
			}

			if errors.Is(err, http.ErrServerClosed) {
				c.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
			} else {
				c.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error())
			}

			l_wg.Done()
		}(idx, ctl)
	}
	l_wg.Wait()
}

func (c *Client) StartCtlService() {
	c.wg.Add(1)
	go c.RunCtlTask(&c.wg)
}

func (c *Client) RunTask(wg *sync.WaitGroup) {
	// just a placeholder to satisfy the Service interface.
	// StartService() calls cts.RunTask() instead; this method is never called,
	// so there is no call to wg.Done().
}

func (c *Client) start_service(cfg *ClientConfig) (*ClientConn, error) {
	var cts *ClientConn
	var err error

	cts, err = c.AddNewClientConn(cfg)
	if err != nil {
		err = fmt.Errorf("unable to add server connection structure to %v - %s", cfg.ServerAddrs, err.Error())
		return nil, err
	}

	c.wg.Add(1)
	go cts.RunTask(&c.wg)

	return cts, nil
}

func (c *Client) StartService(data interface{}) {
	var cfg *ClientConfig
	var ok bool

	cfg, ok = data.(*ClientConfig)
	if !ok {
		c.log.Write("", LOG_ERROR, "Failed to start service - invalid configuration - %v", data)
	} else {
		var cts *ClientConn
		var err error

		if len(cfg.ServerAddrs) > 0 {
			cts, err = c.start_service(cfg)
			if err != nil {
				c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
			} else {
				c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
			}
		}
	}
}

func (c *Client) StartExtService(svc Service, data interface{}) {
	c.ext_mtx.Lock()
	c.ext_svcs = append(c.ext_svcs, svc)
	c.ext_mtx.Unlock()

	c.wg.Add(1)
	go svc.RunTask(&c.wg)
}

func (c *Client) StopServices() {
	var ext_svc Service
	c.ReqStop()
	for _, ext_svc = range c.ext_svcs {
		ext_svc.StopServices()
	}
}

func (c *Client) FixServices() {
	c.log.Rotate()
}

func (c *Client) WaitForTermination() {
	c.wg.Wait()
}

func (c *Client) WriteLog(id string, level LogLevel, fmtstr string, args ...interface{}) {
	c.log.Write(id, level, fmtstr, args...)
}
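
// --------------------------------------------------------------------
//
// Illustrative usage sketch (kept as a comment; not part of the API defined in
// this file). It shows how the functions above are typically wired together by
// an embedding program. The logger value, the addresses, the control prefix,
// and the route settings below are assumptions made for the example, not
// values defined by this file.
//
//	func run_example_client(logger Logger) {
//		var c *Client
//		var cfg ClientConfig
//
//		// no control-channel TLS, no rpc TLS, at most 8 server connections,
//		// at most 1024 client-side peers, 10-second peer connect timeout.
//		c = NewClient(context.Background(), logger,
//			[]string{"127.0.0.1:7896"}, "/_ctl", nil, nil, 8, 1024, 10 * time.Second)
//
//		cfg.ServerAddrs = []string{"127.0.0.1:9999"}
//		cfg.Routes = []ClientRouteConfig{
//			{PeerAddr: "127.0.0.1:22"}, // Option and ServiceAddr left at zero values for brevity
//		}
//
//		c.StartCtlService()    // run the control http servers
//		c.StartService(&cfg)   // connect to the server and add the configured routes
//		c.WaitForTermination() // block until StopServices()/ReqStop() winds everything down
//	}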