added function to maintain the list of service addresses

This commit is contained in:
hyung-hwan 2025-03-31 23:40:45 +09:00
parent 918b887517
commit b41df682e1

View File

@ -107,12 +107,18 @@ type Server struct {
pxy_mux *http.ServeMux
pxy []*http.Server // proxy server
pxy_addrs_mtx sync.Mutex
pxy_addrs *list.List // of net.Addr
wpx_mux *http.ServeMux
wpx []*http.Server // proxy server than handles http/https only
wpx_addrs_mtx sync.Mutex
wpx_addrs *list.List // of net.Addr
ctl_mux *http.ServeMux
ctl []*http.Server // control server
ctl_addrs_mtx sync.Mutex
ctl_addrs *list.List // of net.Addr
rpc []*net.TCPListener // main listener for grpc
rpc_wg sync.WaitGroup
@ -446,6 +452,7 @@ func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_r
var nw string
var prev_cri ConnRouteId
var ok bool
var is4 bool
var err error
if svc_requested_addr != "" {
@ -456,11 +463,16 @@ func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_r
return nil, nil, fmt.Errorf("invalid service address %s - %s", svc_requested_addr, err.Error())
}
if (ap.Addr().Is4()) { is4 = true }
svcaddr = &net.TCPAddr{IP: ap.Addr().AsSlice(), Port: int(ap.Port())}
}
if option & RouteOption(ROUTE_OPTION_TCP) != 0 {
// go seems to use ipv4 for 0.0.0.0:XXX if ipv6 is enabled.
// i don't want the behavior.. I force tcp4 if the ip address given
// is ipv4 address
nw = "tcp"
if (is4) { nw = "tcp4" }
if svcaddr == nil {
// port 0 for automatic assignment.
svcaddr = &net.TCPAddr{Port: 0}
@ -492,9 +504,9 @@ func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_r
cts.S.svc_port_mtx.Unlock()
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Route(%d,%d) on %s not unique by port number - existing route(%d,%d)",
cts.Id, id, prev_cri.conn_id, prev_cri.route_id, svcaddr.String())
cts.Id, id, svcaddr.String(), prev_cri.conn_id, prev_cri.route_id)
l.Close()
return nil, nil, err
return nil, nil, fmt.Errorf("port not unique")
}
cts.S.svc_port_map[PortId(svcaddr.Port)] = ConnRouteId{conn_id: cts.Id, route_id: id}
cts.S.svc_port_mtx.Unlock()
@ -1289,6 +1301,10 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
s.stop_req.Store(false)
s.bulletin = NewBulletin[*ServerEvent](&s, 1024)
s.ctl_addrs = list.New()
s.pxy_addrs = list.New()
s.wpx_addrs = list.New()
opts = append(opts, grpc.StatsHandler(&ConnCatcher{server: &s}))
if s.Cfg.RpcTls != nil { opts = append(opts, grpc.Creds(credentials.NewTLS(s.Cfg.RpcTls))) }
//opts = append(opts, grpc.UnaryInterceptor(unaryInterceptor))
@ -1341,6 +1357,7 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
s.ctl_mux.Handle("/_ctl/events",
s.WrapWebsocketHandler(&server_ctl_ws{ServerCtl{S: &s, Id: HS_ID_CTL}}))
/*
// this part is duplcate of pxy_mux.
s.ctl_mux.Handle("/_ssh-ws/{conn_id}/{route_id}",
s.WrapWebsocketHandler(&server_pxy_ssh_ws{S: &s, Id: HS_ID_PXY_WS}))
@ -1369,6 +1386,7 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
s.ctl_mux.Handle("/_http/{conn_id}/{route_id}/{trailer...}",
s.WrapHttpHandler(&server_pxy_http_main{server_pxy: server_pxy{S: &s, Id: HS_ID_CTL}, prefix: "/_http"}))
*/
s.ctl = make([]*http.Server, len(cfg.CtlAddrs))
for i = 0; i < len(cfg.CtlAddrs); i++ {
@ -1578,11 +1596,21 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
l, err = net.Listen(TcpAddrStrClass(cs.Addr), cs.Addr)
if err == nil {
if s.stop_req.Load() == false {
var node *list.Element
s.ctl_addrs_mtx.Lock()
node = s.ctl_addrs.PushBack(l.Addr().(*net.TCPAddr))
s.ctl_addrs_mtx.Unlock()
if s.Cfg.CtlTls == nil {
err = cs.Serve(l)
} else {
err = cs.ServeTLS(l, "", "") // s.Cfg.CtlTls must provide a certificate and a key
}
s.ctl_addrs_mtx.Lock()
s.ctl_addrs.Remove(node)
s.ctl_addrs_mtx.Unlock()
} else {
err = fmt.Errorf("stop requested")
}
@ -1621,11 +1649,21 @@ func (s *Server) RunPxyTask(wg *sync.WaitGroup) {
l, err = net.Listen(TcpAddrStrClass(cs.Addr), cs.Addr)
if err == nil {
if s.stop_req.Load() == false {
var node *list.Element
s.pxy_addrs_mtx.Lock()
node = s.pxy_addrs.PushBack(l.Addr().(*net.TCPAddr))
s.pxy_addrs_mtx.Unlock()
if s.Cfg.PxyTls == nil { // TODO: change this
err = cs.Serve(l)
} else {
err = cs.ServeTLS(l, "", "") // s.Cfg.PxyTls must provide a certificate and a key
}
s.pxy_addrs_mtx.Lock()
s.pxy_addrs.Remove(node)
s.pxy_addrs_mtx.Unlock()
} else {
err = fmt.Errorf("stop requested")
}
@ -1664,11 +1702,21 @@ func (s *Server) RunWpxTask(wg *sync.WaitGroup) {
l, err = net.Listen(TcpAddrStrClass(cs.Addr), cs.Addr)
if err == nil {
if s.stop_req.Load() == false {
if s.Cfg.WpxTls == nil { // TODO: change this
var node *list.Element
s.wpx_addrs_mtx.Lock()
node = s.wpx_addrs.PushBack(l.Addr().(*net.TCPAddr))
s.wpx_addrs_mtx.Unlock()
if s.Cfg.WpxTls == nil {
err = cs.Serve(l)
} else {
err = cs.ServeTLS(l, "", "") // s.Cfg.WpxTls must provide a certificate and a key
}
s.wpx_addrs_mtx.Lock()
s.wpx_addrs.Remove(node)
s.wpx_addrs_mtx.Unlock()
} else {
err = fmt.Errorf("stop requested")
}
@ -2109,6 +2157,34 @@ func (s *Server) SetConnNoticeHandlers(handlers []ServerConnNoticeHandler) {
s.conn_notice_handlers = handlers
}
func (s *Server) GetFirstCtlAddr() *net.TCPAddr {
var e *list.Element
s.ctl_addrs_mtx.Lock()
defer s.ctl_addrs_mtx.Unlock()
e = s.ctl_addrs.Front()
if e == nil { return nil }
return e.Value.(*net.TCPAddr)
}
func (s *Server) GetFirstPxyAddr() *net.TCPAddr{
var e *list.Element
s.pxy_addrs_mtx.Lock()
defer s.pxy_addrs_mtx.Unlock()
e = s.pxy_addrs.Front()
if e == nil { return nil }
return e.Value.(*net.TCPAddr)
}
func (s *Server) GetFirstWpxAddr() *net.TCPAddr {
var e *list.Element
s.wpx_addrs_mtx.Lock()
defer s.wpx_addrs_mtx.Unlock()
e = s.wpx_addrs.Front()
if e == nil { return nil }
return e.Value.(*net.TCPAddr)
}
func (s *Server) AddCtlHandler(path string, handler ServerHttpHandler) {
// parked under /_ctl
s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl" + path, s.WrapHttpHandler(handler))