trying to make code more robust

2024-11-24 20:39:51 +09:00
parent 493309a4e9
commit 93f84fbc98
4 changed files with 177 additions and 83 deletions

server.go

@@ -25,10 +25,14 @@ type ServerPeerConnMap = map[uint32]*ServerPeerConn
type ServerRouteMap = map[uint32]*ServerRoute
type Server struct {
ctx context.Context
ctx_cancel context.CancelFunc
tlscfg *tls.Config
wg sync.WaitGroup
ext_svcs []Service
stop_req atomic.Bool
stop_chan chan bool
ctl *http.Server // control server
@@ -179,7 +183,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
conn, err = r.l.AcceptTCP()
if err != nil {
if errors.Is(err, net.ErrClosed) {
r.cts.svr.log.Write(log_id, LOG_INFO, "Rervice-side peer listener closed")
r.cts.svr.log.Write(log_id, LOG_INFO, "Server-side peer listener closed")
} else {
r.cts.svr.log.Write(log_id, LOG_INFO, "Server-side peer listener error - %s", err.Error())
}
@@ -197,9 +201,12 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
}
}
r.l.Close() // don't care about double close. it could have been closed in ReqStop
r.ReqStop()
r.pts_wg.Wait()
r.cts.svr.log.Write(log_id, LOG_DEBUG, "All service-side peer handlers completed")
r.cts.RemoveServerRoute(r) // final phase...
}
func (r *ServerRoute) ReqStop() {
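The "don't care about double close" note in RunTask above is safe because closing a *net.TCPListener a second time only returns an error rather than panicking. A minimal standalone check of that behavior (not part of this commit):

package main

import (
	"errors"
	"fmt"
	"net"
)

func main() {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	fmt.Println("first close:", l.Close()) // nil
	err = l.Close()                        // double close
	fmt.Println("second close is net.ErrClosed:", errors.Is(err, net.ErrClosed))
}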
@@ -295,7 +302,28 @@ func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*S
return r, nil
}
func (cts *ClientConn) RemoveServerRoute (route_id uint32) error {
func (cts *ClientConn) RemoveServerRoute (route* ServerRoute) error {
var r *ServerRoute
var ok bool
cts.route_mtx.Lock()
r, ok = cts.route_map[route.id]
if (!ok) {
cts.route_mtx.Unlock()
return fmt.Errorf ("non-existent route id - %d", route.id)
}
if (r != route) {
cts.route_mtx.Unlock()
return fmt.Errorf ("non-existent route - %d", route.id)
}
delete(cts.route_map, route.id)
cts.route_mtx.Unlock()
r.ReqStop()
return nil
}
func (cts *ClientConn) RemoveServerRouteById (route_id uint32) (*ServerRoute, error) {
var r *ServerRoute
var ok bool
@@ -303,13 +331,13 @@ func (cts *ClientConn) RemoveServerRoute (route_id uint32) error {
r, ok = cts.route_map[route_id]
if (!ok) {
cts.route_mtx.Unlock()
return fmt.Errorf ("non-existent route id - %d", route_id)
return nil, fmt.Errorf ("non-existent route id - %d", route_id)
}
delete(cts.route_map, route_id)
cts.route_mtx.Unlock()
r.ReqStop() // TODO: make this unblocking or blocking?
return nil
r.ReqStop()
return r, nil
}
func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
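The new RemoveServerRoute/RemoveServerRouteById split above makes removal identity-checked: an entry is deleted only if the stored pointer is the very route object the caller holds, so a stale reference cannot remove a newer route that reused the same id. A self-contained sketch of the same pattern (Route and Registry are illustrative names, not this codebase's types):

package main

import (
	"fmt"
	"sync"
)

type Route struct{ id uint32 }

type Registry struct {
	mtx sync.Mutex
	m   map[uint32]*Route
}

// Remove deletes the entry for r.id only when it still points at r itself.
func (g *Registry) Remove(r *Route) error {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	cur, ok := g.m[r.id]
	if !ok {
		return fmt.Errorf("non-existent route id - %d", r.id)
	}
	if cur != r { // same id, different object - a newer route took it over
		return fmt.Errorf("route id %d owned by another object", r.id)
	}
	delete(g.m, r.id)
	return nil
}

func main() {
	reg := &Registry{m: make(map[uint32]*Route)}
	old := &Route{id: 7}
	reg.m[7] = old
	replacement := &Route{id: 7}
	reg.m[7] = replacement // id 7 reused by a newer route

	fmt.Println(reg.Remove(old))         // refused: stale pointer
	fmt.Println(reg.Remove(replacement)) // nil: removes the current owner
}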
@@ -336,37 +364,37 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) {
for {
pkt, err = cts.pss.Recv()
if errors.Is(err, io.EOF) {
// return will close stream from server side
// TODO: clean up route_map and server-side peers releated to the client connection 'cts'
fmt.Printf ("grpd stream ended\n")
cts.svr.log.Write("", LOG_INFO, "GRPC stream closed for client %s", cts.caddr)
goto done
}
if err != nil {
//log.Printf("receive error %v", err)
fmt.Printf ("grpc stream error - %s\n", err.Error())
cts.svr.log.Write("", LOG_ERROR, "GRPC stream error for client %s - %s", cts.caddr, err.Error())
goto done
}
switch pkt.Kind {
case PACKET_KIND_ROUTE_START:
var x *Packet_Route
//var t *ServerRoute
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
var r* ServerRoute
fmt.Printf ("ADDED SERVER ROUTE FOR CLEINT PEER %s\n", x.Route.AddrStr)
r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto)
if err != nil {
// TODO: Send Error Response...
cts.svr.log.Write("", LOG_ERROR, "Failed to add server route for client %s peer %s", cts.caddr, x.Route.AddrStr)
} else {
cts.svr.log.Write("", LOG_INFO, "Added server route(id=%d) for client %s peer %s", r.id, cts.caddr, x.Route.AddrStr)
err = cts.pss.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.laddr.String()))
if err != nil {
// TODO:
r.ReqStop()
cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route started for peer %s", cts.caddr, x.Route.AddrStr)
goto done
}
}
} else {
// TODO: send invalid request... or simply keep quiet?
cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.caddr)
// TODO: need to abort this client?
}
case PACKET_KIND_ROUTE_STOP:
@@ -374,17 +402,23 @@ fmt.Printf ("grpd stream ended\n")
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
err = cts.RemoveServerRoute(x.Route.RouteId) // TODO: this must be unblocking. otherwide, other route_map will get blocked...
var r* ServerRoute
r, err = cts.RemoveServerRouteById(x.Route.RouteId)
if err != nil {
// TODO: Send Error Response...
cts.svr.log.Write("", LOG_ERROR, "Failed to delete server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.caddr, x.Route.AddrStr)
} else {
cts.svr.log.Write("", LOG_ERROR, "Deleted server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.caddr, x.Route.AddrStr)
err = cts.pss.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto))
if err != nil {
// TODO:
r.ReqStop()
cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route(id=%d) stopped for peer %s", cts.caddr, x.Route.RouteId, x.Route.AddrStr)
goto done
}
}
} else {
// TODO: send invalid request... or simply keep quiet?
cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.caddr)
// TODO: need to abort this client?
}
case PACKET_KIND_PEER_STARTED:
@@ -471,10 +505,13 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
// or continue
select {
case <-ctx.Done(): // the stream context is done
fmt.Printf("grpd server done - %s\n", ctx.Err().Error())
fmt.Printf("grpc server done - %s\n", ctx.Err().Error())
goto done
case <- cts.stop_chan:
// get out of the loop to eventually exit from
// this handler and let the main grpc server
// close this specific client connection.
goto done
//default:
@@ -485,6 +522,7 @@ fmt.Printf("grpd server done - %s\n", ctx.Err().Error())
done:
fmt.Printf ("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n")
cts.ReqStop() // just in case
cts.route_wg.Wait()
fmt.Printf ("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n")
}
@@ -497,8 +535,12 @@ func (cts *ClientConn) ReqStop() {
r.ReqStop()
}
// there is no good way to break a specific client connection to
// the grpc server. while the global grpc server is closed in
// ReqStop() for Server, an individual connection is closed by
// returning from the grpc handler goroutine. See the comment in
// RunTask() for ClientConn.
cts.stop_chan <- true
//cts.c.Close() // close the accepted connection from the client
}
}
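The comment above captures the design: the server cannot sever one client's stream directly, so ReqStop() only signals stop_chan and the per-connection handler closes the stream by returning. A runnable sketch of that handoff under assumed names (conn, stopChan - not hodu's actual identifiers):

package main

import (
	"context"
	"fmt"
	"time"
)

type conn struct {
	stopChan chan bool
}

// handler stands in for the grpc stream handler; returning from it is
// the only act that lets the framework close this one connection.
func (c *conn) handler(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stream context done:", ctx.Err())
			return
		case <-c.stopChan:
			fmt.Println("stop requested - returning to close the connection")
			return
		}
	}
}

// ReqStop never touches the stream itself; it only signals the handler.
func (c *conn) ReqStop() {
	c.stopChan <- true // buffered, so the caller is not blocked
}

func main() {
	c := &conn{stopChan: make(chan bool, 1)}
	done := make(chan struct{})
	go func() { c.handler(context.Background()); close(done) }()

	time.Sleep(10 * time.Millisecond)
	c.ReqStop()
	<-done
}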
@@ -647,7 +689,7 @@ func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, han
return m, err
}
func NewServer(laddrs []string, logger Logger, tlscfg *tls.Config) (*Server, error) {
func NewServer(ctx context.Context, laddrs []string, logger Logger, tlscfg *tls.Config) (*Server, error) {
var s Server
var l *net.TCPListener
var laddr *net.TCPAddr
@@ -656,9 +698,10 @@ func NewServer(laddrs []string, logger Logger, tlscfg *tls.Config) (*Server, err
var gl *net.TCPListener
if len(laddrs) <= 0 {
return nil, fmt.Errorf("no or too many addresses provided")
return nil, fmt.Errorf("no server addresses provided")
}
s.ctx, s.ctx_cancel = context.WithCancel(ctx)
s.log = logger
/* create the specified number of listeners */
s.l = make([]*net.TCPListener, 0)
@@ -679,6 +722,7 @@ func NewServer(laddrs []string, logger Logger, tlscfg *tls.Config) (*Server, err
s.tlscfg = tlscfg
s.ext_svcs = make([]Service, 0, 1)
s.cts_map = make(ClientConnMap) // TODO: make it configurable...
s.stop_chan = make(chan bool, 8)
s.stop_req.Store(false)
/*
creds, err := credentials.NewServerTLSFromFile(data.Path("x509/server_cert.pem"), data.Path("x509/server_key.pem"))
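NewServer now accepts the caller's context and derives its own cancellable child from it, which ReqStop() cancels at the end of shutdown. A reduced sketch of that caller/constructor contract (the Server here is trimmed to the two new fields, not the real type):

package main

import (
	"context"
	"fmt"
)

type Server struct {
	ctx        context.Context
	ctx_cancel context.CancelFunc
}

func NewServer(ctx context.Context) *Server {
	var s Server
	// the server owns a child context; cancelling either the parent or
	// s.ctx_cancel tears down everything derived from s.ctx.
	s.ctx, s.ctx_cancel = context.WithCancel(ctx)
	return &s
}

func main() {
	root, cancel := context.WithCancel(context.Background())
	s := NewServer(root)
	cancel() // cancelling the caller's context propagates down
	fmt.Println("server context done:", s.ctx.Err())
}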
@@ -741,13 +785,25 @@ func (s *Server) RunTask(wg *sync.WaitGroup) {
go s.run_grpc_server(idx, &s.l_wg)
}
s.l_wg.Wait()
s.log.Write("", LOG_DEBUG, "All GRPC listeners completed")
s.cts_wg.Wait()
s.log.Write("", LOG_DEBUG, "All CTS handlers completed")
// most of the work is done in separate goroutines (s.run_grpc_server).
// this loop serves as a placeholder to prevent the logic flow from
// descending down to s.ReqStop()
task_loop:
for {
select {
case <-s.stop_chan:
break task_loop
}
}
s.ReqStop()
s.l_wg.Wait()
s.log.Write("", LOG_DEBUG, "All GRPC listeners completed")
s.cts_wg.Wait()
s.log.Write("", LOG_DEBUG, "All CTS handlers completed")
// stop the main grpc server after all the other tasks are finished.
s.gs.Stop()
}
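RunTask above now parks on stop_chan and only afterwards runs the ordered teardown: listeners first, client-connection handlers next, the main grpc server last. A compressed, self-contained sketch of that ordering (the worker bodies and timings are stand-ins):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var listeners, handlers sync.WaitGroup
	stop := make(chan bool, 8)

	for i := 0; i < 2; i++ {
		listeners.Add(1)
		go func() { defer listeners.Done(); time.Sleep(5 * time.Millisecond) }()
	}
	handlers.Add(1)
	go func() { defer handlers.Done(); time.Sleep(5 * time.Millisecond) }()

	go func() { time.Sleep(time.Millisecond); stop <- true }() // simulated ReqStop()

	<-stop // placeholder wait, like the task_loop above
	listeners.Wait()
	fmt.Println("all listeners completed")
	handlers.Wait()
	fmt.Println("all handlers completed")
	// only now would the main grpc server be stopped (s.gs.Stop() above)
}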
@@ -770,6 +826,11 @@ func (s *Server) ReqStop() {
var l *net.TCPListener
var cts *ClientConn
if (s.ctl != nil) {
// shut down the control server if it was ever started.
s.ctl.Shutdown(s.ctx)
}
//s.gs.GracefulStop()
//s.gs.Stop()
for _, l = range s.l {
@@ -781,6 +842,9 @@ func (s *Server) ReqStop() {
cts.ReqStop() // request to stop connections from/to peer held in the cts structure
}
s.cts_mtx.Unlock()
s.stop_chan <- true
s.ctx_cancel()
}
}
@@ -794,7 +858,7 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*
cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss}
cts.stop_req.Store(false)
cts.stop_chan = make(chan bool, 1)
cts.stop_chan = make(chan bool, 8)
s.cts_mtx.Lock()
defer s.cts_mtx.Unlock()
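Growing the stop channel buffer from 1 to 8 here (and for the server above) lowers the odds that a ReqStop() caller blocks when several stop requests race in. A non-blocking send removes that possibility entirely - a sketch of that alternative, not what this commit does:

package main

import "fmt"

type conn struct{ stopChan chan bool }

// ReqStop is safe to call any number of times: the send is simply
// dropped when a stop request is already queued, so no caller blocks.
func (c *conn) ReqStop() {
	select {
	case c.stopChan <- true:
	default: // a stop is already pending; nothing to do
	}
}

func main() {
	c := &conn{stopChan: make(chan bool, 1)}
	c.ReqStop()
	c.ReqStop() // dropped; a plain send would block here with the buffer full
	fmt.Println("pending stop signals:", len(c.stopChan))
}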